From 1026e4317389ece6484ae20f0cfb86fd44d0fd1a Mon Sep 17 00:00:00 2001 From: Matt Leaverton Date: Fri, 24 Apr 2026 17:40:34 -0500 Subject: [PATCH] feat: emit run_completed/run_failed terminal events in progress.ndjson (#7) - Add FinalCanceled FinalStatus = "canceled" to runtime/final.go - Add emitTerminalProgressEvent() method emitted as the final line of progress.ndjson, after final.json is fully written and git push completes - Add isCanceledError() helper to distinguish externally-canceled runs (context.Canceled/DeadlineExceeded) from hard failures - Modify persistFatalOutcome() to set FinalCanceled status on context cancel - Add docs/runs-layout.md documenting run directory layout, progress.ndjson lifecycle events (including terminal events), and final.json contract - Add TestProgressNDJSON_TerminalEvent_RunCompleted (success path) - Add TestProgressNDJSON_TerminalEvent_RunFailed (fan-in all-fail path) Ordering guarantee: terminal event is emitted AFTER final.json is written and AFTER gitPushIfConfigured(), so it is always the last line in the stream. --- docs/runs-layout.md | 207 ++++++++++++++++++ internal/attractor/engine/engine.go | 50 ++++- .../attractor/engine/terminal_event_test.go | 158 +++++++++++++ internal/attractor/runtime/final.go | 5 +- 4 files changed, 417 insertions(+), 3 deletions(-) create mode 100644 docs/runs-layout.md create mode 100644 internal/attractor/engine/terminal_event_test.go diff --git a/docs/runs-layout.md b/docs/runs-layout.md new file mode 100644 index 00000000..012b3f0b --- /dev/null +++ b/docs/runs-layout.md @@ -0,0 +1,207 @@ +# Run Directory Layout + +Each Kilroy attractor run produces a self-contained directory under +`$XDG_STATE_HOME/kilroy/attractor/runs/<run_id>/` (defaulting to +`~/.local/state/kilroy/attractor/runs/<run_id>/`). 
+ +## Directory Structure + +``` +<run_id>/ +├── manifest.json # Run identity and configuration snapshot +├── graph.dot # DOT source used for this run (preserved for replay/resume) +├── run_config.json # Snapshotted RunConfig (if used) +├── run.pid # PID of the process currently executing this run +│ +├── checkpoint.json # Last stable execution state (for resume) +│ +├── progress.ndjson # ← Append-only event stream (see §Lifecycle Events) +├── live.json # Last progress event (overwritten on each event) +│ +├── final.json # ← Completion contract (see §Completion Contract) +├── failure_dossier.json # Structured failure analysis (on failure) +│ +├── run.log # Human-readable structured log +├── run.tgz # Archive of the entire run directory +│ +├── <node_id>/ # Per-node artifacts +│ ├── status.json # Stage outcome (status, failure_reason, ...) +│ ├── prompt.md # Agent prompt (LLM nodes) +│ ├── response.md # Agent response (LLM nodes) +│ ├── parallel_results.json # Fan-out results (parallel split nodes) +│ └── ... # Other stage-specific artifacts +│ +├── outputs/ # Declared output artifacts collected after run +├── worktree/ # Git worktree (or plain working directory in no-git mode) +└── modeldb/ # Model catalog snapshot +``` + +## Completion Contract + +### `final.json` + +`final.json` is the **canonical, stable completion signal** for a run. It is +written atomically after all stage execution is complete. Callers that need to +wait for a run to finish should poll for the existence of `final.json`. + +**Schema:** + +```json +{ + "timestamp": "2026-04-24T10:00:00Z", + "status": "success", + "run_id": "01ABC123...", + "final_git_commit_sha": "abc123...", + "failure_reason": "", + "cxdb_context_id": "...", + "cxdb_head_turn_id": "..." 
+} +``` + +| Field | Values | Description | +|---|---|---| +| `status` | `success`, `fail`, `canceled` | Terminal run status | +| `run_id` | string | Unique run identifier | +| `failure_reason` | string (empty on success) | Human-readable reason for failure | +| `final_git_commit_sha` | string | Last git commit SHA (empty in no-git mode) | +| `timestamp` | ISO8601 | When the outcome was persisted | + +## Lifecycle Events + +`progress.ndjson` is an append-only newline-delimited JSON stream. Each line is +a self-contained JSON object. The file is written with O_APPEND so it is safe +to tail while a run is in progress. + +### Common Event Fields + +Every event in the stream has these envelope fields: + +| Field | Description | +|---|---| +| `event` | Event type identifier (string) | +| `id` | Unique event ID (8-char hex) | +| `run_id` | Run identifier | +| `ts` | ISO8601 timestamp (UTC, nanosecond precision) | + +### Stage Events + +| Event | Description | +|---|---| +| `stage_attempt_start` | A stage (node) attempt is starting | +| `stage_attempt_success` | A stage attempt completed successfully | +| `stage_attempt_fail` | A stage attempt failed | +| `stage_retry_sleep` | Engine is sleeping before a retry | +| `stage_retry_blocked` | Retry was blocked (e.g. 
deterministic failure) | +| `stage_heartbeat` | Periodic liveness signal from a running stage | + +### Routing Events + +| Event | Description | +|---|---| +| `edge_selected` | An outgoing edge was chosen for traversal | +| `edge_condition_evaluated` | A conditional edge was evaluated | +| `no_matching_fail_edge_fallback` | No fail edge matched; trying retry_target | +| `status_ingestion_decision` | Status source (canonical/worktree/AI) was selected | + +### Input Events + +| Event | Description | +|---|---| +| `input_materialization_start` | Input materialization is beginning | +| `input_materialization_complete` | All inputs are ready | + +### Session Events + +| Event | Description | +|---|---| +| `tmux_session_start` | A tmux-based agent session is starting | +| `tmux_session_complete` | A tmux-based agent session completed | + +### Infrastructure Events + +| Event | Description | +|---|---| +| `stall_watchdog_timeout` | Run stalled (no progress) for the configured timeout | +| `git_push_start` | Pushing run branch to remote | +| `git_push_ok` | Git push succeeded | +| `git_push_failed` | Git push failed (run outcome unaffected) | + +### Terminal Events + +**These are the final events in `progress.ndjson`.** They are emitted after +`final.json` is fully written, so a consumer that observes a terminal event can +immediately open `final.json` and find it complete. + +#### `run_completed` — successful run + +```json +{ + "event": "run_completed", + "status": "success", + "run_id": "01ABC123...", + "id": "a1b2c3d4", + "ts": "2026-04-24T10:05:00.123456789Z" +} +``` + +#### `run_failed` — failed or canceled run + +```json +{ + "event": "run_failed", + "status": "fail", + "run_id": "01ABC123...", + "reason": "stage failed with no outgoing fail edge: ...", + "id": "a1b2c3d4", + "ts": "2026-04-24T10:05:00.123456789Z" +} +``` + +| `status` value | Meaning | +|---|---| +| `fail` | The run encountered a hard failure (e.g. 
stage error, no routing path) | +| `canceled` | The run was externally canceled (context cancellation or deadline exceeded) | + +The `reason` field is included on `run_failed` events when a failure reason is +available. It is omitted when the reason is empty. + +### Ordering Guarantee + +**`final.json` is always written before the terminal event is appended to +`progress.ndjson`.** This means: + +1. A reader tailing `progress.ndjson` that sees `run_completed` or `run_failed` + can immediately open `final.json` and find it present and complete. +2. Polling for `final.json` remains valid as the primary completion signal. +3. The terminal event in `progress.ndjson` is an additional, streaming-friendly + signal that eliminates the need for file-existence polling. + +### Example Stream + +A minimal successful run (`start → exit`) produces a stream like: + +```ndjson +{"event":"stage_attempt_start","node_id":"start","attempt":1,"run_id":"...","id":"...","ts":"..."} +{"event":"stage_attempt_success","node_id":"start","attempt":1,"run_id":"...","id":"...","ts":"..."} +{"event":"edge_selected","from_node":"start","to_node":"exit","run_id":"...","id":"...","ts":"..."} +{"event":"stage_attempt_start","node_id":"exit","attempt":1,"run_id":"...","id":"...","ts":"..."} +{"event":"stage_attempt_success","node_id":"exit","attempt":1,"run_id":"...","id":"...","ts":"..."} +{"event":"run_completed","status":"success","run_id":"...","id":"...","ts":"..."} +``` + +## Resume Layout + +When a run is resumed after a checkpoint, new stages execute in the same +`<run_id>/` directory. If a `loop_restart` triggers a new attempt, it creates +a sibling directory: + +``` +<run_id>/ +└── restart-1/ + ├── progress.ndjson # Events for the restarted attempt + ├── final.json # Completion contract for the restarted attempt + └── ... +``` + +Each restart directory has its own independent `progress.ndjson` with its own +terminal event. 
diff --git a/internal/attractor/engine/engine.go b/internal/attractor/engine/engine.go
index 538346bb..8f9c5de8 100644
--- a/internal/attractor/engine/engine.go
+++ b/internal/attractor/engine/engine.go
@@ -2,6 +2,7 @@ package engine
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"io/fs"
 	"os"
