fix: 修复 Dify 非流式测试 user_id 缺失和超时问题

- enrichInputs 增加 user_id 下划线字段注入(Dify API 要求下划线格式)
- testAiRuntime 接口超时从 15 秒延长到 60 秒

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
2026-05-23 13:19:18 +08:00
parent 0968f9418f
commit d77090aa5e
4 changed files with 1003 additions and 1857 deletions
@@ -0,0 +1,249 @@
package com.emotion.service.impl;
import com.alibaba.fastjson2.JSON;
import com.emotion.dto.request.ai.AiRuntimeRequest;
import com.emotion.dto.response.ai.AiRuntimeTestResponse;
import com.emotion.dto.response.ai.AiStreamEvent;
import com.emotion.entity.AiCallLog;
import com.emotion.entity.AiEndpointConfig;
import com.emotion.entity.AiProvider;
import com.emotion.entity.AiSceneBinding;
import com.emotion.service.AiCallLogService;
import com.emotion.service.AiEndpointConfigService;
import com.emotion.service.AiProviderService;
import com.emotion.service.AiRuntimeService;
import com.emotion.service.AiSceneBindingService;
import com.emotion.service.ScriptContextService;
import com.emotion.service.ai.AiProviderAdapter;
import com.emotion.util.UserContextHolder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
@Slf4j
@Service
public class AiRuntimeServiceImpl implements AiRuntimeService {
private final AiSceneBindingService sceneBindingService;
private final AiEndpointConfigService endpointConfigService;
private final AiProviderService providerService;
private final AiCallLogService callLogService;
private final ScriptContextService scriptContextService;
private final List<AiProviderAdapter> adapters;
public AiRuntimeServiceImpl(AiSceneBindingService sceneBindingService,
AiEndpointConfigService endpointConfigService,
AiProviderService providerService,
AiCallLogService callLogService,
ScriptContextService scriptContextService,
List<AiProviderAdapter> adapters) {
this.sceneBindingService = sceneBindingService;
this.endpointConfigService = endpointConfigService;
this.providerService = providerService;
this.callLogService = callLogService;
this.scriptContextService = scriptContextService;
this.adapters = adapters;
}
@Override
public void invokeStream(AiRuntimeRequest request, Consumer<AiStreamEvent> consumer) {
enrichInputs(request);
long startedAt = System.currentTimeMillis();
AtomicLong firstTokenAt = new AtomicLong(0);
AtomicInteger chunks = new AtomicInteger(0);
String requestId = UUID.randomUUID().toString();
StringBuilder output = new StringBuilder();
AiCallLog callLog = new AiCallLog();
callLog.setRequestId(requestId);
callLog.setSceneCode(request.getSceneCode());
callLog.setUserId(resolveUserId(request));
callLog.setInputText(JSON.toJSONString(request.getInputs()));
callLog.setStatus("running");
try {
RuntimeTarget target = resolveTarget(request);
callLog.setProviderCode(target.provider.getProviderCode());
callLog.setEndpointCode(target.endpoint.getEndpointCode());
callLogService.save(callLog);
consumer.accept(AiStreamEvent.start(request.getSceneCode()));
target.adapter.stream(target.provider, target.endpoint, request, event -> {
if ("delta".equals(event.getType())) {
chunks.incrementAndGet();
if (firstTokenAt.compareAndSet(0, System.currentTimeMillis())) {
log.debug("AI first token emitted, scene={}, requestId={}", request.getSceneCode(), requestId);
}
if (event.getContent() != null) {
output.append(event.getContent());
}
}
consumer.accept(event);
});
callLog.setStatus("success");
callLog.setOutputText(output.toString());
callLog.setStreamChunks(chunks.get());
callLog.setFirstTokenMs(firstTokenAt.get() == 0 ? null : firstTokenAt.get() - startedAt);
callLog.setDurationMs(System.currentTimeMillis() - startedAt);
callLogService.updateById(callLog);
consumer.accept(AiStreamEvent.done(Map.of(
"requestId", requestId,
"streamChunks", chunks.get(),
"durationMs", callLog.getDurationMs()
)));
} catch (Exception e) {
String code = normalizeErrorCode(e);
callLog.setStatus("failed");
callLog.setErrorCode(code);
callLog.setErrorMessage(e.getMessage());
callLog.setOutputText(output.toString());
callLog.setStreamChunks(chunks.get());
callLog.setDurationMs(System.currentTimeMillis() - startedAt);
saveOrUpdateLog(callLog);
consumer.accept(AiStreamEvent.error(code, e.getMessage()));
}
}
@Override
public AiRuntimeTestResponse test(AiRuntimeRequest request) {
long startedAt = System.currentTimeMillis();
StringBuilder output = new StringBuilder();
AtomicInteger chunks = new AtomicInteger(0);
final String[] errorCode = new String[1];
final String[] errorMessage = new String[1];
invokeStream(request, event -> {
if ("delta".equals(event.getType()) && event.getContent() != null) {
chunks.incrementAndGet();
output.append(event.getContent());
} else if ("error".equals(event.getType())) {
errorCode[0] = event.getCode();
errorMessage[0] = event.getMessage();
}
});
return AiRuntimeTestResponse.builder()
.sceneCode(request.getSceneCode())
.status(errorCode[0] == null ? "success" : "failed")
.output(output.toString())
.streamChunks(chunks.get())
.durationMs(System.currentTimeMillis() - startedAt)
.errorCode(errorCode[0])
.errorMessage(errorMessage[0])
.build();
}
private RuntimeTarget resolveTarget(AiRuntimeRequest request) {
if (!StringUtils.hasText(request.getSceneCode())) {
throw new IllegalArgumentException("AI_SCENE_REQUIRED");
}
if (!StringUtils.hasText(request.getUserId())) {
request.setUserId(resolveUserId(request));
}
AiSceneBinding scene = sceneBindingService.resolveScene(request.getSceneCode());
if (scene == null) {
throw new IllegalStateException("AI_SCENE_NOT_BOUND");
}
AiEndpointConfig endpoint = endpointConfigService.getEnabledById(scene.getEndpointId());
if (endpoint == null) {
throw new IllegalStateException("AI_ENDPOINT_DISABLED");
}
if (Integer.valueOf(1).equals(scene.getRequiredStream()) && !Integer.valueOf(1).equals(endpoint.getSupportStream())) {
throw new IllegalStateException("AI_ENDPOINT_STREAM_REQUIRED");
}
AiProvider provider = providerService.getEnabledById(endpoint.getProviderId());
if (provider == null) {
throw new IllegalStateException("AI_PROVIDER_DISABLED");
}
AiProviderAdapter adapter = adapters.stream()
.filter(item -> item.supports(provider.getProviderType()))
.findFirst()
.orElseThrow(() -> new IllegalStateException("AI_PROVIDER_ADAPTER_NOT_FOUND"));
return new RuntimeTarget(scene, endpoint, provider, adapter);
}
private String resolveUserId(AiRuntimeRequest request) {
if (StringUtils.hasText(request.getUserId())) {
return request.getUserId();
}
String currentUserId = UserContextHolder.getCurrentUserId();
return StringUtils.hasText(currentUserId) ? currentUserId : "anonymous";
}
private void enrichInputs(AiRuntimeRequest request) {
if (request.getInputs() == null) {
request.setInputs(new com.alibaba.fastjson2.JSONObject());
}
String userId = resolveUserId(request);
request.setUserId(userId);
request.getInputs().put("userId", userId);
request.getInputs().put("currentUserId", userId);
request.getInputs().put("user_id", userId);
if (StringUtils.hasText(request.getUserName())) {
request.getInputs().put("userName", request.getUserName());
request.getInputs().put("username", request.getUserName());
}
if (StringUtils.hasText(request.getUserType())) {
request.getInputs().put("userType", request.getUserType());
}
if (StringUtils.hasText(request.getRequestId())) {
request.getInputs().put("requestId", request.getRequestId());
}
enrichSceneInputs(request, userId);
}
private void enrichSceneInputs(AiRuntimeRequest request, String userId) {
if (!StringUtils.hasText(request.getSceneCode())) {
return;
}
if ("script_generate".equals(request.getSceneCode()) || "short_story_generate".equals(request.getSceneCode())) {
Boolean useSocialInsights = request.getInputs().getBoolean("useSocialInsights");
String socialInsightContext = scriptContextService.buildSocialInsightContext(userId, useSocialInsights);
if (StringUtils.hasText(socialInsightContext)) {
request.getInputs().put("socialInsightContext", socialInsightContext);
}
}
}
private String normalizeErrorCode(Exception e) {
String message = e.getMessage();
if (message != null && message.startsWith("AI_")) {
return message;
}
if (message != null && message.contains("timed out")) {
return "AI_STREAM_TIMEOUT";
}
return "AI_STREAM_INTERRUPTED";
}
private void saveOrUpdateLog(AiCallLog callLog) {
if (StringUtils.hasText(callLog.getId())) {
callLogService.updateById(callLog);
} else {
callLogService.save(callLog);
}
}
private static class RuntimeTarget {
private final AiSceneBinding scene;
private final AiEndpointConfig endpoint;
private final AiProvider provider;
private final AiProviderAdapter adapter;
private RuntimeTarget(AiSceneBinding scene, AiEndpointConfig endpoint, AiProvider provider, AiProviderAdapter adapter) {
this.scene = scene;
this.endpoint = endpoint;
this.provider = provider;
this.adapter = adapter;
}
}
}