package com.wecom.robot.service; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.wecom.robot.entity.Message; import com.wecom.robot.entity.Session; import com.wecom.robot.entity.TransferLog; import com.wecom.robot.mapper.MessageMapper; import com.wecom.robot.mapper.SessionMapper; import com.wecom.robot.mapper.TransferLogMapper; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import java.time.LocalDateTime; import java.util.List; import java.util.concurrent.TimeUnit; /** * 会话管理服务 * *

关联 AC: [AC-MCA-11] 会话管理, [AC-MCA-12] 渠道类型支持 */ @Slf4j @Service @RequiredArgsConstructor public class SessionManagerService { private static final String SESSION_STATUS_KEY_PREFIX = "session:status:"; private static final String SESSION_MESSAGE_COUNT_KEY_PREFIX = "session:msg_count:"; private final SessionMapper sessionMapper; private final MessageMapper messageMapper; private final TransferLogMapper transferLogMapper; private final StringRedisTemplate redisTemplate; public Session getOrCreateSession(String customerId, String kfId) { return getOrCreateSession(customerId, kfId, Session.CHANNEL_WECHAT); } public Session getOrCreateSession(String customerId, String kfId, String channelType) { LambdaQueryWrapper query = new LambdaQueryWrapper<>(); query.eq(Session::getCustomerId, customerId) .eq(Session::getKfId, kfId) .ne(Session::getStatus, Session.STATUS_CLOSED) .orderByDesc(Session::getCreatedAt) .last("LIMIT 1"); Session session = sessionMapper.selectOne(query); if (session == null) { session = new Session(); session.setSessionId(generateSessionId(customerId, kfId)); session.setCustomerId(customerId); session.setKfId(kfId); session.setChannelType(channelType != null ? channelType : Session.CHANNEL_WECHAT); session.setStatus(Session.STATUS_AI); session.setWxServiceState(0); session.setCreatedAt(LocalDateTime.now()); session.setUpdatedAt(LocalDateTime.now()); sessionMapper.insert(session); cacheSessionStatus(session.getSessionId(), Session.STATUS_AI); log.info("[AC-MCA-11] 创建新会话: sessionId={}, channelType={}", session.getSessionId(), session.getChannelType()); } return session; } public Session getSession(String sessionId) { return sessionMapper.selectById(sessionId); } public String getSessionStatus(String sessionId) { String cachedStatus = redisTemplate.opsForValue().get(SESSION_STATUS_KEY_PREFIX + sessionId); if (cachedStatus != null) { return cachedStatus; } Session session = sessionMapper.selectById(sessionId); if (session != null) { cacheSessionStatus(sessionId, session.getStatus()); return session.getStatus(); } return null; } public void updateSessionStatus(String sessionId, String status) { Session session = new Session(); session.setSessionId(sessionId); session.setStatus(status); session.setUpdatedAt(LocalDateTime.now()); sessionMapper.updateById(session); cacheSessionStatus(sessionId, status); } public void updateWxServiceState(String sessionId, Integer wxServiceState) { Session session = new Session(); session.setSessionId(sessionId); session.setWxServiceState(wxServiceState); session.setUpdatedAt(LocalDateTime.now()); sessionMapper.updateById(session); log.info("更新微信会话状态: sessionId={}, wxServiceState={}", sessionId, wxServiceState); } public void updateServicer(String sessionId, String servicerUserid) { Session session = new Session(); session.setSessionId(sessionId); session.setManualCsId(servicerUserid); session.setUpdatedAt(LocalDateTime.now()); sessionMapper.updateById(session); log.info("更新接待人员: sessionId={}, servicerUserid={}", sessionId, servicerUserid); } public void assignManualCs(String sessionId, String csId) { Session session = new Session(); session.setSessionId(sessionId); session.setStatus(Session.STATUS_MANUAL); session.setManualCsId(csId); session.setUpdatedAt(LocalDateTime.now()); sessionMapper.updateById(session); cacheSessionStatus(sessionId, Session.STATUS_MANUAL); } @Transactional public void transferToManual(String sessionId, String reason) { updateSessionStatus(sessionId, Session.STATUS_PENDING); TransferLog transferLog = new TransferLog(); transferLog.setSessionId(sessionId); transferLog.setTriggerReason(reason); transferLog.setTriggerTime(LocalDateTime.now()); transferLogMapper.insert(transferLog); log.info("会话转人工: sessionId={}, reason={}", sessionId, reason); } public void acceptTransfer(String sessionId, String csId) { assignManualCs(sessionId, csId); LambdaQueryWrapper query = new LambdaQueryWrapper<>(); query.eq(TransferLog::getSessionId, sessionId) .isNull(TransferLog::getAcceptedTime) .orderByDesc(TransferLog::getTriggerTime) .last("LIMIT 1"); TransferLog transferLog = transferLogMapper.selectOne(query); if (transferLog != null) { transferLog.setAcceptedTime(LocalDateTime.now()); transferLog.setAcceptedCsId(csId); transferLogMapper.updateById(transferLog); } log.info("客服接入会话: sessionId={}, csId={}", sessionId, csId); } public void closeSession(String sessionId) { updateSessionStatus(sessionId, Session.STATUS_CLOSED); redisTemplate.delete(SESSION_STATUS_KEY_PREFIX + sessionId); redisTemplate.delete(SESSION_MESSAGE_COUNT_KEY_PREFIX + sessionId); log.info("会话已关闭: sessionId={}", sessionId); } public void saveMessage(String msgId, String sessionId, String senderType, String senderId, String content, String msgType, String rawData) { Message message = new Message(); message.setMsgId(msgId); message.setSessionId(sessionId); message.setSenderType(senderType); message.setSenderId(senderId); message.setContent(content); message.setMsgType(msgType); message.setCreatedAt(LocalDateTime.now()); message.setRawData(rawData); messageMapper.insert(message); incrementMessageCount(sessionId); } public List getSessionMessages(String sessionId) { LambdaQueryWrapper query = new LambdaQueryWrapper<>(); query.eq(Message::getSessionId, sessionId) .orderByAsc(Message::getCreatedAt); return messageMapper.selectList(query); } public int getMessageCount(String sessionId) { String count = redisTemplate.opsForValue().get(SESSION_MESSAGE_COUNT_KEY_PREFIX + sessionId); if (count != null) { return Integer.parseInt(count); } LambdaQueryWrapper query = new LambdaQueryWrapper<>(); query.eq(Message::getSessionId, sessionId); long dbCount = messageMapper.selectCount(query); redisTemplate.opsForValue().set(SESSION_MESSAGE_COUNT_KEY_PREFIX + sessionId, String.valueOf(dbCount)); return (int) dbCount; } public List getSessionsByKfId(String kfId, String status, int limit) { LambdaQueryWrapper query = new LambdaQueryWrapper<>(); query.eq(Session::getKfId, kfId); if (status != null && !status.isEmpty() && !"all".equals(status)) { query.eq(Session::getStatus, status); } query.orderByDesc(Session::getUpdatedAt); query.last("LIMIT " + limit); return sessionMapper.selectList(query); } public List getSessionsByChannelType(String channelType, String status, int limit) { LambdaQueryWrapper query = new LambdaQueryWrapper<>(); query.eq(Session::getChannelType, channelType); if (status != null && !status.isEmpty() && !"all".equals(status)) { query.eq(Session::getStatus, status); } query.orderByDesc(Session::getUpdatedAt); query.last("LIMIT " + limit); return sessionMapper.selectList(query); } public List getAllSessions(int limit) { LambdaQueryWrapper query = new LambdaQueryWrapper<>(); query.orderByDesc(Session::getUpdatedAt); query.last("LIMIT " + limit); return sessionMapper.selectList(query); } private void cacheSessionStatus(String sessionId, String status) { redisTemplate.opsForValue().set(SESSION_STATUS_KEY_PREFIX + sessionId, status, 24, TimeUnit.HOURS); } private void incrementMessageCount(String sessionId) { redisTemplate.opsForValue().increment(SESSION_MESSAGE_COUNT_KEY_PREFIX + sessionId); } private String generateSessionId(String customerId, String kfId) { return kfId + "_" + customerId + "_" + System.currentTimeMillis(); } }