@@ -1960,9 +1961,13 @@ func (e *Engine) persistFatalOutcome(ctx context.Context, runErr error) {
 	}
 
 	failedTurnID, _ := e.cxdbRunFailed(ctx, nodeID, sha, reason)
+	status := runtime.FinalFail
+	if isCanceledError(runErr) {
+		status = runtime.FinalCanceled
+	}
 	final := runtime.FinalOutcome{
 		Timestamp:         time.Now().UTC(),
-		Status:            runtime.FinalFail,
+		Status:            status,
 		RunID:             e.Options.RunID,
 		FinalGitCommitSHA: sha,
 		FailureReason:     reason,
@@ -2039,6 +2044,45 @@ func (e *Engine) persistTerminalOutcome(ctx context.Context, final runtime.Final
 
 	// Best-effort push after terminal outcome so remote has final state.
 	e.gitPushIfConfigured()
+
+	// Emit the terminal progress event as the final line of progress.ndjson.
+	// This MUST be emitted after final.json is written so that any reader
+	// observing this event can immediately open final.json.
+	e.emitTerminalProgressEvent(final)
+}
+
+// emitTerminalProgressEvent appends the terminal lifecycle event for the run
+// to progress.ndjson: run_completed for FinalSuccess, run_failed for FinalFail
+// and FinalCanceled. The event's "status" mirrors final.Status, and "reason"
+// is attached to run_failed only when final.FailureReason is non-blank.
+// persistTerminalOutcome calls it as its last action so the event is the
+// final line in the stream. Any other status emits nothing.
+func (e *Engine) emitTerminalProgressEvent(final runtime.FinalOutcome) {
+	switch final.Status {
+	case runtime.FinalSuccess:
+		e.appendProgress(map[string]any{
+			"event":  "run_completed",
+			"status": "success",
+		})
+	case runtime.FinalFail, runtime.FinalCanceled:
+		// Failed and canceled runs share the run_failed event and differ only
+		// in "status", which is taken from final.Status ("fail"/"canceled").
+		ev := map[string]any{
+			"event":  "run_failed",
+			"status": string(final.Status),
+		}
+		if reason := strings.TrimSpace(final.FailureReason); reason != "" {
+			ev["reason"] = reason
+		}
+		e.appendProgress(ev)
+	}
+}
+
+// isCanceledError reports whether err is a context cancellation error
+// (context.Canceled or context.DeadlineExceeded). Used to classify
+// externally-canceled runs as FinalCanceled vs internally-failed FinalFail.
+func isCanceledError(err error) bool {
+	return errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)
+}
 
 // gitPushIfConfigured pushes the run branch to the configured remote.
