Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
575 changes: 162 additions & 413 deletions CLAUDE.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,8 @@ public Flux<UnifiedAiStreamChunk> streamChat(UnifiedAiRequest request) {
.retrieve()
.bodyToFlux(String.class)
.timeout(Duration.ofSeconds(timeoutSeconds))
.reduce("", (accumulated, chunk) -> accumulated + chunk) // 累积所有数据块
.flatMapMany(this::parseGeminiStreamResponse)
.concatMap(this::parseGeminiChunk)
.doOnNext(chunk -> logger.debug("Gemini流式chunk: content={}", chunk.getContent()))
.doOnError(error -> logger.error("Gemini API调用失败: {}", error.getMessage()))
.onErrorResume(error -> {
UnifiedAiStreamChunk errorChunk = new UnifiedAiStreamChunk();
Expand Down Expand Up @@ -221,30 +221,66 @@ private Map<String, Object> buildGeminiRequest(UnifiedAiRequest request) {
return requestBody;
}

/**
* 逐个解析 Gemini 流式响应块
* Gemini streamGenerateContent 返回的是 JSON 数组的片段(以逗号分隔的对象)
*/
private Flux<UnifiedAiStreamChunk> parseGeminiChunk(String rawChunk) {
try {
// Gemini 流式响应可能是数组片段,需要清理边界字符
String cleaned = rawChunk.trim();
if (cleaned.startsWith("[")) cleaned = cleaned.substring(1);
if (cleaned.endsWith("]")) cleaned = cleaned.substring(0, cleaned.length() - 1);
if (cleaned.startsWith(",")) cleaned = cleaned.substring(1);
if (cleaned.endsWith(",")) cleaned = cleaned.substring(0, cleaned.length() - 1);
cleaned = cleaned.trim();

if (cleaned.isEmpty()) {
return Flux.empty();
}

// 尝试解析为单个 JSON 对象
@SuppressWarnings("unchecked")
Map<String, Object> responseObj = objectMapper.readValue(cleaned, Map.class);
String textContent = extractTextFromResponse(responseObj);
if (textContent != null && !textContent.trim().isEmpty()) {
UnifiedAiStreamChunk chunk = new UnifiedAiStreamChunk();
chunk.setContent(textContent);
chunk.setIsFinal(isLastResponse(responseObj));
return Flux.just(chunk);
}
return Flux.empty();

} catch (Exception e) {
// 可能是不完整的 JSON 片段(跨边界),静默忽略
logger.debug("解析Gemini chunk失败(可能是片段边界): {}", e.getMessage());
return Flux.empty();
}
}

private Flux<UnifiedAiStreamChunk> parseGeminiStreamResponse(String completeJson) {
try {
logger.debug("开始解析Gemini流式响应,数据长度: {}", completeJson.length());

List<UnifiedAiStreamChunk> chunks = new ArrayList<>();
String accumulatedContent = "";
StringBuilder accumulatedContent = new StringBuilder();

// Gemini API 返回数组格式的多个响应块
if (completeJson.startsWith("[")) {
@SuppressWarnings("unchecked")
List<Map<String, Object>> responseArray = objectMapper.readValue(completeJson, List.class);

for (Map<String, Object> responseObj : responseArray) {
for (int i = 0; i < responseArray.size(); i++) {
Map<String, Object> responseObj = responseArray.get(i);
String textContent = extractTextFromResponse(responseObj);
if (textContent != null && !textContent.trim().isEmpty()) {
accumulatedContent += textContent;
accumulatedContent.append(textContent);

UnifiedAiStreamChunk chunk = new UnifiedAiStreamChunk();
chunk.setContent(textContent);
chunk.setAccumulatedContent(accumulatedContent);
chunk.setAccumulatedContent(accumulatedContent.toString());

// 检查是否为最后一个响应
boolean isLast = isLastResponse(responseObj) ||
(responseArray.indexOf(responseObj) == responseArray.size() - 1);
boolean isLast = isLastResponse(responseObj) || (i == responseArray.size() - 1);
chunk.setIsFinal(isLast);

chunks.add(chunk);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,26 +240,30 @@ private Map<String, Object> buildOpenAiRequest(UnifiedAiRequest request) {
}

/**
* 解析OpenAI响应
* 解析OpenAI响应 - 使用Jackson
*/
private Mono<UnifiedAiStreamChunk> parseOpenAiResponse(String jsonData) {
try {
// 这里应该使用JSON解析库,简化处理
// 在实际项目中应该使用Jackson或Gson
com.fasterxml.jackson.databind.ObjectMapper mapper = new com.fasterxml.jackson.databind.ObjectMapper();
com.fasterxml.jackson.databind.JsonNode root = mapper.readTree(jsonData);

Comment on lines 245 to +249
Copy link

Copilot AI Apr 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

parseOpenAiResponse creates a new ObjectMapper for every SSE line, which adds avoidable allocations and GC pressure under streaming load. Reuse a single shared ObjectMapper instance (e.g., a private static final ObjectMapper or an injected bean) since it is thread-safe for reads.

Copilot uses AI. Check for mistakes.
UnifiedAiStreamChunk chunk = new UnifiedAiStreamChunk();
chunk.setType(UnifiedAiStreamChunk.ChunkType.CONTENT);

// 简化的JSON解析(实际应该使用Jackson)
if (jsonData.contains("\"content\"")) {
// 提取content字段的值
String content = extractJsonField(jsonData, "content");
chunk.setContent(content);
}
com.fasterxml.jackson.databind.JsonNode choices = root.get("choices");
if (choices != null && choices.isArray() && choices.size() > 0) {
com.fasterxml.jackson.databind.JsonNode choice = choices.get(0);
com.fasterxml.jackson.databind.JsonNode delta = choice.get("delta");

if (delta != null && delta.has("content") && !delta.get("content").isNull()) {
chunk.setContent(delta.get("content").asText());
}

if (jsonData.contains("\"finish_reason\"")) {
String finishReason = extractJsonField(jsonData, "finish_reason");
chunk.setFinishReason(finishReason);
chunk.setIsFinal(!"null".equals(finishReason) && finishReason != null);
if (choice.has("finish_reason") && !choice.get("finish_reason").isNull()) {
String finishReason = choice.get("finish_reason").asText();
chunk.setFinishReason(finishReason);
chunk.setIsFinal(true);
}
}

return Mono.just(chunk);
Expand All @@ -272,27 +276,6 @@ private Mono<UnifiedAiStreamChunk> parseOpenAiResponse(String jsonData) {
}
}

/**
* 简化的JSON字段提取(应该使用Jackson替代)
*/
private String extractJsonField(String json, String fieldName) {
try {
String pattern = "\"" + fieldName + "\":\"";
int startIndex = json.indexOf(pattern);
if (startIndex == -1) {
return null;
}
startIndex += pattern.length();
int endIndex = json.indexOf("\"", startIndex);
if (endIndex == -1) {
return null;
}
return json.substring(startIndex, endIndex);
} catch (Exception e) {
return null;
}
}

/**
* 设置性能指标
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,9 @@ public Flux<UnifiedAiStreamChunk> streamChat(UnifiedAiRequest request) {

logger.debug("发送七牛云AI请求,模型: {}", model);

// 用 StringBuilder 追踪累积文本,通过 doOnNext 设置到每个 chunk
StringBuilder accumulated = new StringBuilder();

return webClient
.post()
.uri("/chat/completions")
Expand All @@ -129,34 +132,21 @@ public Flux<UnifiedAiStreamChunk> streamChat(UnifiedAiRequest request) {
DataBufferUtils.release(dataBuffer);
return new String(bytes, StandardCharsets.UTF_8);
})
.buffer() // 缓冲数据以处理不完整的SSE消息
.flatMap(lines -> {
String combined = String.join("", lines);
return Flux.fromArray(combined.split("\n"))
.concatMap(rawChunk -> {
// 逐行解析 SSE
return Flux.fromArray(rawChunk.split("\n"))
.filter(line -> line.startsWith("data: ") && !line.equals("data: [DONE]"))
.map(line -> line.substring(6).trim())
.filter(data -> !data.isEmpty());
})
.timeout(Duration.ofSeconds(timeoutSeconds))
.flatMap(this::parseQiniuStreamChunk)
.collectList() // 收集所有chunk
.flatMapMany(chunks -> {
// 手动处理累积内容
StringBuilder accumulated = new StringBuilder();
List<UnifiedAiStreamChunk> updatedChunks = new ArrayList<>();

for (UnifiedAiStreamChunk chunk : chunks) {
if (chunk.getContent() != null && !chunk.getContent().isEmpty()) {
accumulated.append(chunk.getContent());
}
chunk.setAccumulatedContent(accumulated.toString());
updatedChunks.add(chunk);
.concatMap(this::parseQiniuStreamChunk)
.doOnNext(chunk -> {
// 真流式:每个 chunk 逐个 emit,增量计算 accumulatedContent
if (chunk.getContent() != null && !chunk.getContent().isEmpty()) {
accumulated.append(chunk.getContent());
}

logger.info("七牛云AI响应解析完成,生成{}个chunk,总内容长度: {}",
updatedChunks.size(), accumulated.length());

return Flux.fromIterable(updatedChunks);
chunk.setAccumulatedContent(accumulated.toString());
})
.doOnError(error -> logger.error("七牛云AI API调用失败: {}", error.getMessage()))
.onErrorResume(error -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,9 @@ public Flux<UnifiedAiStreamChunk> streamChat(UnifiedAiRequest request) {

logger.debug("发送硅基流动AI请求,模型: {}", model);

// 用 StringBuilder 追踪累积文本
StringBuilder accumulated = new StringBuilder();

return webClient
.post()
.uri("/chat/completions")
Expand All @@ -199,37 +202,22 @@ public Flux<UnifiedAiStreamChunk> streamChat(UnifiedAiRequest request) {
DataBufferUtils.release(dataBuffer);
return new String(bytes, StandardCharsets.UTF_8);
})
.buffer(Duration.ofMillis(50)) // 减少缓冲时间,加快响应
.flatMap(lines -> {
String combined = String.join("", lines);
return Flux.fromArray(combined.split("\n"))
.concatMap(rawChunk -> {
// 逐行解析 SSE
return Flux.fromArray(rawChunk.split("\n"))
.filter(line -> line.startsWith("data: ") && !line.equals("data: [DONE]"))
.map(line -> line.substring(6).trim())
.filter(data -> !data.isEmpty());
})
.timeout(Duration.ofSeconds(timeoutSeconds))
.flatMap(this::parseSiliconFlowStreamChunk)
.filter(chunk -> chunk.getContent() != null) // 过滤null内容
.filter(chunk -> !chunk.getContent().trim().isEmpty()) // 过滤空内容
.filter(chunk -> !"null".equals(chunk.getContent())) // 过滤字符串"null"
.collectList() // 收集所有chunk
.flatMapMany(chunks -> {
// 手动处理累积内容
StringBuilder accumulated = new StringBuilder();
List<UnifiedAiStreamChunk> updatedChunks = new ArrayList<>();

for (UnifiedAiStreamChunk chunk : chunks) {
if (chunk.getContent() != null && !chunk.getContent().isEmpty()) {
accumulated.append(chunk.getContent());
}
chunk.setAccumulatedContent(accumulated.toString());
updatedChunks.add(chunk);
}

logger.info("硅基流动AI响应解析完成,生成{}个chunk,总内容长度: {}",
updatedChunks.size(), accumulated.length());

return Flux.fromIterable(updatedChunks);
.concatMap(this::parseSiliconFlowStreamChunk)
.filter(chunk -> chunk.getContent() != null
&& !chunk.getContent().trim().isEmpty()
&& !"null".equals(chunk.getContent()))
.doOnNext(chunk -> {
// 真流式:逐个 chunk emit,增量计算 accumulatedContent
accumulated.append(chunk.getContent());
chunk.setAccumulatedContent(accumulated.toString());
})
.doOnError(error -> logger.error("硅基流动AI API调用失败: {}", error.getMessage()))
.onErrorResume(error -> {
Expand Down
Loading
Loading