diff --git a/cmd/internal/exec.go b/cmd/internal/exec.go index cd02a50f..97b158c5 100644 --- a/cmd/internal/exec.go +++ b/cmd/internal/exec.go @@ -4,10 +4,14 @@ import ( "errors" "fmt" "os" + osExec "os/exec" "path/filepath" "strings" + "syscall" "time" + "github.com/google/uuid" + tuikitIO "github.com/flowexec/tuikit/io" "github.com/flowexec/tuikit/views" "github.com/gen2brain/beeep" @@ -33,6 +37,11 @@ import ( "github.com/flowexec/flow/types/workspace" ) +const ( + // backgroundRunIDEnv is set on child processes spawned by --background. + backgroundRunIDEnv = "FLOW_BACKGROUND_RUN_ID" +) + func RegisterExecCmd(ctx *context.Context, rootCmd *cobra.Command) { subCmd := &cobra.Command{ Use: "exec EXECUTABLE_ID [args...]", @@ -76,6 +85,7 @@ func RegisterExecCmd(ctx *context.Context, rootCmd *cobra.Command) { } RegisterFlag(ctx, subCmd, *flags.ParameterValueFlag) RegisterFlag(ctx, subCmd, *flags.LogModeFlag) + RegisterFlag(ctx, subCmd, *flags.BackgroundFlag) rootCmd.AddCommand(subCmd) } @@ -130,6 +140,20 @@ func execFunc(ctx *context.Context, cmd *cobra.Command, verb executable.Verb, ar )) } + // Handle --background: spawn a detached child process and return immediately. + background := flags.ValueFor[bool](cmd, *flags.BackgroundFlag, false) + if background { + launchBackground(ctx, ref, verb, args) + return + } + + // If this is a background child process, eagerly record the log archive path + // so that `logs --running` can stream output while we're still executing. + bgRunID := os.Getenv(backgroundRunIDEnv) + if bgRunID != "" { + linkBackgroundArchive(ctx, bgRunID) + } + if ctx.DataStore != nil { if err := ctx.DataStore.CreateProcessBucket(ref.String()); err != nil { logger.Log().FatalErr(err) @@ -152,6 +176,11 @@ func execFunc(ctx *context.Context, cmd *cobra.Command, verb executable.Verb, ar cleanupProcessStore(ctx) recordExecution(ctx, ref, startTime, dur, runErr) + // Update background run record if this is a child process. + if bgRunID != "" { + finalizeBackgroundRun(ctx, bgRunID, runErr) + } + if runErr != nil { logger.Log().FatalErr(runErr) } @@ -159,6 +188,116 @@ func execFunc(ctx *context.Context, cmd *cobra.Command, verb executable.Verb, ar sendCompletionNotifications(ctx, cmd, dur) } +// launchBackground spawns a detached flow process for the given executable and returns immediately. +func launchBackground(ctx *context.Context, ref executable.Ref, verb executable.Verb, args []string) { + runID := uuid.New().String()[:8] + + // Build the child command: same verb + args. Stdout/stderr are set to nil so + // Go redirects them to /dev/null — terminal output is suppressed but the tuikit + // archive handler still writes to the log file normally. + childArgs := []string{string(verb)} + if len(args) > 0 { + childArgs = append(childArgs, args...) + } + + flowBin, err := os.Executable() + if err != nil { + logger.Log().FatalErr(fmt.Errorf("unable to find flow binary: %w", err)) + } + + child := osExec.Command(flowBin, childArgs...) + child.Env = append(os.Environ(), fmt.Sprintf("%s=%s", backgroundRunIDEnv, runID)) + child.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} + child.Stdout = nil + child.Stderr = nil + child.Stdin = nil + + if err := child.Start(); err != nil { + logger.Log().FatalErr(fmt.Errorf("failed to start background process: %w", err)) + } + + run := store.BackgroundRun{ + ID: runID, + PID: child.Process.Pid, + Ref: ref.String(), + StartedAt: time.Now(), + Status: store.BackgroundRunning, + } + if ctx.DataStore != nil { + if err := ctx.DataStore.SaveBackgroundRun(run); err != nil { + logger.Log().Errorf("failed to save background run record: %v", err) + } + } + + // Release the child process so it survives parent exit. + _ = child.Process.Release() + + logger.Log().Println(fmt.Sprintf("Started background run %s (PID %d) for %s", runID, run.PID, ref)) +} + +// linkBackgroundArchive eagerly writes the log archive path into the background run +// record so that `logs attach` can stream output while the child is still executing. +// Unlike findArchiveByID, this scans the log directory directly without skipping empty +// files — the archive file exists at startup but may not have content yet. +func linkBackgroundArchive(ctx *context.Context, runID string) { + if ctx.DataStore == nil || ctx.LogArchiveID == "" { + return + } + archivePath := findArchiveFileByID(ctx.LogArchiveID) + if archivePath == "" { + return + } + run, err := ctx.DataStore.GetBackgroundRun(runID) + if err != nil { + return + } + run.LogArchiveID = archivePath + _ = ctx.DataStore.SaveBackgroundRun(run) +} + +// findArchiveFileByID scans the logs directory for a file whose name starts with the +// given archive ID. Unlike ListArchiveEntries, this does not skip empty files. +func findArchiveFileByID(archiveID string) string { + logsDir := filesystem.LogsDir() + files, err := os.ReadDir(logsDir) + if err != nil { + return "" + } + for _, f := range files { + if f.IsDir() { + continue + } + if strings.HasPrefix(f.Name(), archiveID) { + return filepath.Join(logsDir, f.Name()) + } + } + return "" +} + +// finalizeBackgroundRun updates the background run record with the final status. +func finalizeBackgroundRun(ctx *context.Context, runID string, runErr error) { + if ctx.DataStore == nil { + return + } + run, err := ctx.DataStore.GetBackgroundRun(runID) + if err != nil { + logger.Log().Debug("failed to load background run for finalization", "err", err) + return + } + now := time.Now() + run.CompletedAt = &now + run.LogArchiveID = findArchiveByID(ctx.LogArchiveID) + if runErr != nil { + run.Status = store.BackgroundFailed + run.Error = runErr.Error() + } else { + run.Status = store.BackgroundCompleted + } + if err := ctx.DataStore.SaveBackgroundRun(run); err != nil { + logger.Log().Debug("failed to finalize background run", "err", err) + } +} + func buildExecEnv(ctx *context.Context, cmd *cobra.Command, e *executable.Executable) map[string]string { envMap := make(map[string]string) if wsData, err := ctx.WorkspacesCache.GetWorkspaceConfigList(); err != nil { diff --git a/cmd/internal/flags/types.go b/cmd/internal/flags/types.go index 0ba201a7..fd4f2737 100644 --- a/cmd/internal/flags/types.go +++ b/cmd/internal/flags/types.go @@ -221,6 +221,21 @@ var StoreAllFlag = &Metadata{ Default: false, } +var BackgroundFlag = &Metadata{ + Name: "background", + Shorthand: "b", + Usage: "Run the executable in the background and return a run ID immediately.", + Default: false, + Required: false, +} + +var RunningFlag = &Metadata{ + Name: "running", + Usage: "Show only active background processes.", + Default: false, + Required: false, +} + var ParameterValueFlag = &Metadata{ Name: "param", Shorthand: "p", diff --git a/cmd/internal/logs.go b/cmd/internal/logs.go index 6d8c84f5..b47f5e2c 100644 --- a/cmd/internal/logs.go +++ b/cmd/internal/logs.go @@ -2,11 +2,14 @@ package internal import ( "fmt" + "io" + "os" + "os/signal" "regexp" "strconv" - "time" - "strings" + "syscall" + "time" tuikitIO "github.com/flowexec/tuikit/io" "github.com/spf13/cobra" @@ -16,6 +19,7 @@ import ( "github.com/flowexec/flow/pkg/context" "github.com/flowexec/flow/pkg/filesystem" "github.com/flowexec/flow/pkg/logger" + "github.com/flowexec/flow/pkg/store" "github.com/flowexec/flow/types/executable" ) @@ -51,11 +55,40 @@ func RegisterLogsCmd(ctx *context.Context, rootCmd *cobra.Command) { }, } + killCmd := &cobra.Command{ + Use: "kill RUN_ID", + Short: "Terminate a background process.", + Long: "Send a termination signal to a running background process identified by its run ID.", + Args: cobra.ExactArgs(1), + Run: func(cmd *cobra.Command, args []string) { + logsKillFunc(ctx, args[0]) + }, + } + + attachCmd := &cobra.Command{ + Use: "attach RUN_ID", + Short: "Attach to a background process output.", + Long: "Stream the log output of a background process identified by its run ID.", + Args: cobra.ExactArgs(1), + Run: func(cmd *cobra.Command, args []string) { + logsAttachFunc(ctx, args[0]) + }, + } + subCmd.AddCommand(clearCmd) + subCmd.AddCommand(killCmd) + subCmd.AddCommand(attachCmd) + RegisterFlag(ctx, subCmd, *flags.RunningFlag) rootCmd.AddCommand(subCmd) } func logFunc(ctx *context.Context, cmd *cobra.Command, args []string) { + running := flags.ValueFor[bool](cmd, *flags.RunningFlag, false) + if running { + logsRunningFunc(ctx, cmd) + return + } + lastEntry := flags.ValueFor[bool](cmd, *flags.LastLogEntryFlag, false) outputFormat := flags.ValueFor[string](cmd, *flags.OutputFormatFlag, false) if err := filesystem.EnsureLogsDir(); err != nil { @@ -189,3 +222,155 @@ func deleteAssociatedLogs(ctx *context.Context, ref string) { } } } + +// logsRunningFunc lists active background processes using the same output/TUI +// patterns as regular execution history. +func logsRunningFunc(ctx *context.Context, cmd *cobra.Command) { + if ctx.DataStore == nil { + logger.Log().Fatalf("data store is not available") + } + + runs, err := ctx.DataStore.ListBackgroundRuns() + if err != nil { + logger.Log().FatalErr(err) + } + + // Prune stale entries and collect active runs. + var active []store.BackgroundRun + for _, run := range runs { + if run.Status != store.BackgroundRunning { + continue + } + if !isProcessAlive(run.PID) { + now := time.Now() + run.Status = store.BackgroundFailed + run.Error = "process exited unexpectedly" + run.CompletedAt = &now + _ = ctx.DataStore.SaveBackgroundRun(run) + continue + } + active = append(active, run) + } + + outputFormat := flags.ValueFor[string](cmd, *flags.OutputFormatFlag, false) + + if TUIEnabled(ctx, cmd) { + view := logs.NewBackgroundRunsView(ctx.TUIContainer, active, ctx.DataStore) + SetView(ctx, cmd, view) + return + } + + logs.PrintBackgroundRuns(outputFormat, active) +} + +// logsKillFunc terminates a background process by run ID. +func logsKillFunc(ctx *context.Context, runID string) { + if ctx.DataStore == nil { + logger.Log().Fatalf("data store is not available") + } + + run, err := ctx.DataStore.GetBackgroundRun(runID) + if err != nil { + logger.Log().FatalErr(fmt.Errorf("background run %s not found: %w", runID, err)) + } + + if run.Status != store.BackgroundRunning { + logger.Log().Fatalf("background run %s is not running (status: %s)", runID, run.Status) + } + + proc, err := os.FindProcess(run.PID) + if err != nil { + logger.Log().FatalErr(fmt.Errorf("unable to find process %d: %w", run.PID, err)) + } + + if err := proc.Signal(syscall.SIGTERM); err != nil { + logger.Log().FatalErr(fmt.Errorf("failed to terminate process %d: %w", run.PID, err)) + } + + now := time.Now() + run.Status = store.BackgroundFailed + run.Error = "killed by user" + run.CompletedAt = &now + if err := ctx.DataStore.SaveBackgroundRun(run); err != nil { + logger.Log().Errorf("failed to update background run record: %v", err) + } + + logger.Log().Println(fmt.Sprintf("Terminated background run %s (PID %d).", runID, run.PID)) +} + +// logsAttachFunc streams log output from a background process, tail-following +// the log archive file until the process exits or the user interrupts. +func logsAttachFunc(ctx *context.Context, runID string) { + if ctx.DataStore == nil { + logger.Log().Fatalf("data store is not available") + } + + run, err := ctx.DataStore.GetBackgroundRun(runID) + if err != nil { + logger.Log().FatalErr(fmt.Errorf("background run %s not found: %w", runID, err)) + } + + archivePath := run.LogArchiveID + if archivePath == "" { + logger.Log().Fatalf("no log output available for background run %s (archive not yet linked)", runID) + } + + f, err := os.Open(archivePath) + if err != nil { + logger.Log().FatalErr(fmt.Errorf("unable to open log archive: %w", err)) + } + defer f.Close() + + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) + defer signal.Stop(sigCh) + + buf := make([]byte, 4096) + var pos int64 + for { + select { + case <-sigCh: + _, _ = fmt.Fprintln(ctx.StdOut(), "\nDetached.") + return + default: + } + + n, readErr := f.ReadAt(buf, pos) + if n > 0 { + _, _ = ctx.StdOut().Write(buf[:n]) + pos += int64(n) + continue + } + + // No new data — check if the process is still alive. + if !isProcessAlive(run.PID) { + // Final drain. + for { + n, _ = f.ReadAt(buf, pos) + if n == 0 { + break + } + _, _ = ctx.StdOut().Write(buf[:n]) + pos += int64(n) + } + _, _ = fmt.Fprintln(ctx.StdOut(), "\nBackground process exited.") + return + } + + if readErr != nil && readErr != io.EOF { + logger.Log().FatalErr(fmt.Errorf("error reading log: %w", readErr)) + } + + time.Sleep(200 * time.Millisecond) + } +} + +// isProcessAlive checks whether a process with the given PID is still running. +func isProcessAlive(pid int) bool { + proc, err := os.FindProcess(pid) + if err != nil { + return false + } + // On Unix, signal 0 checks for process existence without actually sending a signal. + return proc.Signal(syscall.Signal(0)) == nil +} diff --git a/docs/.vitepress/config.mts b/docs/.vitepress/config.mts index eee3bdbb..652dd7da 100644 --- a/docs/.vitepress/config.mts +++ b/docs/.vitepress/config.mts @@ -75,7 +75,9 @@ export default defineConfig({ collapsed: true, items: [ { text: 'flow logs', link: '/cli/flow_logs' }, - { text: 'flow logs clear', link: '/cli/flow_logs_clear' } + { text: 'flow logs attach', link: '/cli/flow_logs_attach' }, + { text: 'flow logs clear', link: '/cli/flow_logs_clear' }, + { text: 'flow logs kill', link: '/cli/flow_logs_kill' } ] }, { text: 'flow mcp', link: '/cli/flow_mcp' }, diff --git a/docs/cli/flow_exec.md b/docs/cli/flow_exec.md index bad2397e..6bce3ce5 100644 --- a/docs/cli/flow_exec.md +++ b/docs/cli/flow_exec.md @@ -50,6 +50,7 @@ flow exec EXECUTABLE_ID [args...] [flags] ### Options ``` + -b, --background Run the executable in the background and return a run ID immediately. -h, --help help for exec -m, --log-mode string Log mode (text, logfmt, json, hidden) -p, --param stringArray Set a parameter value by env key. (i.e. KEY=value) Use multiple times to set multiple parameters.This will override any existing parameter values defined for the executable. diff --git a/docs/cli/flow_logs.md b/docs/cli/flow_logs.md index 5baa8618..9a80da47 100644 --- a/docs/cli/flow_logs.md +++ b/docs/cli/flow_logs.md @@ -17,6 +17,7 @@ flow logs [ref] [flags] --last Print the last execution's logs --limit int Maximum number of records to display. -o, --output string Output format. One of: yaml, json, or tui. + --running Show only active background processes. --since string Filter history to entries after a duration (e.g. 1h, 30m, 7d). --status string Filter history by status (success or failure). -w, --workspace string Filter history by workspace name. @@ -32,5 +33,7 @@ flow logs [ref] [flags] ### SEE ALSO * [flow](flow.md) - flow is a command line interface designed to make managing and running development workflows easier. +* [flow logs attach](flow_logs_attach.md) - Attach to a background process output. * [flow logs clear](flow_logs_clear.md) - Clear execution history and logs. +* [flow logs kill](flow_logs_kill.md) - Terminate a background process. diff --git a/docs/cli/flow_logs_attach.md b/docs/cli/flow_logs_attach.md new file mode 100644 index 00000000..d97b3903 --- /dev/null +++ b/docs/cli/flow_logs_attach.md @@ -0,0 +1,29 @@ +## flow logs attach + +Attach to a background process output. + +### Synopsis + +Stream the log output of a background process identified by its run ID. + +``` +flow logs attach RUN_ID [flags] +``` + +### Options + +``` + -h, --help help for attach +``` + +### Options inherited from parent commands + +``` + -L, --log-level string Log verbosity level (debug, info, fatal) (default "info") + --sync Sync flow cache and workspaces +``` + +### SEE ALSO + +* [flow logs](flow_logs.md) - View execution history and logs. + diff --git a/docs/cli/flow_logs_kill.md b/docs/cli/flow_logs_kill.md new file mode 100644 index 00000000..60ed0544 --- /dev/null +++ b/docs/cli/flow_logs_kill.md @@ -0,0 +1,29 @@ +## flow logs kill + +Terminate a background process. + +### Synopsis + +Send a termination signal to a running background process identified by its run ID. + +``` +flow logs kill RUN_ID [flags] +``` + +### Options + +``` + -h, --help help for kill +``` + +### Options inherited from parent commands + +``` + -L, --log-level string Log verbosity level (debug, info, fatal) (default "info") + --sync Sync flow cache and workspaces +``` + +### SEE ALSO + +* [flow logs](flow_logs.md) - View execution history and logs. + diff --git a/docs/guides/execution-history.md b/docs/guides/execution-history.md index 996ba72b..5dcaf949 100644 --- a/docs/guides/execution-history.md +++ b/docs/guides/execution-history.md @@ -65,6 +65,56 @@ flow logs -w api --status success --since 7d Filters work with all output modes (`--last`, `-o yaml`, TUI, etc.). +## Background Execution + +Run any executable in the background to free up your terminal for other work. The process is detached and tracked +automatically — you can check on it, read its output, or terminate it at any time. + +### Starting a Background Run + +Add the `--background` (or `-b`) flag to any `exec` command: + +```shell +flow exec my-task --background +# Started background run a1b2c3d4 (PID 54321) for exec flow/:my-task +``` + +The command returns a short **run ID** immediately. The executable runs in a detached process with its output captured in the log archive. + +### Listing Active Runs + +See what's currently running in the background: + +```shell +flow logs --running +# a1b2c3d4 PID 54321 exec flow/:my-task running 5m30s +``` + +### Streaming Output + +Attach to a background run to stream its log output in real time: + +```shell +flow logs attach a1b2c3d4 +``` + +This tail-follows the log file, printing new output as it appears. Press `Ctrl-C` to detach without +stopping the process. When the background process exits, the stream ends automatically. + +### Terminating a Run + +Stop a running background process by its run ID: + +```shell +flow logs kill a1b2c3d4 +# Terminated background run a1b2c3d4 (PID 54321). +``` + +> [!NOTE] +> Background runs cannot prompt for interactive input (`reviewRequired` gates, parameter prompts). +> Make sure all required parameters are provided via `--param` flags or environment variables when +> using `--background`. + ## Clearing History ```shell diff --git a/internal/io/logs/background.go b/internal/io/logs/background.go new file mode 100644 index 00000000..9dc75bf9 --- /dev/null +++ b/internal/io/logs/background.go @@ -0,0 +1,145 @@ +package logs + +import ( + "encoding/json" + "fmt" + "time" + + "os" + "syscall" + + "github.com/flowexec/tuikit" + "github.com/flowexec/tuikit/themes" + "github.com/flowexec/tuikit/types" + "github.com/flowexec/tuikit/views" + + "gopkg.in/yaml.v3" + + "github.com/flowexec/flow/internal/io/common" + "github.com/flowexec/flow/pkg/logger" + "github.com/flowexec/flow/pkg/store" +) + +type backgroundRunOutput struct { + ID string `json:"id" yaml:"id"` + PID int `json:"pid" yaml:"pid"` + Ref string `json:"ref" yaml:"ref"` + StartedAt string `json:"startedAt" yaml:"startedAt"` + Status string `json:"status" yaml:"status"` + Error string `json:"error,omitempty" yaml:"error,omitempty"` +} + +type backgroundRunsResponse struct { + Runs []backgroundRunOutput `json:"runs" yaml:"runs"` +} + +func toBackgroundRunOutput(r store.BackgroundRun) backgroundRunOutput { + return backgroundRunOutput{ + ID: r.ID, + PID: r.PID, + Ref: r.Ref, + StartedAt: r.StartedAt.Format(time.RFC3339), + Status: string(r.Status), + Error: r.Error, + } +} + +// PrintBackgroundRuns outputs background runs in the specified format (json, yaml, or plain text). +func PrintBackgroundRuns(format string, runs []store.BackgroundRun) { + out := make([]backgroundRunOutput, len(runs)) + for i, r := range runs { + out[i] = toBackgroundRunOutput(r) + } + + switch common.NormalizeFormat(format) { + case common.JSONFormat: + data, err := json.MarshalIndent(backgroundRunsResponse{Runs: out}, "", " ") + if err != nil { + logger.Log().Fatalf("Failed to marshal background runs - %v", err) + } + logger.Log().Println(string(data)) + case common.YAMLFormat: + data, err := yaml.Marshal(backgroundRunsResponse{Runs: out}) + if err != nil { + logger.Log().Fatalf("Failed to marshal background runs - %v", err) + } + logger.Log().Println(string(data)) + default: + if len(runs) == 0 { + logger.Log().Println("No active background processes.") + return + } + printBackgroundRunsText(runs) + } +} + +func printBackgroundRunsText(runs []store.BackgroundRun) { + for _, r := range runs { + dur := time.Since(r.StartedAt).Round(time.Second) + logger.Log().Println(fmt.Sprintf( + "%-8s PID %-7d %-40s running %s", + r.ID, + r.PID, + r.Ref, + dur, + )) + } +} + +// NewBackgroundRunsView creates a TUI view for active background runs. +func NewBackgroundRunsView( + container *tuikit.Container, + runs []store.BackgroundRun, + ds store.DataStore, +) tuikit.View { + if len(runs) == 0 { + return views.NewErrorView(fmt.Errorf("no active background processes"), container.RenderState().Theme) + } + + columns := []views.TableColumn{ + {Title: fmt.Sprintf("Background (%d)", len(runs)), Percentage: 20}, + {Title: "Executable", Percentage: 40}, + {Title: "PID", Percentage: 10}, + {Title: "Running", Percentage: 30}, + } + rows := make([]views.TableRow, 0, len(runs)) + for i, r := range runs { + dur := time.Since(r.StartedAt).Round(time.Second) + rows = append(rows, views.TableRow{ + Data: []string{ + r.ID, + r.Ref, + fmt.Sprintf("%d", r.PID), + dur.String(), + fmt.Sprintf("%d", i), + }, + }) + } + + table := views.NewTable(container.RenderState(), columns, rows, views.TableDisplayMini) + table.SetKeyCallbacks([]types.KeyCallback{ + {Key: "x", Label: "kill all", Callback: func() error { + for _, r := range runs { + killAndUpdate(r, ds) + } + container.SetNotice("all background runs terminated", themes.OutputLevelSuccess) + return nil + }}, + }) + return table +} + +func killAndUpdate(r store.BackgroundRun, ds store.DataStore) { + proc, err := os.FindProcess(r.PID) + if err != nil { + return + } + _ = proc.Signal(syscall.SIGTERM) + now := time.Now() + r.Status = store.BackgroundFailed + r.Error = "killed by user" + r.CompletedAt = &now + if ds != nil { + _ = ds.SaveBackgroundRun(r) + } +} diff --git a/internal/io/logs/output.go b/internal/io/logs/output.go index 75ed5391..4a5dbaed 100644 --- a/internal/io/logs/output.go +++ b/internal/io/logs/output.go @@ -41,33 +41,29 @@ func toRecordOutput(r UnifiedRecord) recordOutput { // PrintRecords outputs unified records in the specified format (json, yaml, or plain text). func PrintRecords(format string, records []UnifiedRecord) { - if len(records) == 0 { - logger.Log().Println("No execution history found.") - return + out := make([]recordOutput, len(records)) + for i, r := range records { + out[i] = toRecordOutput(r) } switch common.NormalizeFormat(format) { case common.JSONFormat: - out := make([]recordOutput, len(records)) - for i, r := range records { - out[i] = toRecordOutput(r) - } data, err := json.MarshalIndent(recordsResponse{History: out}, "", " ") if err != nil { logger.Log().Fatalf("Failed to marshal records - %v", err) } logger.Log().Println(string(data)) case common.YAMLFormat: - out := make([]recordOutput, len(records)) - for i, r := range records { - out[i] = toRecordOutput(r) - } data, err := yaml.Marshal(recordsResponse{History: out}) if err != nil { logger.Log().Fatalf("Failed to marshal records - %v", err) } logger.Log().Println(string(data)) default: + if len(records) == 0 { + logger.Log().Println("No execution history found.") + return + } printRecordsText(records) } } diff --git a/pkg/store/mocks/mock_data_store.go b/pkg/store/mocks/mock_data_store.go index 922eaced..2bbe5921 100644 --- a/pkg/store/mocks/mock_data_store.go +++ b/pkg/store/mocks/mock_data_store.go @@ -67,6 +67,20 @@ func (mr *MockDataStoreMockRecorder) CreateProcessBucket(arg0 any) *gomock.Call return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateProcessBucket", reflect.TypeOf((*MockDataStore)(nil).CreateProcessBucket), arg0) } +// DeleteBackgroundRun mocks base method. +func (m *MockDataStore) DeleteBackgroundRun(arg0 string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DeleteBackgroundRun", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// DeleteBackgroundRun indicates an expected call of DeleteBackgroundRun. +func (mr *MockDataStoreMockRecorder) DeleteBackgroundRun(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteBackgroundRun", reflect.TypeOf((*MockDataStore)(nil).DeleteBackgroundRun), arg0) +} + // DeleteCacheEntry mocks base method. func (m *MockDataStore) DeleteCacheEntry(arg0 string) error { m.ctrl.T.Helper() @@ -138,6 +152,21 @@ func (mr *MockDataStoreMockRecorder) GetAllProcessVars(arg0 any) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetAllProcessVars", reflect.TypeOf((*MockDataStore)(nil).GetAllProcessVars), arg0) } +// GetBackgroundRun mocks base method. +func (m *MockDataStore) GetBackgroundRun(arg0 string) (store.BackgroundRun, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetBackgroundRun", arg0) + ret0, _ := ret[0].(store.BackgroundRun) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetBackgroundRun indicates an expected call of GetBackgroundRun. +func (mr *MockDataStoreMockRecorder) GetBackgroundRun(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetBackgroundRun", reflect.TypeOf((*MockDataStore)(nil).GetBackgroundRun), arg0) +} + // GetCacheEntry mocks base method. func (m *MockDataStore) GetCacheEntry(arg0 string) ([]byte, error) { m.ctrl.T.Helper() @@ -198,6 +227,21 @@ func (mr *MockDataStoreMockRecorder) GetProcessVarKeys(arg0 any) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetProcessVarKeys", reflect.TypeOf((*MockDataStore)(nil).GetProcessVarKeys), arg0) } +// ListBackgroundRuns mocks base method. +func (m *MockDataStore) ListBackgroundRuns() ([]store.BackgroundRun, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ListBackgroundRuns") + ret0, _ := ret[0].([]store.BackgroundRun) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ListBackgroundRuns indicates an expected call of ListBackgroundRuns. +func (mr *MockDataStoreMockRecorder) ListBackgroundRuns() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListBackgroundRuns", reflect.TypeOf((*MockDataStore)(nil).ListBackgroundRuns)) +} + // ListExecutionRefs mocks base method. func (m *MockDataStore) ListExecutionRefs() ([]string, error) { m.ctrl.T.Helper() @@ -227,6 +271,20 @@ func (mr *MockDataStoreMockRecorder) RecordExecution(arg0 any) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RecordExecution", reflect.TypeOf((*MockDataStore)(nil).RecordExecution), arg0) } +// SaveBackgroundRun mocks base method. +func (m *MockDataStore) SaveBackgroundRun(arg0 store.BackgroundRun) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SaveBackgroundRun", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// SaveBackgroundRun indicates an expected call of SaveBackgroundRun. +func (mr *MockDataStoreMockRecorder) SaveBackgroundRun(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SaveBackgroundRun", reflect.TypeOf((*MockDataStore)(nil).SaveBackgroundRun), arg0) +} + // SetCacheEntry mocks base method. func (m *MockDataStore) SetCacheEntry(arg0 string, arg1 []byte) error { m.ctrl.T.Helper() diff --git a/pkg/store/store.go b/pkg/store/store.go index 8b4df45a..7074082f 100644 --- a/pkg/store/store.go +++ b/pkg/store/store.go @@ -16,10 +16,11 @@ import ( ) const ( - cacheBucket = "cache" - historyBucket = "history" - processBucketName = "process" - storeFileName = "store.db" + cacheBucket = "cache" + historyBucket = "history" + processBucketName = "process" + backgroundBucketName = "background" + storeFileName = "store.db" // BucketEnv is the environment variable used to identify the current process bucket. BucketEnv = "FLOW_PROCESS_BUCKET" @@ -43,6 +44,12 @@ type DataStore interface { //nolint:interfacebloat // single backing store with ListExecutionRefs() ([]string, error) DeleteExecutionHistory(ref string) error + // Background run management (detached process tracking). + SaveBackgroundRun(run BackgroundRun) error + GetBackgroundRun(id string) (BackgroundRun, error) + ListBackgroundRuns() ([]BackgroundRun, error) + DeleteBackgroundRun(id string) error + // Process env var management (per-execution scoped key-value storage). // bucketID identifies the execution scope; use EnvironmentBucket() to get the current scope. CreateProcessBucket(id string) error @@ -67,6 +74,27 @@ type ExecutionRecord struct { LogArchiveID string `json:"logArchiveId,omitempty"` } +// BackgroundRunStatus represents the state of a background run. +type BackgroundRunStatus string + +const ( + BackgroundRunning BackgroundRunStatus = "running" + BackgroundCompleted BackgroundRunStatus = "completed" + BackgroundFailed BackgroundRunStatus = "failed" +) + +// BackgroundRun holds metadata about a detached background execution. +type BackgroundRun struct { + ID string `json:"id"` + PID int `json:"pid"` + Ref string `json:"ref"` + StartedAt time.Time `json:"startedAt"` + Status BackgroundRunStatus `json:"status"` + LogArchiveID string `json:"logArchiveId,omitempty"` + Error string `json:"error,omitempty"` + CompletedAt *time.Time `json:"completedAt,omitempty"` +} + // BoltDataStore opens and closes the BBolt database for each operation, so the // exclusive file lock is held only for the duration of a single transaction. // This allows multiple flow processes to share the same store file safely. @@ -435,6 +463,75 @@ func (s *BoltDataStore) DeleteProcessVar(bucketID, key string) error { }) } +// ---- background bucket ---- + +func (s *BoltDataStore) SaveBackgroundRun(run BackgroundRun) error { + data, err := json.Marshal(run) + if err != nil { + return fmt.Errorf("failed to marshal background run: %w", err) + } + return s.open(func(db *bolt.DB) error { + return db.Update(func(tx *bolt.Tx) error { + b, err := tx.CreateBucketIfNotExists([]byte(backgroundBucketName)) + if err != nil { + return fmt.Errorf("failed to open background bucket: %w", err) + } + return b.Put([]byte(run.ID), data) + }) + }) +} + +func (s *BoltDataStore) GetBackgroundRun(id string) (BackgroundRun, error) { + var run BackgroundRun + err := s.open(func(db *bolt.DB) error { + return db.View(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte(backgroundBucketName)) + if b == nil { + return fmt.Errorf("background run %s not found", id) + } + v := b.Get([]byte(id)) + if v == nil { + return fmt.Errorf("background run %s not found", id) + } + return json.Unmarshal(v, &run) + }) + }) + return run, err +} + +func (s *BoltDataStore) ListBackgroundRuns() ([]BackgroundRun, error) { + var runs []BackgroundRun + err := s.open(func(db *bolt.DB) error { + return db.View(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte(backgroundBucketName)) + if b == nil { + return nil + } + return b.ForEach(func(_, v []byte) error { + var run BackgroundRun + if err := json.Unmarshal(v, &run); err != nil { + return fmt.Errorf("failed to unmarshal background run: %w", err) + } + runs = append(runs, run) + return nil + }) + }) + }) + return runs, err +} + +func (s *BoltDataStore) DeleteBackgroundRun(id string) error { + return s.open(func(db *bolt.DB) error { + return db.Update(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte(backgroundBucketName)) + if b == nil { + return nil + } + return b.Delete([]byte(id)) + }) + }) +} + // Close is a no-op for the per-operation store — each operation opens and closes the DB itself. func (s *BoltDataStore) Close() error { return nil diff --git a/tests/logs_cmds_e2e_test.go b/tests/logs_cmds_e2e_test.go index 0d2f75c4..17340a06 100644 --- a/tests/logs_cmds_e2e_test.go +++ b/tests/logs_cmds_e2e_test.go @@ -4,6 +4,8 @@ package tests_test import ( stdCtx "context" + "os" + "path/filepath" "time" . "github.com/onsi/ginkgo/v2" @@ -38,7 +40,7 @@ var _ = Describe("logs e2e", Ordered, func() { Expect(run.Run(ctx.Context, "logs", "-o", "yaml")).To(Succeed()) out, err := readFileContent(stdOut) Expect(err).NotTo(HaveOccurred()) - Expect(out).To(ContainSubstring("No execution history found.")) + Expect(out).To(ContainSubstring("history: []")) }) }) @@ -93,4 +95,86 @@ var _ = Describe("logs e2e", Ordered, func() { Expect(out).To(ContainSubstring("Cleared history and logs")) }) }) + + When("listing running background processes (flow logs --running)", func() { + It("should display empty runs in yaml format", func() { + stdOut := ctx.StdOut() + Expect(run.Run(ctx.Context, "logs", "--running", "-o", "yaml")).To(Succeed()) + out, err := readFileContent(stdOut) + Expect(err).NotTo(HaveOccurred()) + Expect(out).To(ContainSubstring("runs: []")) + }) + + It("should display active background runs in yaml format", func() { + bgRun := store.BackgroundRun{ + ID: "test1234", + PID: os.Getpid(), // use our own PID so isProcessAlive returns true + Ref: "run default/examples:simple-print", + StartedAt: time.Now(), + Status: store.BackgroundRunning, + } + Expect(ctx.DataStore.SaveBackgroundRun(bgRun)).To(Succeed()) + + stdOut := ctx.StdOut() + Expect(run.Run(ctx.Context, "logs", "--running", "-o", "yaml")).To(Succeed()) + out, err := readFileContent(stdOut) + Expect(err).NotTo(HaveOccurred()) + Expect(out).To(ContainSubstring("id: test1234")) + Expect(out).To(ContainSubstring("status: running")) + Expect(out).To(ContainSubstring("default/examples:simple-print")) + }) + + It("should prune stale background runs", func() { + // Clean up any leftover runs from prior tests in this Ordered suite. + _ = ctx.DataStore.DeleteBackgroundRun("test1234") + + bgRun := store.BackgroundRun{ + ID: "stale123", + PID: 999999999, // non-existent PID + Ref: "run default/examples:simple-print", + StartedAt: time.Now(), + Status: store.BackgroundRunning, + } + Expect(ctx.DataStore.SaveBackgroundRun(bgRun)).To(Succeed()) + + stdOut := ctx.StdOut() + Expect(run.Run(ctx.Context, "logs", "--running", "-o", "yaml")).To(Succeed()) + out, err := readFileContent(stdOut) + Expect(err).NotTo(HaveOccurred()) + Expect(out).To(ContainSubstring("runs: []")) + + // Verify the stale run was updated to failed. + updated, err := ctx.DataStore.GetBackgroundRun("stale123") + Expect(err).NotTo(HaveOccurred()) + Expect(updated.Status).To(Equal(store.BackgroundFailed)) + Expect(updated.Error).To(ContainSubstring("exited unexpectedly")) + }) + }) + + When("attaching to a background process (flow logs attach)", func() { + It("should display log content from the archive file", func() { + // Create a temporary log archive file with known content. + logDir := GinkgoT().TempDir() + logFile := filepath.Join(logDir, "test-archive.log") + Expect(os.WriteFile(logFile, []byte("line 1\nline 2\nline 3\n"), 0600)).To(Succeed()) + + bgRun := store.BackgroundRun{ + ID: "attach12", + PID: 1, + Ref: "run default/examples:simple-print", + StartedAt: time.Now(), + Status: store.BackgroundCompleted, + LogArchiveID: logFile, + } + Expect(ctx.DataStore.SaveBackgroundRun(bgRun)).To(Succeed()) + + stdOut := ctx.StdOut() + Expect(run.Run(ctx.Context, "logs", "attach", "attach12")).To(Succeed()) + out, err := readFileContent(stdOut) + Expect(err).NotTo(HaveOccurred()) + Expect(out).To(ContainSubstring("line 1")) + Expect(out).To(ContainSubstring("line 2")) + Expect(out).To(ContainSubstring("line 3")) + }) + }) })