diff --git a/internal/attractor/engine/terminal_event_test.go b/internal/attractor/engine/terminal_event_test.go
new file mode 100644
index 00000000..31e483ee
--- /dev/null
+++ b/internal/attractor/engine/terminal_event_test.go
@@ -0,0 +1,158 @@
+package engine
+
+import (
+	"bufio"
+	"context"
+	"encoding/json"
+	"fmt"
+	"os"
+	"path/filepath"
+	"strings"
+	"testing"
+	"time"
+)
+
+// readLastProgressEvent parses progress.ndjson and returns the last non-empty
+// JSON event. It fails the test if the file is missing or contains no events.
+func readLastProgressEvent(t *testing.T, progressPath string) map[string]any {
+	t.Helper()
+	f, err := os.Open(progressPath)
+	if err != nil {
+		t.Fatalf("open progress.ndjson: %v", err)
+	}
+	defer func() { _ = f.Close() }()
+
+	var last map[string]any
+	sc := bufio.NewScanner(f)
+	for sc.Scan() {
+		line := strings.TrimSpace(sc.Text())
+		if line == "" {
+			continue
+		}
+		var ev map[string]any
+		if err := json.Unmarshal([]byte(line), &ev); err != nil {
+			t.Fatalf("unmarshal progress line: %v (line=%q)", err, line)
+		}
+		last = ev
+	}
+	if err := sc.Err(); err != nil {
+		t.Fatalf("scan progress.ndjson: %v", err)
+	}
+	if last == nil {
+		t.Fatal("progress.ndjson is empty or has no valid JSON lines")
+	}
+	return last
+}
+
+// TestProgressNDJSON_TerminalEvent_RunCompleted runs a two-node graph that
+// succeeds and asserts that the last event in progress.ndjson is run_completed
+// with status=success, the run's run_id, and a non-empty ts.
+func TestProgressNDJSON_TerminalEvent_RunCompleted(t *testing.T) {
+	// Minimal graph: start → exit (no agent nodes, no git required).
+	dot := []byte(`
+digraph T {
+  graph [goal="terminal event test"]
+  start [shape=Mdiamond]
+  exit [shape=Msquare]
+  start -> exit
+}
+`)
+	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+	defer cancel()
+
+	logsRoot := t.TempDir()
+	runID := fmt.Sprintf("term-ok-%d", time.Now().UnixNano())
+
+	res, err := runForTest(t, ctx, dot, RunOptions{
+		LogsRoot: logsRoot,
+		RunID:    runID,
+	})
+	if err != nil {
+		t.Fatalf("Run() error: %v", err)
+	}
+
+	last := readLastProgressEvent(t, filepath.Join(res.LogsRoot, "progress.ndjson"))
+
+	// The last event must be run_completed and carry status, run_id, and ts.
+	if last["event"] != "run_completed" {
+		t.Fatalf("last progress event: got %q want %q", last["event"], "run_completed")
+	}
+	if last["status"] != "success" {
+		t.Fatalf("last event status: got %q want %q", last["status"], "success")
+	}
+	if last["run_id"] != res.RunID {
+		t.Fatalf("last event run_id: got %q want %q", last["run_id"], res.RunID)
+	}
+	ts, _ := last["ts"].(string)
+	if strings.TrimSpace(ts) == "" {
+		t.Fatal("last event ts is empty")
+	}
+
+	// final.json must exist once run_completed is readable (existence is checked after the run returns).
+	finalPath := filepath.Join(res.LogsRoot, "final.json")
+	if _, err := os.Stat(finalPath); err != nil {
+		t.Fatalf("final.json must exist before run_completed is emitted: %v", err)
+	}
+}
+
+// TestProgressNDJSON_TerminalEvent_RunFailed verifies that a failed run ends
+// with a run_failed event (status=fail) as the last line of progress.ndjson.
+//
+// The graph uses a parallel fan-in pattern where both branches fail with
+// exit code 1. When the fan-in join node has no outgoing fail edge, the engine
+// terminates with FinalFail, which triggers the run_failed terminal event.
+func TestProgressNDJSON_TerminalEvent_RunFailed(t *testing.T) {
+	dot := []byte(`
+digraph T {
+  graph [goal="terminal failure event test"]
+  start [shape=Mdiamond]
+  par [shape=component]
+  a [shape=parallelogram, tool_command="exit 1", max_retries=0]
+  b [shape=parallelogram, tool_command="exit 1", max_retries=0]
+  join [shape=tripleoctagon, max_retries=0]
+  exit [shape=Msquare]
+  start -> par
+  par -> a
+  par -> b
+  a -> join
+  b -> join
+  join -> exit
+}
+`)
+	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+	defer cancel()
+
+	logsRoot := t.TempDir()
+	runID := fmt.Sprintf("term-fail-%d", time.Now().UnixNano())
+
+	_, err := runForTest(t, ctx, dot, RunOptions{
+		LogsRoot: logsRoot,
+		RunID:    runID,
+	})
+	if err == nil {
+		t.Fatal("expected Run() to return an error for a failing graph")
+	}
+
+	last := readLastProgressEvent(t, filepath.Join(logsRoot, "progress.ndjson"))
+
+	// The last event must be run_failed and carry status=fail, run_id, and ts.
+	if last["event"] != "run_failed" {
+		t.Fatalf("last progress event: got %q want %q", last["event"], "run_failed")
+	}
+	if last["status"] != "fail" {
+		t.Fatalf("last event status: got %q want %q", last["status"], "fail")
+	}
+	if last["run_id"] != runID {
+		t.Fatalf("last event run_id: got %q want %q", last["run_id"], runID)
+	}
+	ts, _ := last["ts"].(string)
+	if strings.TrimSpace(ts) == "" {
+		t.Fatal("last event ts is empty")
+	}
+
+	// final.json must exist once run_failed is readable (existence is checked after the run returns).
+	finalPath := filepath.Join(logsRoot, "final.json")
+	if _, err := os.Stat(finalPath); err != nil {
+		t.Fatalf("final.json must exist before run_failed is emitted: %v", err)
+	}
+}
diff --git a/internal/attractor/runtime/final.go b/internal/attractor/runtime/final.go
index 8ec0488a..043907c0 100644
--- a/internal/attractor/runtime/final.go
+++ b/internal/attractor/runtime/final.go
@@ -8,8 +8,9 @@ import (
 type FinalStatus string
 
 const (
-	FinalSuccess FinalStatus = "success"
-	FinalFail    FinalStatus = "fail"
+	FinalSuccess  FinalStatus = "success"
+	FinalFail     FinalStatus = "fail"
+	FinalCanceled FinalStatus = "canceled"
 )
 
 type FinalOutcome struct {