diff --git a/README.md b/README.md index 2cf663a..2824cb5 100644 --- a/README.md +++ b/README.md @@ -128,7 +128,7 @@ The Lab decision surface now also exposes `policy_version`, `triggered_rules`, a `agent-runtime-report` is an additive reliable edge agent runtime report path. It bundles Orchestrator scheduling evidence and AIGuard runtime reliability `guard_analysis` into a Lab-owned agent deployment decision context without changing existing Runtime result or compare contracts. The current bundled evidence is a synthetic/dummy sustained high-load 3-agent scenario. -The report preserves sustained queue-depth, policy decision reason, and `sustained_overload_risk` evidence as local-first deployment review context. +The report preserves sustained queue-depth, worker health, runtime event summary/timeline, policy decision reason, and `sustained_overload_risk` evidence as local-first deployment review context. ![InferEdge Local Studio demo evidence](assets/images/local-studio-demo-evidence.png) diff --git a/docs/portfolio/agent_runtime_reliability_report.md b/docs/portfolio/agent_runtime_reliability_report.md index dc3d45f..24c9830 100644 --- a/docs/portfolio/agent_runtime_reliability_report.md +++ b/docs/portfolio/agent_runtime_reliability_report.md @@ -57,6 +57,23 @@ AIGuard `guard_analysis` also includes `sustained_overload_risk`, which Lab preserves as report evidence and reflects in the agent deployment decision context. +The report also preserves the Orchestrator operation-health fields added for +runtime operation review: + +- `queue_state_summary` for queue pressure, max backlog, final queue depth, and + overload threshold. +- `worker_health_snapshot` for healthy/constrained/degraded worker state, + executed/drop/deadline/fallback counts, and latency context. +- `runtime_event_summary` for event type counts. +- `runtime_event_timeline` sample rows for queue snapshots, policy decisions, + drops, and execution outcomes. + +These fields make the report path explicit: + +```text +Orchestrator operation evidence -> AIGuard reliability explanation -> Lab-owned deployment risk context +``` + ## Lab Decision Context Expected decision: @@ -83,6 +100,7 @@ Triggered rules: ## Boundary - Orchestrator records scheduling and policy evidence. +- Orchestrator operation-health fields are displayed as local runtime evidence. - AIGuard explains runtime reliability risk. - Lab remains the final deployment decision owner. - This report is an additive agent-runtime path and does not change existing diff --git a/examples/agent_runtime/agent_3_orchestration_summary.json b/examples/agent_runtime/agent_3_orchestration_summary.json index 9da4c4f..af73de7 100644 --- a/examples/agent_runtime/agent_3_orchestration_summary.json +++ b/examples/agent_runtime/agent_3_orchestration_summary.json @@ -97,6 +97,124 @@ "protected_agent_id": "safety_monitor_agent" } ], + "queue_state_summary": { + "schema_version": "inferedge-orchestrator-queue-state-v1", + "sample_count": 1, + "overload_backlog_threshold": 3, + "max_total_queue_depth": 6, + "average_total_queue_depth": 6.0, + "final_queue_depth": { + "vision_agent": 4, + "voice_command_agent": 2, + "safety_monitor_agent": 0 + }, + "max_queue_depth_by_task": { + "vision_agent": 4, + "voice_command_agent": 2, + "safety_monitor_agent": 0 + }, + "queue_pressure_state": "overloaded" + }, + "worker_health_snapshot": { + "schema_version": "inferedge-orchestrator-worker-health-v1", + "workers": { + "safety_monitor_agent": { + "task": "safety_monitor_agent", + "agent_id": "safety_monitor_agent", + "task_id": "task_safety_monitor_agent", + "agent_type": "safety", + "scheduled_priority": 100, + "latency_budget_ms": 20.0, + "worker": "dummy", + "health_state": "healthy", + "executed_count": 2, + "dropped_count": 0, + "deadline_missed_count": 0, + "fallback_count": 0, + "mean_latency_ms": 6.0, + "p95_latency_ms": 8.0, + "max_queue_backlog": 0, + "queue_size": 1, + "queue_pressure_ratio": 0.0 + }, + "vision_agent": { + "task": "vision_agent", + "agent_id": "vision_agent", + "task_id": "task_vision_agent", + "agent_type": "vision", + "scheduled_priority": 90, + "latency_budget_ms": 33.0, + "worker": "dummy", + "health_state": "degraded", + "executed_count": 8, + "dropped_count": 14, + "deadline_missed_count": 1, + "fallback_count": 14, + "mean_latency_ms": 41.0, + "p95_latency_ms": 41.0, + "max_queue_backlog": 4, + "queue_size": 4, + "queue_pressure_ratio": 1.0 + }, + "voice_command_agent": { + "task": "voice_command_agent", + "agent_id": "voice_command_agent", + "task_id": "task_voice_command_agent", + "agent_type": "voice", + "scheduled_priority": 50, + "latency_budget_ms": 120.0, + "worker": "dummy", + "health_state": "constrained", + "executed_count": 0, + "dropped_count": 0, + "deadline_missed_count": 0, + "fallback_count": 0, + "mean_latency_ms": null, + "p95_latency_ms": null, + "max_queue_backlog": 2, + "queue_size": 2, + "queue_pressure_ratio": 1.0 + } + } + }, + "runtime_event_summary": { + "schema_version": "inferedge-orchestrator-runtime-event-summary-v1", + "event_count": 4, + "event_type_counts": { + "queue_snapshot": 1, + "policy_decision": 1, + "drop": 1, + "execution": 1 + } + }, + "runtime_event_timeline": [ + { + "event_index": 0, + "event_type": "queue_snapshot", + "reason": "queue_depth_sampled" + }, + { + "event_index": 1, + "event_type": "policy_decision", + "agent_id": "vision_agent", + "task_id": "task_vision_agent", + "reason": "queue_backlog_threshold_exceeded" + }, + { + "event_index": 2, + "event_type": "drop", + "agent_id": "vision_agent", + "task_id": "task_vision_agent", + "reason": "load_shedding_backlog_threshold_exceeded" + }, + { + "event_index": 3, + "event_type": "execution", + "agent_id": "vision_agent", + "task_id": "task_vision_agent", + "reason": "deadline_missed" + } + ], "drop_events": [ { "agent_id": "vision_agent", diff --git a/inferedgelab/services/agent_runtime_report.py b/inferedgelab/services/agent_runtime_report.py index 8e8a6a2..cdb530b 100644 --- a/inferedgelab/services/agent_runtime_report.py +++ b/inferedgelab/services/agent_runtime_report.py @@ -126,6 +126,7 @@ def build_agent_runtime_reliability_report( "totals": _totals(runtime_summary), "metrics": metrics, "timeline_summary": _timeline_summary(orchestration_summary, metrics), + "operation_context": _operation_context(orchestration_summary, metrics), "policy_decision_reasons": metrics["policy_decision_reasons"], "policy_decision_log_count": len(_policy_log(orchestration_summary)), }, @@ -267,6 +268,11 @@ def compute_agent_runtime_metrics(orchestration_summary: dict[str, Any]) -> dict executed_count = float(len(latency_timeline)) total_task_events = executed_count + dropped_count policy_log = _policy_log(orchestration_summary) + queue_state = _queue_state_summary(orchestration_summary) + worker_health = _worker_health_snapshot(orchestration_summary) + worker_health_counts = _worker_health_counts(worker_health) + runtime_events = _runtime_event_timeline(orchestration_summary) + runtime_event_summary = _runtime_event_summary(orchestration_summary, runtime_events) policy_decision_reasons = _policy_decision_reasons(policy_log) queue_backlog_count = sum( 1 @@ -297,6 +303,26 @@ def compute_agent_runtime_metrics(orchestration_summary: dict[str, Any]) -> dict "max_total_queue_depth": max_total_queue_depth, "queue_depth_sample_count": len(queue_depth_timeline), "latency_sample_count": len(latency_timeline), + "queue_pressure_state": queue_state.get("queue_pressure_state") or "unknown", + "queue_state_sample_count": _non_negative_number(queue_state.get("sample_count")), + "queue_state_max_total_queue_depth": _non_negative_number( + queue_state.get("max_total_queue_depth") + ), + "queue_state_average_total_queue_depth": _non_negative_number( + queue_state.get("average_total_queue_depth") + ), + "worker_health_counts": worker_health_counts, + "degraded_worker_count": _non_negative_number(worker_health_counts.get("degraded")), + "constrained_worker_count": _non_negative_number( + worker_health_counts.get("constrained") + ), + "healthy_worker_count": _non_negative_number(worker_health_counts.get("healthy")), + "runtime_event_count": _non_negative_number( + runtime_event_summary.get("event_count") + ), + "runtime_event_type_counts": dict( + runtime_event_summary.get("event_type_counts") or {} + ), "policy_decision_reasons": policy_decision_reasons, "top_policy_decision_reason": _top_reason(policy_decision_reasons), } @@ -370,8 +396,76 @@ def build_agent_runtime_reliability_markdown(report: dict[str, Any]) -> str: f"| max_total_queue_depth | {_fmt_number(metrics['max_total_queue_depth'])} |", f"| queue_depth_sample_count | {_fmt_number(metrics['queue_depth_sample_count'])} |", f"| latency_sample_count | {_fmt_number(metrics['latency_sample_count'])} |", + f"| queue_pressure_state | {metrics.get('queue_pressure_state') or '-'} |", + f"| runtime_event_count | {_fmt_number(metrics.get('runtime_event_count'))} |", + f"| degraded_worker_count | {_fmt_number(metrics.get('degraded_worker_count'))} |", + f"| constrained_worker_count | {_fmt_number(metrics.get('constrained_worker_count'))} |", f"| top_policy_decision_reason | {metrics.get('top_policy_decision_reason') or '-'} |", "", + "## Orchestrator Operation Context", + "", + "### Queue State", + "", + "| Field | Value |", + "|---|---:|", + f"| queue_pressure_state | {runtime['operation_context']['queue_state_summary'].get('queue_pressure_state') or '-'} |", + f"| overload_backlog_threshold | {_fmt_number(runtime['operation_context']['queue_state_summary'].get('overload_backlog_threshold'))} |", + f"| max_total_queue_depth | {_fmt_number(runtime['operation_context']['queue_state_summary'].get('max_total_queue_depth'))} |", + f"| average_total_queue_depth | {_fmt_number(runtime['operation_context']['queue_state_summary'].get('average_total_queue_depth'))} |", + "", + "### Worker Health", + "", + "| Worker | Health | Executed | Dropped | Deadline Missed | Fallback | Mean Latency ms |", + "|---|---|---:|---:|---:|---:|---:|", + *[ + "| " + f"{worker.get('task') or task_name} | " + f"{worker.get('health_state') or '-'} | " + f"{_fmt_number(worker.get('executed_count'))} | " + f"{_fmt_number(worker.get('dropped_count'))} | " + f"{_fmt_number(worker.get('deadline_missed_count'))} | " + f"{_fmt_number(worker.get('fallback_count'))} | " + f"{_fmt_number(worker.get('mean_latency_ms'))} |" + for task_name, worker in runtime["operation_context"][ + "worker_health_snapshot" + ] + .get("workers", {}) + .items() + if isinstance(worker, dict) + ], + "", + "### Runtime Event Summary", + "", + "| Event Type | Count |", + "|---|---:|", + *[ + f"| {event_type} | {_fmt_number(count)} |" + for event_type, count in sorted( + ( + runtime["operation_context"]["runtime_event_summary"].get( + "event_type_counts" + ) + or {} + ).items() + ) + ], + "", + "Runtime event timeline sample:", + "", + "| # | Type | Agent | Task | Reason |", + "|---:|---|---|---|---|", + *[ + "| " + f"{_fmt_number(event.get('event_index'))} | " + f"{event.get('event_type') or '-'} | " + f"{event.get('agent_id') or '-'} | " + f"{event.get('task_id') or event.get('task') or '-'} | " + f"{event.get('reason') or event.get('decision_reason') or '-'} |" + for event in runtime["operation_context"][ + "runtime_event_timeline_sample" + ] + ], + "", "## AIGuard Runtime Reliability Evidence", "", f"- guard_status: `{guard.get('status')}`", @@ -573,6 +667,138 @@ def _timeline_summary( } +def _operation_context( + orchestration_summary: dict[str, Any], + metrics: dict[str, Any], +) -> dict[str, Any]: + runtime_events = _runtime_event_timeline(orchestration_summary) + queue_state = _queue_state_summary(orchestration_summary) + worker_health = _worker_health_snapshot(orchestration_summary) + runtime_event_summary = _runtime_event_summary(orchestration_summary, runtime_events) + return { + "queue_state_summary": queue_state, + "worker_health_snapshot": worker_health, + "worker_health_counts": dict(metrics.get("worker_health_counts") or {}), + "runtime_event_summary": runtime_event_summary, + "runtime_event_timeline_count": len(runtime_events), + "runtime_event_timeline_sample": runtime_events[:8], + } + + +def _queue_state_summary(orchestration_summary: dict[str, Any]) -> dict[str, Any]: + value = orchestration_summary.get("queue_state_summary") + if isinstance(value, dict): + return dict(value) + + queue_depth_timeline = _dict_list(orchestration_summary.get("queue_depth_timeline")) + max_total_queue_depth = _max_total_queue_depth(queue_depth_timeline) + final_queue_depth: dict[str, Any] = {} + if queue_depth_timeline: + queue_depth = queue_depth_timeline[-1].get("queue_depth") + if isinstance(queue_depth, dict): + final_queue_depth = dict(queue_depth) + return { + "schema_version": None, + "sample_count": len(queue_depth_timeline), + "overload_backlog_threshold": None, + "max_total_queue_depth": max_total_queue_depth, + "average_total_queue_depth": _average_total_queue_depth(queue_depth_timeline), + "final_queue_depth": final_queue_depth, + "max_queue_depth_by_task": _max_queue_depth_by_task(queue_depth_timeline), + "queue_pressure_state": _derived_queue_pressure_state(max_total_queue_depth), + } + + +def _worker_health_snapshot(orchestration_summary: dict[str, Any]) -> dict[str, Any]: + value = orchestration_summary.get("worker_health_snapshot") + if isinstance(value, dict): + return dict(value) + return {"schema_version": None, "workers": {}} + + +def _runtime_event_summary( + orchestration_summary: dict[str, Any], + runtime_events: list[dict[str, Any]], +) -> dict[str, Any]: + value = orchestration_summary.get("runtime_event_summary") + if isinstance(value, dict): + summary = dict(value) + if "event_type_counts" not in summary: + summary["event_type_counts"] = _runtime_event_type_counts(runtime_events) + if "event_count" not in summary: + summary["event_count"] = len(runtime_events) + return summary + return { + "schema_version": None, + "event_count": len(runtime_events), + "event_type_counts": _runtime_event_type_counts(runtime_events), + } + + +def _runtime_event_timeline(orchestration_summary: dict[str, Any]) -> list[dict[str, Any]]: + return _dict_list(orchestration_summary.get("runtime_event_timeline")) + + +def _runtime_event_type_counts(runtime_events: list[dict[str, Any]]) -> dict[str, int]: + counts: dict[str, int] = {} + for event in runtime_events: + event_type = event.get("event_type") + if not isinstance(event_type, str) or not event_type: + event_type = "unknown" + counts[event_type] = counts.get(event_type, 0) + 1 + return counts + + +def _worker_health_counts(worker_health_snapshot: dict[str, Any]) -> dict[str, int]: + workers = worker_health_snapshot.get("workers") + if not isinstance(workers, dict): + return {} + counts: dict[str, int] = {} + for worker in workers.values(): + if not isinstance(worker, dict): + continue + state = worker.get("health_state") + if not isinstance(state, str) or not state: + state = "unknown" + counts[state] = counts.get(state, 0) + 1 + return counts + + +def _average_total_queue_depth(queue_depth_timeline: list[dict[str, Any]]) -> float: + values = [ + _non_negative_number(item.get("total_queue_depth")) + for item in queue_depth_timeline + if item.get("total_queue_depth") is not None + ] + if not values: + return 0.0 + return sum(values) / len(values) + + +def _max_queue_depth_by_task( + queue_depth_timeline: list[dict[str, Any]], +) -> dict[str, float]: + result: dict[str, float] = {} + for item in queue_depth_timeline: + queue_depth = item.get("queue_depth") + if not isinstance(queue_depth, dict): + continue + for task_name, depth in queue_depth.items(): + result[str(task_name)] = max( + result.get(str(task_name), 0.0), + _non_negative_number(depth), + ) + return result + + +def _derived_queue_pressure_state(max_total_queue_depth: float) -> str: + if max_total_queue_depth >= 8: + return "overloaded" + if max_total_queue_depth >= 3: + return "elevated" + return "nominal" + + def _load_json_dict(path: str | Path | None) -> dict[str, Any] | None: if path is None: return None diff --git a/tests/test_agent_runtime_report.py b/tests/test_agent_runtime_report.py index 5f4b94b..14fa93d 100644 --- a/tests/test_agent_runtime_report.py +++ b/tests/test_agent_runtime_report.py @@ -98,6 +98,93 @@ def orchestration_summary() -> dict: "protected_agent_id": "safety_monitor_agent", } ], + "queue_state_summary": { + "schema_version": "inferedge-orchestrator-queue-state-v1", + "sample_count": 1, + "overload_backlog_threshold": 3, + "max_total_queue_depth": 6, + "average_total_queue_depth": 6.0, + "final_queue_depth": { + "vision_agent": 4, + "voice_command_agent": 2, + "safety_monitor_agent": 0, + }, + "max_queue_depth_by_task": { + "vision_agent": 4, + "voice_command_agent": 2, + "safety_monitor_agent": 0, + }, + "queue_pressure_state": "overloaded", + }, + "worker_health_snapshot": { + "schema_version": "inferedge-orchestrator-worker-health-v1", + "workers": { + "safety_monitor_agent": { + "task": "safety_monitor_agent", + "agent_id": "safety_monitor_agent", + "task_id": "task_safety_monitor_agent", + "agent_type": "safety", + "worker": "dummy", + "health_state": "healthy", + "executed_count": 2, + "dropped_count": 0, + "deadline_missed_count": 0, + "fallback_count": 0, + "mean_latency_ms": 6.0, + }, + "vision_agent": { + "task": "vision_agent", + "agent_id": "vision_agent", + "task_id": "task_vision_agent", + "agent_type": "vision", + "worker": "dummy", + "health_state": "degraded", + "executed_count": 8, + "dropped_count": 14, + "deadline_missed_count": 1, + "fallback_count": 14, + "mean_latency_ms": 41.0, + }, + }, + }, + "runtime_event_summary": { + "schema_version": "inferedge-orchestrator-runtime-event-summary-v1", + "event_count": 4, + "event_type_counts": { + "queue_snapshot": 1, + "policy_decision": 1, + "drop": 1, + "execution": 1, + }, + }, + "runtime_event_timeline": [ + { + "event_index": 0, + "event_type": "queue_snapshot", + "reason": "queue_depth_sampled", + }, + { + "event_index": 1, + "event_type": "policy_decision", + "agent_id": "vision_agent", + "task_id": "task_vision_agent", + "reason": "queue_backlog_threshold_exceeded", + }, + { + "event_index": 2, + "event_type": "drop", + "agent_id": "vision_agent", + "task_id": "task_vision_agent", + "reason": "load_shedding_backlog_threshold_exceeded", + }, + { + "event_index": 3, + "event_type": "execution", + "agent_id": "vision_agent", + "task_id": "task_vision_agent", + "reason": "deadline_missed", + }, + ], } @@ -182,6 +269,16 @@ def test_compute_agent_runtime_metrics_from_orchestrator_summary(): assert metrics["policy_decision_reasons"] == { "queue_backlog_threshold_exceeded": 1 } + assert metrics["queue_pressure_state"] == "overloaded" + assert metrics["runtime_event_count"] == 4 + assert metrics["runtime_event_type_counts"] == { + "queue_snapshot": 1, + "policy_decision": 1, + "drop": 1, + "execution": 1, + } + assert metrics["degraded_worker_count"] == 1 + assert metrics["healthy_worker_count"] == 1 def test_agent_runtime_report_blocks_when_guard_blocks(): @@ -218,6 +315,37 @@ def test_agent_runtime_report_blocks_when_guard_blocks(): assert { item["type"] for item in report["runtime_reliability_evidence"] } == {"excessive_drop_rate", "sustained_overload_risk"} + operation_context = report["agent_runtime_summary"]["operation_context"] + assert operation_context["queue_state_summary"]["queue_pressure_state"] == "overloaded" + assert operation_context["worker_health_counts"] == { + "healthy": 1, + "degraded": 1, + } + assert operation_context["runtime_event_summary"]["event_type_counts"]["drop"] == 1 + assert operation_context["runtime_event_timeline_count"] == 4 + assert operation_context["runtime_event_timeline_sample"][1]["event_type"] == ( + "policy_decision" + ) + + +def test_agent_runtime_report_keeps_legacy_orchestrator_summary_compatible(): + legacy_summary = orchestration_summary() + legacy_summary.pop("queue_state_summary") + legacy_summary.pop("worker_health_snapshot") + legacy_summary.pop("runtime_event_summary") + legacy_summary.pop("runtime_event_timeline") + + report = build_agent_runtime_reliability_report( + orchestration_summary=legacy_summary, + guard_analysis=sustained_guard_analysis(), + ) + + operation_context = report["agent_runtime_summary"]["operation_context"] + assert operation_context["queue_state_summary"]["schema_version"] is None + assert operation_context["queue_state_summary"]["max_total_queue_depth"] == 6 + assert operation_context["worker_health_snapshot"]["workers"] == {} + assert operation_context["runtime_event_summary"]["event_count"] == 0 + assert report["agent_deployment_decision"]["decision"] == "blocked" def test_agent_runtime_report_markdown_contains_sections(): @@ -230,6 +358,12 @@ def test_agent_runtime_report_markdown_contains_sections(): assert "# InferEdge Agent Runtime Reliability Report" in markdown assert "Agent Runtime Summary" in markdown assert "Runtime Reliability Metrics" in markdown + assert "Orchestrator Operation Context" in markdown + assert "Queue State" in markdown + assert "Worker Health" in markdown + assert "Runtime Event Summary" in markdown + assert "queue_pressure_state" in markdown + assert "policy_decision" in markdown assert "AIGuard Runtime Reliability Evidence" in markdown assert "Lab Agent Deployment Decision" in markdown assert "guard_blocked_runtime_block" in markdown