From accd1ae9daa51cc3c3a73870ea6d2b4935d2a9c6 Mon Sep 17 00:00:00 2001 From: huazhongmin Date: Sun, 26 Oct 2025 17:43:24 +0800 Subject: [PATCH] =?UTF-8?q?=E8=81=8A=E5=A4=A9=E9=A1=B5bug=E4=BF=AE?= =?UTF-8?q?=E5=A4=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../service/impl/AiChatServiceImpl.java | 19 +-- .../service/impl/WebSocketServiceImpl.java | 60 +++++++--- web/src/stores/chat.ts | 109 +++++++++++++----- web/src/views/Chat/index.vue | 8 +- 4 files changed, 131 insertions(+), 65 deletions(-) diff --git a/backend-single/src/main/java/com/emotion/service/impl/AiChatServiceImpl.java b/backend-single/src/main/java/com/emotion/service/impl/AiChatServiceImpl.java index 32f62bb..69e485f 100644 --- a/backend-single/src/main/java/com/emotion/service/impl/AiChatServiceImpl.java +++ b/backend-single/src/main/java/com/emotion/service/impl/AiChatServiceImpl.java @@ -442,21 +442,12 @@ public class AiChatServiceImpl implements AiChatService { // 调用Coze API(带messageId) String aiReply = sendMessageWithMessageId(conversationId, messageId, message, userId); - // 注意:不保存用户消息,因为WebSocket处理器已经保存了 - // 只保存AI回复 - Message aiMessage = new Message(); - aiMessage.setId(snowflakeIdGenerator.nextIdAsString()); - aiMessage.setConversationId(conversationId); - aiMessage.setCreateBy(userId); // 设置创建人为当前用户 - aiMessage.setContent(aiReply); - aiMessage.setType("text"); - aiMessage.setSender("ai"); - aiMessage.setCozeRole("assistant"); - aiMessage.setCozeContentType("text"); - aiMessage = messageService.createMessage(aiMessage); + // 注意:不在这里保存AI回复消息 + // WebSocket处理器会在sendAiReplyInParts中分割并保存每条消息 + // 这样可以确保分割后的消息都被正确保存到数据库 - log.info("WebSocket聊天消息处理完成(带messageId): userMessageId={}, aiMessageId={}", - messageId, aiMessage.getId()); + log.info("WebSocket聊天消息处理完成(带messageId): userMessageId={}, aiReplyLength={}", + messageId, aiReply != null ? aiReply.length() : 0); return aiReply; diff --git a/backend-single/src/main/java/com/emotion/service/impl/WebSocketServiceImpl.java b/backend-single/src/main/java/com/emotion/service/impl/WebSocketServiceImpl.java index a154e93..ca1baf9 100644 --- a/backend-single/src/main/java/com/emotion/service/impl/WebSocketServiceImpl.java +++ b/backend-single/src/main/java/com/emotion/service/impl/WebSocketServiceImpl.java @@ -10,6 +10,7 @@ import com.emotion.service.WebSocketService; import com.emotion.service.AiChatService; import com.emotion.service.MessageService; import com.emotion.service.ConversationService; +import com.emotion.util.SnowflakeIdGenerator; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.simp.SimpMessagingTemplate; @@ -42,7 +43,10 @@ public class WebSocketServiceImpl implements WebSocketService { @Autowired private ConversationService conversationService; - + + @Autowired + private SnowflakeIdGenerator snowflakeIdGenerator; + // 在线用户管理 private final ConcurrentHashMap onlineUsers = new ConcurrentHashMap<>(); @@ -464,24 +468,46 @@ public class WebSocketServiceImpl implements WebSocketService { * @param content 消息内容 */ private void sendSingleAiMessage(String userId, String conversationId, String content) { - // 构建AI回复消息 - WebSocketMessage aiMessage = WebSocketMessage.builder() - .messageId(UUID.randomUUID().toString()) - .conversationId(conversationId) - .type("TEXT") - .content(content) - .senderId("ai") - .senderType("AI") - .status("SENT") - .createTime(LocalDateTime.now()) - .build(); + try { + // 保存AI消息到数据库 + Message aiMessage = new Message(); + aiMessage.setId(snowflakeIdGenerator.nextIdAsString()); + aiMessage.setConversationId(conversationId); + aiMessage.setUserId(userId); + aiMessage.setCreateBy("ai"); + aiMessage.setContent(content); + aiMessage.setType("text"); + aiMessage.setSender("ai"); + aiMessage.setCozeRole("assistant"); + aiMessage.setCozeContentType("text"); + aiMessage.setTimestamp(LocalDateTime.now()); + messageService.createMessage(aiMessage); - // 发送给用户私有队列 - messagingTemplate.convertAndSendToUser(userId, "/queue/messages", aiMessage); + log.info("AI消息已保存到数据库: messageId={}, conversationId={}, contentLength={}", + aiMessage.getId(), conversationId, content.length()); - // 发送到会话公共频道 - if (conversationId != null) { - messagingTemplate.convertAndSend("/topic/conversation/" + conversationId, aiMessage); + // 构建WebSocket消息 + WebSocketMessage wsMessage = WebSocketMessage.builder() + .messageId(aiMessage.getId()) + .conversationId(conversationId) + .type("TEXT") + .content(content) + .senderId("ai") + .senderType("AI") + .status("SENT") + .createTime(LocalDateTime.now()) + .build(); + + // 发送给用户私有队列 + messagingTemplate.convertAndSendToUser(userId, "/queue/messages", wsMessage); + + // 发送到会话公共频道 + if (conversationId != null) { + messagingTemplate.convertAndSend("/topic/conversation/" + conversationId, wsMessage); + } + } catch (Exception e) { + log.error("发送单条AI消息失败: userId={}, conversationId={}", userId, conversationId, e); + sendErrorMessage(userId, "消息发送失败,请稍后重试"); } } diff --git a/web/src/stores/chat.ts b/web/src/stores/chat.ts index 3e74182..0375f95 100644 --- a/web/src/stores/chat.ts +++ b/web/src/stores/chat.ts @@ -27,6 +27,12 @@ export interface ChatSession { messageCount: number } +// 消息队列项 +interface QueuedMessage { + message: ChatMessage + timestamp: number +} + export const useChatStore = defineStore('chat', () => { const authStore = useAuthStore() @@ -39,6 +45,11 @@ export const useChatStore = defineStore('chat', () => { const connectionStatus = ref('DISCONNECTED') const wsConnected = ref(false) + // 消息队列和处理状态 + const messageQueue = ref([]) + const isProcessingQueue = ref(false) + const queueProcessingInterval = ref(null) + // 计算属性 const currentMessages = computed(() => { if (!currentSession.value) return [] @@ -48,7 +59,37 @@ export const useChatStore = defineStore('chat', () => { ) }) - // 添加消息 + // 处理消息队列 - 批量处理消息以避免竞态条件 + const processMessageQueue = async () => { + if (isProcessingQueue.value || messageQueue.value.length === 0) { + return + } + + isProcessingQueue.value = true + try { + // 一次性取出所有待处理消息 + const batch = messageQueue.value.splice(0, messageQueue.value.length) + + // 批量添加消息到数组 + const newMessages = batch.map(item => item.message) + messages.value.push(...newMessages) + + console.log(`✅ 消息队列处理完成,添加了 ${newMessages.length} 条消息,当前总数: ${messages.value.length}`) + } finally { + isProcessingQueue.value = false + } + } + + // 将消息加入队列而不是直接添加 + const queueMessage = (message: ChatMessage) => { + messageQueue.value.push({ + message, + timestamp: Date.now() + }) + console.log(`📝 消息已加入队列,队列长度: ${messageQueue.value.length}`) + } + + // 添加消息 - 现在返回消息但不直接添加到数组 const addMessage = (message: Omit) => { const newMessage: ChatMessage = { ...message, @@ -56,7 +97,8 @@ export const useChatStore = defineStore('chat', () => { timestamp: new Date().toISOString(), status: message.type === 'user' ? 'sending' : 'sent' } - messages.value.push(newMessage) + // 将消息加入队列而不是直接添加 + queueMessage(newMessage) return newMessage } @@ -360,15 +402,10 @@ export const useChatStore = defineStore('chat', () => { const timeA = parseTime(a.timestamp) const timeB = parseTime(b.timestamp) - console.log('📝 排序比较:', { - a: { id: a.id.substring(0, 8), timestamp: a.timestamp, parsed: new Date(timeA).toLocaleString() }, - b: { id: b.id.substring(0, 8), timestamp: b.timestamp, parsed: new Date(timeB).toLocaleString() }, - result: timeA - timeB - }) - return timeA - timeB }) + // 直接设置消息数组(初始加载时不使用队列) messages.value = chatMessages console.log('📝 最近消息已加载并排序,消息总数:', messages.value.length) @@ -380,42 +417,39 @@ export const useChatStore = defineStore('chat', () => { } } - // 添加AI回复消息(直接显示完整内容) + // 添加AI回复消息(使用队列处理) const addAiReplyMessages = async (content: string) => { // 停止输入状态 isTyping.value = false - // 使用 nextTick 确保 DOM 更新的顺序性,避免与定时同步并发 - await nextTick() - - // 直接添加完整的AI回复 + // 添加AI消息到队列 const aiMessage = addMessage({ content: content.trim(), type: 'ai', conversationId: currentSession.value?.id }) - // 强制触发响应式更新 - console.log('✅ AI消息已添加,当前消息总数:', messages.value.length) - console.log('📝 最新AI消息:', aiMessage) - console.log('📊 所有消息:', messages.value) + console.log('✅ AI消息已加入队列,消息ID:', aiMessage.id) + + // 立即处理队列 + await processMessageQueue() } - // WebSocket消息处理 + // WebSocket消息处理 - 使用队列处理所有消息 let handleWebSocketMessage = async (wsMessage: WebSocketMessage) => { console.log('收到WebSocket消息:', wsMessage.type, wsMessage.senderType) switch (wsMessage.type) { case 'TEXT': if (wsMessage.senderType === 'AI') { - // AI回复消息 - 支持分段显示 + // AI回复消息 - 加入队列处理 await addAiReplyMessages(wsMessage.content) } break case 'AI_THINKING': - // AI正在思考 - isTyping.value = true + // AI正在思考 - 不修改响应式数据,避免竞态 + console.log('⏳ AI正在思考中...') break case 'CONNECTION': @@ -424,22 +458,23 @@ export const useChatStore = defineStore('chat', () => { break case 'ERROR': - // 错误消息 + // 错误消息 - 加入队列处理 addMessage({ content: wsMessage.content, type: 'ai', sessionId: currentSession.value?.id }) - isTyping.value = false + await processMessageQueue() break case 'SYSTEM': - // 系统消息 + // 系统消息 - 加入队列处理 addMessage({ content: wsMessage.content, type: 'ai', sessionId: currentSession.value?.id }) + await processMessageQueue() break default: @@ -466,6 +501,14 @@ export const useChatStore = defineStore('chat', () => { wsConnected.value = true isConnected.value = true + // 启动消息队列处理器 - 每50ms处理一次队列 + if (queueProcessingInterval.value === null) { + queueProcessingInterval.value = window.setInterval(() => { + processMessageQueue() + }, 50) + console.log('✅ 消息队列处理器已启动') + } + // 设置会话ID if (currentSession.value?.id) { stompWebSocketService.setConversationId(currentSession.value.id) @@ -475,13 +518,18 @@ export const useChatStore = defineStore('chat', () => { console.log('WebSocket连接断开') wsConnected.value = false isConnected.value = false - isTyping.value = false + + // 停止消息队列处理器 + if (queueProcessingInterval.value !== null) { + clearInterval(queueProcessingInterval.value) + queueProcessingInterval.value = null + console.log('✅ 消息队列处理器已停止') + } }, onError: (error) => { console.error('WebSocket错误:', error) wsConnected.value = false isConnected.value = false - isTyping.value = false // 显示用户友好的错误信息 if (error.userMessage) { @@ -490,6 +538,7 @@ export const useChatStore = defineStore('chat', () => { type: 'ai', sessionId: currentSession.value?.id }) + processMessageQueue() } }, onStatusChange: (status) => { @@ -508,7 +557,13 @@ export const useChatStore = defineStore('chat', () => { stompWebSocketService.disconnect() wsConnected.value = false isConnected.value = false - isTyping.value = false + + // 停止消息队列处理器 + if (queueProcessingInterval.value !== null) { + clearInterval(queueProcessingInterval.value) + queueProcessingInterval.value = null + console.log('✅ 消息队列处理器已停止') + } } // 初始化聊天 - 参考web项目的实现 diff --git a/web/src/views/Chat/index.vue b/web/src/views/Chat/index.vue index 69f4c73..8e214ab 100644 --- a/web/src/views/Chat/index.vue +++ b/web/src/views/Chat/index.vue @@ -161,7 +161,7 @@