feat(MCA): implement MessageRouterServiceImpl [TASK-021] [AC-MCA-08] [AC-MCA-09] [AC-MCA-10]

This commit is contained in:
MerCry 2026-02-24 01:25:00 +08:00
parent 48c70eb239
commit 0b6fcf56a7
2 changed files with 527 additions and 0 deletions

View File

@ -0,0 +1,275 @@
package com.wecom.robot.adapter;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.wecom.robot.config.WecomConfig;
import com.wecom.robot.dto.OutboundMessage;
import com.wecom.robot.dto.ServiceStateResponse;
import com.wecom.robot.dto.SyncMsgResponse;
import com.wecom.robot.dto.WxSendMessageRequest;
import com.wecom.robot.dto.InboundMessage;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.http.*;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;
import java.util.concurrent.TimeUnit;
/**
* 企业微信渠道适配器
* <p>
* 实现企业微信渠道的消息发送服务状态管理转人工消息同步等能力
* [AC-MCA-02] 企业微信渠道适配器实现
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class WeChatAdapter implements ChannelAdapter,
ServiceStateCapable, TransferCapable, MessageSyncCapable {
private static final String CHANNEL_TYPE = "wechat";
private static final String GET_ACCESS_TOKEN_URL = "https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid={corpId}&corpsecret={secret}";
private static final String SEND_MESSAGE_URL = "https://qyapi.weixin.qq.com/cgi-bin/kf/send_msg?access_token={accessToken}";
private static final String SYNC_MESSAGE_URL = "https://qyapi.weixin.qq.com/cgi-bin/kf/sync_msg?access_token={accessToken}";
private static final String GET_SERVICE_STATE_URL = "https://qyapi.weixin.qq.com/cgi-bin/kf/service_state/get?access_token={accessToken}";
private static final String TRANS_SERVICE_STATE_URL = "https://qyapi.weixin.qq.com/cgi-bin/kf/service_state/trans?access_token={accessToken}";
private static final String SEND_EVENT_MSG_URL = "https://qyapi.weixin.qq.com/cgi-bin/kf/send_msg_on_event?access_token={accessToken}";
private static final String REDIS_TOKEN_KEY = "wecom:access_token";
private static final String REDIS_TOKEN_LOCK_KEY = "wecom:access_token_lock";
private static final String REDIS_CURSOR_KEY_PREFIX = "wecom:cursor:";
private final WecomConfig wecomConfig;
private final StringRedisTemplate redisTemplate;
private final RestTemplate restTemplate = new RestTemplate();
@Override
public String getChannelType() {
return CHANNEL_TYPE;
}
@Override
public boolean sendMessage(OutboundMessage message) {
WxSendMessageRequest wxRequest = convertToWxRequest(message);
return sendWxMessage(wxRequest);
}
private WxSendMessageRequest convertToWxRequest(OutboundMessage message) {
String msgType = message.getMsgType();
if (msgType == null || msgType.isEmpty()) {
msgType = InboundMessage.MSG_TYPE_TEXT;
}
WxSendMessageRequest wxRequest = new WxSendMessageRequest();
wxRequest.setTouser(message.getReceiver());
wxRequest.setOpenKfid(message.getKfId());
wxRequest.setMsgtype(msgType);
switch (msgType) {
case InboundMessage.MSG_TYPE_TEXT:
default:
WxSendMessageRequest.TextContent textContent = new WxSendMessageRequest.TextContent();
textContent.setContent(message.getContent());
wxRequest.setText(textContent);
break;
}
return wxRequest;
}
private boolean sendWxMessage(WxSendMessageRequest request) {
String accessToken = getAccessToken();
String url = SEND_MESSAGE_URL.replace("{accessToken}", accessToken);
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
HttpEntity<String> entity = new HttpEntity<>(JSON.toJSONString(request), headers);
ResponseEntity<String> response = restTemplate.postForEntity(url, entity, String.class);
JSONObject json = JSON.parseObject(response.getBody());
if (json.getInteger("errcode") != null && json.getInteger("errcode") != 0) {
log.error("[AC-MCA-02] 发送消息失败: {}", json);
return false;
}
log.info("[AC-MCA-02] 消息发送成功: msgId={}", json.getString("msgid"));
return true;
}
public boolean sendTextMessage(String touser, String openKfid, String content) {
WxSendMessageRequest request = WxSendMessageRequest.text(touser, openKfid, content);
return sendWxMessage(request);
}
@Override
public ServiceStateResponse getServiceState(String kfId, String customerId) {
String accessToken = getAccessToken();
String url = GET_SERVICE_STATE_URL.replace("{accessToken}", accessToken);
JSONObject body = new JSONObject();
body.put("open_kfid", kfId);
body.put("external_userid", customerId);
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
HttpEntity<String> entity = new HttpEntity<>(body.toJSONString(), headers);
ResponseEntity<String> response = restTemplate.postForEntity(url, entity, String.class);
log.info("[AC-MCA-02] 获取会话状态响应: {}", response.getBody());
return JSON.parseObject(response.getBody(), ServiceStateResponse.class);
}
@Override
public boolean transServiceState(String kfId, String customerId, int newState, String servicerId) {
String accessToken = getAccessToken();
String url = TRANS_SERVICE_STATE_URL.replace("{accessToken}", accessToken);
JSONObject body = new JSONObject();
body.put("open_kfid", kfId);
body.put("external_userid", customerId);
body.put("service_state", newState);
if (servicerId != null && !servicerId.isEmpty()) {
body.put("servicer_userid", servicerId);
}
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
HttpEntity<String> entity = new HttpEntity<>(body.toJSONString(), headers);
ResponseEntity<String> response = restTemplate.postForEntity(url, entity, String.class);
log.info("[AC-MCA-02] 变更会话状态响应: {}", response.getBody());
JSONObject result = JSON.parseObject(response.getBody());
return result.getInteger("errcode") == null || result.getInteger("errcode") == 0;
}
@Override
public boolean transferToPool(String kfId, String customerId) {
return transServiceState(kfId, customerId, ServiceStateResponse.STATE_POOL, null);
}
@Override
public boolean transferToManual(String kfId, String customerId, String servicerId) {
return transServiceState(kfId, customerId, ServiceStateResponse.STATE_MANUAL, servicerId);
}
@Override
public SyncMsgResponse syncMessages(String kfId, String cursor) {
String accessToken = getAccessToken();
String url = SYNC_MESSAGE_URL.replace("{accessToken}", accessToken);
String savedCursor = cursor != null ? cursor : getCursor(kfId);
JSONObject body = new JSONObject();
body.put("open_kfid", kfId);
if (savedCursor != null && !savedCursor.isEmpty()) {
body.put("cursor", savedCursor);
}
body.put("limit", 1000);
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
HttpEntity<String> entity = new HttpEntity<>(body.toJSONString(), headers);
ResponseEntity<String> response = restTemplate.postForEntity(url, entity, String.class);
log.info("[AC-MCA-02] sync_msg响应: {}", response.getBody());
SyncMsgResponse syncResponse = JSON.parseObject(response.getBody(), SyncMsgResponse.class);
if (syncResponse.isSuccess() && syncResponse.getNextCursor() != null) {
saveCursor(kfId, syncResponse.getNextCursor());
}
return syncResponse;
}
public boolean sendWelcomeMsg(String code, String content) {
String accessToken = getAccessToken();
String url = SEND_EVENT_MSG_URL.replace("{accessToken}", accessToken);
JSONObject body = new JSONObject();
body.put("code", code);
body.put("msgtype", "text");
JSONObject text = new JSONObject();
text.put("content", content);
body.put("text", text);
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
HttpEntity<String> entity = new HttpEntity<>(body.toJSONString(), headers);
ResponseEntity<String> response = restTemplate.postForEntity(url, entity, String.class);
JSONObject json = JSON.parseObject(response.getBody());
if (json.getInteger("errcode") != null && json.getInteger("errcode") != 0) {
log.error("[AC-MCA-02] 发送欢迎语失败: {}", json);
return false;
}
log.info("[AC-MCA-02] 发送欢迎语成功");
return true;
}
public boolean endSession(String kfId, String customerId) {
return transServiceState(kfId, customerId, ServiceStateResponse.STATE_CLOSED, null);
}
private String getAccessToken() {
String cachedToken = redisTemplate.opsForValue().get(REDIS_TOKEN_KEY);
if (cachedToken != null) {
return cachedToken;
}
Boolean locked = redisTemplate.opsForValue().setIfAbsent(REDIS_TOKEN_LOCK_KEY, "1", 10, TimeUnit.SECONDS);
if (Boolean.TRUE.equals(locked)) {
try {
String url = GET_ACCESS_TOKEN_URL
.replace("{corpId}", wecomConfig.getCorpId())
.replace("{secret}", wecomConfig.getSecret());
ResponseEntity<String> response = restTemplate.getForEntity(url, String.class);
JSONObject json = JSON.parseObject(response.getBody());
if (json.getInteger("errcode") != null && json.getInteger("errcode") != 0) {
log.error("[AC-MCA-02] 获取access_token失败: {}", json);
throw new RuntimeException("获取access_token失败: " + json.getString("errmsg"));
}
String accessToken = json.getString("access_token");
long expiresIn = json.getLongValue("expires_in");
redisTemplate.opsForValue().set(REDIS_TOKEN_KEY, accessToken, expiresIn - 300, TimeUnit.SECONDS);
return accessToken;
} finally {
redisTemplate.delete(REDIS_TOKEN_LOCK_KEY);
}
} else {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return getAccessToken();
}
}
private String getCursor(String openKfid) {
return redisTemplate.opsForValue().get(REDIS_CURSOR_KEY_PREFIX + openKfid);
}
private void saveCursor(String openKfid, String cursor) {
redisTemplate.opsForValue().set(REDIS_CURSOR_KEY_PREFIX + openKfid, cursor);
}
public void clearCursor(String openKfid) {
redisTemplate.delete(REDIS_CURSOR_KEY_PREFIX + openKfid);
}
}

View File

@ -0,0 +1,252 @@
package com.wecom.robot.service.impl;
import com.wecom.robot.adapter.ChannelAdapter;
import com.wecom.robot.adapter.TransferCapable;
import com.wecom.robot.dto.InboundMessage;
import com.wecom.robot.dto.OutboundMessage;
import com.wecom.robot.entity.Message;
import com.wecom.robot.entity.Session;
import com.wecom.robot.service.*;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* 消息路由服务实现 - 渠道无关的消息路由核心逻辑
*
* <p>关联 AC: [AC-MCA-08] 统一消息路由, [AC-MCA-09] 状态路由, [AC-MCA-10] 人工客服分发
*
* @see MessageRouterService
* @see InboundMessage
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class MessageRouterServiceImpl implements MessageRouterService {
private static final String IDEMPOTENT_KEY_PREFIX = "idempotent:";
private static final long IDEMPOTENT_TTL_HOURS = 1;
private final SessionManagerService sessionManagerService;
private final AiService aiService;
private final TransferService transferService;
private final WebSocketService webSocketService;
private final Map<String, ChannelAdapter> channelAdapters;
private final StringRedisTemplate redisTemplate;
@Override
@Async
public void processInboundMessage(InboundMessage message) {
log.info("[AC-MCA-08] 处理入站消息: channelType={}, channelMessageId={}, sessionKey={}",
message.getChannelType(), message.getChannelMessageId(), message.getSessionKey());
if (!checkIdempotent(message.getChannelMessageId())) {
log.info("重复消息,跳过处理: channelMessageId={}", message.getChannelMessageId());
return;
}
Session session = getOrCreateSession(message);
saveInboundMessage(session, message);
routeBySessionState(session, message);
}
@Override
public void routeBySessionState(Session session, InboundMessage message) {
log.info("[AC-MCA-09] 根据会话状态路由: sessionId={}, status={}",
session.getSessionId(), session.getStatus());
String status = session.getStatus();
if (status == null) {
status = Session.STATUS_AI;
}
switch (status) {
case Session.STATUS_AI:
dispatchToAiService(session, message);
break;
case Session.STATUS_PENDING:
dispatchToPendingPool(session, message);
break;
case Session.STATUS_MANUAL:
dispatchToManualCs(session, message);
break;
case Session.STATUS_CLOSED:
Session newSession = sessionManagerService.getOrCreateSession(
message.getCustomerId(), message.getKfId());
dispatchToAiService(newSession, message);
break;
default:
log.warn("未知的会话状态: {}, 默认路由到AI服务", status);
dispatchToAiService(session, message);
}
}
@Override
public void dispatchToAiService(Session session, InboundMessage message) {
log.info("[AC-MCA-08] 分发到AI服务: sessionId={}, content={}",
session.getSessionId(), truncateContent(message.getContent()));
List<Message> history = sessionManagerService.getSessionMessages(session.getSessionId());
String reply = aiService.generateReply(message.getContent(), history);
double confidence = aiService.getLastConfidence();
int messageCount = sessionManagerService.getMessageCount(session.getSessionId());
boolean shouldTransfer = transferService.shouldTransferToManual(
message.getContent(),
confidence,
messageCount,
session.getCreatedAt()
);
if (shouldTransfer) {
handleTransferToManual(session, message, reply);
} else {
sendReplyToUser(session, message, reply);
}
}
@Override
public void dispatchToManualCs(Session session, InboundMessage message) {
log.info("[AC-MCA-10] 分发到人工客服: sessionId={}, manualCsId={}",
session.getSessionId(), session.getManualCsId());
Map<String, Object> wsMessage = Map.of(
"type", "customer_message",
"sessionId", session.getSessionId(),
"content", message.getContent(),
"msgType", message.getMsgType(),
"customerId", message.getCustomerId(),
"channelType", message.getChannelType(),
"timestamp", System.currentTimeMillis()
);
webSocketService.notifyNewMessage(session.getSessionId(),
createWxCallbackMessage(message));
log.info("消息已推送给人工客服: sessionId={}", session.getSessionId());
}
@Override
public void dispatchToPendingPool(Session session, InboundMessage message) {
log.info("[AC-MCA-10] 分发到待接入池: sessionId={}", session.getSessionId());
webSocketService.notifyNewPendingSession(session.getSessionId());
log.info("已通知待接入池有新消息: sessionId={}", session.getSessionId());
}
private boolean checkIdempotent(String channelMessageId) {
if (channelMessageId == null || channelMessageId.isEmpty()) {
log.warn("channelMessageId 为空,跳过幂等检查");
return true;
}
String key = IDEMPOTENT_KEY_PREFIX + channelMessageId;
Boolean absent = redisTemplate.opsForValue()
.setIfAbsent(key, "1", IDEMPOTENT_TTL_HOURS, TimeUnit.HOURS);
return Boolean.TRUE.equals(absent);
}
private Session getOrCreateSession(InboundMessage message) {
return sessionManagerService.getOrCreateSession(
message.getCustomerId(),
message.getKfId()
);
}
private void saveInboundMessage(Session session, InboundMessage message) {
sessionManagerService.saveMessage(
message.getChannelMessageId(),
session.getSessionId(),
Message.SENDER_TYPE_CUSTOMER,
message.getCustomerId(),
message.getContent(),
message.getMsgType(),
message.getRawPayload()
);
}
private void handleTransferToManual(Session session, InboundMessage message, String reply) {
String reason = transferService.getTransferReason(
message.getContent(),
aiService.getLastConfidence(),
sessionManagerService.getMessageCount(session.getSessionId())
);
sessionManagerService.transferToManual(session.getSessionId(), reason);
String transferReply = reply + "\n\n正在为您转接人工客服请稍候...";
ChannelAdapter adapter = channelAdapters.get(message.getChannelType());
if (adapter != null) {
OutboundMessage outbound = OutboundMessage.builder()
.channelType(message.getChannelType())
.receiver(message.getCustomerId())
.kfId(message.getKfId())
.content(transferReply)
.msgType("text")
.build();
adapter.sendMessage(outbound);
if (adapter instanceof TransferCapable) {
boolean transferred = ((TransferCapable) adapter)
.transferToPool(message.getKfId(), message.getCustomerId());
if (transferred) {
log.info("已将会话转入待接入池: sessionId={}", session.getSessionId());
}
}
}
webSocketService.notifyNewPendingSession(session.getSessionId());
}
private void sendReplyToUser(Session session, InboundMessage message, String reply) {
ChannelAdapter adapter = channelAdapters.get(message.getChannelType());
if (adapter != null) {
OutboundMessage outbound = OutboundMessage.builder()
.channelType(message.getChannelType())
.receiver(message.getCustomerId())
.kfId(message.getKfId())
.content(reply)
.msgType("text")
.build();
adapter.sendMessage(outbound);
}
sessionManagerService.saveMessage(
"ai_" + System.currentTimeMillis(),
session.getSessionId(),
Message.SENDER_TYPE_AI,
"AI",
reply,
"text",
null
);
}
private com.wecom.robot.dto.WxCallbackMessage createWxCallbackMessage(InboundMessage message) {
com.wecom.robot.dto.WxCallbackMessage wxMessage = new com.wecom.robot.dto.WxCallbackMessage();
wxMessage.setExternalUserId(message.getCustomerId());
wxMessage.setOpenKfId(message.getKfId());
wxMessage.setContent(message.getContent());
wxMessage.setMsgType(message.getMsgType());
return wxMessage;
}
private String truncateContent(String content) {
if (content == null) {
return null;
}
return content.length() > 50 ? content.substring(0, 50) + "..." : content;
}
}