diff --git a/apps/foreman-dispatch-bridge/README.md b/apps/foreman-dispatch-bridge/README.md index 7386515c..1700e957 100644 --- a/apps/foreman-dispatch-bridge/README.md +++ b/apps/foreman-dispatch-bridge/README.md @@ -31,6 +31,12 @@ pytest -v - `DISPATCH_URL` (default `http://dispatch.llm:3000`), `DISPATCH_AGENT_TOKEN` - `DISPATCH_AGENT_NAME` (default `foreman/coder`), `DISPATCH_LANES` (default `local,cloud,frontier`) - `FOREMAN_NAMESPACE` (default `llm`) +- `RETRY_MAX_ATTEMPTS` (default `3`): each run, before claiming new work, the + bridge retries `Failed` Workloads it created — delete + recreate at the next + attempt (so the re-run picks up the current config, e.g. `gateProfile`), up to + this many total attempts. At the cap it leaves the Workload as a `Failed` + tombstone (the issue stays claimed, so the groomer won't re-serve it into a + loop) for human triage. Needs `list`/`get`/`delete` RBAC on `workloads`. - `GATEPROFILE_MAP` (optional, JSON): maps `owner/repo` → a Foreman [`GateProfile`](https://llmkube.com/docs/foreman/language-gates), stamped on each Workload's `spec.gateProfile` so non-Go repos run their own language diff --git a/apps/foreman-dispatch-bridge/bridge/claim.py b/apps/foreman-dispatch-bridge/bridge/claim.py index 5e66d603..33c8c101 100644 --- a/apps/foreman-dispatch-bridge/bridge/claim.py +++ b/apps/foreman-dispatch-bridge/bridge/claim.py @@ -49,6 +49,7 @@ def to_claimed_item(item: dict, lane: str) -> ClaimedItem: issue_number=int(_number(item)), intent=str(item.get("title") or ""), lane=_lane(item) or lane, + issue_id=str(item.get("issueId") or item.get("id") or ""), ) diff --git a/apps/foreman-dispatch-bridge/bridge/main.py b/apps/foreman-dispatch-bridge/bridge/main.py index b4926514..e2fc42b5 100644 --- a/apps/foreman-dispatch-bridge/bridge/main.py +++ b/apps/foreman-dispatch-bridge/bridge/main.py @@ -1,7 +1,9 @@ import os +import time from typing import Callable, Optional from bridge.models import ClaimedItem from bridge.workload import build_workload, gate_profile_for, parse_gate_profiles +from bridge.retry import reconcile_failures, DEFAULT_MAX_ATTEMPTS ClaimOne = Callable[[str, str], Optional[ClaimedItem]] # (agent_name, lane) -> item | None @@ -44,6 +46,7 @@ def _real_main() -> None: # pragma: no cover - thin wiring, exercised in the cl lanes = [l.strip() for l in os.environ.get("DISPATCH_LANES", "local,cloud,frontier").split(",") if l.strip()] namespace = os.environ.get("FOREMAN_NAMESPACE", "llm") gate_profiles = parse_gate_profiles(os.environ.get("GATEPROFILE_MAP")) + max_attempts = int(os.environ.get("RETRY_MAX_ATTEMPTS", str(DEFAULT_MAX_ATTEMPTS))) def http_get(url, headers): r = requests.get(url, headers=headers, timeout=20) @@ -72,6 +75,51 @@ def create_workload(manifest: dict) -> None: if e.status != 409: # 409 = Workload already exists -> idempotent no-op raise + def list_failed_workloads() -> list: + resp = api.list_namespaced_custom_object( + group="foreman.llmkube.dev", version="v1alpha1", + namespace=namespace, plural="workloads", + label_selector="created-by=dispatch-bridge", + ) + return [ + wl for wl in resp.get("items", []) + if (wl.get("status") or {}).get("phase") == "Failed" + ] + + def delete_workload(name: str) -> None: + # Foreground delete + poll: the retry recreates the same name, so the old + # object (and its owned AgenticTasks) must be fully gone first. + try: + api.delete_namespaced_custom_object( + group="foreman.llmkube.dev", version="v1alpha1", + namespace=namespace, plural="workloads", name=name, + body=client.V1DeleteOptions(propagation_policy="Foreground"), + ) + except client.exceptions.ApiException as e: + if e.status == 404: # already gone + return + raise + for _ in range(60): # up to ~60s for cascade to complete + try: + api.get_namespaced_custom_object( + group="foreman.llmkube.dev", version="v1alpha1", + namespace=namespace, plural="workloads", name=name, + ) + except client.exceptions.ApiException as e: + if e.status == 404: + return + raise + time.sleep(1) + raise TimeoutError(f"workload {name} still terminating after 60s") + + # Retry failed workloads first (so a re-run this tick uses the current config), + # then claim new work. + for line in reconcile_failures( + agent_name, list_failed_workloads, create_workload, delete_workload, + namespace, gate_profiles, max_attempts, + ): + print(line) + for line in run_once(lanes, agent_name, dispatch.claim_one, create_workload, namespace, gate_profiles): print(line) diff --git a/apps/foreman-dispatch-bridge/bridge/models.py b/apps/foreman-dispatch-bridge/bridge/models.py index 97953ae5..fc65be8e 100644 --- a/apps/foreman-dispatch-bridge/bridge/models.py +++ b/apps/foreman-dispatch-bridge/bridge/models.py @@ -6,4 +6,5 @@ class ClaimedItem: repo: str # "owner/name" issue_number: int intent: str - lane: str # "normal" | "escalated" + lane: str # dispatch worker lane, e.g. "local" | "cloud" | "frontier" + issue_id: str = "" # dispatch DB id; needed to unclaim (release) the issue diff --git a/apps/foreman-dispatch-bridge/bridge/retry.py b/apps/foreman-dispatch-bridge/bridge/retry.py new file mode 100644 index 00000000..f7151452 --- /dev/null +++ b/apps/foreman-dispatch-bridge/bridge/retry.py @@ -0,0 +1,85 @@ +from typing import Callable + +from bridge.models import ClaimedItem +from bridge.workload import ( + build_workload, + gate_profile_for, + ATTEMPT_ANNOTATION, + ISSUE_ID_ANNOTATION, +) + +# How many total coder attempts before the bridge stops retrying a Workload and +# leaves it as a Failed tombstone for human triage. Override via env. +DEFAULT_MAX_ATTEMPTS = 3 + +ListFailed = Callable[[], list] # () -> list of Failed Workload manifests (dicts) +DeleteWorkload = Callable[[str], None] # (name) -> None; blocks until the object is gone + + +def attempt_of(wl: dict) -> int: + """Read the attempt counter off a Workload; absent/garbage -> 1.""" + ann = (wl.get("metadata") or {}).get("annotations") or {} + try: + return max(1, int(ann.get(ATTEMPT_ANNOTATION, "1"))) + except (TypeError, ValueError): + return 1 + + +def item_from_workload(wl: dict) -> ClaimedItem: + """Reconstruct the ClaimedItem from a Workload so build_workload can re-render + it (with the CURRENT gateProfile/config) on retry.""" + meta = wl.get("metadata") or {} + spec = wl.get("spec") or {} + labels = meta.get("labels") or {} + ann = meta.get("annotations") or {} + issues = spec.get("issues") or [0] + return ClaimedItem( + repo=str(spec.get("repo") or ""), + issue_number=int(issues[0]), + intent=str(spec.get("intent") or ""), + lane=str(labels.get("lane") or ""), + issue_id=str(ann.get(ISSUE_ID_ANNOTATION) or ""), + ) + + +def reconcile_failures( + agent_name: str, + list_failed: ListFailed, + create_workload: Callable[[dict], None], + delete_workload: DeleteWorkload, + namespace: str, + gate_profiles: dict, + max_attempts: int = DEFAULT_MAX_ATTEMPTS, +) -> list: + """Retry Failed bridge Workloads, bounded by max_attempts. + + For each Failed Workload: + - attempt < max_attempts: delete it and recreate a fresh one at attempt+1, + so it re-runs with the current config (gateProfile, agent refs). The name + is deterministic, so delete-then-recreate reuses the same name/branch; + delete_workload must block until the old object is gone. + - attempt >= max_attempts: leave it as a Failed tombstone (no action). The + issue stays claimed so the groomer won't re-serve it into a loop; a human + triages from the lingering Workload. + + Returns per-Workload outcome strings. + """ + results = [] + for wl in list_failed(): + name = ((wl.get("metadata") or {}).get("name")) or "?" + attempt = attempt_of(wl) + if attempt >= max_attempts: + results.append(f"{name}:giveup:{attempt}/{max_attempts}") + continue + item = item_from_workload(wl) + delete_workload(name) + manifest = build_workload( + item, + namespace, + gate_profile_for(item.repo, gate_profiles), + agent_name, + attempt + 1, + ) + create_workload(manifest) + results.append(f"{name}:retry:{attempt + 1}/{max_attempts}") + return results diff --git a/apps/foreman-dispatch-bridge/bridge/workload.py b/apps/foreman-dispatch-bridge/bridge/workload.py index edfa455e..c477f5a6 100644 --- a/apps/foreman-dispatch-bridge/bridge/workload.py +++ b/apps/foreman-dispatch-bridge/bridge/workload.py @@ -12,6 +12,12 @@ # no entry of its own. GATE_PROFILE_WILDCARD = "*" +# Annotation keys the bridge stamps on each Workload so the failed-workload +# retry loop can read attempt count + the dispatch identity needed to unclaim. +ATTEMPT_ANNOTATION = "foreman.llmkube.dev/attempt" +ISSUE_ID_ANNOTATION = "foreman.llmkube.dev/issue-id" +AGENT_NAME_ANNOTATION = "foreman.llmkube.dev/agent-name" + def parse_gate_profiles(raw: Optional[str]) -> dict: """Parse the GATEPROFILE_MAP env var (JSON object: repo -> GateProfile). @@ -51,7 +57,13 @@ def workload_name(item: ClaimedItem) -> str: return f"wl-{owner_repo}-{item.issue_number}" -def build_workload(item: ClaimedItem, namespace: str, gate_profile: Optional[dict] = None) -> dict: +def build_workload( + item: ClaimedItem, + namespace: str, + gate_profile: Optional[dict] = None, + agent_name: str = "", + attempt: int = 1, +) -> dict: spec = { "intent": item.intent, "repo": item.repo, @@ -72,6 +84,13 @@ def build_workload(item: ClaimedItem, namespace: str, gate_profile: Optional[dic "name": workload_name(item), "namespace": namespace, "labels": {"created-by": "dispatch-bridge", "lane": item.lane}, + # attempt drives the retry cap; issue-id + agent-name let the retry + # loop unclaim the dispatch issue when retries are exhausted. + "annotations": { + ATTEMPT_ANNOTATION: str(attempt), + ISSUE_ID_ANNOTATION: item.issue_id, + AGENT_NAME_ANNOTATION: agent_name, + }, }, "spec": spec, } diff --git a/apps/foreman-dispatch-bridge/docker-bake.hcl b/apps/foreman-dispatch-bridge/docker-bake.hcl index 5d132564..a8747c8e 100644 --- a/apps/foreman-dispatch-bridge/docker-bake.hcl +++ b/apps/foreman-dispatch-bridge/docker-bake.hcl @@ -5,7 +5,7 @@ variable "APP" { } variable "VERSION" { - default = "0.2.0" + default = "0.3.0" } variable "SOURCE" { diff --git a/apps/foreman-dispatch-bridge/tests/test_claim.py b/apps/foreman-dispatch-bridge/tests/test_claim.py index 5e1c2d18..026558be 100644 --- a/apps/foreman-dispatch-bridge/tests/test_claim.py +++ b/apps/foreman-dispatch-bridge/tests/test_claim.py @@ -26,6 +26,7 @@ def test_to_claimed_item_maps_dispatch_fields(): assert item == ClaimedItem( repo="joryirving/home-ops", issue_number=42, intent="Fix the flaky reconcile test", lane="local", + issue_id="iss_abc123", ) @@ -45,6 +46,7 @@ def fake_post(url, headers, payload): assert item == ClaimedItem( repo="joryirving/home-ops", issue_number=42, intent="Fix the flaky reconcile test", lane="local", + issue_id="iss_abc123", ) assert captured["get_url"] == "http://d/api/agents/foreman/coder/queue?lane=local&includeClaimed=true" assert captured["claim_payload"] == { diff --git a/apps/foreman-dispatch-bridge/tests/test_retry.py b/apps/foreman-dispatch-bridge/tests/test_retry.py new file mode 100644 index 00000000..3a17cea3 --- /dev/null +++ b/apps/foreman-dispatch-bridge/tests/test_retry.py @@ -0,0 +1,82 @@ +from bridge.retry import attempt_of, item_from_workload, reconcile_failures + + +def _failed_wl(name, repo="misospace/dispatch", issue=7, attempt=None, lane="local", issue_id="id-7"): + ann = {"foreman.llmkube.dev/issue-id": issue_id} + if attempt is not None: + ann["foreman.llmkube.dev/attempt"] = str(attempt) + return { + "metadata": {"name": name, "labels": {"created-by": "dispatch-bridge", "lane": lane}, "annotations": ann}, + "spec": {"intent": "fix it", "repo": repo, "issues": [issue]}, + "status": {"phase": "Failed"}, + } + + +def test_attempt_of_defaults_and_parses(): + assert attempt_of(_failed_wl("w")) == 1 # no annotation -> 1 + assert attempt_of(_failed_wl("w", attempt=2)) == 2 + assert attempt_of({"metadata": {"annotations": {"foreman.llmkube.dev/attempt": "junk"}}}) == 1 + assert attempt_of({}) == 1 + + +def test_item_from_workload_reconstructs_fields(): + item = item_from_workload(_failed_wl("w", repo="a/b", issue=42, lane="cloud", issue_id="xyz")) + assert item.repo == "a/b" + assert item.issue_number == 42 + assert item.intent == "fix it" + assert item.lane == "cloud" + assert item.issue_id == "xyz" + + +class _Recorder: + def __init__(self, failed): + self.failed = failed + self.deleted = [] + self.created = [] + + def list_failed(self): + return self.failed + + def delete(self, name): + self.deleted.append(name) + + def create(self, manifest): + self.created.append(manifest) + + +def test_reconcile_retries_below_max_deletes_and_recreates_at_next_attempt(): + r = _Recorder([_failed_wl("wl-misospace-dispatch-7", attempt=1)]) + profiles = {"*": {"language": "generic"}} + out = reconcile_failures("foreman-coder", r.list_failed, r.create, r.delete, + namespace="llm", gate_profiles=profiles, max_attempts=3) + assert out == ["wl-misospace-dispatch-7:retry:2/3"] + assert r.deleted == ["wl-misospace-dispatch-7"] + assert len(r.created) == 1 + m = r.created[0] + # recreated with attempt+1, the current gateProfile, and the same name/branch + assert m["metadata"]["annotations"]["foreman.llmkube.dev/attempt"] == "2" + assert m["metadata"]["name"] == "wl-misospace-dispatch-7" + assert m["spec"]["gateProfile"] == {"language": "generic"} + + +def test_reconcile_gives_up_at_max_without_touching_the_workload(): + r = _Recorder([_failed_wl("wl-misospace-dispatch-7", attempt=3)]) + out = reconcile_failures("foreman-coder", r.list_failed, r.create, r.delete, + namespace="llm", gate_profiles={}, max_attempts=3) + assert out == ["wl-misospace-dispatch-7:giveup:3/3"] + assert r.deleted == [] # left as a tombstone + assert r.created == [] + + +def test_reconcile_first_attempt_annotation_absent_counts_as_one(): + r = _Recorder([_failed_wl("wl-a-b-1", attempt=None)]) + out = reconcile_failures("foreman-coder", r.list_failed, r.create, r.delete, + namespace="llm", gate_profiles={}, max_attempts=3) + assert out == ["wl-a-b-1:retry:2/3"] + assert r.created[0]["metadata"]["annotations"]["foreman.llmkube.dev/attempt"] == "2" + + +def test_reconcile_empty_is_noop(): + r = _Recorder([]) + assert reconcile_failures("foreman-coder", r.list_failed, r.create, r.delete, + namespace="llm", gate_profiles={}, max_attempts=3) == [] diff --git a/apps/foreman-dispatch-bridge/tests/test_workload.py b/apps/foreman-dispatch-bridge/tests/test_workload.py index dfaf3f09..2bf701d0 100644 --- a/apps/foreman-dispatch-bridge/tests/test_workload.py +++ b/apps/foreman-dispatch-bridge/tests/test_workload.py @@ -37,6 +37,18 @@ def test_build_workload_omits_gate_profile_by_default(): assert "gateProfile" not in build_workload(ITEM, namespace="llm")["spec"] +def test_build_workload_stamps_retry_annotations(): + item = ClaimedItem(repo="a/b", issue_number=9, intent="x", lane="local", issue_id="id-9") + ann = build_workload(item, namespace="llm", agent_name="foreman-coder", attempt=2)["metadata"]["annotations"] + assert ann["foreman.llmkube.dev/attempt"] == "2" + assert ann["foreman.llmkube.dev/issue-id"] == "id-9" + assert ann["foreman.llmkube.dev/agent-name"] == "foreman-coder" + + +def test_build_workload_defaults_attempt_to_one(): + assert build_workload(ITEM, namespace="llm")["metadata"]["annotations"]["foreman.llmkube.dev/attempt"] == "1" + + def test_build_workload_passes_gate_profile_through_verbatim(): profile = {"language": "node", "commands": {"test": "corepack pnpm i && corepack pnpm test"}} wl = build_workload(ITEM, namespace="llm", gate_profile=profile)