[pipeline] feat: rl insight support online monitor #53
Code Review
This pull request introduces an experimental online monitoring system for RL-Insight, leveraging Ray for event collection and Prometheus/OpenTelemetry for observability. It provides a high-level Python API for recording metrics and traces, a CLI for managing backend services via Docker Compose, and a central Ray actor to aggregate data. The review feedback highlights several performance and portability improvements, specifically recommending asynchronous event submission to avoid blocking the training loop, using batch processing for OpenTelemetry spans, and replacing external 'curl' dependencies with native Python libraries for better cross-environment compatibility.
| ref = self._actor.apply_event.remote(event) | ||
| ray.get(ref) |
Calling ray.get() on every event submission makes monitoring synchronous and blocking. This can significantly degrade the performance of the training loop, especially when recording metrics or traces at high frequency. Since monitoring events are typically fire-and-forget, it is recommended to remove the ray.get() call to allow asynchronous submission via Ray.
| ref = self._actor.apply_event.remote(event) | |
| ray.get(ref) | |
| self._actor.apply_event.remote(event) |
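To illustrate the difference this comment describes without needing a Ray cluster, here is a minimal stand-in sketch that uses `concurrent.futures` instead of Ray. That substitution is an assumption made purely for runnability: `FakeHub`, `submit_blocking`, and `submit_fire_and_forget` are hypothetical names, a single-worker pool plays the role of the actor, and `Future.result()` plays the role of `ray.get()`.

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait


class FakeHub:
    """Hypothetical stand-in for the hub actor; handles one event at a time."""

    def __init__(self):
        self.events = []

    def apply_event(self, event):
        time.sleep(0.005)  # simulated per-event processing cost
        self.events.append(event)


def submit_blocking(pool, hub, events):
    """Analogue of calling ray.get() per event: wait for each result."""
    for event in events:
        pool.submit(hub.apply_event, event).result()


def submit_fire_and_forget(pool, hub, events):
    """Analogue of dropping ray.get(): enqueue and return immediately."""
    return [pool.submit(hub.apply_event, event) for event in events]


# One worker models a single-threaded actor, so events stay in order.
pool = ThreadPoolExecutor(max_workers=1)
hub = FakeHub()
pending = submit_fire_and_forget(pool, hub, [{"step": i} for i in range(10)])
# ... the training loop would continue here without waiting ...
wait(pending)  # only needed at shutdown, to flush outstanding events
pool.shutdown()
```

One trade-off to keep in mind: with fire-and-forget submission in Ray, errors raised inside the remote call are not surfaced to the caller unless the returned reference is retrieved later.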
| resource=self._otel.Resource.create(resource_attributes), | ||
| ) | ||
| exporter = self._otel.OTLPSpanExporter(endpoint=resolved_endpoint) | ||
| provider.add_span_processor(self._otel.SimpleSpanProcessor(exporter)) |
SimpleSpanProcessor exports spans synchronously as they are ended, which can block the collector's execution thread if the OTLP endpoint is slow or under load. Using BatchSpanProcessor is recommended for better performance as it buffers spans and exports them in the background. Note that you will also need to update the imports and the SimpleNamespace in _require_opentelemetry to use BatchSpanProcessor.
| provider.add_span_processor(self._otel.SimpleSpanProcessor(exporter)) | |
| provider.add_span_processor(self._otel.BatchSpanProcessor(exporter)) |
| import subprocess | ||
|
|
||
| if not reload_url: | ||
| hostname = socket.gethostname() | ||
| ip_address = socket.gethostbyname(hostname) | ||
| reload_url = f"http://{ip_address}:{port}/-/reload" | ||
|
|
||
| try: | ||
| subprocess.run(["curl", "-X", "POST", reload_url], capture_output=True, text=True, timeout=10) | ||
| print(f"Reloading Prometheus on node: {reload_url}") |
Using subprocess.run(["curl", ...]) introduces an external dependency on the curl binary being present on all nodes in the cluster. It is more portable and robust to use Python's built-in urllib.request to perform the POST request for reloading Prometheus.
| import subprocess | |
| if not reload_url: | |
| hostname = socket.gethostname() | |
| ip_address = socket.gethostbyname(hostname) | |
| reload_url = f"http://{ip_address}:{port}/-/reload" | |
| try: | |
| subprocess.run(["curl", "-X", "POST", reload_url], capture_output=True, text=True, timeout=10) | |
| print(f"Reloading Prometheus on node: {reload_url}") | |
| import urllib.request | |
| if not reload_url: | |
| hostname = socket.gethostname() | |
| ip_address = socket.gethostbyname(hostname) | |
| reload_url = f"http://{ip_address}:{port}/-/reload" | |
| try: | |
| req = urllib.request.Request(reload_url, method="POST") | |
| with urllib.request.urlopen(req, timeout=10): | |
| print(f"Reloading Prometheus on node: {reload_url}") |
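A self-contained version of this suggestion might look like the sketch below. The function name `reload_prometheus`, its signature, and the boolean return value are assumptions for illustration, not the PR's actual code. Note that Prometheus only serves `/-/reload` when started with `--web.enable-lifecycle`.

```python
import urllib.error
import urllib.request


def reload_prometheus(reload_url: str, timeout: float = 10.0) -> bool:
    """POST to the Prometheus reload endpoint; return True on a 2xx response."""
    # An empty body with method="POST" is enough to trigger the reload.
    req = urllib.request.Request(reload_url, data=b"", method="POST")
    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            return 200 <= resp.status < 300
    except (urllib.error.URLError, OSError) as exc:
        # HTTP errors, refused connections, and timeouts all land here.
        print(f"Failed to reload Prometheus at {reload_url}: {exc}")
        return False
```

Returning a boolean (rather than swallowing the exception silently) lets the caller decide whether a failed reload should be retried or only logged.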
3d78032 to
981188a
Compare
| @@ -0,0 +1,133 @@ | |||
| """Trainer vs observability-stack paths and loaders for RL-Insight monitoring.""" | |||
|
|
|||
| from __future__ import annotations | |||
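RL-Insight has so far used a simple argparse-based parser to control parameters. As the project grows, we do need to consider OmegaConf for configuration management. My personal impression is that verl's approach is somewhat complex; RL-Insight could try using OmegaConf in a simpler, more direct way.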
| @@ -0,0 +1,122 @@ | |||
| # RL-Insight Monitor | |||
I suggest running some high-concurrency tests to determine the load ceiling of the current Ray-based backend data transport. (Our current needs may be nowhere near it.)
| |---|---:|---| | ||
| | `namespace` | `rl_insight_monitor` | Business namespace for metrics / traces | |
| | `backend.type` | `ray` | Currently only `ray` is supported | |
| | `prometheus.metrics_report_port` | `9092` | Port on which the monitor hub exposes `/metrics` | |
This could be extended later as a feature; in theory it can be supported.
| [tool.setuptools.packages.find] | ||
| where = ["."] | ||
| include = ["rl_insight"] | ||
| include = ["rl_insight", "rl_insight.*", "experimental", "experimental.*"] |
Monitor introduces quite a few environment dependencies. Please tidy them up promptly and update the README, toml, requirements, etc.
|
|
||
|
|
||
| @ray.remote() | ||
| class MonitorHubActor: |
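If the collector is meant to be replaceable, I suggest providing a base class. Alternatively, document the required interface in the docs or in the code, and separate the generic collector capabilities from the Ray collector code.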
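One way the suggested base class could look is sketched below. `BaseCollector` and `InMemoryCollector` are hypothetical names; only `apply_event` is taken from the PR's own call sites, and the event shape (a plain dict) is an assumption.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict


class BaseCollector(ABC):
    """Hypothetical backend-agnostic collector interface."""

    @abstractmethod
    def apply_event(self, event: Dict[str, Any]) -> None:
        """Consume one monitoring event (metric or trace)."""

    def close(self) -> None:
        """Release backend resources; subclasses may override."""


class InMemoryCollector(BaseCollector):
    """Trivial implementation, useful for tests without Ray."""

    def __init__(self):
        self.events = []

    def apply_event(self, event: Dict[str, Any]) -> None:
        self.events.append(event)
```

Under this split, a Ray-backed collector would wrap any `BaseCollector` implementation inside an actor, so the transport layer and the collection logic could evolve separately.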
| return MonitorRayClient(handle) | ||
|
|
||
|
|
||
| class MonitorRayClient: |
What is the role of the Client? I did not quite understand it at first. I suggest updating the architecture in the RFC.
| @@ -0,0 +1,4 @@ | |||
| global: | |||
We should have custom layout files in Grafana, right? Those could be committed to the repo as well.
Yes. We will commit them once the first version of the verl instrumentation points and the visualization are confirmed, and ship them with the release.
| start_http_server(port, addr=addr) | ||
|
|
||
|
|
||
| class MetricRegistry: |
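For now the code should only collect and display data. If data analysis and processing are needed later, please follow the parser interface of the offline pipeline when implementing them.
Currently, visualization processing is mainly configured on the Grafana front end; later we can consider adding data-processing classes to the collector backend.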
| @@ -0,0 +1,288 @@ | |||
| # Copyright 2026 Meituan Ltd. and/or its affiliates | |||
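I am a bit confused about the role of Prometheus. Backend data aggregation is done by the collector, and front-end display is done by Grafana. Prometheus seems to only handle the transport and collection of generic metrics; could it be simplified?
The Prometheus service needs a config file that records which IP addresses to monitor; this utility class provides functions for users to add IP addresses to that config file.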
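As a rough sketch of what such a helper does, the function below adds a target to an already-parsed Prometheus config (the dict obtained from loading `prometheus.yml`). The name `add_scrape_target` and the job name used in the usage lines are assumptions; the `scrape_configs` / `static_configs` / `targets` layout is the standard Prometheus config structure.

```python
from typing import Any, Dict


def add_scrape_target(config: Dict[str, Any], job_name: str, target: str) -> bool:
    """Add target ("host:port") to job_name; return True if the config changed."""
    scrape_configs = config.setdefault("scrape_configs", [])
    job = next((j for j in scrape_configs if j.get("job_name") == job_name), None)
    if job is None:
        # First target for this job: create the job entry.
        job = {"job_name": job_name, "static_configs": [{"targets": []}]}
        scrape_configs.append(job)
    static_configs = job.setdefault("static_configs", [])
    if not static_configs:
        static_configs.append({"targets": []})
    # Skip duplicates so repeated calls do not trigger needless reloads.
    if any(target in sc.get("targets", []) for sc in static_configs):
        return False
    static_configs[0].setdefault("targets", []).append(target)
    return True


config = {"global": {"scrape_interval": "15s"}}
changed = add_scrape_target(config, "rl_insight_monitor", "10.0.0.1:9092")
```

The return value tells the caller whether the file needs to be rewritten and Prometheus reloaded at all.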
0687bef to
d8ea87d
Compare
7ac27f4 to
772fb2b
Compare
00777ad to
7acb81f
Compare
/gemini review
Code Review
This pull request introduces the RL-Insight Monitor, an experimental observability stack for RL training metrics and traces utilizing Prometheus, Tempo, and Grafana. It includes a new CLI for managing the Docker Compose stack, a Ray-based monitor hub actor for event collection, and a suite of Python APIs for reporting metrics and spans. Feedback highlights several improvement opportunities: switching to BatchSpanProcessor to avoid blocking the single-threaded hub actor during trace exports, enhancing URL parsing robustness in the CLI, moving Ray tasks to the module level to prevent redundant registration, and replacing curl with the requests library for better portability when reloading Prometheus.
| resource=Resource.create(resource_attributes), | ||
| ) | ||
| exporter = OTLPSpanExporter(endpoint=resolved_endpoint) | ||
| provider.add_span_processor(SimpleSpanProcessor(exporter)) |
Using SimpleSpanProcessor results in synchronous span exports. Since the MonitorHubActor (which uses this collector) is single-threaded and processes events sequentially, every trace event will block the hub until the OTLP export (HTTP POST) completes. This can significantly limit the event processing throughput of the monitoring system. It is highly recommended to use BatchSpanProcessor instead, which exports spans asynchronously in batches.
| provider.add_span_processor(SimpleSpanProcessor(exporter)) | |
| from opentelemetry.sdk.trace.export import BatchSpanProcessor | |
| provider.add_span_processor(BatchSpanProcessor(exporter)) |
|
|
||
| def _otlp_http_publish_port(traces_endpoint: str) -> int: | ||
| """Publish host port implied by ``otel.traces_endpoint``.""" | ||
| parsed = urlparse(traces_endpoint.strip()) |
urlparse may fail to correctly identify the port if the traces_endpoint string does not include a scheme (e.g., "127.0.0.1:4318"). In such cases, parsed.port will be None. It's safer to ensure the endpoint has a scheme before parsing.
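The check described here can be sketched as a standalone helper. The fallback to port 4318 (the conventional OTLP/HTTP port) when no port is given is an assumption of this sketch, not something the PR specifies, and the function name is illustrative.

```python
from urllib.parse import urlparse

DEFAULT_OTLP_HTTP_PORT = 4318  # conventional OTLP/HTTP port (assumed fallback)


def otlp_http_publish_port(traces_endpoint: str,
                           default: int = DEFAULT_OTLP_HTTP_PORT) -> int:
    """Return the port implied by the endpoint, tolerating a missing scheme."""
    endpoint = traces_endpoint.strip()
    if "://" not in endpoint:
        # Without a scheme, urlparse treats "host:port" as a path (or as a
        # scheme), so .port would be None.
        endpoint = f"http://{endpoint}"
    parsed = urlparse(endpoint)
    return parsed.port if parsed.port is not None else default
```

A non-numeric port such as `host:abc` still raises `ValueError` when `.port` is read; whether to catch that is left to the caller.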
endpoint = traces_endpoint.strip()
if "://" not in endpoint:
endpoint = f"http://{endpoint}"
parsed = urlparse(endpoint)
| @ray.remote(num_cpus=0) | ||
| def write_config_file(config_data, path): | ||
| os.makedirs(os.path.dirname(path) or ".", exist_ok=True) | ||
| with open(path, "w", encoding="utf-8") as f: | ||
| yaml.dump(config_data, f, default_flow_style=False, indent=2) | ||
| return True | ||
|
|
||
| # Ray task: ask node's Prometheus HTTP API to reload configuration. | ||
| @ray.remote(num_cpus=0) | ||
| def reload_prometheus(port, r_url=None): | ||
| url = str(r_url) if r_url else None | ||
| if not url: | ||
| hostname = socket.gethostname() | ||
| ip_address = socket.gethostbyname(hostname) | ||
| url = f"http://{ip_address}:{int(port)}/-/reload" | ||
| try: | ||
| subprocess.run( | ||
| ["curl", "-X", "POST", url], | ||
| capture_output=True, | ||
| text=True, | ||
| timeout=10, | ||
| ) | ||
| print(f"Reloading Prometheus on node: {url}") | ||
| except Exception: | ||
| pass |
Defining Ray tasks (write_config_file, reload_prometheus) inside the update_prometheus_config function causes them to be redefined and re-registered with Ray every time the function is called. This is inefficient and can lead to issues in Ray's task management. These tasks should be moved to the module level.
| subprocess.run( | ||
| ["curl", "-X", "POST", url], | ||
| capture_output=True, | ||
| text=True, | ||
| timeout=10, | ||
| ) |
Using subprocess.run(["curl", ...]) to reload Prometheus is less portable and harder to debug than using a Python library. Since requests is already a project dependency, it should be used here instead. This also avoids potential FileNotFoundError if curl is not installed on the Ray nodes.
| subprocess.run( | |
| ["curl", "-X", "POST", url], | |
| capture_output=True, | |
| text=True, | |
| timeout=10, | |
| ) | |
| import requests | |
| try: | |
| requests.post(url, timeout=10).raise_for_status() | |
| print(f"Reloading Prometheus on node: {url}") | |
| except Exception as e: | |
| print(f"Failed to reload Prometheus on node {url}: {e}") |
Merge this PR as an experimental feature for further optimization and iteration in practical use. Track relevant requirements in the roadmap. #49
What does this PR do?
This PR introduces an experimental monitoring flow for RL-Insight in three parts:
- rl-insight server start / rl-insight server stop for managing the observability stack
Checklist Before Starting
- PR title format: [{modules}] {type}: {description} (This will be checked by the CI)
- {modules} include pipeline, parser, visualizer, data, deployment, perf, algo, env, doc, cfg, ci, misc, like [mstx, ci]
- {type} is in feat, fix, refactor, chore, test
- For breaking changes, add [BREAKING] to the beginning of the title.
- Example: [BREAKING][mstx, torch_profile] feat: support timeline parsing
Test
- rl-insight server start
- Prometheus and Tempo datasources
- Ctrl+C stops the full stack in foreground mode
- rl-insight server stop works from another terminal
API and Usage Example
The following monitoring APIs are available from rl_insight:
Initialize the training-side monitor client.
Reset local monitoring state in the current process.
Report a counter metric.
Report a gauge metric.
Report a histogram metric.
Record a state interval as a root span.
Decorator for recording operation duration spans.
Update Prometheus scrape targets and trigger reload when supported.
Design & Code Changes
This architecture shows the end-to-end online monitoring flow in rl-insight.
The RL framework first integrates with the rl insight api, which provides lightweight collection APIs for function, variable, metric, and state monitoring. The API aggregates training-side monitoring events and forwards them to the rl insight data collector. The collector is responsible for backend data collection and processing, and can also interact with distributed runtime components such as Ray through RPC.
After aggregation and processing, the collector reports metrics and traces to the rl insight server. The server manages the observability stack and connects Prometheus for metrics, Tempo for traces, and Grafana for visualization. In this way, rl-insight separates instrumentation, collection, storage, and visualization into clear layers.
Checklist Before Submitting
Important
Please check all the following items before requesting a review, otherwise the reviewer might deprioritize this PR for review.
pre-commit install && pre-commit run --all-files --show-diff-on-failure --color=always