diff --git a/README.ko.md b/README.ko.md index 843e235..13f749e 100644 --- a/README.ko.md +++ b/README.ko.md @@ -312,14 +312,16 @@ request, Safety resource evidence를 기록하며 live YOLO/ONNX, FastAPI, tegrastats, Jetson/RPi producer는 후속 integration으로 둔다. config를 수정하지 않고 실행 시점에 committed producer fixture를 로컬 입력으로 -교체할 수도 있다. +교체할 수도 있다. `--vision-input`은 단일 image/video file 또는 image frame +directory를 받을 수 있으며, directory는 sustained run 동안 deterministic image +sequence로 순환 처리된다. ```bash python3 -m inferedge_orchestrator run-multi-workload-sustained \ --config configs/agent_multi_workload_sustained_device_local.json \ --output reports/agent_multi_workload_sustained_device_local.json \ --frames 16 \ - --vision-input /path/to/frame.ppm \ + --vision-input /path/to/frame-or-image-sequence \ --voice-ingress-payload /path/to/requests.json \ --resource-snapshot /path/to/resources.json ``` diff --git a/README.md b/README.md index b69806d..facf851 100644 --- a/README.md +++ b/README.md @@ -270,14 +270,16 @@ YOLO/ONNX, FastAPI, tegrastats, and Jetson/RPi producers as follow-up integrations. You can replace those committed producer fixtures at run time without editing -the config: +the config. `--vision-input` accepts a single image/video file or a directory of +image frames; directories are treated as a deterministic image sequence and +cycled during the sustained run: ```bash python3 -m inferedge_orchestrator run-multi-workload-sustained \ --config configs/agent_multi_workload_sustained_device_local.json \ --output reports/agent_multi_workload_sustained_device_local.json \ --frames 16 \ - --vision-input /path/to/frame.ppm \ + --vision-input /path/to/frame-or-image-sequence \ --voice-ingress-payload /path/to/requests.json \ --resource-snapshot /path/to/resources.json ``` diff --git a/configs/README.md b/configs/README.md index 14f2498..5158595 100644 --- a/configs/README.md +++ b/configs/README.md @@ -72,7 +72,9 @@ Tracked evidence samples live under committed Vision image, Voice request, and Safety resource producers in one sustained validation path. The CLI can override those inputs at run time with `--vision-input`, `--voice-ingress-payload`, `--resource-snapshot`, or - `--capture-process-resource-snapshot`. If optional ONNX Runtime dependencies - are installed, `--vision-onnx-model` adds a lightweight Vision inference probe - that records provider, input/output shapes, and probe latency without claiming - a full live YOLO service. + `--capture-process-resource-snapshot`. `--vision-input` accepts either a + single image/video file or an image-frame directory, which is recorded as + `image_sequence_file` evidence and cycled during the sustained run. If + optional ONNX Runtime dependencies are installed, `--vision-onnx-model` adds + a lightweight Vision inference probe that records provider, input/output + shapes, and probe latency without claiming a full live YOLO service. diff --git a/docs/agent_orchestration_summary_contract.ko.md b/docs/agent_orchestration_summary_contract.ko.md index efd035f..f9e3649 100644 --- a/docs/agent_orchestration_summary_contract.ko.md +++ b/docs/agent_orchestration_summary_contract.ko.md @@ -213,7 +213,10 @@ Whisper, FastAPI, live monitor, Jetson producer는 선택적 후속 integration CLI에서는 committed device-local producer fixture를 실행 시점에 `--vision-input`, `--voice-ingress-payload`, `--resource-snapshot`으로 교체할 수 -있습니다. 최소 local process signal이 필요하면 +있습니다. `--vision-input`은 단일 image/video file 또는 image frame directory를 +받을 수 있으며, directory는 `image_sequence_file` producer evidence로 기록되고 +sustained validation 동안 deterministic하게 순환 처리됩니다. 최소 local process +signal이 필요하면 `--capture-process-resource-snapshot`이 output report 옆에 작은 process resource snapshot을 만들고 Safety workload에 `producer_source=process_resource_snapshot`으로 연결합니다. diff --git a/docs/agent_orchestration_summary_contract.md b/docs/agent_orchestration_summary_contract.md index f23e861..a7525e2 100644 --- a/docs/agent_orchestration_summary_contract.md +++ b/docs/agent_orchestration_summary_contract.md @@ -214,6 +214,9 @@ integrations, not required dependencies. The CLI can override the committed device-local producer fixtures at run time with `--vision-input`, `--voice-ingress-payload`, and `--resource-snapshot`. +`--vision-input` accepts a single image/video file or a directory of image +frames. Directories are recorded as `image_sequence_file` producer evidence and +cycled deterministically during sustained validation. For a minimal local process signal, `--capture-process-resource-snapshot` writes a small process resource snapshot next to the output report and routes it into the Safety workload with `producer_source=process_resource_snapshot`. diff --git a/src/inferedge_orchestrator/cli.py b/src/inferedge_orchestrator/cli.py index 1f53da5..53be78b 100644 --- a/src/inferedge_orchestrator/cli.py +++ b/src/inferedge_orchestrator/cli.py @@ -43,7 +43,10 @@ def build_parser() -> argparse.ArgumentParser: ) sustained_parser.add_argument( "--vision-input", - help="optional local image/video path to use for the Vision producer", + help=( + "optional local image, video, or image-sequence directory to use " + "for the Vision producer" + ), ) sustained_parser.add_argument( "--vision-onnx-model", diff --git a/src/inferedge_orchestrator/config.py b/src/inferedge_orchestrator/config.py index 10a412e..9f89d6a 100644 --- a/src/inferedge_orchestrator/config.py +++ b/src/inferedge_orchestrator/config.py @@ -7,7 +7,7 @@ DROP_POLICIES = {"drop_oldest", "drop_newest", "drop_low_priority"} -INPUT_SOURCES = {"dummy", "image", "video"} +INPUT_SOURCES = {"dummy", "image", "image_sequence", "video"} SCENARIO_MODES = {"normal", "overload", "sustained_high_load", "device_local"} WORKERS = {"dummy", "onnxruntime", "tensorrt"} @@ -182,7 +182,10 @@ def validate(self) -> None: raise ValueError("frame_interval_ms must be > 0") if self.input_source not in INPUT_SOURCES: raise ValueError(f"unsupported input_source {self.input_source!r}") - if self.input_source in {"image", "video"} and not self.input_path: + if ( + self.input_source in {"image", "image_sequence", "video"} + and not self.input_path + ): raise ValueError(f"{self.input_source} input_source requires input_path") def task_map(self) -> dict[str, TaskConfig]: diff --git a/src/inferedge_orchestrator/frames.py b/src/inferedge_orchestrator/frames.py index f0e3b1a..3b827e7 100644 --- a/src/inferedge_orchestrator/frames.py +++ b/src/inferedge_orchestrator/frames.py @@ -48,11 +48,14 @@ def frames_for_cycle( class FileFrameSource: - """Routes image or video file metadata to workers without binding scheduler logic.""" + """Routes file-backed metadata to workers without binding scheduler logic.""" def __init__(self, *, source: str, path: str) -> None: self._source = source self._path = str(Path(path)) + self._sequence_paths = ( + _image_sequence_paths(Path(path)) if source == "image_sequence" else () + ) self._sequence = 0 def frames_for_cycle( @@ -67,6 +70,18 @@ def frames_for_cycle( if cycle % task.emit_every_cycles != 0: continue self._sequence += 1 + selected_path = self._path + if self._sequence_paths: + selected_path = str( + self._sequence_paths[cycle % len(self._sequence_paths)] + ) + payload = { + "source": self._source, + "path": selected_path, + "frame_index": cycle, + } + if self._sequence_paths: + payload["sequence_root"] = self._path frames.append( FrameEnvelope( frame_id=f"{task.name}-{cycle}-{self._sequence}", @@ -74,11 +89,7 @@ def frames_for_cycle( sequence=self._sequence, created_at_ms=now_ms, deadline_at_ms=now_ms + task.latency_budget_ms, - payload={ - "source": self._source, - "path": self._path, - "frame_index": cycle, - }, + payload=payload, ) ) return frames @@ -90,3 +101,21 @@ def build_frame_source(config: OrchestratorConfig) -> DummyFrameSource | FileFra if config.input_path is None: raise ValueError(f"{config.input_source} input_source requires input_path") return FileFrameSource(source=config.input_source, path=config.input_path) + + +def _image_sequence_paths(path: Path) -> tuple[Path, ...]: + if not path.exists(): + raise FileNotFoundError(f"image_sequence input path does not exist: {path}") + if not path.is_dir(): + raise ValueError(f"image_sequence input path must be a directory: {path}") + extensions = {".jpg", ".jpeg", ".png", ".ppm", ".bmp"} + images = tuple( + sorted( + child + for child in path.iterdir() + if child.is_file() and child.suffix.lower() in extensions + ) + ) + if not images: + raise ValueError(f"image_sequence input path has no supported image files: {path}") + return images diff --git a/src/inferedge_orchestrator/sustained.py b/src/inferedge_orchestrator/sustained.py index 2c2b6d4..588c114 100644 --- a/src/inferedge_orchestrator/sustained.py +++ b/src/inferedge_orchestrator/sustained.py @@ -297,6 +297,8 @@ def _next_validation_step(config: OrchestratorConfig) -> str: def _vision_input_source(path: Path) -> str: + if path.is_dir(): + return "image_sequence" if path.suffix.lower() in {".mp4", ".mov", ".mkv", ".avi", ".webm"}: return "video" return "image" @@ -353,6 +355,7 @@ def _producer_source_signals(report: dict[str, Any]) -> dict[str, Any]: producer_sources: list[str] = [] device_local_sources = { "image_file", + "image_sequence_file", "video_file", "fastapi_request_fixture", "resource_snapshot_fixture", diff --git a/src/inferedge_orchestrator/workers.py b/src/inferedge_orchestrator/workers.py index febb4b2..2ce454e 100644 --- a/src/inferedge_orchestrator/workers.py +++ b/src/inferedge_orchestrator/workers.py @@ -192,7 +192,7 @@ def _vision_file_sample( ) -> tuple[dict[str, object], bytes]: source = _payload_value(frame, "source", "dummy") path_value = _payload_value(frame, "path", None) - if source not in {"image", "video"} or path_value is None: + if source not in {"image", "image_sequence", "video"} or path_value is None: return {}, b"" path = Path(str(path_value)) @@ -207,9 +207,13 @@ def _vision_file_sample( size = path.stat().st_size digest = hashlib.sha1(sample).hexdigest()[:12] if sample else "empty" + producer_source = ( + "image_sequence_file" if source == "image_sequence" else f"{source}_file" + ) return { - "producer_source": f"{source}_file", + "producer_source": producer_source, "input_path": str(path), + "sequence_root": _payload_value(frame, "sequence_root", None), "input_bytes": size, "sampled_bytes": len(sample), "input_digest": digest, diff --git a/tests/test_input_sources.py b/tests/test_input_sources.py index 2f001e6..2ca1f99 100644 --- a/tests/test_input_sources.py +++ b/tests/test_input_sources.py @@ -57,3 +57,33 @@ def test_video_source_routes_frame_index_payload_to_task() -> None: assert frames[0].payload["source"] == "video" assert frames[0].payload["frame_index"] == 3 + + +def test_image_sequence_source_rotates_file_payloads(tmp_path) -> None: + (tmp_path / "frame_b.ppm").write_bytes(b"P6\n1 1\n255\n\x00\xff\x00") + (tmp_path / "frame_a.ppm").write_bytes(b"P6\n1 1\n255\n\xff\x00\x00") + config = OrchestratorConfig( + tasks=( + TaskConfig( + name="detector", + model_path="", + priority=100, + target_fps=5, + latency_budget_ms=100, + queue_size=2, + ), + ), + input_source="image_sequence", + input_path=str(tmp_path), + ) + + source = build_frame_source(config) + first = source.frames_for_cycle(config.tasks, cycle=0, now_ms=0.0)[0] + second = source.frames_for_cycle(config.tasks, cycle=1, now_ms=1.0)[0] + third = source.frames_for_cycle(config.tasks, cycle=2, now_ms=2.0)[0] + + assert first.payload["source"] == "image_sequence" + assert first.payload["path"].endswith("frame_a.ppm") + assert first.payload["sequence_root"] == str(tmp_path) + assert second.payload["path"].endswith("frame_b.ppm") + assert third.payload["path"].endswith("frame_a.ppm") diff --git a/tests/test_multi_workload_sustained.py b/tests/test_multi_workload_sustained.py index d26cd3a..2560895 100644 --- a/tests/test_multi_workload_sustained.py +++ b/tests/test_multi_workload_sustained.py @@ -385,6 +385,53 @@ def test_device_local_input_overrides_use_local_paths(tmp_path) -> None: ) +def test_device_local_vision_input_can_use_image_sequence_directory(tmp_path) -> None: + config = OrchestratorConfig.from_dict( + json.loads( + Path("configs/agent_multi_workload_sustained_device_local.json").read_text( + encoding="utf-8" + ) + ) + ) + sequence_dir = tmp_path / "frames" + sequence_dir.mkdir() + first = sequence_dir / "frame_001.ppm" + second = sequence_dir / "frame_002.ppm" + first.write_bytes(b"P6\n1 1\n255\n\xff\x00\x00") + second.write_bytes(b"P6\n1 1\n255\n\x00\xff\x00") + + overridden = apply_device_local_input_overrides( + config, + vision_input=sequence_dir, + ) + report = write_multi_workload_sustained( + overridden, + output=tmp_path / "device_local_image_sequence.json", + frames=4, + ) + + assert overridden.input_source == "image_sequence" + assert overridden.input_path == str(sequence_dir) + vision_outputs = [ + event["output"] + for event in report["result_events"] + if event["task"] == "vision_agent" + ] + assert vision_outputs + assert {output["producer_source"] for output in vision_outputs} == { + "image_sequence_file" + } + observed_paths = {output["input_path"] for output in vision_outputs} + assert str(first) in observed_paths + assert str(second) in observed_paths + assert {output["sequence_root"] for output in vision_outputs} == { + str(sequence_dir) + } + signals = report["multi_workload_sustained_summary"]["observed_runtime_signals"] + assert "image_sequence_file" in signals["producer_sources"] + assert signals["device_local_producer_count"] == signals["producer_source_count"] + + def test_device_local_vision_can_run_optional_onnx_probe( tmp_path, monkeypatch,