
feat: support engine metrics from PROMETHEUS_MULTIPROC_DIR in grpc mode #1038

Open
ConnorLi96 wants to merge 3 commits into main from connorli/grpc-prometheus-metrics

Conversation


@ConnorLi96 ConnorLi96 commented Apr 4, 2026

Description

Problem

When TensorRT-LLM (or any backend) runs in gRPC mode, the SMG router cannot collect engine metrics via /engine_metrics. HTTP-mode workers expose a /metrics endpoint that the router scrapes directly, but gRPC workers have no HTTP endpoint. The router returns 500 All backend requests failed.

Solution

Use the prometheus_client multiprocess pattern (same approach sglang uses):

  1. Python orchestrator (serve.py): Create a temporary PROMETHEUS_MULTIPROC_DIR before launching workers, so worker processes inherit it and write .db files there.
  2. Rust router (worker_manager.rs): For gRPC workers, read PROMETHEUS_MULTIPROC_DIR and spawn a python3 subprocess that aggregates the .db files via MultiProcessCollector and returns standard prometheus text format.
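As a rough illustration of step 1, the orchestrator side can be sketched with stdlib calls only. The helper names below are hypothetical, not the actual serve.py functions:

```python
import os
import shutil
import tempfile

def setup_prometheus_multiproc_dir() -> str:
    """Create the shared directory that worker processes write their
    per-process prometheus .db shards into (illustrative helper)."""
    prometheus_dir = tempfile.mkdtemp(prefix="smg_prometheus_")
    # Child processes inherit this env var, so prometheus_client in each
    # worker routes its metrics into .db files under this directory.
    os.environ["PROMETHEUS_MULTIPROC_DIR"] = prometheus_dir
    return prometheus_dir

def cleanup_prometheus_multiproc_dir(prometheus_dir: str) -> None:
    """Remove the directory on shutdown so stale shards don't leak."""
    shutil.rmtree(prometheus_dir, ignore_errors=True)
    os.environ.pop("PROMETHEUS_MULTIPROC_DIR", None)
```

The key design point is that the env var must be set before the workers are spawned, since it is inherited through the process environment.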

Changes

  • bindings/python/src/smg/serve.py: Set PROMETHEUS_MULTIPROC_DIR env var in _launch_workers() when connection_mode == "grpc". Clean up the temp directory on shutdown.
  • model_gateway/src/core/worker_manager.rs: Add collect_prometheus_multiproc_metrics() for gRPC workers. Split get_engine_metrics() into HTTP (fan-out to /metrics) and gRPC (read from PROMETHEUS_MULTIPROC_DIR) paths.
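The Rust side shells out to a short `python3 -c` script that aggregates the per-process shards. A Python sketch of the equivalent command construction (the helper itself is hypothetical, for illustration only; the `-c` snippet mirrors the collector described above):

```python
import os

# The aggregation one-liner the router passes to the interpreter.
COLLECTOR_SNIPPET = "\n".join([
    "import sys",
    "from prometheus_client import CollectorRegistry, generate_latest",
    "from prometheus_client.multiprocess import MultiProcessCollector",
    "registry = CollectorRegistry()",
    "MultiProcessCollector(registry)",
    "sys.stdout.buffer.write(generate_latest(registry))",
])

def build_collector_command(multiproc_dir: str, python_exe: str = "python3"):
    """Return (argv, env) for the aggregation subprocess the router spawns."""
    env = dict(os.environ, PROMETHEUS_MULTIPROC_DIR=multiproc_dir)
    return [python_exe, "-c", COLLECTOR_SNIPPET], env
```

MultiProcessCollector reads every .db file in PROMETHEUS_MULTIPROC_DIR and merges them into a single registry, which generate_latest then serializes as standard Prometheus text format.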

Test Plan

Tested end-to-end with TensorRT-LLM (Qwen3-0.6B, pytorch backend) in gRPC mode:

```shell
# Start SMG + trtllm gRPC worker
smg serve --model Qwen/Qwen3-0.6B --backend trtllm --connection-mode grpc --port 8992

# Send requests
curl http://localhost:8992/v1/chat/completions \
  -X POST -H "Content-Type: application/json" \
  -d '{"model":"Qwen3-0.6B","messages":[{"role":"user","content":"hi"}],"max_tokens":5}'

# Before this PR: 500
# After this PR:  200 with prometheus metrics
curl http://localhost:8992/engine_metrics
# trtllm_request_success_total{engine_type="grpc",finished_reason="length",...} 3.0
```

<!-- This is an auto-generated comment: release notes by coderabbit.ai -->
## Summary by CodeRabbit

* **New Features**
  * Added Prometheus metrics support for gRPC workers so gRPC-based processes are included in monitoring.

* **Improvements**
  * Consolidated metrics aggregation across worker types for more complete reporting.
  * Added safer cleanup and error handling around temporary metrics data to reduce resource leaks and surface warnings on failures.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->

Signed-off-by: Scott Lee <scott@together.ai>
@github-actions github-actions bot added the python-bindings (Python bindings changes) and model-gateway (Model gateway crate changes) labels Apr 4, 2026
@coderabbitai

coderabbitai bot commented Apr 4, 2026

📝 Walkthrough

Added gRPC-only Prometheus multiprocess metric handling: Python orchestrator creates and cleans a temporary PROMETHEUS_MULTIPROC_DIR for workers; Rust WorkerManager aggregates gRPC metrics by invoking a Python subprocess to read multiprocess metrics and merges results with HTTP worker metrics.

Changes

| Cohort / File(s) | Summary |
|---|---|
| Prometheus Multiprocess Setup (Python): `bindings/python/src/smg/serve.py` | Create a temporary Prometheus multiprocess directory and set PROMETHEUS_MULTIPROC_DIR in `_launch_workers()`; add cleanup logic and error handling to remove the directory during `_cleanup_workers()`. |
| gRPC Metrics Collection (Rust): `model_gateway/src/core/worker_manager.rs` | Add `collect_prometheus_multiproc_metrics()` to run python3 and aggregate multiprocess metrics; refactor `get_engine_metrics()` to exclude gRPC from the HTTP fan-out, invoke the new helper once for gRPC workers, log warnings on failures, and include aggregated metrics as a single MetricPack when available. |

Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Orch as Python Orchestrator
    participant FS as Filesystem (PROMETHEUS_MULTIPROC_DIR)
    participant WorkerMgr as Rust WorkerManager
    participant PySub as Python subprocess (prometheus_client)
    participant HTTP as HTTP Workers
    participant GRPC as gRPC Workers

    Orch->>FS: mkdtemp() and set PROMETHEUS_MULTIPROC_DIR
    Orch->>GRPC: launch workers (env points to FS)
    Orch->>HTTP: launch HTTP workers

    WorkerMgr->>HTTP: fan_out GET /metrics (per-worker)
    HTTP-->>WorkerMgr: per-worker metrics -> MetricPacks

    alt gRPC workers exist
        WorkerMgr->>PySub: spawn python3 to run MultiProcessCollector (reads PROMETHEUS_MULTIPROC_DIR)
        PySub->>FS: read metrics files
        FS-->>PySub: metrics contents
        PySub-->>WorkerMgr: aggregated metrics text
        WorkerMgr->>WorkerMgr: add aggregated MetricPack (empty labels)
    end

    Orch->>FS: _cleanup_prometheus_dir() -> rmtree
    Orch-->>FS: PROMETHEUS_MULTIPROC_DIR cleared
```

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~22 minutes

Poem

🐰 I dug a temp dir, soft and neat,
Where metrics gather, small and fleet.
Python hums and Rust will ask,
Together tidy up the task.
Hooray—prometheus carrots to eat! 🥕

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
|---|---|---|---|
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 42.86%, below the required threshold of 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (2 passed)

| Check name | Status | Explanation |
|---|---|---|
| Description Check | ✅ Passed | Check skipped; CodeRabbit's high-level summary is enabled. |
| Title Check | ✅ Passed | The title accurately describes the main change: adding support for collecting Prometheus engine metrics from gRPC workers using PROMETHEUS_MULTIPROC_DIR, which is the core purpose of this PR. |


@ConnorLi96 ConnorLi96 changed the title from "Connorli/grpc prometheus metrics" to "feat: support engine metrics from PROMETHEUS_MULTIPROC_DIR in grpc mode" Apr 4, 2026

@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request implements gRPC worker metrics collection by utilizing a temporary directory for Prometheus multiprocess data and a Python-based aggregation subprocess. The changes include setting up the environment in the Python orchestrator and updating the Rust worker manager to collect these metrics. Feedback suggests passing the specific Python executable path via an environment variable to ensure the metrics collector runs in the correct environment, rather than relying on a hardcoded 'python3' command.

Comment on lines +578 to +584

```python
if getattr(self.args, "connection_mode", "grpc") == "grpc":
    self._prometheus_dir = tempfile.mkdtemp(prefix="smg_prometheus_")
    os.environ["PROMETHEUS_MULTIPROC_DIR"] = self._prometheus_dir
    logger.info(
        "Set PROMETHEUS_MULTIPROC_DIR=%s for gRPC metrics collection",
        self._prometheus_dir,
    )
```

medium

To ensure the metrics collector uses the same Python environment as the orchestrator (which is critical when running in a virtual environment), consider passing sys.executable to the router via an environment variable.

Suggested change

```diff
 if getattr(self.args, "connection_mode", "grpc") == "grpc":
     self._prometheus_dir = tempfile.mkdtemp(prefix="smg_prometheus_")
     os.environ["PROMETHEUS_MULTIPROC_DIR"] = self._prometheus_dir
+    os.environ["SMG_PYTHON_EXECUTABLE"] = sys.executable
     logger.info(
         "Set PROMETHEUS_MULTIPROC_DIR=%s for gRPC metrics collection",
         self._prometheus_dir,
     )
```

```rust
        "PROMETHEUS_MULTIPROC_DIR not set; cannot collect metrics from gRPC workers".to_string()
    })?;

    let output = tokio::process::Command::new("python3")
```

medium

Instead of hardcoding python3, use the Python executable path provided by the orchestrator if available. This ensures that the metrics collection subprocess runs in the correct environment (e.g., within a virtualenv).

Suggested change

```diff
-let output = tokio::process::Command::new("python3")
+let python_exe = std::env::var("SMG_PYTHON_EXECUTABLE").unwrap_or_else(|_| "python3".to_string());
+let output = tokio::process::Command::new(python_exe)
```

Comment on lines +94 to +107

```rust
let output = tokio::process::Command::new("python3")
    .args([
        "-c",
        "import sys\n\
        from prometheus_client import CollectorRegistry, generate_latest\n\
        from prometheus_client.multiprocess import MultiProcessCollector\n\
        registry = CollectorRegistry()\n\
        MultiProcessCollector(registry)\n\
        sys.stdout.buffer.write(generate_latest(registry))\n",
    ])
    .env("PROMETHEUS_MULTIPROC_DIR", &dir)
    .output()
    .await
    .map_err(|e| format!("failed to run python3 prometheus collector: {e}"))?;
```

🔴 Important: This subprocess call has no timeout. The HTTP fan-out path uses REQUEST_TIMEOUT (5s), but the python3 subprocess can hang indefinitely (e.g., corrupted .db file in the multiproc dir, or python3 not on PATH causing a slow lookup). Since /metrics is typically polled by Prometheus every 15–30s, a hung subprocess will accumulate blocked tasks.

Consider wrapping with tokio::time::timeout:

Suggested change

```diff
-let output = tokio::process::Command::new("python3")
-    .args([
-        "-c",
-        "import sys\n\
-        from prometheus_client import CollectorRegistry, generate_latest\n\
-        from prometheus_client.multiprocess import MultiProcessCollector\n\
-        registry = CollectorRegistry()\n\
-        MultiProcessCollector(registry)\n\
-        sys.stdout.buffer.write(generate_latest(registry))\n",
-    ])
-    .env("PROMETHEUS_MULTIPROC_DIR", &dir)
-    .output()
-    .await
-    .map_err(|e| format!("failed to run python3 prometheus collector: {e}"))?;
+let output = tokio::time::timeout(
+    REQUEST_TIMEOUT,
+    tokio::process::Command::new("python3")
+        .args([
+            "-c",
+            "import sys\n\
+            from prometheus_client import CollectorRegistry, generate_latest\n\
+            from prometheus_client.multiprocess import MultiProcessCollector\n\
+            registry = CollectorRegistry()\n\
+            MultiProcessCollector(registry)\n\
+            sys.stdout.buffer.write(generate_latest(registry))\n",
+        ])
+        .env("PROMETHEUS_MULTIPROC_DIR", &dir)
+        .output(),
+)
+.await
+.map_err(|_| "python3 prometheus collector timed out".to_string())?
+.map_err(|e| format!("failed to run python3 prometheus collector: {e}"))?;
```
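The same timeout discipline can be sketched in Python. This is illustrative only: the PR's router is Rust and would use `tokio::time::timeout` as suggested above; the helper name and timeout constant here are assumptions:

```python
import subprocess

REQUEST_TIMEOUT_S = 5.0  # mirrors the router's HTTP fan-out timeout

def collect_with_timeout(argv, timeout_s=REQUEST_TIMEOUT_S):
    """Run a collector subprocess with a wall-clock bound so a hung
    interpreter cannot block the metrics scrape indefinitely."""
    try:
        return subprocess.run(argv, capture_output=True, timeout=timeout_s)
    except subprocess.TimeoutExpired as exc:
        raise RuntimeError("prometheus collector timed out") from exc
```

Bounding the subprocess matters because the scrape path is polled on a fixed interval; without it, each hung collector accumulates as a blocked task.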

```python
ports = _find_available_ports(self.args.worker_base_port, self.args.data_parallel_size)
host = self.args.worker_host

if getattr(self.args, "connection_mode", "grpc") == "grpc":
```

🟡 Nit: getattr(self.args, "connection_mode", "grpc") defaults to "grpc" when the attribute is absent. If an older or HTTP-only configuration doesn't set connection_mode at all, this will unnecessarily create a temp directory and set PROMETHEUS_MULTIPROC_DIR in the process environment — which could interfere with any other prometheus_client usage in the same process.

Consider defaulting to "http" (the safer no-op path), or checking for the attribute explicitly:

if getattr(self.args, "connection_mode", None) == "grpc":


@chatgpt-codex-connector chatgpt-codex-connector bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 65b14bb137


```python
        except (ProcessLookupError, OSError):
            pass

        self._cleanup_prometheus_dir()
```

P2: Run Prometheus dir cleanup even when worker list is empty

In gRPC mode _launch_workers() creates self._prometheus_dir before launching subprocesses, but _cleanup_workers() returns early when self.workers is empty. If startup fails before the first worker is appended (or data_parallel_size is 0), the new cleanup path is skipped and the temp multiprocess directory is leaked for the rest of the process. Ensure _cleanup_prometheus_dir() also runs on the empty-worker path.



@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2


⚠️ Outside diff range comments (1)
bindings/python/src/smg/serve.py (1)

632-671: ⚠️ Potential issue | 🟡 Minor

Run Prometheus-dir cleanup even when worker launch fails before the first append.

Line 634 returns before the new cleanup path. If _launch_workers() creates self._prometheus_dir and the first launcher.launch() raises, the temp directory is leaked under /tmp and PROMETHEUS_MULTIPROC_DIR stays stale for the rest of the process.

🛠 Minimal fix

```diff
     def _cleanup_workers(self) -> None:
         """SIGTERM all worker process groups, wait, then SIGKILL stragglers."""
         if not self.workers:
+            self._cleanup_prometheus_dir()
             return
```
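A more defensive variant of the same fix is to run the cleanup in a `finally` block, so it executes on every exit path, not just the empty-worker one. This is an illustrative sketch; the class and method names are stand-ins for the serve.py code, not the actual SMG implementation:

```python
import shutil

class WorkerSupervisor:
    """Hypothetical stand-in for the serve.py orchestrator."""

    def __init__(self):
        self.workers = []
        self._prometheus_dir = None

    def _cleanup_prometheus_dir(self):
        if self._prometheus_dir:
            shutil.rmtree(self._prometheus_dir, ignore_errors=True)
            self._prometheus_dir = None

    def _cleanup_workers(self):
        # finally guarantees the prometheus dir is removed even when no
        # worker was ever appended (e.g. the first launch raised).
        try:
            for proc in self.workers:
                proc.terminate()
        finally:
            self._cleanup_prometheus_dir()
```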

📥 Commits

Reviewing files that changed from the base of the PR and between 2da7233 and 65b14bb.

📒 Files selected for processing (2)
  • bindings/python/src/smg/serve.py
  • model_gateway/src/core/worker_manager.rs

Comment on lines +94 to +107

```rust
let output = tokio::process::Command::new("python3")
    .args([
        "-c",
        "import sys\n\
        from prometheus_client import CollectorRegistry, generate_latest\n\
        from prometheus_client.multiprocess import MultiProcessCollector\n\
        registry = CollectorRegistry()\n\
        MultiProcessCollector(registry)\n\
        sys.stdout.buffer.write(generate_latest(registry))\n",
    ])
    .env("PROMETHEUS_MULTIPROC_DIR", &dir)
    .output()
    .await
    .map_err(|e| format!("failed to run python3 prometheus collector: {e}"))?;
```

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

```bash
#!/bin/bash
set -euo pipefail

rg -n -C2 'python3|sys\.executable|SMG_PYTHON_EXECUTABLE|PROMETHEUS_MULTIPROC_DIR' \
  bindings/python/src/smg model_gateway/src
```

Repository: lightseekorg/smg — Length of output: 5084

🏁 Script executed:

```bash
#!/bin/bash
set -euo pipefail

# Get more context around serve.py to see how the router is started
cat -n bindings/python/src/smg/serve.py | head -100
```

Repository: lightseekorg/smg — Length of output: 4451

🏁 Script executed:

```bash
#!/bin/bash
set -euo pipefail

# Search for where the router/model_gateway is actually invoked in serve.py
rg -n 'subprocess|Popen|run\(|spawn|model_gateway' bindings/python/src/smg/serve.py | head -30
```

Repository: lightseekorg/smg — Length of output: 370

🏁 Script executed:

```bash
#!/bin/bash
set -euo pipefail

# Check if serve.py exports SMG_PYTHON_EXECUTABLE anywhere (not just in docstrings)
rg -n 'SMG_PYTHON_EXECUTABLE' bindings/python/src/smg/serve.py
```

Repository: lightseekorg/smg — Length of output: 42

🏁 Script executed:

```bash
#!/bin/bash
set -euo pipefail

# Find and examine launch_router to see how the router process is spawned
fd -t f "launch_router.py" bindings/python/src/smg/
```

Repository: lightseekorg/smg — Length of output: 100

🏁 Script executed:

```bash
#!/bin/bash
set -euo pipefail

# Check the launch_router implementation - see how the router subprocess is created
rg -n -A 20 'def launch_router' bindings/python/src/smg/
```

Repository: lightseekorg/smg — Length of output: 1665

🏁 Script executed:

```bash
#!/bin/bash
set -euo pipefail

# Get the full launch_router function implementation
cat -n bindings/python/src/smg/launch_router.py
```

Repository: lightseekorg/smg — Length of output: 4526


Don't hardcode python3 for the collector subprocess.

Line 94 can bypass the interpreter that launched smg serve. In venv/conda installs, python3 may resolve to a different environment or be missing entirely, so /engine_metrics fails even though the workers started successfully. The subprocess needs the same Python interpreter that serve.py runs under.

Export the interpreter from serve.py via an environment variable and read it in Rust, falling back to python3 if absent:

🛠 Suggested direction

```diff
-    let output = tokio::process::Command::new("python3")
+    let python = std::env::var("SMG_PYTHON_EXECUTABLE")
+        .unwrap_or_else(|_| "python3".to_string());
+    let output = tokio::process::Command::new(&python)
```

Add this to bindings/python/src/smg/serve.py before launching the router:

os.environ["SMG_PYTHON_EXECUTABLE"] = sys.executable

This mirrors the pattern already used for worker launchers, which correctly use sys.executable instead of hardcoding python3.


Comment on lines +323 to +338

```rust
                            metric_packs.push(MetricPack {
                                labels: vec![("worker_addr".into(), resp.url)],
                                metrics_text: text,
                            });
                        }
                    }
                }
            }
        }

        if has_grpc {
            match collect_prometheus_multiproc_metrics().await {
                Ok(text) => {
                    metric_packs.push(MetricPack {
                        labels: vec![],
                        metrics_text: text,
```

⚠️ Potential issue | 🔴 Critical

Build MetricPack with its actual fields and types.

These literals do not match model_gateway/src/core/metrics_aggregator.rs: MetricPack is { labels: HashMap<String, String>, text: String }. As written, Lines 323-338 will not compile.

🛠 Proposed fix

```diff
-                            metric_packs.push(MetricPack {
-                                labels: vec![("worker_addr".into(), resp.url)],
-                                metrics_text: text,
-                            });
+                            metric_packs.push(MetricPack::new(
+                                HashMap::from([("worker_addr".to_string(), resp.url)]),
+                                text,
+                            ));
 ...
-                    metric_packs.push(MetricPack {
-                        labels: vec![],
-                        metrics_text: text,
-                    });
+                    metric_packs.push(MetricPack::new(
+                        HashMap::<String, String>::new(),
+                        text,
+                    ));
```

When the gRPC worker hasn't written any .db files yet (startup phase or
no requests processed), generate_latest() returns empty bytes.  Passing
this empty string to parse_prometheus() triggers a parse error WARN log
on every scrape interval.

Add guards at both the collector function (return Err for empty output)
and the call site (skip empty text silently) to prevent log spam.

Signed-off-by: ConnorLi96 <ConnorLi96@users.noreply.github.com>
Made-with: Cursor

```rust
if has_grpc {
    match collect_prometheus_multiproc_metrics().await {
        Ok(text) if !text.trim().is_empty() => {
```

🟡 Nit: This if !text.trim().is_empty() guard is now dead code. collect_prometheus_multiproc_metrics() (line 116-118 above) already returns Err("no metrics available from gRPC workers yet") when the text is empty, so Ok(text) can never contain an empty string. The Ok(_) arm on line 345 is therefore unreachable.

Consider simplifying to just Ok(text) => { ... } without the guard, or removing the Ok(_) arm entirely.
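The guard being discussed is a standard pattern regardless of where it lives: treat empty collector output as an error rather than feeding it to the parser. A minimal Python sketch of that rule (the function name and error message are illustrative, not the SMG code):

```python
def aggregate_or_error(raw: bytes) -> str:
    """Return aggregated metrics text, or raise when no worker has
    written a shard yet (generate_latest() returned empty bytes)."""
    text = raw.decode("utf-8", errors="replace")
    if not text.strip():
        raise RuntimeError("no metrics available from gRPC workers yet")
    return text
```

Placing the guard in exactly one layer, as the nit suggests, avoids the dead branch at the call site.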


@coderabbitai coderabbitai bot left a comment


♻️ Duplicate comments (2)
model_gateway/src/core/worker_manager.rs (2)

327-342: ⚠️ Potential issue | 🔴 Critical

Build MetricPack with its actual fields and types.

This issue was already flagged in a previous review: The MetricPack struct uses { labels: HashMap<String, String>, text: String }, but lines 327-330 and 340-343 use labels: vec![...] (wrong type) and metrics_text: ... (wrong field name). This code will not compile.

The correct construction is:

```rust
MetricPack {
    labels: HashMap::from([("worker_addr".to_string(), resp.url)]),
    text: text,
}
```

and for the gRPC case (no worker_addr label):

```rust
MetricPack {
    labels: HashMap::new(),
    text: text,
}
```

94-107: ⚠️ Potential issue | 🔴 Critical

Don't hardcode python3 for the collector subprocess.

This issue was already flagged in a previous review: Line 94 bypasses the interpreter that launched smg serve. In venv/conda installs, python3 may resolve to a different environment or be missing entirely.

The subprocess needs the same Python interpreter that serve.py runs under. Export the interpreter from serve.py via an environment variable (e.g., SMG_PYTHON_EXECUTABLE = sys.executable) and read it in Rust with a fallback to python3.



📥 Commits

Reviewing files that changed from the base of the PR and between 65b14bb and f9629ca.

📒 Files selected for processing (1)
  • model_gateway/src/core/worker_manager.rs
