Walkthrough
Adds a new public SSE module at
Changes
Sequence Diagram(s)
sequenceDiagram
participant Client
participant Axum as "Axum Handler"
participant Producer as "Event Producer"
participant MPSC as "mpsc Sender"
participant Stream as "UnboundedReceiverStream"
participant Encoder as "SseEncoder"
Client->>Axum: open SSE connection (GET)
Axum->>MPSC: create UnboundedSender, hand to Producer
Producer-->>MPSC: send JSON events / control messages
MPSC-->>Stream: stream items to response body
Stream->>Encoder: encode items into SSE frames
Encoder->>Client: emit framed SSE bytes over HTTP
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
Code Review
This pull request introduces a shared SSE (Server-Sent Events) codec to consolidate parsing and formatting logic across the gateway. It includes an SseEncoder for efficient serialization and an SseDecoder for zero-allocation frame parsing. Feedback focuses on improving spec compliance for line terminators (supporting \r\n\r\n and \r\r), fixing potential buffer growth issues in the push method by ensuring proper compaction, and refining the parse_frame logic to correctly handle field/value splitting according to the SSE specification.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@model_gateway/src/routers/common/sse.rs`:
- Around line 173-188: The decoder only looks for "\n\n" and always advances by
2, so it fails on CRLF-delimited frames; in next_frame (and the analogous block
around lines 246-252) detect whether the delimiter at remaining[pos..] is
"\r\n\r\n" or "\n\n" (inspect remaining starting at pos) and set delimiter_len =
4 or 2 accordingly, then advance self.consumed by pos + delimiter_len (and use
the same delimiter_len when skipping on InvalidUtf8 error). Keep parse_frame
usage unchanged; reference symbols: next_frame, find_double_newline,
parse_frame, SseDecodeError, self.buf and self.consumed.
- Around line 206-215: The code is trimming the entire SSE block before parsing
which strips meaningful payload whitespace (e.g., "data: hello ") when flush()
or parse_block() call parse_frame(trimmed); change the logic to stop using
trim() on the full block and instead pass the raw string (or only remove CR/LF
line endings) to parse_frame so payload spacing is preserved; update occurrences
in the file (references: parse_frame call in this block, and the
flush()/parse_block() call sites mentioned) to use the untrimmed frame string or
a newline-only trim rather than trim().
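The delimiter detection described in the first inline comment can be sketched as follows. This is a minimal, std-only illustration and not the code in `sse.rs`: the function name `find_frame_boundary` and its `(offset, delimiter_len)` return shape are assumptions made for the example, and the bare `\r\r` terminator is deliberately left out.

```rust
/// Hypothetical helper: returns (offset of the delimiter, delimiter length)
/// for the earliest frame boundary in `buf`, or None when no complete frame
/// has been buffered yet. Handles "\n\n" and "\r\n\r\n" only.
fn find_frame_boundary(buf: &[u8]) -> Option<(usize, usize)> {
    for i in 0..buf.len() {
        let rest = &buf[i..];
        // Check the longer delimiter first so a CRLF pair is consumed whole.
        if rest.starts_with(b"\r\n\r\n") {
            return Some((i, 4));
        }
        if rest.starts_with(b"\n\n") {
            return Some((i, 2));
        }
    }
    None
}
```

A caller would then advance its cursor by `offset + delimiter_len` rather than a fixed `pos + 2`, on both the success path and the invalid UTF-8 path.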
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 383ed3b7-06b2-4435-8602-b9e6380f779c
📒 Files selected for processing (3)
model_gateway/src/routers/common/mod.rs
model_gateway/src/routers/common/sse.rs
model_gateway/src/routers/mod.rs
30350b7 to 5476f96
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@model_gateway/src/routers/common/sse.rs`:
- Around line 363-382: Add and use the shared build_sse_response from
model_gateway::routers::common::sse to deduplicate SSE logic: replace the
existing build_sse_response implementation in grpc::common::responses::streaming
(function build_sse_response) to call or re-export this shared
build_sse_response, and replace the SSE_DONE constant usage in
openai::responses::streaming (constant SSE_DONE) to reference the shared
module’s SSE_DONE (or move the constant into the shared module and update
imports), ensuring both files import the shared module rather than keeping local
copies.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@model_gateway/src/routers/common/sse.rs`:
- Around line 185-200: The SSE parser must ignore control-only blocks (comments,
event-only, id/retry updates) instead of emitting empty-data SseFrame; change
parsing to treat a block with no data lines as absent. Update parse_frame (or
parse_block) to indicate whether the block contained any data lines (e.g.,
return Option<SseFrame> or include a has_data flag), then in next_frame() after
calling parse_frame(frame_str) skip returning frames where no data lines were
present (advance consumed and loop to the next boundary), and apply the same
change to flush() and parse_block() so they also drop blocks lacking data lines;
adjust the tests that expect empty frames accordingly.
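One way to express "a block with no data lines is absent" is to have the block parser return an `Option`. The sketch below is an assumption-laden illustration, not the PR's `parse_frame`: the `Frame` type is hypothetical, it always allocates the joined data for simplicity, and it relies on `str::lines` (which handles `\n` and `\r\n` but not bare `\r`). It also leaves payload whitespace alone apart from the single leading space after the colon.

```rust
/// Hypothetical frame type for this sketch (the PR uses `SseFrame<'_>`).
#[derive(Debug, PartialEq)]
struct Frame<'a> {
    event_type: Option<&'a str>,
    data: String,
}

/// Parses one already-extracted SSE block. Returns None for control-only
/// blocks (comments, event-only, id/retry updates) so callers can skip them.
fn parse_block(block: &str) -> Option<Frame<'_>> {
    let mut event_type = None;
    let mut data_lines: Vec<&str> = Vec::new();
    for line in block.lines() {
        if line.starts_with(':') {
            continue; // comment line
        }
        // Split on the first colon; strip at most one leading space.
        let (field, value) = match line.split_once(':') {
            Some((f, v)) => (f, v.strip_prefix(' ').unwrap_or(v)),
            None => (line, ""),
        };
        match field {
            "event" => event_type = Some(value),
            "data" => data_lines.push(value),
            _ => {} // id, retry, and unknown fields are ignored here
        }
    }
    if data_lines.is_empty() {
        return None;
    }
    Some(Frame { event_type, data: data_lines.join("\n") })
}
```

With this shape, `next_frame()` and `flush()` would advance past a `None` block and continue to the next boundary instead of yielding an empty-data frame.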
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@model_gateway/src/routers/common/sse.rs`:
- Around line 482-487: Replace the string literal "Cache-Control" with the http
header constant to keep header usage consistent: use CACHE_CONTROL (from
http::header) in the same call site that currently uses CONTENT_TYPE and
HeaderValue::from_static; update the .header invocation that sets the cache
control to .header(CACHE_CONTROL, HeaderValue::from_static("no-cache")) so both
headers use the shared constants (referencing CONTENT_TYPE, CACHE_CONTROL,
HeaderValue::from_static, and the .header(...) call).
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@model_gateway/src/routers/common/sse.rs`:
- Around line 264-286: The flush() method currently relies on debug_assert! with
find_frame_boundary(...) and will silently merge multiple complete frames in
release builds; modify flush to detect if
find_frame_boundary(&self.buf[self.consumed..]) is Some(boundary) and, instead
of proceeding, return a runtime error (e.g.,
Some(Err(SseDecodeError::UnexpectedCompleteFrames))) or fully drain/parse
complete frames before handling the tail; update logic around self.consumed,
parse_frame, and the InvalidUtf8 branch to ensure you never concatenate multiple
complete events into one payload in release mode (use the symbols flush,
find_frame_boundary, parse_frame, self.consumed, self.buf, and SseDecodeError to
locate and change the code).
bb372a6 to 8464bd3
    InvalidUtf8(std::str::Utf8Error),
    /// `flush()` called while complete frames remain in the buffer.
    /// Drain `next_frame()` to `None` before calling `flush()`.
    IncompleteFlush,
I am not sure this is an actual SseDecodeError since, in theory, it should never happen if the caller uses the SseDecoder correctly. It is OK to leave this one here, but I feel a panic or a debug_assert might be better.

If flush() is called with complete frames remaining (e.g. data: a\n\ndata: b), parse_frame would see the \n\n as an empty-line separator and merge them into a single frame with data: "a\nb", so two events are silently fused into one. The caller would process wrong data without any indication that something went wrong. So I'm thinking that returning Err lets the caller detect and handle it. Let me know if this makes sense.
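The fusion problem in that reply can be reproduced with a small stand-in. This is only a sketch under assumptions: `collect_data` is a hypothetical, simplified substitute for `parse_frame`, `FlushError` stands in for `SseDecodeError`, and only `\n\n` boundaries are checked.

```rust
#[derive(Debug, PartialEq)]
enum FlushError {
    /// Complete frames were still buffered when the tail was flushed.
    IncompleteFlush,
}

/// Simplified stand-in for `parse_frame`: joins every `data:` line with '\n'.
fn collect_data(block: &str) -> String {
    block
        .lines()
        .filter_map(|line| line.strip_prefix("data:"))
        .map(|value| value.strip_prefix(' ').unwrap_or(value))
        .collect::<Vec<_>>()
        .join("\n")
}

/// Refuses to parse the tail while a frame boundary is still present,
/// so two events are never merged into one payload in release builds.
fn flush_tail(tail: &str) -> Result<String, FlushError> {
    if tail.contains("\n\n") {
        return Err(FlushError::IncompleteFlush);
    }
    Ok(collect_data(tail))
}
```

Calling `collect_data` directly on `data: a\n\ndata: b` yields the fused payload, while `flush_tail` surfaces the misuse as an error the caller can handle.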
@@ -0,0 +1 @@
pub mod sse;
I am not sure that introducing a common module containing only sse is a good design.
If we look at our modules today, each router serves as either an actual router or an endpoint.
If common is only going to have sse.rs for now, I'd rather we just have sse.rs instead of a common module under routers.

Yeah, okay. So just put sse.rs under model_gateway/src/routers?

Yeah, if common is small enough, let's move this under src/routers.
CatherineSue left a comment
The overall SseEncoder looks clean to me. However, it is a bit hard for me to understand the SseDecoder use case.
For instance, the existing pattern in the codebase is anthropic/sse.rs:208-264 (consume_and_forward):
    // Current pattern: anthropic/sse.rs:238-248
    while let Some(pos) = find_double_newline(&buffer) {
        let frame_bytes = &buffer[..pos];
        let frame = std::str::from_utf8(frame_bytes)?;
        if let Some((event_type, data)) = parse_sse_frame(frame) {
            processor.process(&event_type, &data).await?; // <-- async!
        }
        buffer.drain(..pos + 2);
    }
It would become:
    // Hypothetical migration to SseDecoder
    decoder.push(&chunk)?;
    loop {
        match decoder.next_frame() {
            Some(Ok(frame)) => {
                // frame borrows from decoder, so it can't be held across .await
                // MUST clone/own everything before dropping
                let event_type = frame.event_type.map(str::to_owned);
                let data = frame.data.into_owned();
                drop(frame); // mandatory before next iteration
                processor.process(&event_type.as_deref().unwrap_or(""), &data).await?;
            }
            Some(Err(e)) => return Err(...),
            None => break,
        }
    }
    decoder.compact();
There is no benefit in line count, and the entire pipeline is an ergonomics dance: extract → own → drop → await. It is less clean than the existing while let loop in the codebase.
    ///
    /// Returns borrowed references into the internal buffer.
    /// Call in a loop until `None`, then call `compact()`.
    pub fn next_frame(&mut self) -> Option<Result<SseFrame<'_>, SseDecodeError>> {
This signature means SseFrame holds a borrow on the decoder while next_frame needs a mutable borrow. The compiler forces a sequential get → own → drop → get choreography. I feel it is not that intuitive, and I can't picture in detail what this would mean for integrating SseDecoder into the routers as they are used today.

We don't have to copy the data out of the buffer each time; the sequence will be more like get → drop → get. More details are in this reply. Let me know what you think.
Hi @CatherineSue, thanks for the review. About the decoder comment, let me clarify the actual migration pattern and the concrete benefits. The example in the review assumes we must copy, but we don't. The frame borrow can be held across the `.await`:
    decoder.push(&chunk)?;
    while let Some(Ok(frame)) = decoder.next_frame() {
        // frame borrows from decoder.buf, but decoder is not touched during await
        processor.process(frame.event_type.unwrap_or(""), &frame.data).await?;
        // frame dropped here at end of loop body → decoder free for next call
        drop(frame);
    }
    decoder.compact();
No Concrete allocation savings per frame:
Other improvements over the current
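The get → drop → get sequence discussed in this thread can be illustrated with a stripped-down, synchronous decoder. Everything here is a hypothetical sketch rather than the PR's `SseDecoder`: it returns the raw frame text as `&str`, treats invalid UTF-8 as end of input, and uses `windows(2)` instead of `memchr` to stay std-only.

```rust
/// Hypothetical cursor-based decoder for illustration only.
struct Decoder {
    buf: Vec<u8>,
    consumed: usize,
}

impl Decoder {
    /// Returns the next complete frame as a borrow into `buf`.
    /// The borrow must end before `next_frame` is called again.
    fn next_frame(&mut self) -> Option<&str> {
        let rest = &self.buf[self.consumed..];
        let pos = rest.windows(2).position(|w| w == b"\n\n")?;
        let start = self.consumed;
        self.consumed += pos + 2; // advance the cursor past the delimiter
        std::str::from_utf8(&self.buf[start..start + pos]).ok()
    }
}

/// Drains all complete frames without copying any frame bytes.
fn drain(decoder: &mut Decoder) -> Vec<usize> {
    let mut lens = Vec::new();
    while let Some(frame) = decoder.next_frame() {
        lens.push(frame.len()); // use the borrowed frame in place
        // `frame` goes out of scope here, releasing the borrow on `decoder`
    }
    lens
}
```

Each loop iteration takes a fresh mutable borrow, uses the frame, and releases it at the end of the body, so no `to_owned` call is needed in this synchronous case. Whether the same holds across an `.await` in the real routers depends on what else touches the decoder during the await, which this sketch does not model.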
8464bd3 to dc2eb79
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@model_gateway/src/routers/common/sse.rs`:
- Around line 414-427: The parse_frame function fails when the incoming frame
begins with a UTF-8 BOM (U+FEFF) because the field name becomes "\u{FEFF}data";
fix this by normalizing the input at the top of parse_frame: strip any leading
BOM from the frame (e.g., use frame.trim_start_matches('\u{FEFF}') or
strip_prefix) before calling split_sse_lines and parsing fields so subsequent
logic in parse_frame (and references to split_sse_lines) sees the correct field
names.
- Around line 416-468: The code currently collects additional "data" lines into
extra_data: Option<Vec<&str>> and then joins them into a String, causing two
allocations; instead, on receiving the second "data" line convert from the
borrowed first_data (&str) into an owned String and append subsequent lines
directly to that String so only one allocation occurs. Locate the loop using
split_sse_lines and the variables first_data, extra_data and saw_data in the SSE
parsing logic, change extra_data to Option<String> (or similar owned buffer), on
the first "data" set first_data: Option<&str>, on the second "data" create a
String by copying first_data, push '\n' and the new value, store into
extra_data, and on further "data" lines append to that String; finally adjust
the data match to return Cow::Borrowed(first) or Cow::Owned(owned_string) and
keep SseFrame { event_type, data } construction unchanged.
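Both suggestions above (strip a leading BOM, and switch to an owned `String` only on the second `data` line) can be sketched together. This is an illustrative, std-only approximation: `join_data` is a hypothetical name, it looks only at `data:` lines, and it is not the PR's `parse_frame`.

```rust
use std::borrow::Cow;

/// Hypothetical helper: returns the frame's data, borrowed when there is a
/// single `data:` line and owned (one allocation) when lines must be joined.
fn join_data(frame: &str) -> Option<Cow<'_, str>> {
    // Normalize away a leading UTF-8 BOM so the first field name is intact.
    let frame = frame.strip_prefix('\u{FEFF}').unwrap_or(frame);
    let mut first: Option<&str> = None;
    let mut joined: Option<String> = None;
    for line in frame.lines() {
        let Some(rest) = line.strip_prefix("data:") else { continue };
        let value = rest.strip_prefix(' ').unwrap_or(rest);
        if let Some(owned) = joined.as_mut() {
            // Third and later data lines: append to the existing String.
            owned.push('\n');
            owned.push_str(value);
        } else if let Some(head) = first {
            // Second data line: promote the borrowed first line to a String.
            let mut owned = String::with_capacity(head.len() + 1 + value.len());
            owned.push_str(head);
            owned.push('\n');
            owned.push_str(value);
            joined = Some(owned);
        } else {
            first = Some(value);
        }
    }
    match (joined, first) {
        (Some(owned), _) => Some(Cow::Owned(owned)),
        (None, Some(head)) => Some(Cow::Borrowed(head)),
        (None, None) => None,
    }
}
```

The single-line case never allocates, and the multi-line case avoids the intermediate `Vec<&str>` that a collect-then-join approach would build.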
Signed-off-by: XinyueZhang369 <zoeyzhang369@gmail.com>
dc2eb79 to 495be87
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@model_gateway/src/routers/common/sse.rs`:
- Around line 204-209: The constructor SseDecoder::with_max_size currently
always reserves 4096 bytes (Vec::with_capacity(4096)) which ignores small
max_size values; change the initial buffer allocation to use min(max_size, 4096)
so buf is created with Vec::with_capacity(max_size.min(4096)), keeping consumed
and max_size the same; update the SseDecoder::with_max_size function to compute
the capacity from max_size and use that when constructing buf.
- Around line 18-20: The shared SSE builder is adding the connection-specific
header `CONNECTION`, which triggers HTTP/2 warnings; remove any use of the
`CONNECTION` symbol from the SSE helper (remove it from the imports header::{...
CONNECTION ...} and stop inserting the Connection: keep-alive header into the
SSE response builder), leaving only appropriate headers like `CACHE_CONTROL` and
`CONTENT_TYPE` set by the SSE builder (locate usages in functions such as the
SSE response builder helper in sse.rs and delete the line that inserts
`Connection`).
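The capacity suggestion in the first comment, together with the size bound the decoder is described as enforcing, might look roughly like this. The struct layout, the `BufferOverflow` error, and the compact-on-push behaviour are all assumptions for the sketch, not the code under review.

```rust
#[derive(Debug, PartialEq)]
struct BufferOverflow;

/// Hypothetical decoder skeleton with a bounded buffer.
struct Decoder {
    buf: Vec<u8>,
    consumed: usize,
    max_size: usize,
}

impl Decoder {
    fn with_max_size(max_size: usize) -> Self {
        Self {
            // Never reserve more than the caller allows the buffer to hold.
            buf: Vec::with_capacity(max_size.min(4096)),
            consumed: 0,
            max_size,
        }
    }

    fn push(&mut self, chunk: &[u8]) -> Result<(), BufferOverflow> {
        // Drop already-consumed bytes first so they do not count toward the limit.
        self.buf.drain(..self.consumed);
        self.consumed = 0;
        if self.buf.len() + chunk.len() > self.max_size {
            return Err(BufferOverflow);
        }
        self.buf.extend_from_slice(chunk);
        Ok(())
    }
}
```

Compacting inside `push` is a design choice made here for brevity; a real decoder may prefer an explicit `compact()` call so that outstanding frame borrows are never invalidated.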
Description
Problem
The gateway has 3 duplicate SSE parsers and 2 duplicate chunk processors scattered across routers (gRPC, Anthropic, OpenAI, HTTP PD). Each uses its own encoding/decoding logic with a common pattern of `serde_json::to_string` + `format!("data: {json}\n\n")`, resulting in 2 heap allocations per SSE chunk. For a 500-token response (~50 chunks), this adds up.
Solution
Create a shared SSE codec module at `model_gateway/src/routers/common/sse.rs` (Phase 1 of the SSE codec optimization design) with:
- `SseEncoder`: Serializes JSON directly into a reusable `Vec<u8>` buffer via `serde_json::to_writer`, then copies into `Bytes`. Reduces encoding from 2 allocations to 1 per chunk. Buffer grows to high-water mark and retains capacity across chunks via `.clear()`.
- `SseDecoder`: Byte buffer with cursor tracking and borrow-based frame parsing. Returns `SseFrame<'_>` with `Cow<str>` data: zero allocation for the common single-line case, only allocates when joining multi-line `data:` fields.
- `parse_block`: Standalone function for parsing a single already-extracted SSE block string.
- `build_sse_response`: Shared SSE response builder with standard headers.
- `find_double_newline` using `memchr::memmem` (~10x faster than `windows(2)`), spec-compliant `parse_frame` with CRLF normalization, comment ignoring, and one-space strip.
Changes
- `model_gateway/src/routers/common/mod.rs`: new module declaration
- `model_gateway/src/routers/common/sse.rs`: shared SSE codec (encoder, decoder, helpers, 35 tests)
- `model_gateway/src/routers/mod.rs`: register `pub mod common`
Test Plan
35 unit tests covering:
- `[DONE]` sentinel, buffer reuse, output compatibility with existing `format!` patterns
- `[DONE]` detection, buffer overflow (DoS protection), compact, flush, unknown fields ignored
Checklist
- `cargo +nightly fmt` passes
- `cargo clippy --all-targets --all-features -- -D warnings` passes
Summary by CodeRabbit
New Features
Tests
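The buffer-reuse idea behind `SseEncoder` in the description can be approximated without external crates. In this sketch the payload is anything implementing `Display` (a stand-in for `serde_json::to_writer`), and the returned `Vec<u8>` stands in for `Bytes`; both substitutions, and the `Encoder` name itself, are assumptions made so the example stays self-contained.

```rust
use std::fmt::Display;
use std::io::Write;

/// Hypothetical encoder that reuses one scratch buffer across chunks.
struct Encoder {
    buf: Vec<u8>,
}

impl Encoder {
    fn new() -> Self {
        Self { buf: Vec::with_capacity(256) }
    }

    /// Frames `payload` as `data: <payload>\n\n`. The scratch buffer keeps
    /// its capacity, so the only allocation per chunk is the returned copy.
    fn encode(&mut self, payload: impl Display) -> Vec<u8> {
        self.buf.clear(); // resets length, retains capacity
        self.buf.extend_from_slice(b"data: ");
        write!(self.buf, "{payload}").expect("writing to a Vec<u8> cannot fail");
        self.buf.extend_from_slice(b"\n\n");
        self.buf.clone()
    }
}
```

Compared with building a `String` via `to_string` and then a second one via `format!`, the payload is written straight into the frame buffer, which is where the description's "2 allocations to 1" claim comes from.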