[pipeline] feat: rl online monitor statetimeline optime #61
mengchengTang wants to merge 1 commit into
Code Review
This pull request adds state interval trace merging to the Ray monitor hub: overlapping state intervals are merged before being exported as OTLP spans. The review highlights two critical issues. First, direct dictionary access to state_lane_id and other keys can raise a KeyError and crash the shared actor; this calls for defensive .get() fallbacks. Second, traces buffered in _state_pending are never flushed at the end of a run, so the final state interval is lost; adding a public flush method resolves this.
def _handle_state_interval_trace(
    self, event: dict[str, Any], attrs: dict[str, Any]
) -> None:
    """Merge overlapping state intervals per (lane, name); flush on gap."""
    key = (str(attrs["state_lane_id"]), event["name"])
    start_ns = int(event["start_time_ns"])
    end_ns = int(event["end_time_ns"])

    pending = self._state_pending.get(key)
    if pending is not None and start_ns > pending["end_ns"]:
        self._export_trace_span(
            pending["name"],
            pending["start_ns"],
            pending["end_ns"],
            pending["attributes"],
        )
        pending = None

    if pending is None:
        self._state_pending[key] = {
            "name": event["name"],
            "start_ns": start_ns,
            "end_ns": end_ns,
            "attributes": attrs,
        }
        return

    pending["start_ns"] = min(pending["start_ns"], start_ns)
    pending["end_ns"] = max(pending["end_ns"], end_ns)

The _handle_state_interval_trace method reads attrs["state_lane_id"] directly. If state_lane_id is missing from the attributes dictionary, this raises a KeyError and can crash the shared MonitorHubActor. For robustness, read this key and the event fields with .get() and a default fallback value.
def _handle_state_interval_trace(
    self, event: dict[str, Any], attrs: dict[str, Any]
) -> None:
    """Merge overlapping state intervals per (lane, name); flush on gap."""
    lane_id = attrs.get("state_lane_id", "unknown")
    key = (str(lane_id), event.get("name", "unknown"))
    start_ns = int(event.get("start_time_ns", 0))
    end_ns = int(event.get("end_time_ns", 0))
    pending = self._state_pending.get(key)
    if pending is not None and start_ns > pending["end_ns"]:
        self._export_trace_span(
            pending["name"],
            pending["start_ns"],
            pending["end_ns"],
            pending["attributes"],
        )
        pending = None
    if pending is None:
        self._state_pending[key] = {
            "name": event.get("name", "unknown"),
            "start_ns": start_ns,
            "end_ns": end_ns,
            "attributes": attrs,
        }
        return
    pending["start_ns"] = min(pending["start_ns"], start_ns)
    pending["end_ns"] = max(pending["end_ns"], end_ns)

def _export_trace_span(
    self,
    name: str,
    start_time_ns: int,
    end_time_ns: int,
    attributes: dict[str, Any],
) -> None:
    """Export one root span via OTLP (no-op if collector disabled)."""
    if self._trace_collector is None:
        return
    self._trace_collector.record_span(
        name,
        start_time_ns,
        end_time_ns,
        attributes=attributes,
    )

Because state interval traces are buffered in self._state_pending and only flushed when a later interval for the same key starts after a gap, the final state interval for each (lane, name) key is never exported to OTLP. The last state of the training run is therefore lost. A public flush method lets clients or the lifecycle manager export any remaining pending spans at the end of the run.
def _export_trace_span(
    self,
    name: str,
    start_time_ns: int,
    end_time_ns: int,
    attributes: dict[str, Any],
) -> None:
    """Export one root span via OTLP (no-op if collector disabled)."""
    if self._trace_collector is None:
        return
    self._trace_collector.record_span(
        name,
        start_time_ns,
        end_time_ns,
        attributes=attributes,
    )

def flush(self) -> None:
    """Flush all pending state interval traces to OTLP."""
    for key, pending in list(self._state_pending.items()):
        self._export_trace_span(
            pending["name"],
            pending["start_ns"],
            pending["end_ns"],
            pending["attributes"],
        )
    self._state_pending.clear()

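One way the suggested flush could be wired into shutdown is sketched below. StubHub, run_with_final_flush, and the sample values ("worker-0", "train") are hypothetical stand-ins, not names from this PR, and a plain list replaces the OTLP collector. The sketch only illustrates two properties: the buffer is drained even if training raises, and a second flush exports nothing.

```python
from typing import Any, Callable


class StubHub:
    """Hypothetical stand-in for MonitorHubActor; records spans in a list."""

    def __init__(self) -> None:
        # One interval is still buffered, as it would be at the end of a run.
        self._state_pending: dict[tuple[str, str], dict[str, Any]] = {
            ("worker-0", "train"): {
                "name": "train",
                "start_ns": 20,
                "end_ns": 30,
                "attributes": {"state_lane_id": "worker-0"},
            }
        }
        self.exported: list[tuple[str, int, int]] = []  # stands in for OTLP

    def _export_trace_span(
        self, name: str, start_time_ns: int, end_time_ns: int, attributes: dict[str, Any]
    ) -> None:
        self.exported.append((name, start_time_ns, end_time_ns))

    def flush(self) -> None:
        # Same logic as the suggested flush: export everything, then clear.
        for pending in list(self._state_pending.values()):
            self._export_trace_span(
                pending["name"],
                pending["start_ns"],
                pending["end_ns"],
                pending["attributes"],
            )
        self._state_pending.clear()


def run_with_final_flush(hub: StubHub, train_fn: Callable[[], None]) -> None:
    """Run train_fn and drain the buffer even if training raises."""
    try:
        train_fn()
    finally:
        hub.flush()


hub = StubHub()
run_with_final_flush(hub, lambda: None)
hub.flush()  # second call finds an empty buffer and exports nothing
print(hub.exported)
```

With a real Ray actor handle, the call would presumably take the usual remote form, for example ray.get(hub.flush.remote()), placed wherever the run's lifecycle ends.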
What does this PR do?
Summary
Merge overlapping trace_state intervals in MonitorHubActor before exporting to Tempo, so Grafana state timelines show one row per process instead of many overlapping spans.
Changes
Add _state_pending to buffer in-flight state intervals keyed by (state_lane_id, state_name).
For state_interval traces, merge overlapping spans into a single envelope [min(start), max(end)].
Flush the pending interval to OTLP when a new span starts after a gap (non-overlapping start time).
Other trace types (e.g. trace_op) are exported unchanged.
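The merge rule listed above can be sketched as a standalone function. This is a minimal illustration, not the PR's code: merge_state_intervals and the sample events are hypothetical, the lane id is read from the event dict rather than a separate attrs dict, and a returned list stands in for the OTLP export. The final drain loop is an addition that the Changes list does not describe.

```python
from typing import Any

Span = tuple[str, str, int, int]  # (lane_id, name, start_ns, end_ns)


def merge_state_intervals(events: list[dict[str, Any]]) -> list[Span]:
    """Merge overlapping intervals per (lane, name); emit on gap, then drain."""
    pending: dict[tuple[str, str], list[int]] = {}
    exported: list[Span] = []
    for event in events:
        key = (str(event["state_lane_id"]), event["name"])
        start_ns = int(event["start_time_ns"])
        end_ns = int(event["end_time_ns"])
        current = pending.get(key)
        if current is not None and start_ns > current[1]:
            # Gap: the buffered envelope is complete, so emit it.
            exported.append((key[0], key[1], current[0], current[1]))
            current = None
        if current is None:
            pending[key] = [start_ns, end_ns]
        else:
            # Overlap: widen the envelope to [min(start), max(end)].
            current[0] = min(current[0], start_ns)
            current[1] = max(current[1], end_ns)
    # Drain what is still buffered (the last envelope of each key).
    for (lane, name), (start_ns, end_ns) in pending.items():
        exported.append((lane, name, start_ns, end_ns))
    return exported


events = [
    {"state_lane_id": "worker-0", "name": "rollout", "start_time_ns": 0, "end_time_ns": 10},
    {"state_lane_id": "worker-0", "name": "rollout", "start_time_ns": 5, "end_time_ns": 20},
    {"state_lane_id": "worker-0", "name": "rollout", "start_time_ns": 30, "end_time_ns": 40},
]
print(merge_state_intervals(events))
# [('worker-0', 'rollout', 0, 20), ('worker-0', 'rollout', 30, 40)]
```

The first two events overlap and collapse into one envelope from 0 to 20; the third starts after a gap, so it triggers the export of that envelope and opens a new one.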
Checklist Before Starting
Format the PR title as [{modules}] {type}: {description} (this will be checked by the CI).
{modules} include pipeline, parser, visualizer, data, deployment, perf, algo, env, doc, cfg, ci, misc, like [mstx, ci].
{type} is in feat, fix, refactor, chore, test.
For a breaking change, add [BREAKING] to the beginning of the title, e.g. [BREAKING][mstx, torch_profile] feat: support timeline parsing
Test
API and Usage Example
# Add code snippet or script demonstrating how to use this
Design & Code Changes
Checklist Before Submitting
Important
Please check all the following items before requesting a review, otherwise the reviewer might deprioritize this PR for review.
pre-commit install && pre-commit run --all-files --show-diff-on-failure --color=always