package com.emotion.service.impl; import com.emotion.dto.request.WebSocketRequest; import com.emotion.dto.websocket.ChatRequest; import com.emotion.dto.websocket.ConnectRequest; import com.emotion.dto.websocket.WebSocketMessage; import com.emotion.entity.Message; import com.emotion.entity.Conversation; import com.emotion.service.WebSocketService; import com.emotion.service.AiChatService; import com.emotion.service.MessageService; import com.emotion.service.ConversationService; import com.emotion.util.SnowflakeIdGenerator; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.simp.SimpMessagingTemplate; import org.springframework.stereotype.Service; import org.springframework.util.StringUtils; import java.security.Principal; import java.time.LocalDateTime; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; /** * WebSocket服务实现类 * * @author huazhongmin * @date 2025-07-25 */ @Slf4j @Service public class WebSocketServiceImpl implements WebSocketService { @Autowired private SimpMessagingTemplate messagingTemplate; @Autowired private AiChatService aiChatService; @Autowired private MessageService messageService; @Autowired private ConversationService conversationService; @Autowired private SnowflakeIdGenerator snowflakeIdGenerator; // 在线用户管理 private final ConcurrentHashMap onlineUsers = new ConcurrentHashMap<>(); /** * 处理聊天消息 */ @Override public void handleChatMessage(WebSocketRequest webSocketRequest, String sessionId, Principal principal) { try { log.info("处理聊天消息: request={}, sessionId={}, principal={}", webSocketRequest, sessionId, principal); // 验证请求参数 if (webSocketRequest.getContent() == null || webSocketRequest.getContent().trim().isEmpty()) { sendErrorMessage(getUserId(principal, sessionId), "消息内容不能为空"); return; } // 设置默认值 setWebSocketRequestDefaults(webSocketRequest, principal, sessionId); log.info("确定用户身份: userId={}, senderType={}", webSocketRequest.getSenderId(), webSocketRequest.getSenderType()); // 转换请求对象 ChatRequest chatRequest = convertToChatRequest(webSocketRequest); // 构建用户消息 WebSocketMessage userMessage = WebSocketMessage.builder() .messageId(UUID.randomUUID().toString()) .conversationId(chatRequest.getConversationId()) .type("TEXT") .content(chatRequest.getContent()) .senderId(chatRequest.getSenderId()) .senderType(getSenderType(chatRequest.getSenderType())) .status("SENT") .createTime(LocalDateTime.now()) .build(); // 发送用户消息到会话 if (chatRequest.getConversationId() != null) { messagingTemplate.convertAndSend("/topic/conversation/" + chatRequest.getConversationId(), userMessage); } // 发送给用户私有队列 messagingTemplate.convertAndSendToUser(chatRequest.getSenderId(), "/queue/messages", userMessage); // 发送AI思考状态 sendAiThinkingMessage(chatRequest.getSenderId(), chatRequest.getConversationId()); // 异步调用AI服务 processAiResponse(chatRequest); } catch (Exception e) { log.error("处理聊天消息失败", e); sendErrorMessage(getUserId(principal, sessionId), "消息处理失败,请稍后重试"); } } /** * 处理用户连接 */ @Override public void handleUserConnect(ConnectRequest request, String sessionId, Principal principal) { try { // 设置默认值 setConnectRequestDefaults(request, principal, sessionId); log.info("用户连接WebSocket: userId={}, sessionId={}, authenticated={}", request.getUserId(), sessionId, isUserAuthenticated(request.getUserId())); // 记录在线用户 onlineUsers.put(sessionId, request.getUserId()); // 发送连接成功消息 WebSocketMessage connectMessage = WebSocketMessage.builder() .messageId(UUID.randomUUID().toString()) .type("CONNECTION") .content("连接成功") .senderId("system") .senderType("SYSTEM") .status("SENT") .createTime(LocalDateTime.now()) .build(); messagingTemplate.convertAndSendToUser(request.getUserId(), "/queue/messages", connectMessage); } catch (Exception e) { log.error("处理用户连接失败", e); } } /** * 处理用户断开连接 */ @Override public void handleUserDisconnect(String sessionId, Principal principal) { try { String userId = onlineUsers.remove(sessionId); log.info("用户断开WebSocket连接: userId={}, sessionId={}", userId, sessionId); } catch (Exception e) { log.error("处理用户断开连接失败", e); } } /** * 处理心跳消息 */ @Override public void handleHeartbeat(String sessionId, Principal principal) { try { String userId = onlineUsers.get(sessionId); if (userId == null && principal != null) { userId = principal.getName(); } // 发送心跳响应 WebSocketMessage heartbeatMessage = WebSocketMessage.builder() .messageId(UUID.randomUUID().toString()) .type("HEARTBEAT") .content("pong") .senderId("system") .senderType("SYSTEM") .status("SENT") .createTime(LocalDateTime.now()) .build(); if (userId != null) { messagingTemplate.convertAndSendToUser(userId, "/queue/messages", heartbeatMessage); } } catch (Exception e) { log.error("处理心跳消息失败", e); } } /** * 发送AI思考状态消息 */ private void sendAiThinkingMessage(String userId, String conversationId) { WebSocketMessage thinkingMessage = WebSocketMessage.builder() .messageId(UUID.randomUUID().toString()) .conversationId(conversationId) .type("AI_THINKING") .content("AI正在思考中...") .senderId("ai") .senderType("AI") .status("SENT") .createTime(LocalDateTime.now()) .build(); messagingTemplate.convertAndSendToUser(userId, "/queue/messages", thinkingMessage); if (conversationId != null) { messagingTemplate.convertAndSend("/topic/conversation/" + conversationId, thinkingMessage); } } /** * 异步处理AI响应 */ private void processAiResponse(ChatRequest request) { // 使用线程池异步处理AI响应 new Thread(() -> { try { String userId = request.getSenderId(); String conversationId = request.getConversationId(); // 如果没有会话ID,创建新会话 if (conversationId == null || conversationId.trim().isEmpty()) { conversationId = createNewConversation(userId, request); request.setConversationId(conversationId); } // 确保会话存在并更新活跃时间 ensureConversationExists(conversationId, userId, request); // 保存用户消息到数据库 Message userMessage = new Message(); userMessage.setConversationId(conversationId); userMessage.setUserId(userId); userMessage.setCreateBy(userId); // 设置创建人为当前用户 userMessage .setUserType("USER".equals(request.getSenderType()) ? "registered" : "guest"); userMessage.setContent(request.getContent()); userMessage.setType("text"); userMessage.setSender("user"); userMessage.setCozeRole("user"); userMessage.setCozeContentType("text"); userMessage = messageService.createMessage(userMessage); // 调用AI服务(WebSocket专用方法,传递messageId) String aiReply = aiChatService.sendChatMessageForWebSocket( conversationId, userMessage.getId(), // 传递用户消息ID request.getContent(), userId ); // 根据换行符分割AI回复并按顺序发送多条消息 sendAiReplyInParts(userId, conversationId, aiReply); // 更新会话的最后活跃时间和消息数量 updateConversationActivity(conversationId); } catch (Exception e) { log.error("AI响应处理失败", e); sendErrorMessage(request.getSenderId(), "AI服务暂时不可用,请稍后重试"); } }).start(); } /** * 发送错误消息 */ private void sendErrorMessage(String userId, String errorContent) { WebSocketMessage errorMessage = WebSocketMessage.builder() .messageId(UUID.randomUUID().toString()) .type("ERROR") .content(errorContent) .senderId("system") .senderType("SYSTEM") .status("SENT") .createTime(LocalDateTime.now()) .build(); messagingTemplate.convertAndSendToUser(userId, "/queue/messages", errorMessage); } /** * 获取在线用户数量 */ @Override public int getOnlineUserCount() { return onlineUsers.size(); } /** * 创建新会话 */ private String createNewConversation(String userId, ChatRequest request) { try { String conversationId = "conv_" + System.currentTimeMillis() + "_" + UUID.randomUUID().toString().substring(0, 8); Conversation conversation = Conversation.builder() .userId(userId) .userType("USER".equals(request.getSenderType()) ? "registered" : "guest") .title("新对话") .type("chat") .conversationStatus("active") .startTime(LocalDateTime.now()) .lastActiveTime(LocalDateTime.now()) .messageCount(0) .build(); // 设置ID conversation.setId(conversationId); conversationService.save(conversation); log.info("创建新会话: conversationId={}, userId={}", conversationId, userId); return conversationId; } catch (Exception e) { log.error("创建新会话失败: userId={}", userId, e); throw new RuntimeException("创建会话失败", e); } } /** * 确保会话存在并更新活跃时间 */ private void ensureConversationExists(String conversationId, String userId, ChatRequest request) { try { Conversation conversation = conversationService.getById(conversationId); if (conversation == null) { // 如果会话不存在,创建一个 conversation = Conversation.builder() .userId(userId) .userType("USER".equals(request.getSenderType()) ? "registered" : "guest") .title("对话") .type("chat") .conversationStatus("active") .startTime(LocalDateTime.now()) .lastActiveTime(LocalDateTime.now()) .messageCount(0) .build(); // 设置ID conversation.setId(conversationId); conversationService.save(conversation); log.info("创建会话: conversationId={}, userId={}", conversationId, userId); } else { // 更新最后活跃时间 conversation.setLastActiveTime(LocalDateTime.now()); conversationService.updateById(conversation); } } catch (Exception e) { log.error("确保会话存在失败: conversationId={}, userId={}", conversationId, userId, e); } } /** * 更新会话活跃状态 */ private void updateConversationActivity(String conversationId) { try { Conversation conversation = conversationService.getById(conversationId); if (conversation != null) { conversation.setLastActiveTime(LocalDateTime.now()); conversation.setMessageCount((conversation.getMessageCount() != null ? conversation.getMessageCount() : 0) + 1); conversationService.updateById(conversation); } } catch (Exception e) { log.error("更新会话活跃状态失败: conversationId={}", conversationId, e); } } /** * 根据换行符分割AI回复并按顺序发送多条消息 * * @param userId 用户ID * @param conversationId 会话ID * @param aiReply AI回复内容 */ private void sendAiReplyInParts(String userId, String conversationId, String aiReply) { try { log.info("开始处理AI回复消息: userId={}, conversationId={}, aiReply长度={}", userId, conversationId, aiReply != null ? aiReply.length() : 0); if (aiReply == null || aiReply.trim().isEmpty()) { log.warn("AI回复内容为空,跳过发送"); return; } // 检查是否需要分割 boolean needsSplit = aiReply.contains("\n\n") || aiReply.contains("\n"); if (!needsSplit) { // 不需要分割,直接发送完整消息 log.info("AI回复无换行符,发送完整消息"); sendSingleAiMessage(userId, conversationId, aiReply.trim()); return; } // 需要分割,按换行符分割并发送多条消息 log.info("AI回复包含换行符,开始分割发送"); String[] replyParts = splitAiReply(aiReply); log.info("AI回复分割完成,共{}个部分", replyParts.length); // 按顺序发送每个部分 int sentCount = 0; for (int i = 0; i < replyParts.length; i++) { String part = replyParts[i].trim(); // 跳过空白部分 if (part.isEmpty()) { continue; } // 发送消息部分 sendSingleAiMessage(userId, conversationId, part); sentCount++; log.info("发送AI回复部分 {}/{}: 内容长度={}", sentCount, replyParts.length, part.length()); // 在多个部分之间添加短暂延迟,模拟自然对话节奏 if (i < replyParts.length - 1) { try { Thread.sleep(500); // 延迟500毫秒 } catch (InterruptedException e) { Thread.currentThread().interrupt(); log.warn("发送AI回复时被中断"); break; } } } log.info("AI回复发送完成: userId={}, conversationId={}, 实际发送{}条消息", userId, conversationId, sentCount); } catch (Exception e) { log.error("分割发送AI回复失败: userId={}, conversationId={}", userId, conversationId, e); // 发送错误时,尝试发送完整的原始回复 try { WebSocketMessage fallbackMessage = WebSocketMessage.builder() .messageId(UUID.randomUUID().toString()) .conversationId(conversationId) .type("TEXT") .content(aiReply) .senderId("ai") .senderType("AI") .status("SENT") .createTime(LocalDateTime.now()) .build(); messagingTemplate.convertAndSendToUser(userId, "/queue/messages", fallbackMessage); if (conversationId != null) { messagingTemplate.convertAndSend("/topic/conversation/" + conversationId, fallbackMessage); } log.info("已发送完整AI回复作为备用方案"); } catch (Exception fallbackError) { log.error("发送备用AI回复也失败", fallbackError); sendErrorMessage(userId, "AI回复发送失败,请稍后重试"); } } } /** * 发送单条AI消息 * * @param userId 用户ID * @param conversationId 会话ID * @param content 消息内容 */ private void sendSingleAiMessage(String userId, String conversationId, String content) { try { // 保存AI消息到数据库 Message aiMessage = new Message(); aiMessage.setId(snowflakeIdGenerator.nextIdAsString()); aiMessage.setConversationId(conversationId); aiMessage.setUserId(userId); aiMessage.setCreateBy("ai"); aiMessage.setContent(content); aiMessage.setType("text"); aiMessage.setSender("ai"); aiMessage.setCozeRole("assistant"); aiMessage.setCozeContentType("text"); aiMessage.setTimestamp(LocalDateTime.now()); messageService.createMessage(aiMessage); log.info("AI消息已保存到数据库: messageId={}, conversationId={}, contentLength={}", aiMessage.getId(), conversationId, content.length()); // 构建WebSocket消息 WebSocketMessage wsMessage = WebSocketMessage.builder() .messageId(aiMessage.getId()) .conversationId(conversationId) .type("TEXT") .content(content) .senderId("ai") .senderType("AI") .status("SENT") .createTime(LocalDateTime.now()) .build(); // 发送给用户私有队列 messagingTemplate.convertAndSendToUser(userId, "/queue/messages", wsMessage); // 发送到会话公共频道 if (conversationId != null) { messagingTemplate.convertAndSend("/topic/conversation/" + conversationId, wsMessage); } } catch (Exception e) { log.error("发送单条AI消息失败: userId={}, conversationId={}", userId, conversationId, e); sendErrorMessage(userId, "消息发送失败,请稍后重试"); } } /** * 智能分割AI回复内容 * * @param aiReply AI回复内容 * @return 分割后的内容数组 */ private String[] splitAiReply(String aiReply) { if (aiReply == null || aiReply.trim().isEmpty()) { return new String[0]; } // 首先尝试按双换行符分割(段落分割) if (aiReply.contains("\n\n")) { String[] parts = aiReply.split("\n\n"); log.debug("按双换行符分割,得到{}个部分", parts.length); return parts; } // 如果没有双换行符,按单换行符分割(行分割) if (aiReply.contains("\n")) { String[] parts = aiReply.split("\n"); log.debug("按单换行符分割,得到{}个部分", parts.length); return parts; } // 如果没有换行符,返回原始内容(这种情况不应该到达这里) log.debug("没有换行符,返回原始内容"); return new String[]{aiReply}; } /** * 设置WebSocket请求的默认值 */ private void setWebSocketRequestDefaults(WebSocketRequest request, Principal principal, String sessionId) { // 如果请求中没有发送者ID,尝试从Principal获取 if (request.getSenderId() == null && principal != null) { request.setSenderId(principal.getName()); } // 如果还是没有发送者ID,使用会话ID作为访客ID if (request.getSenderId() == null) { request.setSenderId("guest_" + sessionId); request.setSenderType("GUEST"); } // 设置时间戳 if (request.getTimestamp() == null) { request.setTimestamp(System.currentTimeMillis()); } } /** * 设置连接请求的默认值 */ private void setConnectRequestDefaults(ConnectRequest request, Principal principal, String sessionId) { // 优先从Principal获取认证用户信息 if (principal != null) { request.setUserId(principal.getName()); } // 如果还没有userId,生成访客ID if (request.getUserId() == null) { request.setUserId("guest_" + sessionId); } // 设置连接时间戳 if (request.getTimestamp() == null) { request.setTimestamp(System.currentTimeMillis()); } } /** * 获取用户ID */ private String getUserId(Principal principal, String sessionId) { if (principal != null) { return principal.getName(); } return "guest_" + sessionId; } /** * 判断用户是否已认证 */ private boolean isUserAuthenticated(String userId) { return userId != null && !userId.startsWith("guest_"); } /** * 获取发送者类型 */ private String getSenderType(String senderType) { if ("USER".equals(senderType)) { return "USER"; } else if ("GUEST".equals(senderType)) { return "GUEST"; } else if ("AI".equals(senderType)) { return "AI"; } else if ("SYSTEM".equals(senderType)) { return "SYSTEM"; } return "USER"; } /** * 转换WebSocketRequest到ChatRequest * * @param webSocketRequest WebSocket请求对象 * @return ChatRequest对象 */ private ChatRequest convertToChatRequest(WebSocketRequest webSocketRequest) { return ChatRequest.builder() .content(webSocketRequest.getContent()) .senderId(webSocketRequest.getSenderId()) .senderType(webSocketRequest.getSenderType()) .messageType(webSocketRequest.getMessageType()) .conversationId(webSocketRequest.getConversationId()) .timestamp(webSocketRequest.getTimestamp()) .build(); } }