package com.emotion.service; import com.emotion.dto.websocket.ChatRequest; import com.emotion.dto.websocket.ConnectRequest; import com.emotion.dto.websocket.WebSocketMessage; import com.emotion.entity.Message; import com.emotion.entity.Conversation; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.simp.SimpMessagingTemplate; import org.springframework.stereotype.Service; import java.security.Principal; import java.time.LocalDateTime; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; /** * WebSocket服务 * * @author emotion-museum * @date 2025-07-23 */ @Slf4j @Service public class WebSocketService { @Autowired private SimpMessagingTemplate messagingTemplate; @Autowired private AIChatService aiChatService; @Autowired private MessageService messageService; @Autowired private ConversationService conversationService; // 在线用户管理 private final ConcurrentHashMap onlineUsers = new ConcurrentHashMap<>(); /** * 处理聊天消息 */ public void handleChatMessage(ChatRequest request, String sessionId, Principal principal) { try { log.info("处理聊天消息: request={}, sessionId={}, principal={}", request, sessionId, principal); // 验证请求参数 if (request.getContent() == null || request.getContent().trim().isEmpty()) { sendErrorMessage(request.getSenderId(), "消息内容不能为空"); return; } // 确定用户身份和类型 String userId = request.getSenderId(); WebSocketMessage.SenderType senderType = WebSocketMessage.SenderType.GUEST; if (principal != null) { userId = principal.getName(); // 如果用户ID不是以guest_开头,说明是认证用户 if (!userId.startsWith("guest_")) { senderType = WebSocketMessage.SenderType.USER; } } // 更新请求中的用户信息 request.setSenderId(userId); request.setSenderType(senderType == WebSocketMessage.SenderType.USER ? ChatRequest.SenderType.USER : ChatRequest.SenderType.GUEST); log.info("确定用户身份: userId={}, senderType={}", userId, senderType); // 构建用户消息 WebSocketMessage userMessage = WebSocketMessage.builder() .messageId(UUID.randomUUID().toString()) .conversationId(request.getConversationId()) .type(WebSocketMessage.MessageType.TEXT) .content(request.getContent()) .senderId(userId) .senderType(senderType) .status(WebSocketMessage.MessageStatus.SENT) .createTime(LocalDateTime.now()) .build(); // 发送用户消息到会话 if (request.getConversationId() != null) { messagingTemplate.convertAndSend("/topic/conversation/" + request.getConversationId(), userMessage); } // 发送给用户私有队列 messagingTemplate.convertAndSendToUser(request.getSenderId(), "/queue/messages", userMessage); // 发送AI思考状态 sendAiThinkingMessage(request.getSenderId(), request.getConversationId()); // 异步调用AI服务 processAiResponse(request); } catch (Exception e) { log.error("处理聊天消息失败", e); sendErrorMessage(request.getSenderId(), "消息处理失败,请稍后重试"); } } /** * 处理用户连接 */ public void handleUserConnect(ConnectRequest request, String sessionId, Principal principal) { try { String userId = request.getUserId(); boolean isAuthenticated = false; // 优先从Principal获取认证用户信息 if (principal != null) { userId = principal.getName(); // 检查是否是认证用户(不是访客) isAuthenticated = !userId.startsWith("guest_"); } // 如果还没有userId,生成访客ID if (userId == null) { userId = "guest_" + sessionId; } log.info("用户连接WebSocket: userId={}, sessionId={}, authenticated={}", userId, sessionId, isAuthenticated); // 记录在线用户 onlineUsers.put(sessionId, userId); // 发送连接成功消息 WebSocketMessage connectMessage = WebSocketMessage.builder() .messageId(UUID.randomUUID().toString()) .type(WebSocketMessage.MessageType.CONNECTION) .content("连接成功") .senderId("system") .senderType(WebSocketMessage.SenderType.SYSTEM) .status(WebSocketMessage.MessageStatus.SENT) .createTime(LocalDateTime.now()) .build(); messagingTemplate.convertAndSendToUser(userId, "/queue/messages", connectMessage); } catch (Exception e) { log.error("处理用户连接失败", e); } } /** * 处理用户断开连接 */ public void handleUserDisconnect(String sessionId, Principal principal) { try { String userId = onlineUsers.remove(sessionId); log.info("用户断开WebSocket连接: userId={}, sessionId={}", userId, sessionId); } catch (Exception e) { log.error("处理用户断开连接失败", e); } } /** * 处理心跳消息 */ public void handleHeartbeat(String sessionId, Principal principal) { try { String userId = onlineUsers.get(sessionId); if (userId == null && principal != null) { userId = principal.getName(); } // 发送心跳响应 WebSocketMessage heartbeatMessage = WebSocketMessage.builder() .messageId(UUID.randomUUID().toString()) .type(WebSocketMessage.MessageType.HEARTBEAT) .content("pong") .senderId("system") .senderType(WebSocketMessage.SenderType.SYSTEM) .status(WebSocketMessage.MessageStatus.SENT) .createTime(LocalDateTime.now()) .build(); if (userId != null) { messagingTemplate.convertAndSendToUser(userId, "/queue/messages", heartbeatMessage); } } catch (Exception e) { log.error("处理心跳消息失败", e); } } /** * 发送AI思考状态消息 */ private void sendAiThinkingMessage(String userId, String conversationId) { WebSocketMessage thinkingMessage = WebSocketMessage.builder() .messageId(UUID.randomUUID().toString()) .conversationId(conversationId) .type(WebSocketMessage.MessageType.AI_THINKING) .content("AI正在思考中...") .senderId("ai") .senderType(WebSocketMessage.SenderType.AI) .status(WebSocketMessage.MessageStatus.SENT) .createTime(LocalDateTime.now()) .build(); messagingTemplate.convertAndSendToUser(userId, "/queue/messages", thinkingMessage); if (conversationId != null) { messagingTemplate.convertAndSend("/topic/conversation/" + conversationId, thinkingMessage); } } /** * 异步处理AI响应 */ private void processAiResponse(ChatRequest request) { // 使用线程池异步处理AI响应 new Thread(() -> { try { String userId = request.getSenderId(); String conversationId = request.getConversationId(); // 如果没有会话ID,创建新会话 if (conversationId == null || conversationId.trim().isEmpty()) { conversationId = createNewConversation(userId, request); request.setConversationId(conversationId); } // 确保会话存在并更新活跃时间 ensureConversationExists(conversationId, userId, request); // 保存用户消息到数据库 Message userMessage = new Message(); userMessage.setConversationId(conversationId); userMessage.setUserId(userId); userMessage .setUserType(request.getSenderType() == ChatRequest.SenderType.USER ? "registered" : "guest"); userMessage.setContent(request.getContent()); userMessage.setType("text"); userMessage.setSender("user"); userMessage.setCozeRole("user"); userMessage.setCozeContentType("text"); messageService.createMessage(userMessage); // 调用AI服务 String aiReply = aiChatService.sendChatMessage( conversationId, request.getContent(), userId ); // 如果AI回复包含换行符,分割成多条消息 String[] replyParts = aiReply.split("\\n\\n|\\n"); for (String part : replyParts) { if (part.trim().isEmpty()) continue; // 构建AI回复消息 WebSocketMessage aiMessage = WebSocketMessage.builder() .messageId(UUID.randomUUID().toString()) .conversationId(conversationId) .type(WebSocketMessage.MessageType.TEXT) .content(part.trim()) .senderId("ai") .senderType(WebSocketMessage.SenderType.AI) .status(WebSocketMessage.MessageStatus.SENT) .createTime(LocalDateTime.now()) .build(); // 保存AI回复到数据库 Message aiDbMessage = new Message(); aiDbMessage.setConversationId(conversationId); aiDbMessage.setUserId(userId); aiDbMessage.setUserType( request.getSenderType() == ChatRequest.SenderType.USER ? "registered" : "guest"); aiDbMessage.setContent(part.trim()); aiDbMessage.setType("text"); aiDbMessage.setSender("ai"); aiDbMessage.setCozeRole("assistant"); aiDbMessage.setCozeContentType("text"); messageService.createMessage(aiDbMessage); // 发送AI回复 messagingTemplate.convertAndSendToUser(userId, "/queue/messages", aiMessage); if (conversationId != null) { messagingTemplate.convertAndSend("/topic/conversation/" + conversationId, aiMessage); } // 添加短暂延迟,模拟自然对话 Thread.sleep(500); } // 更新会话的最后活跃时间和消息数量 updateConversationActivity(conversationId); } catch (Exception e) { log.error("AI响应处理失败", e); sendErrorMessage(request.getSenderId(), "AI服务暂时不可用,请稍后重试"); } }).start(); } /** * 发送错误消息 */ private void sendErrorMessage(String userId, String errorContent) { WebSocketMessage errorMessage = WebSocketMessage.builder() .messageId(UUID.randomUUID().toString()) .type(WebSocketMessage.MessageType.ERROR) .content(errorContent) .senderId("system") .senderType(WebSocketMessage.SenderType.SYSTEM) .status(WebSocketMessage.MessageStatus.SENT) .createTime(LocalDateTime.now()) .build(); messagingTemplate.convertAndSendToUser(userId, "/queue/messages", errorMessage); } /** * 获取在线用户数量 */ public int getOnlineUserCount() { return onlineUsers.size(); } /** * 创建新会话 */ private String createNewConversation(String userId, ChatRequest request) { try { String conversationId = "conv_" + System.currentTimeMillis() + "_" + UUID.randomUUID().toString().substring(0, 8); Conversation conversation = Conversation.builder() .userId(userId) .userType(request.getSenderType() == ChatRequest.SenderType.USER ? "registered" : "guest") .title("新对话") .type("chat") .conversationStatus("active") .startTime(LocalDateTime.now()) .lastActiveTime(LocalDateTime.now()) .messageCount(0) .build(); // 设置ID conversation.setId(conversationId); conversationService.save(conversation); log.info("创建新会话: conversationId={}, userId={}", conversationId, userId); return conversationId; } catch (Exception e) { log.error("创建新会话失败: userId={}", userId, e); throw new RuntimeException("创建会话失败", e); } } /** * 确保会话存在并更新活跃时间 */ private void ensureConversationExists(String conversationId, String userId, ChatRequest request) { try { Conversation conversation = conversationService.getById(conversationId); if (conversation == null) { // 如果会话不存在,创建一个 conversation = Conversation.builder() .userId(userId) .userType(request.getSenderType() == ChatRequest.SenderType.USER ? "registered" : "guest") .title("对话") .type("chat") .conversationStatus("active") .startTime(LocalDateTime.now()) .lastActiveTime(LocalDateTime.now()) .messageCount(0) .build(); // 设置ID conversation.setId(conversationId); conversationService.save(conversation); log.info("创建会话: conversationId={}, userId={}", conversationId, userId); } else { // 更新最后活跃时间 conversation.setLastActiveTime(LocalDateTime.now()); conversationService.updateById(conversation); } } catch (Exception e) { log.error("确保会话存在失败: conversationId={}, userId={}", conversationId, userId, e); } } /** * 更新会话活跃状态 */ private void updateConversationActivity(String conversationId) { try { Conversation conversation = conversationService.getById(conversationId); if (conversation != null) { conversation.setLastActiveTime(LocalDateTime.now()); conversation.setMessageCount((conversation.getMessageCount() != null ? conversation.getMessageCount() : 0) + 1); conversationService.updateById(conversation); } } catch (Exception e) { log.error("更新会话活跃状态失败: conversationId={}", conversationId, e); } } }