Files
happy-life-star/backend-single/src/main/java/com/emotion/service/impl/WebSocketServiceImpl.java
T
2025-10-26 17:43:24 +08:00

634 lines
23 KiB
Java
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package com.emotion.service.impl;
import com.emotion.dto.request.WebSocketRequest;
import com.emotion.dto.websocket.ChatRequest;
import com.emotion.dto.websocket.ConnectRequest;
import com.emotion.dto.websocket.WebSocketMessage;
import com.emotion.entity.Message;
import com.emotion.entity.Conversation;
import com.emotion.service.WebSocketService;
import com.emotion.service.AiChatService;
import com.emotion.service.MessageService;
import com.emotion.service.ConversationService;
import com.emotion.util.SnowflakeIdGenerator;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import java.security.Principal;
import java.time.LocalDateTime;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
/**
* WebSocket服务实现类
*
* @author emotion-museum
* @date 2025-07-25
*/
@Slf4j
@Service
public class WebSocketServiceImpl implements WebSocketService {
@Autowired
private SimpMessagingTemplate messagingTemplate;
@Autowired
private AiChatService aiChatService;
@Autowired
private MessageService messageService;
@Autowired
private ConversationService conversationService;
@Autowired
private SnowflakeIdGenerator snowflakeIdGenerator;
// 在线用户管理
private final ConcurrentHashMap<String, String> onlineUsers = new ConcurrentHashMap<>();
/**
* 处理聊天消息
*/
@Override
public void handleChatMessage(WebSocketRequest webSocketRequest, String sessionId, Principal principal) {
try {
log.info("处理聊天消息: request={}, sessionId={}, principal={}", webSocketRequest, sessionId, principal);
// 验证请求参数
if (webSocketRequest.getContent() == null || webSocketRequest.getContent().trim().isEmpty()) {
sendErrorMessage(getUserId(principal, sessionId), "消息内容不能为空");
return;
}
// 设置默认值
setWebSocketRequestDefaults(webSocketRequest, principal, sessionId);
log.info("确定用户身份: userId={}, senderType={}", webSocketRequest.getSenderId(),
webSocketRequest.getSenderType());
// 转换请求对象
ChatRequest chatRequest = convertToChatRequest(webSocketRequest);
// 构建用户消息
WebSocketMessage userMessage = WebSocketMessage.builder()
.messageId(UUID.randomUUID().toString())
.conversationId(chatRequest.getConversationId())
.type("TEXT")
.content(chatRequest.getContent())
.senderId(chatRequest.getSenderId())
.senderType(getSenderType(chatRequest.getSenderType()))
.status("SENT")
.createTime(LocalDateTime.now())
.build();
// 发送用户消息到会话
if (chatRequest.getConversationId() != null) {
messagingTemplate.convertAndSend("/topic/conversation/" + chatRequest.getConversationId(), userMessage);
}
// 发送给用户私有队列
messagingTemplate.convertAndSendToUser(chatRequest.getSenderId(), "/queue/messages", userMessage);
// 发送AI思考状态
sendAiThinkingMessage(chatRequest.getSenderId(), chatRequest.getConversationId());
// 异步调用AI服务
processAiResponse(chatRequest);
} catch (Exception e) {
log.error("处理聊天消息失败", e);
sendErrorMessage(getUserId(principal, sessionId), "消息处理失败,请稍后重试");
}
}
/**
* 处理用户连接
*/
@Override
public void handleUserConnect(ConnectRequest request, String sessionId, Principal principal) {
try {
// 设置默认值
setConnectRequestDefaults(request, principal, sessionId);
log.info("用户连接WebSocket: userId={}, sessionId={}, authenticated={}",
request.getUserId(), sessionId, isUserAuthenticated(request.getUserId()));
// 记录在线用户
onlineUsers.put(sessionId, request.getUserId());
// 发送连接成功消息
WebSocketMessage connectMessage = WebSocketMessage.builder()
.messageId(UUID.randomUUID().toString())
.type("CONNECTION")
.content("连接成功")
.senderId("system")
.senderType("SYSTEM")
.status("SENT")
.createTime(LocalDateTime.now())
.build();
messagingTemplate.convertAndSendToUser(request.getUserId(), "/queue/messages", connectMessage);
} catch (Exception e) {
log.error("处理用户连接失败", e);
}
}
/**
* 处理用户断开连接
*/
@Override
public void handleUserDisconnect(String sessionId, Principal principal) {
try {
String userId = onlineUsers.remove(sessionId);
log.info("用户断开WebSocket连接: userId={}, sessionId={}", userId, sessionId);
} catch (Exception e) {
log.error("处理用户断开连接失败", e);
}
}
/**
* 处理心跳消息
*/
@Override
public void handleHeartbeat(String sessionId, Principal principal) {
try {
String userId = onlineUsers.get(sessionId);
if (userId == null && principal != null) {
userId = principal.getName();
}
// 发送心跳响应
WebSocketMessage heartbeatMessage = WebSocketMessage.builder()
.messageId(UUID.randomUUID().toString())
.type("HEARTBEAT")
.content("pong")
.senderId("system")
.senderType("SYSTEM")
.status("SENT")
.createTime(LocalDateTime.now())
.build();
if (userId != null) {
messagingTemplate.convertAndSendToUser(userId, "/queue/messages", heartbeatMessage);
}
} catch (Exception e) {
log.error("处理心跳消息失败", e);
}
}
/**
* 发送AI思考状态消息
*/
private void sendAiThinkingMessage(String userId, String conversationId) {
WebSocketMessage thinkingMessage = WebSocketMessage.builder()
.messageId(UUID.randomUUID().toString())
.conversationId(conversationId)
.type("AI_THINKING")
.content("AI正在思考中...")
.senderId("ai")
.senderType("AI")
.status("SENT")
.createTime(LocalDateTime.now())
.build();
messagingTemplate.convertAndSendToUser(userId, "/queue/messages", thinkingMessage);
if (conversationId != null) {
messagingTemplate.convertAndSend("/topic/conversation/" + conversationId, thinkingMessage);
}
}
/**
* 异步处理AI响应
*/
private void processAiResponse(ChatRequest request) {
// 使用线程池异步处理AI响应
new Thread(() -> {
try {
String userId = request.getSenderId();
String conversationId = request.getConversationId();
// 如果没有会话ID,创建新会话
if (conversationId == null || conversationId.trim().isEmpty()) {
conversationId = createNewConversation(userId, request);
request.setConversationId(conversationId);
}
// 确保会话存在并更新活跃时间
ensureConversationExists(conversationId, userId, request);
// 保存用户消息到数据库
Message userMessage = new Message();
userMessage.setConversationId(conversationId);
userMessage.setUserId(userId);
userMessage.setCreateBy(userId); // 设置创建人为当前用户
userMessage
.setUserType("USER".equals(request.getSenderType()) ? "registered" : "guest");
userMessage.setContent(request.getContent());
userMessage.setType("text");
userMessage.setSender("user");
userMessage.setCozeRole("user");
userMessage.setCozeContentType("text");
userMessage = messageService.createMessage(userMessage);
// 调用AI服务(WebSocket专用方法,传递messageId
String aiReply = aiChatService.sendChatMessageForWebSocket(
conversationId,
userMessage.getId(), // 传递用户消息ID
request.getContent(),
userId
);
// 根据换行符分割AI回复并按顺序发送多条消息
sendAiReplyInParts(userId, conversationId, aiReply);
// 更新会话的最后活跃时间和消息数量
updateConversationActivity(conversationId);
} catch (Exception e) {
log.error("AI响应处理失败", e);
sendErrorMessage(request.getSenderId(), "AI服务暂时不可用,请稍后重试");
}
}).start();
}
/**
* 发送错误消息
*/
private void sendErrorMessage(String userId, String errorContent) {
WebSocketMessage errorMessage = WebSocketMessage.builder()
.messageId(UUID.randomUUID().toString())
.type("ERROR")
.content(errorContent)
.senderId("system")
.senderType("SYSTEM")
.status("SENT")
.createTime(LocalDateTime.now())
.build();
messagingTemplate.convertAndSendToUser(userId, "/queue/messages", errorMessage);
}
/**
* 获取在线用户数量
*/
@Override
public int getOnlineUserCount() {
return onlineUsers.size();
}
/**
* 创建新会话
*/
private String createNewConversation(String userId, ChatRequest request) {
try {
String conversationId = "conv_" + System.currentTimeMillis() + "_" + UUID.randomUUID().toString().substring(0, 8);
Conversation conversation = Conversation.builder()
.userId(userId)
.userType("USER".equals(request.getSenderType()) ? "registered" : "guest")
.title("新对话")
.type("chat")
.conversationStatus("active")
.startTime(LocalDateTime.now())
.lastActiveTime(LocalDateTime.now())
.messageCount(0)
.build();
// 设置ID
conversation.setId(conversationId);
conversationService.save(conversation);
log.info("创建新会话: conversationId={}, userId={}", conversationId, userId);
return conversationId;
} catch (Exception e) {
log.error("创建新会话失败: userId={}", userId, e);
throw new RuntimeException("创建会话失败", e);
}
}
/**
* 确保会话存在并更新活跃时间
*/
private void ensureConversationExists(String conversationId, String userId, ChatRequest request) {
try {
Conversation conversation = conversationService.getById(conversationId);
if (conversation == null) {
// 如果会话不存在,创建一个
conversation = Conversation.builder()
.userId(userId)
.userType("USER".equals(request.getSenderType()) ? "registered" : "guest")
.title("对话")
.type("chat")
.conversationStatus("active")
.startTime(LocalDateTime.now())
.lastActiveTime(LocalDateTime.now())
.messageCount(0)
.build();
// 设置ID
conversation.setId(conversationId);
conversationService.save(conversation);
log.info("创建会话: conversationId={}, userId={}", conversationId, userId);
} else {
// 更新最后活跃时间
conversation.setLastActiveTime(LocalDateTime.now());
conversationService.updateById(conversation);
}
} catch (Exception e) {
log.error("确保会话存在失败: conversationId={}, userId={}", conversationId, userId, e);
}
}
/**
* 更新会话活跃状态
*/
private void updateConversationActivity(String conversationId) {
try {
Conversation conversation = conversationService.getById(conversationId);
if (conversation != null) {
conversation.setLastActiveTime(LocalDateTime.now());
conversation.setMessageCount((conversation.getMessageCount() != null ? conversation.getMessageCount() : 0) + 1);
conversationService.updateById(conversation);
}
} catch (Exception e) {
log.error("更新会话活跃状态失败: conversationId={}", conversationId, e);
}
}
/**
* 根据换行符分割AI回复并按顺序发送多条消息
*
* @param userId 用户ID
* @param conversationId 会话ID
* @param aiReply AI回复内容
*/
private void sendAiReplyInParts(String userId, String conversationId, String aiReply) {
try {
log.info("开始处理AI回复消息: userId={}, conversationId={}, aiReply长度={}",
userId, conversationId, aiReply != null ? aiReply.length() : 0);
if (aiReply == null || aiReply.trim().isEmpty()) {
log.warn("AI回复内容为空,跳过发送");
return;
}
// 检查是否需要分割
boolean needsSplit = aiReply.contains("\n\n") || aiReply.contains("\n");
if (!needsSplit) {
// 不需要分割,直接发送完整消息
log.info("AI回复无换行符,发送完整消息");
sendSingleAiMessage(userId, conversationId, aiReply.trim());
return;
}
// 需要分割,按换行符分割并发送多条消息
log.info("AI回复包含换行符,开始分割发送");
String[] replyParts = splitAiReply(aiReply);
log.info("AI回复分割完成,共{}个部分", replyParts.length);
// 按顺序发送每个部分
int sentCount = 0;
for (int i = 0; i < replyParts.length; i++) {
String part = replyParts[i].trim();
// 跳过空白部分
if (part.isEmpty()) {
continue;
}
// 发送消息部分
sendSingleAiMessage(userId, conversationId, part);
sentCount++;
log.info("发送AI回复部分 {}/{}: 内容长度={}", sentCount, replyParts.length, part.length());
// 在多个部分之间添加短暂延迟,模拟自然对话节奏
if (i < replyParts.length - 1) {
try {
Thread.sleep(500); // 延迟500毫秒
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.warn("发送AI回复时被中断");
break;
}
}
}
log.info("AI回复发送完成: userId={}, conversationId={}, 实际发送{}条消息",
userId, conversationId, sentCount);
} catch (Exception e) {
log.error("分割发送AI回复失败: userId={}, conversationId={}", userId, conversationId, e);
// 发送错误时,尝试发送完整的原始回复
try {
WebSocketMessage fallbackMessage = WebSocketMessage.builder()
.messageId(UUID.randomUUID().toString())
.conversationId(conversationId)
.type("TEXT")
.content(aiReply)
.senderId("ai")
.senderType("AI")
.status("SENT")
.createTime(LocalDateTime.now())
.build();
messagingTemplate.convertAndSendToUser(userId, "/queue/messages", fallbackMessage);
if (conversationId != null) {
messagingTemplate.convertAndSend("/topic/conversation/" + conversationId, fallbackMessage);
}
log.info("已发送完整AI回复作为备用方案");
} catch (Exception fallbackError) {
log.error("发送备用AI回复也失败", fallbackError);
sendErrorMessage(userId, "AI回复发送失败,请稍后重试");
}
}
}
/**
* 发送单条AI消息
*
* @param userId 用户ID
* @param conversationId 会话ID
* @param content 消息内容
*/
private void sendSingleAiMessage(String userId, String conversationId, String content) {
try {
// 保存AI消息到数据库
Message aiMessage = new Message();
aiMessage.setId(snowflakeIdGenerator.nextIdAsString());
aiMessage.setConversationId(conversationId);
aiMessage.setUserId(userId);
aiMessage.setCreateBy("ai");
aiMessage.setContent(content);
aiMessage.setType("text");
aiMessage.setSender("ai");
aiMessage.setCozeRole("assistant");
aiMessage.setCozeContentType("text");
aiMessage.setTimestamp(LocalDateTime.now());
messageService.createMessage(aiMessage);
log.info("AI消息已保存到数据库: messageId={}, conversationId={}, contentLength={}",
aiMessage.getId(), conversationId, content.length());
// 构建WebSocket消息
WebSocketMessage wsMessage = WebSocketMessage.builder()
.messageId(aiMessage.getId())
.conversationId(conversationId)
.type("TEXT")
.content(content)
.senderId("ai")
.senderType("AI")
.status("SENT")
.createTime(LocalDateTime.now())
.build();
// 发送给用户私有队列
messagingTemplate.convertAndSendToUser(userId, "/queue/messages", wsMessage);
// 发送到会话公共频道
if (conversationId != null) {
messagingTemplate.convertAndSend("/topic/conversation/" + conversationId, wsMessage);
}
} catch (Exception e) {
log.error("发送单条AI消息失败: userId={}, conversationId={}", userId, conversationId, e);
sendErrorMessage(userId, "消息发送失败,请稍后重试");
}
}
/**
* 智能分割AI回复内容
*
* @param aiReply AI回复内容
* @return 分割后的内容数组
*/
private String[] splitAiReply(String aiReply) {
if (aiReply == null || aiReply.trim().isEmpty()) {
return new String[0];
}
// 首先尝试按双换行符分割(段落分割)
if (aiReply.contains("\n\n")) {
String[] parts = aiReply.split("\n\n");
log.debug("按双换行符分割,得到{}个部分", parts.length);
return parts;
}
// 如果没有双换行符,按单换行符分割(行分割)
if (aiReply.contains("\n")) {
String[] parts = aiReply.split("\n");
log.debug("按单换行符分割,得到{}个部分", parts.length);
return parts;
}
// 如果没有换行符,返回原始内容(这种情况不应该到达这里)
log.debug("没有换行符,返回原始内容");
return new String[]{aiReply};
}
/**
* 设置WebSocket请求的默认值
*/
private void setWebSocketRequestDefaults(WebSocketRequest request, Principal principal, String sessionId) {
// 如果请求中没有发送者ID,尝试从Principal获取
if (request.getSenderId() == null && principal != null) {
request.setSenderId(principal.getName());
}
// 如果还是没有发送者ID,使用会话ID作为访客ID
if (request.getSenderId() == null) {
request.setSenderId("guest_" + sessionId);
request.setSenderType("GUEST");
}
// 设置时间戳
if (request.getTimestamp() == null) {
request.setTimestamp(System.currentTimeMillis());
}
}
/**
* 设置连接请求的默认值
*/
private void setConnectRequestDefaults(ConnectRequest request, Principal principal, String sessionId) {
// 优先从Principal获取认证用户信息
if (principal != null) {
request.setUserId(principal.getName());
}
// 如果还没有userId,生成访客ID
if (request.getUserId() == null) {
request.setUserId("guest_" + sessionId);
}
// 设置连接时间戳
if (request.getTimestamp() == null) {
request.setTimestamp(System.currentTimeMillis());
}
}
/**
* 获取用户ID
*/
private String getUserId(Principal principal, String sessionId) {
if (principal != null) {
return principal.getName();
}
return "guest_" + sessionId;
}
/**
* 判断用户是否已认证
*/
private boolean isUserAuthenticated(String userId) {
return userId != null && !userId.startsWith("guest_");
}
/**
* 获取发送者类型
*/
private String getSenderType(String senderType) {
if ("USER".equals(senderType)) {
return "USER";
} else if ("GUEST".equals(senderType)) {
return "GUEST";
} else if ("AI".equals(senderType)) {
return "AI";
} else if ("SYSTEM".equals(senderType)) {
return "SYSTEM";
}
return "USER";
}
/**
* 转换WebSocketRequest到ChatRequest
*
* @param webSocketRequest WebSocket请求对象
* @return ChatRequest对象
*/
private ChatRequest convertToChatRequest(WebSocketRequest webSocketRequest) {
return ChatRequest.builder()
.content(webSocketRequest.getContent())
.senderId(webSocketRequest.getSenderId())
.senderType(webSocketRequest.getSenderType())
.messageType(webSocketRequest.getMessageType())
.conversationId(webSocketRequest.getConversationId())
.timestamp(webSocketRequest.getTimestamp())
.build();
}
}