diff --git a/src/main/java/com/wecom/robot/adapter/WeChatAdapter.java b/src/main/java/com/wecom/robot/adapter/WeChatAdapter.java new file mode 100644 index 0000000..e4a0464 --- /dev/null +++ b/src/main/java/com/wecom/robot/adapter/WeChatAdapter.java @@ -0,0 +1,275 @@ +package com.wecom.robot.adapter; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.wecom.robot.config.WecomConfig; +import com.wecom.robot.dto.OutboundMessage; +import com.wecom.robot.dto.ServiceStateResponse; +import com.wecom.robot.dto.SyncMsgResponse; +import com.wecom.robot.dto.WxSendMessageRequest; +import com.wecom.robot.dto.InboundMessage; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.http.*; +import org.springframework.stereotype.Component; +import org.springframework.web.client.RestTemplate; + +import java.util.concurrent.TimeUnit; + +/** + * 企业微信渠道适配器 + *

+ * 实现企业微信渠道的消息发送、服务状态管理、转人工、消息同步等能力。 + * [AC-MCA-02] 企业微信渠道适配器实现 + */ +@Slf4j +@Component +@RequiredArgsConstructor +public class WeChatAdapter implements ChannelAdapter, + ServiceStateCapable, TransferCapable, MessageSyncCapable { + + private static final String CHANNEL_TYPE = "wechat"; + + private static final String GET_ACCESS_TOKEN_URL = "https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid={corpId}&corpsecret={secret}"; + private static final String SEND_MESSAGE_URL = "https://qyapi.weixin.qq.com/cgi-bin/kf/send_msg?access_token={accessToken}"; + private static final String SYNC_MESSAGE_URL = "https://qyapi.weixin.qq.com/cgi-bin/kf/sync_msg?access_token={accessToken}"; + private static final String GET_SERVICE_STATE_URL = "https://qyapi.weixin.qq.com/cgi-bin/kf/service_state/get?access_token={accessToken}"; + private static final String TRANS_SERVICE_STATE_URL = "https://qyapi.weixin.qq.com/cgi-bin/kf/service_state/trans?access_token={accessToken}"; + private static final String SEND_EVENT_MSG_URL = "https://qyapi.weixin.qq.com/cgi-bin/kf/send_msg_on_event?access_token={accessToken}"; + + private static final String REDIS_TOKEN_KEY = "wecom:access_token"; + private static final String REDIS_TOKEN_LOCK_KEY = "wecom:access_token_lock"; + private static final String REDIS_CURSOR_KEY_PREFIX = "wecom:cursor:"; + + private final WecomConfig wecomConfig; + private final StringRedisTemplate redisTemplate; + private final RestTemplate restTemplate = new RestTemplate(); + + @Override + public String getChannelType() { + return CHANNEL_TYPE; + } + + @Override + public boolean sendMessage(OutboundMessage message) { + WxSendMessageRequest wxRequest = convertToWxRequest(message); + return sendWxMessage(wxRequest); + } + + private WxSendMessageRequest convertToWxRequest(OutboundMessage message) { + String msgType = message.getMsgType(); + if (msgType == null || msgType.isEmpty()) { + msgType = InboundMessage.MSG_TYPE_TEXT; + } + + WxSendMessageRequest wxRequest = new WxSendMessageRequest(); + wxRequest.setTouser(message.getReceiver()); + wxRequest.setOpenKfid(message.getKfId()); + wxRequest.setMsgtype(msgType); + + switch (msgType) { + case InboundMessage.MSG_TYPE_TEXT: + default: + WxSendMessageRequest.TextContent textContent = new WxSendMessageRequest.TextContent(); + textContent.setContent(message.getContent()); + wxRequest.setText(textContent); + break; + } + + return wxRequest; + } + + private boolean sendWxMessage(WxSendMessageRequest request) { + String accessToken = getAccessToken(); + String url = SEND_MESSAGE_URL.replace("{accessToken}", accessToken); + + HttpHeaders headers = new HttpHeaders(); + headers.setContentType(MediaType.APPLICATION_JSON); + + HttpEntity entity = new HttpEntity<>(JSON.toJSONString(request), headers); + ResponseEntity response = restTemplate.postForEntity(url, entity, String.class); + + JSONObject json = JSON.parseObject(response.getBody()); + if (json.getInteger("errcode") != null && json.getInteger("errcode") != 0) { + log.error("[AC-MCA-02] 发送消息失败: {}", json); + return false; + } + + log.info("[AC-MCA-02] 消息发送成功: msgId={}", json.getString("msgid")); + return true; + } + + public boolean sendTextMessage(String touser, String openKfid, String content) { + WxSendMessageRequest request = WxSendMessageRequest.text(touser, openKfid, content); + return sendWxMessage(request); + } + + @Override + public ServiceStateResponse getServiceState(String kfId, String customerId) { + String accessToken = getAccessToken(); + String url = GET_SERVICE_STATE_URL.replace("{accessToken}", accessToken); + + JSONObject body = new JSONObject(); + body.put("open_kfid", kfId); + body.put("external_userid", customerId); + + HttpHeaders headers = new HttpHeaders(); + headers.setContentType(MediaType.APPLICATION_JSON); + + HttpEntity entity = new HttpEntity<>(body.toJSONString(), headers); + ResponseEntity response = restTemplate.postForEntity(url, entity, String.class); + + log.info("[AC-MCA-02] 获取会话状态响应: {}", response.getBody()); + + return JSON.parseObject(response.getBody(), ServiceStateResponse.class); + } + + @Override + public boolean transServiceState(String kfId, String customerId, int newState, String servicerId) { + String accessToken = getAccessToken(); + String url = TRANS_SERVICE_STATE_URL.replace("{accessToken}", accessToken); + + JSONObject body = new JSONObject(); + body.put("open_kfid", kfId); + body.put("external_userid", customerId); + body.put("service_state", newState); + if (servicerId != null && !servicerId.isEmpty()) { + body.put("servicer_userid", servicerId); + } + + HttpHeaders headers = new HttpHeaders(); + headers.setContentType(MediaType.APPLICATION_JSON); + + HttpEntity entity = new HttpEntity<>(body.toJSONString(), headers); + ResponseEntity response = restTemplate.postForEntity(url, entity, String.class); + + log.info("[AC-MCA-02] 变更会话状态响应: {}", response.getBody()); + + JSONObject result = JSON.parseObject(response.getBody()); + return result.getInteger("errcode") == null || result.getInteger("errcode") == 0; + } + + @Override + public boolean transferToPool(String kfId, String customerId) { + return transServiceState(kfId, customerId, ServiceStateResponse.STATE_POOL, null); + } + + @Override + public boolean transferToManual(String kfId, String customerId, String servicerId) { + return transServiceState(kfId, customerId, ServiceStateResponse.STATE_MANUAL, servicerId); + } + + @Override + public SyncMsgResponse syncMessages(String kfId, String cursor) { + String accessToken = getAccessToken(); + String url = SYNC_MESSAGE_URL.replace("{accessToken}", accessToken); + + String savedCursor = cursor != null ? cursor : getCursor(kfId); + + JSONObject body = new JSONObject(); + body.put("open_kfid", kfId); + if (savedCursor != null && !savedCursor.isEmpty()) { + body.put("cursor", savedCursor); + } + body.put("limit", 1000); + + HttpHeaders headers = new HttpHeaders(); + headers.setContentType(MediaType.APPLICATION_JSON); + + HttpEntity entity = new HttpEntity<>(body.toJSONString(), headers); + ResponseEntity response = restTemplate.postForEntity(url, entity, String.class); + + log.info("[AC-MCA-02] sync_msg响应: {}", response.getBody()); + + SyncMsgResponse syncResponse = JSON.parseObject(response.getBody(), SyncMsgResponse.class); + + if (syncResponse.isSuccess() && syncResponse.getNextCursor() != null) { + saveCursor(kfId, syncResponse.getNextCursor()); + } + + return syncResponse; + } + + public boolean sendWelcomeMsg(String code, String content) { + String accessToken = getAccessToken(); + String url = SEND_EVENT_MSG_URL.replace("{accessToken}", accessToken); + + JSONObject body = new JSONObject(); + body.put("code", code); + body.put("msgtype", "text"); + JSONObject text = new JSONObject(); + text.put("content", content); + body.put("text", text); + + HttpHeaders headers = new HttpHeaders(); + headers.setContentType(MediaType.APPLICATION_JSON); + + HttpEntity entity = new HttpEntity<>(body.toJSONString(), headers); + ResponseEntity response = restTemplate.postForEntity(url, entity, String.class); + + JSONObject json = JSON.parseObject(response.getBody()); + if (json.getInteger("errcode") != null && json.getInteger("errcode") != 0) { + log.error("[AC-MCA-02] 发送欢迎语失败: {}", json); + return false; + } + + log.info("[AC-MCA-02] 发送欢迎语成功"); + return true; + } + + public boolean endSession(String kfId, String customerId) { + return transServiceState(kfId, customerId, ServiceStateResponse.STATE_CLOSED, null); + } + + private String getAccessToken() { + String cachedToken = redisTemplate.opsForValue().get(REDIS_TOKEN_KEY); + if (cachedToken != null) { + return cachedToken; + } + + Boolean locked = redisTemplate.opsForValue().setIfAbsent(REDIS_TOKEN_LOCK_KEY, "1", 10, TimeUnit.SECONDS); + if (Boolean.TRUE.equals(locked)) { + try { + String url = GET_ACCESS_TOKEN_URL + .replace("{corpId}", wecomConfig.getCorpId()) + .replace("{secret}", wecomConfig.getSecret()); + + ResponseEntity response = restTemplate.getForEntity(url, String.class); + JSONObject json = JSON.parseObject(response.getBody()); + + if (json.getInteger("errcode") != null && json.getInteger("errcode") != 0) { + log.error("[AC-MCA-02] 获取access_token失败: {}", json); + throw new RuntimeException("获取access_token失败: " + json.getString("errmsg")); + } + + String accessToken = json.getString("access_token"); + long expiresIn = json.getLongValue("expires_in"); + + redisTemplate.opsForValue().set(REDIS_TOKEN_KEY, accessToken, expiresIn - 300, TimeUnit.SECONDS); + return accessToken; + } finally { + redisTemplate.delete(REDIS_TOKEN_LOCK_KEY); + } + } else { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + return getAccessToken(); + } + } + + private String getCursor(String openKfid) { + return redisTemplate.opsForValue().get(REDIS_CURSOR_KEY_PREFIX + openKfid); + } + + private void saveCursor(String openKfid, String cursor) { + redisTemplate.opsForValue().set(REDIS_CURSOR_KEY_PREFIX + openKfid, cursor); + } + + public void clearCursor(String openKfid) { + redisTemplate.delete(REDIS_CURSOR_KEY_PREFIX + openKfid); + } +} diff --git a/src/main/java/com/wecom/robot/service/impl/MessageRouterServiceImpl.java b/src/main/java/com/wecom/robot/service/impl/MessageRouterServiceImpl.java new file mode 100644 index 0000000..46df6ce --- /dev/null +++ b/src/main/java/com/wecom/robot/service/impl/MessageRouterServiceImpl.java @@ -0,0 +1,252 @@ +package com.wecom.robot.service.impl; + +import com.wecom.robot.adapter.ChannelAdapter; +import com.wecom.robot.adapter.TransferCapable; +import com.wecom.robot.dto.InboundMessage; +import com.wecom.robot.dto.OutboundMessage; +import com.wecom.robot.entity.Message; +import com.wecom.robot.entity.Session; +import com.wecom.robot.service.*; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.scheduling.annotation.Async; +import org.springframework.stereotype.Service; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +/** + * 消息路由服务实现 - 渠道无关的消息路由核心逻辑 + * + *

关联 AC: [AC-MCA-08] 统一消息路由, [AC-MCA-09] 状态路由, [AC-MCA-10] 人工客服分发 + * + * @see MessageRouterService + * @see InboundMessage + */ +@Slf4j +@Service +@RequiredArgsConstructor +public class MessageRouterServiceImpl implements MessageRouterService { + + private static final String IDEMPOTENT_KEY_PREFIX = "idempotent:"; + private static final long IDEMPOTENT_TTL_HOURS = 1; + + private final SessionManagerService sessionManagerService; + private final AiService aiService; + private final TransferService transferService; + private final WebSocketService webSocketService; + private final Map channelAdapters; + private final StringRedisTemplate redisTemplate; + + @Override + @Async + public void processInboundMessage(InboundMessage message) { + log.info("[AC-MCA-08] 处理入站消息: channelType={}, channelMessageId={}, sessionKey={}", + message.getChannelType(), message.getChannelMessageId(), message.getSessionKey()); + + if (!checkIdempotent(message.getChannelMessageId())) { + log.info("重复消息,跳过处理: channelMessageId={}", message.getChannelMessageId()); + return; + } + + Session session = getOrCreateSession(message); + + saveInboundMessage(session, message); + + routeBySessionState(session, message); + } + + @Override + public void routeBySessionState(Session session, InboundMessage message) { + log.info("[AC-MCA-09] 根据会话状态路由: sessionId={}, status={}", + session.getSessionId(), session.getStatus()); + + String status = session.getStatus(); + if (status == null) { + status = Session.STATUS_AI; + } + + switch (status) { + case Session.STATUS_AI: + dispatchToAiService(session, message); + break; + case Session.STATUS_PENDING: + dispatchToPendingPool(session, message); + break; + case Session.STATUS_MANUAL: + dispatchToManualCs(session, message); + break; + case Session.STATUS_CLOSED: + Session newSession = sessionManagerService.getOrCreateSession( + message.getCustomerId(), message.getKfId()); + dispatchToAiService(newSession, message); + break; + default: + log.warn("未知的会话状态: {}, 默认路由到AI服务", status); + dispatchToAiService(session, message); + } + } + + @Override + public void dispatchToAiService(Session session, InboundMessage message) { + log.info("[AC-MCA-08] 分发到AI服务: sessionId={}, content={}", + session.getSessionId(), truncateContent(message.getContent())); + + List history = sessionManagerService.getSessionMessages(session.getSessionId()); + String reply = aiService.generateReply(message.getContent(), history); + + double confidence = aiService.getLastConfidence(); + int messageCount = sessionManagerService.getMessageCount(session.getSessionId()); + + boolean shouldTransfer = transferService.shouldTransferToManual( + message.getContent(), + confidence, + messageCount, + session.getCreatedAt() + ); + + if (shouldTransfer) { + handleTransferToManual(session, message, reply); + } else { + sendReplyToUser(session, message, reply); + } + } + + @Override + public void dispatchToManualCs(Session session, InboundMessage message) { + log.info("[AC-MCA-10] 分发到人工客服: sessionId={}, manualCsId={}", + session.getSessionId(), session.getManualCsId()); + + Map wsMessage = Map.of( + "type", "customer_message", + "sessionId", session.getSessionId(), + "content", message.getContent(), + "msgType", message.getMsgType(), + "customerId", message.getCustomerId(), + "channelType", message.getChannelType(), + "timestamp", System.currentTimeMillis() + ); + + webSocketService.notifyNewMessage(session.getSessionId(), + createWxCallbackMessage(message)); + + log.info("消息已推送给人工客服: sessionId={}", session.getSessionId()); + } + + @Override + public void dispatchToPendingPool(Session session, InboundMessage message) { + log.info("[AC-MCA-10] 分发到待接入池: sessionId={}", session.getSessionId()); + + webSocketService.notifyNewPendingSession(session.getSessionId()); + + log.info("已通知待接入池有新消息: sessionId={}", session.getSessionId()); + } + + private boolean checkIdempotent(String channelMessageId) { + if (channelMessageId == null || channelMessageId.isEmpty()) { + log.warn("channelMessageId 为空,跳过幂等检查"); + return true; + } + + String key = IDEMPOTENT_KEY_PREFIX + channelMessageId; + Boolean absent = redisTemplate.opsForValue() + .setIfAbsent(key, "1", IDEMPOTENT_TTL_HOURS, TimeUnit.HOURS); + + return Boolean.TRUE.equals(absent); + } + + private Session getOrCreateSession(InboundMessage message) { + return sessionManagerService.getOrCreateSession( + message.getCustomerId(), + message.getKfId() + ); + } + + private void saveInboundMessage(Session session, InboundMessage message) { + sessionManagerService.saveMessage( + message.getChannelMessageId(), + session.getSessionId(), + Message.SENDER_TYPE_CUSTOMER, + message.getCustomerId(), + message.getContent(), + message.getMsgType(), + message.getRawPayload() + ); + } + + private void handleTransferToManual(Session session, InboundMessage message, String reply) { + String reason = transferService.getTransferReason( + message.getContent(), + aiService.getLastConfidence(), + sessionManagerService.getMessageCount(session.getSessionId()) + ); + + sessionManagerService.transferToManual(session.getSessionId(), reason); + + String transferReply = reply + "\n\n正在为您转接人工客服,请稍候..."; + + ChannelAdapter adapter = channelAdapters.get(message.getChannelType()); + if (adapter != null) { + OutboundMessage outbound = OutboundMessage.builder() + .channelType(message.getChannelType()) + .receiver(message.getCustomerId()) + .kfId(message.getKfId()) + .content(transferReply) + .msgType("text") + .build(); + adapter.sendMessage(outbound); + + if (adapter instanceof TransferCapable) { + boolean transferred = ((TransferCapable) adapter) + .transferToPool(message.getKfId(), message.getCustomerId()); + if (transferred) { + log.info("已将会话转入待接入池: sessionId={}", session.getSessionId()); + } + } + } + + webSocketService.notifyNewPendingSession(session.getSessionId()); + } + + private void sendReplyToUser(Session session, InboundMessage message, String reply) { + ChannelAdapter adapter = channelAdapters.get(message.getChannelType()); + if (adapter != null) { + OutboundMessage outbound = OutboundMessage.builder() + .channelType(message.getChannelType()) + .receiver(message.getCustomerId()) + .kfId(message.getKfId()) + .content(reply) + .msgType("text") + .build(); + adapter.sendMessage(outbound); + } + + sessionManagerService.saveMessage( + "ai_" + System.currentTimeMillis(), + session.getSessionId(), + Message.SENDER_TYPE_AI, + "AI", + reply, + "text", + null + ); + } + + private com.wecom.robot.dto.WxCallbackMessage createWxCallbackMessage(InboundMessage message) { + com.wecom.robot.dto.WxCallbackMessage wxMessage = new com.wecom.robot.dto.WxCallbackMessage(); + wxMessage.setExternalUserId(message.getCustomerId()); + wxMessage.setOpenKfId(message.getKfId()); + wxMessage.setContent(message.getContent()); + wxMessage.setMsgType(message.getMsgType()); + return wxMessage; + } + + private String truncateContent(String content) { + if (content == null) { + return null; + } + return content.length() > 50 ? content.substring(0, 50) + "..." : content; + } +}