Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions apps/foreman-dispatch-bridge/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ pytest -v
- `DISPATCH_URL` (default `http://dispatch.llm:3000`), `DISPATCH_AGENT_TOKEN`
- `DISPATCH_AGENT_NAME` (default `foreman/coder`), `DISPATCH_LANES` (default `local,cloud,frontier`)
- `FOREMAN_NAMESPACE` (default `llm`)
- `RETRY_MAX_ATTEMPTS` (default `3`): each run, before claiming new work, the
bridge retries `Failed` Workloads it created — delete + recreate at the next
attempt (so the re-run picks up the current config, e.g. `gateProfile`), up to
this many total attempts. At the cap it leaves the Workload as a `Failed`
tombstone (the issue stays claimed, so the groomer won't re-serve it into a
loop) for human triage. Needs `list`/`get`/`delete` RBAC on `workloads`.
- `GATEPROFILE_MAP` (optional, JSON): maps `owner/repo` → a Foreman
[`GateProfile`](https://llmkube.com/docs/foreman/language-gates), stamped on
each Workload's `spec.gateProfile` so non-Go repos run their own language
Expand Down
1 change: 1 addition & 0 deletions apps/foreman-dispatch-bridge/bridge/claim.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ def to_claimed_item(item: dict, lane: str) -> ClaimedItem:
issue_number=int(_number(item)),
intent=str(item.get("title") or ""),
lane=_lane(item) or lane,
issue_id=str(item.get("issueId") or item.get("id") or ""),
)


Expand Down
48 changes: 48 additions & 0 deletions apps/foreman-dispatch-bridge/bridge/main.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import os
import time
from typing import Callable, Optional
from bridge.models import ClaimedItem
from bridge.workload import build_workload, gate_profile_for, parse_gate_profiles
from bridge.retry import reconcile_failures, DEFAULT_MAX_ATTEMPTS

ClaimOne = Callable[[str, str], Optional[ClaimedItem]] # (agent_name, lane) -> item | None

Expand Down Expand Up @@ -44,6 +46,7 @@ def _real_main() -> None: # pragma: no cover - thin wiring, exercised in the cl
lanes = [l.strip() for l in os.environ.get("DISPATCH_LANES", "local,cloud,frontier").split(",") if l.strip()]
namespace = os.environ.get("FOREMAN_NAMESPACE", "llm")
gate_profiles = parse_gate_profiles(os.environ.get("GATEPROFILE_MAP"))
max_attempts = int(os.environ.get("RETRY_MAX_ATTEMPTS", str(DEFAULT_MAX_ATTEMPTS)))

def http_get(url, headers):
r = requests.get(url, headers=headers, timeout=20)
Expand Down Expand Up @@ -72,6 +75,51 @@ def create_workload(manifest: dict) -> None:
if e.status != 409: # 409 = Workload already exists -> idempotent no-op
raise

def list_failed_workloads() -> list:
resp = api.list_namespaced_custom_object(
group="foreman.llmkube.dev", version="v1alpha1",
namespace=namespace, plural="workloads",
label_selector="created-by=dispatch-bridge",
)
return [
wl for wl in resp.get("items", [])
if (wl.get("status") or {}).get("phase") == "Failed"
]

def delete_workload(name: str) -> None:
# Foreground delete + poll: the retry recreates the same name, so the old
# object (and its owned AgenticTasks) must be fully gone first.
try:
api.delete_namespaced_custom_object(
group="foreman.llmkube.dev", version="v1alpha1",
namespace=namespace, plural="workloads", name=name,
body=client.V1DeleteOptions(propagation_policy="Foreground"),
)
except client.exceptions.ApiException as e:
if e.status == 404: # already gone
return
raise
for _ in range(60): # up to ~60s for cascade to complete
try:
api.get_namespaced_custom_object(
group="foreman.llmkube.dev", version="v1alpha1",
namespace=namespace, plural="workloads", name=name,
)
except client.exceptions.ApiException as e:
if e.status == 404:
return
raise
time.sleep(1)
raise TimeoutError(f"workload {name} still terminating after 60s")

