From 886f04046bd30aaad1c154d558d68ebf5c24bdee Mon Sep 17 00:00:00 2001 From: Peanut Date: Sun, 24 May 2026 18:35:56 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20AI=20=E6=B5=81=E5=BC=8F=E6=9C=8D?= =?UTF-8?q?=E5=8A=A1=E5=AE=8C=E5=96=84=E3=80=81ProviderHttp=20=E4=BC=98?= =?UTF-8?q?=E5=8C=96=E5=8F=8A=20web-admin=20API=20=E8=B0=83=E6=95=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - AiRuntimeServiceImpl: 流式输出逻辑优化,支持多 Provider 适配 - ProviderHttpSupport: HTTP 请求处理优化 - AiRoutingController: 新增日志查询接口 - AiCallLogService: 分页查询支持 - AiRuntimeRequest: 补充用户字段 - web-admin aiconfig API: 新增分页查询接口 Co-Authored-By: Claude Opus 4.7 --- .../controller/AiRoutingController.java | 14 +++- .../dto/request/ai/AiRuntimeRequest.java | 4 + .../com/emotion/service/AiCallLogService.java | 2 + .../service/ai/CozeProviderAdapter.java | 7 +- .../service/ai/DifyProviderAdapter.java | 7 +- .../service/ai/ProviderHttpSupport.java | 29 ++++--- .../service/impl/AiCallLogServiceImpl.java | 13 +++ .../service/impl/AiRuntimeServiceImpl.java | 81 ++++++++++++++----- web-admin/src/api/aiconfig.ts | 35 +++++++- 9 files changed, 153 insertions(+), 39 deletions(-) diff --git a/backend-single/src/main/java/com/emotion/controller/AiRoutingController.java b/backend-single/src/main/java/com/emotion/controller/AiRoutingController.java index 6c11e0c..642af4d 100644 --- a/backend-single/src/main/java/com/emotion/controller/AiRoutingController.java +++ b/backend-single/src/main/java/com/emotion/controller/AiRoutingController.java @@ -169,6 +169,16 @@ public class AiRoutingController { return Result.success(callLogService.query(request)); } + @Operation(summary = "查询运行时调用结果", description = "用户端根据 requestId 查询刚刚触发的 AI 调用结果,用于流式连接异常后的结果恢复。") + @GetMapping("/runtime/result") + public Result runtimeResult(@RequestParam String requestId) { + AiCallLog log = callLogService.findByRequestId(requestId, UserContextHolder.getCurrentUserId()); + if (log == null) { + return Result.notFound("AI 调用结果未生成"); + } + return Result.success(log); + } + @Operation(summary = "运行时测试", description = "对指定的 AI 配置进行运行时连通性测试,支持同步和流式模式。") @PostMapping("/runtime/test") public Result runtimeTest(@RequestBody JSONObject payload) { @@ -223,7 +233,9 @@ public class AiRoutingController { request.setUserId(UserContextHolder.getCurrentUserId()); request.setUserName(UserContextHolder.getCurrentUsername()); request.setUserType(UserContextHolder.getCurrentUserType()); - request.setRequestId(UserContextHolder.getRequestId()); + if (!org.springframework.util.StringUtils.hasText(request.getRequestId())) { + request.setRequestId(UserContextHolder.getRequestId()); + } return request; } diff --git a/backend-single/src/main/java/com/emotion/dto/request/ai/AiRuntimeRequest.java b/backend-single/src/main/java/com/emotion/dto/request/ai/AiRuntimeRequest.java index a75f4a0..3b64a8a 100644 --- a/backend-single/src/main/java/com/emotion/dto/request/ai/AiRuntimeRequest.java +++ b/backend-single/src/main/java/com/emotion/dto/request/ai/AiRuntimeRequest.java @@ -36,6 +36,10 @@ public class AiRuntimeRequest { } request.setSceneCode(sceneCode); request.setEndpointId(payload.getString("endpointId")); + request.setRequestId(payload.getString("requestId")); + request.setUserId(payload.getString("userId")); + request.setUserName(payload.getString("userName")); + request.setUserType(payload.getString("userType")); JSONObject inputs = payload.getJSONObject("inputs"); if (inputs == null) { diff --git a/backend-single/src/main/java/com/emotion/service/AiCallLogService.java b/backend-single/src/main/java/com/emotion/service/AiCallLogService.java index e63e615..e222b02 100644 --- a/backend-single/src/main/java/com/emotion/service/AiCallLogService.java +++ b/backend-single/src/main/java/com/emotion/service/AiCallLogService.java @@ -13,4 +13,6 @@ public interface AiCallLogService extends IService { List latest(Integer limit); PageResult query(AiCallLogQueryRequest request); + + AiCallLog findByRequestId(String requestId, String userId); } diff --git a/backend-single/src/main/java/com/emotion/service/ai/CozeProviderAdapter.java b/backend-single/src/main/java/com/emotion/service/ai/CozeProviderAdapter.java index 124da39..59f834a 100644 --- a/backend-single/src/main/java/com/emotion/service/ai/CozeProviderAdapter.java +++ b/backend-single/src/main/java/com/emotion/service/ai/CozeProviderAdapter.java @@ -53,8 +53,11 @@ public class CozeProviderAdapter implements AiProviderAdapter { try (BufferedReader reader = new BufferedReader(new InputStreamReader(response.getBody(), StandardCharsets.UTF_8))) { String line; while ((line = reader.readLine()) != null) { - if (line.startsWith("data:")) { - httpSupport.emitSseData(line.substring(5).trim(), consumer, this::extractCozeDelta, counter); + String trimmed = line.trim(); + if (trimmed.startsWith("data:")) { + httpSupport.emitSseData(trimmed.substring(5).trim(), consumer, this::extractCozeDelta, counter); + } else if (trimmed.startsWith("{")) { + httpSupport.emitSseData(trimmed, consumer, this::extractCozeDelta, counter); } } } diff --git a/backend-single/src/main/java/com/emotion/service/ai/DifyProviderAdapter.java b/backend-single/src/main/java/com/emotion/service/ai/DifyProviderAdapter.java index 4501f07..1ac8892 100644 --- a/backend-single/src/main/java/com/emotion/service/ai/DifyProviderAdapter.java +++ b/backend-single/src/main/java/com/emotion/service/ai/DifyProviderAdapter.java @@ -52,8 +52,11 @@ public class DifyProviderAdapter implements AiProviderAdapter { try (BufferedReader reader = new BufferedReader(new InputStreamReader(response.getBody(), StandardCharsets.UTF_8))) { String line; while ((line = reader.readLine()) != null) { - if (line.startsWith("data:")) { - httpSupport.emitSseData(line.substring(5).trim(), consumer, this::extractDifyDelta, counter); + String trimmed = line.trim(); + if (trimmed.startsWith("data:")) { + httpSupport.emitSseData(trimmed.substring(5).trim(), consumer, this::extractDifyDelta, counter); + } else if (trimmed.startsWith("{")) { + httpSupport.emitSseData(trimmed, consumer, this::extractDifyDelta, counter); } } } diff --git a/backend-single/src/main/java/com/emotion/service/ai/ProviderHttpSupport.java b/backend-single/src/main/java/com/emotion/service/ai/ProviderHttpSupport.java index 88e6906..266784f 100644 --- a/backend-single/src/main/java/com/emotion/service/ai/ProviderHttpSupport.java +++ b/backend-single/src/main/java/com/emotion/service/ai/ProviderHttpSupport.java @@ -43,19 +43,26 @@ public class ProviderHttpSupport { if (!StringUtils.hasText(data) || "[DONE]".equals(data.trim())) { return; } + JSONObject json; try { - JSONObject json = JSON.parseObject(data); - String event = json.getString("event"); - String type = json.getString("type"); - if ("error".equalsIgnoreCase(event) || "error".equalsIgnoreCase(type)) { - String code = firstText(json, "code", "error_code", "errorCode"); - String message = firstText(json, "message", "error", "error_msg", "errorMessage"); - if (!StringUtils.hasText(message)) { - message = data; - } - consumer.accept(AiStreamEvent.error(StringUtils.hasText(code) ? code : "AI_PROVIDER_ERROR", message)); - throw new IllegalStateException(message); + json = JSON.parseObject(data); + } catch (Exception ignored) { + counter.increment(); + consumer.accept(AiStreamEvent.delta(data, counter.get())); + return; + } + + String event = json.getString("event"); + String type = json.getString("type"); + if ("error".equalsIgnoreCase(event) || "error".equalsIgnoreCase(type)) { + String message = firstText(json, "message", "error", "error_msg", "errorMessage"); + if (!StringUtils.hasText(message)) { + message = data; } + throw new IllegalStateException(message); + } + + try { String delta = extractor.extract(json); if (StringUtils.hasText(delta)) { counter.increment(); diff --git a/backend-single/src/main/java/com/emotion/service/impl/AiCallLogServiceImpl.java b/backend-single/src/main/java/com/emotion/service/impl/AiCallLogServiceImpl.java index 9582de4..aa1c6e2 100644 --- a/backend-single/src/main/java/com/emotion/service/impl/AiCallLogServiceImpl.java +++ b/backend-single/src/main/java/com/emotion/service/impl/AiCallLogServiceImpl.java @@ -49,4 +49,17 @@ public class AiCallLogServiceImpl extends ServiceImpl page = page(pageParam, wrapper); return PageResult.of(page); } + + @Override + public AiCallLog findByRequestId(String requestId, String userId) { + if (StringUtils.isBlank(requestId)) { + return null; + } + return getOne(new LambdaQueryWrapper() + .eq(AiCallLog::getIsDeleted, 0) + .eq(AiCallLog::getRequestId, requestId) + .eq(StringUtils.isNotBlank(userId), AiCallLog::getUserId, userId) + .orderByDesc(AiCallLog::getCreateTime) + .last("limit 1")); + } } diff --git a/backend-single/src/main/java/com/emotion/service/impl/AiRuntimeServiceImpl.java b/backend-single/src/main/java/com/emotion/service/impl/AiRuntimeServiceImpl.java index b438bca..01f8ae4 100644 --- a/backend-single/src/main/java/com/emotion/service/impl/AiRuntimeServiceImpl.java +++ b/backend-single/src/main/java/com/emotion/service/impl/AiRuntimeServiceImpl.java @@ -58,11 +58,12 @@ public class AiRuntimeServiceImpl implements AiRuntimeService { @Override public void invokeStream(AiRuntimeRequest request, Consumer consumer) { + String requestId = StringUtils.hasText(request.getRequestId()) ? request.getRequestId() : UUID.randomUUID().toString(); + request.setRequestId(requestId); enrichInputs(request); long startedAt = System.currentTimeMillis(); AtomicLong firstTokenAt = new AtomicLong(0); AtomicInteger chunks = new AtomicInteger(0); - String requestId = UUID.randomUUID().toString(); StringBuilder output = new StringBuilder(); AiCallLog callLog = new AiCallLog(); callLog.setRequestId(requestId); @@ -97,19 +98,28 @@ public class AiRuntimeServiceImpl implements AiRuntimeService { callLog.setFirstTokenMs(firstTokenAt.get() == 0 ? null : firstTokenAt.get() - startedAt); callLog.setDurationMs(System.currentTimeMillis() - startedAt); callLogService.updateById(callLog); - consumer.accept(AiStreamEvent.done(Map.of( + emitDone(consumer, Map.of( "requestId", requestId, "streamChunks", chunks.get(), "durationMs", callLog.getDurationMs() - ))); + )); } catch (Exception e) { String code = normalizeErrorCode(e); + callLog.setOutputText(output.toString()); + callLog.setStreamChunks(chunks.get()); + callLog.setFirstTokenMs(firstTokenAt.get() == 0 ? null : firstTokenAt.get() - startedAt); + callLog.setDurationMs(System.currentTimeMillis() - startedAt); + if (StringUtils.hasText(output.toString())) { + callLog.setStatus("success"); + callLog.setErrorCode(null); + callLog.setErrorMessage(null); + saveOrUpdateLog(callLog); + emitDone(consumer, recoveredDoneMetadata(requestId, chunks.get(), callLog.getDurationMs(), code, e.getMessage())); + return; + } callLog.setStatus("failed"); callLog.setErrorCode(code); callLog.setErrorMessage(e.getMessage()); - callLog.setOutputText(output.toString()); - callLog.setStreamChunks(chunks.get()); - callLog.setDurationMs(System.currentTimeMillis() - startedAt); saveOrUpdateLog(callLog); consumer.accept(AiStreamEvent.error(code, e.getMessage())); } @@ -135,12 +145,12 @@ public class AiRuntimeServiceImpl implements AiRuntimeService { return AiRuntimeTestResponse.builder() .sceneCode(request.getSceneCode()) - .status(errorCode[0] == null ? "success" : "failed") + .status(errorCode[0] == null || StringUtils.hasText(output.toString()) ? "success" : "failed") .output(output.toString()) .streamChunks(chunks.get()) .durationMs(System.currentTimeMillis() - startedAt) - .errorCode(errorCode[0]) - .errorMessage(errorMessage[0]) + .errorCode(StringUtils.hasText(output.toString()) ? null : errorCode[0]) + .errorMessage(StringUtils.hasText(output.toString()) ? null : errorMessage[0]) .build(); } @@ -164,12 +174,12 @@ public class AiRuntimeServiceImpl implements AiRuntimeService { return AiRuntimeTestResponse.builder() .sceneCode("") - .status(errorCode[0] == null ? "success" : "failed") + .status(errorCode[0] == null || StringUtils.hasText(output.toString()) ? "success" : "failed") .output(output.toString()) .streamChunks(chunks.get()) .durationMs(System.currentTimeMillis() - startedAt) - .errorCode(errorCode[0]) - .errorMessage(errorMessage[0]) + .errorCode(StringUtils.hasText(output.toString()) ? null : errorCode[0]) + .errorMessage(StringUtils.hasText(output.toString()) ? null : errorMessage[0]) .build(); } @@ -178,7 +188,10 @@ public class AiRuntimeServiceImpl implements AiRuntimeService { long startedAt = System.currentTimeMillis(); AtomicLong firstTokenAt = new AtomicLong(0); AtomicInteger chunks = new AtomicInteger(0); - String requestId = UUID.randomUUID().toString(); + Object inputRequestId = inputs == null ? null : inputs.get("requestId"); + String requestId = inputRequestId != null && StringUtils.hasText(String.valueOf(inputRequestId)) + ? String.valueOf(inputRequestId) + : UUID.randomUUID().toString(); StringBuilder output = new StringBuilder(); AiEndpointConfig endpoint = endpointConfigService.getEnabledById(endpointId); @@ -200,7 +213,7 @@ public class AiRuntimeServiceImpl implements AiRuntimeService { request.setUserId(resolveUserId(request)); request.setUserName(UserContextHolder.getCurrentUsername()); request.setUserType(UserContextHolder.getCurrentUserType()); - request.setRequestId(UserContextHolder.getRequestId()); + request.setRequestId(requestId); enrichInputs(request); AiCallLog callLog = new AiCallLog(); @@ -233,19 +246,28 @@ public class AiRuntimeServiceImpl implements AiRuntimeService { callLog.setFirstTokenMs(firstTokenAt.get() == 0 ? null : firstTokenAt.get() - startedAt); callLog.setDurationMs(System.currentTimeMillis() - startedAt); callLogService.updateById(callLog); - consumer.accept(AiStreamEvent.done(Map.of( + emitDone(consumer, Map.of( "requestId", requestId, "streamChunks", chunks.get(), "durationMs", callLog.getDurationMs() - ))); + )); } catch (Exception e) { String code = normalizeErrorCode(e); + callLog.setOutputText(output.toString()); + callLog.setStreamChunks(chunks.get()); + callLog.setFirstTokenMs(firstTokenAt.get() == 0 ? null : firstTokenAt.get() - startedAt); + callLog.setDurationMs(System.currentTimeMillis() - startedAt); + if (StringUtils.hasText(output.toString())) { + callLog.setStatus("success"); + callLog.setErrorCode(null); + callLog.setErrorMessage(null); + saveOrUpdateLog(callLog); + emitDone(consumer, recoveredDoneMetadata(requestId, chunks.get(), callLog.getDurationMs(), code, e.getMessage())); + return; + } callLog.setStatus("failed"); callLog.setErrorCode(code); callLog.setErrorMessage(e.getMessage()); - callLog.setOutputText(output.toString()); - callLog.setStreamChunks(chunks.get()); - callLog.setDurationMs(System.currentTimeMillis() - startedAt); saveOrUpdateLog(callLog); consumer.accept(AiStreamEvent.error(code, e.getMessage())); } @@ -502,6 +524,27 @@ public class AiRuntimeServiceImpl implements AiRuntimeService { return "AI_STREAM_INTERRUPTED"; } + private Map recoveredDoneMetadata(String requestId, int streamChunks, Long durationMs, String code, String message) { + Map metadata = new LinkedHashMap<>(); + metadata.put("requestId", requestId); + metadata.put("streamChunks", streamChunks); + metadata.put("durationMs", durationMs); + metadata.put("recovered", true); + metadata.put("warningCode", code); + if (StringUtils.hasText(message)) { + metadata.put("warningMessage", message); + } + return metadata; + } + + private void emitDone(Consumer consumer, Map metadata) { + try { + consumer.accept(AiStreamEvent.done(metadata)); + } catch (Exception e) { + log.debug("AI stream done event skipped after output persisted: {}", e.getMessage()); + } + } + private void saveOrUpdateLog(AiCallLog callLog) { if (StringUtils.hasText(callLog.getId())) { callLogService.updateById(callLog); diff --git a/web-admin/src/api/aiconfig.ts b/web-admin/src/api/aiconfig.ts index c06d3a5..f724228 100644 --- a/web-admin/src/api/aiconfig.ts +++ b/web-admin/src/api/aiconfig.ts @@ -358,6 +358,22 @@ async function fetchSseStream( const decoder = new TextDecoder('utf-8') let buffer = '' let output = '' + let recovered = false + + const finishRecovered = (event: AiRuntimeStreamEvent, message?: string) => { + if (!output.trim()) return false + recovered = true + onEvent({ + type: 'done', + metadata: { + ...(event.metadata || {}), + recovered: true, + warningCode: event.code, + warningMessage: message || event.message + } + }, output) + return true + } const consumeText = (text: string) => { buffer += text @@ -370,17 +386,28 @@ async function fetchSseStream( output += normalizeAiText(event.content || '') } onEvent(event, output) + if (event.type === 'error' && finishRecovered(event)) { + return + } if (event.type === 'error') { throw new Error(event.message || event.code || '流式测试失败') } }) } - while (true) { - const { value, done } = await reader.read() - if (done) break - consumeText(decoder.decode(value, { stream: true })) + try { + while (true) { + const { value, done } = await reader.read() + if (done) break + consumeText(decoder.decode(value, { stream: true })) + if (recovered) break + } + } catch (error: any) { + if (!finishRecovered({ type: 'error', message: error?.message })) { + throw error + } } + if (recovered) return output consumeText(decoder.decode()) if (buffer.trim()) consumeText('\n\n') return output