Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,22 @@ python3 -m inferedge_orchestrator run-multi-workload-sustained \
For a minimal process-backed Safety input, use
`--capture-process-resource-snapshot` instead of `--resource-snapshot`. The CLI
writes a small process resource snapshot next to the output JSON and routes it
through the Safety producer.
through the Safety producer. If optional ONNX Runtime dependencies and a local
model file are available, the Vision producer can also run a small
ONNX Runtime probe while preserving the same producer-backed sustained contract:

```bash
python3 -m inferedge_orchestrator run-multi-workload-sustained \
--config configs/agent_multi_workload_sustained_device_local.json \
--output reports/agent_multi_workload_sustained_device_local.json \
--frames 16 \
--vision-input /path/to/frame.ppm \
--vision-onnx-model /path/to/vision_model.onnx
```

This records `vision_inference_backend=onnxruntime`, input/output shapes,
provider, and probe latency as runtime operation evidence. It is a lightweight
device-local producer step, not a full live YOLO service.

### InferEdge Handoff

Expand Down
5 changes: 4 additions & 1 deletion configs/README.ko.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,7 @@ runtime output은 일반적으로 git에서 ignore되는 `reports/` 아래에
Voice request, Safety resource producer를 하나의 sustained validation path로
묶는다. CLI에서 `--vision-input`, `--voice-ingress-payload`,
`--resource-snapshot`, `--capture-process-resource-snapshot`으로 실행 시점의
로컬 입력을 override할 수 있다.
로컬 입력을 override할 수 있다. 선택적 ONNX Runtime dependency가 설치되어
있으면 `--vision-onnx-model`로 lightweight Vision inference probe를 추가해
provider, input/output shape, probe latency를 기록할 수 있다. 이는 full live
YOLO service가 아니라 device-local producer 단계다.
5 changes: 4 additions & 1 deletion configs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,7 @@ Tracked evidence samples live under
committed Vision image, Voice request, and Safety resource producers in one
sustained validation path. The CLI can override those inputs at run time with
`--vision-input`, `--voice-ingress-payload`, `--resource-snapshot`, or
`--capture-process-resource-snapshot`.
`--capture-process-resource-snapshot`. If optional ONNX Runtime dependencies
are installed, `--vision-onnx-model` adds a lightweight Vision inference probe
that records provider, input/output shapes, and probe latency without claiming
a full live YOLO service.
8 changes: 8 additions & 0 deletions src/inferedge_orchestrator/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@ def build_parser() -> argparse.ArgumentParser:
"--vision-input",
help="optional local image/video path to use for the Vision producer",
)
sustained_parser.add_argument(
"--vision-onnx-model",
help=(
"optional ONNX model path for the Vision producer probe; requires "
"optional onnxruntime dependencies"
),
)
sustained_parser.add_argument(
"--voice-ingress-payload",
help="optional local JSON request payload path for the Voice producer",
Expand Down Expand Up @@ -139,6 +146,7 @@ def _run_multi_workload_sustained(args: argparse.Namespace) -> int:
config = apply_device_local_input_overrides(
config,
vision_input=args.vision_input,
vision_onnx_model=args.vision_onnx_model,
voice_ingress_payload=args.voice_ingress_payload,
resource_snapshot=resource_snapshot,
resource_snapshot_source=resource_snapshot_source,
Expand Down
18 changes: 18 additions & 0 deletions src/inferedge_orchestrator/sustained.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ def apply_device_local_input_overrides(
config: OrchestratorConfig,
*,
vision_input: str | Path | None = None,
vision_onnx_model: str | Path | None = None,
voice_ingress_payload: str | Path | None = None,
resource_snapshot: str | Path | None = None,
resource_snapshot_source: str | None = None,
Expand All @@ -30,6 +31,7 @@ def apply_device_local_input_overrides(

if (
vision_input is None
and vision_onnx_model is None
and voice_ingress_payload is None
and resource_snapshot is None
):
Expand All @@ -48,6 +50,11 @@ def apply_device_local_input_overrides(
if vision_input is not None and task.agent_type == "vision":
options["device_local_validation"] = True
options["producer_stage"] = "device_local_cli_override"
if vision_onnx_model is not None and task.agent_type == "vision":
options["device_local_validation"] = True
options["producer_stage"] = "device_local_cli_override"
options["vision_inference_backend"] = "onnxruntime"
options["vision_model_path"] = str(Path(vision_onnx_model))
if voice_ingress_payload is not None and task.agent_type == "voice":
options["device_local_validation"] = True
options["producer_stage"] = "device_local_cli_override"
Expand Down Expand Up @@ -222,6 +229,8 @@ def _workload_profile(task: TaskConfig, report: dict[str, Any]) -> dict[str, Any
options.get("expected_runtime_mode", "sustained")
),
"preferred_device": options.get("preferred_device"),
"vision_inference_backend": options.get("vision_inference_backend"),
"vision_model_path": options.get("vision_model_path"),
"executed": task_report.get("executed", 0),
"dropped": task_report.get("dropped", 0),
"deadline_missed": task_report.get("deadline_missed", 0),
Expand Down Expand Up @@ -306,6 +315,7 @@ def _estimated_memory_total_mb(
def _local_profile_signals(report: dict[str, Any]) -> dict[str, Any]:
profiled_events = []
profile_kinds: list[str] = []
vision_inference_backends: list[str] = []
elapsed_total = 0.0
for event in report.get("result_events", []):
if not isinstance(event, dict):
Expand All @@ -319,6 +329,12 @@ def _local_profile_signals(report: dict[str, Any]) -> dict[str, Any]:
kind = output.get("profile_kind")
if isinstance(kind, str) and kind not in profile_kinds:
profile_kinds.append(kind)
vision_backend = output.get("vision_inference_backend")
if (
isinstance(vision_backend, str)
and vision_backend not in vision_inference_backends
):
vision_inference_backends.append(vision_backend)
elapsed = output.get("profile_elapsed_ms")
if isinstance(elapsed, int | float):
elapsed_total += float(elapsed)
Expand All @@ -327,6 +343,8 @@ def _local_profile_signals(report: dict[str, Any]) -> dict[str, Any]:
"local_profile_adapter_count": len(profiled_events),
"local_profile_elapsed_ms": round(elapsed_total, 3),
"local_profile_kinds": profile_kinds,
"vision_inference_backend_count": len(vision_inference_backends),
"vision_inference_backends": vision_inference_backends,
}


Expand Down
134 changes: 132 additions & 2 deletions src/inferedge_orchestrator/workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class TensorRtBuffer:
class DummyWorker:
def __init__(self, *, sleep: bool = False) -> None:
self._sleep = sleep
self._vision_probe_sessions: dict[tuple[str, tuple[str, ...]], object] = {}

def run(self, task: TaskConfig, frame: FrameEnvelope) -> WorkerResult:
options = task.worker_options or {}
Expand Down Expand Up @@ -72,6 +73,7 @@ def _run_local_profile(
frame=frame,
work_units=work_units,
options=options,
vision_probe_sessions=self._vision_probe_sessions,
)
latency_ms = (time.perf_counter() - started) * 1000.0
return WorkerResult(
Expand Down Expand Up @@ -113,9 +115,16 @@ def _profile_workload(
frame: FrameEnvelope,
work_units: int,
options: dict[str, object],
vision_probe_sessions: dict[tuple[str, tuple[str, ...]], object] | None = None,
) -> tuple[str, dict[str, object]]:
if workload_type == "realtime_vision":
return _profile_vision(task, frame, work_units, options)
return _profile_vision(
task,
frame,
work_units,
options,
sessions=vision_probe_sessions,
)
if workload_type == "voice_command":
return _profile_voice(task, frame, work_units, options)
if workload_type == "telemetry_monitor":
Expand All @@ -128,8 +137,17 @@ def _profile_vision(
frame: FrameEnvelope,
work_units: int,
options: dict[str, object],
*,
sessions: dict[tuple[str, tuple[str, ...]], object] | None = None,
) -> tuple[str, dict[str, object]]:
file_profile, sample = _vision_file_sample(frame, options)
inference_profile = _vision_inference_probe(
task,
frame,
sample,
options,
sessions=sessions,
)
accumulator = frame.sequence + len(task.name) + sum(sample[:128])
bright_pixels = 0
edge_votes = 0
Expand Down Expand Up @@ -157,9 +175,14 @@ def _profile_vision(
"edge_vote_ratio": round(edge_votes / work_units, 4),
"stale_frame_policy": task.drop_policy,
"contention_signal": (
"vision_file_cpu_profile" if file_profile else "frame_queue_cpu_profile"
"vision_onnxruntime_probe"
if inference_profile
else "vision_file_cpu_profile"
if file_profile
else "frame_queue_cpu_profile"
),
**file_profile,
**inference_profile,
}


Expand Down Expand Up @@ -195,6 +218,113 @@ def _vision_file_sample(
}, sample


def _vision_inference_probe(
task: TaskConfig,
frame: FrameEnvelope,
sample: bytes,
options: dict[str, object],
*,
sessions: dict[tuple[str, tuple[str, ...]], object] | None = None,
) -> dict[str, object]:
backend = options.get("vision_inference_backend")
if backend is None:
return {}
if str(backend) != "onnxruntime":
raise ValueError(
f"{task.name}: unsupported vision_inference_backend {backend!r}"
)

model_value = options.get("vision_model_path") or task.model_path
if not model_value:
raise ValueError(
f"{task.name}: vision_inference_backend=onnxruntime requires "
"worker_options.vision_model_path or task.model_path"
)
model_path = Path(str(model_value))
if not model_path.exists():
raise FileNotFoundError(f"vision ONNX model path does not exist: {model_path}")
if not model_path.is_file():
raise ValueError(f"vision ONNX model path is not a file: {model_path}")

try:
import numpy as np
import onnxruntime as ort
except ModuleNotFoundError as exc:
raise RuntimeError(
"vision ONNX producer probe requires optional numpy and onnxruntime "
"dependencies. Install inferedge-orchestrator[onnx]."
) from exc

providers = options.get("providers") or ["CPUExecutionProvider"]
provider_tuple = tuple(str(provider) for provider in providers)
session_key = (str(model_path), provider_tuple)
if sessions is not None and session_key in sessions:
session = sessions[session_key]
else:
session = ort.InferenceSession(str(model_path), providers=providers)
if sessions is not None:
sessions[session_key] = session
inputs = session.get_inputs()
if not inputs:
raise RuntimeError(f"{task.name}: vision ONNX model has no inputs")

feed: dict[str, object] = {}
input_shapes: dict[str, list[int]] = {}
for model_input in inputs:
shape = _concrete_shape(model_input.shape)
input_shapes[model_input.name] = list(shape)
feed[model_input.name] = _vision_probe_array(
shape=shape,
sample=sample,
frame=frame,
np=np,
)

started = time.perf_counter()
outputs = session.run(None, feed)
elapsed_ms = (time.perf_counter() - started) * 1000.0
output_shapes = [list(getattr(output, "shape", ())) for output in outputs]
return {
"vision_inference_backend": "onnxruntime",
"vision_inference_mode": "probe",
"vision_model_path": str(model_path),
"vision_provider": _session_provider(session, providers),
"vision_input_shapes": input_shapes,
"vision_output_count": len(outputs),
"vision_output_shapes": output_shapes,
"vision_probe_elapsed_ms": round(elapsed_ms, 3),
}


def _vision_probe_array(
*,
shape: tuple[int, ...],
sample: bytes,
frame: FrameEnvelope,
np: object,
) -> object:
array = np.zeros(shape, dtype=np.float32)
if array.size == 0:
return array
if sample:
values = np.frombuffer(sample, dtype=np.uint8).astype(np.float32) / 255.0
else:
values = np.asarray([frame.sequence % 255], dtype=np.float32) / 255.0
array[...] = np.resize(values, array.size).reshape(shape)
return array


def _session_provider(session: object, providers: object) -> str:
get_providers = getattr(session, "get_providers", None)
if callable(get_providers):
available = get_providers()
if available:
return str(available[0])
if isinstance(providers, list) and providers:
return str(providers[0])
return "unknown"


def _positive_int(value: Any, *, default: int) -> int:
if value is None:
return default
Expand Down
Loading