feat: 后端新增 /ai/endpoint/test 和 /ai/endpoint/stream 接口

- AiRuntimeRequest DTO 新增 endpointId 字段
- AiRuntimeService 接口新增 testEndpoint 和 invokeEndpointStream
- AiRuntimeServiceImpl 实现 endpoint 直调链路(绕过场景解析)
- AiRoutingController 新增 /endpoint/test 和 /endpoint/stream

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
2026-05-23 14:13:25 +08:00
parent e5fc52ff84
commit cccb720060
4 changed files with 142 additions and 1 deletions
@@ -28,6 +28,7 @@ import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
@Slf4j
@@ -150,6 +151,31 @@ public class AiRoutingController {
return emitter;
}
@PostMapping("/endpoint/test")
public Result<AiRuntimeTestResponse> endpointTest(@RequestBody JSONObject payload) {
String endpointId = payload.getString("endpointId");
JSONObject inputs = payload.getJSONObject("inputs");
Map<String, Object> inputMap = inputs == null ? Map.of() : inputs;
return Result.success(runtimeService.testEndpoint(endpointId, inputMap));
}
@PostMapping("/endpoint/stream")
public SseEmitter endpointStream(@RequestBody JSONObject payload) {
String endpointId = payload.getString("endpointId");
JSONObject inputs = payload.getJSONObject("inputs");
Map<String, Object> inputMap = inputs == null ? Map.of() : inputs;
SseEmitter emitter = new SseEmitter(0L);
CompletableFuture.runAsync(() -> {
runtimeService.invokeEndpointStream(endpointId, inputMap, event -> sendEvent(emitter, event));
emitter.complete();
}).exceptionally(error -> {
sendEvent(emitter, AiStreamEvent.error("AI_ENDPOINT_TEST_INTERRUPTED", error.getMessage()));
emitter.completeWithError(error);
return null;
});
return emitter;
}
private AiRuntimeRequest withCurrentUser(AiRuntimeRequest request) {
request.setUserId(UserContextHolder.getCurrentUserId());
request.setUserName(UserContextHolder.getCurrentUsername());
@@ -9,10 +9,12 @@ import java.util.Set;
@Data
public class AiRuntimeRequest {
private static final Set<String> RESERVED_KEYS = Set.of("sceneCode", "scene", "inputs", "userId", "userName", "username", "userType", "requestId");
private static final Set<String> RESERVED_KEYS = Set.of("sceneCode", "scene", "inputs", "userId", "userName", "username", "userType", "requestId", "endpointId");
private String sceneCode;
private String endpointId;
private String userId;
private String userName;
@@ -33,6 +35,7 @@ public class AiRuntimeRequest {
sceneCode = payload.getString("scene");
}
request.setSceneCode(sceneCode);
request.setEndpointId(payload.getString("endpointId"));
JSONObject inputs = payload.getJSONObject("inputs");
if (inputs == null) {
@@ -4,6 +4,7 @@ import com.emotion.dto.request.ai.AiRuntimeRequest;
import com.emotion.dto.response.ai.AiRuntimeTestResponse;
import com.emotion.dto.response.ai.AiStreamEvent;
import java.util.Map;
import java.util.function.Consumer;
public interface AiRuntimeService {
@@ -11,4 +12,8 @@ public interface AiRuntimeService {
void invokeStream(AiRuntimeRequest request, Consumer<AiStreamEvent> consumer);
AiRuntimeTestResponse test(AiRuntimeRequest request);
AiRuntimeTestResponse testEndpoint(String endpointId, Map<String, Object> inputs);
void invokeEndpointStream(String endpointId, Map<String, Object> inputs, Consumer<AiStreamEvent> consumer);
}
@@ -140,6 +140,113 @@ public class AiRuntimeServiceImpl implements AiRuntimeService {
.build();
}
@Override
public AiRuntimeTestResponse testEndpoint(String endpointId, Map<String, Object> inputs) {
long startedAt = System.currentTimeMillis();
StringBuilder output = new StringBuilder();
AtomicInteger chunks = new AtomicInteger(0);
final String[] errorCode = new String[1];
final String[] errorMessage = new String[1];
invokeEndpointStream(endpointId, inputs, event -> {
if ("delta".equals(event.getType()) && event.getContent() != null) {
chunks.incrementAndGet();
output.append(event.getContent());
} else if ("error".equals(event.getType())) {
errorCode[0] = event.getCode();
errorMessage[0] = event.getMessage();
}
});
return AiRuntimeTestResponse.builder()
.sceneCode("")
.status(errorCode[0] == null ? "success" : "failed")
.output(output.toString())
.streamChunks(chunks.get())
.durationMs(System.currentTimeMillis() - startedAt)
.errorCode(errorCode[0])
.errorMessage(errorMessage[0])
.build();
}
@Override
public void invokeEndpointStream(String endpointId, Map<String, Object> inputs, Consumer<AiStreamEvent> consumer) {
long startedAt = System.currentTimeMillis();
AtomicLong firstTokenAt = new AtomicLong(0);
AtomicInteger chunks = new AtomicInteger(0);
String requestId = UUID.randomUUID().toString();
StringBuilder output = new StringBuilder();
AiEndpointConfig endpoint = endpointConfigService.getEnabledById(endpointId);
if (endpoint == null) {
throw new IllegalStateException("AI_ENDPOINT_DISABLED");
}
AiProvider provider = providerService.getEnabledById(endpoint.getProviderId());
if (provider == null) {
throw new IllegalStateException("AI_PROVIDER_DISABLED");
}
AiProviderAdapter adapter = adapters.stream()
.filter(item -> item.supports(provider.getProviderType()))
.findFirst()
.orElseThrow(() -> new IllegalStateException("AI_PROVIDER_ADAPTER_NOT_FOUND"));
AiRuntimeRequest request = new AiRuntimeRequest();
request.setInputs(inputs == null ? new com.alibaba.fastjson2.JSONObject() : new com.alibaba.fastjson2.JSONObject(inputs));
request.setEndpointId(endpointId);
request.setUserId(resolveUserId(request));
request.setUserName(UserContextHolder.getCurrentUsername());
request.setUserType(UserContextHolder.getCurrentUserType());
request.setRequestId(UserContextHolder.getRequestId());
enrichInputs(request);
AiCallLog callLog = new AiCallLog();
callLog.setRequestId(requestId);
callLog.setEndpointCode(endpoint.getEndpointCode());
callLog.setProviderCode(provider.getProviderCode());
callLog.setUserId(request.getUserId());
callLog.setInputText(JSON.toJSONString(request.getInputs()));
callLog.setStatus("running");
callLogService.save(callLog);
consumer.accept(AiStreamEvent.start(endpoint.getEndpointCode()));
try {
adapter.stream(provider, endpoint, request, event -> {
if ("delta".equals(event.getType())) {
chunks.incrementAndGet();
if (firstTokenAt.compareAndSet(0, System.currentTimeMillis())) {
log.debug("AI first token emitted, endpoint={}, requestId={}", endpoint.getEndpointCode(), requestId);
}
if (event.getContent() != null) {
output.append(event.getContent());
}
}
consumer.accept(event);
});
callLog.setStatus("success");
callLog.setOutputText(output.toString());
callLog.setStreamChunks(chunks.get());
callLog.setFirstTokenMs(firstTokenAt.get() == 0 ? null : firstTokenAt.get() - startedAt);
callLog.setDurationMs(System.currentTimeMillis() - startedAt);
callLogService.updateById(callLog);
consumer.accept(AiStreamEvent.done(Map.of(
"requestId", requestId,
"streamChunks", chunks.get(),
"durationMs", callLog.getDurationMs()
)));
} catch (Exception e) {
String code = normalizeErrorCode(e);
callLog.setStatus("failed");
callLog.setErrorCode(code);
callLog.setErrorMessage(e.getMessage());
callLog.setOutputText(output.toString());
callLog.setStreamChunks(chunks.get());
callLog.setDurationMs(System.currentTimeMillis() - startedAt);
saveOrUpdateLog(callLog);
consumer.accept(AiStreamEvent.error(code, e.getMessage()));
}
}
private RuntimeTarget resolveTarget(AiRuntimeRequest request) {
if (!StringUtils.hasText(request.getSceneCode())) {
throw new IllegalArgumentException("AI_SCENE_REQUIRED");