refactor(MCA): integrate MessageRouterService into MessageProcessService [TASK-022] [AC-MCA-08]

This commit is contained in:
MerCry 2026-02-24 01:28:27 +08:00
parent f1e5c931bc
commit db378afd6d
1 changed files with 54 additions and 145 deletions

View File

@ -1,6 +1,7 @@
package com.wecom.robot.service; package com.wecom.robot.service;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.wecom.robot.dto.InboundMessage;
import com.wecom.robot.dto.ServiceStateResponse; import com.wecom.robot.dto.ServiceStateResponse;
import com.wecom.robot.dto.SyncMsgResponse; import com.wecom.robot.dto.SyncMsgResponse;
import com.wecom.robot.dto.WxCallbackMessage; import com.wecom.robot.dto.WxCallbackMessage;
@ -11,25 +12,35 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.util.List; /**
* 消息处理服务 - 微信渠道消息处理入口
*
* <p>职责
* <ul>
* <li>处理微信回调事件消息同步会话事件等</li>
* <li>构建 InboundMessage 并委托给 MessageRouterService</li>
* </ul>
*
* <p>关联 AC: [AC-MCA-08] 统一消息路由
*
* @see MessageRouterService
* @see InboundMessage
*/
@Slf4j @Slf4j
@Service @Service
@RequiredArgsConstructor @RequiredArgsConstructor
public class MessageProcessService { public class MessageProcessService {
private final SessionManagerService sessionManagerService; private final SessionManagerService sessionManagerService;
private final AiService aiService;
private final TransferService transferService;
private final WecomApiService wecomApiService; private final WecomApiService wecomApiService;
private final WebSocketService webSocketService; private final MessageRouterService messageRouterService;
@Async @Async
public void processKfMessageEvent(WxCallbackMessage event) { public void processKfMessageEvent(WxCallbackMessage event) {
String openKfId = event.getOpenKfId(); String openKfId = event.getOpenKfId();
String token = event.getToken(); String token = event.getToken();
log.info("处理客户消息事件: openKfId={}, token={}", openKfId, token); log.info("[AC-MCA-08] 处理客户消息事件: openKfId={}, token={}", openKfId, token);
if (openKfId == null) { if (openKfId == null) {
log.warn("事件缺少openKfId"); log.warn("事件缺少openKfId");
@ -81,7 +92,7 @@ public class MessageProcessService {
String customerId = msgItem.getExternalUserId(); String customerId = msgItem.getExternalUserId();
String kfId = msgItem.getOpenKfId(); String kfId = msgItem.getOpenKfId();
log.info("处理消息项: msgId={}, origin={}, msgType={}, customerId={}", log.info("[AC-MCA-08] 处理消息项: msgId={}, origin={}, msgType={}, customerId={}",
msgItem.getMsgId(), msgItem.getOrigin(), msgItem.getMsgType(), customerId); msgItem.getMsgId(), msgItem.getOrigin(), msgItem.getMsgType(), customerId);
if (msgItem.isEvent()) { if (msgItem.isEvent()) {
@ -99,21 +110,6 @@ public class MessageProcessService {
return; return;
} }
String content = extractContent(msgItem);
String msgType = msgItem.getMsgType();
Session session = sessionManagerService.getOrCreateSession(customerId, kfId);
sessionManagerService.saveMessage(
msgItem.getMsgId(),
session.getSessionId(),
Message.SENDER_TYPE_CUSTOMER,
customerId,
content,
msgType,
msgItem.getOriginData()
);
ServiceStateResponse wxState = wecomApiService.getServiceState(kfId, customerId); ServiceStateResponse wxState = wecomApiService.getServiceState(kfId, customerId);
if (!wxState.isSuccess()) { if (!wxState.isSuccess()) {
log.warn("获取微信会话状态失败: errcode={}, errmsg={}", log.warn("获取微信会话状态失败: errcode={}, errmsg={}",
@ -121,9 +117,13 @@ public class MessageProcessService {
} }
log.info("微信会话状态: {} ({})", wxState.getStateDesc(), wxState.getServiceState()); log.info("微信会话状态: {} ({})", wxState.getStateDesc(), wxState.getServiceState());
Session session = sessionManagerService.getOrCreateSession(customerId, kfId);
sessionManagerService.updateWxServiceState(session.getSessionId(), wxState.getServiceState()); sessionManagerService.updateWxServiceState(session.getSessionId(), wxState.getServiceState());
processByWxState(session, customerId, kfId, content, msgType, wxState); InboundMessage inboundMessage = buildInboundMessage(msgItem, customerId, kfId);
messageRouterService.processInboundMessage(inboundMessage);
} }
private void processEventMessage(SyncMsgResponse.MsgItem msgItem) { private void processEventMessage(SyncMsgResponse.MsgItem msgItem) {
@ -186,7 +186,6 @@ public class MessageProcessService {
Integer changeType = event.getChangeType(); Integer changeType = event.getChangeType();
String newServicerUserid = event.getNewServicerUserid(); String newServicerUserid = event.getNewServicerUserid();
String oldServicerUserid = event.getOldServicerUserid(); String oldServicerUserid = event.getOldServicerUserid();
String msgCode = event.getMsgCode();
log.info("会话状态变更: changeType={}, oldServicer={}, newServicer={}", log.info("会话状态变更: changeType={}, oldServicer={}, newServicer={}",
changeType, oldServicerUserid, newServicerUserid); changeType, oldServicerUserid, newServicerUserid);
@ -215,97 +214,23 @@ public class MessageProcessService {
} }
} }
private void processByWxState(Session session, String customerId, String kfId, private InboundMessage buildInboundMessage(SyncMsgResponse.MsgItem msgItem,
String content, String msgType, ServiceStateResponse wxState) { String customerId, String kfId) {
Integer state = wxState.getServiceState(); String content = extractContent(msgItem);
String sessionKey = kfId + "_" + customerId;
if (state == null) { return InboundMessage.builder()
state = ServiceStateResponse.STATE_UNTREATED; .channelType(InboundMessage.CHANNEL_WECHAT)
} .channelMessageId(msgItem.getMsgId())
.sessionKey(sessionKey)
switch (state) { .customerId(customerId)
case ServiceStateResponse.STATE_UNTREATED: .kfId(kfId)
case ServiceStateResponse.STATE_AI: .sender(customerId)
processAiMessage(session, customerId, kfId, content); .content(content)
break; .msgType(msgItem.getMsgType())
case ServiceStateResponse.STATE_POOL: .rawPayload(msgItem.getOriginData())
notifyPendingSession(session, customerId, kfId, content, msgType); .timestamp(System.currentTimeMillis())
break; .build();
case ServiceStateResponse.STATE_MANUAL:
pushToManualCs(session, customerId, kfId, content, msgType, wxState.getServicerUserid());
break;
case ServiceStateResponse.STATE_CLOSED:
Session newSession = sessionManagerService.getOrCreateSession(customerId, kfId);
processAiMessage(newSession, customerId, kfId, content);
break;
default:
log.warn("未知的微信会话状态: {}", state);
processAiMessage(session, customerId, kfId, content);
}
}
private void processAiMessage(Session session, String customerId, String kfId, String content) {
List<Message> history = sessionManagerService.getSessionMessages(session.getSessionId());
String reply = aiService.generateReply(content, history);
double confidence = aiService.getLastConfidence();
int messageCount = sessionManagerService.getMessageCount(session.getSessionId());
boolean shouldTransfer = transferService.shouldTransferToManual(
content,
confidence,
messageCount,
session.getCreatedAt()
);
if (shouldTransfer) {
String reason = transferService.getTransferReason(content, confidence, messageCount);
sessionManagerService.transferToManual(session.getSessionId(), reason);
reply = reply + "\n\n正在为您转接人工客服请稍候...";
wecomApiService.sendTextMessage(customerId, kfId, reply);
boolean transferred = wecomApiService.transferToPool(kfId, customerId);
if (transferred) {
log.info("已将会话转入待接入池: customerId={}, kfId={}", customerId, kfId);
sessionManagerService.updateWxServiceState(session.getSessionId(), ServiceStateResponse.STATE_POOL);
}
webSocketService.notifyNewPendingSession(session.getSessionId());
} else {
wecomApiService.sendTextMessage(customerId, kfId, reply);
sessionManagerService.saveMessage(
"ai_" + System.currentTimeMillis(),
session.getSessionId(),
Message.SENDER_TYPE_AI,
"AI",
reply,
"text",
null
);
}
}
private void notifyPendingSession(Session session, String customerId, String kfId,
String content, String msgType) {
WxCallbackMessage notifyMsg = new WxCallbackMessage();
notifyMsg.setExternalUserId(customerId);
notifyMsg.setOpenKfId(kfId);
notifyMsg.setContent(content);
notifyMsg.setMsgType(msgType);
webSocketService.notifyNewMessage(session.getSessionId(), notifyMsg);
}
private void pushToManualCs(Session session, String customerId, String kfId,
String content, String msgType, String servicerUserid) {
WxCallbackMessage pushMsg = new WxCallbackMessage();
pushMsg.setExternalUserId(customerId);
pushMsg.setOpenKfId(kfId);
pushMsg.setContent(content);
pushMsg.setMsgType(msgType);
pushMsg.setServicerUserid(servicerUserid);
webSocketService.pushMessageToCs(session.getSessionId(), pushMsg);
} }
private String extractContent(SyncMsgResponse.MsgItem msgItem) { private String extractContent(SyncMsgResponse.MsgItem msgItem) {
@ -347,7 +272,7 @@ public class MessageProcessService {
@Async @Async
public void processMessage(WxCallbackMessage message) { public void processMessage(WxCallbackMessage message) {
log.info("直接处理消息(测试用): msgType={}", message.getMsgType()); log.info("[AC-MCA-08] 直接处理消息(测试用): msgType={}", message.getMsgType());
String customerId = message.getExternalUserId(); String customerId = message.getExternalUserId();
String kfId = message.getOpenKfId(); String kfId = message.getOpenKfId();
@ -357,37 +282,21 @@ public class MessageProcessService {
return; return;
} }
Session session = sessionManagerService.getOrCreateSession(customerId, kfId); String sessionKey = kfId + "_" + customerId;
String status = sessionManagerService.getSessionStatus(session.getSessionId());
sessionManagerService.saveMessage( InboundMessage inboundMessage = InboundMessage.builder()
message.getMsgId() != null ? message.getMsgId() : "test_" + System.currentTimeMillis(), .channelType(InboundMessage.CHANNEL_WECHAT)
session.getSessionId(), .channelMessageId(message.getMsgId() != null ? message.getMsgId() : "test_" + System.currentTimeMillis())
Message.SENDER_TYPE_CUSTOMER, .sessionKey(sessionKey)
customerId, .customerId(customerId)
message.getContent(), .kfId(kfId)
message.getMsgType() != null ? message.getMsgType() : "text", .sender(customerId)
JSON.toJSONString(message.getRawData()) .content(message.getContent())
); .msgType(message.getMsgType() != null ? message.getMsgType() : "text")
.rawPayload(JSON.toJSONString(message.getRawData()))
.timestamp(System.currentTimeMillis())
.build();
List<Message> history = sessionManagerService.getSessionMessages(session.getSessionId()); messageRouterService.processInboundMessage(inboundMessage);
switch (status) {
case Session.STATUS_AI:
processAiMessage(session, customerId, kfId, message.getContent());
break;
case Session.STATUS_PENDING:
notifyPendingSession(session, customerId, kfId, message.getContent(), message.getMsgType());
break;
case Session.STATUS_MANUAL:
pushToManualCs(session, customerId, kfId, message.getContent(), message.getMsgType(), null);
break;
case Session.STATUS_CLOSED:
Session newSession = sessionManagerService.getOrCreateSession(customerId, kfId);
processAiMessage(newSession, customerId, kfId, message.getContent());
break;
default:
log.warn("未知的会话状态: {}", status);
}
} }
} }