From 1b1d6c8bd1eaf1fd733bf133d4146894e9c0a08a Mon Sep 17 00:00:00 2001 From: Lex Christopherson Date: Thu, 30 Apr 2026 10:40:06 -0600 Subject: [PATCH 1/4] feat: add pretty task log diagnostics --- cmd/logs.go | 116 ++++++++++++++++++++++++++++- cmd/logs_filter.go | 84 +++++++++++++++++++++ cmd/logs_pretty.go | 111 +++++++++++++++++++++++++++ cmd/logs_test.go | 32 ++++++++ internal/logging/lifecycle.go | 99 ++++++++++++++++++++++++ internal/logging/lifecycle_test.go | 40 ++++++++++ 6 files changed, 481 insertions(+), 1 deletion(-) create mode 100644 cmd/logs_filter.go create mode 100644 cmd/logs_pretty.go create mode 100644 internal/logging/lifecycle.go create mode 100644 internal/logging/lifecycle_test.go diff --git a/cmd/logs.go b/cmd/logs.go index c54b9c0..8af6ea8 100644 --- a/cmd/logs.go +++ b/cmd/logs.go @@ -6,6 +6,7 @@ import ( "io" "os" "os/signal" + "strings" "syscall" "time" @@ -13,6 +14,20 @@ import ( "github.com/spf13/cobra" ) +type logsOptionsState struct { + sessionID string + taskID string + lastTask bool + since time.Duration + level string + pretty bool + json bool + color string + noColor bool +} + +var logsOptions logsOptionsState + func streamLogFile(ctx context.Context, logPath string, out io.Writer, pollInterval time.Duration) error { fh, err := os.Open(logPath) if err != nil { @@ -65,6 +80,10 @@ var logsCmd = &cobra.Command{ Use: "logs", Short: "Tail the daemon log file", RunE: func(cmd *cobra.Command, args []string) error { + if err := validateLogsOptions(); err != nil { + return err + } + logPath := service.LogPath() if _, err := os.Stat(logPath); os.IsNotExist(err) { return fmt.Errorf("no log file at %s — is the daemon installed?", logPath) @@ -77,10 +96,105 @@ var logsCmd = &cobra.Command{ ctx, stop := signal.NotifyContext(parent, os.Interrupt, syscall.SIGTERM) defer stop() - return streamLogFile(ctx, logPath, os.Stdout, 250*time.Millisecond) + if !logsOptions.hasStructuredMode() { + return streamLogFile(ctx, logPath, cmd.OutOrStdout(), 250*time.Millisecond) + } + + return renderLogFile(logPath, cmd.OutOrStdout()) }, } +func (opts logsOptionsState) hasStructuredMode() bool { + return opts.sessionID != "" || + opts.taskID != "" || + opts.lastTask || + opts.since > 0 || + opts.level != "" || + opts.pretty || + opts.json || + opts.color != "auto" || + opts.noColor +} + +func validateLogsOptions() error { + count := 0 + if logsOptions.sessionID != "" { + count++ + } + if logsOptions.taskID != "" { + count++ + } + if logsOptions.lastTask { + count++ + } + if count > 1 { + return fmt.Errorf("--session, --task, and --last-task are mutually exclusive") + } + if logsOptions.pretty && logsOptions.json { + return fmt.Errorf("--pretty and --json are mutually exclusive") + } + switch logsOptions.color { + case string(colorAuto), string(colorAlways), string(colorNever): + default: + return fmt.Errorf("--color must be auto, always, or never") + } + return nil +} + +func renderLogFile(logPath string, out io.Writer) error { + data, err := os.ReadFile(logPath) + if err != nil { + return err + } + lines := strings.Split(string(data), "\n") + filter := logFilter{ + TaskID: logsOptions.taskID, + SessionID: logsOptions.sessionID, + Since: logsOptions.since, + Level: logsOptions.level, + } + if logsOptions.lastTask { + filter.TaskID = latestTaskID(lines) + } + events := filterLogLines(lines, filter) + if logsOptions.json { + for _, event := range events { + if event.raw == "" { + continue + } + if _, err := fmt.Fprintln(out, event.raw); err != nil { + return err + } + } + return nil + } + mode := colorMode(logsOptions.color) + if logsOptions.noColor { + mode = colorNever + } + _, err = io.WriteString(out, renderPrettyTimeline(events, mode)) + return err +} + +func latestTaskID(lines []string) string { + events := filterLogLines(lines, logFilter{}) + for i := len(events) - 1; i >= 0; i-- { + if events[i].TaskID != "" { + return events[i].TaskID + } + } + return "" +} + func init() { + logsCmd.Flags().StringVar(&logsOptions.sessionID, "session", "", "show log events for one session") + logsCmd.Flags().StringVar(&logsOptions.taskID, "task", "", "show log events for one task") + logsCmd.Flags().BoolVar(&logsOptions.lastTask, "last-task", false, "show the latest known task in local logs") + logsCmd.Flags().DurationVar(&logsOptions.since, "since", 0, "limit logs by duration, for example 10m or 2h") + logsCmd.Flags().StringVar(&logsOptions.level, "level", "", "minimum level: debug, info, warn, error") + logsCmd.Flags().BoolVar(&logsOptions.pretty, "pretty", false, "render a human timeline") + logsCmd.Flags().BoolVar(&logsOptions.json, "json", false, "emit filtered structured JSON lines") + logsCmd.Flags().StringVar(&logsOptions.color, "color", "auto", "color mode: auto, always, never") + logsCmd.Flags().BoolVar(&logsOptions.noColor, "no-color", false, "disable color output") rootCmd.AddCommand(logsCmd) } diff --git a/cmd/logs_filter.go b/cmd/logs_filter.go new file mode 100644 index 0000000..3935c35 --- /dev/null +++ b/cmd/logs_filter.go @@ -0,0 +1,84 @@ +package cmd + +import ( + "encoding/json" + "strings" + "time" +) + +type logFilter struct { + TaskID string + SessionID string + Since time.Duration + Level string +} + +type logEvent struct { + Time string `json:"time"` + Level string `json:"level"` + Msg string `json:"msg"` + Event string `json:"event"` + Phase string `json:"phase"` + TaskID string `json:"taskId"` + SessionID string `json:"sessionId"` + RequestID string `json:"requestId"` + TraceID string `json:"traceId"` + AttemptID string `json:"attemptId"` + AttemptNumber int `json:"attemptNumber"` + TurnKind string `json:"turnKind"` + ElapsedMs int64 `json:"elapsedMs"` + Model string `json:"model"` + Provider string `json:"provider"` + PID int `json:"pid"` + FailureCode string `json:"failureCode"` + Retryable bool `json:"retryable"` + PromptPreview string `json:"promptPreview"` + Cleanup string `json:"cleanup"` + raw string +} + +func filterLogLines(lines []string, filter logFilter) []logEvent { + out := make([]logEvent, 0, len(lines)) + cutoff := time.Time{} + if filter.Since > 0 { + cutoff = time.Now().Add(-filter.Since) + } + for _, line := range lines { + line = strings.TrimSpace(line) + if line == "" { + continue + } + var event logEvent + if err := json.Unmarshal([]byte(line), &event); err != nil { + continue + } + event.raw = line + if filter.TaskID != "" && event.TaskID != filter.TaskID { + continue + } + if filter.SessionID != "" && event.SessionID != filter.SessionID { + continue + } + if !cutoff.IsZero() { + parsed, err := time.Parse(time.RFC3339Nano, event.Time) + if err == nil && parsed.Before(cutoff) { + continue + } + } + if filter.Level != "" && !levelAtLeast(event.Level, filter.Level) { + continue + } + out = append(out, event) + } + return out +} + +func levelAtLeast(got string, want string) bool { + order := map[string]int{"DEBUG": 0, "INFO": 1, "WARN": 2, "ERROR": 3} + gotLevel, gotOK := order[strings.ToUpper(got)] + wantLevel, wantOK := order[strings.ToUpper(want)] + if !gotOK || !wantOK { + return false + } + return gotLevel >= wantLevel +} diff --git a/cmd/logs_pretty.go b/cmd/logs_pretty.go new file mode 100644 index 0000000..ce72f91 --- /dev/null +++ b/cmd/logs_pretty.go @@ -0,0 +1,111 @@ +package cmd + +import ( + "fmt" + "strings" + "time" +) + +type colorMode string + +const ( + colorAuto colorMode = "auto" + colorAlways colorMode = "always" + colorNever colorMode = "never" +) + +func renderPrettyTimeline(events []logEvent, mode colorMode) string { + var b strings.Builder + for _, event := range events { + label := prettyPhase(event.Phase) + ts := prettyTime(event.Time) + line := fmt.Sprintf("%s %-18s", ts, label) + if event.TaskID != "" && event.Phase == "task_received" { + line += " task=" + shortID(event.TaskID) + } + if event.PID != 0 { + line += fmt.Sprintf(" pid=%d", event.PID) + } + if event.Model != "" { + line += " model=" + event.Model + } + if event.FailureCode != "" { + line += " " + event.FailureCode + } + if event.Retryable { + line += " retryable" + } + if event.ElapsedMs > 0 { + line += fmt.Sprintf(" elapsed=%.1fs", float64(event.ElapsedMs)/1000) + } + if event.PromptPreview != "" { + line += ` "` + event.PromptPreview + `"` + } + if event.Cleanup != "" { + line += " " + event.Cleanup + } + if mode != colorNever { + line = colorizePrettyLine(line, event.Phase) + } + b.WriteString(line) + b.WriteByte('\n') + } + return b.String() +} + +func prettyTime(raw string) string { + parsed, err := time.Parse(time.RFC3339Nano, raw) + if err != nil { + return "--:--:--" + } + return parsed.Format("15:04:05") +} + +func prettyPhase(phase string) string { + switch phase { + case "task_received": + return "task received" + case "pi_process_started": + return "pi started" + case "prompt_written": + return "prompt written" + case "first_event_seen": + return "first event seen" + case "first_visible_event_seen": + return "first visible event" + case "cleanup_started": + return "cleanup started" + case "cleanup_finished": + return "cleanup finished" + case "task_completed": + return "completed" + case "task_failed": + return "failed" + case "timed_out", "task_timed_out": + return "timed out" + default: + return strings.ReplaceAll(phase, "_", " ") + } +} + +func shortID(id string) string { + if len(id) <= 8 { + return id + } + return id[:8] +} + +func colorizePrettyLine(line string, phase string) string { + switch phase { + case "task_completed", "cleanup_finished": + return "\x1b[32m" + line + "\x1b[0m" + case "task_failed", "task_timed_out", "timed_out", "task_lost": + return "\x1b[31m" + line + "\x1b[0m" + case "waiting_input", "retry_scheduled": + return "\x1b[33m" + line + "\x1b[0m" + case "pi_process_started", "prompt_written", "first_event_seen", "first_visible_event_seen": + return "\x1b[36m" + line + "\x1b[0m" + default: + return line + } +} diff --git a/cmd/logs_test.go b/cmd/logs_test.go index cfa6aac..6101ff3 100644 --- a/cmd/logs_test.go +++ b/cmd/logs_test.go @@ -75,3 +75,35 @@ func TestStreamLogFileFollowsExistingAndAppendedContent(t *testing.T) { t.Fatalf("streamLogFile returned error: %v", err) } } + +func TestFilterLogLinesByTaskAndSession(t *testing.T) { + lines := []string{ + `{"time":"2026-04-30T12:30:55Z","level":"INFO","event":"task_lifecycle","phase":"task_received","taskId":"task-1","sessionId":"session-1"}`, + `{"time":"2026-04-30T12:30:56Z","level":"INFO","event":"task_lifecycle","phase":"task_received","taskId":"task-2","sessionId":"session-2"}`, + } + got := filterLogLines(lines, logFilter{TaskID: "task-1"}) + if len(got) != 1 || got[0].TaskID != "task-1" { + t.Fatalf("filtered by task = %#v", got) + } + got = filterLogLines(lines, logFilter{SessionID: "session-2"}) + if len(got) != 1 || got[0].SessionID != "session-2" { + t.Fatalf("filtered by session = %#v", got) + } +} + +func TestPrettyTimelineIncludesPromptPreviewAndFailureCode(t *testing.T) { + events := []logEvent{ + {Time: "2026-04-30T12:30:55Z", Event: "task_lifecycle", Phase: "task_received", TaskID: "d1fae004-71f0-481a-9980-0cd6cecf49cb", SessionID: "session-1", PromptPreview: "write the full update spec"}, + {Time: "2026-04-30T12:32:26Z", Event: "task_lifecycle", Phase: "timed_out", FailureCode: "no_first_event_timeout", ElapsedMs: 90000}, + } + got := renderPrettyTimeline(events, colorNever) + if !strings.Contains(got, "task received") || !strings.Contains(got, `"write the full update spec"`) { + t.Fatalf("missing received line: %s", got) + } + if !strings.Contains(got, "no_first_event_timeout") { + t.Fatalf("missing failure code: %s", got) + } + if strings.Contains(got, "\x1b[") { + t.Fatalf("color escaped in colorNever output: %q", got) + } +} diff --git a/internal/logging/lifecycle.go b/internal/logging/lifecycle.go new file mode 100644 index 0000000..a6c2c3a --- /dev/null +++ b/internal/logging/lifecycle.go @@ -0,0 +1,99 @@ +package logging + +import ( + "regexp" + "strings" +) + +const MaxPromptPreviewChars = 40 +const MaxPromptPreviewWords = 8 + +var secretPattern = regexp.MustCompile(`(?i)(sk-[a-z0-9_-]{8,}|xox[baprs]-[a-z0-9-]{8,}|gh[pousr]_[a-z0-9_]{8,}|bearer\s+[a-z0-9._-]{8,})`) + +type TaskLifecycleLogInput struct { + Phase string + Status string + TaskID string + SessionID string + ChannelID string + RequestID string + TraceID string + AttemptID string + AttemptNumber int + TurnKind string + ElapsedMs int64 + Model string + Provider string + PID int + FailureCode string + Retryable bool + PromptText string + Cleanup string +} + +type TaskLifecycleLogEvent struct { + Event string `json:"event"` + Phase string `json:"phase"` + Status string `json:"status,omitempty"` + TaskID string `json:"taskId,omitempty"` + SessionID string `json:"sessionId,omitempty"` + ChannelID string `json:"channelId,omitempty"` + RequestID string `json:"requestId,omitempty"` + TraceID string `json:"traceId,omitempty"` + AttemptID string `json:"attemptId,omitempty"` + AttemptNumber int `json:"attemptNumber,omitempty"` + TurnKind string `json:"turnKind,omitempty"` + ElapsedMs int64 `json:"elapsedMs,omitempty"` + Model string `json:"model,omitempty"` + Provider string `json:"provider,omitempty"` + PID int `json:"pid,omitempty"` + FailureCode string `json:"failureCode,omitempty"` + Retryable bool `json:"retryable,omitempty"` + PromptPreview string `json:"promptPreview,omitempty"` + Cleanup string `json:"cleanup,omitempty"` +} + +func PromptPreview(prompt string) string { + collapsed := strings.Join(strings.Fields(prompt), " ") + if collapsed == "" { + return "" + } + collapsed = secretPattern.ReplaceAllString(collapsed, "[REDACTED_SECRET]") + words := strings.Fields(collapsed) + if len(words) > MaxPromptPreviewWords { + words = words[:MaxPromptPreviewWords] + } + kept := make([]string, 0, len(words)) + for _, word := range words { + candidate := strings.Join(append(append([]string{}, kept...), word), " ") + if len(candidate) > MaxPromptPreviewChars { + break + } + kept = append(kept, word) + } + return strings.Join(kept, " ") +} + +func NewTaskLifecycleLog(input TaskLifecycleLogInput) TaskLifecycleLogEvent { + return TaskLifecycleLogEvent{ + Event: "task_lifecycle", + Phase: input.Phase, + Status: input.Status, + TaskID: input.TaskID, + SessionID: input.SessionID, + ChannelID: input.ChannelID, + RequestID: input.RequestID, + TraceID: input.TraceID, + AttemptID: input.AttemptID, + AttemptNumber: input.AttemptNumber, + TurnKind: input.TurnKind, + ElapsedMs: input.ElapsedMs, + Model: input.Model, + Provider: input.Provider, + PID: input.PID, + FailureCode: input.FailureCode, + Retryable: input.Retryable, + PromptPreview: PromptPreview(input.PromptText), + Cleanup: input.Cleanup, + } +} diff --git a/internal/logging/lifecycle_test.go b/internal/logging/lifecycle_test.go new file mode 100644 index 0000000..2e67ca0 --- /dev/null +++ b/internal/logging/lifecycle_test.go @@ -0,0 +1,40 @@ +package logging + +import "testing" + +func TestPromptPreviewCapsAndCollapsesWhitespace(t *testing.T) { + got := PromptPreview("write the full update spec\nwith all lifecycle diagnostics and tests") + if got != "write the full update spec with all" { + t.Fatalf("preview = %q", got) + } +} + +func TestPromptPreviewRedactsSecrets(t *testing.T) { + got := PromptPreview("deploy token sk-ant-1234567890abcdef and continue") + if got == "" || got == "deploy token sk-ant-1234567890abcdef" { + t.Fatalf("secret leaked in preview: %q", got) + } + if want := "deploy token [REDACTED_SECRET] and"; got != want { + t.Fatalf("preview = %q, want %q", got, want) + } +} + +func TestLifecycleEventFields(t *testing.T) { + ev := NewTaskLifecycleLog(TaskLifecycleLogInput{ + Phase: "prompt_written", + TaskID: "task-1", + SessionID: "session-1", + RequestID: "request-1", + TraceID: "trace-1", + AttemptID: "attempt-1", + TurnKind: "user", + ElapsedMs: 700, + PromptText: "write the plan", + }) + if ev.Event != "task_lifecycle" || ev.Phase != "prompt_written" { + t.Fatalf("event = %#v", ev) + } + if ev.PromptPreview != "write the plan" { + t.Fatalf("prompt preview = %q", ev.PromptPreview) + } +} From 1185df0e6f82037445f44084d1d3a7fe655b0c7a Mon Sep 17 00:00:00 2001 From: Lex Christopherson Date: Thu, 30 Apr 2026 10:45:55 -0600 Subject: [PATCH 2/4] feat: supervise task lifecycle attempts --- go.mod | 2 + go.sum | 2 - internal/pi/executor.go | 58 ++++- internal/pi/executor_test.go | 3 + internal/pi/worker.go | 1 + internal/session/actor.go | 265 ++++++++++++++++++++--- internal/session/turn_supervisor.go | 184 ++++++++++++++++ internal/session/turn_supervisor_test.go | 87 ++++++++ 8 files changed, 569 insertions(+), 33 deletions(-) create mode 100644 internal/session/turn_supervisor.go create mode 100644 internal/session/turn_supervisor_test.go diff --git a/go.mod b/go.mod index 7fa700b..a76f722 100644 --- a/go.mod +++ b/go.mod @@ -14,3 +14,5 @@ require ( github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/spf13/pflag v1.0.9 // indirect ) + +replace github.com/gsd-build/protocol-go => /Users/lexchristopherson/Developer/gsd/gsd-build-protocol-go/.worktrees/turn-lifecycle-protocol diff --git a/go.sum b/go.sum index 4990ba2..0cd4003 100644 --- a/go.sum +++ b/go.sum @@ -3,8 +3,6 @@ github.com/coder/websocket v1.8.14/go.mod h1:NX3SzP+inril6yawo5CQXx8+fk145lPDC6p github.com/cpuguy83/go-md2man/v2 v2.0.6/go.mod h1:oOW0eioCTA6cOiMLiUPZOpcVxMig6NIQQ7OS05n1F4g= github.com/creack/pty v1.1.24 h1:bJrF4RRfyJnbTJqzRLHzcGaZK1NeM5kTC9jGgovnR1s= github.com/creack/pty v1.1.24/go.mod h1:08sCNb52WyoAwi2QDyzUCTgcvVFhUzewun7wtTfvcwE= -github.com/gsd-build/protocol-go v0.29.1 h1:ZqJUGAuAShdtgXRr82NfV4Vq/gmUgmQVsxRBAMcDlSQ= -github.com/gsd-build/protocol-go v0.29.1/go.mod h1:vECSwMFp59Ihu5ZH4aLF5fuW9zJ4a3ZXCYngmzfBn8s= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= diff --git a/internal/pi/executor.go b/internal/pi/executor.go index 0649bba..acd992f 100644 --- a/internal/pi/executor.go +++ b/internal/pi/executor.go @@ -97,10 +97,20 @@ type Executor struct { opts Options OnPIDStart func(pid int) OnPIDExit func(pid int) + OnLifecycle LifecycleHooks OnToolExecutionStart func(ToolExecutionStart) OnToolExecutionEnd func(ToolExecutionEnd) } +type LifecycleHooks struct { + ProcessStarted func(pid int) + PromptWritten func() + FirstEventSeen func() + FirstVisibleEvent func() + CleanupStarted func(pid int) + CleanupFinished func(pid int, cleanup string) +} + // NewExecutor constructs an Executor. Call Run to spawn. func NewExecutor(opts Options) *Executor { if opts.BinaryPath == "" { @@ -378,6 +388,9 @@ func (e *Executor) Run(ctx context.Context, onEvent func(claude.Event) error, on if e.OnPIDStart != nil { e.OnPIDStart(pid) } + if e.OnLifecycle.ProcessStarted != nil { + e.OnLifecycle.ProcessStarted(pid) + } defer func() { if e.OnPIDExit != nil { e.OnPIDExit(pid) @@ -432,10 +445,19 @@ func (e *Executor) Run(ctx context.Context, onEvent func(claude.Event) error, on "message": e.opts.Prompt, }) if _, err := stdin.Write(append(promptFrame, '\n')); err != nil { + if e.OnLifecycle.CleanupStarted != nil { + e.OnLifecycle.CleanupStarted(pid) + } _ = terminateProcessGroupAndWait(cmd, pid, stdin, 2*time.Second) + if e.OnLifecycle.CleanupFinished != nil { + e.OnLifecycle.CleanupFinished(pid, "process_group_killed") + } <-stderrDone return fmt.Errorf("write prompt frame: %w", err) } + if e.OnLifecycle.PromptWritten != nil { + e.OnLifecycle.PromptWritten() + } // After agent_end fires the parser signals via agentEndCh. Pi RPC mode // keeps running waiting for more frames; we SIGTERM it to exit. @@ -446,7 +468,7 @@ func (e *Executor) Run(ctx context.Context, onEvent func(claude.Event) error, on model: e.opts.Model, taskID: e.opts.TaskID, } - parseErr := streamPiEvents(ctx, stdout, stdin, onEvent, onUIRequest, e.OnToolExecutionStart, e.OnToolExecutionEnd, agentEndCh, true, state, startedAt) + parseErr := streamPiEvents(ctx, stdout, stdin, onEvent, onUIRequest, e.OnToolExecutionStart, e.OnToolExecutionEnd, &e.OnLifecycle, agentEndCh, true, state, startedAt) agentEnded := false select { case <-agentEndCh: @@ -454,7 +476,13 @@ func (e *Executor) Run(ctx context.Context, onEvent func(claude.Event) error, on default: } if !agentEnded { + if e.OnLifecycle.CleanupStarted != nil { + e.OnLifecycle.CleanupStarted(pid) + } waitErr := terminateProcessGroupAndWait(cmd, pid, stdin, 2*time.Second) + if e.OnLifecycle.CleanupFinished != nil { + e.OnLifecycle.CleanupFinished(pid, "process_group_killed") + } <-stderrDone if ctx.Err() != nil { return nil @@ -477,7 +505,13 @@ func (e *Executor) Run(ctx context.Context, onEvent func(claude.Event) error, on } return fmt.Errorf("pi stream ended before agent_end") } + if e.OnLifecycle.CleanupStarted != nil { + e.OnLifecycle.CleanupStarted(pid) + } waitErr := terminateProcessGroupAndWait(cmd, pid, stdin, 2*time.Second) + if e.OnLifecycle.CleanupFinished != nil { + e.OnLifecycle.CleanupFinished(pid, "process_group_killed") + } <-stderrDone if waitErr != nil { @@ -636,6 +670,7 @@ func streamPiEvents( onUIRequest UIRequestHandler, onToolExecutionStart func(ToolExecutionStart), onToolExecutionEnd func(ToolExecutionEnd), + lifecycleHooks *LifecycleHooks, agentEndCh chan<- struct{}, translate bool, state *translatorState, @@ -647,6 +682,8 @@ func streamPiEvents( if state == nil { state = &translatorState{} } + firstEventSeen := false + firstVisibleEvent := false for scanner.Scan() { line := scanner.Bytes() @@ -668,6 +705,12 @@ func streamPiEvents( if peek.Type == "response" { continue } + if !firstEventSeen { + firstEventSeen = true + if lifecycleHooks != nil && lifecycleHooks.FirstEventSeen != nil { + lifecycleHooks.FirstEventSeen() + } + } notifyToolExecutionStart(raw, onToolExecutionStart) notifyToolExecutionEnd(raw, onToolExecutionEnd) @@ -682,11 +725,23 @@ func streamPiEvents( if translate { for _, ev := range translatePiEvent(raw, state) { + if !firstVisibleEvent { + firstVisibleEvent = true + if lifecycleHooks != nil && lifecycleHooks.FirstVisibleEvent != nil { + lifecycleHooks.FirstVisibleEvent() + } + } if err := onEvent(claude.Event{Type: ev.Type, Raw: ev.Raw}); err != nil { return err } } } else { + if !firstVisibleEvent { + firstVisibleEvent = true + if lifecycleHooks != nil && lifecycleHooks.FirstVisibleEvent != nil { + lifecycleHooks.FirstVisibleEvent() + } + } if err := onEvent(claude.Event{Type: peek.Type, Raw: raw}); err != nil { return err } @@ -850,6 +905,7 @@ func (e *Executor) StreamFromReaderForTest( onUIRequest, e.OnToolExecutionStart, e.OnToolExecutionEnd, + &e.OnLifecycle, agentEndCh, true, state, diff --git a/internal/pi/executor_test.go b/internal/pi/executor_test.go index 71a17ac..7172ab2 100644 --- a/internal/pi/executor_test.go +++ b/internal/pi/executor_test.go @@ -563,6 +563,7 @@ func TestStreamPiEvents_FiresOnToolExecutionEnd(t *testing.T) { nil, nil, func(ev ToolExecutionEnd) { got = &ev }, + nil, make(chan struct{}, 1), false, state, @@ -594,6 +595,7 @@ func TestStreamPiEventsForwardsToolExecutionUpdate(t *testing.T) { nil, nil, nil, + nil, make(chan struct{}, 1), true, &translatorState{sessionID: "sess-1"}, @@ -652,6 +654,7 @@ func TestStreamPiEvents_ReturnsAgentEndError(t *testing.T) { nil, nil, nil, + nil, make(chan struct{}, 1), true, &translatorState{}, diff --git a/internal/pi/worker.go b/internal/pi/worker.go index 475ea3d..694acd3 100644 --- a/internal/pi/worker.go +++ b/internal/pi/worker.go @@ -227,6 +227,7 @@ func (w *Worker) Prompt(ctx context.Context, req PromptRequest) error { req.OnUIRequest, req.OnToolExecutionStart, req.OnToolExecutionEnd, + nil, agentEndCh, true, state, diff --git a/internal/session/actor.go b/internal/session/actor.go index 49a121d..6b2d23a 100644 --- a/internal/session/actor.go +++ b/internal/session/actor.go @@ -19,6 +19,7 @@ import ( "github.com/gsd-build/daemon/internal/api" "github.com/gsd-build/daemon/internal/claude" daemonfs "github.com/gsd-build/daemon/internal/fs" + daemonlogging "github.com/gsd-build/daemon/internal/logging" "github.com/gsd-build/daemon/internal/pi" "github.com/gsd-build/daemon/internal/pidfile" "github.com/gsd-build/daemon/internal/skills" @@ -134,6 +135,11 @@ type pendingFileTool struct { type taskContext struct { TaskID string + SessionID string + AttemptID string + AttemptNumber int + TurnKind protocol.TurnKind + Deadlines protocol.TaskDeadlines ChannelID string ActorContext context.Context StartedAt time.Time @@ -548,13 +554,17 @@ func (a *Actor) Run(ctx context.Context) error { slog.Error("task failed", "taskId", task.TaskID, "sessionId", a.opts.SessionID, "err", err) sendCtx, sendCancel := context.WithTimeout(ctx, 30*time.Second) _ = a.opts.Relay.Send(sendCtx, &protocol.TaskError{ - Type: protocol.MsgTypeTaskError, - TaskID: task.TaskID, - SessionID: a.opts.SessionID, - ChannelID: task.ChannelID, - Error: err.Error(), - RequestID: task.RequestID, - Traceparent: task.Traceparent, + Type: protocol.MsgTypeTaskError, + TaskID: task.TaskID, + AttemptID: task.AttemptID, + AttemptNumber: task.AttemptNumber, + SessionID: a.opts.SessionID, + ChannelID: task.ChannelID, + Error: err.Error(), + FailureCode: taskFailureCode(err), + Retryable: taskFailureRetryable(err), + RequestID: task.RequestID, + Traceparent: task.Traceparent, }) sendCancel() } @@ -600,6 +610,11 @@ func (a *Actor) executeTask(ctx context.Context, task protocol.Task) error { executionPrompt := contextBlock + task.Prompt tc := &taskContext{ TaskID: task.TaskID, + SessionID: a.opts.SessionID, + AttemptID: task.AttemptID, + AttemptNumber: task.AttemptNumber, + TurnKind: task.TurnKind, + Deadlines: task.DeadlineProfile, ChannelID: task.ChannelID, ActorContext: ctx, StartedAt: time.Now(), @@ -635,13 +650,15 @@ func (a *Actor) executeTask(ctx context.Context, task protocol.Task) error { sendCtx, sendCancel := context.WithTimeout(ctx, 30*time.Second) if err := a.opts.Relay.Send(sendCtx, &protocol.TaskStarted{ - Type: protocol.MsgTypeTaskStarted, - TaskID: task.TaskID, - SessionID: a.opts.SessionID, - ChannelID: tc.ChannelID, - StartedAt: tc.StartedAt.UTC().Format(time.RFC3339Nano), - RequestID: tc.RequestID, - Traceparent: tc.Traceparent, + Type: protocol.MsgTypeTaskStarted, + TaskID: task.TaskID, + AttemptID: task.AttemptID, + AttemptNumber: task.AttemptNumber, + SessionID: a.opts.SessionID, + ChannelID: tc.ChannelID, + StartedAt: tc.StartedAt.UTC().Format(time.RFC3339Nano), + RequestID: tc.RequestID, + Traceparent: tc.Traceparent, }); err != nil { sendCancel() return fmt.Errorf("send taskStarted: %w", err) @@ -656,25 +673,31 @@ func (a *Actor) executeTask(ctx context.Context, task protocol.Task) error { if taskCtx.Err() == context.DeadlineExceeded { errCtx, errCancel := context.WithTimeout(ctx, 30*time.Second) _ = a.opts.Relay.Send(errCtx, &protocol.TaskError{ - Type: protocol.MsgTypeTaskError, - TaskID: task.TaskID, - SessionID: a.opts.SessionID, - ChannelID: tc.ChannelID, - Error: fmt.Sprintf("task timed out after %s", a.taskTimeout), - RequestID: task.RequestID, - Traceparent: tc.Traceparent, + Type: protocol.MsgTypeTaskError, + TaskID: task.TaskID, + AttemptID: task.AttemptID, + AttemptNumber: task.AttemptNumber, + SessionID: a.opts.SessionID, + ChannelID: tc.ChannelID, + Error: fmt.Sprintf("task timed out after %s", a.taskTimeout), + FailureCode: "task_deadline_exceeded", + Retryable: false, + RequestID: task.RequestID, + Traceparent: tc.Traceparent, }) errCancel() return nil } cancelCtx, cancelCancel := context.WithTimeout(ctx, 30*time.Second) _ = a.opts.Relay.Send(cancelCtx, &protocol.TaskCancelled{ - Type: protocol.MsgTypeTaskCancelled, - TaskID: task.TaskID, - SessionID: a.opts.SessionID, - ChannelID: tc.ChannelID, - RequestID: task.RequestID, - Traceparent: tc.Traceparent, + Type: protocol.MsgTypeTaskCancelled, + TaskID: task.TaskID, + AttemptID: task.AttemptID, + AttemptNumber: task.AttemptNumber, + SessionID: a.opts.SessionID, + ChannelID: tc.ChannelID, + RequestID: task.RequestID, + Traceparent: tc.Traceparent, }) cancelCancel() return nil @@ -692,6 +715,146 @@ func taskActorContext(tc *taskContext, fallback context.Context) context.Context return fallback } +type actorLifecycleSink struct { + relay RelaySender + tc *taskContext + model string + provider string +} + +func (s actorLifecycleSink) Phase(phase string, fields map[string]any) { + if fields == nil { + fields = map[string]any{} + } + elapsedMs, _ := fields["elapsedMs"].(int64) + pid, _ := fields["pid"].(int) + failureCode, _ := fields["failureCode"].(string) + retryable, _ := fields["retryable"].(bool) + cleanup, _ := fields["cleanup"].(string) + traceID := protocol.TraceID(s.tc.Traceparent) + logEvent := daemonlogging.NewTaskLifecycleLog(daemonlogging.TaskLifecycleLogInput{ + Phase: phase, + Status: string(taskAttemptStatusForPhase(phase)), + TaskID: s.tc.TaskID, + SessionID: s.tc.SessionID, + ChannelID: s.tc.ChannelID, + RequestID: s.tc.RequestID, + TraceID: traceID, + AttemptID: s.tc.AttemptID, + AttemptNumber: s.tc.AttemptNumber, + TurnKind: turnKindString(s.tc.TurnKind), + ElapsedMs: elapsedMs, + Model: s.model, + Provider: s.provider, + PID: pid, + FailureCode: failureCode, + Retryable: retryable, + PromptText: s.tc.OriginalPrompt, + Cleanup: cleanup, + }) + slog.Info("task lifecycle", + "event", logEvent.Event, + "phase", logEvent.Phase, + "status", logEvent.Status, + "taskId", logEvent.TaskID, + "sessionId", logEvent.SessionID, + "channelId", logEvent.ChannelID, + "requestId", logEvent.RequestID, + "traceId", logEvent.TraceID, + "attemptId", logEvent.AttemptID, + "attemptNumber", logEvent.AttemptNumber, + "turnKind", logEvent.TurnKind, + "elapsedMs", logEvent.ElapsedMs, + "model", logEvent.Model, + "provider", logEvent.Provider, + "pid", logEvent.PID, + "failureCode", logEvent.FailureCode, + "retryable", logEvent.Retryable, + "promptPreview", logEvent.PromptPreview, + "cleanup", logEvent.Cleanup, + ) + if s.tc.AttemptID == "" || s.relay == nil { + return + } + msg := &protocol.TaskLifecycle{ + Type: protocol.MsgTypeTaskLifecycle, + TaskID: s.tc.TaskID, + AttemptID: s.tc.AttemptID, + AttemptNumber: s.tc.AttemptNumber, + SessionID: logEvent.SessionID, + ChannelID: s.tc.ChannelID, + Phase: taskLifecyclePhase(phase), + Status: taskAttemptStatusForPhase(phase), + Retryable: retryable, + FailureCode: failureCode, + ObservedAt: time.Now().UTC(), + PID: pid, + Provider: s.provider, + Model: s.model, + RequestID: s.tc.RequestID, + Traceparent: s.tc.Traceparent, + } + sendCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := s.relay.Send(sendCtx, msg); err != nil { + slog.Warn("send task lifecycle failed", "taskId", s.tc.TaskID, "attemptId", s.tc.AttemptID, "phase", phase, "err", err) + } +} + +func turnKindString(kind protocol.TurnKind) string { + if kind == "" { + return "user" + } + return string(kind) +} + +func taskLifecyclePhase(phase string) protocol.TaskLifecyclePhase { + switch phase { + case "task_started": + return protocol.TaskLifecyclePhaseStarted + case "pi_process_started": + return protocol.TaskLifecyclePhasePiStarted + case "task_timed_out": + return protocol.TaskLifecyclePhaseTimedOut + default: + return protocol.TaskLifecyclePhase(phase) + } +} + +func taskAttemptStatusForPhase(phase string) protocol.TaskAttemptStatus { + switch phase { + case "task_started": + return protocol.TaskAttemptStatusStarted + case "pi_process_started": + return protocol.TaskAttemptStatusPiStarted + case "tool_started": + return protocol.TaskAttemptStatusToolRunning + case "cleanup_finished": + return protocol.TaskAttemptStatusCleanupFinished + case "task_timed_out", "timed_out": + return protocol.TaskAttemptStatusTimedOut + case "task_failed": + return protocol.TaskAttemptStatusFailed + case "task_completed": + return protocol.TaskAttemptStatusCompleted + default: + return protocol.TaskAttemptStatus(phase) + } +} + +func turnDeadlinesFromProtocol(deadlines protocol.TaskDeadlines) TurnDeadlines { + return TurnDeadlines{ + ProcessStart: time.Duration(deadlines.ProcessStartMs) * time.Millisecond, + PromptWrite: time.Duration(deadlines.PromptWriteMs) * time.Millisecond, + FirstEvent: time.Duration(deadlines.FirstEventMs) * time.Millisecond, + FirstVisibleEvent: time.Duration(deadlines.FirstVisibleEventMs) * time.Millisecond, + StreamIdle: time.Duration(deadlines.StreamIdleMs) * time.Millisecond, + ToolIdle: time.Duration(deadlines.ToolIdleMs) * time.Millisecond, + UserInput: time.Duration(deadlines.UserInputMs) * time.Millisecond, + CleanupTerm: time.Duration(deadlines.CleanupTermMs) * time.Millisecond, + } +} + func (a *Actor) runExecutor(actorCtx context.Context, taskCtx context.Context, tc *taskContext, prompt string) error { switch tc.Engine { case "", "pi": @@ -789,11 +952,48 @@ func (a *Actor) runPiExecutor(actorCtx context.Context, taskCtx context.Context, exec := pi.NewExecutor(opts) a.attachPiPIDCallbacks(exec, tc.TaskID) - exec.OnToolExecutionStart = a.capturePiToolStartWithEvidence(coordinator, tc.PlanRuntime) - exec.OnToolExecutionEnd = a.capturePiToolEndWithEvidence(tc.ChannelID, tc.PlanRuntime) + lifecycleSink := actorLifecycleSink{relay: a.opts.Relay, tc: tc, model: model, provider: provider} + toolStart := a.capturePiToolStartWithEvidence(coordinator, tc.PlanRuntime) + toolEnd := a.capturePiToolEndWithEvidence(tc.ChannelID, tc.PlanRuntime) resultRaw, err := a.forwardExecutorEvents(actorCtx, taskCtx, tc, func(ctx context.Context, onEvent func(claude.Event) error) error { - return exec.Run(ctx, onEvent, a.makePiUIHandler(ctx, tc, coordinator)) + supervisor := NewTurnSupervisor(TurnSupervisorOptions{ + TaskID: tc.TaskID, + SessionID: a.opts.SessionID, + ChannelID: tc.ChannelID, + RequestID: tc.RequestID, + TraceID: protocol.TraceID(tc.Traceparent), + AttemptID: tc.AttemptID, + AttemptNumber: tc.AttemptNumber, + TurnKind: string(tc.TurnKind), + Deadlines: turnDeadlinesFromProtocol(tc.Deadlines), + Sink: lifecycleSink, + }) + return supervisor.Run(ctx, func(supervisedCtx context.Context, hooks TurnHooks) error { + exec.OnLifecycle = pi.LifecycleHooks{ + ProcessStarted: func(pid int) { + lifecycleSink.Phase("pi_process_started", map[string]any{"pid": pid}) + }, + PromptWritten: hooks.PromptWritten, + FirstEventSeen: hooks.FirstEventSeen, + FirstVisibleEvent: hooks.FirstVisibleEventSeen, + CleanupStarted: func(pid int) { + lifecycleSink.Phase("cleanup_started", map[string]any{"pid": pid}) + }, + CleanupFinished: func(pid int, cleanup string) { + lifecycleSink.Phase("cleanup_finished", map[string]any{"pid": pid, "cleanup": cleanup}) + }, + } + exec.OnToolExecutionStart = func(event pi.ToolExecutionStart) { + hooks.ToolStarted(event.ToolCallID, event.ToolName) + toolStart(event) + } + exec.OnToolExecutionEnd = func(event pi.ToolExecutionEnd) { + hooks.ToolFinished(event.ToolCallID, event.ToolName) + toolEnd(event) + } + return exec.Run(supervisedCtx, onEvent, a.makePiUIHandler(supervisedCtx, tc, coordinator)) + }) }) if err != nil { return err @@ -1219,6 +1419,9 @@ func (a *Actor) forwardExecutorEvents(actorCtx context.Context, taskCtx context. // Send to relay frame := &protocol.Stream{ Type: protocol.MsgTypeStream, + TaskID: tc.TaskID, + AttemptID: tc.AttemptID, + AttemptNumber: tc.AttemptNumber, SessionID: a.opts.SessionID, ChannelID: tc.ChannelID, SequenceNumber: next, @@ -1335,6 +1538,8 @@ func (a *Actor) handleResult(ctx context.Context, tc *taskContext, raw json.RawM return a.opts.Relay.Send(sendCtx, &protocol.TaskComplete{ Type: protocol.MsgTypeTaskComplete, TaskID: tc.TaskID, + AttemptID: tc.AttemptID, + AttemptNumber: tc.AttemptNumber, SessionID: a.opts.SessionID, ChannelID: tc.ChannelID, ClaudeSessionID: resultSessionID, diff --git a/internal/session/turn_supervisor.go b/internal/session/turn_supervisor.go new file mode 100644 index 0000000..37c36bf --- /dev/null +++ b/internal/session/turn_supervisor.go @@ -0,0 +1,184 @@ +package session + +import ( + "context" + "sync" + "time" +) + +type TurnDeadlines struct { + ProcessStart time.Duration + PromptWrite time.Duration + FirstEvent time.Duration + FirstVisibleEvent time.Duration + StreamIdle time.Duration + ToolIdle time.Duration + UserInput time.Duration + CleanupTerm time.Duration +} + +type TurnResult struct { + FailureCode string + Retryable bool +} + +type LifecycleSink interface { + Phase(phase string, fields map[string]any) +} + +type TurnSupervisorOptions struct { + TaskID string + SessionID string + ChannelID string + RequestID string + TraceID string + AttemptID string + AttemptNumber int + TurnKind string + Deadlines TurnDeadlines + Sink LifecycleSink +} + +type TurnHooks struct { + PromptWritten func() + FirstEventSeen func() + FirstVisibleEventSeen func() + ToolStarted func(toolCallID string, toolName string) + ToolFinished func(toolCallID string, toolName string) +} + +type TurnSupervisor struct { + opts TurnSupervisorOptions + startedAt time.Time + mu sync.Mutex + retrySafe bool + result TurnResult + cancel context.CancelFunc + firstEvent bool + firstVisible bool +} + +func NewTurnSupervisor(opts TurnSupervisorOptions) *TurnSupervisor { + if opts.TurnKind == "" { + opts.TurnKind = "user" + } + return &TurnSupervisor{opts: opts, retrySafe: true} +} + +func (s *TurnSupervisor) Run(parent context.Context, run func(context.Context, TurnHooks) error) error { + s.startedAt = time.Now() + ctx, cancel := context.WithCancel(parent) + s.cancel = cancel + defer cancel() + s.emit("task_started", nil) + + hooks := TurnHooks{ + PromptWritten: func() { + s.emit("prompt_written", nil) + if s.opts.Deadlines.FirstEvent > 0 { + time.AfterFunc(s.opts.Deadlines.FirstEvent, func() { + s.mu.Lock() + shouldTimeout := !s.firstEvent && s.result.FailureCode == "" + s.mu.Unlock() + if shouldTimeout { + s.timeout("no_first_event_timeout", true) + } + }) + } + }, + FirstEventSeen: func() { + s.mu.Lock() + s.firstEvent = true + s.mu.Unlock() + s.emit("first_event_seen", nil) + }, + FirstVisibleEventSeen: func() { + s.mu.Lock() + s.firstVisible = true + s.retrySafe = false + s.mu.Unlock() + s.emit("first_visible_event_seen", nil) + }, + ToolStarted: func(toolCallID string, toolName string) { + s.mu.Lock() + s.retrySafe = false + s.mu.Unlock() + s.emit("tool_started", map[string]any{"toolCallId": toolCallID, "toolName": toolName}) + if s.opts.Deadlines.ToolIdle > 0 { + time.AfterFunc(s.opts.Deadlines.ToolIdle, func() { + s.timeout("tool_idle_timeout", false) + }) + } + }, + ToolFinished: func(toolCallID string, toolName string) { + s.emit("tool_finished", map[string]any{"toolCallId": toolCallID, "toolName": toolName}) + }, + } + + err := run(ctx, hooks) + s.mu.Lock() + hasFailure := s.result.FailureCode != "" + s.mu.Unlock() + if hasFailure { + return turnFailureError{result: s.Result()} + } + return err +} + +type turnFailureError struct { + result TurnResult +} + +func (e turnFailureError) Error() string { + return e.result.FailureCode +} + +func taskFailureCode(err error) string { + if failure, ok := err.(turnFailureError); ok { + return failure.result.FailureCode + } + return "" +} + +func taskFailureRetryable(err error) bool { + if failure, ok := err.(turnFailureError); ok { + return failure.result.Retryable + } + return false +} + +func (s *TurnSupervisor) Result() TurnResult { + s.mu.Lock() + defer s.mu.Unlock() + return s.result +} + +func (s *TurnSupervisor) timeout(code string, retryable bool) { + s.mu.Lock() + if s.result.FailureCode != "" { + s.mu.Unlock() + return + } + s.result = TurnResult{FailureCode: code, Retryable: retryable && s.retrySafe} + cancel := s.cancel + s.mu.Unlock() + s.emit("task_timed_out", map[string]any{"failureCode": code, "retryable": retryable}) + if cancel != nil { + cancel() + } +} + +func (s *TurnSupervisor) emit(phase string, fields map[string]any) { + if s.opts.Sink == nil { + return + } + if fields == nil { + fields = map[string]any{} + } + fields["taskId"] = s.opts.TaskID + fields["sessionId"] = s.opts.SessionID + fields["attemptId"] = s.opts.AttemptID + fields["turnKind"] = s.opts.TurnKind + fields["elapsedMs"] = time.Since(s.startedAt).Milliseconds() + s.opts.Sink.Phase(phase, fields) +} diff --git a/internal/session/turn_supervisor_test.go b/internal/session/turn_supervisor_test.go new file mode 100644 index 0000000..5fb9dfc --- /dev/null +++ b/internal/session/turn_supervisor_test.go @@ -0,0 +1,87 @@ +package session + +import ( + "context" + "testing" + "time" +) + +func TestTurnSupervisorNoFirstEventTimeoutIsRetryable(t *testing.T) { + sink := &recordingLifecycleSink{} + supervisor := NewTurnSupervisor(TurnSupervisorOptions{ + TaskID: "task-1", + SessionID: "session-1", + AttemptID: "attempt-1", + Deadlines: TurnDeadlines{ + FirstEvent: 10 * time.Millisecond, + CleanupTerm: 10 * time.Millisecond, + }, + Sink: sink, + }) + + err := supervisor.Run(context.Background(), func(ctx context.Context, hooks TurnHooks) error { + hooks.PromptWritten() + <-ctx.Done() + return ctx.Err() + }) + + result := supervisor.Result() + if err == nil { + t.Fatal("expected timeout error") + } + if result.FailureCode != "no_first_event_timeout" || !result.Retryable { + t.Fatalf("result = %#v", result) + } + if !sink.HasPhase("prompt_written") || !sink.HasPhase("task_timed_out") { + t.Fatalf("phases = %#v", sink.Phases()) + } +} + +func TestTurnSupervisorToolStartMakesRetryUnsafe(t *testing.T) { + sink := &recordingLifecycleSink{} + supervisor := NewTurnSupervisor(TurnSupervisorOptions{ + TaskID: "task-1", + SessionID: "session-1", + AttemptID: "attempt-1", + Deadlines: TurnDeadlines{ + ToolIdle: 10 * time.Millisecond, + CleanupTerm: 10 * time.Millisecond, + }, + Sink: sink, + }) + + _ = supervisor.Run(context.Background(), func(ctx context.Context, hooks TurnHooks) error { + hooks.PromptWritten() + hooks.FirstEventSeen() + hooks.FirstVisibleEventSeen() + hooks.ToolStarted("tool-1", "bash") + <-ctx.Done() + return ctx.Err() + }) + + result := supervisor.Result() + if result.FailureCode != "tool_idle_timeout" || result.Retryable { + t.Fatalf("result = %#v", result) + } +} + +type recordingLifecycleSink struct { + phases []string +} + +func (s *recordingLifecycleSink) Phase(phase string, fields map[string]any) { + s.phases = append(s.phases, phase) +} + +func (s *recordingLifecycleSink) HasPhase(phase string) bool { + for _, got := range s.phases { + if got == phase { + return true + } + } + return false +} + +func (s *recordingLifecycleSink) Phases() []string { + return append([]string(nil), s.phases...) +} From 6967dc10d994caeac0d7a61f88b721d93b8eafc0 Mon Sep 17 00:00:00 2001 From: Lex Christopherson Date: Thu, 30 Apr 2026 11:01:48 -0600 Subject: [PATCH 3/4] feat: expose plan tools by attempt --- internal/pi/executor.go | 4 +++- internal/pi/executor_test.go | 10 ++++++++++ internal/pi/extension/plan-tools.js | 1 + 3 files changed, 14 insertions(+), 1 deletion(-) diff --git a/internal/pi/executor.go b/internal/pi/executor.go index acd992f..78a5ea4 100644 --- a/internal/pi/executor.go +++ b/internal/pi/executor.go @@ -609,7 +609,7 @@ func browserEnv(base []string, grantID string, browserID string, sessionID strin } func planCapabilityEnv(base []string, cap *protocol.PlanCapability) []string { - env := make([]string, 0, len(base)+3) + env := make([]string, 0, len(base)+5) for _, entry := range base { if strings.HasPrefix(entry, "GSD_PLAN_") { continue @@ -618,6 +618,8 @@ func planCapabilityEnv(base []string, cap *protocol.PlanCapability) []string { } if cap != nil { env = append(env, + "GSD_PLAN_CAPABILITY_ID="+cap.ID, + "GSD_PLAN_CAPABILITY_ATTEMPT_ID="+cap.AttemptID, "GSD_PLAN_API_BASE_URL="+cap.APIBaseURL, "GSD_PLAN_CAPABILITY_TOKEN="+cap.Token, "GSD_PLAN_CAPABILITY_EXPIRES_AT="+cap.ExpiresAt, diff --git a/internal/pi/executor_test.go b/internal/pi/executor_test.go index 7172ab2..622ae2a 100644 --- a/internal/pi/executor_test.go +++ b/internal/pi/executor_test.go @@ -237,6 +237,8 @@ func TestExecutorPassesPlanCapabilityEnv(t *testing.T) { envFile := filepath.Join(t.TempDir(), "pi.env") fakePi := writeFakePi(t, ` { + printf 'GSD_PLAN_CAPABILITY_ID=%s\n' "${GSD_PLAN_CAPABILITY_ID:-}" + printf 'GSD_PLAN_CAPABILITY_ATTEMPT_ID=%s\n' "${GSD_PLAN_CAPABILITY_ATTEMPT_ID:-}" printf 'GSD_PLAN_API_BASE_URL=%s\n' "${GSD_PLAN_API_BASE_URL:-}" printf 'GSD_PLAN_CAPABILITY_TOKEN=%s\n' "${GSD_PLAN_CAPABILITY_TOKEN:-}" printf 'GSD_PLAN_CAPABILITY_EXPIRES_AT=%s\n' "${GSD_PLAN_CAPABILITY_EXPIRES_AT:-}" @@ -257,6 +259,8 @@ printf '%s\n' '{"type":"agent_end","messages":[{"role":"assistant","content":[{" Provider: "claude-cli", Prompt: "hello", PlanCapability: &protocol.PlanCapability{ + ID: "capability-1", + AttemptID: "attempt-1", APIBaseURL: "https://app.test", Token: "gsd_plan_test_secret", ExpiresAt: "2026-04-28T22:30:00Z", @@ -275,6 +279,12 @@ printf '%s\n' '{"type":"agent_end","messages":[{"role":"assistant","content":[{" if !strings.Contains(got, "GSD_PLAN_API_BASE_URL=https://app.test\n") { t.Fatalf("env missing api base url: %s", got) } + if !strings.Contains(got, "GSD_PLAN_CAPABILITY_ID=capability-1\n") { + t.Fatalf("env missing capability id: %s", got) + } + if !strings.Contains(got, "GSD_PLAN_CAPABILITY_ATTEMPT_ID=attempt-1\n") { + t.Fatalf("env missing attempt id: %s", got) + } if !strings.Contains(got, "GSD_PLAN_CAPABILITY_TOKEN=gsd_plan_test_secret\n") { t.Fatalf("env missing capability token: %s", got) } diff --git a/internal/pi/extension/plan-tools.js b/internal/pi/extension/plan-tools.js index 6db05e3..5c2251d 100644 --- a/internal/pi/extension/plan-tools.js +++ b/internal/pi/extension/plan-tools.js @@ -233,6 +233,7 @@ const PlanDoneParams = Type.Object({ export function hasPlanCapability(env = process.env) { return Boolean( env.GSD_PLAN_API_BASE_URL && + env.GSD_PLAN_CAPABILITY_ATTEMPT_ID && env.GSD_PLAN_CAPABILITY_TOKEN && env.GSD_PLAN_CAPABILITY_EXPIRES_AT, ); From 5913ff0b5a2d15d1ff5e6d417fb5db52cdb6dc6b Mon Sep 17 00:00:00 2001 From: Lex Christopherson Date: Thu, 30 Apr 2026 11:15:41 -0600 Subject: [PATCH 4/4] chore: consume task lifecycle protocol --- go.mod | 4 +--- go.sum | 2 ++ 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index a76f722..640db9a 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.26.2 require ( github.com/coder/websocket v1.8.14 github.com/creack/pty v1.1.24 - github.com/gsd-build/protocol-go v0.29.1 + github.com/gsd-build/protocol-go v0.32.0 github.com/spf13/cobra v1.10.2 gopkg.in/natefinch/lumberjack.v2 v2.2.1 ) @@ -14,5 +14,3 @@ require ( github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/spf13/pflag v1.0.9 // indirect ) - -replace github.com/gsd-build/protocol-go => /Users/lexchristopherson/Developer/gsd/gsd-build-protocol-go/.worktrees/turn-lifecycle-protocol diff --git a/go.sum b/go.sum index 0cd4003..817f9a6 100644 --- a/go.sum +++ b/go.sum @@ -3,6 +3,8 @@ github.com/coder/websocket v1.8.14/go.mod h1:NX3SzP+inril6yawo5CQXx8+fk145lPDC6p github.com/cpuguy83/go-md2man/v2 v2.0.6/go.mod h1:oOW0eioCTA6cOiMLiUPZOpcVxMig6NIQQ7OS05n1F4g= github.com/creack/pty v1.1.24 h1:bJrF4RRfyJnbTJqzRLHzcGaZK1NeM5kTC9jGgovnR1s= github.com/creack/pty v1.1.24/go.mod h1:08sCNb52WyoAwi2QDyzUCTgcvVFhUzewun7wtTfvcwE= +github.com/gsd-build/protocol-go v0.32.0 h1:4Vk/8GFH8s539xx01EFENO7snhJkndvnp9OxiANoCSI= +github.com/gsd-build/protocol-go v0.32.0/go.mod h1:vECSwMFp59Ihu5ZH4aLF5fuW9zJ4a3ZXCYngmzfBn8s= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=