Merged
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -2,6 +2,11 @@

## Unreleased

### Changed

- `acd list` now shows the intent batch-wait countdown in `wait` rows, so
sparse queues show when the age trigger will commit, with no extra commands needed.

### Fixed

- `acd fix --force` now purges failed replay barriers with pending successors,
2 changes: 1 addition & 1 deletion CLAUDE.md
@@ -179,7 +179,7 @@ acd status --repo .

- `acd fix` is the recovery entrypoint. Dry-run is default without `--yes`.
- `--yes` applies safe actions: resolve already-landed barriers, retarget stale anchors, delete obsolete barriers, mark externally-published rows, clear expired manual pauses, clear drained backpressure.
- `--force` also purges blocked barriers with pending successors; combine with `--yes` to apply.
- `--force` also purges terminal replay barriers with pending successors, including failed rows; combine with `--yes` to apply.
- Fixes refuse while a live daemon owns the state DB; state.db is backed up before mutation.
- `acd resume --yes` lifts only manual pause.
- `acd recover` and `acd purge-events` are deprecated and hidden; use `acd fix`.
2 changes: 1 addition & 1 deletion README.md
@@ -190,7 +190,7 @@ acd status # post-check: confirm pending/blocked counts

`acd fix` is the single recovery entrypoint. It backs up `state.db` before mutating, refuses to run while a live daemon owns the database, and won't lift a manual pause unless you pass `--clear-pause`. Safe apply (`acd fix --yes`) handles only self-verifiable cleanup. Force apply is explicit: use `--force` only after the dry-run shows terminal barriers with pending successors and you have verified the captured changes are already represented in `HEAD` or are intentionally being discarded. Use `acd resume --yes` when the only problem is a stale pause marker.

After recovery, `acd list` shows `blk` / `wait` in the default compact table (`blocked` / `waiting` with `--verbose` or `--json`): blocked means operator action is still required; waiting means queued work remains without an active blocker. With intent strategy, pending-only queues may simply be waiting for `ACD_INTENT_MIN_PENDING` or `ACD_INTENT_MAX_PENDING_AGE`; run `acd flush --logical --session-id "$ACD_SESSION_ID"` from an active harness session to drain the visible batch now, or wait for the age trigger.
After recovery, `acd list` shows `blk` / `wait` in the default compact table (`blocked` / `waiting` with `--verbose` or `--json`): blocked means operator action is still required; waiting means queued work remains without an active blocker. With intent strategy, pending-only queues may simply be waiting for `ACD_INTENT_MIN_PENDING` or `ACD_INTENT_MAX_PENDING_AGE`; compact `wait` rows include the remaining age-trigger countdown when that batch wait is active. Run `acd flush --logical --session-id "$ACD_SESSION_ID"` from an active harness session to drain the visible batch now, or wait for the age trigger.
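
As a rough illustration of reading the countdown from `acd list --json`, the sketch below decodes a hand-written sample payload. The JSON keys (`repos`, `path`, `status`, `intent_wait_seconds`, `intent_visible_pending`, `intent_min_pending`) follow the `listEntry` tags and the test in this change; the sample values, the `repoWait` type, and the `parseList` and `describeWait` helpers are hypothetical and not part of `acd`.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// repoWait mirrors only the listEntry fields this sketch needs.
// It is a hypothetical consumer-side type, not part of acd.
type repoWait struct {
	Path                 string `json:"path"`
	Status               string `json:"status"`
	IntentWaitSeconds    int64  `json:"intent_wait_seconds,omitempty"`
	IntentVisiblePending int    `json:"intent_visible_pending,omitempty"`
	IntentMinPending     int    `json:"intent_min_pending,omitempty"`
}

// parseList decodes the top-level {"repos": [...]} document.
func parseList(data []byte) ([]repoWait, error) {
	var out struct {
		Repos []repoWait `json:"repos"`
	}
	if err := json.Unmarshal(data, &out); err != nil {
		return nil, err
	}
	return out.Repos, nil
}

// describeWait renders one row. The countdown is shown only for
// "waiting" rows that carry a positive intent_wait_seconds value.
func describeWait(r repoWait) string {
	if r.Status != "waiting" || r.IntentWaitSeconds <= 0 {
		return fmt.Sprintf("%s: %s", r.Path, r.Status)
	}
	wait := time.Duration(r.IntentWaitSeconds) * time.Second
	return fmt.Sprintf("%s: waiting, pending=%d/%d, age trigger in %s",
		r.Path, r.IntentVisiblePending, r.IntentMinPending, wait)
}

func main() {
	// Hand-written sample; real output contains more fields.
	sample := []byte(`{"repos":[{"path":"/work/demo","status":"waiting",` +
		`"intent_wait_seconds":110,"intent_visible_pending":2,"intent_min_pending":3}]}`)
	repos, err := parseList(sample)
	if err != nil {
		panic(err)
	}
	for _, r := range repos {
		fmt.Println(describeWait(r))
	}
}
```

Because the three intent fields use `omitempty`, a consumer should treat a missing `intent_wait_seconds` as "no active batch wait" rather than as an error.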

If a parallel committer (Claude Code's atomic-commit hook, Codex ACD hook, your own script) lands the change before ACD's replay tick, you'll see `handled_external` or `superseded_external` in `acd events`. That's normal. Real content mismatches still surface as `blocked_conflict`.

2 changes: 1 addition & 1 deletion docs/capture-replay.md
@@ -607,7 +607,7 @@ the pause fields:
| Value | Meaning |
|---|---|
| `"OK"` | Daemon running, no pause, no stale heartbeat, and no queued or blocked work. |
| `"waiting"` | Work is queued but not blocked. In intent mode this can be a normal batch wait; wait for more captures or the age trigger, or run `acd flush --logical --session-id "$ACD_SESSION_ID"` from an active harness session. |
| `"waiting"` | Work is queued but not blocked. In intent mode this can be a normal batch wait; `acd list` shows the remaining age-trigger countdown when that wait is active. Wait for more captures or the age trigger, or run `acd flush --logical --session-id "$ACD_SESSION_ID"` from an active harness session. |
| `"blocked"` | A terminal barrier is holding replay. Operator action is required: diagnose, dry-run `acd fix`, then choose safe or explicit force apply. |
| `"paused"` | Replay paused (operator or rewind grace). Takes priority over `stale`. |
| `"stale"` | Daemon heartbeat expired or PID dead, at least one live client present. |
119 changes: 103 additions & 16 deletions internal/cli/list.go
@@ -29,22 +29,25 @@ const defaultListWatchInterval = 2 * time.Second
// listEntry is one row in the `acd list` output. JSON marshal tags match
// the §7.7 example shape.
type listEntry struct {
Path string `json:"path"`
RepoHash string `json:"repo_hash"`
Daemon string `json:"daemon"`
PID int `json:"pid,omitempty"`
Clients int `json:"clients"`
LastSeq int64 `json:"last_seq"`
LastCommitOID string `json:"last_commit_oid,omitempty"`
HeartbeatAgeSecs float64 `json:"heartbeat_age_seconds,omitempty"`
PendingEvents int `json:"pending_events"`
BlockedConflicts int `json:"blocked_conflicts"`
ActiveBarriers int `json:"active_barriers,omitempty"`
Status string `json:"status"`
StatusNote string `json:"status_note,omitempty"`
Paused bool `json:"paused,omitempty"`
StaleHeartbeat bool `json:"stale_heartbeat,omitempty"`
Pause *pauseInfo `json:"pause,omitempty"`
Path string `json:"path"`
RepoHash string `json:"repo_hash"`
Daemon string `json:"daemon"`
PID int `json:"pid,omitempty"`
Clients int `json:"clients"`
LastSeq int64 `json:"last_seq"`
LastCommitOID string `json:"last_commit_oid,omitempty"`
HeartbeatAgeSecs float64 `json:"heartbeat_age_seconds,omitempty"`
PendingEvents int `json:"pending_events"`
BlockedConflicts int `json:"blocked_conflicts"`
ActiveBarriers int `json:"active_barriers,omitempty"`
IntentWaitSeconds int64 `json:"intent_wait_seconds,omitempty"`
IntentVisiblePending int `json:"intent_visible_pending,omitempty"`
IntentMinPending int `json:"intent_min_pending,omitempty"`
Status string `json:"status"`
StatusNote string `json:"status_note,omitempty"`
Paused bool `json:"paused,omitempty"`
StaleHeartbeat bool `json:"stale_heartbeat,omitempty"`
Pause *pauseInfo `json:"pause,omitempty"`
}

func newListCmd() *cobra.Command {
@@ -229,6 +232,15 @@ func collectListSnapshot(ctx context.Context, errOut io.Writer) (listSnapshot, e
} else if summary.pendingEvents > 0 {
e.Status = "waiting"
e.StatusNote = "pending captures queued; no recovery blockers"
if summary.intentWait != nil {
e.IntentWaitSeconds = summary.intentWait.waitSeconds
e.IntentVisiblePending = summary.intentWait.visiblePending
e.IntentMinPending = summary.intentWait.minPending
e.StatusNote = fmt.Sprintf("intent batch wait: pending=%d/%d, trigger in %s",
summary.intentWait.visiblePending,
summary.intentWait.minPending,
formatDurationCompact(time.Duration(summary.intentWait.waitSeconds)*time.Second))
}
}
if summary.pause != nil {
e.Status = "paused"
@@ -290,6 +302,9 @@ func renderListTableCompact(out io.Writer, entries []listEntry) error {
for _, e := range entries {
repo := listRepoLabelCompact(e.Path, labels)
statusCol := listStatusCompact(e.Status)
if e.Status == "waiting" && e.IntentWaitSeconds > 0 {
statusCol = statusCol + " " + formatDurationCompact(time.Duration(e.IntentWaitSeconds)*time.Second)
}
if listRowMissing(e.Status) {
fmt.Fprintf(tw, "%s\t-\t\t\t\t%s\n", repo, statusCol)
continue
@@ -366,6 +381,13 @@ type repoSummary struct {
blockedConflicts int
activeBarriers int
pause *pauseInfo
intentWait *listIntentWaitSummary
}

type listIntentWaitSummary struct {
	waitSeconds    int64
	visiblePending int
	minPending     int
}

// summarizeRepo opens the per-repo state.db read-only and pulls a small
@@ -470,6 +492,13 @@ func summarizeRepo(ctx context.Context, dbPath string, now time.Time, ttl time.D
}
s.blockedConflicts = blockers.TotalBlockedConflicts
s.activeBarriers = blockers.ActiveBlockedBarriersWithSuccessors
if s.pendingEvents > 0 && s.blockedConflicts == 0 && s.activeBarriers == 0 {
if intentWait, err := loadListIntentWaitSummary(ctx, conn); err != nil {
return repoSummary{}, fmt.Errorf("intent wait summary: %w", err)
} else {
s.intentWait = intentWait
}
}
if info, err := pauseInfoForRepo(ctx, conn, dbPath, now); err != nil {
return repoSummary{}, fmt.Errorf("pause state: %w", err)
} else {
@@ -479,6 +508,64 @@ func summarizeRepo(ctx context.Context, dbPath string, now time.Time, ttl time.D
return s, nil
}

// loadListIntentWaitSummary reports the intent batch-wait countdown for
// `acd list`. It returns nil when the effective commit strategy is not
// "intent" or when no batch wait with a positive age trigger is active.
func loadListIntentWaitSummary(ctx context.Context, conn *sql.DB) (*listIntentWaitSummary, error) {
	report := intentStrategyFromEnv()
	strategy, err := ResolveEffectiveCommitStrategy(ctx, conn)
	if err != nil {
		return nil, err
	}
	report.Strategy = string(strategy)
	report.Active = strategy == "intent"
	if !report.Active {
		return nil, nil
	}
	// Values stored in the meta table override the environment defaults.
	if v, ok, err := metaLookup(ctx, conn, "intent.min_pending"); err != nil {
		return nil, fmt.Errorf("intent.min_pending: %w", err)
	} else if ok {
		report.MinPending = parseIntentMetaInt(v, report.MinPending)
	}
	if v, ok, err := metaLookup(ctx, conn, "intent.max_pending_age"); err != nil {
		return nil, fmt.Errorf("intent.max_pending_age: %w", err)
	} else if ok {
		report.MaxPendingAgeSeconds = parseIntentMetaDurationSeconds(v, report.MaxPendingAgeSeconds)
	}
	if v, ok, err := metaLookup(ctx, conn, "intent.defer_limit"); err != nil {
		return nil, fmt.Errorf("intent.defer_limit: %w", err)
	} else if ok {
		report.DeferLimit = parseIntentMetaInt(v, report.DeferLimit)
	}
	// Count pending events that have reached the defer limit and are not
	// behind an earlier failed or blocked_conflict event on the same branch
	// generation.
	if ok, err := sqliteTableExists(ctx, conn, "planner_state"); err != nil {
		return nil, fmt.Errorf("planner_state table check: %w", err)
	} else if ok {
		if err := conn.QueryRowContext(ctx, `
			SELECT COUNT(*)
			FROM planner_state ps
			JOIN capture_events e ON e.seq = ps.event_seq
			WHERE e.state = ? AND ps.defer_count >= ?
			AND NOT EXISTS (
				SELECT 1
				FROM capture_events barrier
				WHERE barrier.branch_ref = e.branch_ref
				AND barrier.branch_generation = e.branch_generation
				AND barrier.seq < e.seq
				AND barrier.state IN (?, ?)
			)`, state.EventStatePending, report.DeferLimit, state.EventStateFailed, state.EventStateBlockedConflict).Scan(&report.ForcedAgingReady); err != nil {
			return nil, fmt.Errorf("planner forced-aging summary: %w", err)
		}
	}
	if err := loadIntentBatchWait(ctx, conn, &report); err != nil {
		return nil, err
	}
	if !report.BatchWaitActive || report.AgeTriggerInSeconds <= 0 {
		return nil, nil
	}
	return &listIntentWaitSummary{
		waitSeconds:    report.AgeTriggerInSeconds,
		visiblePending: report.VisiblePendingEvents,
		minPending:     report.MinPending,
	}, nil
}

func countLiveClients(ctx context.Context, conn *sql.DB, now time.Time, ttl time.Duration) (int, error) {
rows, err := conn.QueryContext(ctx,
`SELECT watch_pid, last_seen_ts FROM daemon_clients`)
62 changes: 62 additions & 0 deletions internal/cli/list_test.go
@@ -534,6 +534,68 @@ func TestList_PendingOnlyShowsWaitingNotBlocked(t *testing.T) {
}
}

func TestList_IntentBatchWaitShowsCountdown(t *testing.T) {
	roots := withIsolatedHome(t)
	ctx := context.Background()

	repo, dbPath, d := makeRepoStateDB(t)
	if err := state.SaveDaemonState(ctx, d, state.DaemonState{
		PID: os.Getpid(), Mode: "running", HeartbeatTS: nowFloat(),
	}); err != nil {
		t.Fatalf("save daemon: %v", err)
	}
	for k, v := range map[string]string{
		"commit.strategy":        "intent",
		"intent.min_pending":     "3",
		"intent.max_pending_age": "2m",
		"intent.defer_limit":     "1",
	} {
		if err := state.MetaSet(ctx, d, k, v); err != nil {
			t.Fatalf("set %s: %v", k, err)
		}
	}
	appendIntentPendingEvent(t, ctx, d, "wait-a.go", nowFloat()-10)
	appendIntentPendingEvent(t, ctx, d, "wait-b.go", nowFloat()-5)
	registerRepo(t, roots, repo, dbPath, "codex")

	var compactOut, compactErr bytes.Buffer
	if err := runList(ctx, &compactOut, &compactErr, false, false); err != nil {
		t.Fatalf("runList compact: %v", err)
	}
	if !strings.Contains(compactOut.String(), "wait 1m") {
		t.Fatalf("compact output missing wait countdown:\n%s", compactOut.String())
	}

	var verboseOut, verboseErr bytes.Buffer
	if err := runList(ctx, &verboseOut, &verboseErr, false, true); err != nil {
		t.Fatalf("runList verbose: %v", err)
	}
	if !strings.Contains(verboseOut.String(), "intent batch wait: pending=2/3, trigger in 1m") {
		t.Fatalf("verbose output missing intent wait note:\n%s", verboseOut.String())
	}

	var jsonOut, jsonErr bytes.Buffer
	if err := runList(ctx, &jsonOut, &jsonErr, true, false); err != nil {
		t.Fatalf("runList json: %v", err)
	}
	var got struct {
		Repos []listEntry `json:"repos"`
	}
	if err := json.Unmarshal(jsonOut.Bytes(), &got); err != nil {
		t.Fatalf("unmarshal: %v\n%s", err, jsonOut.String())
	}
	if len(got.Repos) != 1 {
		t.Fatalf("repos=%d, want 1", len(got.Repos))
	}
	entry := got.Repos[0]
	if entry.Status != "waiting" || entry.IntentVisiblePending != 2 || entry.IntentMinPending != 3 {
		t.Fatalf("json entry missing intent wait fields: %+v", entry)
	}
	if entry.IntentWaitSeconds <= 0 || entry.IntentWaitSeconds > 120 {
		t.Fatalf("intent wait seconds=%d, want 1..120", entry.IntentWaitSeconds)
	}
}

func TestListStatusCompact(t *testing.T) {
t.Parallel()
tests := []struct {