diff --git a/README.md b/README.md index 40095d2..b69806d 100644 --- a/README.md +++ b/README.md @@ -285,7 +285,22 @@ python3 -m inferedge_orchestrator run-multi-workload-sustained \ For a minimal process-backed Safety input, use `--capture-process-resource-snapshot` instead of `--resource-snapshot`. The CLI writes a small process resource snapshot next to the output JSON and routes it -through the Safety producer. +through the Safety producer. If optional ONNX Runtime dependencies and a local +model file are available, the Vision producer can also run a small +ONNX Runtime probe while preserving the same producer-backed sustained contract: + +```bash +python3 -m inferedge_orchestrator run-multi-workload-sustained \ + --config configs/agent_multi_workload_sustained_device_local.json \ + --output reports/agent_multi_workload_sustained_device_local.json \ + --frames 16 \ + --vision-input /path/to/frame.ppm \ + --vision-onnx-model /path/to/vision_model.onnx +``` + +This records `vision_inference_backend=onnxruntime`, input/output shapes, +provider, and probe latency as runtime operation evidence. It is a lightweight +device-local producer step, not a full live YOLO service. ### InferEdge Handoff diff --git a/configs/README.ko.md b/configs/README.ko.md index 556752d..6c1f818 100644 --- a/configs/README.ko.md +++ b/configs/README.ko.md @@ -70,4 +70,7 @@ runtime output은 일반적으로 git에서 ignore되는 `reports/` 아래에 Voice request, Safety resource producer를 하나의 sustained validation path로 묶는다. CLI에서 `--vision-input`, `--voice-ingress-payload`, `--resource-snapshot`, `--capture-process-resource-snapshot`으로 실행 시점의 - 로컬 입력을 override할 수 있다. + 로컬 입력을 override할 수 있다. 선택적 ONNX Runtime dependency가 설치되어 + 있으면 `--vision-onnx-model`로 lightweight Vision inference probe를 추가해 + provider, input/output shape, probe latency를 기록할 수 있다. 이는 full live + YOLO service가 아니라 device-local producer 단계다. diff --git a/configs/README.md b/configs/README.md index 725f9b2..14f2498 100644 --- a/configs/README.md +++ b/configs/README.md @@ -72,4 +72,7 @@ Tracked evidence samples live under committed Vision image, Voice request, and Safety resource producers in one sustained validation path. The CLI can override those inputs at run time with `--vision-input`, `--voice-ingress-payload`, `--resource-snapshot`, or - `--capture-process-resource-snapshot`. + `--capture-process-resource-snapshot`. If optional ONNX Runtime dependencies + are installed, `--vision-onnx-model` adds a lightweight Vision inference probe + that records provider, input/output shapes, and probe latency without claiming + a full live YOLO service. diff --git a/src/inferedge_orchestrator/cli.py b/src/inferedge_orchestrator/cli.py index 3775081..1f53da5 100644 --- a/src/inferedge_orchestrator/cli.py +++ b/src/inferedge_orchestrator/cli.py @@ -45,6 +45,13 @@ def build_parser() -> argparse.ArgumentParser: "--vision-input", help="optional local image/video path to use for the Vision producer", ) + sustained_parser.add_argument( + "--vision-onnx-model", + help=( + "optional ONNX model path for the Vision producer probe; requires " + "optional onnxruntime dependencies" + ), + ) sustained_parser.add_argument( "--voice-ingress-payload", help="optional local JSON request payload path for the Voice producer", @@ -139,6 +146,7 @@ def _run_multi_workload_sustained(args: argparse.Namespace) -> int: config = apply_device_local_input_overrides( config, vision_input=args.vision_input, + vision_onnx_model=args.vision_onnx_model, voice_ingress_payload=args.voice_ingress_payload, resource_snapshot=resource_snapshot, resource_snapshot_source=resource_snapshot_source, diff --git a/src/inferedge_orchestrator/sustained.py b/src/inferedge_orchestrator/sustained.py index 6ee14e5..2c2b6d4 100644 --- a/src/inferedge_orchestrator/sustained.py +++ b/src/inferedge_orchestrator/sustained.py @@ -17,6 +17,7 @@ def apply_device_local_input_overrides( config: OrchestratorConfig, *, vision_input: str | Path | None = None, + vision_onnx_model: str | Path | None = None, voice_ingress_payload: str | Path | None = None, resource_snapshot: str | Path | None = None, resource_snapshot_source: str | None = None, @@ -30,6 +31,7 @@ def apply_device_local_input_overrides( if ( vision_input is None + and vision_onnx_model is None and voice_ingress_payload is None and resource_snapshot is None ): @@ -48,6 +50,11 @@ def apply_device_local_input_overrides( if vision_input is not None and task.agent_type == "vision": options["device_local_validation"] = True options["producer_stage"] = "device_local_cli_override" + if vision_onnx_model is not None and task.agent_type == "vision": + options["device_local_validation"] = True + options["producer_stage"] = "device_local_cli_override" + options["vision_inference_backend"] = "onnxruntime" + options["vision_model_path"] = str(Path(vision_onnx_model)) if voice_ingress_payload is not None and task.agent_type == "voice": options["device_local_validation"] = True options["producer_stage"] = "device_local_cli_override" @@ -222,6 +229,8 @@ def _workload_profile(task: TaskConfig, report: dict[str, Any]) -> dict[str, Any options.get("expected_runtime_mode", "sustained") ), "preferred_device": options.get("preferred_device"), + "vision_inference_backend": options.get("vision_inference_backend"), + "vision_model_path": options.get("vision_model_path"), "executed": task_report.get("executed", 0), "dropped": task_report.get("dropped", 0), "deadline_missed": task_report.get("deadline_missed", 0), @@ -306,6 +315,7 @@ def _estimated_memory_total_mb( def _local_profile_signals(report: dict[str, Any]) -> dict[str, Any]: profiled_events = [] profile_kinds: list[str] = [] + vision_inference_backends: list[str] = [] elapsed_total = 0.0 for event in report.get("result_events", []): if not isinstance(event, dict): @@ -319,6 +329,12 @@ def _local_profile_signals(report: dict[str, Any]) -> dict[str, Any]: kind = output.get("profile_kind") if isinstance(kind, str) and kind not in profile_kinds: profile_kinds.append(kind) + vision_backend = output.get("vision_inference_backend") + if ( + isinstance(vision_backend, str) + and vision_backend not in vision_inference_backends + ): + vision_inference_backends.append(vision_backend) elapsed = output.get("profile_elapsed_ms") if isinstance(elapsed, int | float): elapsed_total += float(elapsed) @@ -327,6 +343,8 @@ def _local_profile_signals(report: dict[str, Any]) -> dict[str, Any]: "local_profile_adapter_count": len(profiled_events), "local_profile_elapsed_ms": round(elapsed_total, 3), "local_profile_kinds": profile_kinds, + "vision_inference_backend_count": len(vision_inference_backends), + "vision_inference_backends": vision_inference_backends, } diff --git a/src/inferedge_orchestrator/workers.py b/src/inferedge_orchestrator/workers.py index 9203a7d..febb4b2 100644 --- a/src/inferedge_orchestrator/workers.py +++ b/src/inferedge_orchestrator/workers.py @@ -38,6 +38,7 @@ class TensorRtBuffer: class DummyWorker: def __init__(self, *, sleep: bool = False) -> None: self._sleep = sleep + self._vision_probe_sessions: dict[tuple[str, tuple[str, ...]], object] = {} def run(self, task: TaskConfig, frame: FrameEnvelope) -> WorkerResult: options = task.worker_options or {} @@ -72,6 +73,7 @@ def _run_local_profile( frame=frame, work_units=work_units, options=options, + vision_probe_sessions=self._vision_probe_sessions, ) latency_ms = (time.perf_counter() - started) * 1000.0 return WorkerResult( @@ -113,9 +115,16 @@ def _profile_workload( frame: FrameEnvelope, work_units: int, options: dict[str, object], + vision_probe_sessions: dict[tuple[str, tuple[str, ...]], object] | None = None, ) -> tuple[str, dict[str, object]]: if workload_type == "realtime_vision": - return _profile_vision(task, frame, work_units, options) + return _profile_vision( + task, + frame, + work_units, + options, + sessions=vision_probe_sessions, + ) if workload_type == "voice_command": return _profile_voice(task, frame, work_units, options) if workload_type == "telemetry_monitor": @@ -128,8 +137,17 @@ def _profile_vision( frame: FrameEnvelope, work_units: int, options: dict[str, object], + *, + sessions: dict[tuple[str, tuple[str, ...]], object] | None = None, ) -> tuple[str, dict[str, object]]: file_profile, sample = _vision_file_sample(frame, options) + inference_profile = _vision_inference_probe( + task, + frame, + sample, + options, + sessions=sessions, + ) accumulator = frame.sequence + len(task.name) + sum(sample[:128]) bright_pixels = 0 edge_votes = 0 @@ -157,9 +175,14 @@ def _profile_vision( "edge_vote_ratio": round(edge_votes / work_units, 4), "stale_frame_policy": task.drop_policy, "contention_signal": ( - "vision_file_cpu_profile" if file_profile else "frame_queue_cpu_profile" + "vision_onnxruntime_probe" + if inference_profile + else "vision_file_cpu_profile" + if file_profile + else "frame_queue_cpu_profile" ), **file_profile, + **inference_profile, } @@ -195,6 +218,113 @@ def _vision_file_sample( }, sample +def _vision_inference_probe( + task: TaskConfig, + frame: FrameEnvelope, + sample: bytes, + options: dict[str, object], + *, + sessions: dict[tuple[str, tuple[str, ...]], object] | None = None, +) -> dict[str, object]: + backend = options.get("vision_inference_backend") + if backend is None: + return {} + if str(backend) != "onnxruntime": + raise ValueError( + f"{task.name}: unsupported vision_inference_backend {backend!r}" + ) + + model_value = options.get("vision_model_path") or task.model_path + if not model_value: + raise ValueError( + f"{task.name}: vision_inference_backend=onnxruntime requires " + "worker_options.vision_model_path or task.model_path" + ) + model_path = Path(str(model_value)) + if not model_path.exists(): + raise FileNotFoundError(f"vision ONNX model path does not exist: {model_path}") + if not model_path.is_file(): + raise ValueError(f"vision ONNX model path is not a file: {model_path}") + + try: + import numpy as np + import onnxruntime as ort + except ModuleNotFoundError as exc: + raise RuntimeError( + "vision ONNX producer probe requires optional numpy and onnxruntime " + "dependencies. Install inferedge-orchestrator[onnx]." + ) from exc + + providers = options.get("providers") or ["CPUExecutionProvider"] + provider_tuple = tuple(str(provider) for provider in providers) + session_key = (str(model_path), provider_tuple) + if sessions is not None and session_key in sessions: + session = sessions[session_key] + else: + session = ort.InferenceSession(str(model_path), providers=providers) + if sessions is not None: + sessions[session_key] = session + inputs = session.get_inputs() + if not inputs: + raise RuntimeError(f"{task.name}: vision ONNX model has no inputs") + + feed: dict[str, object] = {} + input_shapes: dict[str, list[int]] = {} + for model_input in inputs: + shape = _concrete_shape(model_input.shape) + input_shapes[model_input.name] = list(shape) + feed[model_input.name] = _vision_probe_array( + shape=shape, + sample=sample, + frame=frame, + np=np, + ) + + started = time.perf_counter() + outputs = session.run(None, feed) + elapsed_ms = (time.perf_counter() - started) * 1000.0 + output_shapes = [list(getattr(output, "shape", ())) for output in outputs] + return { + "vision_inference_backend": "onnxruntime", + "vision_inference_mode": "probe", + "vision_model_path": str(model_path), + "vision_provider": _session_provider(session, providers), + "vision_input_shapes": input_shapes, + "vision_output_count": len(outputs), + "vision_output_shapes": output_shapes, + "vision_probe_elapsed_ms": round(elapsed_ms, 3), + } + + +def _vision_probe_array( + *, + shape: tuple[int, ...], + sample: bytes, + frame: FrameEnvelope, + np: object, +) -> object: + array = np.zeros(shape, dtype=np.float32) + if array.size == 0: + return array + if sample: + values = np.frombuffer(sample, dtype=np.uint8).astype(np.float32) / 255.0 + else: + values = np.asarray([frame.sequence % 255], dtype=np.float32) / 255.0 + array[...] = np.resize(values, array.size).reshape(shape) + return array + + +def _session_provider(session: object, providers: object) -> str: + get_providers = getattr(session, "get_providers", None) + if callable(get_providers): + available = get_providers() + if available: + return str(available[0]) + if isinstance(providers, list) and providers: + return str(providers[0]) + return "unknown" + + def _positive_int(value: Any, *, default: int) -> int: if value is None: return default diff --git a/tests/test_multi_workload_sustained.py b/tests/test_multi_workload_sustained.py index eed1bcf..d26cd3a 100644 --- a/tests/test_multi_workload_sustained.py +++ b/tests/test_multi_workload_sustained.py @@ -1,7 +1,11 @@ from __future__ import annotations import json +import sys from pathlib import Path +from types import SimpleNamespace + +import pytest from inferedge_orchestrator.config import OrchestratorConfig from inferedge_orchestrator.sustained import ( @@ -381,6 +385,91 @@ def test_device_local_input_overrides_use_local_paths(tmp_path) -> None: ) +def test_device_local_vision_can_run_optional_onnx_probe( + tmp_path, + monkeypatch, +) -> None: + np = pytest.importorskip("numpy") + + class FakeInput: + name = "images" + shape = [1, 3, 2, 2] + + class FakeSession: + def __init__(self, model_path, providers): + self.model_path = model_path + self.providers = providers + + def get_inputs(self): + return [FakeInput()] + + def get_providers(self): + return self.providers + + def run(self, output_names, feed): + assert output_names is None + assert list(feed) == ["images"] + assert feed["images"].shape == (1, 3, 2, 2) + return [np.ones((1, 1, 6), dtype=np.float32)] + + monkeypatch.setitem( + sys.modules, + "onnxruntime", + SimpleNamespace(InferenceSession=FakeSession), + ) + + config = OrchestratorConfig.from_dict( + json.loads( + Path("configs/agent_multi_workload_sustained_device_local.json").read_text( + encoding="utf-8" + ) + ) + ) + image = tmp_path / "frame.ppm" + image.write_bytes(b"P6\n1 1\n255\n\xff\x00\x00") + model = tmp_path / "vision_probe.onnx" + model.write_bytes(b"fake onnx model for mocked session") + overridden = apply_device_local_input_overrides( + config, + vision_input=image, + vision_onnx_model=model, + ) + report = write_multi_workload_sustained( + overridden, + output=tmp_path / "device_local_vision_onnx_probe.json", + frames=4, + ) + + vision_profile = next( + profile + for profile in report["multi_workload_sustained_summary"]["workload_profiles"] + if profile["agent_id"] == "vision_agent" + ) + assert vision_profile["vision_inference_backend"] == "onnxruntime" + assert vision_profile["vision_model_path"] == str(model) + signals = report["multi_workload_sustained_summary"]["observed_runtime_signals"] + assert signals["vision_inference_backend_count"] == 1 + assert signals["vision_inference_backends"] == ["onnxruntime"] + + vision_outputs = [ + event["output"] + for event in report["result_events"] + if event["task"] == "vision_agent" + ] + assert vision_outputs + first_output = vision_outputs[0] + assert first_output["producer_source"] == "image_file" + assert first_output["contention_signal"] == "vision_onnxruntime_probe" + assert first_output["vision_inference_backend"] == "onnxruntime" + assert first_output["vision_inference_mode"] == "probe" + assert first_output["vision_model_path"] == str(model) + assert first_output["vision_provider"] == "CPUExecutionProvider" + assert first_output["vision_input_shapes"] == {"images": [1, 3, 2, 2]} + assert first_output["vision_output_shapes"] == [[1, 1, 6]] + assert first_output["vision_output_count"] == 1 + assert first_output["vision_probe_elapsed_ms"] >= 0 + + def test_process_resource_snapshot_can_feed_device_local_safety(tmp_path) -> None: config = OrchestratorConfig.from_dict( json.loads(