feat(llm): streaming llm_call — provider SSE consumption with durable accumulated output#69
Merged
`stream: true` on llm_call consumes the provider's streaming wire protocol
(OpenAI /chat/completions SSE with stream_options.include_usage; Anthropic
/messages event stream), publishes incremental text deltas to clients
watching GET /instances/{id}/stream, and produces a durable step output
identical to the non-streaming path (message, finish_reason, usage, tool
calls) — downstream blocks are unaffected.
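As a rough illustration of why the streamed output can equal the non-streaming one, the sketch below folds a sequence of deltas into a single step output while handing each text delta to a live callback. The `StreamEvent` and `StepOutput` types and the `accumulate` function are hypothetical simplifications, not the types used in orch8; tool calls are omitted for brevity.

```rust
/// Hypothetical, simplified view of what a provider stream yields.
#[derive(Debug, Clone, PartialEq)]
enum StreamEvent {
    TextDelta(String),
    Finish(String), // finish_reason
    Usage { prompt_tokens: u32, completion_tokens: u32 },
}

/// Hypothetical durable step output (tool calls left out of this sketch).
#[derive(Debug, Default, PartialEq)]
struct StepOutput {
    message: String,
    finish_reason: Option<String>,
    prompt_tokens: u32,
    completion_tokens: u32,
}

/// Fold the stream into one output; `on_delta` is the best-effort live view.
fn accumulate<I>(events: I, mut on_delta: impl FnMut(&str)) -> StepOutput
where
    I: IntoIterator<Item = StreamEvent>,
{
    let mut out = StepOutput::default();
    for ev in events {
        match ev {
            StreamEvent::TextDelta(text) => {
                on_delta(&text); // publish to watchers, if any
                out.message.push_str(&text); // always accumulate durably
            }
            StreamEvent::Finish(reason) => out.finish_reason = Some(reason),
            StreamEvent::Usage { prompt_tokens, completion_tokens } => {
                out.prompt_tokens = prompt_tokens;
                out.completion_tokens = completion_tokens;
            }
        }
    }
    out
}
```

Because the durable output depends only on the accumulated events and not on whether anyone consumed the deltas, a downstream block cannot tell which path produced it.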
- New `orch8_engine::stream_bus`: per-instance tokio broadcast registry
(lazily created on subscribe, capacity-bounded, dropped when the last
subscriber disconnects). Handlers publish `llm_delta` events; the API's
SSE endpoint subscribes in the same process and forwards them as
`llm_delta` SSE events alongside the existing polled state/output events.
- Incremental SSE parser shared by both providers (CRLF-safe, chunk-split
and UTF-8-split safe); request-body building extracted so streaming and
non-streaming share multimodal conversion.
- Failure taxonomy: mid-stream drop, missing terminal event ([DONE] /
message_stop) and an inter-chunk idle timeout (stream_idle_timeout_secs,
default 30s) all fail Retryable; Anthropic mid-stream error events map to
the existing retryable/permanent split. Failover streams per attempt.
- `stream` + `response_schema` falls back to non-streaming (logged): the
validate/repair loop needs complete responses. dry_run skips the provider
call unchanged.
- Engine-only deployments compile unchanged: the bus lives behind the
existing engine/api boundary and stays inert without subscribers.
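The incremental parser described in the list above could look roughly like the following. This is a minimal std-only sketch, not the parser in this PR: `SseParser` and `SseEvent` are hypothetical names, only the `event` and `data` fields are handled, and a lone CR is not treated as a line terminator. It buffers raw bytes and decodes only complete lines, which is what makes chunk splits, CRLF splits, and UTF-8 splits harmless.

```rust
#[derive(Debug, Default, PartialEq)]
struct SseEvent {
    event: Option<String>,
    data: String,
}

#[derive(Default)]
struct SseParser {
    buf: Vec<u8>,          // raw bytes of the current, still incomplete line
    event: Option<String>, // pending `event:` field
    data: Vec<String>,     // pending `data:` lines
}

impl SseParser {
    /// Feed one network chunk; returns every event completed by it.
    fn feed(&mut self, chunk: &[u8]) -> Vec<SseEvent> {
        self.buf.extend_from_slice(chunk);
        let mut out = Vec::new();
        // 0x0A never occurs inside a multi-byte UTF-8 sequence, so splitting
        // on it at the byte level cannot cut a character in half.
        while let Some(pos) = self.buf.iter().position(|&b| b == b'\n') {
            let mut line: Vec<u8> = self.buf.drain(..=pos).collect();
            line.pop(); // drop '\n'
            if line.last() == Some(&b'\r') {
                line.pop(); // CRLF, even if '\r' arrived in an earlier chunk
            }
            let line = String::from_utf8_lossy(&line).into_owned();
            if line.is_empty() {
                // A blank line terminates the event.
                let event = self.event.take();
                if !self.data.is_empty() {
                    out.push(SseEvent { event, data: self.data.join("\n") });
                    self.data.clear();
                }
            } else if line.starts_with(':') {
                // Comment or keep-alive line: ignore.
            } else {
                let (field, value) = match line.split_once(':') {
                    Some((f, v)) => (f, v.strip_prefix(' ').unwrap_or(v)),
                    None => (line.as_str(), ""),
                };
                match field {
                    "event" => self.event = Some(value.to_string()),
                    "data" => self.data.push(value.to_string()),
                    _ => {} // other fields are out of scope for this sketch
                }
            }
        }
        out
    }
}
```

A caller would then treat an OpenAI `data: [DONE]` payload or an Anthropic `message_stop` event as the terminal marker, and fail the step as Retryable if the byte stream ends without one.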
Tests: mock-SSE provider streams asserting streamed output equals the
non-streaming shape (incl. usage and tool calls) for both providers,
mid-stream drop / idle timeout / error-event taxonomy, schema fallback,
failover streaming, bus pub/sub lifecycle, and API e2e tests proving a
connected SSE client receives llm_delta events scoped to its instance.
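The bus lifecycle these tests exercise can be approximated with the standard library alone. The sketch below is an assumption-laden stand-in: it uses one unbounded `std::sync::mpsc` channel per subscriber instead of `tokio::sync::broadcast`, it is not capacity-bounded, and it prunes disconnected subscribers lazily on the next publish rather than at disconnect time. The method names are hypothetical.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Mutex;

/// Per-instance registry of live subscribers (illustrative only).
#[derive(Default)]
struct StreamBus {
    channels: Mutex<HashMap<String, Vec<Sender<String>>>>,
}

impl StreamBus {
    /// Lazily creates the instance's entry on first subscribe.
    fn subscribe(&self, instance_id: &str) -> Receiver<String> {
        let (tx, rx) = channel();
        self.channels
            .lock()
            .unwrap()
            .entry(instance_id.to_string())
            .or_default()
            .push(tx);
        rx
    }

    /// Sends a delta to the instance's subscribers and returns how many
    /// received it. With no subscribers this is a map lookup and nothing more.
    fn publish(&self, instance_id: &str, delta: &str) -> usize {
        let mut map = self.channels.lock().unwrap();
        let Some(subs) = map.get_mut(instance_id) else {
            return 0;
        };
        // A failed send means the receiver was dropped: forget that subscriber.
        subs.retain(|tx| tx.send(delta.to_string()).is_ok());
        let reached = subs.len();
        if reached == 0 {
            map.remove(instance_id); // last subscriber gone: drop the entry
        }
        reached
    }

    fn has_channel(&self, instance_id: &str) -> bool {
        self.channels.lock().unwrap().contains_key(instance_id)
    }
}
```

Keying the registry by instance id is what scopes deltas to the instance a client is watching: a subscriber to one instance never observes another instance's publishes.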
Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
Summary
Item #12 from docs/FEATURE_OPPORTUNITIES.md, full slice:
- `stream: true` on `llm_call` consumes the provider's SSE stream and forwards live deltas to clients watching the instance.
- OpenAI (`stream_options.include_usage`, tool-call delta accumulation) and Anthropic (`message_start`→`message_stop` rebuild through the existing normalizer, error-event taxonomy mapped to Retryable/Permanent). An inter-chunk idle timeout (default 30s) makes a stalled stream fail Retryable. The durable step output is asserted exactly equal to the non-streaming path, usage included.
- `StreamBus` in orch8-engine (per-instance `tokio::sync::broadcast`, lazily created, dropped with the last subscriber); `GET /instances/{id}/stream` gains a select arm forwarding `{"type":"llm_delta",…}` events between its existing storage polls. Deltas publish only when subscribers exist, so the bus costs nothing otherwise. It is a best-effort live view; the full text always lands durably.
- `stream` + `response_schema`: falls back to non-streaming with a warning (the repair loop needs complete responses; the durable output is identical either way). Documented and tested.

Tests
Engine 1224 lib tests + API 301 (2412 total across both crates): stream-equals-non-stream for both providers, tool-call/`input_json_delta` accumulation, mid-stream drop and idle timeout → Retryable, Anthropic error taxonomy, schema fallback, failover streaming, bus lifecycle, SSE-client e2e receiving instance-scoped deltas. fmt/clippy/doc gates clean.

🤖 Generated with Claude Code