// ai-robot-channel/src/main/java/com/wecom/robot/service/impl/MessageRouterServiceImpl.java
// (repository viewer metadata: 264 lines, 10 KiB, Java)

package com.wecom.robot.service.impl;
import com.wecom.robot.adapter.ChannelAdapter;
import com.wecom.robot.adapter.TransferCapable;
import com.wecom.robot.dto.InboundMessage;
import com.wecom.robot.dto.OutboundMessage;
import com.wecom.robot.dto.ai.ChatRequest;
import com.wecom.robot.dto.ai.ChatResponse;
import com.wecom.robot.entity.Message;
import com.wecom.robot.entity.Session;
import com.wecom.robot.service.*;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
 * Routes inbound channel messages according to the state of the customer's session.
 *
 * <p>For each inbound message the router performs an idempotency check in Redis, resolves the
 * session, persists the message, and then dispatches it to the AI service, the pending pool,
 * or the assigned human agent.
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class MessageRouterServiceImpl implements MessageRouterService {

    /** Prefix of the Redis key used to mark a channel message id as already seen. */
    private static final String IDEMPOTENT_KEY_PREFIX = "idempotent:";

    /** Lifetime, in hours, of an idempotency marker in Redis. */
    private static final long IDEMPOTENT_TTL_HOURS = 1;

    /** Maximum number of characters of message content written to the log. */
    private static final int LOG_CONTENT_MAX_LENGTH = 50;

    /** Reply sent to the customer when the AI service cannot produce an answer. */
    private static final String AI_FALLBACK_REPLY = "抱歉,我暂时无法回答您的问题,正在为您转接人工客服...";

    /** Transfer reason recorded when the AI service completes without a response object. */
    private static final String AI_EMPTY_RESPONSE_REASON = "AI服务返回空响应";

    /** Notice appended to the reply when the session is handed over to a human agent. */
    private static final String TRANSFER_NOTICE = "正在为您转接人工客服请稍候...";

    private final SessionManagerService sessionManagerService;
    private final AiServiceClient aiServiceClient;
    private final TransferService transferService;
    private final WebSocketService webSocketService;

    /** Channel adapters; looked up below with {@code message.getChannelType()} as the key. */
    private final Map<String, ChannelAdapter> channelAdapters;

    private final StringRedisTemplate redisTemplate;

    /**
     * Entry point for an inbound message: de-duplicates it, resolves the session, persists the
     * message and routes it by session state. Annotated {@code @Async}.
     *
     * @param message the inbound message to process
     */
    @Override
    @Async
    public void processInboundMessage(InboundMessage message) {
        log.info("[AC-MCA-08] 处理入站消息: channelType={}, channelMessageId={}, sessionKey={}",
                message.getChannelType(), message.getChannelMessageId(), message.getSessionKey());

        if (!checkIdempotent(message.getChannelMessageId())) {
            log.info("重复消息,跳过处理: channelMessageId={}", message.getChannelMessageId());
            return;
        }

        Session session = getOrCreateSession(message);
        saveInboundMessage(session, message);
        routeBySessionState(session, message);
    }

    /**
     * Dispatches the message according to {@code session.getStatus()}.
     *
     * <p>A {@code null} or unrecognised status is routed to the AI service. For a closed session
     * a session is obtained again from the session manager and the message is sent to the AI
     * service on that session.
     *
     * @param session the session the message belongs to
     * @param message the inbound message
     */
    @Override
    public void routeBySessionState(Session session, InboundMessage message) {
        log.info("[AC-MCA-09] 根据会话状态路由: sessionId={}, status={}",
                session.getSessionId(), session.getStatus());

        String status = session.getStatus();
        if (status == null) {
            status = Session.STATUS_AI;
        }

        switch (status) {
            case Session.STATUS_AI:
                dispatchToAiService(session, message);
                break;
            case Session.STATUS_PENDING:
                dispatchToPendingPool(session, message);
                break;
            case Session.STATUS_MANUAL:
                dispatchToManualCs(session, message);
                break;
            case Session.STATUS_CLOSED:
                // Use the same channel-aware lookup as processInboundMessage so the
                // replacement session is not resolved without the message's channel type.
                Session newSession = getOrCreateSession(message);
                dispatchToAiService(newSession, message);
                break;
            default:
                log.warn("未知的会话状态: {}, 默认路由到AI服务", status);
                dispatchToAiService(session, message);
                break;
        }
    }

    /**
     * Asks the AI service for a reply and either sends it to the customer or hands the session
     * over to a human agent.
     *
     * <p>A transfer happens when the AI response requests it, or otherwise when
     * {@code transferService.shouldTransferToManual} says so. If the AI call fails, a fallback
     * response that requests a transfer is used instead.
     *
     * @param session the session the message belongs to
     * @param message the inbound message
     */
    @Override
    public void dispatchToAiService(Session session, InboundMessage message) {
        log.info("[AC-MCA-08] 分发到AI服务: sessionId={}, content={}",
                session.getSessionId(), truncateContent(message.getContent()));

        ChatResponse chatResponse = requestAiReply(ChatRequest.fromInboundMessage(message));

        String reply = chatResponse.getReply();
        double confidence = chatResponse.getConfidence() != null ? chatResponse.getConfidence() : 0.0;

        boolean shouldTransfer = Boolean.TRUE.equals(chatResponse.getShouldTransfer());
        if (!shouldTransfer) {
            // The message count is only needed for the rule check, so query it only here.
            int messageCount = sessionManagerService.getMessageCount(session.getSessionId());
            shouldTransfer = transferService.shouldTransferToManual(
                    message.getContent(),
                    confidence,
                    messageCount,
                    session.getCreatedAt()
            );
        }

        if (shouldTransfer) {
            handleTransferToManual(session, message, reply, chatResponse.getTransferReason(), confidence);
        } else {
            sendReplyToUser(session, message, reply);
        }
    }

    /**
     * Pushes the customer's message to the human agent side through the WebSocket service.
     *
     * @param session the session currently handled by a human agent
     * @param message the inbound message
     */
    @Override
    public void dispatchToManualCs(Session session, InboundMessage message) {
        log.info("[AC-MCA-10] 分发到人工客服: sessionId={}, manualCsId={}",
                session.getSessionId(), session.getManualCsId());

        webSocketService.notifyNewMessage(session.getSessionId(), createWxCallbackMessage(message));

        log.info("消息已推送给人工客服: sessionId={}", session.getSessionId());
    }

    /**
     * Notifies the pending pool about the session. Only the session id is passed on; the
     * message content is not forwarded by this method.
     *
     * @param session the session waiting for a human agent
     * @param message the inbound message (not forwarded)
     */
    @Override
    public void dispatchToPendingPool(Session session, InboundMessage message) {
        log.info("[AC-MCA-10] 分发到待接入池: sessionId={}", session.getSessionId());
        webSocketService.notifyNewPendingSession(session.getSessionId());
        log.info("已通知待接入池有新消息: sessionId={}", session.getSessionId());
    }

    /**
     * Calls the AI service and waits for its response, converting any failure or a missing
     * response into a fallback response that requests a transfer.
     *
     * @param chatRequest the request to send
     * @return the AI response, or a fallback response; never {@code null}
     */
    private ChatResponse requestAiReply(ChatRequest chatRequest) {
        try {
            ChatResponse response = aiServiceClient.generateReply(chatRequest).get();
            if (response != null) {
                return response;
            }
            log.error("[AC-MCA-06] AI服务调用失败: {}", AI_EMPTY_RESPONSE_REASON);
            return ChatResponse.fallbackWithTransfer(AI_FALLBACK_REPLY, AI_EMPTY_RESPONSE_REASON);
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Restore the interrupt flag so the executor running this task can observe it.
                Thread.currentThread().interrupt();
            }
            // Pass the exception as the last argument so the stack trace is logged as well.
            log.error("[AC-MCA-06] AI服务调用失败: {}", e.getMessage(), e);
            return ChatResponse.fallbackWithTransfer(AI_FALLBACK_REPLY, e.getMessage());
        }
    }

    /**
     * Atomically records the message id in Redis with a TTL.
     *
     * @param channelMessageId the channel-side message id; may be {@code null} or empty
     * @return {@code true} if the message should be processed (first time seen, or no id to
     *         check); {@code false} if the id was already recorded
     */
    private boolean checkIdempotent(String channelMessageId) {
        if (channelMessageId == null || channelMessageId.isEmpty()) {
            log.warn("channelMessageId 为空,跳过幂等检查");
            return true;
        }
        String key = IDEMPOTENT_KEY_PREFIX + channelMessageId;
        Boolean absent = redisTemplate.opsForValue()
                .setIfAbsent(key, "1", IDEMPOTENT_TTL_HOURS, TimeUnit.HOURS);
        // setIfAbsent returns a nullable Boolean; treat anything but TRUE as "already seen".
        return Boolean.TRUE.equals(absent);
    }

    /**
     * Resolves the session for the message's customer, customer-service account and channel.
     *
     * @param message the inbound message
     * @return the session returned by the session manager
     */
    private Session getOrCreateSession(InboundMessage message) {
        return sessionManagerService.getOrCreateSession(
                message.getCustomerId(),
                message.getKfId(),
                message.getChannelType()
        );
    }

    /**
     * Persists the inbound message as a customer message of the given session.
     *
     * @param session the session the message belongs to
     * @param message the inbound message
     */
    private void saveInboundMessage(Session session, InboundMessage message) {
        sessionManagerService.saveMessage(
                message.getChannelMessageId(),
                session.getSessionId(),
                Message.SENDER_TYPE_CUSTOMER,
                message.getCustomerId(),
                message.getContent(),
                message.getMsgType(),
                message.getRawPayload()
        );
    }

    /**
     * Marks the session as transferred, tells the customer about the hand-over, moves the
     * conversation to the channel's pool when the adapter supports it, and notifies the
     * pending pool.
     *
     * @param session        the session being transferred
     * @param message        the inbound message that triggered the transfer
     * @param reply          the AI reply to prepend to the transfer notice; may be {@code null}
     * @param transferReason the reason supplied by the AI response, or {@code null} to derive one
     * @param confidence     the AI confidence used when a reason has to be derived
     */
    private void handleTransferToManual(Session session, InboundMessage message, String reply,
                                        String transferReason, double confidence) {
        String reason = transferReason != null ? transferReason : transferService.getTransferReason(
                message.getContent(),
                confidence,
                sessionManagerService.getMessageCount(session.getSessionId())
        );
        sessionManagerService.transferToManual(session.getSessionId(), reason);

        // Avoid sending the literal text "null" when the AI produced no reply.
        String transferReply = (reply == null || reply.isEmpty())
                ? TRANSFER_NOTICE
                : reply + "\n\n" + TRANSFER_NOTICE;

        ChannelAdapter adapter = resolveAdapter(session, message);
        if (adapter != null) {
            sendText(adapter, message, transferReply);
            if (adapter instanceof TransferCapable) {
                boolean transferred = ((TransferCapable) adapter)
                        .transferToPool(message.getKfId(), message.getCustomerId());
                if (transferred) {
                    log.info("已将会话转入待接入池: sessionId={}", session.getSessionId());
                } else {
                    log.warn("会话转入待接入池失败: sessionId={}", session.getSessionId());
                }
            }
        }

        webSocketService.notifyNewPendingSession(session.getSessionId());
    }

    /**
     * Sends the AI reply to the customer and persists it as an AI message of the session.
     * The reply is persisted even when no adapter is available for the channel.
     *
     * @param session the session the reply belongs to
     * @param message the inbound message being answered
     * @param reply   the reply text
     */
    private void sendReplyToUser(Session session, InboundMessage message, String reply) {
        ChannelAdapter adapter = resolveAdapter(session, message);
        if (adapter != null) {
            sendText(adapter, message, reply);
        }

        sessionManagerService.saveMessage(
                "ai_" + System.currentTimeMillis(),
                session.getSessionId(),
                Message.SENDER_TYPE_AI,
                "AI",
                reply,
                "text",
                null
        );
    }

    /**
     * Looks up the adapter for the message's channel type, logging a warning when none exists.
     *
     * @param session the session, used for log context only
     * @param message the inbound message whose channel type selects the adapter
     * @return the adapter, or {@code null} if none is registered for the channel type
     */
    private ChannelAdapter resolveAdapter(Session session, InboundMessage message) {
        ChannelAdapter adapter = channelAdapters.get(message.getChannelType());
        if (adapter == null) {
            log.warn("未找到渠道适配器,无法发送消息: channelType={}, sessionId={}",
                    message.getChannelType(), session.getSessionId());
        }
        return adapter;
    }

    /**
     * Builds a text message addressed to the sender of {@code message} and sends it through
     * the adapter.
     *
     * @param adapter the adapter to send through
     * @param message the inbound message supplying channel, receiver and kfId
     * @param content the text to send
     */
    private void sendText(ChannelAdapter adapter, InboundMessage message, String content) {
        OutboundMessage outbound = OutboundMessage.builder()
                .channelType(message.getChannelType())
                .receiver(message.getCustomerId())
                .kfId(message.getKfId())
                .content(content)
                .msgType("text")
                .build();
        adapter.sendMessage(outbound);
    }

    /**
     * Copies customer id, kfId, content and message type of the inbound message into a
     * {@code WxCallbackMessage}, the payload type passed to the WebSocket service.
     *
     * @param message the inbound message
     * @return the populated callback message
     */
    private com.wecom.robot.dto.WxCallbackMessage createWxCallbackMessage(InboundMessage message) {
        com.wecom.robot.dto.WxCallbackMessage wxMessage = new com.wecom.robot.dto.WxCallbackMessage();
        wxMessage.setExternalUserId(message.getCustomerId());
        wxMessage.setOpenKfId(message.getKfId());
        wxMessage.setContent(message.getContent());
        wxMessage.setMsgType(message.getMsgType());
        return wxMessage;
    }

    /**
     * Shortens content for logging.
     *
     * @param content the content to shorten; may be {@code null}
     * @return {@code null} for {@code null} input; the first {@value #LOG_CONTENT_MAX_LENGTH}
     *         characters followed by {@code "..."} when longer; otherwise the content unchanged
     */
    private String truncateContent(String content) {
        if (content == null) {
            return null;
        }
        return content.length() > LOG_CONTENT_MAX_LENGTH
                ? content.substring(0, LOG_CONTENT_MAX_LENGTH) + "..."
                : content;
    }
}