29 changes: 29 additions & 0 deletions bindings/python/src/smg/serve.py
@@ -11,7 +11,9 @@
import logging
import os
import random
import shutil
import signal
import tempfile
import socket
import subprocess
import sys
@@ -550,6 +552,7 @@ def __init__(self, backend: str, args: argparse.Namespace, backend_args: list[st
self.launcher: WorkerLauncher = BACKEND_LAUNCHERS[backend]()
self.workers: list[tuple[subprocess.Popen, int]] = []
self._shutting_down = False
self._prometheus_dir: str | None = None

# -- public API ---------------------------------------------------------

@@ -571,6 +574,15 @@ def run(self) -> None:
def _launch_workers(self) -> None:
ports = _find_available_ports(self.args.worker_base_port, self.args.data_parallel_size)
host = self.args.worker_host

if getattr(self.args, "connection_mode", "grpc") == "grpc":

🟡 Nit: `getattr(self.args, "connection_mode", "grpc")` defaults to `"grpc"` when the attribute is absent. If an older or HTTP-only configuration doesn't set `connection_mode` at all, this will unnecessarily create a temp directory and set `PROMETHEUS_MULTIPROC_DIR` in the process environment, which could interfere with any other `prometheus_client` usage in the same process.

Consider defaulting to `"http"` (the safer no-op path), or checking for the attribute explicitly:

if getattr(self.args, "connection_mode", None) == "grpc":
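The difference between the two defaults can be demonstrated with a bare `Namespace` (a minimal stdlib sketch; `args` here stands in for the parsed CLI arguments, not the PR's actual parser):

```python
import argparse

# Simulate an older/HTTP-only config that never sets connection_mode.
args = argparse.Namespace()

# Current check: a missing attribute silently falls into the gRPC path,
# so the temp dir and PROMETHEUS_MULTIPROC_DIR get set up anyway.
assert getattr(args, "connection_mode", "grpc") == "grpc"

# Safer check: a missing attribute is treated as "not gRPC" (a no-op).
assert getattr(args, "connection_mode", None) != "grpc"

print("only the current default routes a missing attribute into gRPC setup")
```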

self._prometheus_dir = tempfile.mkdtemp(prefix="smg_prometheus_")
os.environ["PROMETHEUS_MULTIPROC_DIR"] = self._prometheus_dir
logger.info(
"Set PROMETHEUS_MULTIPROC_DIR=%s for gRPC metrics collection",
self._prometheus_dir,
)
Comment on lines +578 to +584
Contributor


medium

To ensure the metrics collector uses the same Python environment as the orchestrator (which is critical when running in a virtual environment), consider passing `sys.executable` to the router via an environment variable.

Suggested change
if getattr(self.args, "connection_mode", "grpc") == "grpc":
self._prometheus_dir = tempfile.mkdtemp(prefix="smg_prometheus_")
os.environ["PROMETHEUS_MULTIPROC_DIR"] = self._prometheus_dir
logger.info(
"Set PROMETHEUS_MULTIPROC_DIR=%s for gRPC metrics collection",
self._prometheus_dir,
)
if getattr(self.args, "connection_mode", "grpc") == "grpc":
self._prometheus_dir = tempfile.mkdtemp(prefix="smg_prometheus_")
os.environ["PROMETHEUS_MULTIPROC_DIR"] = self._prometheus_dir
os.environ["SMG_PYTHON_EXECUTABLE"] = sys.executable
logger.info(
"Set PROMETHEUS_MULTIPROC_DIR=%s for gRPC metrics collection",
self._prometheus_dir,
)


for dp_rank, port in enumerate(ports):
env = self.launcher.gpu_env(self.args, dp_rank)
proc = self.launcher.launch(self.args, self.backend_args, host, port, env)
@@ -641,6 +653,23 @@ def _cleanup_workers(self) -> None:
except (ProcessLookupError, OSError):
pass

self._cleanup_prometheus_dir()

P2: Run Prometheus dir cleanup even when the worker list is empty

In gRPC mode `_launch_workers()` creates `self._prometheus_dir` before launching subprocesses, but `_cleanup_workers()` returns early when `self.workers` is empty. If startup fails before the first worker is appended (or `data_parallel_size` is 0), the new cleanup path is skipped and the temp multiprocess directory is leaked for the rest of the process. Ensure `_cleanup_prometheus_dir()` also runs on the empty-worker path.
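One way to guarantee the directory is removed on every shutdown path is a `try`/`finally` around the worker teardown. A minimal sketch (not the PR's code; `Orchestrator`, its empty `workers` list, and the missing logging are simplifications):

```python
import os
import shutil
import tempfile

class Orchestrator:
    def __init__(self):
        self.workers = []  # no worker was ever launched
        self._prometheus_dir = tempfile.mkdtemp(prefix="smg_prometheus_")

    def _cleanup_workers(self):
        # Terminate workers first, but run the prometheus cleanup
        # unconditionally, even when the worker list is empty.
        try:
            for proc, _port in self.workers:
                proc.terminate()
        finally:
            self._cleanup_prometheus_dir()

    def _cleanup_prometheus_dir(self):
        if self._prometheus_dir is None:
            return
        shutil.rmtree(self._prometheus_dir, ignore_errors=True)
        self._prometheus_dir = None

orch = Orchestrator()
path = orch._prometheus_dir
orch._cleanup_workers()     # startup failed before any worker appended
print(os.path.isdir(path))  # prints: False -- the temp dir is still removed
```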


def _cleanup_prometheus_dir(self) -> None:
"""Remove the temporary prometheus multiprocess directory and its .db files."""
if self._prometheus_dir is None:
return
try:
shutil.rmtree(self._prometheus_dir)
logger.info("Cleaned up PROMETHEUS_MULTIPROC_DIR=%s", self._prometheus_dir)
except OSError as e:
logger.warning(
"Failed to clean up PROMETHEUS_MULTIPROC_DIR=%s: %s",
self._prometheus_dir,
e,
)
self._prometheus_dir = None


# ---------------------------------------------------------------------------
# Entry point
86 changes: 75 additions & 11 deletions model_gateway/src/core/worker_manager.rs
@@ -17,7 +17,7 @@ use tokio::{
sync::{watch, Mutex},
task::JoinHandle,
};
use tracing::{debug, info};
use tracing::{debug, info, warn};

use crate::{
core::{
@@ -85,6 +85,40 @@ impl IntoResponse for EngineMetricsResult {
}
}

/// Collect gRPC worker metrics by aggregating `PROMETHEUS_MULTIPROC_DIR` via a python3 subprocess.
async fn collect_prometheus_multiproc_metrics() -> Result<String, String> {
let dir = std::env::var("PROMETHEUS_MULTIPROC_DIR").map_err(|_| {
"PROMETHEUS_MULTIPROC_DIR not set; cannot collect metrics from gRPC workers".to_string()
})?;

let output = tokio::process::Command::new("python3")
Contributor


medium

Instead of hardcoding `python3`, use the Python executable path provided by the orchestrator if available. This ensures that the metrics collection subprocess runs in the correct environment (e.g., within a virtualenv).

Suggested change
let output = tokio::process::Command::new("python3")
let python_exe = std::env::var("SMG_PYTHON_EXECUTABLE").unwrap_or_else(|_| "python3".to_string());
let output = tokio::process::Command::new(python_exe)

.args([
"-c",
"import sys\n\
from prometheus_client import CollectorRegistry, generate_latest\n\
from prometheus_client.multiprocess import MultiProcessCollector\n\
registry = CollectorRegistry()\n\
MultiProcessCollector(registry)\n\
sys.stdout.buffer.write(generate_latest(registry))\n",
])
.env("PROMETHEUS_MULTIPROC_DIR", &dir)
.output()
.await
.map_err(|e| format!("failed to run python3 prometheus collector: {e}"))?;
Comment on lines +94 to +107

🔴 Important: This subprocess call has no timeout. The HTTP fan-out path uses `REQUEST_TIMEOUT` (5s), but the `python3` subprocess can hang indefinitely (e.g., a corrupted `.db` file in the multiproc dir, or `python3` missing from `PATH` causing a slow lookup). Since `/metrics` is typically polled by Prometheus every 15–30s, a hung subprocess will accumulate blocked tasks.

Consider wrapping with `tokio::time::timeout`:

Suggested change
let output = tokio::process::Command::new("python3")
.args([
"-c",
"import sys\n\
from prometheus_client import CollectorRegistry, generate_latest\n\
from prometheus_client.multiprocess import MultiProcessCollector\n\
registry = CollectorRegistry()\n\
MultiProcessCollector(registry)\n\
sys.stdout.buffer.write(generate_latest(registry))\n",
])
.env("PROMETHEUS_MULTIPROC_DIR", &dir)
.output()
.await
.map_err(|e| format!("failed to run python3 prometheus collector: {e}"))?;
let output = tokio::time::timeout(
REQUEST_TIMEOUT,
tokio::process::Command::new("python3")
.args([
"-c",
"import sys\n\
from prometheus_client import CollectorRegistry, generate_latest\n\
from prometheus_client.multiprocess import MultiProcessCollector\n\
registry = CollectorRegistry()\n\
MultiProcessCollector(registry)\n\
sys.stdout.buffer.write(generate_latest(registry))\n",
])
.env("PROMETHEUS_MULTIPROC_DIR", &dir)
.output(),
)
.await
.map_err(|_| "python3 prometheus collector timed out".to_string())?
.map_err(|e| format!("failed to run python3 prometheus collector: {e}"))?;
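For reference, the analogous guard on the Python side is a single parameter on `subprocess.run`. A generic stdlib sketch (not code from this PR; the sleeping child stands in for a hung collector, and the 2-second budget stands in for `REQUEST_TIMEOUT`):

```python
import subprocess
import sys

try:
    # Stand-in for a collector subprocess that hangs indefinitely.
    subprocess.run(
        [sys.executable, "-c", "import time; time.sleep(30)"],
        capture_output=True,
        timeout=2,  # bounded wait, like wrapping the tokio call in a timeout
    )
except subprocess.TimeoutExpired:
    print("prometheus collector timed out")  # prints after ~2s
```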

Comment on lines +94 to +107

⚠️ Potential issue | 🟠 Major



Don't hardcode `python3` for the collector subprocess.

Line 94 can bypass the interpreter that launched `smg serve`. In venv/conda installs, `python3` may resolve to a different environment or be missing entirely, so `/engine_metrics` fails even though the workers started successfully. The subprocess needs the same Python interpreter that `serve.py` runs under.

Export the interpreter from `serve.py` via an environment variable and read it in Rust, falling back to `python3` if absent:

🛠 Suggested direction
-    let output = tokio::process::Command::new("python3")
+    let python = std::env::var("SMG_PYTHON_EXECUTABLE")
+        .unwrap_or_else(|_| "python3".to_string());
+    let output = tokio::process::Command::new(&python)

Add this to `bindings/python/src/smg/serve.py` before launching the router:

os.environ["SMG_PYTHON_EXECUTABLE"] = sys.executable

This mirrors the pattern already used for worker launchers, which correctly use `sys.executable` instead of hardcoding `python3`.
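The handoff can be checked end-to-end with the stdlib alone (a sketch; the child process stands in for the Rust router reading the variable back):

```python
import os
import subprocess
import sys

# Orchestrator side: record the running interpreter before spawning the router.
env = dict(os.environ, SMG_PYTHON_EXECUTABLE=sys.executable)

# Router side (simulated): read the variable, falling back to "python3".
child = subprocess.run(
    [sys.executable, "-c",
     "import os; print(os.environ.get('SMG_PYTHON_EXECUTABLE', 'python3'))"],
    env=env,
    capture_output=True,
    text=True,
)
assert child.stdout.strip() == sys.executable
print("router would reuse:", child.stdout.strip())
```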



if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
return Err(format!("python3 prometheus collector failed: {stderr}"));
}

let text = String::from_utf8(output.stdout)
.map_err(|e| format!("prometheus collector output is not valid UTF-8: {e}"))?;
if text.trim().is_empty() {
return Err("no metrics available from gRPC workers yet".to_string());
}
Ok(text)
}

pub struct WorkerManager;

impl WorkerManager {
@@ -273,22 +307,52 @@ impl WorkerManager {
return EngineMetricsResult::Err("No available workers".to_string());
}

let responses = fan_out(&workers, client, "metrics", reqwest::Method::GET).await;

let mut metric_packs = Vec::new();
for resp in responses {
if let Ok(r) = resp.result {
if r.status().is_success() {
if let Ok(text) = r.text().await {
metric_packs.push(MetricPack {
labels: vec![("worker_addr".into(), resp.url)],
metrics_text: text,
});

let http_workers: Vec<_> = workers
.iter()
.filter(|w| matches!(w.connection_mode(), ConnectionMode::Http))
.cloned()
.collect();
let has_grpc = workers
.iter()
.any(|w| matches!(w.connection_mode(), ConnectionMode::Grpc));

if !http_workers.is_empty() {
let responses = fan_out(&http_workers, client, "metrics", reqwest::Method::GET).await;
for resp in responses {
if let Ok(r) = resp.result {
if r.status().is_success() {
if let Ok(text) = r.text().await {
metric_packs.push(MetricPack {
labels: vec![("worker_addr".into(), resp.url)],
metrics_text: text,
});
}
}
}
}
}

if has_grpc {
match collect_prometheus_multiproc_metrics().await {
Ok(text) if !text.trim().is_empty() => {

🟡 Nit: This `if !text.trim().is_empty()` guard is now dead code. `collect_prometheus_multiproc_metrics()` (lines 116–118 above) already returns `Err("no metrics available from gRPC workers yet")` when the text is empty, so `Ok(text)` can never contain an empty string. The `Ok(_)` arm on line 345 is therefore unreachable.

Consider simplifying to just `Ok(text) => { ... }` without the guard, or removing the `Ok(_)` arm entirely.

metric_packs.push(MetricPack {
labels: vec![],
metrics_text: text,
Comment on lines +327 to +342

⚠️ Potential issue | 🔴 Critical

Build `MetricPack` with its actual fields and types.

These literals do not match `model_gateway/src/core/metrics_aggregator.rs`: `MetricPack` is `{ labels: HashMap<String, String>, text: String }`. As written, lines 323–338 will not compile.

🛠 Proposed fix
-                            metric_packs.push(MetricPack {
-                                labels: vec![("worker_addr".into(), resp.url)],
-                                metrics_text: text,
-                            });
+                            metric_packs.push(MetricPack::new(
+                                HashMap::from([("worker_addr".to_string(), resp.url)]),
+                                text,
+                            ));
...
-                    metric_packs.push(MetricPack {
-                        labels: vec![],
-                        metrics_text: text,
-                    });
+                    metric_packs.push(MetricPack::new(
+                        HashMap::<String, String>::new(),
+                        text,
+                    ));

});
}
Ok(_) => {
// No metrics available yet from gRPC workers — skip silently
}
Err(e) => {
warn!(
"Failed to collect gRPC worker metrics from PROMETHEUS_MULTIPROC_DIR: {e}"
);
}
}
}

if metric_packs.is_empty() {
return EngineMetricsResult::Err("All backend requests failed".to_string());
}