feat(ext-proc): add standalone full-duplex exchange core#627
Merged
shaneutt merged 4 commits on Jun 22, 2026
PR too large: 5358 lines added (limit: 750, excludes Cargo files, tests, docs, examples, and benchmarks). Please split into smaller PRs. Add |
praxis-bot
reviewed
Jun 18, 2026
PR Review
Summary: Adds a standalone ExtProcExchange duplex state machine for persistent bidirectional gRPC ext_proc streams, to be wired into the filter lifecycle in a follow-up PR.
Overall: Well-designed state machine with strong test coverage (82% test code). The transactional send/receive model, output validation, and override handling are thorough. A few correctness issues to address.
| Severity | Count |
|---|---|
| Large | 2 |
| Medium | 3 |
Findings without inline placement
None -- all findings placed inline.
Adds the standalone duplex exchange core for persistent bidirectional ExternalProcessor.Process streams. The exchange owns a single Process stream, uses capacity-1 backpressure, validates request/response ordering through explicit state, and keeps send/receive independent so deferred processors can receive body EOS before responding. This is intentionally not wired into ExtProcFilter request routing yet; that integration remains follow-up scope.
Signed-off-by: Brent Salisbury <bsalisbu@redhat.com>
shaneutt
approved these changes
Jun 22, 2026
Summary
This PR adds the standalone `ext_proc` duplex exchange core needed for the upcoming generic full-duplex request-routing integration.
It introduces `ExtProcExchange`, a request-scoped transport/state-machine abstraction around one persistent bidirectional gRPC `ExternalProcessor.Process` stream. The exchange can send processor requests and receive processor responses independently, which is required for processors that defer their routing decision until after body EOS.
This PR does not wire the exchange into `ExtProcFilter` behavior yet. Full-duplex config acceptance and request-routing integration are left to the follow-up PR.
This PR is the internal async state machine for one persistent ext_proc Process stream. It does not expose a new user-facing feature yet; it provides the concurrency, ordering, timeout, and close semantics that the request-routing integration uses in the next PR.
The working llm-d PoC integration demo, PR stack details, code walkthrough, and architecture notes live here:
https://github.com/nerdalert/praxis-research-spikes/tree/main/demo/llm-d-track-b
For a detailed walkthrough of the research spike and the alternatives considered, see the link below (this should probably be preserved somewhere for posterity, such as an upstream discussion or a spike doc):
https://github.com/nerdalert/praxis-research-spikes/blob/main/demo/llm-d-track-b/code-walkthrough.md#full-duplex-async-performance-model
What Changed
1. Standalone Duplex Exchange Core
Adds `filter/ext-proc/src/duplex.rs` with a standalone `ExtProcExchange`.
The exchange owns one bidirectional `ExternalProcessor.Process` stream and exposes a small internal API:
send() and receive() are intentionally independent. A caller can send headers, body chunks, and body EOS before waiting for the processor response. That avoids the deadlock pattern where a processor waits for the full body before sending a header response.
PR2 keeps this as a testable transport core only. It does not modify ExtProcFilter to use the exchange in production request routing yet.
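The deadlock pattern described above can be illustrated with a small synchronous model. This is only a sketch, not the exchange's code: `Outbound`, `DeferredProcessor`, `serialized`, and `independent` are hypothetical names, and the real exchange drives an async gRPC stream rather than calling a mock directly.

```rust
/// Messages a caller can send on the processor stream (hypothetical names).
#[derive(Debug, Clone, PartialEq)]
enum Outbound {
    Headers,
    BodyChunk(Vec<u8>),
    BodyEos,
}

/// A mock processor that defers its response until body EOS.
#[derive(Default)]
struct DeferredProcessor {
    bytes_seen: usize,
}

impl DeferredProcessor {
    /// Returns a response only once the end of the body has been observed.
    fn on_message(&mut self, msg: &Outbound) -> Option<String> {
        match msg {
            Outbound::Headers => None,
            Outbound::BodyChunk(data) => {
                self.bytes_seen += data.len();
                None
            }
            Outbound::BodyEos => Some(format!("route after {} bytes", self.bytes_seen)),
        }
    }
}

/// Serialized model: send the first message, then insist on a response before
/// sending anything else. `None` marks the point where a real stream would hang.
fn serialized(msgs: &[Outbound]) -> Option<String> {
    let mut processor = DeferredProcessor::default();
    let first = msgs.first()?;
    processor.on_message(first)
}

/// Independent model: keep sending until the processor has something to say.
fn independent(msgs: &[Outbound]) -> Option<String> {
    let mut processor = DeferredProcessor::default();
    msgs.iter().find_map(|m| processor.on_message(m))
}

/// A small request: headers, one body chunk, then EOS.
fn sample_request() -> Vec<Outbound> {
    vec![
        Outbound::Headers,
        Outbound::BodyChunk(b"hello".to_vec()),
        Outbound::BodyEos,
    ]
}
```

In this model `serialized(&sample_request())` never gets a response because the processor is still waiting for the body, while `independent(&sample_request())` reaches EOS and receives the deferred decision.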
2. Single-Owner Process Driver
The exchange constructs the tonic Process future synchronously during open() without polling it immediately.
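The "construct now, poll later" behavior relies on Rust futures being inert until polled. The following standard-library sketch shows that property in isolation; it does not use tonic, and `open_stream`, `NoopWake`, and `lazy_open_demo` are hypothetical names for illustration.

```rust
use std::future::Future;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// A waker that does nothing; enough to poll a future that never suspends.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Stand-in for opening the stream: the body runs only when the future is polled.
fn open_stream(started: Arc<AtomicBool>) -> impl Future<Output = &'static str> {
    async move {
        started.store(true, Ordering::SeqCst);
        "stream ready"
    }
}

/// Returns (started before first poll, started after first poll, output).
fn lazy_open_demo() -> (bool, bool, Option<&'static str>) {
    let started = Arc::new(AtomicBool::new(false));

    // Constructed synchronously; nothing inside the async block has run yet.
    let mut fut = Box::pin(open_stream(started.clone()));
    let before = started.load(Ordering::SeqCst);

    // The first poll is what actually drives the work.
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let output = match fut.as_mut().poll(&mut cx) {
        Poll::Ready(value) => Some(value),
        Poll::Pending => None,
    };

    (before, started.load(Ordering::SeqCst), output)
}
```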
The response stream is resolved lazily when `send()` or `receive()` drives the pending future. This avoids forcing an early server response during stream setup, which would deadlock with processors that wait for request body data before responding.
The design avoids production per-request worker machinery:
- `Arc<Mutex<_>>` to force thread-safety
The pending tonic future is single-owned and wrapped with `SyncWrapper` so the type can later live inside request-scoped filter state without adding a lock.
3. Bounded Backpressure
Outbound processor requests flow through a bounded mpsc channel with capacity 1.
That keeps memory usage bounded and prevents a request from getting arbitrarily far ahead of the gRPC stream.
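As a rough illustration of capacity-1 backpressure, the sketch below uses the standard library's `sync_channel` as a stand-in for the async bounded mpsc channel the exchange uses. The function names are hypothetical, and the real send path awaits capacity instead of failing fast.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Tries to queue `n` messages with no consumer draining the channel and
/// returns how many were accepted before the channel reported it was full.
fn accepted_without_consumer(n: usize) -> usize {
    // Keep the receiver alive so a failure means "full", not "disconnected".
    let (tx, _rx) = sync_channel::<usize>(1);
    (0..n).take_while(|i| tx.try_send(*i).is_ok()).count()
}

/// Shows that a second message is only accepted after the first is drained.
fn second_send_needs_drain() -> bool {
    let (tx, rx) = sync_channel::<&str>(1);
    tx.try_send("headers").unwrap();

    // The single slot is taken, so the sender cannot run ahead.
    let was_full = matches!(tx.try_send("body"), Err(TrySendError::Full(_)));

    // Once the consumer takes the first message, capacity is available again.
    let drained = rx.recv().unwrap();
    was_full && drained == "headers" && tx.try_send("body").is_ok()
}
```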
`send()` reserves channel capacity before committing state, so backpressure is part of the transactional send path.
While the gRPC stream is still bootstrapping, `send()` uses `tokio::select!` to make progress on both:
This lets the exchange avoid startup deadlocks without spawning a separate exchange task.
4. Transactional Send State
send() follows a strict commit sequence:
This ensures cancellation before message commit leaves the exchange state unchanged. It also ensures message timeouts start at the send commit boundary, not at the later receive call.
The exchange tracks outbound send state separately for request and response directions, including whether body data was ever committed. That lets response validation distinguish solicited processor output from unsolicited or wrong-phase output.
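A minimal sketch of the "fail before commit leaves state unchanged" property follows. It is an assumption-laden model, not the exchange's actual sequence: `SendState`, `SendError`, and the field names are hypothetical, and because the standard library has no reserve/permit API, `try_send` stands in for both the capacity reservation and the enqueue.

```rust
use std::sync::mpsc::{sync_channel, SyncSender, TrySendError};
use std::time::{Duration, Instant};

#[derive(Debug, PartialEq)]
enum SendError {
    Backpressure,
    Closed,
}

/// Hypothetical send-side state; not the fields of the real exchange.
struct SendState {
    tx: SyncSender<&'static str>,
    committed: usize,
    deadline: Option<Instant>,
}

impl SendState {
    fn send(&mut self, msg: &'static str, now: Instant, timeout: Duration) -> Result<(), SendError> {
        // Fallible step first: on failure we return before touching any state.
        match self.tx.try_send(msg) {
            Ok(()) => {}
            Err(TrySendError::Full(_)) => return Err(SendError::Backpressure),
            Err(TrySendError::Disconnected(_)) => return Err(SendError::Closed),
        }
        // Commit step: the timeout clock starts here, at the send boundary.
        self.committed += 1;
        self.deadline = Some(now + timeout);
        Ok(())
    }
}

/// Returns (first send, second send, committed count, deadline untouched by the failure).
fn transactional_send_demo() -> (Result<(), SendError>, Result<(), SendError>, usize, bool) {
    let (tx, _rx) = sync_channel(1);
    let mut state = SendState { tx, committed: 0, deadline: None };
    let t0 = Instant::now();
    let timeout = Duration::from_millis(200);

    let first = state.send("headers", t0, timeout);
    // The slot is still occupied, so this attempt fails and must not commit.
    let second = state.send("body", t0 + Duration::from_secs(1), timeout);

    let deadline_unchanged = state.deadline == Some(t0 + timeout);
    (first, second, state.committed, deadline_unchanged)
}
```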
5. Active Processing State and Output Validation
For non-full-duplex modes, each sent message creates an ActiveProcessingState with:
Only the exact expected response consumes the active state. Wrong-direction or wrong-type responses are rejected instead of being interpreted as valid output.
For full-duplex body modes, the exchange permits independent send/receive progress without per-message active processing deadlines.
Processor output is also validated transactionally. The exchange validates output phase changes on a local copy first and commits the new phase only after all checks pass. A rejected response cannot corrupt output history.
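The validate-on-a-copy, commit-on-success pattern can be sketched as below. The phase names, the `Output` variants, and the transition rules are hypothetical and far simpler than the real protocol; only the copy-then-commit shape is the point.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum Phase {
    Start,
    Headers,
    Body,
    Done,
}

#[derive(Debug, Clone, Copy, PartialEq)]
enum Output {
    Headers,
    Body { end_of_stream: bool },
}

/// Hypothetical record of what the processor has emitted so far.
#[derive(Debug, Clone, PartialEq)]
struct OutputHistory {
    phase: Phase,
    body_chunks: usize,
}

impl OutputHistory {
    fn apply(&mut self, out: Output) -> Result<(), String> {
        // All mutation happens on a local copy.
        let mut next = self.clone();
        if let Output::Body { .. } = out {
            next.body_chunks += 1;
        }
        next.phase = match (next.phase, out) {
            (Phase::Start, Output::Headers) => Phase::Headers,
            (Phase::Headers | Phase::Body, Output::Body { end_of_stream: false }) => Phase::Body,
            (Phase::Headers | Phase::Body, Output::Body { end_of_stream: true }) => Phase::Done,
            // Early return drops `next`, so `self` never sees the partial update.
            (phase, out) => return Err(format!("{:?} is not valid in phase {:?}", out, phase)),
        };
        // Commit only after every check has passed.
        *self = next;
        Ok(())
    }
}

/// A body response before any headers is rejected and leaves history untouched.
fn rejected_output_demo() -> (bool, OutputHistory) {
    let mut history = OutputHistory { phase: Phase::Start, body_chunks: 0 };
    let rejected = history.apply(Output::Body { end_of_stream: false }).is_err();
    (rejected, history)
}
```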
6. Timeout and Override Handling
Message timeout policy is internal to the exchange. Callers do not decide whether a receive is timed or untimed.
For non-full-duplex active processing states, the exchange stores an absolute deadline when the message is committed. receive() uses that stored deadline.
override_message_timeout envelopes are handled before response classification:
open_timeout is intentionally not part of this PR. open() is synchronous and only constructs the pending Process future.
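The difference between an absolute deadline stored at commit time and a fresh timeout started at receive time can be sketched as follows. `ActiveDeadline` and its methods are hypothetical names; the override handling is not modeled here.

```rust
use std::time::{Duration, Instant};

/// Hypothetical holder for the deadline recorded when a message is committed.
struct ActiveDeadline {
    deadline: Instant,
}

impl ActiveDeadline {
    /// Called at the send commit boundary.
    fn commit(now: Instant, message_timeout: Duration) -> Self {
        Self { deadline: now + message_timeout }
    }

    /// Time left at `now`, or `None` once the deadline has been reached.
    fn remaining(&self, now: Instant) -> Option<Duration> {
        self.deadline
            .checked_duration_since(now)
            .filter(|left| !left.is_zero())
    }
}

/// Returns the budget seen by a receive 150 ms and 250 ms after commit.
fn deadline_demo() -> (Option<Duration>, Option<Duration>) {
    let t0 = Instant::now();
    let active = ActiveDeadline::commit(t0, Duration::from_millis(200));

    // A late receive gets only what is left, not a fresh 200 ms.
    let late = active.remaining(t0 + Duration::from_millis(150));
    let expired = active.remaining(t0 + Duration::from_millis(250));
    (late, expired)
}
```

Passing `now` explicitly keeps the sketch deterministic; an async implementation would more likely sleep until the stored deadline.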
7. Typed Exchange Events
Processor responses are classified into typed exchange events:
Each event preserves the original proto response payload and any dynamic_metadata from the processor envelope.
ImmediateResponse is terminal after a soliciting send. Once terminal, later send/receive calls return Closed.
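The terminal behavior of `ImmediateResponse` can be modeled with a single flag. In this sketch only `ImmediateResponse` and `Closed` come from the description above; `Event::HeadersResponse`, `TerminalExchange`, and the method names are hypothetical, and payload and metadata handling is omitted.

```rust
#[derive(Debug, PartialEq)]
enum Event {
    HeadersResponse, // hypothetical non-terminal event
    ImmediateResponse,
}

#[derive(Debug, PartialEq)]
enum ExchangeError {
    Closed,
}

/// Hypothetical exchange that only tracks whether it has gone terminal.
#[derive(Default)]
struct TerminalExchange {
    terminal: bool,
}

impl TerminalExchange {
    /// Delivers a classified event, marking the exchange terminal if needed.
    fn receive(&mut self, event: Event) -> Result<Event, ExchangeError> {
        if self.terminal {
            return Err(ExchangeError::Closed);
        }
        if event == Event::ImmediateResponse {
            self.terminal = true;
        }
        Ok(event)
    }

    /// Sending is refused once the exchange is terminal.
    fn send(&mut self) -> Result<(), ExchangeError> {
        if self.terminal {
            Err(ExchangeError::Closed)
        } else {
            Ok(())
        }
    }
}

/// Returns the results of a send before, and a send and receive after, the terminal event.
fn terminal_demo() -> (Result<(), ExchangeError>, Result<(), ExchangeError>, Result<Event, ExchangeError>) {
    let mut exchange = TerminalExchange::default();
    let before = exchange.send();
    exchange.receive(Event::ImmediateResponse).unwrap();
    (before, exchange.send(), exchange.receive(Event::HeadersResponse))
}
```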
8. Clean Close Support
finish_sending() half-closes the outbound processor request stream by dropping the sender. This does not immediately close the response side; callers may still receive buffered processor output.
drain_trailing() can consume remaining response stream messages after a successful receive path. This gives follow-up integration code a way to cleanly drain the gRPC stream before dropping the exchange.
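Half-close by dropping the sender can be illustrated with standard-library channels. This is a sketch under assumptions: `HalfCloseExchange` and its fields are hypothetical, the method bodies are not the real implementation, and two in-process channels stand in for the two directions of the gRPC stream.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical exchange holding both directions of a stream.
struct HalfCloseExchange {
    outbound: Option<Sender<&'static str>>,
    inbound: Receiver<&'static str>,
}

impl HalfCloseExchange {
    /// Half-close: dropping the sender ends the request side only.
    fn finish_sending(&mut self) {
        self.outbound = None;
    }

    /// Collects whatever the processor has already buffered.
    fn drain_trailing(&mut self) -> Vec<&'static str> {
        self.inbound.try_iter().collect()
    }
}

/// Returns (processor observed end of requests, responses drained afterwards).
fn half_close_demo() -> (bool, Vec<&'static str>) {
    let (out_tx, out_rx) = channel();
    let (in_tx, in_rx) = channel();
    let mut exchange = HalfCloseExchange { outbound: Some(out_tx), inbound: in_rx };

    // The processor has already queued two responses.
    in_tx.send("body response").unwrap();
    in_tx.send("trailers response").unwrap();

    exchange.finish_sending();

    // With the only sender dropped, the processor's receive reports disconnect.
    let processor_saw_end = out_rx.recv().is_err();
    (processor_saw_end, exchange.drain_trailing())
}
```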
9. Test Coverage
This PR adds extensive exchange-level coverage with a mock duplex processor.
Coverage includes:
PR Composition
This PR has a large test line count because the exchange core handles async stream ordering, cancellation, timeouts, and close behavior.
Most of the diff is mock processor infrastructure and behavioral coverage for duplex ordering, cancellation, timeout, override, metadata, and close semantics. The production change is limited to the standalone exchange core plus minimal module/dependency wiring.
PR Stack Context
- `HttpFilterContext`, stable filter identity, pipeline pinning across hot reload
- `ExtProcExchange` transport core for one persistent bidirectional gRPC `ExternalProcessor.Process` stream per request
- `ext_proc` into the filter lifecycle for the llm-d Go EPP request-routing path using generic `ext_proc` + `endpoint_selector`
Explicitly Out of Scope
This PR intentionally does not include:
Why This Is Needed
The upcoming generic full-duplex ext_proc integration needs one persistent processor stream per HTTP request.
Without this exchange core, the filter would either have to serialize send/receive behavior or hide stream management inside ad hoc integration code. That would make deferred processors, body streaming, timeout handling, cancellation, and response ordering much harder to reason about.
This PR keeps the transport foundation isolated and testable before wiring it into request routing.
Design Notes
The exchange design intentionally uses a state machine plus a single-owner async driver instead of a per-request worker task, a locked shared stream, or a serialized send-then-receive model. That keeps protocol correctness explicit, avoids lock/task overhead on the hot path, preserves bounded backpressure, and prevents deadlocks with processors that wait for body EOS before responding. More details are in the research spike's discussion of the state-machine/async-driver design versus simpler-looking options.
Validation