feat: AI 流式服务完善、ProviderHttp 优化及 web-admin API 调整

- AiRuntimeServiceImpl: 流式输出逻辑优化,支持多 Provider 适配
- ProviderHttpSupport: HTTP 请求处理优化
- AiRoutingController: 新增日志查询接口
- AiCallLogService: 分页查询支持
- AiRuntimeRequest: 补充用户字段
- web-admin aiconfig API: 新增分页查询接口

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
2026-05-24 18:35:56 +08:00
parent 64476eee6d
commit 886f04046b
9 changed files with 153 additions and 39 deletions
@@ -169,6 +169,16 @@ public class AiRoutingController {
return Result.success(callLogService.query(request));
}
@Operation(summary = "查询运行时调用结果", description = "用户端根据 requestId 查询刚刚触发的 AI 调用结果,用于流式连接异常后的结果恢复。")
@GetMapping("/runtime/result")
public Result<AiCallLog> runtimeResult(@RequestParam String requestId) {
AiCallLog log = callLogService.findByRequestId(requestId, UserContextHolder.getCurrentUserId());
if (log == null) {
return Result.notFound("AI 调用结果未生成");
}
return Result.success(log);
}
@Operation(summary = "运行时测试", description = "对指定的 AI 配置进行运行时连通性测试,支持同步和流式模式。")
@PostMapping("/runtime/test")
public Result<AiRuntimeTestResponse> runtimeTest(@RequestBody JSONObject payload) {
@@ -223,7 +233,9 @@ public class AiRoutingController {
request.setUserId(UserContextHolder.getCurrentUserId());
request.setUserName(UserContextHolder.getCurrentUsername());
request.setUserType(UserContextHolder.getCurrentUserType());
request.setRequestId(UserContextHolder.getRequestId());
if (!org.springframework.util.StringUtils.hasText(request.getRequestId())) {
request.setRequestId(UserContextHolder.getRequestId());
}
return request;
}
@@ -36,6 +36,10 @@ public class AiRuntimeRequest {
}
request.setSceneCode(sceneCode);
request.setEndpointId(payload.getString("endpointId"));
request.setRequestId(payload.getString("requestId"));
request.setUserId(payload.getString("userId"));
request.setUserName(payload.getString("userName"));
request.setUserType(payload.getString("userType"));
JSONObject inputs = payload.getJSONObject("inputs");
if (inputs == null) {
@@ -13,4 +13,6 @@ public interface AiCallLogService extends IService<AiCallLog> {
List<AiCallLog> latest(Integer limit);
PageResult<AiCallLog> query(AiCallLogQueryRequest request);
AiCallLog findByRequestId(String requestId, String userId);
}
@@ -53,8 +53,11 @@ public class CozeProviderAdapter implements AiProviderAdapter {
try (BufferedReader reader = new BufferedReader(new InputStreamReader(response.getBody(), StandardCharsets.UTF_8))) {
String line;
while ((line = reader.readLine()) != null) {
if (line.startsWith("data:")) {
httpSupport.emitSseData(line.substring(5).trim(), consumer, this::extractCozeDelta, counter);
String trimmed = line.trim();
if (trimmed.startsWith("data:")) {
httpSupport.emitSseData(trimmed.substring(5).trim(), consumer, this::extractCozeDelta, counter);
} else if (trimmed.startsWith("{")) {
httpSupport.emitSseData(trimmed, consumer, this::extractCozeDelta, counter);
}
}
}
@@ -52,8 +52,11 @@ public class DifyProviderAdapter implements AiProviderAdapter {
try (BufferedReader reader = new BufferedReader(new InputStreamReader(response.getBody(), StandardCharsets.UTF_8))) {
String line;
while ((line = reader.readLine()) != null) {
if (line.startsWith("data:")) {
httpSupport.emitSseData(line.substring(5).trim(), consumer, this::extractDifyDelta, counter);
String trimmed = line.trim();
if (trimmed.startsWith("data:")) {
httpSupport.emitSseData(trimmed.substring(5).trim(), consumer, this::extractDifyDelta, counter);
} else if (trimmed.startsWith("{")) {
httpSupport.emitSseData(trimmed, consumer, this::extractDifyDelta, counter);
}
}
}
@@ -43,19 +43,26 @@ public class ProviderHttpSupport {
if (!StringUtils.hasText(data) || "[DONE]".equals(data.trim())) {
return;
}
JSONObject json;
try {
JSONObject json = JSON.parseObject(data);
String event = json.getString("event");
String type = json.getString("type");
if ("error".equalsIgnoreCase(event) || "error".equalsIgnoreCase(type)) {
String code = firstText(json, "code", "error_code", "errorCode");
String message = firstText(json, "message", "error", "error_msg", "errorMessage");
if (!StringUtils.hasText(message)) {
message = data;
}
consumer.accept(AiStreamEvent.error(StringUtils.hasText(code) ? code : "AI_PROVIDER_ERROR", message));
throw new IllegalStateException(message);
json = JSON.parseObject(data);
} catch (Exception ignored) {
counter.increment();
consumer.accept(AiStreamEvent.delta(data, counter.get()));
return;
}
String event = json.getString("event");
String type = json.getString("type");
if ("error".equalsIgnoreCase(event) || "error".equalsIgnoreCase(type)) {
String message = firstText(json, "message", "error", "error_msg", "errorMessage");
if (!StringUtils.hasText(message)) {
message = data;
}
throw new IllegalStateException(message);
}
try {
String delta = extractor.extract(json);
if (StringUtils.hasText(delta)) {
counter.increment();
@@ -49,4 +49,17 @@ public class AiCallLogServiceImpl extends ServiceImpl<AiCallLogMapper, AiCallLog
IPage<AiCallLog> page = page(pageParam, wrapper);
return PageResult.of(page);
}
@Override
public AiCallLog findByRequestId(String requestId, String userId) {
if (StringUtils.isBlank(requestId)) {
return null;
}
return getOne(new LambdaQueryWrapper<AiCallLog>()
.eq(AiCallLog::getIsDeleted, 0)
.eq(AiCallLog::getRequestId, requestId)
.eq(StringUtils.isNotBlank(userId), AiCallLog::getUserId, userId)
.orderByDesc(AiCallLog::getCreateTime)
.last("limit 1"));
}
}
@@ -58,11 +58,12 @@ public class AiRuntimeServiceImpl implements AiRuntimeService {
@Override
public void invokeStream(AiRuntimeRequest request, Consumer<AiStreamEvent> consumer) {
String requestId = StringUtils.hasText(request.getRequestId()) ? request.getRequestId() : UUID.randomUUID().toString();
request.setRequestId(requestId);
enrichInputs(request);
long startedAt = System.currentTimeMillis();
AtomicLong firstTokenAt = new AtomicLong(0);
AtomicInteger chunks = new AtomicInteger(0);
String requestId = UUID.randomUUID().toString();
StringBuilder output = new StringBuilder();
AiCallLog callLog = new AiCallLog();
callLog.setRequestId(requestId);
@@ -97,19 +98,28 @@ public class AiRuntimeServiceImpl implements AiRuntimeService {
callLog.setFirstTokenMs(firstTokenAt.get() == 0 ? null : firstTokenAt.get() - startedAt);
callLog.setDurationMs(System.currentTimeMillis() - startedAt);
callLogService.updateById(callLog);
consumer.accept(AiStreamEvent.done(Map.of(
emitDone(consumer, Map.of(
"requestId", requestId,
"streamChunks", chunks.get(),
"durationMs", callLog.getDurationMs()
)));
));
} catch (Exception e) {
String code = normalizeErrorCode(e);
callLog.setOutputText(output.toString());
callLog.setStreamChunks(chunks.get());
callLog.setFirstTokenMs(firstTokenAt.get() == 0 ? null : firstTokenAt.get() - startedAt);
callLog.setDurationMs(System.currentTimeMillis() - startedAt);
if (StringUtils.hasText(output.toString())) {
callLog.setStatus("success");
callLog.setErrorCode(null);
callLog.setErrorMessage(null);
saveOrUpdateLog(callLog);
emitDone(consumer, recoveredDoneMetadata(requestId, chunks.get(), callLog.getDurationMs(), code, e.getMessage()));
return;
}
callLog.setStatus("failed");
callLog.setErrorCode(code);
callLog.setErrorMessage(e.getMessage());
callLog.setOutputText(output.toString());
callLog.setStreamChunks(chunks.get());
callLog.setDurationMs(System.currentTimeMillis() - startedAt);
saveOrUpdateLog(callLog);
consumer.accept(AiStreamEvent.error(code, e.getMessage()));
}
@@ -135,12 +145,12 @@ public class AiRuntimeServiceImpl implements AiRuntimeService {
return AiRuntimeTestResponse.builder()
.sceneCode(request.getSceneCode())
.status(errorCode[0] == null ? "success" : "failed")
.status(errorCode[0] == null || StringUtils.hasText(output.toString()) ? "success" : "failed")
.output(output.toString())
.streamChunks(chunks.get())
.durationMs(System.currentTimeMillis() - startedAt)
.errorCode(errorCode[0])
.errorMessage(errorMessage[0])
.errorCode(StringUtils.hasText(output.toString()) ? null : errorCode[0])
.errorMessage(StringUtils.hasText(output.toString()) ? null : errorMessage[0])
.build();
}
@@ -164,12 +174,12 @@ public class AiRuntimeServiceImpl implements AiRuntimeService {
return AiRuntimeTestResponse.builder()
.sceneCode("")
.status(errorCode[0] == null ? "success" : "failed")
.status(errorCode[0] == null || StringUtils.hasText(output.toString()) ? "success" : "failed")
.output(output.toString())
.streamChunks(chunks.get())
.durationMs(System.currentTimeMillis() - startedAt)
.errorCode(errorCode[0])
.errorMessage(errorMessage[0])
.errorCode(StringUtils.hasText(output.toString()) ? null : errorCode[0])
.errorMessage(StringUtils.hasText(output.toString()) ? null : errorMessage[0])
.build();
}
@@ -178,7 +188,10 @@ public class AiRuntimeServiceImpl implements AiRuntimeService {
long startedAt = System.currentTimeMillis();
AtomicLong firstTokenAt = new AtomicLong(0);
AtomicInteger chunks = new AtomicInteger(0);
String requestId = UUID.randomUUID().toString();
Object inputRequestId = inputs == null ? null : inputs.get("requestId");
String requestId = inputRequestId != null && StringUtils.hasText(String.valueOf(inputRequestId))
? String.valueOf(inputRequestId)
: UUID.randomUUID().toString();
StringBuilder output = new StringBuilder();
AiEndpointConfig endpoint = endpointConfigService.getEnabledById(endpointId);
@@ -200,7 +213,7 @@ public class AiRuntimeServiceImpl implements AiRuntimeService {
request.setUserId(resolveUserId(request));
request.setUserName(UserContextHolder.getCurrentUsername());
request.setUserType(UserContextHolder.getCurrentUserType());
request.setRequestId(UserContextHolder.getRequestId());
request.setRequestId(requestId);
enrichInputs(request);
AiCallLog callLog = new AiCallLog();
@@ -233,19 +246,28 @@ public class AiRuntimeServiceImpl implements AiRuntimeService {
callLog.setFirstTokenMs(firstTokenAt.get() == 0 ? null : firstTokenAt.get() - startedAt);
callLog.setDurationMs(System.currentTimeMillis() - startedAt);
callLogService.updateById(callLog);
consumer.accept(AiStreamEvent.done(Map.of(
emitDone(consumer, Map.of(
"requestId", requestId,
"streamChunks", chunks.get(),
"durationMs", callLog.getDurationMs()
)));
));
} catch (Exception e) {
String code = normalizeErrorCode(e);
callLog.setOutputText(output.toString());
callLog.setStreamChunks(chunks.get());
callLog.setFirstTokenMs(firstTokenAt.get() == 0 ? null : firstTokenAt.get() - startedAt);
callLog.setDurationMs(System.currentTimeMillis() - startedAt);
if (StringUtils.hasText(output.toString())) {
callLog.setStatus("success");
callLog.setErrorCode(null);
callLog.setErrorMessage(null);
saveOrUpdateLog(callLog);
emitDone(consumer, recoveredDoneMetadata(requestId, chunks.get(), callLog.getDurationMs(), code, e.getMessage()));
return;
}
callLog.setStatus("failed");
callLog.setErrorCode(code);
callLog.setErrorMessage(e.getMessage());
callLog.setOutputText(output.toString());
callLog.setStreamChunks(chunks.get());
callLog.setDurationMs(System.currentTimeMillis() - startedAt);
saveOrUpdateLog(callLog);
consumer.accept(AiStreamEvent.error(code, e.getMessage()));
}
@@ -502,6 +524,27 @@ public class AiRuntimeServiceImpl implements AiRuntimeService {
return "AI_STREAM_INTERRUPTED";
}
private Map<String, Object> recoveredDoneMetadata(String requestId, int streamChunks, Long durationMs, String code, String message) {
Map<String, Object> metadata = new LinkedHashMap<>();
metadata.put("requestId", requestId);
metadata.put("streamChunks", streamChunks);
metadata.put("durationMs", durationMs);
metadata.put("recovered", true);
metadata.put("warningCode", code);
if (StringUtils.hasText(message)) {
metadata.put("warningMessage", message);
}
return metadata;
}
private void emitDone(Consumer<AiStreamEvent> consumer, Map<String, Object> metadata) {
try {
consumer.accept(AiStreamEvent.done(metadata));
} catch (Exception e) {
log.debug("AI stream done event skipped after output persisted: {}", e.getMessage());
}
}
private void saveOrUpdateLog(AiCallLog callLog) {
if (StringUtils.hasText(callLog.getId())) {
callLogService.updateById(callLog);
+31 -4
View File
@@ -358,6 +358,22 @@ async function fetchSseStream(
const decoder = new TextDecoder('utf-8')
let buffer = ''
let output = ''
let recovered = false
const finishRecovered = (event: AiRuntimeStreamEvent, message?: string) => {
if (!output.trim()) return false
recovered = true
onEvent({
type: 'done',
metadata: {
...(event.metadata || {}),
recovered: true,
warningCode: event.code,
warningMessage: message || event.message
}
}, output)
return true
}
const consumeText = (text: string) => {
buffer += text
@@ -370,17 +386,28 @@ async function fetchSseStream(
output += normalizeAiText(event.content || '')
}
onEvent(event, output)
if (event.type === 'error' && finishRecovered(event)) {
return
}
if (event.type === 'error') {
throw new Error(event.message || event.code || '流式测试失败')
}
})
}
while (true) {
const { value, done } = await reader.read()
if (done) break
consumeText(decoder.decode(value, { stream: true }))
try {
while (true) {
const { value, done } = await reader.read()
if (done) break
consumeText(decoder.decode(value, { stream: true }))
if (recovered) break
}
} catch (error: any) {
if (!finishRecovered({ type: 'error', message: error?.message })) {
throw error
}
}
if (recovered) return output
consumeText(decoder.decode())
if (buffer.trim()) consumeText('\n\n')
return output