Worker AnswerPostprocessor drops stream:true responses #49

@andreypfau

Description

What happens

Any OpenAI-compatible /v1/chat/completions request with stream: true returns
HTTP 200 with zero bytes:

curl -sS -o /tmp/out.bin \
  -w 'HTTP %{http_code} size=%{size_download}\n' \
  -H 'Content-Type: application/json' \
  -d '{"model":"Qwen/Qwen3-32B","stream":true,"max_tokens":30,
       "messages":[{"role":"user","content":"hi"}]}' \
  http://127.0.0.1:10080/v1/chat/completions
# HTTP 200 size=0

The same request without stream:true works normally. Worker logs show the
inference actually ran (start/end timestamps differ by the expected
generation time), but whatever the worker emitted never reaches the proxy.

Why

AnswerPostprocessor::add_next_answer_slice in
runners/helpers/ValidateRequest.cpp:1411-1486 runs incoming bytes through
std::stringstream ss(last_); ss >> v; where v is nlohmann::json. That
only works when the worker emits a single JSON document per response. vLLM
with stream:true emits SSE frames:

data: {"id":"...","choices":[{"delta":{"content":"hi"}}]}

data: {"id":"...","choices":[...],"usage":{"prompt_tokens":14,"completion_tokens":20}}

data: [DONE]

operator>> throws on the leading data:. The catch sets is_end = true,
the loop exits, and because pos never advanced, last_ = last_.substr(0)
keeps the whole stream buffered for the next call, which also throws. The
returned string is always empty. At the end, finalize() logs

worker request: unprocessed data in answer: bytes=481

and drops the tail. Client sees HTTP 200, content-length 0.

There's a second oddity: ValidateRequest::process_chat_completions (around
lines 1132-1163) happily accepts stream: true and auto-injects
stream_options.include_usage = true. So one part of the code tells the
client "yes, stream is supported" while another silently discards the result.

Repro

Standalone harness that drives AnswerPostprocessor directly, without
spinning up a worker:
https://gist.github.com/andreypfau/02d0987cf1bc1ebff447f0c6b3cf5f8b

Drop the .cpp into runners/utils/, add two lines to CMakeLists.txt
next to generate-cocoon-wallet-address (it only needs cocoon_common),
build the target, run the binary.

Against current main:

[BUG] postprocessor swallowed the SSE stream
=== forwarded payload (0 bytes) ===
=== end ===
usage: prompt=0 cached=0 completion=0 reasoning=0 total=0

with worker request: unprocessed data in answer: bytes=481 on stderr —
that's the untouched SSE stream.

With the fix below applied, three scenarios (full SSE frames, byte-by-byte
SSE to test buffering across slice boundaries, legacy single-JSON response):

[OK] sse_whole_events
[OK] sse_byte_split
[OK] raw_json_non_stream
usage: prompt=11 cached=3 completion=7 reasoning=0 total=21

Fix

Make add_next_answer_slice decide the mode from the first non-whitespace byte.
If it's d / e / i / r / : (an SSE field name or comment), treat the stream as
SSE: split on \n\n (or \r\n\r\n), concatenate the data: lines within each
event, pass [DONE] through verbatim, and try nlohmann::json::parse on the
payload (on failure, emit the event as-is so the caller can see what the
worker sent). Then run the existing usage extraction plus cost injection and
emit data: {json}\n\n. finalize() synthesises a trailing \n\n if the stream
ended on an SSE tail without a terminator. If the first byte isn't SSE-like,
fall through to the original ss >> v loop, so non-streaming responses
behave exactly as before.

ByteTokenCounter has the same parser pattern. Once this is in, it makes
sense to consolidate both through a shared helper in a separate small patch.

I have the patch; will open a PR referencing this issue.