# Retry failed workloads first (so a re-run this tick uses the current config),
# then claim new work.
for line in reconcile_failures(
agent_name, list_failed_workloads, create_workload, delete_workload,
namespace, gate_profiles, max_attempts,
):
print(line)

for line in run_once(lanes, agent_name, dispatch.claim_one, create_workload, namespace, gate_profiles):
print(line)

Expand Down
3 changes: 2 additions & 1 deletion apps/foreman-dispatch-bridge/bridge/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ class ClaimedItem:
repo: str # "owner/name"
issue_number: int
intent: str
lane: str # "normal" | "escalated"
lane: str # dispatch worker lane, e.g. "local" | "cloud" | "frontier"
issue_id: str = "" # dispatch DB id; needed to unclaim (release) the issue
85 changes: 85 additions & 0 deletions apps/foreman-dispatch-bridge/bridge/retry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
from typing import Callable

from bridge.models import ClaimedItem
from bridge.workload import (
build_workload,
gate_profile_for,
ATTEMPT_ANNOTATION,
ISSUE_ID_ANNOTATION,
)

# How many total coder attempts before the bridge stops retrying a Workload and
# leaves it as a Failed tombstone for human triage. Override via env.
DEFAULT_MAX_ATTEMPTS = 3

ListFailed = Callable[[], list] # () -> list of Failed Workload manifests (dicts)
DeleteWorkload = Callable[[str], None] # (name) -> None; blocks until the object is gone


def attempt_of(wl: dict) -> int:
"""Read the attempt counter off a Workload; absent/garbage -> 1."""
ann = (wl.get("metadata") or {}).get("annotations") or {}
try:
return max(1, int(ann.get(ATTEMPT_ANNOTATION, "1")))
except (TypeError, ValueError):
return 1


def item_from_workload(wl: dict) -> ClaimedItem:
"""Reconstruct the ClaimedItem from a Workload so build_workload can re-render
it (with the CURRENT gateProfile/config) on retry."""
meta = wl.get("metadata") or {}
spec = wl.get("spec") or {}
labels = meta.get("labels") or {}
ann = meta.get("annotations") or {}
issues = spec.get("issues") or [0]
return ClaimedItem(
repo=str(spec.get("repo") or ""),
issue_number=int(issues[0]),
intent=str(spec.get("intent") or ""),
lane=str(labels.get("lane") or ""),
issue_id=str(ann.get(ISSUE_ID_ANNOTATION) or ""),
)


def reconcile_failures(
agent_name: str,
list_failed: ListFailed,
create_workload: Callable[[dict], None],
delete_workload: DeleteWorkload,
namespace: str,
gate_profiles: dict,
max_attempts: int = DEFAULT_MAX_ATTEMPTS,
) -> list:
"""Retry Failed bridge Workloads, bounded by max_attempts.

For each Failed Workload:
- attempt < max_attempts: delete it and recreate a fresh one at attempt+1,
so it re-runs with the current config (gateProfile, agent refs). The name
is deterministic, so delete-then-recreate reuses the same name/branch;
delete_workload must block until the old object is gone.
- attempt >= max_attempts: leave it as a Failed tombstone (no action). The
issue stays claimed so the groomer won't re-serve it into a loop; a human
triages from the lingering Workload.

Returns per-Workload outcome strings.
"""
results = []
for wl in list_failed():
name = ((wl.get("metadata") or {}).get("name")) or "?"
attempt = attempt_of(wl)
if attempt >= max_attempts:
results.append(f"{name}:giveup:{attempt}/{max_attempts}")
continue
item = item_from_workload(wl)
delete_workload(name)
manifest = build_workload(
item,
namespace,
gate_profile_for(item.repo, gate_profiles),
agent_name,
attempt + 1,
)
create_workload(manifest)
results.append(f"{name}:retry:{attempt + 1}/{max_attempts}")
return results
21 changes: 20 additions & 1 deletion apps/foreman-dispatch-bridge/bridge/workload.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@
# no entry of its own.
GATE_PROFILE_WILDCARD = "*"

