refactor(routers): use worker.http_client() instead of shared AppContext.client#1016
refactor(routers): use worker.http_client() instead of shared AppContext.client#1016CatherineSue wants to merge 7 commits intomainfrom
Conversation
WorkerSelector no longer needs a shared reqwest::Client. The refresh_worker_models function now uses worker.http_client() to call /v1/models on each external worker, using the worker's own isolated connection pool. - Remove client field from WorkerSelector - Remove client parameter from WorkerSelector::new() - Update all 5 callers to drop the client argument Signed-off-by: Chang Su <chang.s.su@oracle.com>
Switch OpenAI Chat, Responses non-streaming, and Responses streaming handlers from ctx.components.client() (shared) to worker.http_client() (per-worker isolated connection pool). SharedComponents.client is no longer used for request routing in the OpenAI router — only for model refresh via WorkerSelector (already migrated to worker.http_client() in the previous commit). Signed-off-by: Chang Su <chang.s.su@oracle.com>
Switch HTTP router from self.client (shared) to worker.http_client() (per-worker) for all request routing paths: - proxy_get_request: change select_first_worker to return Arc<dyn Worker> instead of String, use worker.http_client() - route_simple_request (fan-out): use worker.http_client().clone() - send_typed_request: use worker.http_client() - Update tests for new select_first_worker return type Signed-off-by: Chang Su <chang.s.su@oracle.com>
Switch Gemini non-stream execution from shared client to worker.http_client(). Remove client field from Gemini SharedComponents since it's no longer used (worker_selection already migrated, and this was the last usage). Signed-off-by: Chang Su <chang.s.su@oracle.com>
Switch HTTP PD router from self.client (shared) to worker.http_client() (per-worker) for all request paths: - proxy_to_worker: takes &dyn Worker instead of URL string - build_post_with_headers: use prefill/decode worker clients - health_generate: use prefill/decode worker clients Signed-off-by: Chang Su <chang.s.su@oracle.com>
📝 WalkthroughWalkthroughRemoved router-level shared Changes
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Possibly related PRs
Suggested labels
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 3✅ Passed checks (3 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Warning Review ran into problems🔥 ProblemsTimed out fetching pipeline failures after 30000ms Comment |
| .map(|worker| { | ||
| let url = format!("{}/{}", worker.base_url(), endpoint); | ||
| let client = self.client.clone(); | ||
| let client = worker.http_client().clone(); |
There was a problem hiding this comment.
🟡 Nit: With this change, self.client is no longer used for any HTTP requests in Router — the only remaining reference is in the Debug impl (line 59). The client: Client field on the Router struct (line 50) is now dead code and can be removed along with its initialization in new() and the Debug field.
There was a problem hiding this comment.
Fixed in bbaa9f4. Removed client field, initialization, and Debug field from Router.
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 7b8e82cdbb
ℹ️ About Codex in GitHub
Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".
| let payload_ref = ctx.payload().expect("Payload not prepared"); | ||
| let payload_json = Arc::new(payload_ref.json.clone()); | ||
| let client = ctx.components.client().clone(); | ||
| let client = worker.http_client().clone(); |
There was a problem hiding this comment.
Restore configured client for mesh-synced workers
Switching chat routing to worker.http_client() assumes every Worker carries a fully configured HTTP client, but mesh-synced workers are still built via BasicWorkerBuilder::from_spec(...).build() / .new(...).build() in worker_registry.rs without .http_client(...), which falls back to a bare default client in worker_builder.rs (no router TLS identity/custom CA/proxy settings). In mesh deployments that depend on those router-level settings, requests to selected workers will now fail after this line, whereas the previous shared AppContext.client path worked; please either inject configured clients when creating mesh workers or keep a safe fallback for workers with default clients.
Useful? React with 👍 / 👎.
There was a problem hiding this comment.
Valid concern. Mesh-synced workers built via on_remote_worker_state use BasicWorkerBuilder without router TLS config, so their http_client() is a bare default (no mTLS/custom CA). The fix requires passing RouterConfig into the mesh sync path. Tracked as a known limitation — pre-existing issue exposed by this migration.
There was a problem hiding this comment.
Code Review
This pull request refactors the model gateway to utilize per-worker HTTP clients across multiple routers and the worker selection process. The review feedback identifies a recurring issue where worker.url() is used for request construction, which is incompatible with Data Parallel (DP) aware workers due to rank suffixes. It is recommended to use worker.base_url() instead and to centralize this URL formatting logic to improve maintainability.
| Ok(worker) => { | ||
| let mut request_builder = worker | ||
| .http_client() | ||
| .get(format!("{}/{endpoint}", worker.url())); |
There was a problem hiding this comment.
Using worker.url() for the request URL is incorrect for Data Parallel (DP) aware workers. The url() method returns the worker's identity string which includes the @rank suffix (e.g., http://host:port@rank), causing reqwest to misparse the host. Use worker.base_url() instead to get the physical endpoint URL. Since this URL construction pattern is repeated across the router, consider extracting it into a shared helper function.
| .get(format!("{}/{endpoint}", worker.url())); | |
| .get(format!("{}/{endpoint}", worker.base_url())); |
References
- Extract duplicated logic into a shared helper function to improve maintainability and reduce redundancy.
- Server-generated IDs used in format! macros for URL paths do not require URL encoding, as this is the standard pattern throughout the codebase for non-user-input IDs.
There was a problem hiding this comment.
Pre-existing — the old code also used w.url().to_string() in proxy_to_first_prefill_worker. Not introduced by this PR.
Switch realtime REST routes (sessions, client_secrets, transcription_sessions) from shared_components.client to worker.http_client(). Remove client parameter from forward_realtime_rest. Remove client field from OpenAI SharedComponents and ComponentRefs::client() — no remaining usages. Signed-off-by: Chang Su <chang.s.su@oracle.com>
Remove self.client from Router and PDRouter structs — no longer used for request routing after migration to worker.http_client(). Also remove unused reqwest::Client import from HTTP router. Signed-off-by: Chang Su <chang.s.su@oracle.com>
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: bbaa9f443c
ℹ️ About Codex in GitHub
Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".
| ) -> bool { | ||
| let url = format!("{}/v1/models", worker.url()); | ||
| let mut backend_req = client.get(&url); | ||
| let mut backend_req = worker.http_client().get(&url); |
There was a problem hiding this comment.
Keep configured client for external model refresh
Switching refresh_worker_models to worker.http_client() regresses model discovery for mesh-synced workers that are constructed without an injected client (WorkerRegistry::on_remote_worker_state builds via BasicWorkerBuilder::from_spec(...).build() at core/worker_registry.rs:1065, and BasicWorkerBuilder::build falls back to a default reqwest client at core/worker_builder.rs:245-253). In deployments that rely on router-level TLS/proxy/custom-CA settings, /v1/models refresh now fails and select_worker can return model_not_found even though healthy workers exist. Fresh evidence beyond the existing chat-path comment: this now breaks worker selection itself (refresh-on-miss) before any request routing occurs.
Useful? React with 👍 / 👎.
| ) -> bool { | ||
| let url = format!("{}/v1/models", worker.url()); | ||
| let mut backend_req = client.get(&url); | ||
| let mut backend_req = worker.http_client().get(&url); |
There was a problem hiding this comment.
🔴 Important: Same mesh-worker concern flagged on chat.rs by another reviewer, but with a worse failure mode here: refresh_worker_models returns false on failure and only logs a warning (line 237). For mesh-synced workers built via BasicWorkerBuilder::from_spec(spec).build() without .http_client(...), this will silently use a bare default client (no TLS identity, no custom CA, no proxy settings from router config). Unlike the request-routing paths where failure surfaces as an HTTP error to the caller, here the failure is silent — the worker's model list simply won't refresh, making it invisible for model-based routing without any user-facing error.
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@model_gateway/src/routers/http/pd_router.rs`:
- Around line 71-75: The current code calls
WorkerRegistry::get_prefill_workers() and then uses workers.first(), which can
pick an unhealthy worker; change the selection to pick a healthy/available
worker (e.g., call .iter().find(|w| w.is_available()) or .into_iter().filter(|w|
w.is_available()).next()) before calling proxy_to_worker so
get_server_info/get_model_info are proxied only to available backends; update
the branch that currently matches on workers.first() to instead match on the
healthy worker found and pass that worker.as_ref() into proxy_to_worker.
In `@model_gateway/src/routers/http/router.rs`:
- Around line 76-82: select_first_worker currently returns any healthy worker
from worker_registry.get_all(), which can allow PD or gRPC backends to be
chosen; change it to filter for workers that are both WorkerType::Regular and
ConnectionMode::Http before checking health. Specifically, in
select_first_worker, replace the existing healthy_workers computation with an
iterator that first filters by worker.worker_type() == WorkerType::Regular &&
worker.connection_mode() == ConnectionMode::Http, then by is_healthy(), and
return the first matching Arc<dyn Worker> or the same "No workers are available"
error if none match.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: b60130ca-e574-4894-9935-3fd920932a6b
📒 Files selected for processing (5)
model_gateway/src/routers/http/pd_router.rsmodel_gateway/src/routers/http/router.rsmodel_gateway/src/routers/openai/context.rsmodel_gateway/src/routers/openai/realtime/rest.rsmodel_gateway/src/routers/openai/router.rs
| let workers = self.worker_registry.get_prefill_workers(); | ||
| let first_worker_url = workers.first().map(|w| w.url().to_string()); | ||
|
|
||
| if let Some(worker_url) = first_worker_url { | ||
| self.proxy_to_worker(worker_url, endpoint, headers).await | ||
| if let Some(worker) = workers.first() { | ||
| self.proxy_to_worker(worker.as_ref(), endpoint, headers) | ||
| .await |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
set -euo pipefail
rg -n -A20 -B5 'fn get_prefill_workers|fn get_decode_workers' model_gateway/src/core/worker_registry.rsRepository: lightseekorg/smg
Length of output: 1397
Add health filter before proxy in proxy_to_first_prefill_worker.
WorkerRegistry::get_prefill_workers() only filters by worker type; it returns all prefill workers regardless of health status. Calling .first() can select an unhealthy prefill worker, causing get_server_info and get_model_info to proxy requests to a down backend. Add an explicit .filter(|w| w.is_available()) or similar before .first().
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@model_gateway/src/routers/http/pd_router.rs` around lines 71 - 75, The
current code calls WorkerRegistry::get_prefill_workers() and then uses
workers.first(), which can pick an unhealthy worker; change the selection to
pick a healthy/available worker (e.g., call .iter().find(|w| w.is_available())
or .into_iter().filter(|w| w.is_available()).next()) before calling
proxy_to_worker so get_server_info/get_model_info are proxied only to available
backends; update the branch that currently matches on workers.first() to instead
match on the healthy worker found and pass that worker.as_ref() into
proxy_to_worker.
| fn select_first_worker(&self) -> Result<Arc<dyn Worker>, String> { | ||
| let workers = self.worker_registry.get_all(); | ||
| let healthy_workers: Vec<_> = workers.iter().filter(|w| w.is_healthy()).collect(); | ||
| if healthy_workers.is_empty() { | ||
| Err("No workers are available".to_string()) | ||
| } else { | ||
| Ok(healthy_workers[0].url().to_string()) | ||
| Ok(healthy_workers[0].clone()) |
There was a problem hiding this comment.
Restrict select_first_worker to regular HTTP workers.
This is the only selection helper in the file that bypasses the WorkerType::Regular + ConnectionMode::Http filter. In mixed deployments, Line 89 can forward /health_generate, /get_server_info, or /get_model_info to a PD or gRPC backend.
🎯 Suggested fix
fn select_first_worker(&self) -> Result<Arc<dyn Worker>, String> {
- let workers = self.worker_registry.get_all();
- let healthy_workers: Vec<_> = workers.iter().filter(|w| w.is_healthy()).collect();
- if healthy_workers.is_empty() {
- Err("No workers are available".to_string())
- } else {
- Ok(healthy_workers[0].clone())
- }
+ self.worker_registry
+ .get_workers_filtered(
+ None,
+ Some(WorkerType::Regular),
+ Some(ConnectionMode::Http),
+ None,
+ false,
+ )
+ .into_iter()
+ .find(|w| w.is_healthy())
+ .ok_or_else(|| "No workers are available".to_string())
}🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@model_gateway/src/routers/http/router.rs` around lines 76 - 82,
select_first_worker currently returns any healthy worker from
worker_registry.get_all(), which can allow PD or gRPC backends to be chosen;
change it to filter for workers that are both WorkerType::Regular and
ConnectionMode::Http before checking health. Specifically, in
select_first_worker, replace the existing healthy_workers computation with an
iterator that first filters by worker.worker_type() == WorkerType::Regular &&
worker.connection_mode() == ConnectionMode::Http, then by is_healthy(), and
return the first matching Arc<dyn Worker> or the same "No workers are available"
error if none match.
|
Hi @CatherineSue, this PR has merge conflicts that must be resolved before it can be merged. Please rebase your branch: git fetch origin main
git rebase origin/main
# resolve any conflicts, then:
git push --force-with-lease |
|
This pull request has been automatically marked as stale because it has not had any activity within 14 days. It will be automatically closed if no further activity occurs within 16 days. Leave a comment if you feel this pull request should remain open. Thank you! |
Description
Part of the per-worker resilience refactor series.
Problem
All routers share a single
reqwest::ClientfromAppContext.client. This means all workers share the same connection pool — one slow worker's connections affect others.Solution
Routers now use
worker.http_client()for request routing. Each worker has its own isolated connection pool (configured viaHttpPoolConfigat registration time).The shared client is removed from
WorkerSelector(model refresh now usesworker.http_client()too) and from GeminiSharedComponents. HTTP and HTTP PD routers still haveself.clientfield but it's unused for request routing.Changes (one commit per router)
WorkerSelector,refresh_worker_modelsusesworker.http_client()chat.rs,non_streaming.rs,streaming.rsuseworker.http_client()instead ofctx.components.client()proxy_get_request,route_simple_request,send_typed_requestuseworker.http_client(). Changedselect_first_workerto returnArc<dyn Worker>instead of String.non_stream_execution.rsusesworker.http_client(). Removedclientfield from GeminiSharedComponents.proxy_to_worker,build_post_with_headers,health_generateuse per-worker clients. Changedproxy_to_workerto take&dyn Workerinstead of URL string.Test Plan
cargo test -p smg --lib— all 450 tests passChecklist
cargo +nightly fmtpassescargo clippy --all-targets --all-features -- -D warningspassesSummary by CodeRabbit
Summary by CodeRabbit