Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions README.ko.md
Original file line number Diff line number Diff line change
Expand Up @@ -312,14 +312,16 @@ request, Safety resource evidence를 기록하며 live YOLO/ONNX, FastAPI,
tegrastats, Jetson/RPi producer는 후속 integration으로 둔다.

config를 수정하지 않고 실행 시점에 committed producer fixture를 로컬 입력으로
교체할 수도 있다.
교체할 수도 있다. `--vision-input`은 단일 image/video file 또는 image frame
directory를 받을 수 있으며, directory는 sustained run 동안 deterministic image
sequence로 순환 처리된다.

```bash
python3 -m inferedge_orchestrator run-multi-workload-sustained \
--config configs/agent_multi_workload_sustained_device_local.json \
--output reports/agent_multi_workload_sustained_device_local.json \
--frames 16 \
--vision-input /path/to/frame.ppm \
--vision-input /path/to/frame-or-image-sequence \
--voice-ingress-payload /path/to/requests.json \
--resource-snapshot /path/to/resources.json
```
Expand Down
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -270,14 +270,16 @@ YOLO/ONNX, FastAPI, tegrastats, and Jetson/RPi producers as follow-up
integrations.

You can replace those committed producer fixtures at run time without editing
the config:
the config. `--vision-input` accepts a single image/video file or a directory of
image frames; directories are treated as a deterministic image sequence and
cycled during the sustained run:

```bash
python3 -m inferedge_orchestrator run-multi-workload-sustained \
--config configs/agent_multi_workload_sustained_device_local.json \
--output reports/agent_multi_workload_sustained_device_local.json \
--frames 16 \
--vision-input /path/to/frame.ppm \
--vision-input /path/to/frame-or-image-sequence \
--voice-ingress-payload /path/to/requests.json \
--resource-snapshot /path/to/resources.json
```
Expand Down
10 changes: 6 additions & 4 deletions configs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,9 @@ Tracked evidence samples live under
committed Vision image, Voice request, and Safety resource producers in one
sustained validation path. The CLI can override those inputs at run time with
`--vision-input`, `--voice-ingress-payload`, `--resource-snapshot`, or
`--capture-process-resource-snapshot`. If optional ONNX Runtime dependencies
are installed, `--vision-onnx-model` adds a lightweight Vision inference probe
that records provider, input/output shapes, and probe latency without claiming
a full live YOLO service.
`--capture-process-resource-snapshot`. `--vision-input` accepts either a
single image/video file or an image-frame directory, which is recorded as
`image_sequence_file` evidence and cycled during the sustained run. If
optional ONNX Runtime dependencies are installed, `--vision-onnx-model` adds
a lightweight Vision inference probe that records provider, input/output
shapes, and probe latency without claiming a full live YOLO service.
5 changes: 4 additions & 1 deletion docs/agent_orchestration_summary_contract.ko.md
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,10 @@ Whisper, FastAPI, live monitor, Jetson producer는 선택적 후속 integration

CLI에서는 committed device-local producer fixture를 실행 시점에
`--vision-input`, `--voice-ingress-payload`, `--resource-snapshot`으로 교체할 수
있습니다. 최소 local process signal이 필요하면
있습니다. `--vision-input`은 단일 image/video file 또는 image frame directory를
받을 수 있으며, directory는 `image_sequence_file` producer evidence로 기록되고
sustained validation 동안 deterministic하게 순환 처리됩니다. 최소 local process
signal이 필요하면
`--capture-process-resource-snapshot`이 output report 옆에 작은 process resource
snapshot을 만들고 Safety workload에
`producer_source=process_resource_snapshot`으로 연결합니다.
Expand Down
3 changes: 3 additions & 0 deletions docs/agent_orchestration_summary_contract.md
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,9 @@ integrations, not required dependencies.

