feat(ext-proc): integrate full-duplex request routing#707
Conversation
|
PR too large: 3878 lines added (limit: 750, excludes Cargo files, tests, docs, examples, and benchmarks). Please split into smaller PRs. Add |
Signed-off-by: Brent Salisbury <bsalisbu@redhat.com>
078e1c1 to
816e0df
Compare
praxis-bot
left a comment
There was a problem hiding this comment.
PR #707 Review: feat(ext-proc): integrate full-duplex request routing
This is a substantial and well-structured PR that wires the standalone full-duplex ext_proc exchange into the Praxis request lifecycle. The architecture is sound: trusted mutation provenance prevents SSRF, the endpoint_selector filter validates addresses thoroughly, and the lifecycle timeout bounds prevent unbounded memory growth from slow processors.
Findings: 11 comments (0 blocking, 5 suggested improvements, 6 nits/docs)
Key observations
Correctness: The mutation provenance model is solid -- pre-read mutations are clearly distinguished from client-supplied headers, and the ordered log correctly handles override sequences (Add -> Remove -> Set resolves deterministically). The validate_host_port function covers a comprehensive set of attack vectors (SSRF via scheme injection, path traversal, userinfo, malformed IPv6).
Architecture concern: The stream_buffer pre-read fallback has an implicit mutual-exclusivity contract between legacy mutation queues and pre_read_mutations that is not documented or enforced. A future filter mixing both mechanisms in the same chain would lose mutations silently.
Validation gap: lifecycle_timeout_ms can be configured below message_timeout_ms, which would make the full-duplex drain path always fail. This should be validated similarly to deferred_close_timeout_ms.
Test coverage: Excellent. The test suite covers the full lifecycle (headers-before-body ordering, incremental chunk delivery, EOS-does-not-resend, deferred routing, immediate rejection, configured timeouts), plus edge cases (Host mutation protection, case-insensitive matching, ambiguous trusted values). The endpoint validation tests cover DNS, IPv4, IPv6, scheme injection, path injection, userinfo, port zero, and malformed labels.
See inline comments for details.
| // Bootstrap State | ||
| // ----------------------------------------------------------------------------- | ||
|
|
||
| /// Pinned boxed Process future, `Send + 'static` but not `Sync`. |
There was a problem hiding this comment.
Duplicate doc comment: This line adds a second /// Pinned boxed Process future ... doc comment above the use statement, while the original one still exists on line 334 above the type alias. One of them should be removed.
| reason = "sectioned state-machine implementation keeps domains reviewable" | ||
| )] | ||
| impl ExtProcExchange { | ||
| /// Internal receive with override loop and classification. |
There was a problem hiding this comment.
Doc comment placed on the wrong method: The detailed doc comment describing the internal receive algorithm (steps 1-3 with override loop, classification, and classify_and_validate link) was moved from receive_inner onto ensure_response_stream. The comment describes receive_inner's behavior (override loop, classification), not ensure_response_stream's (which just resolves the bootstrap future). Meanwhile receive_inner on line 928 lost its detailed docs and only has a short summary. The algorithmic doc should stay on receive_inner.
| /// | ||
| /// Uses [`OnceLock`] so initialization happens exactly once, | ||
| /// inside whichever Tokio runtime context the first request | ||
| /// runs in. Returns `None` if the initial connection fails. |
There was a problem hiding this comment.
Misleading doc comment: The doc says "Returns None if the initial connection fails" but the method signature is fn channel(&self) -> Channel -- it never returns None. Since connect_lazy() does not fail (it defers connection), this should say something like "The returned channel connects lazily on first use."
| let ms = cfg.lifecycle_timeout_ms; | ||
| return Err( | ||
| format!("ext_proc: lifecycle_timeout_ms ({ms}) exceeds maximum ({MAX_LIFECYCLE_TIMEOUT_MS})").into(), | ||
| ); |
There was a problem hiding this comment.
Missing validation: lifecycle_timeout_ms can be less than message_timeout_ms: The config validates lifecycle_timeout_ms > 0 and <= 300_000, but does not check whether it is at least as large as message_timeout_ms. A lifecycle timeout shorter than the per-message timeout means the lifecycle will always expire before the first drain receive() can complete, making the full-duplex path always reject with status_on_error. Consider adding a check analogous to the existing deferred_close_timeout_ms >= message_timeout_ms validation above.
| }; | ||
| match eos_result { | ||
| Ok(()) => {}, | ||
| Err(ExchangeError::SendFailed | ExchangeError::Closed) => { |
There was a problem hiding this comment.
SendFailed silently swallowed at debug level: When the EOS body send fails due to SendFailed | Closed, this logs at debug and proceeds to drain. Closed is a clean half-close, but SendFailed could indicate the exchange never bootstrapped (channel creation problem, serialization failure). Consider logging SendFailed at warn to surface connectivity problems, while keeping Closed at debug.
| /// Returns an error if a pending `Set` value contains | ||
| /// non-text bytes, or if the final state has multiple | ||
| /// distinct values. | ||
| pub fn pending_header_value(&self, name: &HeaderName) -> Result<PendingHeaderResult, String> { |
There was a problem hiding this comment.
pending_header_value assumes removes precede sets: The method checks request_headers_to_remove for presence and separately finds the last set value. When both exist, it returns the set value (remove + set = set), which is correct for the ext_proc mutation path where removes are always applied before sets. However, if a filter ever pushes Set(x) then Remove(x) (intending: set first, then retract), this method would still return Value(...). A brief comment documenting the ordering assumption would help future maintainers.
| // mutation queues. This preserves their existing remove -> set | ||
| // -> add application order. | ||
| for name in &filter_ctx.request_headers_to_remove { | ||
| mutation_log.push(TrustedHeaderMutation::Remove(name.clone())); |
There was a problem hiding this comment.
Mutual exclusivity contract between mutation sources is implicit: When pre_read_mutations is non-empty, the else-branch discards legacy queues (extra_request_headers, request_headers_to_set, request_headers_to_remove). This works because ext_proc populates both in lockstep, but a future filter that only uses legacy queues while another uses pre_read_mutations would silently lose mutations. Consider adding a debug_assert! or a comment documenting that filters in the same pre-read chain must use the same mutation mechanism.
| /// let filter = EndpointSelectorFilter::from_config(&yaml).unwrap(); | ||
| /// assert_eq!(filter.name(), "endpoint_selector"); | ||
| /// ``` | ||
| pub struct EndpointSelectorFilter { |
There was a problem hiding this comment.
EndpointSelectorFilter does not implement Debug: Unlike ExtProcFilter in this same PR (which has a manual Debug impl), this struct has no Debug implementation. Adding at least a minimal Debug impl showing source_header and required would help with diagnostic logging.
| @@ -2049,7 +2112,7 @@ fn make_response() -> praxis_filter::Response { | |||
| static TEST_ID_GENERATOR: std::sync::LazyLock<praxis_core::id::IdGenerator> = | |||
| std::sync::LazyLock::new(|| praxis_core::id::IdGenerator::with_seed(0)); | |||
There was a problem hiding this comment.
Attribute before doc comment: The #[expect(clippy::too_many_lines, ...)] attribute appears before the /// doc comment. Per Rust convention (and this project's style), doc comments should precede attributes:
// Current:
#[expect(clippy::too_many_lines, reason = "...")]
/// Build a minimal [`HttpFilterContext`] ...
fn make_ctx(...)
// Should be:
/// Build a minimal [`HttpFilterContext`] ...
#[expect(clippy::too_many_lines, reason = "...")]
fn make_ctx(...)| extensions, | ||
| filter_metadata, | ||
| filter_state, | ||
| _pre_read_mutations, |
There was a problem hiding this comment.
_pre_read_mutations silently discarded at destructuring site: The destructured filter_ctx.pre_read_mutations is bound to _pre_read_mutations and dropped. The comment on line 219 explains why (consumed by endpoint_selector, cleared to prevent stale reuse), but the underscore binding at the destructuring site makes it easy to miss. A brief inline comment at the destructuring would help: // Consumed by request pipeline; cleared below.
Summary
This PR lets Praxis ask an external processor where a request should go, wait for the answer after the request body is complete, and then safely route the request to that selected backend.
This PR wires the standalone full-duplex
ext_procexchange into the Praxis request lifecycle for generic request-routing use cases.It enables Praxis to speak Envoy's
ExternalProcessor.Processprotocol with one full-duplex gRPC stream per HTTP request. Praxis can send request headers and body chunks to an external processor, wait for the processor's endpoint decision at request body EOS, and route through a genericendpoint_selectorfilter.This is PR3 in the full-duplex
ext_procstack:Changes
1. Full-Duplex
ext_procRequest LifecycleSummary: Praxis now keeps one conversation open with the external processor for the whole request instead of making separate disconnected calls.
This PR enables
request_body_mode: full_duplex_streamedinExtProcFilter.The request lifecycle now supports:
RequestHeaderswithProtocolConfigurationThe first ext_proc message is sent through
open_with_request_headers(), which atomically queues and commits the initialRequestHeadersenvelope. That avoids split state between opening the stream and recording that headers were sent.2. StreamBuffer Pre-Read Integration
Summary: Praxis reads the request body early enough for the processor to make a routing decision before the request is forwarded upstream.
The protocol layer uses StreamBuffer pre-read to let body filters observe request body chunks before upstream selection happens.
For full-duplex
ext_proc, this means:This is what makes deferred routing work for processors that wait for the full body before choosing a destination.
3. Trusted Mutation Provenance
Summary: Praxis trusts routing headers only when they came from the external processor, never when they came from the client.
The request-routing path needs to distinguish processor-created routing headers from client-supplied spoofed headers.
This PR adds trusted pre-read mutation provenance:
TrustedHeaderMutationvaluesThis prevents a client from setting
x-gateway-destination-endpointdirectly and selecting an arbitrary upstream.4. Ordered Header Mutation Semantics
Summary: If the processor changes its mind across multiple responses, Praxis follows the final ordered mutation result instead of mixing old and new values.
The mutation resolver preserves order-sensitive semantics for:
RemoveSetAddThe resolver applies mutations in order and rejects only final ambiguous states. For example:
Add(A), Add(A)is accepted as the same valueAdd(A), Add(B)is rejected as ambiguousAdd(A), Remove, Set(B)resolves toBThis matters because the external processor can return multiple responses during pre-read, and later responses must be able to override earlier ones deterministically.
5. Generic
endpoint_selectorFilterSummary: A generic filter takes the trusted destination from the processor, validates it, sets the upstream, and strips the internal header before the backend sees it.
This PR adds a new generic HTTP filter:
endpoint_selector.The filter:
host:portsyntaxctx.upstreamrequired: truestatus_on_required_failureRequired-mode failures return
FilterAction::Reject, notFilterError, so they are not bypassed by fail-open behavior.6. Structured Metadata Persistence
Summary: Metadata returned by the processor is carried through the request context in a bounded way.
This PR persists processor dynamic metadata as
serde_json::Value.Metadata handling includes:
This keeps processor metadata available to later phases without making routing depend on it.
7. Server Registration and Example Config
Summary: The generic
ext_procfilter can now be used by the normal Praxis server when the feature is enabled.This PR adds feature-gated
ext_procregistration through the server registry.It also adds a documented example config for the composition:
ext_proc -> endpoint_selectorThe example demonstrates the generic request-routing path without a custom llm-d-specific filter.
Concurrency Model
Summary: The request path avoids extra per-request worker tasks and keeps buffering tight so slow processors cannot cause unbounded memory growth.
The request path uses the PR2 exchange state machine:
This matters for processors that wait until request body EOS before returning a routing decision. Praxis must be able to send headers, stream body chunks, and send EOS without first waiting for an early processor response.
Safety Properties
Summary: The routing decision is processor-controlled, validated, fail-closed when required, and hidden from the backend.
This PR preserves these safety properties:
Hostis treated as a protected request-authority header. Genericext_procrequest mutations cannot append, replace, or remove it, preventing a default append mutation from producing duplicateHostfields on the forwarded request. This is protocol-safety behavior, not llm-d-specific handling.Scope Boundary
Summary: This PR proves request routing. It does not claim full Envoy ext_proc parity or full llm-d environment validation.
This PR does not implement:
Those are follow-up areas and are not blockers for this request-routing path.
Reviewer Docs
Summary: The supporting docs explain the architecture, code path, PR stack, and validation output.
Reviewer docs are available here:
https://github.com/nerdalert/praxis-research-spikes/tree/main/demo/llm-d-track-b
Relevant pages:
Architecture and PR Stack
Code Walkthrough
Sample Output
PR3 Integration Validation Output
The validation artifacts under demo/llm-d-track-b/validation/ are pushed in the research-spikes repo. The separate hermetic integration-test branch is not part of
this PR.
Validation
Summary: The branch has been rebased on current main and passed lint, unit, ext_proc serial, docs, and diff checks.
Validated after rebasing on upstream/main:
Current focused counts include: