refactor: decouple SSE parsing from protocol conversion#1305
Open
lennney wants to merge 1 commit into
Open
Conversation
Extract SSE parsing from ChatSseToResponsesConverter into a standalone SseBlockParser. The converter now receives pre-parsed JSON chunks via feed_chunk() / feed_done() / feed_error(), following cc-switch's architecture where the transport layer handles SSE framing. Adds fast-path extract_simple_content_delta() for 90%+ of streaming chunks that are pure content deltas — skips the full handle_chat_chunk_into codepath for these. Backward-compatible: push_bytes() API preserved, delegates internally to the new methods.
4c02ed8 to
6ba4475
Compare
Contributor
There was a problem hiding this comment.
Pull request overview
本 PR 将 Chat Completions 的 SSE 分帧解析从 ChatSseToResponsesConverter 中解耦出来,引入独立的 SseBlockParser,并为 converter 增加接收“已解析 JSON chunk”的新 API,以便在传输层完成 SSE framing 后,在协议转换层实现更轻量的处理路径(含 content delta fast-path)。
Changes:
- 新增
SseBlockParser/ParsedSseBlock:把原始字节流增量解析为结构化 SSE block(event/data),并处理 UTF-8 chunk 边界。 - 为
ChatSseToResponsesConverter增加feed_chunk/feed_done/feed_error,并保留push_bytes兼容旧调用方式(内部委托到新 API)。 - 增加“简单 content delta”快路径:在文本 item 已启动时跳过完整状态机路径,直接输出
response.output_text.delta。
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| crates/codex-plus-core/src/protocol_proxy.rs | 新增 feed_* API、fast-path,以及 SseBlockParser/ParsedSseBlock;调整 legacy push_bytes 内部实现以复用新 API。 |
| crates/codex-plus-core/src/launcher.rs | 流式代理路径改为:先用 SseBlockParser 做 SSE 分帧,再对 data 做 JSON 解析并调用 converter.feed_*。 |
| crates/codex-plus-core/tests/protocol_proxy.rs | 增加 SseBlockParser 与 feed_chunk 的单测,以及 legacy vs 新 API 的等价性测试。 |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Comment on lines
474
to
476
| let Ok(chunk) = serde_json::from_str::<Value>(&data) else { | ||
| return; | ||
| }; |
Comment on lines
+1237
to
+1247
| pub fn push_content_delta_direct(&mut self, content: &str, output: &mut String) { | ||
| self.text.text.push_str(content); | ||
| let oi = self.text.output_index.unwrap_or(0); | ||
| push_sse(output, "response.output_text.delta", json!({ | ||
| "type": "response.output_text.delta", | ||
| "item_id": self.text.item_id, | ||
| "output_index": oi, | ||
| "content_index": 0, | ||
| "delta": content | ||
| })); | ||
| } |
Comment on lines
+1271
to
+1292
| if let Ok(value) = | ||
| serde_json::from_str::<serde_json::Value>(data) | ||
| { | ||
| if block.event.as_deref() == Some("error") | ||
| || value.get("error").is_some() | ||
| { | ||
| let (message, error_type) = | ||
| crate::protocol_proxy::extract_chat_sse_error( | ||
| &value, | ||
| ); | ||
| let failed = converter.feed_error(message, error_type); | ||
| if !failed.is_empty() { | ||
| stream.write_all(&failed).await?; | ||
| } | ||
| stream_failed = true; | ||
| break; | ||
| } | ||
| let converted = converter.feed_chunk(&value); | ||
| if !converted.is_empty() { | ||
| stream.write_all(&converted).await?; | ||
| } | ||
| } |
| } | ||
| } | ||
| } | ||
| } |
| @@ -1,5 +1,5 @@ | |||
| use codex_plus_core::protocol_proxy::{ | |||
| ChatSseToResponsesConverter, chat_completion_to_response, | |||
| ChatSseToResponsesConverter, ParsedSseBlock, SseBlockParser, chat_completion_to_response, | |||
1714e4c to
6ba4475
Compare
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Extract SSE parsing from
ChatSseToResponsesConverterinto a standaloneSseBlockParser. The converter now receives pre-parsed JSON chunks via a new API, following cc-switch's architecture where the transport layer handles SSE framing.Why
Every streaming chunk goes through a monolithic pipeline: SSE parse → JSON deserialize → state machine → re-serialize. 90%+ of chunks are simple content deltas that don't need the full state machine. By decoupling SSE parsing from protocol conversion, we can add a fast-path for content deltas.
Changes
New
SseBlockParserstruct:ParsedSseBlockeventsevent:anddata:fieldsNew
ChatSseToResponsesConverterAPI:feed_chunk(&Value)— process a pre-parsed JSON chunkfeed_done()— emit final done/completed eventsfeed_error(message, type)— emit error SSEFast-path optimization:
extract_simple_content_delta()detects pure content deltas (choices[0].delta has only acontentfield)handle_chat_chunk_intocodepath — directly emitsresponse.output_text.deltaBackward-compatible:
push_bytes()API preserved, delegates internally to the new methods. Equivalence validated by tests.SseBlockParseruses the sameappend_utf8_safe/take_sse_blockprimitives as the existing converter — no new dependencies.Verification
SseBlockParser(basic block, multi-block, UTF-8 split, event+data fields, drain_remainder)feed_chunkAPI (content delta, reasoning, tool call, done, error, equivalence with legacy push_bytes)