From c087ab21fa26953922ef194387b4b2cb685c008b Mon Sep 17 00:00:00 2001 From: hyeokjun32 Date: Sat, 23 May 2026 01:06:10 +0900 Subject: [PATCH] Add orchestrator telemetry feed ingestion --- README.md | 12 ++ docs/runtime-telemetry-history.md | 34 +++++ inferedge_env/cli.py | 22 +++ inferedge_env/compare/regression.py | 20 +++ inferedge_env/result/telemetry_history.py | 152 +++++++++++++++++-- tests/test_regression.py | 108 ++++++++++++++ tests/test_runtime_telemetry_history.py | 170 ++++++++++++++++++++++ 7 files changed, 507 insertions(+), 11 deletions(-) diff --git a/README.md b/README.md index bc55f60..2793f86 100644 --- a/README.md +++ b/README.md @@ -297,6 +297,18 @@ Use `edgeenv runs telemetry export-history --output ` to aggregate registered run telemetry into an `edgeenv.runtime-telemetry-history.v1` JSON artifact. The export records missing telemetry as an evidence gap and remains local replay evidence, not production monitoring. +If an InferEdgeOrchestrator sustained run produced an +`edgeenv_runtime_telemetry_feed` artifact, attach it during export: + +```bash +edgeenv runs telemetry export-history \ + --orchestrator-feed /tmp/orchestrator-edgeenv-feed.json \ + --output /tmp/edgeenv-runtime-telemetry-history.json +``` + +The feed is stored as supplemental operation context for the matching run ID. +It does not replace Runtime telemetry, change comparability, or act as a +regression judgement. Use `edgeenv runs telemetry inspect-history ` to validate and summarize that replay artifact before attaching it to a regression report. The intended local flow is export history, inspect the replay artifact, then pass it to diff --git a/docs/runtime-telemetry-history.md b/docs/runtime-telemetry-history.md index 413f04a..c2abf03 100644 --- a/docs/runtime-telemetry-history.md +++ b/docs/runtime-telemetry-history.md @@ -53,6 +53,28 @@ History export command: edgeenv runs telemetry export-history --output /tmp/edgeenv-runtime-telemetry-history.json ``` +Optional Orchestrator operation context can be attached when the feed comes +from InferEdgeOrchestrator's EdgeEnv handoff contract: + +```bash +edgeenv runs telemetry export-history \ + --orchestrator-feed /tmp/orchestrator-edgeenv-feed.json \ + --output /tmp/edgeenv-runtime-telemetry-history.json +``` + +The feed schema is: + +```text +inferedge-orchestrator-edgeenv-runtime-telemetry-feed-v1 +``` + +EdgeEnv only accepts this feed when it explicitly declares +`not_a_regression_judgement=true` and `not_a_comparability_gate=true`. The feed +is then preserved under the matching history entry as +`orchestrator_operation_context`. It does not replace `runtime_telemetry`, does +not turn missing telemetry into a successful telemetry run, and does not change +the same-condition comparability gate. + Replay validation command: ```bash @@ -74,6 +96,11 @@ The history artifact uses this top-level shape: "run_id": "run-20260522-000000-12345678", "runtime_telemetry": { "schema_version": "inferedge-runtime-telemetry-v1" + }, + "orchestrator_operation_context": { + "schema_version": "inferedge-orchestrator-edgeenv-runtime-telemetry-feed-v1", + "not_a_regression_judgement": true, + "not_a_comparability_gate": true } } ], @@ -128,6 +155,11 @@ Replay edge cases are preserved as evidence context: preserves both result-side and history-side sequence IDs. This does not change comparability or regression math; downstream diagnosis can treat it as deterministic review context. +- If an Orchestrator feed is attached, the regression report exposes it under + the matching run's runtime telemetry context as supplemental operation + evidence. Queue depth, deadline/fallback, and resource hints remain context + for downstream review; EdgeEnv still owns only comparability-first regression + analysis. Optional AIGuard handoff: @@ -149,6 +181,7 @@ remains the final deployment decision owner. - Do not describe this as production observability, cloud monitoring, distributed tracing, or real-time data drift detection. - Do not use telemetry to bypass the existing comparability-first regression policy. - Do not treat `inspect-history` as a live health check; it only validates a local replay artifact. +- Do not use Orchestrator operation feed context as a substitute for Runtime telemetry or Lab deployment judgement. ## 5. WHERE — Role In The InferEdge Flow @@ -159,6 +192,7 @@ Current flow: ```text Runtime result -> EdgeEnv result.json + runtime_telemetry.json +-> optional Orchestrator edgeenv_runtime_telemetry_feed context -> EdgeEnv export/import replay seed -> EdgeEnv runtime telemetry history artifact -> EdgeEnv inspect-history replay validation diff --git a/inferedge_env/cli.py b/inferedge_env/cli.py index e7f854d..62ffd96 100644 --- a/inferedge_env/cli.py +++ b/inferedge_env/cli.py @@ -382,6 +382,14 @@ def export_runtime_telemetry_history( "--edgeenv-root", help="Directory for EdgeEnv artifacts and registry.", ), + orchestrator_feeds: Optional[list[Path]] = typer.Option( + None, + "--orchestrator-feed", + help=( + "Optional InferEdgeOrchestrator EdgeEnv telemetry feed JSON to attach " + "as supplemental operation context. Repeat for multiple run IDs." + ), + ), ) -> None: """Export local runtime telemetry evidence as a replayable history artifact.""" try: @@ -389,6 +397,7 @@ def export_runtime_telemetry_history( edgeenv_root, output_path, run_ids=run_ids, + orchestrator_feeds=orchestrator_feeds, ) except (RuntimeTelemetryHistoryError, OSError) as exc: _fail(str(exc), hint=_telemetry_history_error_hint(str(exc))) @@ -398,6 +407,9 @@ def export_runtime_telemetry_history( console.print(f"Runs scanned: {summary['registered_runs']}") console.print(f"Telemetry entries: {summary['telemetry_runs']}") console.print(f"Missing telemetry: {summary['missing_telemetry_runs']}") + console.print( + f"Orchestrator context entries: {summary.get('orchestrator_feed_runs', 0)}" + ) console.print( "Scope: local replay evidence; not production monitoring.", soft_wrap=True, @@ -428,6 +440,10 @@ def inspect_runtime_telemetry_history_command( console.print(f"Schema: {summary['schema_version']}") console.print(f"Replay runs: {len(replay['run_ids'])}") console.print(f"Telemetry fields: {', '.join(replay['telemetry_fields']) or '-'}") + console.print( + "Orchestrator context runs: " + f"{len(replay.get('orchestrator_context_run_ids', []))}" + ) console.print(f"Evidence gaps: {replay['evidence_gap_count']}") console.print(f"Missing run IDs: {', '.join(replay['missing_run_ids']) or '-'}") console.print( @@ -1205,6 +1221,12 @@ def _import_error_hint(message: str) -> str: def _telemetry_history_error_hint(message: str) -> str: + if "Orchestrator telemetry feed" in message: + return ( + "Attach only EdgeEnv runtime telemetry feed artifacts produced by " + "InferEdgeOrchestrator for run IDs included in this export. The feed " + "is supplemental operation context and never replaces runtime telemetry." + ) if "Run not found" in message: return ( "Use `edgeenv runs list` to find registered run IDs, or omit " diff --git a/inferedge_env/compare/regression.py b/inferedge_env/compare/regression.py index 3c65360..6212ec5 100644 --- a/inferedge_env/compare/regression.py +++ b/inferedge_env/compare/regression.py @@ -244,6 +244,7 @@ def _maybe_runtime_telemetry_context( "Runtime telemetry context is supplemental evidence, not a comparability gate.", "Missing telemetry is an evidence gap, not a failed benchmark run.", "Regression deltas are still gated by same-condition comparability.", + "Orchestrator operation context is supplemental evidence, not a regression judgement.", ], } if telemetry_history is not None: @@ -321,11 +322,30 @@ def _telemetry_run_context( context["history_execution_sequence_id"] = history_entry.get( "execution_sequence_id" ) + _attach_orchestrator_context(context, history_entry) if missing_entry is not None: context["history_missing_reason"] = missing_entry.get("reason") + _attach_orchestrator_context(context, missing_entry) return context +def _attach_orchestrator_context( + context: dict[str, Any], + history_item: dict[str, Any], +) -> None: + orchestrator_context = history_item.get("orchestrator_operation_context") + if not isinstance(orchestrator_context, dict): + return + candidate_context = orchestrator_context.get("candidate_context") + if not isinstance(candidate_context, dict): + candidate_context = {} + context["orchestrator_context_present"] = True + context["orchestrator_operation_context"] = orchestrator_context + context["orchestrator_available_sections"] = sorted( + str(key) for key in candidate_context.keys() + ) + + def _telemetry_source(telemetry: dict[str, Any]) -> str | None: resource = telemetry.get("resource") if not isinstance(resource, dict): diff --git a/inferedge_env/result/telemetry_history.py b/inferedge_env/result/telemetry_history.py index ec06913..816cee3 100644 --- a/inferedge_env/result/telemetry_history.py +++ b/inferedge_env/result/telemetry_history.py @@ -1,6 +1,7 @@ from __future__ import annotations import json +from copy import deepcopy from datetime import datetime, timezone from pathlib import Path from typing import Any @@ -12,6 +13,9 @@ RUNTIME_TELEMETRY_HISTORY_SCHEMA_VERSION = "edgeenv.runtime-telemetry-history.v1" +ORCHESTRATOR_TELEMETRY_FEED_SCHEMA_VERSION = ( + "inferedge-orchestrator-edgeenv-runtime-telemetry-feed-v1" +) class RuntimeTelemetryHistoryError(ValueError): @@ -23,25 +27,37 @@ def build_runtime_telemetry_history( *, run_ids: list[str] | None = None, generated_at: datetime | None = None, + orchestrator_feeds: list[Path | str] | None = None, ) -> dict[str, Any]: root = Path(edgeenv_root) registry = RunRegistry(root / "runs.db") records = _select_records(registry, run_ids) generated = generated_at or datetime.now(timezone.utc) + orchestrator_contexts = _load_orchestrator_feeds(orchestrator_feeds) + _validate_orchestrator_feed_scope(orchestrator_contexts, records) entries: list[dict[str, Any]] = [] - missing: list[dict[str, str]] = [] + missing: list[dict[str, Any]] = [] for record in records: result = _load_record_result(record) if result.runtime_telemetry is None: - missing.append( - { - "run_id": result.run_id, - "reason": "runtime_telemetry_missing", - } - ) + missing_entry: dict[str, Any] = { + "run_id": result.run_id, + "reason": "runtime_telemetry_missing", + } + orchestrator_context = orchestrator_contexts.get(result.run_id) + if orchestrator_context is not None: + missing_entry["orchestrator_operation_context"] = ( + orchestrator_context + ) + missing.append(missing_entry) continue - entries.append(_history_entry(result)) + entries.append( + _history_entry( + result, + orchestrator_context=orchestrator_contexts.get(result.run_id), + ) + ) entries.sort( key=lambda entry: ( @@ -64,6 +80,7 @@ def build_runtime_telemetry_history( "registered_runs": len(records), "telemetry_runs": len(entries), "missing_telemetry_runs": len(missing), + "orchestrator_feed_runs": len(orchestrator_contexts), }, "runs": entries, "missing_telemetry": missing, @@ -71,6 +88,7 @@ def build_runtime_telemetry_history( "Runtime telemetry history is local replay evidence, not production monitoring.", "Missing telemetry is recorded as an evidence gap, not a failed benchmark run.", "Comparability-first regression analysis must still run before delta judgement.", + "Orchestrator feed context is supplemental operation evidence, not a regression judgement.", ], } @@ -80,8 +98,13 @@ def write_runtime_telemetry_history( output_path: Path | str, *, run_ids: list[str] | None = None, + orchestrator_feeds: list[Path | str] | None = None, ) -> dict[str, Any]: - payload = build_runtime_telemetry_history(edgeenv_root, run_ids=run_ids) + payload = build_runtime_telemetry_history( + edgeenv_root, + run_ids=run_ids, + orchestrator_feeds=orchestrator_feeds, + ) destination = Path(output_path) destination.parent.mkdir(parents=True, exist_ok=True) destination.write_text( @@ -151,6 +174,16 @@ def validate_runtime_telemetry_history( "Runtime telemetry history " f"runs[{index}].runtime_telemetry must be an object: {label}" ) + orchestrator_context = entry.get("orchestrator_operation_context") + if orchestrator_context is not None and not isinstance( + orchestrator_context, + dict, + ): + raise RuntimeTelemetryHistoryError( + "Runtime telemetry history " + f"runs[{index}].orchestrator_operation_context must be an object: " + f"{label}" + ) def inspect_runtime_telemetry_history(payload: dict[str, Any]) -> dict[str, Any]: @@ -175,6 +208,11 @@ def inspect_runtime_telemetry_history(payload: dict[str, Any]) -> dict[str, Any] "replay": { "run_ids": run_ids, "telemetry_fields": _telemetry_fields(runs), + "orchestrator_context_run_ids": [ + entry["run_id"] + for entry in runs + if isinstance(entry.get("orchestrator_operation_context"), dict) + ], "first_telemetry_timestamp": min(timestamps) if timestamps else None, "last_telemetry_timestamp": max(timestamps) if timestamps else None, "execution_sequence_ids": sequence_ids, @@ -224,13 +262,17 @@ def _load_record_result(record: RegistryRecord) -> RunResult: ) from exc -def _history_entry(result: RunResult) -> dict[str, Any]: +def _history_entry( + result: RunResult, + *, + orchestrator_context: dict[str, Any] | None = None, +) -> dict[str, Any]: telemetry = result.runtime_telemetry if telemetry is None: raise RuntimeTelemetryHistoryError( f"Runtime telemetry missing for run: {result.run_id}" ) - return { + entry = { "run_id": result.run_id, "created_at": result.created_at.isoformat(), "telemetry_timestamp": telemetry.get("telemetry_timestamp"), @@ -242,6 +284,94 @@ def _history_entry(result: RunResult) -> dict[str, Any]: "metrics": result.metrics.model_dump(mode="json"), "runtime_telemetry": telemetry, } + if orchestrator_context is not None: + entry["orchestrator_operation_context"] = orchestrator_context + return entry + + +def _load_orchestrator_feeds( + orchestrator_feeds: list[Path | str] | None, +) -> dict[str, dict[str, Any]]: + contexts: dict[str, dict[str, Any]] = {} + for feed_path in orchestrator_feeds or []: + context = _load_orchestrator_feed(feed_path) + run_id = context["run_id"] + if run_id in contexts: + raise RuntimeTelemetryHistoryError( + f"Duplicate Orchestrator telemetry feed for run: {run_id}" + ) + contexts[run_id] = context + return contexts + + +def _load_orchestrator_feed(feed_path: Path | str) -> dict[str, Any]: + source = Path(feed_path) + try: + payload = json.loads(source.read_text(encoding="utf-8")) + except OSError as exc: + raise RuntimeTelemetryHistoryError( + f"Orchestrator telemetry feed not found: {source}" + ) from exc + except json.JSONDecodeError as exc: + raise RuntimeTelemetryHistoryError( + f"Invalid Orchestrator telemetry feed JSON: {source}" + ) from exc + if not isinstance(payload, dict): + raise RuntimeTelemetryHistoryError( + f"Orchestrator telemetry feed must be a JSON object: {source}" + ) + schema_version = payload.get("schema_version") + if schema_version != ORCHESTRATOR_TELEMETRY_FEED_SCHEMA_VERSION: + raise RuntimeTelemetryHistoryError( + "Unsupported Orchestrator telemetry feed schema: " + f"{schema_version or ''}" + ) + candidate_context = payload.get("candidate_context") + if not isinstance(candidate_context, dict): + raise RuntimeTelemetryHistoryError( + f"Orchestrator telemetry feed candidate_context must be an object: {source}" + ) + run_id = payload.get("run_id") or candidate_context.get("run_id") + if not isinstance(run_id, str) or not run_id: + raise RuntimeTelemetryHistoryError( + f"Orchestrator telemetry feed run_id must be a string: {source}" + ) + if payload.get("not_a_regression_judgement") is not True: + raise RuntimeTelemetryHistoryError( + "Orchestrator telemetry feed must declare " + "not_a_regression_judgement=true" + ) + if payload.get("not_a_comparability_gate") is not True: + raise RuntimeTelemetryHistoryError( + "Orchestrator telemetry feed must declare not_a_comparability_gate=true" + ) + return { + "schema_version": schema_version, + "role": payload.get("role"), + "source": payload.get("source"), + "run_id": run_id, + "not_a_regression_judgement": True, + "not_a_comparability_gate": True, + "decision_owner": payload.get("decision_owner"), + "regression_owner": payload.get("regression_owner"), + "candidate_context": deepcopy(candidate_context), + "edgeenv_mapping_hint": deepcopy(payload.get("edgeenv_mapping_hint", {})), + } + + +def _validate_orchestrator_feed_scope( + orchestrator_contexts: dict[str, dict[str, Any]], + records: list[RegistryRecord], +) -> None: + if not orchestrator_contexts: + return + selected_run_ids = {record.run_id for record in records} + for run_id in sorted(orchestrator_contexts): + if run_id not in selected_run_ids: + raise RuntimeTelemetryHistoryError( + "Orchestrator telemetry feed run is not selected in this history " + f"export: {run_id}" + ) def _sequence_sort_value(value: Any) -> tuple[int, float | str]: diff --git a/tests/test_regression.py b/tests/test_regression.py index 5e9fa98..90c2989 100644 --- a/tests/test_regression.py +++ b/tests/test_regression.py @@ -136,6 +136,83 @@ def test_regression_attaches_runtime_telemetry_history_context( assert report.evidence["mean_delta_pct"] == 12.0 +def test_regression_attaches_orchestrator_feed_as_supplemental_context( + bench_config, + target_profile, +): + baseline = make_result( + bench_config, + target_profile, + run_id="baseline", + runner_result=_runner_result( + mean=100.0, + p95=120.0, + p99=130.0, + fps=50.0, + runtime_telemetry=_runtime_telemetry(sequence_id=1), + ), + ) + candidate = make_result( + bench_config, + target_profile, + run_id="candidate", + runner_result=_runner_result( + mean=112.0, + p95=125.0, + p99=135.0, + fps=48.0, + runtime_telemetry=_runtime_telemetry(sequence_id=2), + ), + ) + telemetry_history = { + "schema_version": "edgeenv.runtime-telemetry-history.v1", + "summary": { + "registered_runs": 2, + "telemetry_runs": 2, + "missing_telemetry_runs": 0, + "orchestrator_feed_runs": 1, + }, + "runs": [ + { + "run_id": "baseline", + "telemetry_timestamp": "2026-05-22T00:00:01Z", + "execution_sequence_id": 1, + }, + { + "run_id": "candidate", + "telemetry_timestamp": "2026-05-22T00:00:02Z", + "execution_sequence_id": 2, + "orchestrator_operation_context": _orchestrator_context( + "candidate" + ), + }, + ], + "missing_telemetry": [], + } + + report = analyze_regression( + baseline, + candidate, + telemetry_history=telemetry_history, + ) + + context = report.to_dict()["runtime_telemetry_context"] + candidate_context = context["candidate"] + assert report.mode == "same-condition" + assert report.evidence["mean_delta_pct"] == 12.0 + assert candidate_context["orchestrator_context_present"] is True + assert candidate_context["orchestrator_operation_context"][ + "not_a_regression_judgement" + ] is True + assert candidate_context["orchestrator_operation_context"]["candidate_context"][ + "operation" + ]["queue_depth"] == 7 + assert ( + "Orchestrator operation context is supplemental evidence, not a regression judgement." + in context["notes"] + ) + + def test_regression_preserves_replay_sequence_order_mismatch_context( bench_config, target_profile, @@ -798,3 +875,34 @@ def _runtime_telemetry(sequence_id: int) -> dict: "timeout_observed": False, }, } + + +def _orchestrator_context(run_id: str) -> dict: + return { + "schema_version": "inferedge-orchestrator-edgeenv-runtime-telemetry-feed-v1", + "role": "orchestrator_operation_context_for_edgeenv", + "source": "orchestration_summary", + "run_id": run_id, + "not_a_regression_judgement": True, + "not_a_comparability_gate": True, + "decision_owner": "lab", + "regression_owner": "edgeenv", + "candidate_context": { + "run_id": run_id, + "queue_depth": 7, + "operation": { + "queue_depth": 7, + "deadline_missed_count": 2, + "fallback_count": 1, + }, + "resource": { + "source": "tegrastats_timeline", + "gpu_temperature": 78.5, + "ram_used_mb": 2048.0, + }, + }, + "edgeenv_mapping_hint": { + "runtime_telemetry_context_role": "candidate", + "copy_candidate_context_to": "runtime_telemetry_context.candidate", + }, + } diff --git a/tests/test_runtime_telemetry_history.py b/tests/test_runtime_telemetry_history.py index 967ece9..68416bc 100644 --- a/tests/test_runtime_telemetry_history.py +++ b/tests/test_runtime_telemetry_history.py @@ -10,6 +10,7 @@ from inferedge_env.cli import app from inferedge_env.registry.db import RunRegistry from inferedge_env.result.telemetry_history import ( + ORCHESTRATOR_TELEMETRY_FEED_SCHEMA_VERSION, RUNTIME_TELEMETRY_HISTORY_SCHEMA_VERSION, RuntimeTelemetryHistoryError, build_runtime_telemetry_history, @@ -56,6 +57,7 @@ def test_build_runtime_telemetry_history_records_entries_and_missing_gaps( "registered_runs": 2, "telemetry_runs": 1, "missing_telemetry_runs": 1, + "orchestrator_feed_runs": 0, } assert payload["runs"][0]["run_id"] == "run-with-telemetry" assert payload["runs"][0]["telemetry_timestamp"] == "2026-05-22T00:00:00Z" @@ -113,6 +115,133 @@ def test_write_runtime_telemetry_history_filters_selected_runs( assert payload["runs"][0]["execution_sequence_id"] == 2 +def test_build_runtime_telemetry_history_attaches_orchestrator_feed_context( + tmp_path, + bench_config, + target_profile, + config_files, +): + edgeenv_root = tmp_path / ".edgeenv" + _write_registered_run( + edgeenv_root, + bench_config, + target_profile, + config_files, + run_id="candidate", + runtime_telemetry=_runtime_telemetry_payload(sequence_id=2), + ) + feed_path = tmp_path / "orchestrator-feed.json" + feed_path.write_text( + json.dumps(_orchestrator_feed_payload("candidate")), + encoding="utf-8", + ) + + payload = build_runtime_telemetry_history( + edgeenv_root, + generated_at=datetime(2026, 5, 22, tzinfo=timezone.utc), + orchestrator_feeds=[feed_path], + ) + + assert payload["summary"]["orchestrator_feed_runs"] == 1 + context = payload["runs"][0]["orchestrator_operation_context"] + assert context["schema_version"] == ORCHESTRATOR_TELEMETRY_FEED_SCHEMA_VERSION + assert context["not_a_regression_judgement"] is True + assert context["not_a_comparability_gate"] is True + assert context["decision_owner"] == "lab" + assert context["regression_owner"] == "edgeenv" + assert context["candidate_context"]["operation"]["queue_depth"] == 7 + assert context["candidate_context"]["resource"]["gpu_temperature"] == 78.5 + assert "not a regression judgement" in payload["notes"][3] + + +def test_build_runtime_telemetry_history_rejects_feed_for_unselected_run( + tmp_path, + bench_config, + target_profile, + config_files, +): + edgeenv_root = tmp_path / ".edgeenv" + _write_registered_run( + edgeenv_root, + bench_config, + target_profile, + config_files, + run_id="candidate", + runtime_telemetry=_runtime_telemetry_payload(sequence_id=2), + ) + feed_path = tmp_path / "orchestrator-feed.json" + feed_path.write_text( + json.dumps(_orchestrator_feed_payload("other-run")), + encoding="utf-8", + ) + + with pytest.raises( + RuntimeTelemetryHistoryError, + match="Orchestrator telemetry feed run is not selected", + ): + build_runtime_telemetry_history( + edgeenv_root, + orchestrator_feeds=[feed_path], + ) + + +def test_cli_runs_telemetry_export_history_attaches_orchestrator_feed( + tmp_path, + bench_config, + target_profile, + config_files, +): + runner = CliRunner() + edgeenv_root = tmp_path / ".edgeenv" + _write_registered_run( + edgeenv_root, + bench_config, + target_profile, + config_files, + run_id="candidate", + runtime_telemetry=_runtime_telemetry_payload(sequence_id=2), + ) + output_path = tmp_path / "runtime-telemetry-history.json" + feed_path = tmp_path / "orchestrator-feed.json" + feed_path.write_text( + json.dumps(_orchestrator_feed_payload("candidate")), + encoding="utf-8", + ) + + export_result = runner.invoke( + app, + [ + "runs", + "telemetry", + "export-history", + "--output", + str(output_path), + "--edgeenv-root", + str(edgeenv_root), + "--orchestrator-feed", + str(feed_path), + ], + ) + inspect_result = runner.invoke( + app, + [ + "runs", + "telemetry", + "inspect-history", + str(output_path), + ], + ) + + assert export_result.exit_code == 0, export_result.output + assert "Orchestrator context entries: 1" in export_result.output + assert inspect_result.exit_code == 0, inspect_result.output + assert "Orchestrator context runs: 1" in inspect_result.output + payload = json.loads(output_path.read_text(encoding="utf-8")) + assert payload["runs"][0]["orchestrator_operation_context"]["run_id"] == ( + "candidate" + ) + + def test_inspect_runtime_telemetry_history_reports_replay_summary( tmp_path, bench_config, @@ -156,6 +285,7 @@ def test_inspect_runtime_telemetry_history_reports_replay_summary( assert summary["replay"]["missing_run_ids"] == ["run-without-telemetry"] assert "latency" in summary["replay"]["telemetry_fields"] assert "operation" in summary["replay"]["telemetry_fields"] + assert summary["replay"]["orchestrator_context_run_ids"] == [] assert "not production monitoring" in summary["notes"][2] @@ -360,3 +490,43 @@ def _runtime_telemetry_payload(sequence_id: int = 7) -> dict: "missing_fields": ["queue_depth"], "production_monitoring": False, } + + +def _orchestrator_feed_payload(run_id: str) -> dict: + return { + "schema_version": ORCHESTRATOR_TELEMETRY_FEED_SCHEMA_VERSION, + "role": "orchestrator_operation_context_for_edgeenv", + "source": "orchestration_summary", + "run_id": run_id, + "not_a_regression_judgement": True, + "not_a_comparability_gate": True, + "decision_owner": "lab", + "regression_owner": "edgeenv", + "candidate_context": { + "run_id": run_id, + "result_telemetry_present": True, + "history_entry_present": True, + "telemetry_source": "inferedge_orchestrator_operation_summary", + "available_sections": [ + "operation", + "resource", + "queue_state_summary", + ], + "queue_depth": 7, + "operation": { + "queue_depth": 7, + "deadline_missed_count": 2, + "fallback_count": 1, + }, + "resource": { + "source": "tegrastats_timeline", + "resource_evidence_available": True, + "gpu_temperature": 78.5, + "ram_used_mb": 2048.0, + }, + }, + "edgeenv_mapping_hint": { + "runtime_telemetry_context_role": "candidate", + "copy_candidate_context_to": "runtime_telemetry_context.candidate", + }, + }