From 0395c4f249d5f231fe74721017934d52066c04e7 Mon Sep 17 00:00:00 2001 From: hyeokjun32 Date: Sun, 17 May 2026 20:28:38 +0900 Subject: [PATCH] feat: surface sustained runtime reliability evidence --- README.md | 1 + .../agent_runtime_reliability_report.md | 7 + .../agent_3_orchestration_summary.json | 46 ++++++ .../aiguard_runtime_guard_analysis.json | 34 +++- inferedgelab/services/agent_runtime_report.py | 152 +++++++++++++++++- tests/test_agent_runtime_report.py | 106 +++++++++++- 6 files changed, 339 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index b4befb2..6d61418 100644 --- a/README.md +++ b/README.md @@ -127,6 +127,7 @@ The Lab decision surface now also exposes `policy_version`, `triggered_rules`, a `agent-runtime-report` is an additive reliable edge agent runtime report path. It bundles Orchestrator scheduling evidence and AIGuard runtime reliability `guard_analysis` into a Lab-owned agent deployment decision context without changing existing Runtime result or compare contracts. +The report preserves sustained queue-depth, policy decision reason, and `sustained_overload_risk` evidence as local-first deployment review context. ![InferEdge Local Studio demo evidence](assets/images/local-studio-demo-evidence.png) diff --git a/docs/portfolio/agent_runtime_reliability_report.md b/docs/portfolio/agent_runtime_reliability_report.md index d74cf19..0c8b88d 100644 --- a/docs/portfolio/agent_runtime_reliability_report.md +++ b/docs/portfolio/agent_runtime_reliability_report.md @@ -45,6 +45,12 @@ poetry run inferedgelab agent-runtime-report \ | fallback_rate | 0.583333 | | deadline_miss_rate | 0.1 | | queue_backlog_policy_decision_count | 1 | +| max_total_queue_depth | 6 | +| top_policy_decision_reason | queue_backlog_threshold_exceeded | + +AIGuard `guard_analysis` also includes `sustained_overload_risk`, which Lab +preserves as report evidence and reflects in the agent deployment decision +context. ## Lab Decision Context @@ -67,6 +73,7 @@ Triggered rules: - `fallback_rate_block` - `deadline_miss_review` - `queue_backlog_review` +- `sustained_overload_review` ## Boundary diff --git a/examples/agent_runtime/agent_3_orchestration_summary.json b/examples/agent_runtime/agent_3_orchestration_summary.json index 898891b..9da4c4f 100644 --- a/examples/agent_runtime/agent_3_orchestration_summary.json +++ b/examples/agent_runtime/agent_3_orchestration_summary.json @@ -1,5 +1,10 @@ { "schema_version": "inferedge-orchestration-summary-v1", + "run": { + "name": "agent_3_workload_sustained_high_load", + "scenario_mode": "sustained_high_load", + "frame_interval_ms": 5.0 + }, "agent_runtime_summary": { "schema_version": "inferedge-orchestration-summary-v1", "source_contracts": { @@ -41,12 +46,53 @@ "overload_event_count": 14 } }, + "sustained_runtime_summary": { + "schema_version": "inferedge-orchestrator-sustained-summary-v1", + "scenario_mode": "sustained_high_load", + "queue_depth_sample_count": 1, + "latency_sample_count": 1, + "max_total_queue_depth": 6, + "deadline_missed_count": 1, + "dropped_count": 14, + "fallback_count": 14, + "policy_decision_count": 14, + "overload_event_count": 14 + }, + "queue_depth_timeline": [ + { + "cycle": 1, + "stage": "before_policy", + "queue_depth": { + "vision_agent": 4, + "voice_command_agent": 2, + "safety_monitor_agent": 0 + }, + "total_queue_depth": 6 + } + ], + "latency_timeline": [ + { + "agent_id": "vision_agent", + "task_id": "task_vision_agent", + "latency_ms": 41.0, + "latency_budget_ms": 33.0, + "deadline_missed": true + } + ], "policy_decision_log": [ { "agent_id": "vision_agent", "task_id": "task_vision_agent", "decision": "load_shedding", "reason": "queue_backlog_threshold_exceeded", + "decision_reason": "queue_backlog_threshold_exceeded", + "total_backlog_before": 6, + "backlog_threshold": 3, + "queue_depth_snapshot": { + "vision_agent": 4, + "voice_command_agent": 2, + "safety_monitor_agent": 0 + }, "fallback_used": true, "protected_agent_id": "safety_monitor_agent" } diff --git a/examples/agent_runtime/aiguard_runtime_guard_analysis.json b/examples/agent_runtime/aiguard_runtime_guard_analysis.json index 65de9b6..c034d8e 100644 --- a/examples/agent_runtime/aiguard_runtime_guard_analysis.json +++ b/examples/agent_runtime/aiguard_runtime_guard_analysis.json @@ -31,15 +31,41 @@ "executed_count": 10, "dropped_count": 14 } + }, + { + "type": "sustained_overload_risk", + "metric_name": "max_total_queue_depth", + "observed_value": 6, + "baseline_value": null, + "threshold": 3, + "delta": null, + "delta_pct": null, + "increase_factor": null, + "severity": "medium", + "status": "failed", + "explanation": "Queue depth grew under sustained high-load mode.", + "why_it_matters": "Sustained queue depth growth shows that incoming workload pressure can exceed edge-device execution capacity.", + "suspected_causes": [ + "sustained_multi_agent_overload", + "producer_rate_exceeds_scheduler_capacity" + ], + "recommendation": "Lower producer rate, tighten stale-frame drop policy, or move lower priority work behind a fallback path before deployment.", + "raw_context": { + "scenario_mode": "sustained_high_load", + "queue_depth_sample_count": 1, + "latency_sample_count": 1 + } } ], "suspected_causes": [ "queue_backlog", "overload_load_shedding", - "producer_rate_exceeds_runtime_capacity" + "producer_rate_exceeds_runtime_capacity", + "sustained_multi_agent_overload" ], "recommendations": [ - "Tune target FPS, queue size, drop policy, or fallback policy for affected agents." + "Tune target FPS, queue size, drop policy, or fallback policy for affected agents.", + "Lower producer rate, tighten stale-frame drop policy, or move lower priority work behind a fallback path before deployment." ], "thresholds": { "drop_rate_review": 0.2, @@ -50,7 +76,9 @@ "runtime_reliability": { "drop_rate": 0.5833333333333334, "fallback_rate": 0.5833333333333334, - "deadline_miss_rate": 0.1 + "deadline_miss_rate": 0.1, + "max_total_queue_depth": 6, + "scenario_mode": "sustained_high_load" } }, "created_at": "2026-05-17T00:00:00Z" diff --git a/inferedgelab/services/agent_runtime_report.py b/inferedgelab/services/agent_runtime_report.py index 233cdcf..8e8a6a2 100644 --- a/inferedgelab/services/agent_runtime_report.py +++ b/inferedgelab/services/agent_runtime_report.py @@ -26,6 +26,8 @@ "fallback_rate_review": 0.20, "fallback_rate_blocked": 0.50, "queue_backlog_policy_decision_count_review": 1, + "max_total_queue_depth_review": 3, + "max_total_queue_depth_blocked": 8, } AGENT_RUNTIME_POLICY_RULES: dict[str, dict[str, str]] = { @@ -69,6 +71,14 @@ "effect": "review_required", "description": "Queue backlog policy intervention was observed.", }, + "sustained_overload_block": { + "effect": "blocked", + "description": "Sustained queue depth crossed the blocking threshold.", + }, + "sustained_overload_review": { + "effect": "review_required", + "description": "Sustained queue depth crossed the review threshold.", + }, "runtime_reliability_pass_note": { "effect": "deployable_with_note", "description": "Runtime reliability evidence stayed within configured thresholds.", @@ -115,9 +125,12 @@ def build_agent_runtime_reliability_report( "agents": _agent_summaries(runtime_summary), "totals": _totals(runtime_summary), "metrics": metrics, + "timeline_summary": _timeline_summary(orchestration_summary, metrics), + "policy_decision_reasons": metrics["policy_decision_reasons"], "policy_decision_log_count": len(_policy_log(orchestration_summary)), }, "guard_summary": _guard_summary(guard_analysis), + "runtime_reliability_evidence": _runtime_reliability_evidence(guard_analysis), "agent_deployment_decision": decision, "notes": [ "This report is local-first runtime reliability evidence, not a production cloud orchestration dashboard.", @@ -178,6 +191,14 @@ def build_agent_runtime_deployment_decision( >= policy["queue_backlog_policy_decision_count_review"] ): triggered_rules.append("queue_backlog_review") + _append_metric_rules( + triggered_rules, + metric_value=metrics["max_total_queue_depth"], + review=policy["max_total_queue_depth_review"], + blocked=policy["max_total_queue_depth_blocked"], + review_rule="sustained_overload_review", + blocked_rule="sustained_overload_block", + ) if not triggered_rules: triggered_rules.append("runtime_reliability_pass_note") @@ -228,20 +249,38 @@ def build_agent_runtime_deployment_decision( def compute_agent_runtime_metrics(orchestration_summary: dict[str, Any]) -> dict[str, Any]: runtime_summary = _agent_runtime_summary(orchestration_summary) + sustained_summary = _sustained_runtime_summary(orchestration_summary) totals = _totals(runtime_summary) + queue_depth_timeline = _dict_list(orchestration_summary.get("queue_depth_timeline")) + latency_timeline = _dict_list(orchestration_summary.get("latency_timeline")) executed_count = _non_negative_number(totals.get("executed_count")) dropped_count = _non_negative_number(totals.get("dropped_count")) - deadline_missed_count = _non_negative_number(totals.get("deadline_missed_count")) + timeline_deadline_missed_count = sum( + 1 for item in latency_timeline if bool(item.get("deadline_missed")) + ) + deadline_missed_count = max( + _non_negative_number(totals.get("deadline_missed_count")), + float(timeline_deadline_missed_count), + ) fallback_count = _non_negative_number(totals.get("fallback_count")) + if executed_count <= 0 and latency_timeline: + executed_count = float(len(latency_timeline)) total_task_events = executed_count + dropped_count policy_log = _policy_log(orchestration_summary) + policy_decision_reasons = _policy_decision_reasons(policy_log) queue_backlog_count = sum( 1 for item in policy_log if "backlog" in str(item.get("reason", "")).lower() + or "backlog" in str(item.get("decision_reason", "")).lower() or "backlog" in str(item.get("decision", "")).lower() ) + max_total_queue_depth = max( + _non_negative_number(sustained_summary.get("max_total_queue_depth")), + _max_total_queue_depth(queue_depth_timeline), + ) return { + "scenario_mode": _scenario_mode(orchestration_summary), "executed_count": executed_count, "dropped_count": dropped_count, "deadline_missed_count": deadline_missed_count, @@ -255,6 +294,11 @@ def compute_agent_runtime_metrics(orchestration_summary: dict[str, Any]) -> dict "drop_rate": _ratio(dropped_count, total_task_events), "fallback_rate": _ratio(fallback_count, total_task_events), "queue_backlog_policy_decision_count": queue_backlog_count, + "max_total_queue_depth": max_total_queue_depth, + "queue_depth_sample_count": len(queue_depth_timeline), + "latency_sample_count": len(latency_timeline), + "policy_decision_reasons": policy_decision_reasons, + "top_policy_decision_reason": _top_reason(policy_decision_reasons), } @@ -323,6 +367,10 @@ def build_agent_runtime_reliability_markdown(report: dict[str, Any]) -> str: f"| drop_rate | {_fmt_number(metrics['drop_rate'])} |", f"| fallback_rate | {_fmt_number(metrics['fallback_rate'])} |", f"| queue_backlog_policy_decision_count | {_fmt_number(metrics['queue_backlog_policy_decision_count'])} |", + f"| max_total_queue_depth | {_fmt_number(metrics['max_total_queue_depth'])} |", + f"| queue_depth_sample_count | {_fmt_number(metrics['queue_depth_sample_count'])} |", + f"| latency_sample_count | {_fmt_number(metrics['latency_sample_count'])} |", + f"| top_policy_decision_reason | {metrics.get('top_policy_decision_reason') or '-'} |", "", "## AIGuard Runtime Reliability Evidence", "", @@ -331,6 +379,11 @@ def build_agent_runtime_reliability_markdown(report: dict[str, Any]) -> str: f"- severity: `{guard.get('severity')}`", f"- primary_reason: {guard.get('primary_reason')}", f"- evidence_count: `{guard.get('evidence_count')}`", + "- evidence_types:", + *[ + f" - `{item['type']}`: {item.get('metric_name')}={_fmt_number(item.get('observed_value'))} ({item.get('status')})" + for item in report.get("runtime_reliability_evidence", []) + ], "", "## Lab Agent Deployment Decision", "", @@ -368,6 +421,11 @@ def _agent_runtime_summary(orchestration_summary: dict[str, Any]) -> dict[str, A return value if isinstance(value, dict) else {} +def _sustained_runtime_summary(orchestration_summary: dict[str, Any]) -> dict[str, Any]: + value = orchestration_summary.get("sustained_runtime_summary") + return value if isinstance(value, dict) else {} + + def _totals(runtime_summary: dict[str, Any]) -> dict[str, Any]: value = runtime_summary.get("totals") return value if isinstance(value, dict) else {} @@ -385,6 +443,7 @@ def _agent_summaries(runtime_summary: dict[str, Any]) -> list[dict[str, Any]]: def _guard_summary(guard_analysis: dict[str, Any] | None) -> dict[str, Any]: + evidence = guard_evidence_items(guard_analysis) return { "schema_version": guard_analysis.get("schema_version") if isinstance(guard_analysis, dict) @@ -393,7 +452,10 @@ def _guard_summary(guard_analysis: dict[str, Any] | None) -> dict[str, Any]: "guard_verdict": guard_verdict(guard_analysis), "severity": guard_analysis.get("severity") if isinstance(guard_analysis, dict) else None, "primary_reason": guard_primary_reason(guard_analysis), - "evidence_count": len(guard_evidence_items(guard_analysis)), + "evidence_count": len(evidence), + "evidence_types": [ + item.get("type") for item in evidence if isinstance(item, dict) and item.get("type") + ], } @@ -425,6 +487,92 @@ def _policy_log(orchestration_summary: dict[str, Any]) -> list[dict[str, Any]]: return [item for item in value if isinstance(item, dict)] +def _dict_list(value: Any) -> list[dict[str, Any]]: + if not isinstance(value, list): + return [] + return [item for item in value if isinstance(item, dict)] + + +def _scenario_mode(orchestration_summary: dict[str, Any]) -> str: + run = orchestration_summary.get("run") + if isinstance(run, dict) and isinstance(run.get("scenario_mode"), str): + return run["scenario_mode"] + sustained_summary = _sustained_runtime_summary(orchestration_summary) + if isinstance(sustained_summary.get("scenario_mode"), str): + return sustained_summary["scenario_mode"] + return "unknown" + + +def _max_total_queue_depth(queue_depth_timeline: list[dict[str, Any]]) -> float: + max_depth = 0.0 + for item in queue_depth_timeline: + max_depth = max(max_depth, _non_negative_number(item.get("total_queue_depth"))) + queue_depth = item.get("queue_depth") + if isinstance(queue_depth, dict): + max_depth = max( + max_depth, + sum(_non_negative_number(value) for value in queue_depth.values()), + ) + return max_depth + + +def _policy_decision_reasons(policy_log: list[dict[str, Any]]) -> dict[str, int]: + counts: dict[str, int] = {} + for item in policy_log: + reason = item.get("decision_reason") or item.get("reason") or item.get("decision") + if not isinstance(reason, str) or not reason: + reason = "unknown" + counts[reason] = counts.get(reason, 0) + 1 + return counts + + +def _top_reason(reasons: dict[str, int]) -> str | None: + if not reasons: + return None + return max(reasons.items(), key=lambda item: (item[1], item[0]))[0] + + +def _runtime_reliability_evidence( + guard_analysis: dict[str, Any] | None, +) -> list[dict[str, Any]]: + evidence = guard_evidence_items(guard_analysis) + return [ + { + "type": item.get("type"), + "metric_name": item.get("metric_name"), + "observed_value": item.get("observed_value"), + "threshold": item.get("threshold"), + "severity": item.get("severity"), + "status": item.get("status"), + "explanation": item.get("explanation"), + "recommendation": item.get("recommendation"), + "why_it_matters": item.get("why_it_matters"), + } + for item in evidence + if isinstance(item, dict) + ] + + +def _timeline_summary( + orchestration_summary: dict[str, Any], + metrics: dict[str, Any], +) -> dict[str, Any]: + return { + "scenario_mode": metrics["scenario_mode"], + "queue_depth_sample_count": metrics["queue_depth_sample_count"], + "latency_sample_count": metrics["latency_sample_count"], + "max_total_queue_depth": metrics["max_total_queue_depth"], + "top_policy_decision_reason": metrics.get("top_policy_decision_reason"), + "policy_decision_reasons": dict(metrics.get("policy_decision_reasons") or {}), + "has_queue_depth_timeline": bool( + _dict_list(orchestration_summary.get("queue_depth_timeline")) + ), + "has_latency_timeline": bool( + _dict_list(orchestration_summary.get("latency_timeline")) + ), + } + + def _load_json_dict(path: str | Path | None) -> dict[str, Any] | None: if path is None: return None diff --git a/tests/test_agent_runtime_report.py b/tests/test_agent_runtime_report.py index b4c2903..5f4b94b 100644 --- a/tests/test_agent_runtime_report.py +++ b/tests/test_agent_runtime_report.py @@ -18,6 +18,11 @@ def orchestration_summary() -> dict: return { "schema_version": "inferedge-orchestration-summary-v1", + "run": { + "name": "agent_3_workload_sustained_high_load", + "scenario_mode": "sustained_high_load", + "frame_interval_ms": 5.0, + }, "agent_runtime_summary": { "schema_version": "inferedge-orchestration-summary-v1", "source_contracts": { @@ -49,11 +54,47 @@ def orchestration_summary() -> dict: "overload_event_count": 14, }, }, + "sustained_runtime_summary": { + "schema_version": "inferedge-orchestrator-sustained-summary-v1", + "scenario_mode": "sustained_high_load", + "queue_depth_sample_count": 1, + "latency_sample_count": 1, + "max_total_queue_depth": 6, + }, + "queue_depth_timeline": [ + { + "cycle": 1, + "stage": "before_policy", + "queue_depth": { + "vision_agent": 4, + "voice_command_agent": 2, + "safety_monitor_agent": 0, + }, + "total_queue_depth": 6, + } + ], + "latency_timeline": [ + { + "agent_id": "vision_agent", + "task_id": "task_vision_agent", + "latency_ms": 41.0, + "latency_budget_ms": 33.0, + "deadline_missed": True, + } + ], "policy_decision_log": [ { "agent_id": "vision_agent", "decision": "load_shedding", "reason": "queue_backlog_threshold_exceeded", + "decision_reason": "queue_backlog_threshold_exceeded", + "total_backlog_before": 6, + "backlog_threshold": 3, + "queue_depth_snapshot": { + "vision_agent": 4, + "voice_command_agent": 2, + "safety_monitor_agent": 0, + }, "protected_agent_id": "safety_monitor_agent", } ], @@ -90,6 +131,42 @@ def guard_analysis() -> dict: } +def sustained_guard_analysis() -> dict: + data = guard_analysis() + data["evidence"].append( + { + "type": "sustained_overload_risk", + "metric_name": "max_total_queue_depth", + "observed_value": 6, + "baseline_value": None, + "threshold": 3, + "delta": None, + "delta_pct": None, + "increase_factor": None, + "severity": "medium", + "status": "failed", + "explanation": "Queue depth grew under sustained high-load mode.", + "why_it_matters": "Queue growth indicates multi-agent runtime pressure.", + "suspected_causes": ["sustained_multi_agent_overload"], + "recommendation": "Lower producer rate or tighten stale-frame drop policy.", + "raw_context": { + "scenario_mode": "sustained_high_load", + "queue_depth_sample_count": 1, + "latency_sample_count": 1, + }, + } + ) + data["suspected_causes"] = [ + "queue_backlog", + "sustained_multi_agent_overload", + ] + data["recommendations"] = [ + "Tune scheduling policy.", + "Lower producer rate or tighten stale-frame drop policy.", + ] + return data + + def test_compute_agent_runtime_metrics_from_orchestrator_summary(): metrics = compute_agent_runtime_metrics(orchestration_summary()) @@ -97,12 +174,20 @@ def test_compute_agent_runtime_metrics_from_orchestrator_summary(): assert metrics["drop_rate"] == pytest.approx(14 / 24) assert metrics["fallback_rate"] == pytest.approx(14 / 24) assert metrics["queue_backlog_policy_decision_count"] == 1 + assert metrics["scenario_mode"] == "sustained_high_load" + assert metrics["max_total_queue_depth"] == 6 + assert metrics["queue_depth_sample_count"] == 1 + assert metrics["latency_sample_count"] == 1 + assert metrics["top_policy_decision_reason"] == "queue_backlog_threshold_exceeded" + assert metrics["policy_decision_reasons"] == { + "queue_backlog_threshold_exceeded": 1 + } def test_agent_runtime_report_blocks_when_guard_blocks(): report = build_agent_runtime_reliability_report( orchestration_summary=orchestration_summary(), - guard_analysis=guard_analysis(), + guard_analysis=sustained_guard_analysis(), ) decision = report["agent_deployment_decision"] @@ -117,13 +202,28 @@ def test_agent_runtime_report_blocks_when_guard_blocks(): assert decision["decision"] == "blocked" assert "guard_blocked_runtime_block" in decision["triggered_rules"] assert "drop_rate_block" in decision["triggered_rules"] + assert "sustained_overload_review" in decision["triggered_rules"] assert report["guard_summary"]["guard_verdict"] == "blocked" + assert "sustained_overload_risk" in report["guard_summary"]["evidence_types"] + assert report["agent_runtime_summary"]["timeline_summary"] == { + "scenario_mode": "sustained_high_load", + "queue_depth_sample_count": 1, + "latency_sample_count": 1, + "max_total_queue_depth": 6, + "top_policy_decision_reason": "queue_backlog_threshold_exceeded", + "policy_decision_reasons": {"queue_backlog_threshold_exceeded": 1}, + "has_queue_depth_timeline": True, + "has_latency_timeline": True, + } + assert { + item["type"] for item in report["runtime_reliability_evidence"] + } == {"excessive_drop_rate", "sustained_overload_risk"} def test_agent_runtime_report_markdown_contains_sections(): report = build_agent_runtime_reliability_report( orchestration_summary=orchestration_summary(), - guard_analysis=guard_analysis(), + guard_analysis=sustained_guard_analysis(), ) markdown = build_agent_runtime_reliability_markdown(report) @@ -133,6 +233,8 @@ def test_agent_runtime_report_markdown_contains_sections(): assert "AIGuard Runtime Reliability Evidence" in markdown assert "Lab Agent Deployment Decision" in markdown assert "guard_blocked_runtime_block" in markdown + assert "sustained_overload_risk" in markdown + assert "max_total_queue_depth" in markdown assert "not a production cloud orchestration dashboard" in markdown