# Annotation keys the bridge stamps on each Workload so the failed-workload
# retry loop can read attempt count + the dispatch identity needed to unclaim.
ATTEMPT_ANNOTATION = "foreman.llmkube.dev/attempt"
ISSUE_ID_ANNOTATION = "foreman.llmkube.dev/issue-id"
AGENT_NAME_ANNOTATION = "foreman.llmkube.dev/agent-name"


def parse_gate_profiles(raw: Optional[str]) -> dict:
"""Parse the GATEPROFILE_MAP env var (JSON object: repo -> GateProfile).
Expand Down Expand Up @@ -51,7 +57,13 @@ def workload_name(item: ClaimedItem) -> str:
return f"wl-{owner_repo}-{item.issue_number}"


def build_workload(item: ClaimedItem, namespace: str, gate_profile: Optional[dict] = None) -> dict:
def build_workload(
item: ClaimedItem,
namespace: str,
gate_profile: Optional[dict] = None,
agent_name: str = "",
attempt: int = 1,
) -> dict:
spec = {
"intent": item.intent,
"repo": item.repo,
Expand All @@ -72,6 +84,13 @@ def build_workload(item: ClaimedItem, namespace: str, gate_profile: Optional[dic
"name": workload_name(item),
"namespace": namespace,
"labels": {"created-by": "dispatch-bridge", "lane": item.lane},
# attempt drives the retry cap; issue-id + agent-name let the retry
# loop unclaim the dispatch issue when retries are exhausted.
"annotations": {
ATTEMPT_ANNOTATION: str(attempt),
ISSUE_ID_ANNOTATION: item.issue_id,
AGENT_NAME_ANNOTATION: agent_name,
},
},
"spec": spec,
}
2 changes: 1 addition & 1 deletion apps/foreman-dispatch-bridge/docker-bake.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ variable "APP" {
}

variable "VERSION" {
default = "0.2.0"
default = "0.3.0"
}

variable "SOURCE" {
Expand Down
2 changes: 2 additions & 0 deletions apps/foreman-dispatch-bridge/tests/test_claim.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ def test_to_claimed_item_maps_dispatch_fields():
assert item == ClaimedItem(
repo="joryirving/home-ops", issue_number=42,
intent="Fix the flaky reconcile test", lane="local",
issue_id="iss_abc123",
)


Expand All @@ -45,6 +46,7 @@ def fake_post(url, headers, payload):
assert item == ClaimedItem(
repo="joryirving/home-ops", issue_number=42,
intent="Fix the flaky reconcile test", lane="local",
issue_id="iss_abc123",
)
assert captured["get_url"] == "http://d/api/agents/foreman/coder/queue?lane=local&includeClaimed=true"
assert captured["claim_payload"] == {
Expand Down
82 changes: 82 additions & 0 deletions apps/foreman-dispatch-bridge/tests/test_retry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
from bridge.retry import attempt_of, item_from_workload, reconcile_failures


def _failed_wl(name, repo="misospace/dispatch", issue=7, attempt=None, lane="local", issue_id="id-7"):
ann = {"foreman.llmkube.dev/issue-id": issue_id}
if attempt is not None:
ann["foreman.llmkube.dev/attempt"] = str(attempt)
return {
"metadata": {"name": name, "labels": {"created-by": "dispatch-bridge", "lane": lane}, "annotations": ann},
"spec": {"intent": "fix it", "repo": repo, "issues": [issue]},
"status": {"phase": "Failed"},
}


def test_attempt_of_defaults_and_parses():
assert attempt_of(_failed_wl("w")) == 1 # no annotation -> 1
assert attempt_of(_failed_wl("w", attempt=2)) == 2
assert attempt_of({"metadata": {"annotations": {"foreman.llmkube.dev/attempt": "junk"}}}) == 1
assert attempt_of({}) == 1


def test_item_from_workload_reconstructs_fields():
item = item_from_workload(_failed_wl("w", repo="a/b", issue=42, lane="cloud", issue_id="xyz"))
assert item.repo == "a/b"
assert item.issue_number == 42
assert item.intent == "fix it"
assert item.lane == "cloud"
assert item.issue_id == "xyz"


class _Recorder:
def __init__(self, failed):
self.failed = failed
self.deleted = []
self.created = []

def list_failed(self):
return self.failed

def delete(self, name):
self.deleted.append(name)

def create(self, manifest):
self.created.append(manifest)


def test_reconcile_retries_below_max_deletes_and_recreates_at_next_attempt():
r = _Recorder([_failed_wl("wl-misospace-dispatch-7", attempt=1)])
profiles = {"*": {"language": "generic"}}
out = reconcile_failures("foreman-coder", r.list_failed, r.create, r.delete,
namespace="llm", gate_profiles=profiles, max_attempts=3)
assert out == ["wl-misospace-dispatch-7:retry:2/3"]
assert r.deleted == ["wl-misospace-dispatch-7"]
assert len(r.created) == 1
m = r.created[0]
# recreated with attempt+1, the current gateProfile, and the same name/branch
assert m["metadata"]["annotations"]["foreman.llmkube.dev/attempt"] == "2"
assert m["metadata"]["name"] == "wl-misospace-dispatch-7"
assert m["spec"]["gateProfile"] == {"language": "generic"}


def test_reconcile_gives_up_at_max_without_touching_the_workload():
r = _Recorder([_failed_wl("wl-misospace-dispatch-7", attempt=3)])
out = reconcile_failures("foreman-coder", r.list_failed, r.create, r.delete,
namespace="llm", gate_profiles={}, max_attempts=3)
assert out == ["wl-misospace-dispatch-7:giveup:3/3"]
assert r.deleted == [] # left as a tombstone
assert r.created == []


def test_reconcile_first_attempt_annotation_absent_counts_as_one():
r = _Recorder([_failed_wl("wl-a-b-1", attempt=None)])
out = reconcile_failures("foreman-coder", r.list_failed, r.create, r.delete,
namespace="llm", gate_profiles={}, max_attempts=3)
assert out == ["wl-a-b-1:retry:2/3"]
assert r.created[0]["metadata"]["annotations"]["foreman.llmkube.dev/attempt"] == "2"


def test_reconcile_empty_is_noop():
r = _Recorder([])
assert reconcile_failures("foreman-coder", r.list_failed, r.create, r.delete,
namespace="llm", gate_profiles={}, max_attempts=3) == []
12 changes: 12 additions & 0 deletions apps/foreman-dispatch-bridge/tests/test_workload.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,18 @@ def test_build_workload_omits_gate_profile_by_default():
assert "gateProfile" not in build_workload(ITEM, namespace="llm")["spec"]


def test_build_workload_stamps_retry_annotations():
item = ClaimedItem(repo="a/b", issue_number=9, intent="x", lane="local", issue_id="id-9")
ann = build_workload(item, namespace="llm", agent_name="foreman-coder", attempt=2)["metadata"]["annotations"]
assert ann["foreman.llmkube.dev/attempt"] == "2"
assert ann["foreman.llmkube.dev/issue-id"] == "id-9"
assert ann["foreman.llmkube.dev/agent-name"] == "foreman-coder"


def test_build_workload_defaults_attempt_to_one():
assert build_workload(ITEM, namespace="llm")["metadata"]["annotations"]["foreman.llmkube.dev/attempt"] == "1"


def test_build_workload_passes_gate_profile_through_verbatim():
profile = {"language": "node", "commands": {"test": "corepack pnpm i && corepack pnpm test"}}
wl = build_workload(ITEM, namespace="llm", gate_profile=profile)
Expand Down
Loading