Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion docs/runtime-telemetry-history.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,12 @@ is then preserved under the matching history entry as
not turn missing telemetry into a successful telemetry run, and does not change
the same-condition comparability gate.

Newer Orchestrator feeds can also declare `edgeenv_mapping_hint` fields. EdgeEnv
preserves these hints and validates them when present: Orchestrator may map only
supplemental candidate operation context to
`runtime_telemetry_context.candidate`, while EdgeEnv remains the owner of
`runtime_telemetry_context.history.telemetry_coverage`.

Replay validation command:

```bash
Expand Down Expand Up @@ -112,7 +118,12 @@ The history artifact uses this top-level shape:
"orchestrator_operation_context": {
"schema_version": "inferedge-orchestrator-edgeenv-runtime-telemetry-feed-v1",
"not_a_regression_judgement": true,
"not_a_comparability_gate": true
"not_a_comparability_gate": true,
"edgeenv_mapping_hint": {
"copy_candidate_context_to": "runtime_telemetry_context.candidate",
"coverage_summary_owner": "edgeenv",
"coverage_summary_path": "runtime_telemetry_context.history.telemetry_coverage"
}
}
}
],
Expand Down
75 changes: 75 additions & 0 deletions inferedge_env/result/lab_handoff.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@
from typing import Any

from inferedge_env.result.telemetry_history import (
ORCHESTRATOR_EDGEENV_CANDIDATE_CONTEXT_PATH,
ORCHESTRATOR_EDGEENV_COVERAGE_SUMMARY_OWNER,
ORCHESTRATOR_EDGEENV_HISTORY_COVERAGE_PATH,
ORCHESTRATOR_EDGEENV_OPERATION_CONTEXT_ROLE,
ORCHESTRATOR_EDGEENV_REQUIRED_CANDIDATE_FIELDS,
ORCHESTRATOR_TELEMETRY_FEED_SCHEMA_VERSION,
RUNTIME_TELEMETRY_HISTORY_SCHEMA_VERSION,
)
Expand Down Expand Up @@ -251,6 +256,76 @@ def _validate_orchestrator_context(
raise RuntimeIntelligenceLabHandoffError(
"orchestrator_operation_context.regression_owner must be edgeenv"
)
_validate_orchestrator_mapping_hint(
operation_context.get("edgeenv_mapping_hint"),
operation_context=operation_context,
regression_path=regression_path,
)


def _validate_orchestrator_mapping_hint(
mapping_hint: Any,
*,
operation_context: dict[str, Any],
regression_path: Path,
) -> None:
if mapping_hint is None:
return
if not isinstance(mapping_hint, dict):
raise RuntimeIntelligenceLabHandoffError(
f"orchestrator_operation_context.edgeenv_mapping_hint must be an object: "
f"{regression_path}"
)
expected_pairs = {
"copy_candidate_context_to": ORCHESTRATOR_EDGEENV_CANDIDATE_CONTEXT_PATH,
"operation_context_role": ORCHESTRATOR_EDGEENV_OPERATION_CONTEXT_ROLE,
"coverage_summary_owner": ORCHESTRATOR_EDGEENV_COVERAGE_SUMMARY_OWNER,
"coverage_summary_path": ORCHESTRATOR_EDGEENV_HISTORY_COVERAGE_PATH,
}
for key, expected in expected_pairs.items():
if key in mapping_hint and mapping_hint.get(key) != expected:
raise RuntimeIntelligenceLabHandoffError(
"orchestrator_operation_context.edgeenv_mapping_hint."
f"{key} must be {expected}: {regression_path}"
)
required_fields = mapping_hint.get("candidate_context_required_fields")
if required_fields is None:
return
if not isinstance(required_fields, list) or not all(
isinstance(item, str) for item in required_fields
):
raise RuntimeIntelligenceLabHandoffError(
"orchestrator_operation_context.edgeenv_mapping_hint."
"candidate_context_required_fields must be a string list: "
f"{regression_path}"
)
missing_required = [
field
for field in ORCHESTRATOR_EDGEENV_REQUIRED_CANDIDATE_FIELDS
if field not in required_fields
]
if missing_required:
raise RuntimeIntelligenceLabHandoffError(
"orchestrator_operation_context.edgeenv_mapping_hint."
"candidate_context_required_fields must include "
f"{', '.join(missing_required)}: {regression_path}"
)
candidate_context = operation_context.get("candidate_context")
if not isinstance(candidate_context, dict):
raise RuntimeIntelligenceLabHandoffError(
"orchestrator_operation_context.candidate_context must be an object: "
f"{regression_path}"
)
missing_context = [
field
for field in ORCHESTRATOR_EDGEENV_REQUIRED_CANDIDATE_FIELDS
if field not in candidate_context
]
if missing_context:
raise RuntimeIntelligenceLabHandoffError(
"orchestrator_operation_context.candidate_context must include "
f"{', '.join(missing_context)}: {regression_path}"
)


def _validate_telemetry_history(
Expand Down
105 changes: 104 additions & 1 deletion inferedge_env/result/telemetry_history.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,18 @@
ORCHESTRATOR_TELEMETRY_FEED_SCHEMA_VERSION = (
"inferedge-orchestrator-edgeenv-runtime-telemetry-feed-v1"
)
ORCHESTRATOR_EDGEENV_CANDIDATE_CONTEXT_PATH = "runtime_telemetry_context.candidate"
ORCHESTRATOR_EDGEENV_HISTORY_COVERAGE_PATH = (
"runtime_telemetry_context.history.telemetry_coverage"
)
ORCHESTRATOR_EDGEENV_COVERAGE_SUMMARY_OWNER = "edgeenv"
ORCHESTRATOR_EDGEENV_OPERATION_CONTEXT_ROLE = "supplemental"
ORCHESTRATOR_EDGEENV_REQUIRED_CANDIDATE_FIELDS = (
"run_id",
"telemetry_source",
"operation",
"resource",
)


class RuntimeTelemetryHistoryError(ValueError):
Expand Down Expand Up @@ -358,6 +370,11 @@ def _load_orchestrator_feed(feed_path: Path | str) -> dict[str, Any]:
raise RuntimeTelemetryHistoryError(
"Orchestrator telemetry feed must declare not_a_comparability_gate=true"
)
mapping_hint = _validate_orchestrator_mapping_hint(
payload.get("edgeenv_mapping_hint", {}),
candidate_context=candidate_context,
source=source,
)
return {
"schema_version": schema_version,
"role": payload.get("role"),
Expand All @@ -368,10 +385,96 @@ def _load_orchestrator_feed(feed_path: Path | str) -> dict[str, Any]:
"decision_owner": payload.get("decision_owner"),
"regression_owner": payload.get("regression_owner"),
"candidate_context": deepcopy(candidate_context),
"edgeenv_mapping_hint": deepcopy(payload.get("edgeenv_mapping_hint", {})),
"edgeenv_mapping_hint": mapping_hint,
}


def _validate_orchestrator_mapping_hint(
value: Any,
*,
candidate_context: dict[str, Any],
source: Path,
) -> dict[str, Any]:
if value is None:
return {}
if not isinstance(value, dict):
raise RuntimeTelemetryHistoryError(
f"Orchestrator telemetry feed edgeenv_mapping_hint must be an object: {source}"
)
mapping_hint = deepcopy(value)
_validate_optional_mapping_value(
mapping_hint,
"copy_candidate_context_to",
ORCHESTRATOR_EDGEENV_CANDIDATE_CONTEXT_PATH,
source,
)
_validate_optional_mapping_value(
mapping_hint,
"operation_context_role",
ORCHESTRATOR_EDGEENV_OPERATION_CONTEXT_ROLE,
source,
)
_validate_optional_mapping_value(
mapping_hint,
"coverage_summary_owner",
ORCHESTRATOR_EDGEENV_COVERAGE_SUMMARY_OWNER,
source,
)
_validate_optional_mapping_value(
mapping_hint,
"coverage_summary_path",
ORCHESTRATOR_EDGEENV_HISTORY_COVERAGE_PATH,
source,
)
required_fields = mapping_hint.get("candidate_context_required_fields")
if required_fields is not None:
if not isinstance(required_fields, list) or not all(
isinstance(item, str) for item in required_fields
):
raise RuntimeTelemetryHistoryError(
"Orchestrator telemetry feed "
f"edgeenv_mapping_hint.candidate_context_required_fields must be "
f"a string list: {source}"
)
missing_required = [
field
for field in ORCHESTRATOR_EDGEENV_REQUIRED_CANDIDATE_FIELDS
if field not in required_fields
]
if missing_required:
raise RuntimeTelemetryHistoryError(
"Orchestrator telemetry feed "
"edgeenv_mapping_hint.candidate_context_required_fields must "
f"include {', '.join(missing_required)}: {source}"
)
missing_context = [
field
for field in ORCHESTRATOR_EDGEENV_REQUIRED_CANDIDATE_FIELDS
if field not in candidate_context
]
if missing_context:
raise RuntimeTelemetryHistoryError(
"Orchestrator telemetry feed candidate_context must include "
f"{', '.join(missing_context)} when declared as required: {source}"
)
return mapping_hint


def _validate_optional_mapping_value(
mapping_hint: dict[str, Any],
key: str,
expected: str,
source: Path,
) -> None:
if key not in mapping_hint:
return
if mapping_hint.get(key) != expected:
raise RuntimeTelemetryHistoryError(
"Orchestrator telemetry feed "
f"edgeenv_mapping_hint.{key} must be {expected}: {source}"
)


def _validate_orchestrator_feed_scope(
orchestrator_contexts: dict[str, dict[str, Any]],
records: list[RegistryRecord],
Expand Down
24 changes: 23 additions & 1 deletion tests/test_regression.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@
from inferedge_env.config.target_profile import TargetProfile
from inferedge_env.registry.db import RunRegistry
from inferedge_env.result.schema import ResourceMetrics
from inferedge_env.result.telemetry_history import (
ORCHESTRATOR_EDGEENV_CANDIDATE_CONTEXT_PATH,
ORCHESTRATOR_EDGEENV_COVERAGE_SUMMARY_OWNER,
ORCHESTRATOR_EDGEENV_HISTORY_COVERAGE_PATH,
ORCHESTRATOR_EDGEENV_OPERATION_CONTEXT_ROLE,
)
from inferedge_env.result.writer import ResultArtifactWriter
from inferedge_env.runners.base import RunnerResult
from helpers import make_result
Expand Down Expand Up @@ -259,6 +265,12 @@ def test_regression_attaches_orchestrator_feed_as_supplemental_context(
assert candidate_context["orchestrator_operation_context"][
"not_a_regression_judgement"
] is True
assert candidate_context["orchestrator_operation_context"]["edgeenv_mapping_hint"][
"coverage_summary_owner"
] == ORCHESTRATOR_EDGEENV_COVERAGE_SUMMARY_OWNER
assert candidate_context["orchestrator_operation_context"]["edgeenv_mapping_hint"][
"coverage_summary_path"
] == ORCHESTRATOR_EDGEENV_HISTORY_COVERAGE_PATH
assert candidate_context["orchestrator_operation_context"]["candidate_context"][
"operation"
]["queue_depth"] == 7
Expand Down Expand Up @@ -963,6 +975,7 @@ def _orchestrator_context(run_id: str) -> dict:
"regression_owner": "edgeenv",
"candidate_context": {
"run_id": run_id,
"telemetry_source": "inferedge_orchestrator_operation_summary",
"queue_depth": 7,
"operation": {
"queue_depth": 7,
Expand All @@ -977,6 +990,15 @@ def _orchestrator_context(run_id: str) -> dict:
},
"edgeenv_mapping_hint": {
"runtime_telemetry_context_role": "candidate",
"copy_candidate_context_to": "runtime_telemetry_context.candidate",
"copy_candidate_context_to": ORCHESTRATOR_EDGEENV_CANDIDATE_CONTEXT_PATH,
"operation_context_role": ORCHESTRATOR_EDGEENV_OPERATION_CONTEXT_ROLE,
"coverage_summary_owner": ORCHESTRATOR_EDGEENV_COVERAGE_SUMMARY_OWNER,
"coverage_summary_path": ORCHESTRATOR_EDGEENV_HISTORY_COVERAGE_PATH,
"candidate_context_required_fields": [
"run_id",
"telemetry_source",
"operation",
"resource",
],
},
}
46 changes: 46 additions & 0 deletions tests/test_runtime_intelligence_lab_handoff.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,30 @@ def test_runtime_intelligence_lab_handoff_rejects_bad_orchestrator_schema(tmp_pa
)


def test_runtime_intelligence_lab_handoff_rejects_bad_orchestrator_mapping(
tmp_path,
):
baseline_path, candidate_path, regression_path, history_path = _write_handoff_files(
tmp_path
)
regression = json.loads(regression_path.read_text(encoding="utf-8"))
regression["runtime_telemetry_context"]["candidate"][
"orchestrator_operation_context"
]["edgeenv_mapping_hint"]["coverage_summary_owner"] = "orchestrator"
regression_path.write_text(json.dumps(regression), encoding="utf-8")

with pytest.raises(
RuntimeIntelligenceLabHandoffError,
match="coverage_summary_owner must be edgeenv",
):
build_runtime_intelligence_lab_handoff_manifest(
baseline_result_path=baseline_path,
candidate_result_path=candidate_path,
edgeenv_regression_report_path=regression_path,
telemetry_history_path=history_path,
)


def _write_handoff_files(tmp_path):
baseline_path = tmp_path / "baseline-result.json"
candidate_path = tmp_path / "candidate-result.json"
Expand Down Expand Up @@ -199,7 +223,29 @@ def _write_handoff_files(tmp_path):
"regression_owner": "edgeenv",
"candidate_context": {
"run_id": "candidate",
"telemetry_source": (
"inferedge_orchestrator_operation_summary"
),
"operation": {"queue_depth": 7},
"resource": {"source": "tegrastats_timeline"},
},
"edgeenv_mapping_hint": {
"runtime_telemetry_context_role": "candidate",
"copy_candidate_context_to": (
"runtime_telemetry_context.candidate"
),
"operation_context_role": "supplemental",
"coverage_summary_owner": "edgeenv",
"coverage_summary_path": (
"runtime_telemetry_context.history."
"telemetry_coverage"
),
"candidate_context_required_fields": [
"run_id",
"telemetry_source",
"operation",
"resource",
],
},
},
},
Expand Down
Loading
Loading