The CLI can override the committed device-local producer fixtures at run time
with `--vision-input`, `--voice-ingress-payload`, and `--resource-snapshot`.
`--vision-input` accepts a single image/video file or a directory of image
frames. Directories are recorded as `image_sequence_file` producer evidence and
cycled deterministically during sustained validation.
For a minimal local process signal, `--capture-process-resource-snapshot` writes
a small process resource snapshot next to the output report and routes it into
the Safety workload with `producer_source=process_resource_snapshot`.
Expand Down
5 changes: 4 additions & 1 deletion src/inferedge_orchestrator/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,10 @@ def build_parser() -> argparse.ArgumentParser:
)
sustained_parser.add_argument(
"--vision-input",
help="optional local image/video path to use for the Vision producer",
help=(
"optional local image, video, or image-sequence directory to use "
"for the Vision producer"
),
)
sustained_parser.add_argument(
"--vision-onnx-model",
Expand Down
7 changes: 5 additions & 2 deletions src/inferedge_orchestrator/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@


DROP_POLICIES = {"drop_oldest", "drop_newest", "drop_low_priority"}
INPUT_SOURCES = {"dummy", "image", "video"}
INPUT_SOURCES = {"dummy", "image", "image_sequence", "video"}
SCENARIO_MODES = {"normal", "overload", "sustained_high_load", "device_local"}
WORKERS = {"dummy", "onnxruntime", "tensorrt"}

Expand Down Expand Up @@ -182,7 +182,10 @@ def validate(self) -> None:
raise ValueError("frame_interval_ms must be > 0")
if self.input_source not in INPUT_SOURCES:
raise ValueError(f"unsupported input_source {self.input_source!r}")
if self.input_source in {"image", "video"} and not self.input_path:
if (
self.input_source in {"image", "image_sequence", "video"}
and not self.input_path
):
raise ValueError(f"{self.input_source} input_source requires input_path")

def task_map(self) -> dict[str, TaskConfig]:
Expand Down
41 changes: 35 additions & 6 deletions src/inferedge_orchestrator/frames.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,14 @@ def frames_for_cycle(


class FileFrameSource:
"""Routes image or video file metadata to workers without binding scheduler logic."""
"""Routes file-backed metadata to workers without binding scheduler logic."""

def __init__(self, *, source: str, path: str) -> None:
self._source = source
self._path = str(Path(path))
self._sequence_paths = (
_image_sequence_paths(Path(path)) if source == "image_sequence" else ()
)
self._sequence = 0

def frames_for_cycle(
Expand All @@ -67,18 +70,26 @@ def frames_for_cycle(
if cycle % task.emit_every_cycles != 0:
continue
self._sequence += 1
selected_path = self._path
if self._sequence_paths:
selected_path = str(
self._sequence_paths[cycle % len(self._sequence_paths)]
)
payload = {
"source": self._source,
"path": selected_path,
"frame_index": cycle,
}
if self._sequence_paths:
payload["sequence_root"] = self._path
frames.append(
FrameEnvelope(
frame_id=f"{task.name}-{cycle}-{self._sequence}",
task_name=task.name,
sequence=self._sequence,
created_at_ms=now_ms,
deadline_at_ms=now_ms + task.latency_budget_ms,
payload={
"source": self._source,
"path": self._path,
"frame_index": cycle,
},
payload=payload,
)
)
return frames
Expand All @@ -90,3 +101,21 @@ def build_frame_source(config: OrchestratorConfig) -> DummyFrameSource | FileFra
if config.input_path is None:
raise ValueError(f"{config.input_source} input_source requires input_path")
return FileFrameSource(source=config.input_source, path=config.input_path)


def _image_sequence_paths(path: Path) -> tuple[Path, ...]:
if not path.exists():
raise FileNotFoundError(f"image_sequence input path does not exist: {path}")
if not path.is_dir():
raise ValueError(f"image_sequence input path must be a directory: {path}")
extensions = {".jpg", ".jpeg", ".png", ".ppm", ".bmp"}
images = tuple(
sorted(
child
for child in path.iterdir()
if child.is_file() and child.suffix.lower() in extensions
)
)
if not images:
raise ValueError(f"image_sequence input path has no supported image files: {path}")
return images
3 changes: 3 additions & 0 deletions src/inferedge_orchestrator/sustained.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,8 @@ def _next_validation_step(config: OrchestratorConfig) -> str:


def _vision_input_source(path: Path) -> str:
if path.is_dir():
return "image_sequence"
if path.suffix.lower() in {".mp4", ".mov", ".mkv", ".avi", ".webm"}:
return "video"
return "image"
Expand Down Expand Up @@ -353,6 +355,7 @@ def _producer_source_signals(report: dict[str, Any]) -> dict[str, Any]:
producer_sources: list[str] = []
device_local_sources = {
"image_file",
"image_sequence_file",
"video_file",
"fastapi_request_fixture",
"resource_snapshot_fixture",
Expand Down
8 changes: 6 additions & 2 deletions src/inferedge_orchestrator/workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ def _vision_file_sample(
) -> tuple[dict[str, object], bytes]:
source = _payload_value(frame, "source", "dummy")
path_value = _payload_value(frame, "path", None)
if source not in {"image", "video"} or path_value is None:
if source not in {"image", "image_sequence", "video"} or path_value is None:
return {}, b""

path = Path(str(path_value))
Expand All @@ -207,9 +207,13 @@ def _vision_file_sample(

size = path.stat().st_size
digest = hashlib.sha1(sample).hexdigest()[:12] if sample else "empty"
producer_source = (
"image_sequence_file" if source == "image_sequence" else f"{source}_file"
)
return {
"producer_source": f"{source}_file",
"producer_source": producer_source,
"input_path": str(path),
"sequence_root": _payload_value(frame, "sequence_root", None),
"input_bytes": size,
"sampled_bytes": len(sample),
"input_digest": digest,
Expand Down
30 changes: 30 additions & 0 deletions tests/test_input_sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,33 @@ def test_video_source_routes_frame_index_payload_to_task() -> None:

assert frames[0].payload["source"] == "video"
assert frames[0].payload["frame_index"] == 3


def test_image_sequence_source_rotates_file_payloads(tmp_path) -> None:
(tmp_path / "frame_b.ppm").write_bytes(b"P6\n1 1\n255\n\x00\xff\x00")
(tmp_path / "frame_a.ppm").write_bytes(b"P6\n1 1\n255\n\xff\x00\x00")
config = OrchestratorConfig(
tasks=(
TaskConfig(
name="detector",
model_path="",
priority=100,
target_fps=5,
latency_budget_ms=100,
queue_size=2,
),
),
input_source="image_sequence",
input_path=str(tmp_path),
)

source = build_frame_source(config)
first = source.frames_for_cycle(config.tasks, cycle=0, now_ms=0.0)[0]
second = source.frames_for_cycle(config.tasks, cycle=1, now_ms=1.0)[0]
third = source.frames_for_cycle(config.tasks, cycle=2, now_ms=2.0)[0]

assert first.payload["source"] == "image_sequence"
assert first.payload["path"].endswith("frame_a.ppm")
assert first.payload["sequence_root"] == str(tmp_path)
assert second.payload["path"].endswith("frame_b.ppm")
assert third.payload["path"].endswith("frame_a.ppm")
47 changes: 47 additions & 0 deletions tests/test_multi_workload_sustained.py
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,53 @@ def test_device_local_input_overrides_use_local_paths(tmp_path) -> None:
)


def test_device_local_vision_input_can_use_image_sequence_directory(tmp_path) -> None:
config = OrchestratorConfig.from_dict(
json.loads(
Path("configs/agent_multi_workload_sustained_device_local.json").read_text(
encoding="utf-8"
)
)
)
sequence_dir = tmp_path / "frames"
sequence_dir.mkdir()
first = sequence_dir / "frame_001.ppm"
second = sequence_dir / "frame_002.ppm"
first.write_bytes(b"P6\n1 1\n255\n\xff\x00\x00")
second.write_bytes(b"P6\n1 1\n255\n\x00\xff\x00")

overridden = apply_device_local_input_overrides(
config,
vision_input=sequence_dir,
)
report = write_multi_workload_sustained(
overridden,
output=tmp_path / "device_local_image_sequence.json",
frames=4,
)

assert overridden.input_source == "image_sequence"
assert overridden.input_path == str(sequence_dir)
vision_outputs = [
event["output"]
for event in report["result_events"]
if event["task"] == "vision_agent"
]
assert vision_outputs
assert {output["producer_source"] for output in vision_outputs} == {
"image_sequence_file"
}
observed_paths = {output["input_path"] for output in vision_outputs}
assert str(first) in observed_paths
assert str(second) in observed_paths
assert {output["sequence_root"] for output in vision_outputs} == {
str(sequence_dir)
}
signals = report["multi_workload_sustained_summary"]["observed_runtime_signals"]
assert "image_sequence_file" in signals["producer_sources"]
assert signals["device_local_producer_count"] == signals["producer_source_count"]


def test_device_local_vision_can_run_optional_onnx_probe(
tmp_path,
monkeypatch,
Expand Down