2 changes: 1 addition & 1 deletion README.ko.md
@@ -131,7 +131,7 @@ flowchart LR
 | Agent Runtime Contract | Vision / Voice-Command / Safety-Monitor dummy workloads that use the Forge agent manifest and Runtime `result.agent` references | `configs/agent_3_workload_demo.json`, [`docs/agent_orchestration_summary_contract.ko.md`](docs/agent_orchestration_summary_contract.ko.md) |
 | Lightweight Sustained Workload Starter | Profiled local sustained scenario including YOLO-like vision, Whisper-like command burst, FastAPI-style ingress, optional tegrastats timeline, and producer-backed starters | `python3 -m inferedge_orchestrator run-multi-workload-sustained ...` |
 | Device-Local Sustained Starter | Starter that runs the committed image/request/resource snapshot producers in a single `device_local` mode | `configs/agent_multi_workload_sustained_device_local.json` |
-| Remote Dispatch Starter | Without claiming production remote execution, reproduces remote edge worker selection with a file-based worker registry and task request contract, and validates explicit starter execution with a local HTTP worker starter | [`docs/remote_dispatch_starter.ko.md`](docs/remote_dispatch_starter.ko.md) |
+| Remote Dispatch Starter | Without claiming production remote execution, validates remote edge worker selection, explicit HTTP/SSH starter execution, and bounded fallback evidence with a file-based worker registry and task request contract | [`docs/remote_dispatch_starter.ko.md`](docs/remote_dispatch_starter.ko.md) |

## Validation Evidence

2 changes: 1 addition & 1 deletion README.md
@@ -133,7 +133,7 @@ The boundary is intentional:
 | Sustained Agent Scenario Starter | Normal / overload / sustained-high-load 3-agent modes with queue-depth timeline, latency timeline, and policy decision reasons | `configs/agent_3_workload_sustained_high_load.json` |
 | Lightweight Sustained Workload Starter | Profiled local sustained scenario for YOLO-like vision, Whisper-like command burst, FastAPI-style ingress, optional tegrastats timeline, and producer-backed starters | `python3 -m inferedge_orchestrator run-multi-workload-sustained ...` |
 | Device-Local Sustained Starter | Device-local mode using committed image, request, and resource snapshot producers before live device integrations | `configs/agent_multi_workload_sustained_device_local.json` |
-| Remote Dispatch Starter | File-based worker registry and task request contract for selecting a remote edge worker; optional `--execute-plan` records explicit HTTP/SSH starter evidence with a local HTTP worker starter without claiming production remote execution | [`docs/remote_dispatch_starter.md`](docs/remote_dispatch_starter.md) |
+| Remote Dispatch Starter | File-based worker registry and task request contract for selecting a remote edge worker; optional `--execute-plan` records explicit HTTP/SSH starter and bounded fallback evidence without claiming production remote execution | [`docs/remote_dispatch_starter.md`](docs/remote_dispatch_starter.md) |

## Validation Evidence

22 changes: 17 additions & 5 deletions docs/remote_dispatch_starter.ko.md
@@ -17,9 +17,11 @@ rather than claiming execution, remote edge worker handoff contract

 Without `--execute-plan`, it does not open network sockets or run SSH
 commands. When `--execute-plan` is specified, it can run the single HTTP POST or
-SSH command starter declared by the selected worker. Cloudflare tunnels, auth, heartbeat,
-automatic retries against fallback workers, and long-lived production worker management
-remain future hardening.
+SSH command starter declared by the selected worker. If that starter execution fails with
+an error listed in the task request's `retry_policy.fallback_on` and `max_attempts` allows it,
+it can make one bounded starter attempt against an eligible fallback worker.
+Management of Cloudflare tunnels, auth, heartbeat, long-lived production workers, and
+production retry orchestration remains future hardening.

 ## Inputs

@@ -43,6 +45,14 @@ The starter selects a worker based on the following items.
 - target device
 - optional retry policy in the task request

+Fallback starter execution is intentionally kept narrow.

+- The primary worker is selected first from the eligible workers.
+- `max_attempts` determines whether a fallback starter attempt is allowed.
+- `fallback_on` determines which primary error categories trigger a fallback attempt.
+- Fallback attempts are recorded in `fallback_execution_result`.
+- Fallback execution is evidence collection, not production-grade retry control.

 ## Run

 ```bash
 ...
 ```
@@ -132,8 +142,10 @@ By default the starter records only execution planning, and network connection
   provides a local HTTP starter endpoint for that purpose.
 - Timeouts, connection failures, HTTP errors, and command failures are recorded in
   `remote_execution_result` rather than raised as an unstructured crash.
-- Fallback execution is not performed automatically yet. Fallback candidates are recorded
-  only as evidence for later retry/fallback hardening.
+- If the primary starter fails and the retry policy allows it, `fallback_execution_result`
+  records the attempted fallback worker, status, transport, and final starter outcome.
+- Fallback execution is limited to starter evidence. Production-grade retry,
+  heartbeat, failover state, and worker lifecycle management are future hardening.

## Boundary

25 changes: 20 additions & 5 deletions docs/remote_dispatch_starter.md
@@ -18,9 +18,12 @@ This starter answers one narrow question:

 Without `--execute-plan`, it does not open network sockets or run SSH commands.
 With `--execute-plan`, it may perform a single starter HTTP POST or SSH command
-declared by the selected worker. It still does not manage Cloudflare tunnels,
-auth, heartbeat, retries against fallback workers, or long-lived production
-workers. Those remain future hardening steps.
+declared by the selected worker. If that starter execution fails with an error
+listed in the task request `retry_policy.fallback_on`, it can make one bounded
+starter attempt against an eligible fallback worker when `max_attempts` allows
+it. It still does not manage Cloudflare tunnels, auth, heartbeat, long-lived
+production workers, or production retry orchestration. Those remain future
+hardening steps.

## Inputs

@@ -44,6 +47,14 @@ The starter matches:
 - target device
 - optional retry policy fields in the task request

+Fallback starter execution is intentionally narrow:

+- the primary worker is selected first from eligible workers
+- `max_attempts` controls whether a fallback starter attempt is allowed
+- `fallback_on` controls which primary error categories trigger fallback
+- fallback attempts are recorded in `fallback_execution_result`
+- fallback execution is evidence collection, not production-grade retry control

 ## Run

 ```bash
 ...
 ```
@@ -133,8 +144,12 @@ When execution is requested:
   repeatable success-path smoke validation.
 - timeout, connection failure, HTTP error, and command failure are recorded in
   `remote_execution_result` instead of raising an unstructured crash.
-- fallback execution is not automatic yet; fallback candidates remain recorded
-  as future retry/fallback evidence.
+- if the primary starter fails and the retry policy allows fallback,
+  `fallback_execution_result` records the attempted fallback worker, status,
+  transport, and final starter outcome.
+- fallback execution remains bounded to starter evidence. Production-grade
+  retry, heartbeat, failover state, and worker lifecycle management remain
+  future hardening.
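
A reader of the dispatch result might consume the fallback record roughly as follows. This is a minimal sketch, not code from the PR: `summarize_fallback` is a hypothetical helper, and the sample dict is hand-written to mirror the keys that `_execute_fallback_plan` in this PR emits.

```python
from typing import Any


def summarize_fallback(result: dict[str, Any]) -> str:
    """Return a one-line summary of fallback evidence (hypothetical helper)."""
    fallback = result.get("fallback_execution_result")
    if not fallback:
        # The key is only added when at least one fallback attempt was made.
        return "no fallback attempted"
    workers = ", ".join(fallback["attempted_worker_ids"])
    return (
        f"primary={fallback['primary_worker_id']} "
        f"reason={fallback['fallback_reason']} "
        f"tried=[{workers}] final={fallback['final_status']}"
    )


# Hand-written sample; worker ids are illustrative assumptions.
sample = {
    "fallback_execution_result": {
        "primary_worker_id": "worker-a",
        "fallback_reason": "timeout",
        "attempted_worker_ids": ["worker-b"],
        "final_status": "succeeded",
    }
}
print(summarize_fallback(sample))
```

Because the key is absent when no fallback ran, consumers should use `.get()` rather than indexing.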

## Boundary

160 changes: 146 additions & 14 deletions src/inferedge_orchestrator/remote_dispatch.py
@@ -160,6 +160,14 @@ def _build_result(
         execute_plan=execute_plan,
         timeout_sec=timeout_sec,
     )
+    fallback_execution_result = _execute_fallback_plan(
+        workers=workers,
+        request=request,
+        worker_selection=worker_selection,
+        primary_execution_result=remote_execution_result,
+        execute_plan=execute_plan,
+        timeout_sec=timeout_sec,
+    )
     remote_execution_plan = _build_remote_execution_plan(
         selected,
         request,
@@ -169,6 +177,7 @@ def _build_result(
         request,
         worker_selection,
         execution_result=remote_execution_result,
+        fallback_execution_result=fallback_execution_result,
     )
     runtime_events = [{
         "event": "remote_dispatch_selected" if selected else "remote_dispatch_rejected",
@@ -191,7 +200,24 @@ def _build_result(
                 "error_category": remote_execution_result.get("error_category"),
             }
         )
-    return {
+    if fallback_execution_result:
+        for attempt in fallback_execution_result["attempts"]:
+            runtime_events.append(
+                {
+                    "event": "remote_fallback_execution_completed"
+                    if attempt["status"] == "succeeded"
+                    else "remote_fallback_execution_failed",
+                    "task_id": request.get("task_id"),
+                    "agent_id": request.get("agent_id"),
+                    "selected_worker_id": attempt.get("selected_worker_id"),
+                    "primary_worker_id": fallback_execution_result["primary_worker_id"],
+                    "transport": attempt.get("transport"),
+                    "status": attempt["status"],
+                    "error_category": attempt.get("error_category"),
+                    "fallback_attempt": attempt["fallback_attempt"],
+                }
+            )
+    result = {
         "schema_version": RESULT_SCHEMA_VERSION,
         "dispatch_status": status,
         "selected_worker_id": selected.worker_id if selected else None,
@@ -214,6 +240,9 @@ def _build_result(
         },
         "runtime_events": runtime_events,
     }
+    if fallback_execution_result:
+        result["fallback_execution_result"] = fallback_execution_result
+    return result
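
The event naming used for fallback attempts in `_build_result` can be illustrated in isolation. This is a sketch under assumptions: `fallback_event_name` is a hypothetical helper and the attempt dicts are hand-written, but the two event names and the `status` comparison are taken from the hunk above.

```python
from typing import Any


def fallback_event_name(attempt: dict[str, Any]) -> str:
    """Pick the runtime event name for one fallback attempt (hypothetical helper)."""
    if attempt["status"] == "succeeded":
        return "remote_fallback_execution_completed"
    # Any other status is reported as a failed fallback execution.
    return "remote_fallback_execution_failed"


# Hand-written attempts; worker ids are illustrative assumptions.
attempts = [
    {"selected_worker_id": "worker-b", "status": "failed", "fallback_attempt": 1},
    {"selected_worker_id": "worker-c", "status": "succeeded", "fallback_attempt": 2},
]
for attempt in attempts:
    print(attempt["fallback_attempt"], fallback_event_name(attempt))
```

One event is emitted per attempt, so a consumer can count fallback tries directly from `runtime_events`.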


def _evaluate_worker(worker: RemoteWorker, request: dict[str, Any]) -> dict[str, Any]:
@@ -269,27 +298,35 @@ def _build_retry_fallback_plan(
     request: dict[str, Any],
     worker_selection: dict[str, Any],
     execution_result: dict[str, Any],
+    fallback_execution_result: dict[str, Any] | None = None,
 ) -> dict[str, Any]:
-    retry_policy = request.get("retry_policy", {})
-    if not isinstance(retry_policy, dict):
-        retry_policy = {}
-    max_attempts = _positive_int(retry_policy.get("max_attempts"), default=1)
-    fallback_on = retry_policy.get(
-        "fallback_on",
-        ["timeout", "worker_unhealthy", "runtime_error"],
-    )
-    if not isinstance(fallback_on, list):
-        fallback_on = ["timeout", "worker_unhealthy", "runtime_error"]
-    return {
+    max_attempts, fallback_on = _normalized_retry_policy(request)
+    plan = {
         "schema_version": "inferedge-remote-retry-fallback-plan-v1",
         "max_attempts": max_attempts,
-        "fallback_on": [str(item) for item in fallback_on],
+        "fallback_on": fallback_on,
         "primary_worker_id": worker_selection.get("selected_worker_id"),
         "fallback_worker_ids": worker_selection.get("fallback_worker_ids", []),
         "execution_performed": execution_result["execution_performed"],
-        "fallback_execution_performed": False,
         "last_execution_status": execution_result["status"],
     }
+    if fallback_execution_result:
+        plan.update(
+            {
+                "fallback_execution_performed": any(
+                    attempt["execution_performed"]
+                    for attempt in fallback_execution_result["attempts"]
+                ),
+                "fallback_attempted_worker_ids": fallback_execution_result[
+                    "attempted_worker_ids"
+                ],
+                "fallback_final_status": fallback_execution_result["final_status"],
+                "last_execution_status": fallback_execution_result["final_status"],
+            }
+        )
+    else:
+        plan["fallback_execution_performed"] = False
+    return plan
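
One consequence of the plan update is that `last_execution_status` reports the fallback outcome rather than the primary one whenever a fallback ran. The sketch below isolates that merge step; `merge_fallback_into_plan` is a hypothetical stand-in for the tail of `_build_retry_fallback_plan`, and the input dicts are hand-written.

```python
from __future__ import annotations

from typing import Any


def merge_fallback_into_plan(
    plan: dict[str, Any], fallback: dict[str, Any] | None
) -> dict[str, Any]:
    """Fold fallback evidence into a plan dict (hypothetical stand-in)."""
    merged = dict(plan)  # leave the caller's dict untouched
    if fallback:
        merged.update(
            {
                "fallback_execution_performed": any(
                    attempt["execution_performed"] for attempt in fallback["attempts"]
                ),
                "fallback_attempted_worker_ids": fallback["attempted_worker_ids"],
                "fallback_final_status": fallback["final_status"],
                # The fallback outcome replaces the primary status here.
                "last_execution_status": fallback["final_status"],
            }
        )
    else:
        merged["fallback_execution_performed"] = False
    return merged


primary_plan = {"last_execution_status": "failed"}
fallback_record = {
    "attempts": [{"execution_performed": True, "status": "succeeded"}],
    "attempted_worker_ids": ["worker-b"],
    "final_status": "succeeded",
}
print(merge_fallback_into_plan(primary_plan, fallback_record)["last_execution_status"])
print(merge_fallback_into_plan(primary_plan, None)["last_execution_status"])
```

A consumer that needs the primary outcome should read `remote_execution_result` instead of the plan's `last_execution_status`.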


def _build_remote_execution_plan(
@@ -376,6 +413,87 @@ def _execute_remote_plan(
     return result


+def _execute_fallback_plan(
+    *,
+    workers: list[RemoteWorker],
+    request: dict[str, Any],
+    worker_selection: dict[str, Any],
+    primary_execution_result: dict[str, Any],
+    execute_plan: bool,
+    timeout_sec: float,
+) -> dict[str, Any] | None:
+    if not _should_try_fallback(
+        request=request,
+        worker_selection=worker_selection,
+        primary_execution_result=primary_execution_result,
+        execute_plan=execute_plan,
+    ):
+        return None

+    max_attempts, _fallback_on = _normalized_retry_policy(request)
+    fallback_limit = max_attempts - 1
+    worker_by_id = {worker.worker_id: worker for worker in workers}
+    attempts: list[dict[str, Any]] = []
+    for fallback_worker_id in worker_selection.get("fallback_worker_ids", []):
+        if len(attempts) >= fallback_limit:
+            break
+        fallback_worker = worker_by_id.get(fallback_worker_id)
+        if fallback_worker is None:
+            continue
+        attempt = _execute_remote_plan(
+            selected=fallback_worker,
+            request=request,
+            execute_plan=True,
+            timeout_sec=timeout_sec,
+        )
+        attempt["fallback_attempt"] = len(attempts) + 1
+        attempt["fallback_for_worker_id"] = primary_execution_result.get(
+            "selected_worker_id"
+        )
+        attempts.append(attempt)
+        if attempt["status"] == "succeeded":
+            break

+    if not attempts:
+        return None

+    return {
+        "schema_version": "inferedge-remote-fallback-execution-v1",
+        "fallback_requested": True,
+        "fallback_reason": primary_execution_result.get("error_category")
+        or primary_execution_result["status"],
+        "primary_worker_id": primary_execution_result.get("selected_worker_id"),
+        "attempted_worker_ids": [
+            str(attempt.get("selected_worker_id")) for attempt in attempts
+        ],
+        "final_status": attempts[-1]["status"],
+        "attempts": attempts,
+        "production_remote_execution": False,
+    }
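
The bounded loop in `_execute_fallback_plan` can be exercised without any network by injecting the executor. This is a minimal sketch, assuming a stubbed `execute` callable in place of `_execute_remote_plan`; `run_bounded_fallback` and `fake_execute` are hypothetical names. With `max_attempts=2` the loop makes exactly one fallback attempt, which matches the docs; larger values allow up to `max_attempts - 1` attempts.

```python
from __future__ import annotations

from typing import Any, Callable


def run_bounded_fallback(
    fallback_worker_ids: list[str],
    max_attempts: int,
    execute: Callable[[str], dict[str, Any]],
) -> list[dict[str, Any]]:
    """Try fallback workers in order, at most `max_attempts - 1` times."""
    fallback_limit = max_attempts - 1  # the primary attempt already used one
    attempts: list[dict[str, Any]] = []
    for worker_id in fallback_worker_ids:
        if len(attempts) >= fallback_limit:
            break
        attempt = execute(worker_id)
        attempt["fallback_attempt"] = len(attempts) + 1
        attempts.append(attempt)
        if attempt["status"] == "succeeded":
            break  # stop at the first successful fallback
    return attempts


def fake_execute(worker_id: str) -> dict[str, Any]:
    # Stub executor (assumption): only "worker-c" succeeds.
    status = "succeeded" if worker_id == "worker-c" else "failed"
    return {"selected_worker_id": worker_id, "status": status}


attempts = run_bounded_fallback(["worker-b", "worker-c", "worker-d"], 3, fake_execute)
print([(a["selected_worker_id"], a["status"]) for a in attempts])
```

Passing the executor as a parameter is a choice made for this sketch only; the PR calls `_execute_remote_plan` directly.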


+def _should_try_fallback(
+    *,
+    request: dict[str, Any],
+    worker_selection: dict[str, Any],
+    primary_execution_result: dict[str, Any],
+    execute_plan: bool,
+) -> bool:
+    if not execute_plan:
+        return False
+    if primary_execution_result["status"] == "succeeded":
+        return False
+    if not worker_selection.get("fallback_worker_ids"):
+        return False
+    max_attempts, fallback_on = _normalized_retry_policy(request)
+    if max_attempts <= 1:
+        return False
+    error_category = primary_execution_result.get("error_category")
+    if error_category:
+        return str(error_category) in fallback_on
+    return primary_execution_result["status"] in fallback_on
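
The gate conditions can be checked as a flat predicate. The sketch below is a simplified, hypothetical restatement of `_should_try_fallback` that takes already-normalized values as keyword arguments instead of the request and result dicts; the category `"http_error"` in the usage lines is an illustrative assumption.

```python
from __future__ import annotations


def should_try_fallback(
    *,
    execute_plan: bool,
    primary_status: str,
    error_category: str | None,
    fallback_worker_ids: list[str],
    max_attempts: int,
    fallback_on: list[str],
) -> bool:
    """Decide whether a fallback starter attempt is allowed (hypothetical)."""
    if not execute_plan or primary_status == "succeeded":
        return False
    if not fallback_worker_ids or max_attempts <= 1:
        return False
    # Prefer the error category; fall back to the raw status when it is unset.
    return (error_category or primary_status) in fallback_on


defaults = ["timeout", "worker_unhealthy", "runtime_error"]
base = dict(
    execute_plan=True,
    primary_status="failed",
    fallback_worker_ids=["worker-b"],
    max_attempts=2,
    fallback_on=defaults,
)
print(should_try_fallback(error_category="timeout", **base))
print(should_try_fallback(error_category="http_error", **base))
```

Note that with the default `max_attempts` of 1, fallback never runs, so callers have to opt in through `retry_policy`.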


 def _execute_http_request(
     selected: RemoteWorker,
     request: dict[str, Any],
@@ -619,6 +737,20 @@ def _positive_int(value: Any, *, default: int) -> int:
     return parsed if parsed > 0 else default


+def _normalized_retry_policy(request: dict[str, Any]) -> tuple[int, list[str]]:
+    retry_policy = request.get("retry_policy", {})
+    if not isinstance(retry_policy, dict):
+        retry_policy = {}
+    max_attempts = _positive_int(retry_policy.get("max_attempts"), default=1)
+    fallback_on = retry_policy.get(
+        "fallback_on",
+        ["timeout", "worker_unhealthy", "runtime_error"],
+    )
+    if not isinstance(fallback_on, list):
+        fallback_on = ["timeout", "worker_unhealthy", "runtime_error"]
+    return max_attempts, [str(item) for item in fallback_on]
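
How malformed `retry_policy` values normalize is easiest to see with a few inputs. This is a sketch under assumptions: only the last line of `_positive_int` is visible in the diff, so the `positive_int` body below (an `int()` conversion with a `TypeError`/`ValueError` guard) is a guess at its behavior, and both function names are hypothetical copies without the leading underscore.

```python
from __future__ import annotations

from typing import Any

DEFAULT_FALLBACK_ON = ["timeout", "worker_unhealthy", "runtime_error"]


def positive_int(value: Any, *, default: int) -> int:
    # Assumed behavior: only the final return line appears in the diff.
    try:
        parsed = int(value)
    except (TypeError, ValueError):
        return default
    return parsed if parsed > 0 else default


def normalized_retry_policy(request: dict[str, Any]) -> tuple[int, list[str]]:
    """Mirror of the helper in the diff, using the assumed `positive_int`."""
    retry_policy = request.get("retry_policy", {})
    if not isinstance(retry_policy, dict):
        retry_policy = {}
    max_attempts = positive_int(retry_policy.get("max_attempts"), default=1)
    fallback_on = retry_policy.get("fallback_on", DEFAULT_FALLBACK_ON)
    if not isinstance(fallback_on, list):
        fallback_on = DEFAULT_FALLBACK_ON
    return max_attempts, [str(item) for item in fallback_on]


print(normalized_retry_policy({}))
print(normalized_retry_policy(
    {"retry_policy": {"max_attempts": "3", "fallback_on": ["timeout", 503]}}
))
```

Every malformed input degrades to a single attempt with the default categories, which keeps fallback disabled unless the request asks for it explicitly.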


 def _transport_from_endpoint_type(endpoint_type: str) -> str:
     if endpoint_type.startswith("ssh"):
         return "ssh"