From 119c9bc8aa43c147f1fc14ba76e8fcd8739eba60 Mon Sep 17 00:00:00 2001 From: mengchengTang <745274877@qq.com> Date: Wed, 6 May 2026 09:37:03 +0800 Subject: [PATCH] rl insight support online monitor --- .pre-commit-config.yaml | 2 +- experimental/README.md | 153 +++++++ experimental/__init__.py | 51 +++ experimental/__main__.py | 21 + experimental/api.py | 310 +++++++++++++ experimental/cli.py | 406 ++++++++++++++++++ experimental/client/__init__.py | 63 +++ experimental/client/ray_monitor_client.py | 116 +++++ experimental/collector/__init__.py | 19 + experimental/collector/ray_monitor_hub.py | 172 ++++++++ experimental/config/__init__.py | 41 ++ experimental/config/config.py | 166 +++++++ experimental/config/services/config.yaml | 27 ++ .../dashboards/rl-insight-overview.json | 30 ++ .../config/services/docker-compose.yaml | 46 ++ experimental/config/services/grafana.ini | 13 + experimental/config/services/prometheus.yml | 4 + .../provisioning/dashboards/default.yml | 12 + .../provisioning/datasources/default.yml | 17 + experimental/config/services/tempo.yaml | 26 ++ experimental/utils/__init__.py | 40 ++ experimental/utils/constants.py | 32 ++ experimental/utils/opentelemetry_utils.py | 132 ++++++ experimental/utils/prometheus_utils.py | 300 +++++++++++++ pyproject.toml | 23 +- requirements.txt | 8 +- rl_insight/__init__.py | 25 +- 27 files changed, 2250 insertions(+), 5 deletions(-) create mode 100644 experimental/README.md create mode 100644 experimental/__init__.py create mode 100644 experimental/__main__.py create mode 100644 experimental/api.py create mode 100644 experimental/cli.py create mode 100644 experimental/client/__init__.py create mode 100644 experimental/client/ray_monitor_client.py create mode 100644 experimental/collector/__init__.py create mode 100644 experimental/collector/ray_monitor_hub.py create mode 100644 experimental/config/__init__.py create mode 100644 experimental/config/config.py create mode 100644 experimental/config/services/config.yaml create mode 100644 experimental/config/services/dashboards/rl-insight-overview.json create mode 100644 experimental/config/services/docker-compose.yaml create mode 100644 experimental/config/services/grafana.ini create mode 100644 experimental/config/services/prometheus.yml create mode 100644 experimental/config/services/provisioning/dashboards/default.yml create mode 100644 experimental/config/services/provisioning/datasources/default.yml create mode 100644 experimental/config/services/tempo.yaml create mode 100644 experimental/utils/__init__.py create mode 100644 experimental/utils/constants.py create mode 100644 experimental/utils/opentelemetry_utils.py create mode 100644 experimental/utils/prometheus_utils.py diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index e68cd2d..fae7e8c 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -11,7 +11,7 @@ repos: rev: "v1.17.0" hooks: - id: mypy - additional_dependencies: [types-requests] + additional_dependencies: [types-PyYAML, types-requests] - repo: local hooks: diff --git a/experimental/README.md b/experimental/README.md new file mode 100644 index 0000000..6b1904f --- /dev/null +++ b/experimental/README.md @@ -0,0 +1,153 @@ +# RL-Insight Monitor + +RL-Insight Monitor provides an observability stack for RL training metrics and traces based on Prometheus, Tempo, and Grafana. + +It has two parts: + +- `rl-insight server ...`: manage the observability Docker stack. +- `rl_insight`: training-side Python APIs for metrics and traces. + +## Quickstart + +### 1. Install + +From the repository root: + +```bash +pip install -r requirements.txt +pip install -e . +``` + +### 2. Start the observability stack + +Default foreground mode: + +```bash +rl-insight server start +``` + +This mode starts Docker Compose silently, keeps the CLI attached, and stops the whole stack when you press `Ctrl+C`. + +Grafana will be provisioned automatically with Prometheus and Tempo datasources plus an empty starter dashboard. The datasources follow the configured Prometheus and Tempo published ports. + +Background mode: + +```bash +rl-insight server start --detach +``` + +Foreground mode with compose/container logs attached: + +```bash +rl-insight server start --attach-logs +``` + +Use a custom config file: + +```bash +rl-insight server start --config path/to/config.yaml +``` + +Stop the stack explicitly from another terminal: + +```bash +rl-insight server stop +``` + +After startup, the CLI prints: + +- Prometheus config file path +- Trainer OTLP traces URL +- Prometheus, Tempo, and Grafana access URLs + +### 3. Initialize the training side + +```python +import os +import ray +import rl_insight as insight + +os.environ["OTEL_EXPORTER_OTLP_TRACES_ENDPOINT"] = "http://:4318/v1/traces" + +ray.init(address="auto", namespace="rl-insight-monitor") +insight.init() +``` + +Notes: + +- `ray.init(namespace="rl-insight-monitor")` is used to find the monitor hub actor. +- `OTEL_EXPORTER_OTLP_TRACES_ENDPOINT` takes precedence over `insight.init(config)` -> `otel.traces_endpoint`. + +### 4. Emit metrics and traces + +```python +import rl_insight as insight + +insight.metric_count("train_step_total", amount=1, worker="trainer_0") +insight.metric_value("reward_mean", value=1.23, worker="trainer_0") +insight.metric_distribution("step_latency_ms", value=42.5, worker="trainer_0") + +with insight.trace_state("rollout", state_lane_id="trainer_0", step=10): + run_rollout() + +@insight.trace_op("update_model", stage="optimizer") +def update_model(batch): + ... +``` + +## APIs + +| API | Purpose | +|---|---| +| `init(config=None)` | Initialize training-side monitoring | +| `close()` | Reset monitor state in the current process | +| `metric_count()` | Report a counter | +| `metric_value()` | Report a gauge | +| `metric_distribution()` | Report a histogram | +| `trace_state()` | Report a state interval | +| `trace_op()` | Decorator for operation latency traces | + +## CLI Reference + +### `rl-insight server start` + +| Argument | Default | Description | +|---|---:|---| +| `--detach` | `false` | Start in background and return immediately | +| `--attach-logs` | `false` | Run in foreground and stream compose/container logs | +| `--config` | `experimental/config/services/config.yaml` | Server config file path | +| `--log-level` | `INFO` | Python log level | + +### `rl-insight server stop` + +| Argument | Default | Description | +|---|---:|---| +| `--config` | `experimental/config/services/config.yaml` | Server config file path | +| `--log-level` | `INFO` | Python log level | + +## Server YAML + +| Key | Default | Description | +|---|---:|---| +| `server.backend` | `docker_compose` | Stack startup backend | +| `server.compose_file` | `docker-compose.yaml` | Compose file path | +| `server.project_name` | `rl-insight-monitor` | Compose project name | +| `prometheus.prometheus_port` | `9090` | Prometheus HTTP port | +| `prometheus.config_file` | `prometheus.yml` | Prometheus config file | +| `tempo.query_port` | `3200` | Tempo query port | +| `otel.traces_endpoint` | `http://127.0.0.1:4318/v1/traces` | Trainer trace export endpoint | +| `grafana.port` | `3000` | Grafana HTTP port | +| `grafana.provisioning_dir` | `provisioning` | Grafana provisioning directory mounted into the container | +| `grafana.dashboards_dir` | `dashboards` | Grafana dashboard JSON directory mounted into the container | + +## `insight.init(config)` + +| Key | Default | Description | +|---|---:|---| +| `namespace` | `rl_insight_monitor` | Metrics and trace namespace | +| `backend.type` | `ray` | Currently only `ray` is supported | +| `prometheus.metrics_report_port` | `9092` | Monitor hub `/metrics` port | +| `prometheus.prometheus_port` | `9090` | Prometheus HTTP port used for reload | +| `prometheus.config_file` | bundled absolute path | Prometheus config file to rewrite | +| `prometheus.reload.mode` | `ray` | `ray` or `none` | +| `otel.traces_endpoint` | `http://127.0.0.1:4318/v1/traces` | Trainer trace export endpoint | diff --git a/experimental/__init__.py b/experimental/__init__.py new file mode 100644 index 0000000..5b6a8e4 --- /dev/null +++ b/experimental/__init__.py @@ -0,0 +1,51 @@ +# Copyright (c) 2026 verl-project authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Experimental online monitoring: Ray hub, Prometheus ``/metrics``, and OTLP trace export.""" + +from .api import ( + close, + init, + metric_count, + metric_distribution, + metric_value, + trace_op, + trace_state, +) +from .config import ( + MONITOR_HUB_ACTOR_NAME, + MONITOR_RAY_NAMESPACE, + load_monitor_config, + load_server_config_file, + resolve_monitor_stack_paths, +) +from .utils import PROMETHEUS_SCRAPE_JOB_NAME, update_prometheus_config + + +__all__ = [ + "close", + "init", + "load_monitor_config", + "load_server_config_file", + "MONITOR_HUB_ACTOR_NAME", + "MONITOR_RAY_NAMESPACE", + "metric_count", + "metric_distribution", + "metric_value", + "PROMETHEUS_SCRAPE_JOB_NAME", + "resolve_monitor_stack_paths", + "trace_op", + "trace_state", + "update_prometheus_config", +] diff --git a/experimental/__main__.py b/experimental/__main__.py new file mode 100644 index 0000000..6ad4850 --- /dev/null +++ b/experimental/__main__.py @@ -0,0 +1,21 @@ +# Copyright (c) 2026 verl-project authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +from .cli import main + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/experimental/api.py b/experimental/api.py new file mode 100644 index 0000000..5f2dcb7 --- /dev/null +++ b/experimental/api.py @@ -0,0 +1,310 @@ +# Copyright (c) 2026 verl-project authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""High-level monitor API backed by a pluggable monitor client.""" + +from __future__ import annotations + +import functools +import inspect +import logging +import os +import time +import warnings +from contextlib import contextmanager +from dataclasses import dataclass, field +from typing import Any, Callable, Generator, Mapping + +from omegaconf import DictConfig + +from .client import create_monitor_client +from .config import load_monitor_config +from .utils import MonitorEventKind + +logger = logging.getLogger(__name__) +logger.setLevel(logging.WARNING) + +__all__ = [ + "close", + "init", + "metric_count", + "metric_distribution", + "metric_value", + "trace_state", + "trace_op", +] + + +@dataclass +class _MonitorState: + """Per-process singleton state used by ``init`` and emit helpers. + + Attributes: + enabled: True after ``init`` produced a non-null client. + client: Backend object with ``apply_event`` (e.g. ``MonitorRayClient``). + conf: Merged trainer monitor config. + namespace: Config ``namespace`` used for metric/OTEL resource naming (not Ray actor namespace). + process_id: String PID added to trace attributes on emit. + """ + + enabled: bool = False + client: Any | None = None + conf: DictConfig | None = None + namespace: str = "" + process_id: str = field(default_factory=lambda: str(os.getpid())) + + +_STATE = _MonitorState() + + +def init(config: Mapping[str, Any] | None = None) -> None: + """Load merged monitor config, create backend client, enable metric/trace helpers (once per process). + + Args: + config: Optional user overrides merged into training defaults; see ``load_monitor_config``. + + Note: + Repeated calls are ignored with ``RuntimeWarning``. Ray backend requires ``ray.init()`` first. + """ + global _STATE + if _STATE.enabled: + warnings.warn( + "monitor.init() called more than once; ignoring re-initialization.", + RuntimeWarning, + stacklevel=2, + ) + return + + monitor_conf = load_monitor_config(config) + client = create_monitor_client(monitor_conf) + _STATE = _MonitorState( + enabled=client is not None, + client=client, + conf=monitor_conf, + namespace=str(monitor_conf.namespace), + ) + + +def close() -> None: + """Clear in-process monitor state so further emits are no-ops. + + Does not stop the hub HTTP server or kill the detached Ray actor. + """ + global _STATE + _STATE = _MonitorState() + + +def metric_count( + name: str, amount: float = 1.0, documentation: str = "", **labels: Any +) -> None: + """Record a counter increment. + + Args: + name: Metric name. + amount: Increment amount (typically 1.0). + documentation: Help string; default derived from ``name``. + **labels: Extra label key-values attached to the event. + """ + doc = documentation or f"Counter {name}" + _emit(MonitorEventKind.COUNTER, name, float(amount), doc, labels) + + +def metric_value( + name: str, value: float, documentation: str = "", **labels: Any +) -> None: + """Record the latest value for a metric. + + Args: + name: Metric name. + value: Current value. + documentation: Help string. + **labels: Extra labels attached to the event. + """ + doc = documentation or f"Gauge {name}" + _emit(MonitorEventKind.GAUGE, name, float(value), doc, labels) + + +def metric_distribution( + name: str, value: float, documentation: str = "", **labels: Any +) -> None: + """Record one sample into a metric distribution. + + Args: + name: Metric name. + value: Observed sample. + documentation: Help string. + **labels: Extra labels attached to the event. + """ + doc = documentation or f"Histogram {name}" + _emit(MonitorEventKind.HISTOGRAM, name, float(value), doc, labels) + + +@contextmanager +def trace_state( + state_name: str, + *, + state_lane_id: str | None = None, + **labels: Any, +) -> Generator[None, None, None]: + """Record a named runtime state as one root span (useful for Grafana timeline views). + + Args: + state_name: Span name and human-readable state label (e.g. ``"rollout"``). + state_lane_id: Optional id for grouping state intervals in trace UIs (swim lane). + Defaults to the current OS process id: one lane per process unless you pass + a custom id (e.g. Ray worker). Overlapping ``trace_state`` calls for the same + lane show as overlapping intervals. + **labels: Extra span attributes. Keys ``state_name``, ``state_lane_id``, and + ``monitor.trace_segment`` cannot be overridden; they are set after merging. + + Yields: + Control during the covered code block; emits the span in ``finally``. + """ + + lane_id = state_lane_id if state_lane_id is not None else _STATE.process_id + + start_time_ns = time.time_ns() + attributes = { + **labels, + "monitor.trace_segment": "state_interval", + "state_name": state_name, + "state_lane_id": lane_id, + } + + try: + yield + finally: + _emit_trace_span( + name=state_name, + start_time_ns=start_time_ns, + end_time_ns=time.time_ns(), + attributes=attributes, + ) + + +def trace_op( + name: str | None = None, + *, + extra_labels: Callable[[Any], dict[str, Any]] | None = None, + **static_labels: Any, +) -> Callable[[Callable[..., Any]], Callable[..., Any]]: + """Decorator that records one root span per synchronous call. + + Async callables are not wrapped: a :class:`RuntimeWarning` is issued and the + function is returned unchanged. + + Args: + name: Span name; defaults to ``func.__qualname__``. + extra_labels: If set, ``extra_labels(first_positional_arg)`` is merged after + ``static_labels`` when the wrapped function is called. The first positional + is often ``self`` for bound methods; if there are no positional args, it is + not called. + **static_labels: Extra attributes attached to every span for this operation. + + Returns: + Decorator that replaces sync functions with a span-wrapped version (async functions unchanged with warning). + """ + + def decorator(func: Callable[..., Any]) -> Callable[..., Any]: + """Return ``func`` unchanged for coroutine functions; else attach span timing wrapper.""" + if inspect.iscoroutinefunction(func): + warnings.warn( + "trace_op does not support coroutine functions; decorator is a no-op.", + RuntimeWarning, + stacklevel=2, + ) + return func + + @functools.wraps(func) + def wrapper(*args: Any, **kwargs: Any) -> Any: + """Call the wrapped function and record one duration span around it.""" + span_name = name or func.__qualname__ + merged: dict[str, Any] = dict(static_labels) + if extra_labels is not None and args: + merged.update(extra_labels(args[0])) + + start_time_ns = time.time_ns() + attributes = {**merged, "monitor.trace_segment": "duration"} + try: + return func(*args, **kwargs) + finally: + _emit_trace_span( + name=span_name, + start_time_ns=start_time_ns, + end_time_ns=time.time_ns(), + attributes=attributes, + ) + + return wrapper + + return decorator + + +def _emit( + kind: str, + name: str, + value: float, + documentation: str, + labels: dict[str, Any], +) -> None: + """If monitoring is on, forward a Prometheus metric event to the hub. + + Args: + kind: One of ``MonitorEventKind`` counter/gauge/histogram strings. + name: Metric name. + value: Sample or increment amount. + documentation: Help text stored with the series. + labels: Label dimensions for the observation. + """ + if not _STATE.enabled or _STATE.client is None: + return + event = { + "kind": kind, + "name": name, + "documentation": documentation, + "value": value, + "labels": dict(labels), + } + _STATE.client.apply_event(event) + + +def _emit_trace_span( + *, + name: str, + start_time_ns: int, + end_time_ns: int, + attributes: dict[str, Any], +) -> None: + """If monitoring is on, send one OTLP root span event (hub may no-op if OTLP is disabled). + + Args: + name: Span name. + start_time_ns: Span start (nanoseconds). + end_time_ns: Span end (nanoseconds). + attributes: Span attributes; ``process_id`` is merged in before send. + """ + if not _STATE.enabled or _STATE.client is None: + return + + merged_attributes: dict[str, Any] = {"process_id": _STATE.process_id} + merged_attributes.update(attributes) + + event = { + "kind": MonitorEventKind.TRACE, + "name": name, + "start_time_ns": int(start_time_ns), + "end_time_ns": int(end_time_ns), + "attributes": merged_attributes, + } + _STATE.client.apply_event(event) diff --git a/experimental/cli.py b/experimental/cli.py new file mode 100644 index 0000000..5ec0ccc --- /dev/null +++ b/experimental/cli.py @@ -0,0 +1,406 @@ +# Copyright (c) 2026 verl-project authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Command-line helpers for RL-Insight observability stack management.""" + +from __future__ import annotations + +import argparse +import logging +import os +import socket +import subprocess +import sys +import time +from pathlib import Path +from typing import Sequence +from urllib.parse import urlparse + +from omegaconf import DictConfig, OmegaConf + +from .config import load_server_config_file + + +def main(argv: Sequence[str] | None = None) -> int: + """CLI entry for ``rl-insight``.""" + parser = _build_parser() + args = parser.parse_args(argv) + logging.basicConfig(level=getattr(logging, str(args.log_level).upper())) + try: + return int(args.func(args)) + except KeyboardInterrupt: + return 130 + + +def _build_parser() -> argparse.ArgumentParser: + """Construct the root argument parser and ``server`` subcommands.""" + parser = argparse.ArgumentParser(prog="rl-insight") + parser.add_argument( + "--log-level", + default="INFO", + choices=["DEBUG", "INFO", "WARNING", "ERROR"], + help="Python logging level.", + ) + + subparsers = parser.add_subparsers(dest="command", required=True) + + server = subparsers.add_parser( + "server", help="Manage Prometheus, Tempo, and Grafana." + ) + server_subparsers = server.add_subparsers(dest="server_command", required=True) + + start = server_subparsers.add_parser( + "start", + help="Start Prometheus, Tempo, and Grafana.", + ) + _add_common_config_args(start) + mode_group = start.add_mutually_exclusive_group() + mode_group.add_argument( + "--detach", + action="store_true", + help="Start in background and return immediately.", + ) + mode_group.add_argument( + "--attach-logs", + action="store_true", + help="Run in foreground and stream docker compose logs.", + ) + start.set_defaults(func=_server_start) + + stop = server_subparsers.add_parser( + "stop", + help="Stop Prometheus, Tempo, and Grafana.", + ) + _add_common_config_args(stop) + stop.set_defaults(func=_server_stop) + + return parser + + +def _add_common_config_args(parser: argparse.ArgumentParser) -> None: + """Attach ``--config`` shared by subcommands that read stack YAML.""" + parser.add_argument( + "--config", + type=Path, + default=None, + help="Observability YAML; default is bundled experimental/config/services/config.yaml.", + ) + + +def _select_str(conf: DictConfig, key: str) -> str: + """Return a stripped string for ``key``; empty becomes ``''``.""" + value = OmegaConf.select(conf, key) + return str(value).strip() if value is not None else "" + + +def _require_stack_field(conf: DictConfig, key: str, desc: str) -> str: + value = _select_str(conf, key) + if not value: + print( + f"Error: missing required stack config field {desc} ({key}).", + file=sys.stderr, + ) + raise SystemExit(2) + return value + + +def _require_stack_int(conf: DictConfig, key: str, desc: str) -> int: + value = _require_stack_field(conf, key, desc) + try: + return int(value) + except ValueError: + print( + f"Error: {desc} ({key}) must be an integer; got {value!r}.", file=sys.stderr + ) + raise SystemExit(2) + + +def _otlp_http_publish_port(traces_endpoint: str) -> int: + """Publish host port implied by ``otel.traces_endpoint``.""" + raw = traces_endpoint.strip() + if not raw: + return 4318 + if "://" not in raw: + raw = f"http://{raw}" + parsed = urlparse(raw) + if parsed.port is not None: + return int(parsed.port) + if parsed.scheme.lower() == "https": + return 443 + return 4318 + + +def _trainer_otlp_traces_url(host: str, traces_endpoint: str) -> str: + """Resolve the trainer OTLP URL advertised to users.""" + if "127.0.0.1" in traces_endpoint or "localhost" in traces_endpoint.lower(): + port = _otlp_http_publish_port(traces_endpoint) + return f"http://{host}:{port}/v1/traces".rstrip("/") + return traces_endpoint.rstrip("/") + + +def _server_start(args: argparse.Namespace) -> int: + """Start Docker Compose when ``server.backend`` is ``docker_compose``.""" + conf = load_server_config_file(config_path=args.config) + if not _stack_management_enabled(conf, action="start"): + return 0 + + compose_file, project_name = _stack_compose_target(conf) + traces_endpoint = _validate_start_config(conf) + env = _stack_compose_env(conf, traces_endpoint) + base_command = _compose_base_command(compose_file, project_name) + + _print_start_summary(conf, compose_file, traces_endpoint) + + if args.attach_logs: + try: + return subprocess.run( + [*base_command, "up", "--quiet-pull"], + check=False, + env=env, + ).returncode + except KeyboardInterrupt: + return 130 + + result = _run_compose_command( + [*base_command, "up", "--quiet-pull", "-d"], + env=env, + quiet=True, + ) + if result.returncode != 0: + _print_compose_error("start", result.returncode, result.stderr) + return int(result.returncode) + + if args.detach: + print("RL-Insight observability services are running in background mode.") + return 0 + + print("RL-Insight observability services are running. Press Ctrl+C to stop.") + return _wait_for_stack_shutdown(base_command, env) + + +def _server_stop(args: argparse.Namespace) -> int: + """Stop Docker Compose when ``server.backend`` is ``docker_compose``.""" + conf = load_server_config_file(config_path=args.config) + if not _stack_management_enabled(conf, action="stop"): + return 0 + + compose_file, project_name = _stack_compose_target(conf) + base_command = _compose_base_command(compose_file, project_name) + + print("Stopping RL-Insight observability services...") + return _stop_compose_stack( + base_command, env=_stack_compose_env(conf), announce_success=True + ) + + +def _stack_management_enabled(conf: DictConfig, action: str) -> bool: + if not bool(conf.server.get("enable", True)): + print("RL-Insight server management is disabled by config.") + return False + + backend = str(conf.server.get("backend")) + if backend != "docker_compose": + print(f"Server backend {backend!r} is external; nothing to {action}.") + return False + + return True + + +def _stack_compose_target(conf: DictConfig) -> tuple[Path, str]: + _require_stack_field(conf, "server.project_name", "server project name") + _require_stack_field(conf, "server.compose_file", "server compose file") + compose_file = Path(str(conf.server.compose_file)) + project_name = _select_str(conf, "server.project_name") + return compose_file, project_name + + +def _validate_start_config(conf: DictConfig) -> str: + _require_stack_int(conf, "prometheus.prometheus_port", "Prometheus HTTP port") + _require_stack_field(conf, "prometheus.config_file", "Prometheus config file") + traces_endpoint = _require_stack_field( + conf, "otel.traces_endpoint", "OTLP traces endpoint" + ) + _require_stack_int(conf, "tempo.query_port", "Tempo query port") + _require_stack_field(conf, "tempo.config_file", "Tempo config file") + _require_stack_int(conf, "grafana.port", "Grafana port") + _require_stack_field(conf, "grafana.config_file", "Grafana config file") + _require_stack_field( + conf, "grafana.provisioning_dir", "Grafana provisioning directory" + ) + _require_stack_field(conf, "grafana.dashboards_dir", "Grafana dashboards directory") + return traces_endpoint + + +def _print_start_summary( + conf: DictConfig, compose_file: Path, traces_endpoint: str +) -> None: + host = _advertised_host() + prom_port = int(conf.prometheus.prometheus_port) + tempo_query = int(conf.tempo.query_port) + grafana_port = int(conf.grafana.port) + otlp_trainer_url = _trainer_otlp_traces_url(host, traces_endpoint) + + print("Starting RL-Insight observability services...") + print(f"Observability node IP (LAN): {host}") + print(f"Prometheus config file: {conf.prometheus.config_file}") + print(f"OpenTelemetry OTLP traces URL (trainers): {otlp_trainer_url}") + print(f"Compose file: {compose_file}") + print( + f"Prometheus UI: {_http_url(host, prom_port)} " + f"(Tempo query: {_http_url(host, tempo_query)})" + ) + print(f"Tempo config file: {conf.tempo.config_file}") + print( + "Ray monitor hub: training calls init(); set " + "OTEL_EXPORTER_OTLP_TRACES_ENDPOINT or init otel.traces_endpoint." + ) + print(f"Grafana URL: {_http_url(host, grafana_port)}") + + +def _compose_base_command(compose_file: Path, project_name: str) -> list[str]: + return ["docker", "compose", "-f", str(compose_file), "-p", project_name] + + +def _run_compose_command( + command: Sequence[str], + *, + env: dict[str, str], + quiet: bool, +) -> subprocess.CompletedProcess[str] | subprocess.CompletedProcess[bytes]: + if quiet: + return subprocess.run( + command, + check=False, + env=env, + stdout=subprocess.DEVNULL, + stderr=subprocess.PIPE, + text=True, + ) + return subprocess.run(command, check=False, env=env) + + +def _wait_for_stack_shutdown(base_command: Sequence[str], env: dict[str, str]) -> int: + while True: + try: + time.sleep(1) + except KeyboardInterrupt: + print("\nStopping RL-Insight observability services...") + return _stop_compose_stack(base_command, env=env, announce_success=True) + + +def _stop_compose_stack( + base_command: Sequence[str], + *, + env: dict[str, str], + announce_success: bool, +) -> int: + result = _run_compose_command( + [*base_command, "down"], + env=env, + quiet=True, + ) + if result.returncode != 0: + _print_compose_error("stop", result.returncode, result.stderr) + return int(result.returncode) + + if announce_success: + print("RL-Insight observability services stopped.") + return 0 + + +def _print_compose_error( + action: str, return_code: int, stderr: str | bytes | None +) -> None: + message = "" + if isinstance(stderr, bytes): + message = stderr.decode(errors="replace").strip() + elif isinstance(stderr, str): + message = stderr.strip() + + print( + message or f"docker compose {action} failed with exit code {return_code}.", + file=sys.stderr, + ) + + +def _http_url(host: str, port: int) -> str: + return f"http://{host}:{int(port)}" + + +def _advertised_host() -> str: + explicit = os.environ.get("RLINSIGHT_ADVERTISED_IP", "").strip() + if explicit: + return explicit + + try: + with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock: + sock.connect(("198.51.100.1", 1)) + addr = sock.getsockname()[0] + if addr and not addr.startswith("127."): + return addr + except OSError: + pass + + try: + ip = socket.gethostbyname(socket.gethostname()) + if ip and not ip.startswith("127."): + return ip + except OSError: + pass + + return "127.0.0.1" + + +def _stack_compose_env( + conf: DictConfig, traces_endpoint: str | None = None +) -> dict[str, str]: + """Map stack YAML fields to docker compose environment variables.""" + env = dict(os.environ) + + def _set_env_int(config_key: str, env_key: str) -> None: + value = OmegaConf.select(conf, config_key) + if value in (None, ""): + return + try: + env[env_key] = str(int(value)) + except (TypeError, ValueError): + return + + def _set_env_str(config_key: str, env_key: str) -> None: + value = OmegaConf.select(conf, config_key) + if value in (None, ""): + return + env[env_key] = str(value) + + _set_env_int("prometheus.prometheus_port", "RLINSIGHT_PROMETHEUS_PORT") + _set_env_str("prometheus.config_file", "RLINSIGHT_PROMETHEUS_CONFIG") + _set_env_int("tempo.query_port", "RLINSIGHT_TEMPO_QUERY_PORT") + _set_env_str("tempo.config_file", "RLINSIGHT_TEMPO_CONFIG") + _set_env_int("grafana.port", "RLINSIGHT_GRAFANA_PORT") + _set_env_str("grafana.config_file", "RLINSIGHT_GRAFANA_CONFIG") + _set_env_str("grafana.provisioning_dir", "RLINSIGHT_GRAFANA_PROVISIONING") + _set_env_str("grafana.dashboards_dir", "RLINSIGHT_GRAFANA_DASHBOARDS") + + endpoint = (traces_endpoint or _select_str(conf, "otel.traces_endpoint")).rstrip( + "/" + ) + if endpoint: + env["OTEL_EXPORTER_OTLP_TRACES_ENDPOINT"] = endpoint + env["OTEL_EXPORTER_OTLP_TRACES_PORT"] = str(_otlp_http_publish_port(endpoint)) + + return env + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/experimental/client/__init__.py b/experimental/client/__init__.py new file mode 100644 index 0000000..d36c728 --- /dev/null +++ b/experimental/client/__init__.py @@ -0,0 +1,63 @@ +# Copyright (c) 2026 verl-project authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +import logging +from typing import Any + +from omegaconf import DictConfig, OmegaConf + +from ..utils import MonitorBackend + +logger = logging.getLogger(__name__) +logger.setLevel(logging.WARNING) + +__all__ = ["create_monitor_client"] + + +def create_monitor_client(conf: DictConfig) -> Any | None: + """Factory: pick backend from ``conf.backend`` and return the implementation client. + + Args: + conf: Merged monitor config; must set ``backend.type`` (only ``ray`` is supported). + + Returns: + Backend-specific client, or ``None`` if optional Ray deps fail to import. + + Raises: + ValueError: Unknown backend or missing ``backend.type``. + """ + backend = conf.backend + if OmegaConf.is_config(backend): + typ = backend.get("type") + if typ is None: + raise ValueError("monitor config backend.type is required") + backend_type = str(typ) + else: + backend_type = str(backend) + if backend_type != MonitorBackend.RAY: + raise ValueError(f"Unsupported monitor backend: {backend_type!r}") + + try: + from .ray_monitor_client import ( + create_monitor_client as create_ray_monitor_client, + ) + except ImportError as e: + logger.warning( + "Ray monitor client is unavailable; monitoring is disabled: %s", e + ) + return None + + return create_ray_monitor_client(conf) diff --git a/experimental/client/ray_monitor_client.py b/experimental/client/ray_monitor_client.py new file mode 100644 index 0000000..b700203 --- /dev/null +++ b/experimental/client/ray_monitor_client.py @@ -0,0 +1,116 @@ +# Copyright (c) 2026 verl-project authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Ray client for the shared monitor hub.""" + +from __future__ import annotations + +import logging +from typing import Any, cast + +import ray +from omegaconf import DictConfig, OmegaConf + +from ..collector.ray_monitor_hub import MonitorHubActor +from ..config import MONITOR_HUB_ACTOR_NAME, MONITOR_RAY_NAMESPACE + +logger = logging.getLogger(__name__) +logger.setLevel(logging.WARNING) + +__all__ = ["MonitorRayClient", "create_monitor_client", "get_or_create_monitor_hub"] + + +def get_or_create_monitor_hub(conf: DictConfig) -> Any: + """Get the detached ``MonitorHubActor`` handle, creating it on first use (race-safe). + + Args: + conf: Merged trainer monitor config passed to the actor constructor. + + Returns: + Ray actor handle for ``MonitorHubActor``. + + Raises: + RuntimeError: If Ray is not initialized. + """ + if not ray.is_initialized(): + raise RuntimeError( + "Ray is not initialized. Call ray.init() before using monitor helpers." + ) + + actor_name = MONITOR_HUB_ACTOR_NAME + namespace = MONITOR_RAY_NAMESPACE + + try: + handle = ray.get_actor(actor_name, namespace=namespace) + logger.info("Connected to existing monitor hub actor %r.", actor_name) + return handle + except ValueError: + logger.info("No existing monitor hub actor %r found; creating one.", actor_name) + + actor_options: dict[str, Any] = { + "name": actor_name, + "namespace": namespace, + "lifetime": "detached", + } + + try: + actor_cls = cast(Any, MonitorHubActor) + return actor_cls.options(**actor_options).remote( + OmegaConf.to_container(conf, resolve=True) + ) + except ValueError: + logger.info( + "Monitor hub actor %r was created concurrently; connecting to it.", + actor_name, + ) + return ray.get_actor(actor_name, namespace=namespace) + + +def create_monitor_client(conf: DictConfig) -> "MonitorRayClient | None": + """Build a client that talks to ``MonitorHubActor`` over Ray. + + Args: + conf: Merged monitor configuration. + + Returns: + Client instance, or ``None`` if Ray is not initialized (monitoring disabled). + """ + if not ray.is_initialized(): + logger.warning("Ray is not initialized; monitoring is disabled.") + return None + + handle = get_or_create_monitor_hub(conf) + return MonitorRayClient(handle) + + +class MonitorRayClient: + """Ray facade: ``apply_event`` submits work to the hub without blocking on completion.""" + + def __init__(self, actor_handle: Any) -> None: + """ + Args: + actor_handle: Return value of ``get_or_create_monitor_hub``. + """ + self._actor = actor_handle + + def apply_event(self, event: dict[str, Any]) -> None: + """Submit ``MonitorHubActor.apply_event`` on the actor (fire-and-forget; no ``ray.get``). + + Args: + event: Serialized monitor event (see ``experimental.api`` helpers for shapes). + + Note: + Errors on the hub side are not surfaced here. Ordering follows Ray actor scheduling. + """ + self._actor.apply_event.remote(event) diff --git a/experimental/collector/__init__.py b/experimental/collector/__init__.py new file mode 100644 index 0000000..7f2a663 --- /dev/null +++ b/experimental/collector/__init__.py @@ -0,0 +1,19 @@ +# Copyright (c) 2026 verl-project authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Monitor hub actor re-export.""" + +from .ray_monitor_hub import MonitorHubActor + +__all__ = ["MonitorHubActor"] diff --git a/experimental/collector/ray_monitor_hub.py b/experimental/collector/ray_monitor_hub.py new file mode 100644 index 0000000..0420295 --- /dev/null +++ b/experimental/collector/ray_monitor_hub.py @@ -0,0 +1,172 @@ +# Copyright (c) 2026 verl-project authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Ray named actor that collects monitor events and exposes a metrics endpoint.""" + +from __future__ import annotations + +import logging +from typing import Any + +import ray +from omegaconf import DictConfig, OmegaConf + +from ..config import MONITOR_HUB_ACTOR_NAME, MONITOR_RAY_NAMESPACE +from ..utils import ( + MetricRegistry, + MonitorEventKind, + OpenTelemetryTraceCollector, + start_metrics_http_server, + update_prometheus_config, +) + +logger = logging.getLogger(__name__) +logger.setLevel(logging.WARNING) + +__all__ = ["MonitorHubActor"] + + +@ray.remote +class MonitorHubActor: + """Ray detached actor: receives monitor events from trainers, serves ``/metrics``, optional OTLP traces. + + Actor methods run one at a time (no ``max_concurrency``), so hub state updates are serialized. + + On startup it may rewrite the local Prometheus scrape config when ``prometheus.reload.mode`` is ``ray``. + """ + + def __init__( + self, + conf: dict[str, Any] | DictConfig, + ) -> None: + """ + Args: + conf: Merged monitor config (trainer dict); expects ``namespace``, ``otel``, ``prometheus`` keys. + """ + self._conf = conf if isinstance(conf, DictConfig) else OmegaConf.create(conf) + namespace = str(self._conf.namespace) + self._registry = MetricRegistry(namespace=namespace) + te_raw = OmegaConf.select(self._conf, "otel.traces_endpoint") + te = str(te_raw).strip() if te_raw is not None else "" + self._trace_collector = ( + OpenTelemetryTraceCollector(namespace=namespace, endpoint=te) + if te + else None + ) + self._events_applied = 0 + self._node_ip = ray.util.get_node_ip_address() + self._metrics_port = int(self._conf.prometheus.metrics_report_port) + self._event_handlers = { + MonitorEventKind.COUNTER: self._handle_counter, + MonitorEventKind.GAUGE: self._handle_gauge, + MonitorEventKind.HISTOGRAM: self._handle_histogram, + MonitorEventKind.TRACE: self._handle_trace, + } + + scrape_host = self._node_ip + start_metrics_http_server(self._metrics_port, addr=scrape_host) + if ( + str(OmegaConf.select(self._conf, "prometheus.reload.mode") or "ray") + .strip() + .lower() + == "ray" + ): + update_prometheus_config( + self._conf, + [f"{scrape_host}:{self._metrics_port}"], + ) + + listen_desc = scrape_host if scrape_host else "0.0.0.0" + logger.info( + "MonitorHubActor HTTP bind %s:%s, Prometheus scrape target %s:%s", + listen_desc, + self._metrics_port, + scrape_host, + self._metrics_port, + ) + + def apply_event(self, event: dict[str, Any]) -> None: + """Dispatch one event by ``kind``: counter/gauge/histogram update Prometheus registry, trace exports OTLP. + + Args: + event: Must include ``kind``; metric kinds need ``name``/``value``; trace needs ``start_time_ns``/``end_time_ns``. + """ + self._events_applied += 1 + try: + kind = event["kind"] + except KeyError as e: + raise ValueError(f"Event missing required field: {e!r}") from e + + handler = self._event_handlers.get(kind) + if handler is None: + raise ValueError(f"Unknown event kind: {kind!r}") + handler(event) + + def get_status(self) -> dict[str, Any]: + """Return a small status dict for debugging (endpoints, counters). + + Returns: + Dict with ``actor_name``, ``namespace`` (Ray placement namespace, not metric prefix), scrape URL, flags. + """ + return { + "actor_name": MONITOR_HUB_ACTOR_NAME, + "namespace": MONITOR_RAY_NAMESPACE, + "node_ip": self._node_ip, + "metrics_endpoint": f"http://{self._node_ip}:{self._metrics_port}/metrics", + "prometheus_metrics_enabled": True, + "otel_traces_enabled": self._trace_collector is not None, + "events_applied": self._events_applied, + } + + def _handle_counter(self, event: dict[str, Any]) -> None: + """Increment a Prometheus counter from a ``counter`` event payload.""" + self._registry.count( + event["name"], + event.get("documentation") or "", + float(event["value"]), + {}, + dict(event.get("labels") or {}), + ) + + def _handle_gauge(self, event: dict[str, Any]) -> None: + """Set a Prometheus gauge from a ``gauge`` event payload.""" + self._registry.value( + event["name"], + event.get("documentation") or "", + float(event["value"]), + {}, + dict(event.get("labels") or {}), + ) + + def _handle_histogram(self, event: dict[str, Any]) -> None: + """Observe one sample on a Prometheus histogram from a ``histogram`` event payload.""" + self._registry.distribution( + event["name"], + event.get("documentation") or "", + float(event["value"]), + {}, + dict(event.get("labels") or {}), + buckets=None, + ) + + def _handle_trace(self, event: dict[str, Any]) -> None: + """Export one root span via OTLP if a trace collector is configured; otherwise no-op.""" + if self._trace_collector is None: + return + self._trace_collector.record_span( + event["name"], + int(event["start_time_ns"]), + int(event["end_time_ns"]), + attributes=dict(event.get("attributes") or {}), + ) diff --git a/experimental/config/__init__.py b/experimental/config/__init__.py new file mode 100644 index 0000000..0f59437 --- /dev/null +++ b/experimental/config/__init__.py @@ -0,0 +1,41 @@ +# Copyright (c) 2026 verl-project authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Public re-exports for ``experimental.config`` paths and loaders.""" + +from __future__ import annotations + +from .config import ( + MONITOR_CONFIG_DIR, + MONITOR_CONFIG_FILE, + MONITOR_SERVICE_CONFIG_DIR, + MONITOR_HUB_ACTOR_NAME, + MONITOR_RAY_NAMESPACE, + OTEL_EXPORTER_OTLP_TRACES_ENDPOINT, + load_monitor_config, + load_server_config_file, + resolve_monitor_stack_paths, +) + +__all__ = [ + "MONITOR_CONFIG_DIR", + "MONITOR_CONFIG_FILE", + "MONITOR_SERVICE_CONFIG_DIR", + "MONITOR_HUB_ACTOR_NAME", + "MONITOR_RAY_NAMESPACE", + "OTEL_EXPORTER_OTLP_TRACES_ENDPOINT", + "load_monitor_config", + "load_server_config_file", + "resolve_monitor_stack_paths", +] diff --git a/experimental/config/config.py b/experimental/config/config.py new file mode 100644 index 0000000..7d86e72 --- /dev/null +++ b/experimental/config/config.py @@ -0,0 +1,166 @@ +# Copyright (c) 2026 verl-project authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Trainer vs observability-stack paths and loaders for RL-Insight monitoring.""" + +from __future__ import annotations + +import logging +import os +from pathlib import Path +from typing import Any, Mapping + +from omegaconf import DictConfig, OmegaConf + +logger = logging.getLogger(__name__) + +MONITOR_CONFIG_DIR = Path(__file__).resolve().parent +MONITOR_SERVICE_CONFIG_DIR = MONITOR_CONFIG_DIR / "services" +MONITOR_CONFIG_FILE = MONITOR_SERVICE_CONFIG_DIR / "config.yaml" + +OTEL_EXPORTER_OTLP_TRACES_ENDPOINT = "OTEL_EXPORTER_OTLP_TRACES_ENDPOINT" + +MONITOR_HUB_ACTOR_NAME = "RLInsightMonitorHub" +MONITOR_RAY_NAMESPACE = "rl-insight-monitor" + +_DEFAULT_PROM_FILE = str((MONITOR_SERVICE_CONFIG_DIR / "prometheus.yml").resolve()) + +_TRAINING_MONITOR_DEFAULTS = OmegaConf.create( + { + "namespace": "rl_insight_monitor", + "backend": {"type": "ray"}, + "prometheus": { + "metrics_report_port": 9092, + "prometheus_port": 9090, + "config_file": _DEFAULT_PROM_FILE, + "reload": {"mode": "ray"}, + }, + "otel": {"traces_endpoint": "http://127.0.0.1:4318/v1/traces"}, + } +) + +__all__ = [ + "MONITOR_CONFIG_FILE", + "MONITOR_CONFIG_DIR", + "MONITOR_SERVICE_CONFIG_DIR", + "MONITOR_HUB_ACTOR_NAME", + "MONITOR_RAY_NAMESPACE", + "OTEL_EXPORTER_OTLP_TRACES_ENDPOINT", + "load_monitor_config", + "load_server_config_file", + "resolve_monitor_stack_paths", +] + + +def load_monitor_config( + config: Mapping[str, Any] | DictConfig | None = None, +) -> DictConfig: + """Merge trainer monitor defaults with optional user config and resolve OTLP trace endpoint. + + Args: + config: Partial mapping or ``DictConfig`` merged on top of built-in training defaults; may be ``None``. + + Returns: + Fully merged config; ``otel.traces_endpoint`` prefers non-empty ``OTEL_EXPORTER_OTLP_TRACES_ENDPOINT``. + """ + base = OmegaConf.create( + OmegaConf.to_container(_TRAINING_MONITOR_DEFAULTS, resolve=True) + ) + if config is None: + merged = OmegaConf.create(OmegaConf.to_container(base, resolve=True)) + else: + user = ( + OmegaConf.create(OmegaConf.to_container(config, resolve=True)) + if OmegaConf.is_config(config) + else OmegaConf.create(dict(config)) + ) + merged = OmegaConf.merge(base, user) + + env_ep = os.environ.get(OTEL_EXPORTER_OTLP_TRACES_ENDPOINT, "").strip() + dict_ep = str(OmegaConf.select(merged, "otel.traces_endpoint") or "").strip() + final_ep = env_ep if env_ep else dict_ep + if not final_ep: + logger.warning( + "No OTLP traces endpoint: set %s or ``otel.traces_endpoint`` in the monitor config dict. Trace export disabled.", + OTEL_EXPORTER_OTLP_TRACES_ENDPOINT, + ) + if merged.get("otel") is None: + merged.otel = OmegaConf.create({}) + merged.otel.traces_endpoint = final_ep + return merged + + +def load_server_config_file(config_path: str | Path | None = None) -> DictConfig: + """Load the observability stack YAML used by ``rl-insight server start/stop`` and absolutize relative paths. + + Args: + config_path: YAML file path; default is the bundled ``config/services/config.yaml``. + + Returns: + Loaded config with ``config_file`` / ``compose_file`` paths resolved against the YAML directory. + """ + yaml_path = ( + MONITOR_CONFIG_FILE.resolve() + if config_path is None + else Path(config_path).expanduser().resolve() + ) + conf = OmegaConf.load(str(yaml_path)) + resolve_monitor_stack_paths(conf, yaml_path.parent) + return conf + + +def resolve_monitor_stack_paths(conf: DictConfig, config_root: Path) -> None: + """Mutate ``conf`` so stack file/directory paths become absolute. + + Args: + conf: Stack config as loaded from YAML. + config_root: Directory used to resolve relative paths (typically the YAML parent folder). + """ + root = Path(config_root).expanduser().resolve() + filenames = { + "prometheus": "prometheus.yml", + "tempo": "tempo.yaml", + "grafana": "grafana.ini", + } + for section, filename in filenames.items(): + section_conf = conf.get(section) + if section_conf is None: + continue + if not section_conf.get("config_file"): + section_conf.config_file = str( + (MONITOR_SERVICE_CONFIG_DIR / filename).resolve() + ) + continue + path = Path(str(section_conf.config_file)).expanduser() + if not path.is_absolute(): + path = root / path + section_conf.config_file = str(path.resolve()) + + grafana = conf.get("grafana") + if grafana is not None: + for key in ("provisioning_dir", "dashboards_dir"): + path_value = grafana.get(key) + if not path_value: + continue + path = Path(str(path_value)).expanduser() + if not path.is_absolute(): + path = root / path + grafana[key] = str(path.resolve()) + + server = conf.get("server") + if server is not None and server.get("compose_file"): + path = Path(str(server.compose_file)).expanduser() + if not path.is_absolute(): + path = root / path + server.compose_file = str(path.resolve()) diff --git a/experimental/config/services/config.yaml b/experimental/config/services/config.yaml new file mode 100644 index 0000000..a6f28c3 --- /dev/null +++ b/experimental/config/services/config.yaml @@ -0,0 +1,27 @@ +# Observability stack for `rl-insight server start/stop` (`load_server_config_file`). + +server: + enable: true + backend: docker_compose + compose_file: docker-compose.yaml + project_name: rl-insight-monitor + +prometheus: + enable: true + prometheus_port: 9090 + config_file: prometheus.yml + +otel: + traces_endpoint: http://127.0.0.1:4318/v1/traces + +tempo: + enable: true + query_port: 3200 + config_file: tempo.yaml + +grafana: + enable: true + port: 3000 + config_file: grafana.ini + provisioning_dir: provisioning + dashboards_dir: dashboards diff --git a/experimental/config/services/dashboards/rl-insight-overview.json b/experimental/config/services/dashboards/rl-insight-overview.json new file mode 100644 index 0000000..2fa699e --- /dev/null +++ b/experimental/config/services/dashboards/rl-insight-overview.json @@ -0,0 +1,30 @@ +{ + "annotations": { + "list": [] + }, + "editable": true, + "fiscalYearStartMonth": 0, + "graphTooltip": 0, + "id": null, + "links": [], + "panels": [], + "refresh": "10s", + "schemaVersion": 39, + "style": "dark", + "tags": [ + "rl-insight" + ], + "templating": { + "list": [] + }, + "time": { + "from": "now-6h", + "to": "now" + }, + "timepicker": {}, + "timezone": "browser", + "title": "RL-Insight Overview", + "uid": "rl-insight-overview", + "version": 1, + "weekStart": "" +} diff --git a/experimental/config/services/docker-compose.yaml b/experimental/config/services/docker-compose.yaml new file mode 100644 index 0000000..9575fbd --- /dev/null +++ b/experimental/config/services/docker-compose.yaml @@ -0,0 +1,46 @@ +# docker-compose.yaml + +services: + prometheus: + image: prom/prometheus:latest + extra_hosts: + - "host.docker.internal:host-gateway" + ports: + - "${RLINSIGHT_PROMETHEUS_PORT:-9090}:9090" + volumes: + - "${RLINSIGHT_PROMETHEUS_CONFIG:-./prometheus.yml}:/etc/prometheus/prometheus.yml:ro" + command: + - "--config.file=/etc/prometheus/prometheus.yml" + - "--web.enable-lifecycle" + + tempo: + image: grafana/tempo:2.10.0 + command: ["-config.file=/etc/tempo.yaml"] + volumes: + - "${RLINSIGHT_TEMPO_CONFIG:-./tempo.yaml}:/etc/tempo.yaml:ro" + ports: + - "${RLINSIGHT_TEMPO_QUERY_PORT:-3200}:3200" + # Host-side port parsed from the same URL by CLI as ``OTEL_EXPORTER_OTLP_TRACES_PORT`` (defaults 4318). + - "${OTEL_EXPORTER_OTLP_TRACES_PORT:-4318}:4318" + + grafana: + image: grafana/grafana:latest + depends_on: + - prometheus + - tempo + extra_hosts: + - "host.docker.internal:host-gateway" + ports: + - "${RLINSIGHT_GRAFANA_PORT:-3000}:3000" + environment: + RLINSIGHT_PROMETHEUS_PORT: "${RLINSIGHT_PROMETHEUS_PORT:-9090}" + RLINSIGHT_TEMPO_QUERY_PORT: "${RLINSIGHT_TEMPO_QUERY_PORT:-3200}" + volumes: + - "${RLINSIGHT_GRAFANA_CONFIG:-./grafana.ini}:/etc/grafana/grafana.ini:ro" + - "${RLINSIGHT_GRAFANA_PROVISIONING:-./provisioning}:/etc/grafana/provisioning:ro" + - "${RLINSIGHT_GRAFANA_DASHBOARDS:-./dashboards}:/etc/grafana/dashboards:ro" + command: > + grafana-server + --config /etc/grafana/grafana.ini + --homepath /usr/share/grafana + web diff --git a/experimental/config/services/grafana.ini b/experimental/config/services/grafana.ini new file mode 100644 index 0000000..c2b7abd --- /dev/null +++ b/experimental/config/services/grafana.ini @@ -0,0 +1,13 @@ +[server] +http_port = 3000 + +[security] +allow_embedding = true + +[auth.anonymous] +enabled = true +org_name = Main Org. +org_role = Viewer + +[paths] +provisioning = /etc/grafana/provisioning diff --git a/experimental/config/services/prometheus.yml b/experimental/config/services/prometheus.yml new file mode 100644 index 0000000..9e164bc --- /dev/null +++ b/experimental/config/services/prometheus.yml @@ -0,0 +1,4 @@ +global: + scrape_interval: 10s + +scrape_configs: [] diff --git a/experimental/config/services/provisioning/dashboards/default.yml b/experimental/config/services/provisioning/dashboards/default.yml new file mode 100644 index 0000000..3aa91b5 --- /dev/null +++ b/experimental/config/services/provisioning/dashboards/default.yml @@ -0,0 +1,12 @@ +apiVersion: 1 + +providers: + - name: RL-Insight + orgId: 1 + folder: RL-Insight + type: file + disableDeletion: false + updateIntervalSeconds: 10 + allowUiUpdates: true + options: + path: /etc/grafana/dashboards diff --git a/experimental/config/services/provisioning/datasources/default.yml b/experimental/config/services/provisioning/datasources/default.yml new file mode 100644 index 0000000..8db877d --- /dev/null +++ b/experimental/config/services/provisioning/datasources/default.yml @@ -0,0 +1,17 @@ +apiVersion: 1 + +datasources: + - name: Prometheus + uid: prometheus + type: prometheus + access: proxy + isDefault: true + url: http://host.docker.internal:${RLINSIGHT_PROMETHEUS_PORT} + editable: true + - name: Tempo + uid: tempo + type: tempo + access: proxy + isDefault: false + url: http://host.docker.internal:${RLINSIGHT_TEMPO_QUERY_PORT} + editable: true diff --git a/experimental/config/services/tempo.yaml b/experimental/config/services/tempo.yaml new file mode 100644 index 0000000..c7163a2 --- /dev/null +++ b/experimental/config/services/tempo.yaml @@ -0,0 +1,26 @@ +server: + http_listen_port: 3200 + +distributor: + receivers: + otlp: + protocols: + http: + endpoint: 0.0.0.0:4318 + +ingester: + lifecycler: + ring: + replication_factor: 1 + +storage: + trace: + backend: local + local: + path: /tmp/tempo/traces + wal: + path: /tmp/tempo/wal + +compactor: + compaction: + block_retention: 24h diff --git a/experimental/utils/__init__.py b/experimental/utils/__init__.py new file mode 100644 index 0000000..8105e2d --- /dev/null +++ b/experimental/utils/__init__.py @@ -0,0 +1,40 @@ +# Copyright (c) 2026 verl-project authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Monitor utilities: Prometheus helpers, OTLP trace collector, constants.""" + +from .constants import MonitorBackend, MonitorEventKind +from .opentelemetry_utils import ( + OpenTelemetryTraceCollector, + resolve_otlp_traces_endpoint, +) +from .prometheus_utils import ( + PROMETHEUS_SCRAPE_JOB_NAME, + MetricRegistry, + merge_labels, + start_metrics_http_server, + update_prometheus_config, +) + +__all__ = [ + "MetricRegistry", + "MonitorBackend", + "MonitorEventKind", + "OpenTelemetryTraceCollector", + "PROMETHEUS_SCRAPE_JOB_NAME", + "merge_labels", + "resolve_otlp_traces_endpoint", + "start_metrics_http_server", + "update_prometheus_config", +] diff --git a/experimental/utils/constants.py b/experimental/utils/constants.py new file mode 100644 index 0000000..27aaf26 --- /dev/null +++ b/experimental/utils/constants.py @@ -0,0 +1,32 @@ +# Copyright (c) 2026 verl-project authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""String constants for monitor backends and event kinds.""" + +from __future__ import annotations + + +class MonitorBackend: + """Supported monitor transport implementations (``create_monitor_client`` dispatches on ``type``).""" + + RAY = "ray" + + +class MonitorEventKind: + """String ``kind`` field on events sent through ``MonitorHubActor.apply_event``.""" + + COUNTER = "counter" + GAUGE = "gauge" + HISTOGRAM = "histogram" + TRACE = "trace" diff --git a/experimental/utils/opentelemetry_utils.py b/experimental/utils/opentelemetry_utils.py new file mode 100644 index 0000000..78c55ca --- /dev/null +++ b/experimental/utils/opentelemetry_utils.py @@ -0,0 +1,132 @@ +# Copyright (c) 2026 verl-project authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""OpenTelemetry OTLP/HTTP trace export used by the monitor hub.""" + +from __future__ import annotations + +import logging +import os +from typing import Any + +from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter +from opentelemetry.sdk.resources import SERVICE_NAME, Resource +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import BatchSpanProcessor + +logger = logging.getLogger(__name__) + +__all__ = [ + "OpenTelemetryTraceCollector", + "resolve_otlp_traces_endpoint", +] + + +def resolve_otlp_traces_endpoint(endpoint: str | None = None) -> str | None: + """Normalize OTLP URL: strip whitespace, map empty to ``None``. + + Args: + endpoint: Raw endpoint string from config or environment. + """ + if not endpoint: + return None + return str(endpoint).strip() or None + + +def _normalize_attributes(attributes: dict[str, Any] | None) -> dict[str, Any]: + """Convert span attributes to OTLP-friendly scalars (other types stringified); drop ``None`` values.""" + normalized: dict[str, Any] = {} + if not attributes: + return normalized + + for key, value in attributes.items(): + key = str(key) + if value is None: + continue + if isinstance(value, (bool, int, float, str)): + normalized[key] = value + else: + normalized[key] = str(value) + return normalized + + +class OpenTelemetryTraceCollector: + """Build an OTLP/HTTP exporter and record closed root spans with explicit timestamps.""" + + def __init__(self, namespace: str = "", endpoint: str | None = None) -> None: + """ + Args: + namespace: Stored as ``service.namespace`` on the OpenTelemetry resource. + endpoint: OTLP traces URL; if missing, collector stays disabled and trace ops no-op. + """ + self._spans_recorded = 0 + self._enabled = False + resolved_endpoint = resolve_otlp_traces_endpoint(endpoint) + if not resolved_endpoint: + logger.warning( + "OpenTelemetry trace export is disabled because no OTLP endpoint is configured. " + "Trainers: set OTEL_EXPORTER_OTLP_TRACES_ENDPOINT or init dict key ``otel.traces_endpoint``. " + "Stack YAML: top-level ``otel.traces_endpoint`` (see bundled ``config/services/config.yaml``)." + ) + return + + resource_attributes = { + SERVICE_NAME: os.getenv("OTEL_SERVICE_NAME", "rl-insight-monitor"), + } + if namespace: + resource_attributes["service.namespace"] = namespace + + provider = TracerProvider( + resource=Resource.create(resource_attributes), + ) + exporter = OTLPSpanExporter(endpoint=resolved_endpoint) + provider.add_span_processor(BatchSpanProcessor(exporter)) + + self._provider = provider + self._tracer = provider.get_tracer(__name__) + self._enabled = True + + def record_span( + self, + name: str, + start_time_ns: int, + end_time_ns: int, + *, + attributes: dict[str, Any] | None = None, + ) -> None: + """Create one exported span from ``start_time_ns`` to ``end_time_ns`` (no-op if disabled). + + Args: + name: Span name. + start_time_ns: Start time in nanoseconds. + end_time_ns: End time in nanoseconds. + attributes: Optional span attributes (non-scalars coerced). + """ + if not self._enabled: + return + + span = self._tracer.start_span( + name=name, + start_time=start_time_ns, + attributes=_normalize_attributes(attributes), + ) + + span.end(end_time=end_time_ns) + self._spans_recorded += 1 + + def get_stats(self) -> dict[str, int]: + """Return simple counters (spans successfully handed to the SDK).""" + return { + "trace_spans_recorded": self._spans_recorded, + } diff --git a/experimental/utils/prometheus_utils.py b/experimental/utils/prometheus_utils.py new file mode 100644 index 0000000..4101842 --- /dev/null +++ b/experimental/utils/prometheus_utils.py @@ -0,0 +1,300 @@ +# Copyright (c) 2026 verl-project authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Prometheus metric registry, ``/metrics`` HTTP server, and scrape-config reload helpers.""" + +from __future__ import annotations + +import logging +import os +import socket +from typing import Any, Mapping + +import ray +import requests +import yaml +from omegaconf import DictConfig +from prometheus_client import Counter, Gauge, Histogram, start_http_server + +logger = logging.getLogger(__file__) +logger.setLevel(logging.WARNING) + +PROMETHEUS_SCRAPE_JOB_NAME = "trainer_metrics" + + +@ray.remote(num_cpus=0) +def _write_prometheus_config_file(config_data: dict[str, Any], path: str) -> bool: + """Ray task: write merged Prometheus YAML to ``path`` on a node.""" + os.makedirs(os.path.dirname(path) or ".", exist_ok=True) + with open(path, "w", encoding="utf-8") as f: + yaml.dump(config_data, f, default_flow_style=False, indent=2) + return True + + +@ray.remote(num_cpus=0) +def _reload_prometheus_on_node(port: int, reload_url: str | None = None) -> None: + """Ray task: POST Prometheus ``/-/reload`` on a node.""" + url = str(reload_url) if reload_url else None + if not url: + hostname = socket.gethostname() + ip_address = socket.gethostbyname(hostname) + url = f"http://{ip_address}:{int(port)}/-/reload" + try: + response = requests.post(url, timeout=10) + response.raise_for_status() + print(f"Reloading Prometheus on node: {url}") + except requests.RequestException as exc: + logger.warning("Prometheus reload failed at %s: %s", url, exc) + + +__all__ = [ + "MetricRegistry", + "PROMETHEUS_SCRAPE_JOB_NAME", + "merge_labels", + "start_metrics_http_server", + "update_prometheus_config", +] + + +def merge_labels( + defaults: Mapping[str, Any] | None, overrides: Mapping[str, Any] | None +) -> dict[str, str]: + """Merge Prometheus label dicts with string keys/values; ``overrides`` wins on duplicate keys. + + Args: + defaults: Base labels (optional). + overrides: Labels applied after defaults (optional). + """ + out: dict[str, str] = {} + if defaults: + out.update({str(k): str(v) for k, v in defaults.items()}) + if overrides: + out.update({str(k): str(v) for k, v in overrides.items()}) + return out + + +def start_metrics_http_server(port: int, addr: str = "") -> None: + """Start a background thread serving ``/metrics`` via ``prometheus_client``. + + Args: + port: TCP port to listen on. + addr: Bind address; empty may mean all interfaces depending on library defaults. + """ + start_http_server(port, addr=addr) + + +class MetricRegistry: + """Lazy registry of ``prometheus_client`` Counter/Gauge/Histogram objects keyed by metric name + label set.""" + + def __init__(self, namespace: str = "", subsystem: str = "") -> None: + """ + Args: + namespace: Passed as Prometheus metric namespace (prefix segment). + subsystem: Optional second prefix segment from ``prometheus_client`` API. + """ + self._namespace = namespace + self._subsystem = subsystem + self._counters: dict[tuple[str, tuple[str, ...]], object] = {} + self._gauges: dict[tuple[str, tuple[str, ...]], object] = {} + self._histograms: dict[tuple[str, tuple[str, ...]], object] = {} + + def _get_or_create_counter( + self, name: str, documentation: str, label_names: tuple[str, ...] + ): + """Return a cached or new ``Counter`` for ``(name, label_names)``.""" + key = (name, label_names) + if key not in self._counters: + self._counters[key] = Counter( + name, + documentation, + labelnames=label_names, + namespace=self._namespace, + subsystem=self._subsystem, + ) + return self._counters[key] + + def _get_or_create_gauge( + self, name: str, documentation: str, label_names: tuple[str, ...] + ): + """Return a cached or new ``Gauge`` for ``(name, label_names)``.""" + key = (name, label_names) + if key not in self._gauges: + self._gauges[key] = Gauge( + name, + documentation, + labelnames=label_names, + namespace=self._namespace, + subsystem=self._subsystem, + ) + return self._gauges[key] + + def _get_or_create_histogram( + self, + name: str, + documentation: str, + label_names: tuple[str, ...], + buckets: tuple[float, ...] | None, + ): + """Return a cached or new ``Histogram`` for ``(name, label_names)`` with optional bucket boundaries.""" + key = (name, label_names) + if key not in self._histograms: + kw = {} + if buckets is not None: + kw["buckets"] = buckets + self._histograms[key] = Histogram( + name, + documentation, + labelnames=label_names, + namespace=self._namespace, + subsystem=self._subsystem, + **kw, + ) + return self._histograms[key] + + def count( + self, + name: str, + documentation: str, + amount: float, + defaults: Mapping[str, Any] | None = None, + labels: Mapping[str, Any] | None = None, + ) -> None: + """Increment a counter, merging ``defaults`` and ``labels`` into the label set.""" + merged = merge_labels(defaults, labels) + names = tuple(sorted(merged.keys())) + counter = self._get_or_create_counter(name, documentation, names) + if merged: + counter.labels(**merged).inc(amount) + else: + counter.inc(amount) + + def value( + self, + name: str, + documentation: str, + value: float, + defaults: Mapping[str, Any] | None = None, + labels: Mapping[str, Any] | None = None, + ) -> None: + """Set a gauge to ``value`` with merged labels.""" + merged = merge_labels(defaults, labels) + names = tuple(sorted(merged.keys())) + gauge = self._get_or_create_gauge(name, documentation, names) + if merged: + gauge.labels(**merged).set(value) + else: + gauge.set(value) + + def distribution( + self, + name: str, + documentation: str, + value: float, + defaults: Mapping[str, Any] | None = None, + labels: Mapping[str, Any] | None = None, + buckets: tuple[float, ...] | None = None, + ) -> None: + """Observe ``value`` on a histogram with merged labels and optional ``buckets``.""" + merged = merge_labels(defaults, labels) + names = tuple(sorted(merged.keys())) + histogram = self._get_or_create_histogram(name, documentation, names, buckets) + if merged: + histogram.labels(**merged).observe(value) + else: + histogram.observe(value) + + +def update_prometheus_config( + config: Mapping[str, Any] | DictConfig | None, + server_addresses: list[str], + job_name: str | None = None, +) -> None: + """Rewrite on-disk Prometheus scrape config for ``server_addresses`` and POST ``/-/reload`` on each Ray node. + + Args: + config: Trainer monitor config (or ``None`` for defaults); used for paths and ``reload.mode``. + server_addresses: ``host:port`` targets (typically the hub ``/metrics`` endpoints). + job_name: Override scrape job name; default uses ``PROMETHEUS_SCRAPE_JOB_NAME``. + + Note: + No-op when ``prometheus.reload.mode`` is ``none`` or unsupported; requires a running Ray cluster for reload path. + """ + if not server_addresses: + logger.warning("No server addresses available to update Prometheus config") + return + + from ..config.config import load_monitor_config + + conf = load_monitor_config(config) + mode = str(conf.prometheus.reload.mode).strip().lower() + if mode == "none": + return + if mode != "ray": + logger.warning( + "prometheus.reload.mode is %r; only %r is supported. Skipping Prometheus scrape update and reload.", + mode, + "ray", + ) + return + + resolved_job = job_name or PROMETHEUS_SCRAPE_JOB_NAME + prom_file = str(conf.prometheus.config_file) + reload_port = int(conf.prometheus.prometheus_port) + reload_url = None + + try: + with open(prom_file, encoding="utf-8") as f: + prometheus_data = yaml.safe_load(f) or {} + scrape_configs = prometheus_data.setdefault("scrape_configs", []) + new_job = { + "job_name": resolved_job, + "static_configs": [{"targets": server_addresses}], + } + for i, sc in enumerate(scrape_configs): + if sc.get("job_name") == resolved_job: + scrape_configs[i] = new_job + break + else: + scrape_configs.append(new_job) + except Exception as e: + logger.error("Failed to read or merge Prometheus config %s: %s", prom_file, e) + return + + try: + alive_nodes = [node for node in ray.nodes() if node["Alive"]] + ray.get( + [ + _write_prometheus_config_file.options( + resources={"node:" + node["NodeManagerAddress"]: 0.001} + ).remote(prometheus_data, prom_file) + for node in alive_nodes + ] + ) + + print( + f"Updated Prometheus configuration at {prom_file} with {len(server_addresses)} targets " + f"(job_name={resolved_job})" + ) + + ray.get( + [ + _reload_prometheus_on_node.options( + resources={"node:" + node["NodeManagerAddress"]: 0.001} + ).remote(reload_port, reload_url) + for node in alive_nodes + ] + ) + + except Exception as e: + logger.error("Failed to update Prometheus configuration: %s", e) diff --git a/pyproject.toml b/pyproject.toml index 6525f6d..a94a8f1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -17,16 +17,35 @@ dependencies = [ "plotly", "matplotlib", "pytest", - "loguru" + "loguru", + "ray", + "omegaconf", + "prometheus_client", + "opentelemetry-sdk", + "opentelemetry-exporter-otlp-proto-http", + "pyyaml", ] +[project.scripts] +rl-insight = "experimental.cli:main" + [project.urls] Homepage = "https://github.com/verl-project/rl-insight" Source = "https://github.com/verl-project/rl-insight" [tool.setuptools.packages.find] where = ["."] -include = ["rl_insight"] +include = ["rl_insight", "rl_insight.*", "experimental", "experimental.*"] + +[tool.setuptools.package-data] +experimental = [ + "config/*.yaml", + "config/*.yml", + "config/*.ini", + "config/services/*.yaml", + "config/services/*.yml", + "config/services/*.ini" +] [tool.pytest.ini_options] pythonpath = ["."] diff --git a/requirements.txt b/requirements.txt index cc8b83e..f3c7a54 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,4 +7,10 @@ pytest loguru kaleido torch==2.7.1 -requests \ No newline at end of file +requests +ray +omegaconf +prometheus_client +opentelemetry-sdk +opentelemetry-exporter-otlp-proto-http +pyyaml diff --git a/rl_insight/__init__.py b/rl_insight/__init__.py index 893db19..1150b93 100644 --- a/rl_insight/__init__.py +++ b/rl_insight/__init__.py @@ -18,6 +18,16 @@ This package exposes built-in parser classes and a CLI entry helper. """ +from experimental import ( + close, + init, + metric_count, + metric_distribution, + metric_value, + trace_op, + trace_state, + update_prometheus_config, +) from .parser import MstxClusterParser, TorchClusterParser, NvtxClusterParser @@ -28,4 +38,17 @@ def main(): return _main() -__all__ = ["MstxClusterParser", "TorchClusterParser", "NvtxClusterParser", "main"] +__all__ = [ + "MstxClusterParser", + "TorchClusterParser", + "NvtxClusterParser", + "main", + "init", + "close", + "metric_count", + "metric_value", + "metric_distribution", + "trace_op", + "trace_state", + "update_prometheus_config", +]