diff --git a/cmd/internal/cache.go b/cmd/internal/cache.go index c9816a61..0606a400 100644 --- a/cmd/internal/cache.go +++ b/cmd/internal/cache.go @@ -9,9 +9,9 @@ import ( "github.com/flowexec/flow/cmd/internal/flags" cacheIO "github.com/flowexec/flow/internal/io/cache" - "github.com/flowexec/flow/internal/services/store" "github.com/flowexec/flow/pkg/context" "github.com/flowexec/flow/pkg/logger" + "github.com/flowexec/flow/pkg/store" ) func RegisterCacheCmd(ctx *context.Context, rootCmd *cobra.Command) { @@ -76,24 +76,12 @@ func cacheSetFunc(ctx *context.Context, cmd *cobra.Command, args []string) { value = strings.Join(args[1:], " ") } - s, err := store.NewStore(store.Path()) - if err != nil { - logger.Log().FatalErr(err) - } bucketName := store.EnvironmentBucket() global := flags.ValueFor[bool](cmd, *flags.GlobalCacheFlag, false) if global { bucketName = store.RootBucket } - if _, err = s.CreateAndSetBucket(bucketName); err != nil { - logger.Log().FatalErr(err) - } - defer func() { - if err = s.Close(); err != nil { - logger.Log().WrapError(err, "cleanup failure") - } - }() - if err = s.Set(key, value); err != nil { + if err := ctx.DataStore.SetProcessVar(bucketName, key, value); err != nil { logger.Log().FatalErr(err) } logger.Log().PlainTextInfo(fmt.Sprintf("Key %q set in the cache", key)) @@ -114,27 +102,15 @@ func registerCacheGetCmd(ctx *context.Context, rootCmd *cobra.Command) { rootCmd.AddCommand(subCmd) } -func cacheGetFunc(_ *context.Context, cmd *cobra.Command, args []string) { +func cacheGetFunc(ctx *context.Context, cmd *cobra.Command, args []string) { key := args[0] - s, err := store.NewStore(store.Path()) - if err != nil { - logger.Log().FatalErr(err) - } bucketName := store.EnvironmentBucket() global := flags.ValueFor[bool](cmd, *flags.GlobalCacheFlag, false) if global { bucketName = store.RootBucket } - if _, err = s.CreateAndSetBucket(bucketName); err != nil { - logger.Log().FatalErr(err) - } - defer func() { - if err := s.Close(); err != nil { - logger.Log().WrapError(err, "cleanup failure") - } - }() - value, err := s.Get(key) + value, err := ctx.DataStore.GetProcessVar(bucketName, key) if err != nil { logger.Log().FatalErr(err) } @@ -159,19 +135,7 @@ func registerCacheListCmd(ctx *context.Context, rootCmd *cobra.Command) { } func cacheListFunc(ctx *context.Context, cmd *cobra.Command, _ []string) { - s, err := store.NewStore(store.Path()) - if err != nil { - logger.Log().FatalErr(err) - } - if _, err = s.CreateAndSetBucket(store.EnvironmentBucket()); err != nil { - logger.Log().FatalErr(err) - } - defer func() { - if err := s.Close(); err != nil { - logger.Log().WrapError(err, "cleanup failure") - } - }() - data, err := s.GetAll() + data, err := ctx.DataStore.GetAllProcessVars(store.EnvironmentBucket()) if err != nil { logger.Log().FatalErr(err) } @@ -199,27 +163,15 @@ func registerCacheRemoveCmd(ctx *context.Context, rootCmd *cobra.Command) { rootCmd.AddCommand(subCmd) } -func cacheRemoveFunc(_ *context.Context, cmd *cobra.Command, args []string) { +func cacheRemoveFunc(ctx *context.Context, cmd *cobra.Command, args []string) { key := args[0] - s, err := store.NewStore(store.Path()) - if err != nil { - logger.Log().FatalErr(err) - } bucketName := store.EnvironmentBucket() global := flags.ValueFor[bool](cmd, *flags.GlobalCacheFlag, false) if global { bucketName = store.RootBucket } - if _, err = s.CreateAndSetBucket(bucketName); err != nil { - logger.Log().FatalErr(err) - } - defer func() { - if err := s.Close(); err != nil { - logger.Log().WrapError(err, "cleanup failure") - } - }() - if err = s.Delete(key); err != nil { + if err := ctx.DataStore.DeleteProcessVar(bucketName, key); err != nil { logger.Log().FatalErr(err) } logger.Log().PlainTextSuccess(fmt.Sprintf("Key %q removed from the cache", key)) @@ -240,7 +192,7 @@ func registerCacheClearCmd(ctx *context.Context, rootCmd *cobra.Command) { rootCmd.AddCommand(subCmd) } -func cacheClearFunc(_ *context.Context, cmd *cobra.Command, _ []string) { +func cacheClearFunc(ctx *context.Context, cmd *cobra.Command, _ []string) { full := flags.ValueFor[bool](cmd, *flags.StoreAllFlag, false) if full { if err := store.DestroyStore(); err != nil { @@ -249,16 +201,7 @@ func cacheClearFunc(_ *context.Context, cmd *cobra.Command, _ []string) { logger.Log().PlainTextSuccess("Cache cleared") return } - s, err := store.NewStore(store.Path()) - if err != nil { - logger.Log().FatalErr(err) - } - defer func() { - if err := s.Close(); err != nil { - logger.Log().WrapError(err, "cleanup failure") - } - }() - if err := s.DeleteBucket(store.EnvironmentBucket()); err != nil { + if err := ctx.DataStore.DeleteProcessBucket(store.EnvironmentBucket()); err != nil { logger.Log().FatalErr(err) } logger.Log().PlainTextSuccess("Cache cleared") diff --git a/cmd/internal/exec.go b/cmd/internal/exec.go index 3397503c..cd02a50f 100644 --- a/cmd/internal/exec.go +++ b/cmd/internal/exec.go @@ -23,11 +23,12 @@ import ( "github.com/flowexec/flow/internal/runner/render" "github.com/flowexec/flow/internal/runner/request" "github.com/flowexec/flow/internal/runner/serial" - "github.com/flowexec/flow/internal/services/store" "github.com/flowexec/flow/internal/utils/env" "github.com/flowexec/flow/pkg/context" flowErrors "github.com/flowexec/flow/pkg/errors" + "github.com/flowexec/flow/pkg/filesystem" "github.com/flowexec/flow/pkg/logger" + "github.com/flowexec/flow/pkg/store" "github.com/flowexec/flow/types/executable" "github.com/flowexec/flow/types/workspace" ) @@ -87,9 +88,6 @@ func execPreRun(_ *context.Context, _ *cobra.Command, _ []string) { runner.RegisterRunner(parallel.NewRunner()) } -// TODO: refactor this function to simplify the logic -// -//nolint:funlen,gocognit func execFunc(ctx *context.Context, cmd *cobra.Command, verb executable.Verb, args []string) { logMode := flags.ValueFor[string](cmd, *flags.LogModeFlag, false) if logMode != "" { @@ -132,17 +130,37 @@ func execFunc(ctx *context.Context, cmd *cobra.Command, verb executable.Verb, ar )) } - s, err := store.NewStore(store.Path()) - if err != nil { - logger.Log().FatalErr(err) + if ctx.DataStore != nil { + if err := ctx.DataStore.CreateProcessBucket(ref.String()); err != nil { + logger.Log().FatalErr(err) + } + _ = os.Setenv(store.BucketEnv, ref.String()) } - if _, err = s.CreateAndSetBucket(ref.String()); err != nil { - logger.Log().FatalErr(err) + + envMap := buildExecEnv(ctx, cmd, e) + + var execArgs []string + if len(args) >= 2 { + execArgs = args[1:] + } + + startTime := time.Now() + eng := engine.NewExecEngine() + runErr := runner.Exec(ctx, e, eng, envMap, execArgs) + dur := time.Since(startTime) + + cleanupProcessStore(ctx) + recordExecution(ctx, ref, startTime, dur, runErr) + + if runErr != nil { + logger.Log().FatalErr(runErr) } - _ = s.Close() + logger.Log().Debug(fmt.Sprintf("%s flow completed", ref), "Elapsed", dur.Round(time.Millisecond)) + sendCompletionNotifications(ctx, cmd, dur) +} +func buildExecEnv(ctx *context.Context, cmd *cobra.Command, e *executable.Executable) map[string]string { envMap := make(map[string]string) - // add workspace env variables to the env map if wsData, err := ctx.WorkspacesCache.GetWorkspaceConfigList(); err != nil { logger.Log().Errorf("failed to get workspace cache data, skipping env file resolution: %v", err) } else { @@ -153,11 +171,9 @@ func execFunc(ctx *context.Context, cmd *cobra.Command, verb executable.Verb, ar } } - // add --param overrides to the env map paramOverrides := flags.ValueFor[[]string](cmd, *flags.ParameterValueFlag, false) applyParameterOverrides(paramOverrides, envMap) - // add values from the prompt param type to the env map textInputs := pendingFormFields(ctx, e, envMap) if len(textInputs) > 0 { form, err := views.NewForm(logger.Theme(ctx.Config.Theme.String()), ctx.StdIn(), ctx.StdOut(), textInputs...) @@ -171,37 +187,62 @@ func execFunc(ctx *context.Context, cmd *cobra.Command, verb executable.Verb, ar envMap[key] = fmt.Sprintf("%v", val) } } + return envMap +} - startTime := time.Now() - eng := engine.NewExecEngine() +func cleanupProcessStore(ctx *context.Context) { + if ctx.DataStore != nil { + if err := ctx.DataStore.DeleteProcessBucket(store.EnvironmentBucket()); err != nil { + logger.Log().Errorf("failed clearing process store\n%v", err) + } + } +} - var execArgs []string - if len(args) >= 2 { - execArgs = args[1:] +func recordExecution(ctx *context.Context, ref executable.Ref, startTime time.Time, dur time.Duration, runErr error) { + record := store.ExecutionRecord{ + Ref: ref.String(), + StartedAt: startTime, + Duration: dur, + } + if runErr != nil { + record.ExitCode = 1 + record.Error = runErr.Error() } + record.LogArchiveID = findArchiveByID(ctx.LogArchiveID) + if ctx.DataStore != nil { + if recErr := ctx.DataStore.RecordExecution(record); recErr != nil { + logger.Log().Debug("failed to record execution history", "err", recErr) + } + } +} - if err := runner.Exec(ctx, e, eng, envMap, execArgs); err != nil { - logger.Log().FatalErr(err) +// findArchiveByID searches log archive entries for one matching the given ID. +// Returns the entry's path if found, empty string otherwise. +func findArchiveByID(archiveID string) string { + if archiveID == "" { + return "" } - dur := time.Since(startTime) - processStore, err := store.NewStore(store.Path()) + entries, err := tuikitIO.ListArchiveEntries(filesystem.LogsDir()) if err != nil { - logger.Log().Errorf("failed clearing process store\n%v", err) + return "" } - if processStore != nil { - if err = processStore.DeleteBucket(store.EnvironmentBucket()); err != nil { - logger.Log().Errorf("failed clearing process store\n%v", err) + for _, e := range entries { + if e.ID == archiveID { + return e.Path } - _ = processStore.Close() } - logger.Log().Debug(fmt.Sprintf("%s flow completed", ref), "Elapsed", dur.Round(time.Millisecond)) - if TUIEnabled(ctx, cmd) { - if dur > 1*time.Minute && ctx.Config.SendSoundNotification() { - _ = beeep.Beep(beeep.DefaultFreq, beeep.DefaultDuration) - } - if dur > 1*time.Minute && ctx.Config.SendTextNotification() { - _ = beeep.Notify("Flow", "Flow completed", "") - } + return "" +} + +func sendCompletionNotifications(ctx *context.Context, cmd *cobra.Command, dur time.Duration) { + if !TUIEnabled(ctx, cmd) || dur <= 1*time.Minute { + return + } + if ctx.Config.SendSoundNotification() { + _ = beeep.Beep(beeep.DefaultFreq, beeep.DefaultDuration) + } + if ctx.Config.SendTextNotification() { + _ = beeep.Notify("Flow", "Flow completed", "") } } diff --git a/cmd/internal/flags/types.go b/cmd/internal/flags/types.go index 05fd4110..0ba201a7 100644 --- a/cmd/internal/flags/types.go +++ b/cmd/internal/flags/types.go @@ -148,6 +148,35 @@ var LastLogEntryFlag = &Metadata{ Required: false, } +var LogFilterWorkspaceFlag = &Metadata{ + Name: "workspace", + Shorthand: "w", + Usage: "Filter history by workspace name.", + Default: "", + Required: false, +} + +var LogFilterStatusFlag = &Metadata{ + Name: "status", + Usage: "Filter history by status (success or failure).", + Default: "", + Required: false, +} + +var LogFilterSinceFlag = &Metadata{ + Name: "since", + Usage: "Filter history to entries after a duration (e.g. 1h, 30m, 7d).", + Default: "", + Required: false, +} + +var LogFilterLimitFlag = &Metadata{ + Name: "limit", + Usage: "Maximum number of records to display.", + Default: 0, + Required: false, +} + var TemplateWorkspaceFlag = &Metadata{ Name: "workspace", Shorthand: "w", diff --git a/cmd/internal/logs.go b/cmd/internal/logs.go index 7531f2d0..6d8c84f5 100644 --- a/cmd/internal/logs.go +++ b/cmd/internal/logs.go @@ -2,6 +2,11 @@ package internal import ( "fmt" + "regexp" + "strconv" + "time" + + "strings" tuikitIO "github.com/flowexec/tuikit/io" "github.com/spf13/cobra" @@ -11,14 +16,17 @@ import ( "github.com/flowexec/flow/pkg/context" "github.com/flowexec/flow/pkg/filesystem" "github.com/flowexec/flow/pkg/logger" + "github.com/flowexec/flow/types/executable" ) func RegisterLogsCmd(ctx *context.Context, rootCmd *cobra.Command) { subCmd := &cobra.Command{ - Use: "logs", - Aliases: []string{"log"}, + Use: "logs [ref]", + Aliases: []string{"log", "history", "hist"}, Short: "View execution history and logs.", - Args: cobra.NoArgs, + Long: "View execution history recorded in the data store, with associated log output. " + + "Optionally filter by executable reference.", + Args: cobra.ArbitraryArgs, PreRun: func(cmd *cobra.Command, args []string) { StartTUI(ctx, cmd) }, PostRun: func(cmd *cobra.Command, args []string) { WaitForTUI(ctx, cmd) }, Run: func(cmd *cobra.Command, args []string) { @@ -27,35 +35,157 @@ func RegisterLogsCmd(ctx *context.Context, rootCmd *cobra.Command) { } RegisterFlag(ctx, subCmd, *flags.LastLogEntryFlag) RegisterFlag(ctx, subCmd, *flags.OutputFormatFlag) + RegisterFlag(ctx, subCmd, *flags.LogFilterWorkspaceFlag) + RegisterFlag(ctx, subCmd, *flags.LogFilterStatusFlag) + RegisterFlag(ctx, subCmd, *flags.LogFilterSinceFlag) + RegisterFlag(ctx, subCmd, *flags.LogFilterLimitFlag) + + clearCmd := &cobra.Command{ + Use: "clear [ref]", + Short: "Clear execution history and logs.", + Long: "Remove execution history records and associated log files. " + + "If a ref is provided, only that executable's data is cleared.", + Args: cobra.ArbitraryArgs, + Run: func(cmd *cobra.Command, args []string) { + logsClearFunc(ctx, args) + }, + } + + subCmd.AddCommand(clearCmd) rootCmd.AddCommand(subCmd) } -func logFunc(ctx *context.Context, cmd *cobra.Command, _ []string) { +func logFunc(ctx *context.Context, cmd *cobra.Command, args []string) { lastEntry := flags.ValueFor[bool](cmd, *flags.LastLogEntryFlag, false) outputFormat := flags.ValueFor[string](cmd, *flags.OutputFormatFlag, false) if err := filesystem.EnsureLogsDir(); err != nil { logger.Log().FatalErr(err) } - if TUIEnabled(ctx, cmd) { - view := logs.NewLogView(ctx.TUIContainer, filesystem.LogsDir(), lastEntry) - SetView(ctx, cmd, view) - return + + var records []logs.UnifiedRecord + var err error + if len(args) > 0 { + ref := expandRefArgs(ctx, args) + records, err = logs.LoadRecordsForRef(ctx.DataStore, filesystem.LogsDir(), ref, 50) + } else { + records, err = logs.LoadRecords(ctx.DataStore, filesystem.LogsDir()) } - entries, err := tuikitIO.ListArchiveEntries(filesystem.LogsDir()) if err != nil { logger.Log().FatalErr(err) } + filter := buildRecordFilter(cmd) + records = logs.FilterRecords(records, filter) + + if TUIEnabled(ctx, cmd) { + view := logs.NewUnifiedLogView(ctx.TUIContainer, records, lastEntry, ctx.DataStore) + SetView(ctx, cmd, view) + return + } + if lastEntry { - if len(entries) == 0 { - logger.Log().Fatalf("No log entries found") + if len(records) == 0 { + logger.Log().Fatalf("No execution history found") + } + logs.PrintLastRecord(outputFormat, records[0], ctx.StdOut()) + } else { + logs.PrintRecords(outputFormat, records) + } +} + +// durationWithDays extends time.ParseDuration to support a "d" (day) suffix. +var durationWithDaysRe = regexp.MustCompile(`^(\d+)d(.*)$`) + +func parseDurationWithDays(s string) (time.Duration, error) { + if m := durationWithDaysRe.FindStringSubmatch(s); m != nil { + days, _ := strconv.Atoi(m[1]) + d := time.Duration(days) * 24 * time.Hour + if m[2] != "" { + rest, err := time.ParseDuration(m[2]) + if err != nil { + return 0, fmt.Errorf("invalid duration %q: %w", s, err) + } + d += rest } - data, err := entries[0].Read() + return d, nil + } + return time.ParseDuration(s) +} + +func buildRecordFilter(cmd *cobra.Command) logs.RecordFilter { + var f logs.RecordFilter + + f.Workspace = flags.ValueFor[string](cmd, *flags.LogFilterWorkspaceFlag, false) + f.Status = strings.ToLower(flags.ValueFor[string](cmd, *flags.LogFilterStatusFlag, false)) + f.Limit = flags.ValueFor[int](cmd, *flags.LogFilterLimitFlag, false) + + sinceStr := flags.ValueFor[string](cmd, *flags.LogFilterSinceFlag, false) + if sinceStr != "" { + d, err := parseDurationWithDays(sinceStr) if err != nil { - logger.Log().FatalErr(err) + logger.Log().Fatalf("Invalid --since value %q: %v", sinceStr, err) + } + f.Since = time.Now().Add(-d) + } + + return f +} + +// expandRefArgs builds a fully-qualified ref string from args, expanding the +// current workspace/namespace when not provided — matching how exec records refs. +func expandRefArgs(ctx *context.Context, args []string) string { + verb := executable.Verb(args[0]) + id := strings.Join(args[1:], " ") + ref := context.ExpandRef(ctx, executable.NewRef(id, verb)) + return ref.String() +} + +func logsClearFunc(ctx *context.Context, args []string) { + if ctx.DataStore == nil { + logger.Log().Fatalf("data store is not available") + } + + if len(args) > 0 { + ref := expandRefArgs(ctx, args) + clearRefHistory(ctx, ref) + return + } + clearAllHistory(ctx) +} + +func clearRefHistory(ctx *context.Context, ref string) { + deleteAssociatedLogs(ctx, ref) + if err := ctx.DataStore.DeleteExecutionHistory(ref); err != nil { + logger.Log().FatalErr(err) + } + logger.Log().Println(fmt.Sprintf("Cleared history and logs for %s.", ref)) +} + +func clearAllHistory(ctx *context.Context) { + refs, err := ctx.DataStore.ListExecutionRefs() + if err != nil { + logger.Log().FatalErr(err) + } + for _, ref := range refs { + deleteAssociatedLogs(ctx, ref) + _ = ctx.DataStore.DeleteExecutionHistory(ref) + } + // Also clean up any orphaned log archive files. + entries, _ := tuikitIO.ListArchiveEntries(filesystem.LogsDir()) + for _, e := range entries { + _ = tuikitIO.DeleteArchiveEntry(e.Path) + } + logger.Log().Println("Cleared all execution history and logs.") +} + +func deleteAssociatedLogs(ctx *context.Context, ref string) { + records, err := ctx.DataStore.GetExecutionHistory(ref, 0) + if err != nil { + return + } + for _, r := range records { + if r.LogArchiveID != "" { + _ = tuikitIO.DeleteArchiveEntry(r.LogArchiveID) } - _, _ = fmt.Fprint(ctx.StdOut(), data) - } else { - logs.PrintEntries(outputFormat, entries) } } diff --git a/cmd/internal/sync.go b/cmd/internal/sync.go index e61f655e..27bfed60 100644 --- a/cmd/internal/sync.go +++ b/cmd/internal/sync.go @@ -26,9 +26,9 @@ func RegisterSyncCmd(ctx *context.Context, rootCmd *cobra.Command) { rootCmd.AddCommand(subCmd) } -func syncFunc(_ *context.Context, _ *cobra.Command, _ []string) { +func syncFunc(ctx *context.Context, _ *cobra.Command, _ []string) { start := time.Now() - if err := cache.UpdateAll(); err != nil { + if err := cache.UpdateAll(ctx.DataStore); err != nil { logger.Log().FatalErr(err) } duration := time.Since(start) diff --git a/cmd/internal/workspace.go b/cmd/internal/workspace.go index 79258ebc..2764156e 100644 --- a/cmd/internal/workspace.go +++ b/cmd/internal/workspace.go @@ -106,7 +106,7 @@ func addWorkspaceFunc(ctx *context.Context, cmd *cobra.Command, args []string) { logger.Log().FatalErr(err) } - if err := cache.UpdateAll(); err != nil { + if err := cache.UpdateAll(ctx.DataStore); err != nil { logger.Log().FatalErr(errors.Wrap(err, "failure updating cache")) } @@ -201,7 +201,7 @@ func removeWorkspaceFunc(ctx *context.Context, _ *cobra.Command, args []string) logger.Log().Warnf("Workspace '%s' deleted", name) - if err := cache.UpdateAll(); err != nil { + if err := cache.UpdateAll(ctx.DataStore); err != nil { logger.Log().FatalErr(errors.Wrap(err, "unable to update cache")) } } diff --git a/cmd/root.go b/cmd/root.go index 5d261036..a4c567e5 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -34,7 +34,7 @@ func NewRootCmd(ctx *context.Context) *cobra.Command { } sync := flags.ValueFor[bool](cmd.Root(), *flags.SyncCacheFlag, true) if sync { - if err := cache.UpdateAll(); err != nil { + if err := cache.UpdateAll(ctx.DataStore); err != nil { logger.Log().FatalErr(err) } } diff --git a/docs/.vitepress/config.mts b/docs/.vitepress/config.mts index a3093bf1..eee3bdbb 100644 --- a/docs/.vitepress/config.mts +++ b/docs/.vitepress/config.mts @@ -50,6 +50,7 @@ export default defineConfig({ { text: 'Executables', link: '/guides/executables' }, { text: 'Workspaces', link: '/guides/workspaces' }, { text: 'Secrets', link: '/guides/secrets' }, + { text: 'Execution History & Logs', link: '/guides/execution-history' }, ]}, { text: 'Advanced', items: [ @@ -69,7 +70,14 @@ export default defineConfig({ { text: 'Overview', link: '/cli/flow' }, { text: 'flow browse', link: '/cli/flow_browse' }, { text: 'flow exec', link: '/cli/flow_exec' }, - { text: 'flow logs', link: '/cli/flow_logs' }, + { + text: 'Logs', + collapsed: true, + items: [ + { text: 'flow logs', link: '/cli/flow_logs' }, + { text: 'flow logs clear', link: '/cli/flow_logs_clear' } + ] + }, { text: 'flow mcp', link: '/cli/flow_mcp' }, { text: 'flow sync', link: '/cli/flow_sync' }, { diff --git a/docs/cli/flow_logs.md b/docs/cli/flow_logs.md index 967432d1..5baa8618 100644 --- a/docs/cli/flow_logs.md +++ b/docs/cli/flow_logs.md @@ -2,16 +2,24 @@ View execution history and logs. +### Synopsis + +View execution history recorded in the data store, with associated log output. Optionally filter by executable reference. + ``` -flow logs [flags] +flow logs [ref] [flags] ``` ### Options ``` - -h, --help help for logs - --last Print the last execution's logs - -o, --output string Output format. One of: yaml, json, or tui. + -h, --help help for logs + --last Print the last execution's logs + --limit int Maximum number of records to display. + -o, --output string Output format. One of: yaml, json, or tui. + --since string Filter history to entries after a duration (e.g. 1h, 30m, 7d). + --status string Filter history by status (success or failure). + -w, --workspace string Filter history by workspace name. ``` ### Options inherited from parent commands @@ -24,4 +32,5 @@ flow logs [flags] ### SEE ALSO * [flow](flow.md) - flow is a command line interface designed to make managing and running development workflows easier. +* [flow logs clear](flow_logs_clear.md) - Clear execution history and logs. diff --git a/docs/cli/flow_logs_clear.md b/docs/cli/flow_logs_clear.md new file mode 100644 index 00000000..8ea8a944 --- /dev/null +++ b/docs/cli/flow_logs_clear.md @@ -0,0 +1,29 @@ +## flow logs clear + +Clear execution history and logs. + +### Synopsis + +Remove execution history records and associated log files. If a ref is provided, only that executable's data is cleared. + +``` +flow logs clear [ref] [flags] +``` + +### Options + +``` + -h, --help help for clear +``` + +### Options inherited from parent commands + +``` + -L, --log-level string Log verbosity level (debug, info, fatal) (default "info") + --sync Sync flow cache and workspaces +``` + +### SEE ALSO + +* [flow logs](flow_logs.md) - View execution history and logs. + diff --git a/docs/guides/execution-history.md b/docs/guides/execution-history.md new file mode 100644 index 00000000..996ba72b --- /dev/null +++ b/docs/guides/execution-history.md @@ -0,0 +1,84 @@ +--- +title: Execution History & Logs +--- + +# Execution History & Logs + +flow automatically records every execution, building a searchable history with associated log output. + +## Viewing History + +Open the interactive log viewer: + +```shell +flow logs +``` + +This shows a table of recent executions with the executable reference, time, duration, and status. Press `Enter` to view full details and log output for any entry. + +**TUI keyboard shortcuts:** + +| Key | Context | Action | +|---------|-------------|---------------------------| +| `Enter` | List view | Open detail view | +| `d` | Detail view | Delete the current record | +| `x` | List view | Delete all records | + +### Structured Output + +Export history for scripting or CI: + +```shell +flow logs -o yaml +flow logs -o json +``` + +### Last Execution + +Review the most recent execution's metadata and full log output: + +```shell +flow logs --last +``` + +## Filtering + +### By Executable Reference + +Pass a ref argument to scope history to a single executable: + +```shell +flow logs verb ws/ns:name +``` + +### By Workspace, Status, or Time + +Use flags to narrow results: + +```shell +flow logs -w my-workspace # filter by workspace +flow logs --status failure # only failed executions +flow logs --since 1h # last hour (supports d, h, m, s) +flow logs --limit 5 # at most 5 records +flow logs -w api --status success --since 7d +``` + +Filters work with all output modes (`--last`, `-o yaml`, TUI, etc.). + +## Clearing History + +```shell +# Clear all history and logs +flow logs clear + +# Clear history for a specific executable +flow logs clear verb ws/ns:name +``` + +> [!NOTE] +> Clearing history also removes the associated log archive files. + +## What's Next? + +- **Customize your interface** → [Interactive UI](interactive.md) +- **Integrate with external tools** → [Integrations](integrations.md) diff --git a/docs/guides/index.md b/docs/guides/index.md index 0067ed40..e019a95a 100644 --- a/docs/guides/index.md +++ b/docs/guides/index.md @@ -20,6 +20,7 @@ Master the core features of flow: - **[Workspaces](workspaces.md)** - Organize your automation across projects - **[Executables](executables.md)** - Complete reference for all executable types and configuration - **[Working with Secrets](secrets.md)** - Secure your workflows with encrypted vaults +- **[Execution History & Logs](execution-history.md)** - View, filter, and manage your execution history ## Advanced Topics diff --git a/go.mod b/go.mod index 55086924..59051d4f 100644 --- a/go.mod +++ b/go.mod @@ -11,6 +11,7 @@ require ( github.com/flowexec/tuikit v0.3.2 github.com/flowexec/vault v0.2.1 github.com/gen2brain/beeep v0.11.2 + github.com/google/uuid v1.6.0 github.com/jahvon/expression v0.1.4 github.com/mark3labs/mcp-go v0.43.2 github.com/onsi/ginkgo/v2 v2.28.1 @@ -66,7 +67,6 @@ require ( github.com/godbus/dbus/v5 v5.1.0 // indirect github.com/google/go-cmp v0.7.0 // indirect github.com/google/pprof v0.0.0-20260115054156-294ebfa9ad83 // indirect - github.com/google/uuid v1.6.0 // indirect github.com/gorilla/css v1.0.1 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/invopop/jsonschema v0.13.0 // indirect diff --git a/internal/io/logs/output.go b/internal/io/logs/output.go index 891d7f8b..75ed5391 100644 --- a/internal/io/logs/output.go +++ b/internal/io/logs/output.go @@ -2,64 +2,131 @@ package logs import ( "encoding/json" + "fmt" + "io" + "time" - "github.com/flowexec/tuikit/io" "gopkg.in/yaml.v3" "github.com/flowexec/flow/internal/io/common" "github.com/flowexec/flow/pkg/logger" ) -type entry struct { - ID string `json:"id" yaml:"id"` - Time string `json:"time" yaml:"time"` - File string `json:"file" yaml:"file"` +type recordOutput struct { + Ref string `json:"ref" yaml:"ref"` + StartedAt string `json:"startedAt" yaml:"startedAt"` + Duration string `json:"duration" yaml:"duration"` + ExitCode int `json:"exitCode" yaml:"exitCode"` + Error string `json:"error,omitempty" yaml:"error,omitempty"` + LogFile string `json:"logFile,omitempty" yaml:"logFile,omitempty"` } -type entryResponse struct { - Logs []entry `json:"logs" yaml:"logs"` +type recordsResponse struct { + History []recordOutput `json:"history" yaml:"history"` } -func tuikitToEntry(e io.ArchiveEntry) entry { - return entry{ - ID: e.ID, - Time: e.Time.String(), - File: e.Path, +func toRecordOutput(r UnifiedRecord) recordOutput { + out := recordOutput{ + Ref: r.Ref, + StartedAt: r.StartedAt.Format(time.RFC3339), + Duration: r.Duration.Round(time.Millisecond).String(), + ExitCode: r.ExitCode, + Error: r.Error, } + if r.LogEntry != nil { + out.LogFile = r.LogEntry.Path + } + return out } -func marshalEntriesJSON(entries []io.ArchiveEntry) ([]byte, error) { - entriesJSON := make([]entry, len(entries)) - for i, e := range entries { - entriesJSON[i] = tuikitToEntry(e) +// PrintRecords outputs unified records in the specified format (json, yaml, or plain text). +func PrintRecords(format string, records []UnifiedRecord) { + if len(records) == 0 { + logger.Log().Println("No execution history found.") + return + } + + switch common.NormalizeFormat(format) { + case common.JSONFormat: + out := make([]recordOutput, len(records)) + for i, r := range records { + out[i] = toRecordOutput(r) + } + data, err := json.MarshalIndent(recordsResponse{History: out}, "", " ") + if err != nil { + logger.Log().Fatalf("Failed to marshal records - %v", err) + } + logger.Log().Println(string(data)) + case common.YAMLFormat: + out := make([]recordOutput, len(records)) + for i, r := range records { + out[i] = toRecordOutput(r) + } + data, err := yaml.Marshal(recordsResponse{History: out}) + if err != nil { + logger.Log().Fatalf("Failed to marshal records - %v", err) + } + logger.Log().Println(string(data)) + default: + printRecordsText(records) } - entriesResponse := entryResponse{Logs: entriesJSON} - return json.MarshalIndent(entriesResponse, "", " ") } -func marshalEntriesYAML(entries []io.ArchiveEntry) ([]byte, error) { - entriesYAML := make([]entry, len(entries)) - for i, e := range entries { - entriesYAML[i] = tuikitToEntry(e) +func printRecordsText(records []UnifiedRecord) { + for _, r := range records { + status := "ok" + if r.ExitCode != 0 { + status = fmt.Sprintf("exit(%d)", r.ExitCode) + } + logger.Log().Println(fmt.Sprintf( + "%s %-40s %6s %s", + r.StartedAt.Format(time.RFC3339), + r.Ref, + r.Duration.Round(time.Millisecond), + status, + )) } - entriesResponse := entryResponse{Logs: entriesYAML} - return yaml.Marshal(entriesResponse) } -func PrintEntries(format string, entries []io.ArchiveEntry) { - logger.Log().Debugf("listing %d log entries", len(entries)) +// PrintLastRecord outputs metadata and log content for a single record. +func PrintLastRecord(format string, record UnifiedRecord, stdout io.Writer) { + out := toRecordOutput(record) + switch common.NormalizeFormat(format) { - case common.YAMLFormat: - str, err := marshalEntriesYAML(entries) + case common.JSONFormat: + data, err := json.MarshalIndent(out, "", " ") if err != nil { - logger.Log().Fatalf("Failed to marshal log entries - %v", err) + logger.Log().Fatalf("Failed to marshal record - %v", err) } - logger.Log().Println(string(str)) - case common.JSONFormat: - str, err := marshalEntriesJSON(entries) + _, _ = fmt.Fprintln(stdout, string(data)) + case common.YAMLFormat: + data, err := yaml.Marshal(out) if err != nil { - logger.Log().Fatalf("Failed to marshal log entries - %v", err) + logger.Log().Fatalf("Failed to marshal record - %v", err) + } + _, _ = fmt.Fprint(stdout, string(data)) + default: + status := "ok" + if record.ExitCode != 0 { + status = fmt.Sprintf("exit(%d)", record.ExitCode) + } + + _, _ = fmt.Fprintf(stdout, "Executable: %s\n", record.Ref) + _, _ = fmt.Fprintf(stdout, "Time: %s\n", record.StartedAt.Format(time.RFC3339)) + _, _ = fmt.Fprintf(stdout, "Duration: %s\n", record.Duration.Round(time.Millisecond)) + _, _ = fmt.Fprintf(stdout, "Status: %s\n", status) + if record.Error != "" { + _, _ = fmt.Fprintf(stdout, "Error: %s\n", record.Error) + } + _, _ = fmt.Fprintln(stdout) + + if record.LogEntry != nil { + content, err := record.LogEntry.Read() + if err != nil { + _, _ = fmt.Fprintf(stdout, "error reading log: %v\n", err) + } else if content != "" { + _, _ = fmt.Fprint(stdout, content) + } } - logger.Log().Println(string(str)) } } diff --git a/internal/io/logs/records.go b/internal/io/logs/records.go new file mode 100644 index 00000000..15a4cf36 --- /dev/null +++ b/internal/io/logs/records.go @@ -0,0 +1,151 @@ +package logs + +import ( + "sort" + "strings" + "time" + + tuikitIO "github.com/flowexec/tuikit/io" + + "github.com/flowexec/flow/pkg/store" +) + +// RecordFilter holds optional criteria for filtering unified records. +type RecordFilter struct { + Workspace string + Status string // "success" or "failure" + Since time.Time + Limit int +} + +// extractWorkspace parses the workspace from a ref formatted as "verb ws/ns:name". +func extractWorkspace(ref string) string { + _, rest, ok := strings.Cut(ref, " ") + if !ok { + return "" + } + ws, _, ok := strings.Cut(rest, "/") + if !ok { + return "" + } + return ws +} + +// FilterRecords applies the filter criteria to a slice of unified records. +func FilterRecords(records []UnifiedRecord, f RecordFilter) []UnifiedRecord { + var filtered []UnifiedRecord + for _, r := range records { + if f.Workspace != "" { + // Refs are formatted as "verb ws/ns:name" — workspace is between the space and the first "/". + ws := extractWorkspace(r.Ref) + if ws != f.Workspace { + continue + } + } + if f.Status != "" { + switch f.Status { + case "success": + if r.ExitCode != 0 { + continue + } + case "failure": + if r.ExitCode == 0 { + continue + } + } + } + if !f.Since.IsZero() && r.StartedAt.Before(f.Since) { + continue + } + filtered = append(filtered, r) + } + if f.Limit > 0 && len(filtered) > f.Limit { + filtered = filtered[:f.Limit] + } + return filtered +} + +// UnifiedRecord joins an execution history record with its corresponding log archive entry (if available). +type UnifiedRecord struct { + store.ExecutionRecord + LogEntry *tuikitIO.ArchiveEntry +} + +// LoadRecords retrieves all execution history from the data store, joined with any matching log archive entries. +// If ds is nil, returns empty (log-only fallback is not supported without metadata). +func LoadRecords(ds store.DataStore, logsDir string) ([]UnifiedRecord, error) { + if ds == nil { + return nil, nil + } + + records, err := getAllExecutionHistory(ds) + if err != nil { + return nil, err + } + + archiveIndex := buildArchiveIndex(logsDir) + return joinRecords(records, archiveIndex), nil +} + +// LoadRecordsForRef retrieves execution history for a specific ref, joined with matching log archive entries. +func LoadRecordsForRef(ds store.DataStore, logsDir string, ref string, limit int) ([]UnifiedRecord, error) { + if ds == nil { + return nil, nil + } + + records, err := ds.GetExecutionHistory(ref, limit) + if err != nil { + return nil, err + } + + archiveIndex := buildArchiveIndex(logsDir) + return joinRecords(records, archiveIndex), nil +} + +// getAllExecutionHistory retrieves recent history across all refs, up to 10 records per ref. +func getAllExecutionHistory(ds store.DataStore) ([]store.ExecutionRecord, error) { + refs, err := ds.ListExecutionRefs() + if err != nil { + return nil, err + } + var all []store.ExecutionRecord + for _, ref := range refs { + records, err := ds.GetExecutionHistory(ref, 10) + if err != nil { + continue + } + all = append(all, records...) + } + return all, nil +} + +// buildArchiveIndex loads log archive entries from disk and indexes them by path for O(1) lookup. +func buildArchiveIndex(logsDir string) map[string]tuikitIO.ArchiveEntry { + entries, err := tuikitIO.ListArchiveEntries(logsDir) + if err != nil || len(entries) == 0 { + return nil + } + index := make(map[string]tuikitIO.ArchiveEntry, len(entries)) + for _, e := range entries { + index[e.Path] = e + } + return index +} + +// joinRecords merges execution records with their log archive entries and sorts by StartedAt descending. +func joinRecords(records []store.ExecutionRecord, archiveIndex map[string]tuikitIO.ArchiveEntry) []UnifiedRecord { + unified := make([]UnifiedRecord, 0, len(records)) + for _, r := range records { + ur := UnifiedRecord{ExecutionRecord: r} + if archiveIndex != nil { + if entry, ok := archiveIndex[r.LogArchiveID]; ok { + ur.LogEntry = &entry + } + } + unified = append(unified, ur) + } + sort.Slice(unified, func(i, j int) bool { + return unified[i].StartedAt.After(unified[j].StartedAt) + }) + return unified +} diff --git a/internal/io/logs/views.go b/internal/io/logs/views.go index 22e918e7..7fa987e4 100644 --- a/internal/io/logs/views.go +++ b/internal/io/logs/views.go @@ -2,97 +2,136 @@ package logs import ( "fmt" - "slices" + "time" "github.com/flowexec/tuikit" tuikitIO "github.com/flowexec/tuikit/io" "github.com/flowexec/tuikit/themes" "github.com/flowexec/tuikit/types" "github.com/flowexec/tuikit/views" + + "github.com/flowexec/flow/pkg/store" ) -func NewLogView( +// NewUnifiedLogView creates a TUI view for unified execution records. +// If lastEntry is true, it shows the detail view for the most recent record. +func NewUnifiedLogView( container *tuikit.Container, - archiveDir string, + records []UnifiedRecord, lastEntry bool, + ds store.DataStore, ) tuikit.View { - entries, err := tuikitIO.ListArchiveEntries(archiveDir) - if err != nil { - return views.NewErrorView(err, container.RenderState().Theme) - } - if len(entries) == 0 { - return views.NewErrorView(fmt.Errorf("no log entries found"), container.RenderState().Theme) + if len(records) == 0 { + return views.NewErrorView(fmt.Errorf("no execution history found"), container.RenderState().Theme) } - // Most recent first - slices.Reverse(entries) - if lastEntry { - return logDetailView(container, entries[0]) + return unifiedDetailView(container, records[0], ds) } - - return logListView(container, entries) + return unifiedListView(container, records, ds) } -func logDetailView(container *tuikit.Container, entry tuikitIO.ArchiveEntry) tuikit.View { - content, err := entry.Read() - if err != nil { - return views.NewErrorView(err, container.RenderState().Theme) - } - if content == "" { - content = "no data found in log entry" +func statusText(exitCode int) string { + if exitCode == 0 { + return "ok" } - - metadata := []views.DetailField{ - {Key: "Executable", Value: entry.ID}, - {Key: "Time", Value: entry.Description()}, - } - - detail := views.NewDetailView(container.RenderState(), content, metadata...) - detail.SetKeyCallbacks([]types.KeyCallback{ - {Key: "d", Label: "delete", Callback: func() error { - if err := tuikitIO.DeleteArchiveEntry(entry.Path); err != nil { - container.SetNotice("unable to delete log entry", themes.OutputLevelError) - } else { - container.SetNotice("log entry deleted", themes.OutputLevelSuccess) - } - return nil - }}, - }) - return detail + return fmt.Sprintf("exit(%d)", exitCode) } -func logListView(container *tuikit.Container, entries []tuikitIO.ArchiveEntry) tuikit.View { +func unifiedListView(container *tuikit.Container, records []UnifiedRecord, ds store.DataStore) tuikit.View { columns := []views.TableColumn{ - {Title: fmt.Sprintf("Logs (%d)", len(entries)), Percentage: 50}, - {Title: "Time", Percentage: 50}, + {Title: fmt.Sprintf("History (%d)", len(records)), Percentage: 35}, + {Title: "Time", Percentage: 25}, + {Title: "Duration", Percentage: 20}, + {Title: "Status", Percentage: 20}, } - rows := make([]views.TableRow, 0, len(entries)) - for i, e := range entries { + rows := make([]views.TableRow, 0, len(records)) + for i, r := range records { rows = append(rows, views.TableRow{ - Data: []string{e.ID, e.Description(), fmt.Sprintf("%d", i)}, + Data: []string{ + r.Ref, + r.StartedAt.Format(time.RFC3339), + r.Duration.Round(time.Millisecond).String(), + statusText(r.ExitCode), + fmt.Sprintf("%d", i), + }, }) } + table := views.NewTable(container.RenderState(), columns, rows, views.TableDisplayMini) table.SetOnSelect(func(_ int) error { row := table.GetSelectedRow() - if row == nil || len(row.Data()) < 3 { - return fmt.Errorf("no log entry selected") + if row == nil || len(row.Data()) < 5 { + return fmt.Errorf("no record selected") } var idx int - if _, err := fmt.Sscanf(row.Data()[2], "%d", &idx); err != nil || idx >= len(entries) { - return fmt.Errorf("invalid log entry") + if _, err := fmt.Sscanf(row.Data()[4], "%d", &idx); err != nil || idx >= len(records) { + return fmt.Errorf("invalid record") } - return container.SetView(logDetailView(container, entries[idx])) + return container.SetView(unifiedDetailView(container, records[idx], ds)) }) table.SetKeyCallbacks([]types.KeyCallback{ {Key: "x", Label: "delete all", Callback: func() error { - for _, e := range entries { - _ = tuikitIO.DeleteArchiveEntry(e.Path) + for _, r := range records { + if r.LogEntry != nil { + _ = tuikitIO.DeleteArchiveEntry(r.LogEntry.Path) + } + } + if ds != nil { + refs := make(map[string]bool) + for _, r := range records { + refs[r.Ref] = true + } + for ref := range refs { + _ = ds.DeleteExecutionHistory(ref) + } } - container.SetNotice("all log entries deleted", themes.OutputLevelSuccess) + container.SetNotice("all execution history deleted", themes.OutputLevelSuccess) return nil }}, }) return table } + +func unifiedDetailView(container *tuikit.Container, record UnifiedRecord, ds store.DataStore) tuikit.View { + var body string + switch record.LogEntry { + case nil: + body = "no log content available" + default: + content, err := record.LogEntry.Read() + switch { + case err != nil: + body = fmt.Sprintf("error reading log: %v", err) + case content == "": + body = "no data found in log entry" + default: + body = content + } + } + + metadata := []views.DetailField{ + {Key: "Executable", Value: record.Ref}, + {Key: "Time", Value: record.StartedAt.Format(time.RFC3339)}, + {Key: "Duration", Value: record.Duration.Round(time.Millisecond).String()}, + {Key: "Status", Value: statusText(record.ExitCode)}, + } + if record.Error != "" { + metadata = append(metadata, views.DetailField{Key: "Error", Value: record.Error}) + } + + detail := views.NewDetailView(container.RenderState(), body, metadata...) + detail.SetKeyCallbacks([]types.KeyCallback{ + {Key: "d", Label: "delete", Callback: func() error { + if record.LogEntry != nil { + _ = tuikitIO.DeleteArchiveEntry(record.LogEntry.Path) + } + if ds != nil { + _ = ds.DeleteExecutionHistory(record.Ref) + } + container.SetNotice("execution record deleted", themes.OutputLevelSuccess) + return nil + }}, + }) + return detail +} diff --git a/internal/runner/parallel/parallel.go b/internal/runner/parallel/parallel.go index 24466023..6f589c28 100644 --- a/internal/runner/parallel/parallel.go +++ b/internal/runner/parallel/parallel.go @@ -15,11 +15,11 @@ import ( "github.com/flowexec/flow/internal/runner" "github.com/flowexec/flow/internal/runner/engine" - "github.com/flowexec/flow/internal/services/store" envUtils "github.com/flowexec/flow/internal/utils/env" execUtils "github.com/flowexec/flow/internal/utils/executables" "github.com/flowexec/flow/pkg/context" "github.com/flowexec/flow/pkg/logger" + "github.com/flowexec/flow/pkg/store" "github.com/flowexec/flow/types/executable" ) @@ -215,8 +215,12 @@ func handleExec( runExec := func() error { task := tracker.StartTask(taskName) // Shallow-copy the context so each goroutine has its own CurrentTask + // and a /dev/null stdin — multiple goroutines sharing a terminal fd + // causes escape sequence responses to leak into captured output. taskCtx := *ctx taskCtx.CurrentTask = task + devNull, _ := os.Open(os.DevNull) + taskCtx.SetIO(devNull, ctx.StdOut()) err := runner.Exec(&taskCtx, exec, eng, childEnv, childArgs) if err != nil { tracker.CompleteTask(task, io.TaskFailed, err) @@ -233,22 +237,10 @@ func handleExec( stepNum := i + 1 totalSteps := len(parallelSpec.Execs) conditionFunc = func() (bool, error) { - str, err := store.NewStore(store.Path()) + cacheData, err := ctx.DataStore.GetAllProcessVars(store.EnvironmentBucket()) if err != nil { return false, err } - if _, err := str.CreateAndSetBucket(store.EnvironmentBucket()); err != nil { - _ = str.Close() - return false, err - } - cacheData, err := str.GetAll() - if err != nil { - _ = str.Close() - return false, err - } - if err := str.Close(); err != nil { - logger.Log().WrapError(err, "unable to close store") - } conditionalData := runner.ExpressionEnv(ctx, parent, cacheData, inputEnv) truthy, err := expression.IsTruthy(ifCondition, conditionalData) diff --git a/internal/runner/serial/serial.go b/internal/runner/serial/serial.go index 45bdfb0e..e046c0d4 100644 --- a/internal/runner/serial/serial.go +++ b/internal/runner/serial/serial.go @@ -13,11 +13,11 @@ import ( "github.com/flowexec/flow/internal/runner" "github.com/flowexec/flow/internal/runner/engine" - "github.com/flowexec/flow/internal/services/store" envUtils "github.com/flowexec/flow/internal/utils/env" execUtils "github.com/flowexec/flow/internal/utils/executables" "github.com/flowexec/flow/pkg/context" "github.com/flowexec/flow/pkg/logger" + "github.com/flowexec/flow/pkg/store" "github.com/flowexec/flow/types/executable" ) @@ -220,22 +220,10 @@ func handleExec( stepNum := i + 1 totalSteps := len(serialSpec.Execs) conditionFunc = func() (bool, error) { - str, err := store.NewStore(store.Path()) + cacheData, err := ctx.DataStore.GetAllProcessVars(store.EnvironmentBucket()) if err != nil { return false, err } - if _, err := str.CreateAndSetBucket(store.EnvironmentBucket()); err != nil { - _ = str.Close() - return false, err - } - cacheData, err := str.GetAll() - if err != nil { - _ = str.Close() - return false, err - } - if err := str.Close(); err != nil { - logger.Log().WrapError(err, "unable to close store") - } conditionalData := runner.ExpressionEnv(ctx, parent, cacheData, inputEnv) truthy, err := expression.IsTruthy(ifCondition, conditionalData) diff --git a/internal/services/store/mocks/mock_store.go b/internal/services/store/mocks/mock_store.go deleted file mode 100644 index e3242d4e..00000000 --- a/internal/services/store/mocks/mock_store.go +++ /dev/null @@ -1,169 +0,0 @@ -// Code generated by MockGen. DO NOT EDIT. -// Source: github.com/flowexec/flow/internal/services/store (interfaces: Store) -// -// Generated by this command: -// -// mockgen -destination=mocks/mock_store.go -package=mocks github.com/flowexec/flow/internal/services/store Store -// - -// Package mocks is a generated GoMock package. -package mocks - -import ( - reflect "reflect" - - gomock "go.uber.org/mock/gomock" -) - -// MockStore is a mock of Store interface. -type MockStore struct { - ctrl *gomock.Controller - recorder *MockStoreMockRecorder -} - -// MockStoreMockRecorder is the mock recorder for MockStore. -type MockStoreMockRecorder struct { - mock *MockStore -} - -// NewMockStore creates a new mock instance. -func NewMockStore(ctrl *gomock.Controller) *MockStore { - mock := &MockStore{ctrl: ctrl} - mock.recorder = &MockStoreMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockStore) EXPECT() *MockStoreMockRecorder { - return m.recorder -} - -// Close mocks base method. -func (m *MockStore) Close() error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Close") - ret0, _ := ret[0].(error) - return ret0 -} - -// Close indicates an expected call of Close. -func (mr *MockStoreMockRecorder) Close() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockStore)(nil).Close)) -} - -// CreateAndSetBucket mocks base method. -func (m *MockStore) CreateAndSetBucket(arg0 string) (string, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "CreateAndSetBucket", arg0) - ret0, _ := ret[0].(string) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// CreateAndSetBucket indicates an expected call of CreateAndSetBucket. -func (mr *MockStoreMockRecorder) CreateAndSetBucket(arg0 any) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateAndSetBucket", reflect.TypeOf((*MockStore)(nil).CreateAndSetBucket), arg0) -} - -// CreateBucket mocks base method. -func (m *MockStore) CreateBucket(arg0 string) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "CreateBucket", arg0) - ret0, _ := ret[0].(error) - return ret0 -} - -// CreateBucket indicates an expected call of CreateBucket. -func (mr *MockStoreMockRecorder) CreateBucket(arg0 any) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateBucket", reflect.TypeOf((*MockStore)(nil).CreateBucket), arg0) -} - -// Delete mocks base method. -func (m *MockStore) Delete(arg0 string) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Delete", arg0) - ret0, _ := ret[0].(error) - return ret0 -} - -// Delete indicates an expected call of Delete. -func (mr *MockStoreMockRecorder) Delete(arg0 any) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Delete", reflect.TypeOf((*MockStore)(nil).Delete), arg0) -} - -// DeleteBucket mocks base method. -func (m *MockStore) DeleteBucket(arg0 string) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "DeleteBucket", arg0) - ret0, _ := ret[0].(error) - return ret0 -} - -// DeleteBucket indicates an expected call of DeleteBucket. -func (mr *MockStoreMockRecorder) DeleteBucket(arg0 any) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteBucket", reflect.TypeOf((*MockStore)(nil).DeleteBucket), arg0) -} - -// Get mocks base method. -func (m *MockStore) Get(arg0 string) (string, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Get", arg0) - ret0, _ := ret[0].(string) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// Get indicates an expected call of Get. -func (mr *MockStoreMockRecorder) Get(arg0 any) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Get", reflect.TypeOf((*MockStore)(nil).Get), arg0) -} - -// GetAll mocks base method. -func (m *MockStore) GetAll() (map[string]string, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetAll") - ret0, _ := ret[0].(map[string]string) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// GetAll indicates an expected call of GetAll. -func (mr *MockStoreMockRecorder) GetAll() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetAll", reflect.TypeOf((*MockStore)(nil).GetAll)) -} - -// GetKeys mocks base method. -func (m *MockStore) GetKeys() ([]string, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetKeys") - ret0, _ := ret[0].([]string) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// GetKeys indicates an expected call of GetKeys. -func (mr *MockStoreMockRecorder) GetKeys() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetKeys", reflect.TypeOf((*MockStore)(nil).GetKeys)) -} - -// Set mocks base method. -func (m *MockStore) Set(arg0, arg1 string) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Set", arg0, arg1) - ret0, _ := ret[0].(error) - return ret0 -} - -// Set indicates an expected call of Set. -func (mr *MockStoreMockRecorder) Set(arg0, arg1 any) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Set", reflect.TypeOf((*MockStore)(nil).Set), arg0, arg1) -} diff --git a/internal/services/store/store.go b/internal/services/store/store.go deleted file mode 100644 index bec6991e..00000000 --- a/internal/services/store/store.go +++ /dev/null @@ -1,284 +0,0 @@ -package store - -import ( - "fmt" - "os" - "path/filepath" - "strings" - "time" - - "github.com/flowexec/flow/pkg/filesystem" - "github.com/pkg/errors" - bolt "go.etcd.io/bbolt" - boltErrors "go.etcd.io/bbolt/errors" -) - -const ( - BucketEnv = "FLOW_PROCESS_BUCKET" - RootBucket = "root" - - storeFileName = "store.db" -) - -//go:generate mockgen -destination=mocks/mock_store.go -package=mocks github.com/flowexec/flow/internal/services/store Store -type Store interface { - CreateBucket(id string) error - CreateAndSetBucket(id string) (string, error) - DeleteBucket(id string) error - - Set(key, value string) error - Get(key string) (string, error) - GetAll() (map[string]string, error) - GetKeys() ([]string, error) - Delete(key string) error - - Close() error -} - -type BoltStore struct { - db *bolt.DB - processBucket string -} - -// NewStore creates a new store with a given db path -// If dbPath is empty, it will use the default path -func NewStore(dbPath string) (Store, error) { - if dbPath == "" { - dbPath = Path() - } - db, err := bolt.Open(dbPath, 0666, &bolt.Options{Timeout: 3 * time.Second}) - if err != nil { - return nil, fmt.Errorf("failed to open db: %w", err) - } - return &BoltStore{db: db}, nil -} - -// CreateBucket creates a bucket with a given id if it doesn't exist -func (s *BoltStore) CreateBucket(id string) error { - return s.db.Update(func(tx *bolt.Tx) error { - _, err := tx.CreateBucketIfNotExists([]byte(id)) - if err != nil { - return fmt.Errorf("failed to create bucket %s: %w", id, err) - } - return nil - }) -} - -// DeleteBucket deletes a bucket by its id -func (s *BoltStore) DeleteBucket(id string) error { - return s.db.Update(func(tx *bolt.Tx) error { - err := tx.DeleteBucket([]byte(id)) - if err != nil { - if errors.Is(err, boltErrors.ErrBucketNotFound) { - return nil - } - return fmt.Errorf("failed to delete bucket %s: %w", id, err) - } - return nil - }) -} - -// CreateAndSetBucket creates a temporary bucket for the process and returns the bucket's name -func (s *BoltStore) CreateAndSetBucket(id string) (string, error) { - if err := s.CreateBucket(id); err != nil { - return "", fmt.Errorf("failed to create bucket %s: %w", id, err) - } - s.processBucket = id - if id != RootBucket { - _ = os.Setenv(BucketEnv, id) - } - return id, nil -} - -func EnvironmentBucket() string { - id := RootBucket - if val, set := os.LookupEnv(BucketEnv); set && val != "" { - id = val - } - replacer := strings.NewReplacer(":", "_", "/", "_", " ", "_") - id = replacer.Replace(id) - return id -} - -// Set stores a key-value pair in the process bucket -func (s *BoltStore) Set(key, value string) error { - if s.processBucket == "" { - if _, err := s.CreateAndSetBucket(RootBucket); err != nil { - return fmt.Errorf("failed to create process bucket: %w", err) - } - } - return s.db.Update(func(tx *bolt.Tx) error { - bucket := tx.Bucket([]byte(s.processBucket)) - if bucket == nil { - return fmt.Errorf("bucket %s not found", s.processBucket) - } - err := bucket.Put([]byte(key), []byte(value)) - if err != nil { - return fmt.Errorf("failed to put key-value pair for key %s in bucket %s: %w", key, s.processBucket, err) - } - return nil - }) -} - -// Get retrieves a value for a key from the process bucket -func (s *BoltStore) Get(key string) (string, error) { - if s.processBucket == "" { - if _, err := s.CreateAndSetBucket(RootBucket); err != nil { - return "", fmt.Errorf("failed to create process bucket: %w", err) - } - } - var value []byte - err := s.db.View(func(tx *bolt.Tx) error { - bucket := tx.Bucket([]byte(s.processBucket)) - if bucket == nil { - return fmt.Errorf("bucket %s not found", s.processBucket) - } - value = bucket.Get([]byte(key)) - if value == nil && s.processBucket != RootBucket { - rBucket := tx.Bucket([]byte(RootBucket)) - if rBucket != nil { - value = rBucket.Get([]byte(key)) - } - } - if value == nil { - return fmt.Errorf("key %s not found in bucket %s", key, s.processBucket) - } - return nil - }) - return string(value), err -} - -// Keys returns all keys in the process bucket -func (s *BoltStore) GetKeys() ([]string, error) { - if s.processBucket == "" { - if _, err := s.CreateAndSetBucket(RootBucket); err != nil { - return nil, fmt.Errorf("failed to create process bucket: %w", err) - } - } - - var keys []string - err := s.db.View(func(tx *bolt.Tx) error { - bucket := tx.Bucket([]byte(s.processBucket)) - if bucket == nil { - return fmt.Errorf("bucket %s not found", s.processBucket) - } - - // Collect keys from process bucket - processKeys := make(map[string]bool) - err := bucket.ForEach(func(k, _ []byte) error { - key := string(k) - keys = append(keys, key) - processKeys[key] = true - return nil - }) - if err != nil { - return err - } - - // If not using root bucket, also collect keys from root bucket - if s.processBucket != RootBucket { - rBucket := tx.Bucket([]byte(RootBucket)) - if rBucket == nil { - return nil - } - err := rBucket.ForEach(func(k, _ []byte) error { - // Only add keys that aren't already in the process bucket - if key := string(k); !processKeys[key] { - keys = append(keys, key) - } - return nil - }) - if err != nil { - return err - } - } - - return nil - }) - return keys, err -} - -// BucketMap returns a map of all keys and values in the process bucket -func (s *BoltStore) GetAll() (map[string]string, error) { - if s.processBucket == "" { - if _, err := s.CreateAndSetBucket(RootBucket); err != nil { - return nil, fmt.Errorf("failed to create process bucket: %w", err) - } - } - - m := make(map[string]string) - err := s.db.View(func(tx *bolt.Tx) error { - bucket := tx.Bucket([]byte(s.processBucket)) - if bucket == nil { - return fmt.Errorf("bucket %s not found", s.processBucket) - } - - // First, collect all key-value pairs from process bucket - err := bucket.ForEach(func(k, v []byte) error { - m[string(k)] = string(v) - return nil - }) - if err != nil { - return err - } - - // If not using root bucket, also collect from root bucket - if s.processBucket != RootBucket { - rBucket := tx.Bucket([]byte(RootBucket)) - if rBucket == nil { - return nil - } - err := rBucket.ForEach(func(k, v []byte) error { - key := string(k) - // Only add if key doesn't already exist in process bucket - if _, exists := m[key]; !exists { - m[key] = string(v) - } - return nil - }) - if err != nil { - return err - } - } - - return nil - }) - return m, err -} - -// Delete removes a key from the process bucket -func (s *BoltStore) Delete(key string) error { - if s.processBucket == "" { - if _, err := s.CreateAndSetBucket(RootBucket); err != nil { - return fmt.Errorf("failed to create process bucket: %w", err) - } - } - return s.db.Update(func(tx *bolt.Tx) error { - bucket := tx.Bucket([]byte(s.processBucket)) - if bucket == nil { - return fmt.Errorf("bucket %s not found", s.processBucket) - } - err := bucket.Delete([]byte(key)) - if err != nil { - return fmt.Errorf("failed to delete key %s from bucket %s: %w", key, s.processBucket, err) - } - return nil - }) -} - -func (s *BoltStore) Close() error { - return s.db.Close() -} - -func DestroyStore() error { - err := os.Remove(Path()) - if err != nil && !errors.Is(err, os.ErrNotExist) { - return fmt.Errorf("failed to destroy store: %w", err) - } - return nil -} - -func Path() string { - cacheDir := filesystem.CachedDataDirPath() - return filepath.Join(cacheDir, storeFileName) -} diff --git a/internal/services/store/store_test.go b/internal/services/store/store_test.go deleted file mode 100644 index 32f4e3d6..00000000 --- a/internal/services/store/store_test.go +++ /dev/null @@ -1,157 +0,0 @@ -package store_test - -import ( - "fmt" - "path/filepath" - "testing" - - . "github.com/onsi/ginkgo/v2" - . "github.com/onsi/gomega" - - "github.com/flowexec/flow/internal/services/store" -) - -func TestStore(t *testing.T) { - RegisterFailHandler(Fail) - RunSpecs(t, "BoltStore Suite") -} - -var _ = Describe("BoltStore", func() { - var s store.Store - var err error - - BeforeEach(func() { - path := filepath.Join(GinkgoT().TempDir(), fmt.Sprintf("test_%s.db", GinkgoT().Name())) - s, err = store.NewStore(path) - Expect(err).NotTo(HaveOccurred()) - Expect(s).NotTo(BeNil()) - }) - - AfterEach(func() { - err = s.Close() - Expect(err).NotTo(HaveOccurred()) - }) - - Describe("CreateBucket", func() { - It("should create a new bucket", func() { - err := s.CreateBucket(store.RootBucket) - Expect(err).NotTo(HaveOccurred()) - }) - }) - - Describe("DeleteBucket", func() { - It("should delete an existing bucket", func() { - err := s.CreateBucket("test") - Expect(err).NotTo(HaveOccurred()) - - err = s.DeleteBucket("test") - Expect(err).NotTo(HaveOccurred()) - }) - }) - - Describe("Set and Get", func() { - It("should set and get a key-value pair", func() { - err = s.Set("key", "value") - Expect(err).NotTo(HaveOccurred()) - - value, err := s.Get("key") - Expect(err).NotTo(HaveOccurred()) - Expect(value).To(Equal("value")) - }) - - It("should set and get key-value pairs across multiple buckets", func() { - By("setting a key-value pair in the root bucket") - err = s.Set("key", "value") - Expect(err).NotTo(HaveOccurred()) - - err = s.Set("key-inherited", "value2") - Expect(err).NotTo(HaveOccurred()) - - By("setting a key-value pair in a process bucket") - id, err := s.CreateAndSetBucket("process2") - Expect(id).To(Equal("process2")) - Expect(err).NotTo(HaveOccurred()) - - err = s.Set("key", "value3") - Expect(err).NotTo(HaveOccurred()) - - By("getting key-value pairs from a process bucket") - _, err = s.CreateAndSetBucket(store.EnvironmentBucket()) - Expect(err).NotTo(HaveOccurred()) - - value, err := s.Get("key") - Expect(err).NotTo(HaveOccurred()) - Expect(value).To(Equal("value3")) - - _, err = s.Get("key-unset") - Expect(err).To(HaveOccurred()) - - value, err = s.Get("key-inherited") - Expect(err).NotTo(HaveOccurred()) - Expect(value).To(Equal("value2")) - - By("getting key-value pairs from the root bucket") - id, err = s.CreateAndSetBucket(store.RootBucket) - Expect(id).To(Equal(store.RootBucket)) - Expect(err).NotTo(HaveOccurred()) - - value, err = s.Get("key") - Expect(err).NotTo(HaveOccurred()) - Expect(value).To(Equal("value")) - }) - }) - - Describe("GetAll", func() { - It("should get all key-value pairs from the bucket", func() { - err = s.CreateBucket("test") - Expect(err).NotTo(HaveOccurred()) - - err = s.Set("key1", "value1") - Expect(err).NotTo(HaveOccurred()) - - err = s.Set("key2", "value2") - Expect(err).NotTo(HaveOccurred()) - - pairs, err := s.GetAll() - Expect(err).NotTo(HaveOccurred()) - Expect(pairs).To(HaveLen(2)) - Expect(pairs["key1"]).To(Equal("value1")) - Expect(pairs["key2"]).To(Equal("value2")) - }) - }) - - Describe("GetKeys", func() { - It("should get all keys from the bucket", func() { - err = s.CreateBucket("test") - Expect(err).NotTo(HaveOccurred()) - - err = s.Set("key1", "value1") - Expect(err).NotTo(HaveOccurred()) - - err = s.Set("key2", "value2") - Expect(err).NotTo(HaveOccurred()) - - keys, err := s.GetKeys() - Expect(err).NotTo(HaveOccurred()) - Expect(keys).To(HaveLen(2)) - Expect(keys).To(ContainElement("key1")) - Expect(keys).To(ContainElement("key2")) - }) - }) - - Describe("Delete", func() { - It("should delete a key from the bucket", func() { - err := s.CreateBucket("test") - Expect(err).NotTo(HaveOccurred()) - - err = s.Set("key", "value") - Expect(err).NotTo(HaveOccurred()) - - err = s.Delete("key") - Expect(err).NotTo(HaveOccurred()) - - _, err = s.Get("key") - Expect(err).To(HaveOccurred()) - }) - }) -}) diff --git a/main.go b/main.go index 13da032e..ba34c11d 100644 --- a/main.go +++ b/main.go @@ -7,12 +7,15 @@ import ( "os" "slices" + "github.com/google/uuid" + "github.com/flowexec/flow/cmd" "github.com/flowexec/flow/internal/io" "github.com/flowexec/flow/pkg/cli" "github.com/flowexec/flow/pkg/context" "github.com/flowexec/flow/pkg/filesystem" "github.com/flowexec/flow/pkg/logger" + "github.com/flowexec/flow/pkg/store" "github.com/flowexec/flow/types/executable" ) @@ -22,16 +25,13 @@ func main() { panic(fmt.Errorf("user config load error: %w", err)) } - var archiveDir string - if args := os.Args; len(args) > 1 && slices.Contains(executable.ValidVerbs(), executable.Verb(args[1])) { - // only create a log archive file for exec commands - archiveDir = filesystem.LogsDir() - } + archiveDir, archiveID := initLogArchive() loggerOpts := logger.InitOptions{ StdOut: io.Stdout, LogMode: cfg.DefaultLogMode, Theme: logger.Theme(cfg.Theme.String()), ArchiveDirectory: archiveDir, + ArchiveID: archiveID, } logger.Init(loggerOpts) defer func() { @@ -43,8 +43,11 @@ func main() { } }() + initStore() + bkgCtx, cancelFunc := stdCtx.WithCancel(stdCtx.Background()) ctx := context.NewContext(bkgCtx, cancelFunc, io.Stdin, io.Stdout) + ctx.LogArchiveID = archiveID defer ctx.Finalize() if ctx == nil { @@ -56,3 +59,17 @@ func main() { logger.Log().FatalErr(err) } } + +func initLogArchive() (dir, id string) { + if args := os.Args; len(args) > 1 && slices.Contains(executable.ValidVerbs(), executable.Verb(args[1])) { + dir = filesystem.LogsDir() + id = uuid.New().String() + } + return +} + +func initStore() { + if err := store.MigrateProcessBuckets(""); err != nil { + logger.Log().Debug("process bucket migration skipped or failed", "err", err) + } +} diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index 6f6a6f70..1226d934 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -1,12 +1,18 @@ package cache -func UpdateAll() error { - wsCache := NewWorkspaceCache() +import ( + "github.com/flowexec/flow/pkg/store" +) + +// UpdateAll refreshes all caches using the provided DataStore. +// The caller is responsible for the DataStore lifecycle (closing, etc.). +func UpdateAll(ds store.DataStore) error { + wsCache := NewWorkspaceCache(ds) if err := wsCache.Update(); err != nil { return err } - execCache := NewExecutableCache(wsCache) + execCache := NewExecutableCache(wsCache, ds) if err := execCache.Update(); err != nil { return err } diff --git a/pkg/cache/executables_cache.go b/pkg/cache/executables_cache.go index f70e8ed6..90b24d1d 100644 --- a/pkg/cache/executables_cache.go +++ b/pkg/cache/executables_cache.go @@ -1,7 +1,10 @@ package cache import ( + "encoding/json" "fmt" + "os" + "path/filepath" "github.com/pkg/errors" "gopkg.in/yaml.v3" @@ -10,6 +13,7 @@ import ( flowErrors "github.com/flowexec/flow/pkg/errors" "github.com/flowexec/flow/pkg/filesystem" "github.com/flowexec/flow/pkg/logger" + "github.com/flowexec/flow/pkg/store" "github.com/flowexec/flow/types/common" "github.com/flowexec/flow/types/executable" "github.com/flowexec/flow/types/workspace" @@ -42,10 +46,12 @@ type ExecutableCacheData struct { type ExecutableCacheImpl struct { Data *ExecutableCacheData `json:",inline" yaml:",inline"` WorkspaceCache WorkspaceCache `json:"-" yaml:"-"` + Store store.DataStore } -func NewExecutableCache(wsCache WorkspaceCache) ExecutableCache { +func NewExecutableCache(wsCache WorkspaceCache, s store.DataStore) ExecutableCache { return &ExecutableCacheImpl{ + Store: s, Data: &ExecutableCacheData{ ExecutableMap: make(map[executable.Ref]string), AliasMap: make(map[executable.Ref]executable.Ref), @@ -136,13 +142,12 @@ func (c *ExecutableCacheImpl) Update() error { //nolint:gocognit } } - data, err := yaml.Marshal(cacheData) + data, err := json.Marshal(cacheData) if err != nil { return errors.Wrap(err, "unable to encode cache data") } - err = filesystem.WriteLatestCachedData(execCacheKey, data) - if err != nil { + if err := c.Store.SetCacheEntry(execCacheKey, data); err != nil { return errors.Wrap(err, "unable to write cache data") } @@ -256,17 +261,30 @@ func (c *ExecutableCacheImpl) GetExecutableList() (executable.ExecutableList, er } func (c *ExecutableCacheImpl) initExecutableCacheData() error { - cacheData, err := filesystem.LoadLatestCachedData(execCacheKey) + cacheData, err := c.Store.GetCacheEntry(execCacheKey) if err != nil { return errors.Wrap(err, "unable to load executable cache data") - } else if cacheData == nil { + } + + if cacheData == nil { + // Lazy-migrate from legacy YAML file if it exists. + cacheData = migrateExecutableCacheFromFile() + if cacheData != nil { + if writeErr := c.Store.SetCacheEntry(execCacheKey, cacheData); writeErr != nil { + logger.Log().Warn("failed to persist migrated executable cache", "err", writeErr) + } + } + } + + if cacheData == nil { if err := c.Update(); err != nil { return errors.Wrap(err, "unable to update executable cache data") } + return nil } c.Data = &ExecutableCacheData{} - if err := yaml.Unmarshal(cacheData, c.Data); err != nil { + if err := json.Unmarshal(cacheData, c.Data); err != nil { return errors.Wrap(err, "unable to decode executable cache data") } return nil @@ -325,3 +343,27 @@ func enumerateExecutableAliasRefs( return refs } + +// migrateExecutableCacheFromFile attempts to read the legacy YAML cache file, re-encodes it +// as JSON, deletes the old file, and returns the JSON bytes. Returns nil if no legacy file +// or if any step fails (best-effort migration). +func migrateExecutableCacheFromFile() []byte { + legacyPath := filepath.Join(filesystem.CachedDataDirPath(), "latestcache", execCacheKey) + yamlData, err := os.ReadFile(legacyPath) + if err != nil { + return nil + } + + var legacy ExecutableCacheData + if err := yaml.Unmarshal(yamlData, &legacy); err != nil { + return nil + } + + jsonData, err := json.Marshal(&legacy) + if err != nil { + return nil + } + + _ = os.Remove(legacyPath) + return jsonData +} diff --git a/pkg/cache/executables_cache_test.go b/pkg/cache/executables_cache_test.go index b1540127..db2c90ae 100644 --- a/pkg/cache/executables_cache_test.go +++ b/pkg/cache/executables_cache_test.go @@ -14,6 +14,7 @@ import ( cacheMocks "github.com/flowexec/flow/pkg/cache/mocks" "github.com/flowexec/flow/pkg/filesystem" "github.com/flowexec/flow/pkg/logger" + "github.com/flowexec/flow/pkg/store" "github.com/flowexec/flow/types/common" "github.com/flowexec/flow/types/executable" "github.com/flowexec/flow/types/workspace" @@ -40,6 +41,10 @@ var _ = Describe("ExecutableCacheImpl", func() { Expect(err).NotTo(HaveOccurred()) Expect(os.Setenv(filesystem.FlowConfigDirEnvVar, configDir)).To(Succeed()) + ds, err := store.NewDataStore(filepath.Join(cacheDir, "test.db")) + Expect(err).NotTo(HaveOccurred()) + DeferCleanup(func() { _ = ds.Close() }) + wsName = "test" wsPath = filepath.Join(cacheDir, "workspace") err = filesystem.InitWorkspaceConfig(wsName, wsPath) @@ -58,20 +63,15 @@ var _ = Describe("ExecutableCacheImpl", func() { execCfg.SetContext(wsName, wsPath, filepath.Join(wsPath, "test"+executable.FlowFileExt)) err = filesystem.WriteFlowFile(execCfg.ConfigPath(), execCfg) Expect(err).NotTo(HaveOccurred()) - execCacheData := &cache.ExecutableCacheData{ - ExecutableMap: make(map[executable.Ref]string), - AliasMap: make(map[executable.Ref]executable.Ref), - ConfigMap: make(map[string]cache.WorkspaceInfo), - } + wsCache = cacheMocks.NewMockWorkspaceCache(gomock.NewController(GinkgoT())) wsCache.EXPECT().GetLatestData().Return(&cache.WorkspaceCacheData{ Workspaces: map[string]*workspace.Workspace{wsName: wsConfig}, WorkspaceLocations: map[string]string{wsName: wsPath}, }, nil).AnyTimes() - execCache = &cache.ExecutableCacheImpl{ - Data: execCacheData, - WorkspaceCache: wsCache, - } + var ok bool + execCache, ok = cache.NewExecutableCache(wsCache, ds).(*cache.ExecutableCacheImpl) + Expect(ok).To(BeTrue()) }) AfterEach(func() { diff --git a/pkg/cache/workspaces_cache.go b/pkg/cache/workspaces_cache.go index 30c4a973..e2637c3b 100644 --- a/pkg/cache/workspaces_cache.go +++ b/pkg/cache/workspaces_cache.go @@ -1,15 +1,20 @@ package cache import ( + "encoding/json" + "os" + "path/filepath" + "github.com/pkg/errors" "gopkg.in/yaml.v3" "github.com/flowexec/flow/pkg/filesystem" "github.com/flowexec/flow/pkg/logger" + "github.com/flowexec/flow/pkg/store" "github.com/flowexec/flow/types/workspace" ) -const wsCacheKey = "workspace" +const wsCacheKey = "workspaces" //go:generate mockgen -destination=mocks/mock_workspace_cache.go -package=mocks github.com/flowexec/flow/pkg/cache WorkspaceCache type WorkspaceCache interface { @@ -20,17 +25,19 @@ type WorkspaceCache interface { } type WorkspaceCacheData struct { // Map of workspace name to workspace config - Workspaces map[string]*workspace.Workspace `yaml:"workspaces"` + Workspaces map[string]*workspace.Workspace `json:"workspaces" yaml:"workspaces"` // Map of workspace name to workspace path - WorkspaceLocations map[string]string `yaml:"workspaceLocations"` + WorkspaceLocations map[string]string `json:"workspaceLocations" yaml:"workspaceLocations"` } type WorkspaceCacheImpl struct { - Data *WorkspaceCacheData + Data *WorkspaceCacheData + Store store.DataStore } -func NewWorkspaceCache() WorkspaceCache { +func NewWorkspaceCache(s store.DataStore) WorkspaceCache { workspaceCache := &WorkspaceCacheImpl{ + Store: s, Data: &WorkspaceCacheData{ Workspaces: make(map[string]*workspace.Workspace), WorkspaceLocations: make(map[string]string), @@ -59,13 +66,12 @@ func (c *WorkspaceCacheImpl) Update() error { cacheData.Workspaces[name] = wsCfg cacheData.WorkspaceLocations[name] = path } - data, err := yaml.Marshal(cacheData) + data, err := json.Marshal(cacheData) if err != nil { return errors.Wrap(err, "unable to encode cache data") } - err = filesystem.WriteLatestCachedData(wsCacheKey, data) - if err != nil { + if err := c.Store.SetCacheEntry(wsCacheKey, data); err != nil { return errors.Wrap(err, "unable to write cache data") } @@ -78,17 +84,30 @@ func (c *WorkspaceCacheImpl) GetData() *WorkspaceCacheData { } func (c *WorkspaceCacheImpl) GetLatestData() (*WorkspaceCacheData, error) { - cacheData, err := filesystem.LoadLatestCachedData(wsCacheKey) + cacheData, err := c.Store.GetCacheEntry(wsCacheKey) if err != nil { return nil, errors.Wrap(err, "unable to load workspace cache data") - } else if cacheData == nil { + } + + if cacheData == nil { + // Lazy-migrate from legacy YAML file if it exists. + cacheData = migrateWorkspaceCacheFromFile() + if cacheData != nil { + if writeErr := c.Store.SetCacheEntry(wsCacheKey, cacheData); writeErr != nil { + logger.Log().Warn("failed to persist migrated workspace cache", "err", writeErr) + } + } + } + + if cacheData == nil { if err := c.Update(); err != nil { return nil, errors.Wrap(err, "unable to get updated workspace cache data") } + return c.Data, nil } c.Data = &WorkspaceCacheData{} - if err := yaml.Unmarshal(cacheData, c.Data); err != nil { + if err := json.Unmarshal(cacheData, c.Data); err != nil { return nil, errors.Wrap(err, "unable to decode workspace cache data") } return c.Data, nil @@ -113,3 +132,28 @@ func (c *WorkspaceCacheImpl) GetWorkspaceConfigList() (workspace.WorkspaceList, } return wsCfgs, nil } + +// migrateWorkspaceCacheFromFile attempts to read the legacy YAML cache file, re-encodes it +// as JSON, deletes the old file, and returns the JSON bytes. Returns nil if no legacy file +// or if any step fails (best-effort migration). +func migrateWorkspaceCacheFromFile() []byte { + // Legacy key was "workspace" (singular); legacy dir was /latestcache/ + legacyPath := filepath.Join(filesystem.CachedDataDirPath(), "latestcache", "workspace") + yamlData, err := os.ReadFile(legacyPath) + if err != nil { + return nil + } + + var legacy WorkspaceCacheData + if err := yaml.Unmarshal(yamlData, &legacy); err != nil { + return nil + } + + jsonData, err := json.Marshal(&legacy) + if err != nil { + return nil + } + + _ = os.Remove(legacyPath) + return jsonData +} diff --git a/pkg/cache/workspaces_cache_test.go b/pkg/cache/workspaces_cache_test.go index e95bcd54..ff9644e6 100644 --- a/pkg/cache/workspaces_cache_test.go +++ b/pkg/cache/workspaces_cache_test.go @@ -2,6 +2,7 @@ package cache_test import ( "os" + "path/filepath" "github.com/flowexec/tuikit/io/mocks" . "github.com/onsi/ginkgo/v2" @@ -11,6 +12,7 @@ import ( "github.com/flowexec/flow/pkg/cache" "github.com/flowexec/flow/pkg/filesystem" "github.com/flowexec/flow/pkg/logger" + "github.com/flowexec/flow/pkg/store" "github.com/flowexec/flow/types/workspace" ) @@ -26,12 +28,6 @@ var _ = Describe("WorkspaceCacheImpl", func() { ctrl := gomock.NewController(GinkgoT()) mockLogger = mocks.NewMockLogger(ctrl) logger.Init(logger.InitOptions{Logger: mockLogger, TestingTB: GinkgoTB()}) - wsCache = &cache.WorkspaceCacheImpl{ - Data: &cache.WorkspaceCacheData{ - Workspaces: make(map[string]*workspace.Workspace), - WorkspaceLocations: make(map[string]string), - }, - } var err error cacheDir, err = os.MkdirTemp("", "flow-cache-test") @@ -41,6 +37,14 @@ var _ = Describe("WorkspaceCacheImpl", func() { Expect(err).NotTo(HaveOccurred()) Expect(os.Setenv(filesystem.FlowConfigDirEnvVar, configDir)).To(Succeed()) + ds, err := store.NewDataStore(filepath.Join(cacheDir, "test.db")) + Expect(err).NotTo(HaveOccurred()) + DeferCleanup(func() { _ = ds.Close() }) + + var ok bool + wsCache, ok = cache.NewWorkspaceCache(ds).(*cache.WorkspaceCacheImpl) + Expect(ok).To(BeTrue()) + testWs := &workspace.Workspace{} testWs.SetContext("test", cacheDir) wsCache.Data.Workspaces["test"] = testWs diff --git a/pkg/context/context.go b/pkg/context/context.go index 556685fb..5d94c666 100644 --- a/pkg/context/context.go +++ b/pkg/context/context.go @@ -16,6 +16,7 @@ import ( "github.com/flowexec/flow/pkg/cache" "github.com/flowexec/flow/pkg/filesystem" "github.com/flowexec/flow/pkg/logger" + "github.com/flowexec/flow/pkg/store" "github.com/flowexec/flow/types/config" "github.com/flowexec/flow/types/executable" "github.com/flowexec/flow/types/workspace" @@ -37,6 +38,7 @@ type Context struct { TUIContainer *tuikit.Container WorkspacesCache cache.WorkspaceCache ExecutableCache cache.ExecutableCache + DataStore store.DataStore // RootExecutable is the executable that is being run in the current context. // This will be nil if the context is not associated with an executable run. @@ -50,6 +52,11 @@ type Context struct { // parallel or serial runner. It is set per-goroutine (via shallow copy) so // that downstream writers can prefix output with the task name. CurrentTask *io.TaskContext + + // LogArchiveID is the unique identifier used in the log archive filename for + // this process. It is set at startup and used to link execution records to + // their log output. + LogArchiveID string } func NewContext(ctx context.Context, cancelFunc context.CancelFunc, stdIn, stdOut *os.File) *Context { @@ -71,8 +78,13 @@ func NewContext(ctx context.Context, cancelFunc context.CancelFunc, stdIn, stdOu panic(fmt.Errorf("workspace config not found in current workspace (%s)", cfg.CurrentWorkspace)) } - workspaceCache := cache.NewWorkspaceCache() - executableCache := cache.NewExecutableCache(workspaceCache) + ds, err := store.NewDataStore(store.Path()) + if err != nil { + panic(errors.Wrap(err, "data store initialization error")) + } + + workspaceCache := cache.NewWorkspaceCache(ds) + executableCache := cache.NewExecutableCache(workspaceCache, ds) c := &Context{ ctx: ctx, @@ -83,6 +95,7 @@ func NewContext(ctx context.Context, cancelFunc context.CancelFunc, stdIn, stdOu CurrentWorkspace: wsConfig, WorkspacesCache: workspaceCache, ExecutableCache: executableCache, + DataStore: ds, } app := tuikit.NewApplication( diff --git a/pkg/filesystem/cache.go b/pkg/filesystem/cache.go index f292e0a9..b18e9696 100644 --- a/pkg/filesystem/cache.go +++ b/pkg/filesystem/cache.go @@ -1,8 +1,6 @@ package filesystem import ( - "bufio" - "bytes" "os" "path/filepath" @@ -22,84 +20,3 @@ func CachedDataDirPath() string { } return filepath.Join(dirname, dataDirName) } - -func LatestCachedDataDir() string { - return CachedDataDirPath() + "/latestcache" -} - -func LatestCachedDataFilePath(cacheKey string) string { - return filepath.Join(LatestCachedDataDir(), cacheKey) -} - -func EnsureCachedDataDir() error { - if _, err := os.Stat(LatestCachedDataDir()); os.IsNotExist(err) { - err = os.MkdirAll(LatestCachedDataDir(), 0750) - if err != nil { - return errors.Wrap(err, "unable to create cache directory") - } - } else if err != nil { - return errors.Wrap(err, "unable to check for cache directory") - } - - return nil -} - -func WriteLatestCachedData(cacheKey string, data []byte) error { - if err := EnsureCachedDataDir(); err != nil { - return errors.Wrap(err, "unable to ensure existence of cache directory") - } - - file, err := os.OpenFile(filepath.Clean(LatestCachedDataFilePath(cacheKey)), os.O_WRONLY|os.O_CREATE, 0600) - if err != nil { - return errors.Wrap(err, "unable to open cache data file") - } - defer file.Close() - - if err := file.Truncate(0); err != nil { - return errors.Wrap(err, "unable to truncate cache data file") - } - - if !bytes.HasSuffix(data, []byte("\n")) { - data = append(data, []byte("\n")...) - } - if _, err := file.Write(data); err != nil { - return errors.Wrap(err, "unable to write cache data file") - } - - return nil -} - -func LoadLatestCachedData(cacheKey string) ([]byte, error) { - if err := EnsureCachedDataDir(); err != nil { - return nil, errors.Wrap(err, "unable to ensure existence of cache directory") - } - - if _, err := os.Stat(LatestCachedDataFilePath(cacheKey)); os.IsNotExist(err) { - return nil, nil - } else if err != nil { - return nil, errors.Wrap(err, "unable to stat cache data file") - } - - file, err := os.Open(LatestCachedDataFilePath(cacheKey)) - if err != nil { - return nil, errors.Wrap(err, "unable to open cache data file") - } - defer file.Close() - - data := make([]byte, 0) - buf := bufio.NewReader(file) - for { - var line []byte - line, err = buf.ReadBytes('\n') - if err != nil { - break - } - data = append(data, line...) - } - if err.Error() != "EOF" { - return nil, errors.Wrap(err, "unable to read cache data file") - } - - data = bytes.TrimSuffix(data, []byte("\n")) - return data, nil -} diff --git a/pkg/filesystem/cache_test.go b/pkg/filesystem/cache_test.go index 339d6a31..cc14e1ec 100644 --- a/pkg/filesystem/cache_test.go +++ b/pkg/filesystem/cache_test.go @@ -2,7 +2,6 @@ package filesystem_test import ( "os" - "path/filepath" "github.com/flowexec/flow/pkg/filesystem" . "github.com/onsi/ginkgo/v2" @@ -10,9 +9,7 @@ import ( ) var _ = Describe("Cache", func() { - var ( - tmpDir string - ) + var tmpDir string BeforeEach(func() { var err error @@ -31,31 +28,4 @@ var _ = Describe("Cache", func() { Expect(filesystem.CachedDataDirPath()).To(Equal(tmpDir)) }) }) - - Describe("LatestCachedDataDir", func() { - It("returns the correct path", func() { - Expect(filesystem.LatestCachedDataDir()).To(Equal(filepath.Join(tmpDir, "latestcache"))) - }) - }) - - Describe("EnsureCachedDataDir", func() { - It("creates the directory if it does not exist", func() { - Expect(filesystem.EnsureCachedDataDir()).To(Succeed()) - _, err := os.Stat(filesystem.LatestCachedDataDir()) - Expect(err).NotTo(HaveOccurred()) - }) - }) - - Describe("WriteLatestCachedData and LoadLatestCachedData", func() { - It("writes and reads data correctly", func() { - cacheKey := "test" - data := []byte("test data") - - Expect(filesystem.WriteLatestCachedData(cacheKey, data)).To(Succeed()) - - readData, err := filesystem.LoadLatestCachedData(cacheKey) - Expect(err).NotTo(HaveOccurred()) - Expect(string(readData)).To(Equal(string(data))) - }) - }) }) diff --git a/pkg/logger/logger.go b/pkg/logger/logger.go index 9efc25d0..37cb178a 100644 --- a/pkg/logger/logger.go +++ b/pkg/logger/logger.go @@ -22,6 +22,7 @@ var ( type InitOptions struct { StdOut *os.File ArchiveDirectory string + ArchiveID string // Unique identifier embedded in the archive filename for reliable linking. LogMode io.LogMode Theme themes.Theme @@ -57,6 +58,9 @@ func Init(opts InitOptions) { } if opts.ArchiveDirectory != "" { loggerOpts = append(loggerOpts, io.WithArchiveDirectory(opts.ArchiveDirectory)) + if opts.ArchiveID != "" { + loggerOpts = append(loggerOpts, io.WithArchiveID(opts.ArchiveID)) + } } globalLogger = io.NewLogger(loggerOpts...) diff --git a/pkg/store/mocks/mock_data_store.go b/pkg/store/mocks/mock_data_store.go new file mode 100644 index 00000000..922eaced --- /dev/null +++ b/pkg/store/mocks/mock_data_store.go @@ -0,0 +1,256 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/flowexec/flow/pkg/store (interfaces: DataStore) +// +// Generated by this command: +// +// mockgen -destination=mocks/mock_data_store.go -package=mocks github.com/flowexec/flow/pkg/store DataStore +// + +// Package mocks is a generated GoMock package. +package mocks + +import ( + reflect "reflect" + + store "github.com/flowexec/flow/pkg/store" + gomock "go.uber.org/mock/gomock" +) + +// MockDataStore is a mock of DataStore interface. +type MockDataStore struct { + ctrl *gomock.Controller + recorder *MockDataStoreMockRecorder +} + +// MockDataStoreMockRecorder is the mock recorder for MockDataStore. +type MockDataStoreMockRecorder struct { + mock *MockDataStore +} + +// NewMockDataStore creates a new mock instance. +func NewMockDataStore(ctrl *gomock.Controller) *MockDataStore { + mock := &MockDataStore{ctrl: ctrl} + mock.recorder = &MockDataStoreMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockDataStore) EXPECT() *MockDataStoreMockRecorder { + return m.recorder +} + +// Close mocks base method. +func (m *MockDataStore) Close() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Close") + ret0, _ := ret[0].(error) + return ret0 +} + +// Close indicates an expected call of Close. +func (mr *MockDataStoreMockRecorder) Close() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockDataStore)(nil).Close)) +} + +// CreateProcessBucket mocks base method. +func (m *MockDataStore) CreateProcessBucket(arg0 string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CreateProcessBucket", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// CreateProcessBucket indicates an expected call of CreateProcessBucket. +func (mr *MockDataStoreMockRecorder) CreateProcessBucket(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateProcessBucket", reflect.TypeOf((*MockDataStore)(nil).CreateProcessBucket), arg0) +} + +// DeleteCacheEntry mocks base method. +func (m *MockDataStore) DeleteCacheEntry(arg0 string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DeleteCacheEntry", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// DeleteCacheEntry indicates an expected call of DeleteCacheEntry. +func (mr *MockDataStoreMockRecorder) DeleteCacheEntry(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteCacheEntry", reflect.TypeOf((*MockDataStore)(nil).DeleteCacheEntry), arg0) +} + +// DeleteExecutionHistory mocks base method. +func (m *MockDataStore) DeleteExecutionHistory(arg0 string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DeleteExecutionHistory", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// DeleteExecutionHistory indicates an expected call of DeleteExecutionHistory. +func (mr *MockDataStoreMockRecorder) DeleteExecutionHistory(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteExecutionHistory", reflect.TypeOf((*MockDataStore)(nil).DeleteExecutionHistory), arg0) +} + +// DeleteProcessBucket mocks base method. +func (m *MockDataStore) DeleteProcessBucket(arg0 string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DeleteProcessBucket", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// DeleteProcessBucket indicates an expected call of DeleteProcessBucket. +func (mr *MockDataStoreMockRecorder) DeleteProcessBucket(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteProcessBucket", reflect.TypeOf((*MockDataStore)(nil).DeleteProcessBucket), arg0) +} + +// DeleteProcessVar mocks base method. +func (m *MockDataStore) DeleteProcessVar(arg0, arg1 string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DeleteProcessVar", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// DeleteProcessVar indicates an expected call of DeleteProcessVar. +func (mr *MockDataStoreMockRecorder) DeleteProcessVar(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteProcessVar", reflect.TypeOf((*MockDataStore)(nil).DeleteProcessVar), arg0, arg1) +} + +// GetAllProcessVars mocks base method. +func (m *MockDataStore) GetAllProcessVars(arg0 string) (map[string]string, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetAllProcessVars", arg0) + ret0, _ := ret[0].(map[string]string) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetAllProcessVars indicates an expected call of GetAllProcessVars. +func (mr *MockDataStoreMockRecorder) GetAllProcessVars(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetAllProcessVars", reflect.TypeOf((*MockDataStore)(nil).GetAllProcessVars), arg0) +} + +// GetCacheEntry mocks base method. +func (m *MockDataStore) GetCacheEntry(arg0 string) ([]byte, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetCacheEntry", arg0) + ret0, _ := ret[0].([]byte) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetCacheEntry indicates an expected call of GetCacheEntry. +func (mr *MockDataStoreMockRecorder) GetCacheEntry(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetCacheEntry", reflect.TypeOf((*MockDataStore)(nil).GetCacheEntry), arg0) +} + +// GetExecutionHistory mocks base method. +func (m *MockDataStore) GetExecutionHistory(arg0 string, arg1 int) ([]store.ExecutionRecord, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetExecutionHistory", arg0, arg1) + ret0, _ := ret[0].([]store.ExecutionRecord) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetExecutionHistory indicates an expected call of GetExecutionHistory. +func (mr *MockDataStoreMockRecorder) GetExecutionHistory(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetExecutionHistory", reflect.TypeOf((*MockDataStore)(nil).GetExecutionHistory), arg0, arg1) +} + +// GetProcessVar mocks base method. +func (m *MockDataStore) GetProcessVar(arg0, arg1 string) (string, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetProcessVar", arg0, arg1) + ret0, _ := ret[0].(string) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetProcessVar indicates an expected call of GetProcessVar. +func (mr *MockDataStoreMockRecorder) GetProcessVar(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetProcessVar", reflect.TypeOf((*MockDataStore)(nil).GetProcessVar), arg0, arg1) +} + +// GetProcessVarKeys mocks base method. +func (m *MockDataStore) GetProcessVarKeys(arg0 string) ([]string, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetProcessVarKeys", arg0) + ret0, _ := ret[0].([]string) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetProcessVarKeys indicates an expected call of GetProcessVarKeys. +func (mr *MockDataStoreMockRecorder) GetProcessVarKeys(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetProcessVarKeys", reflect.TypeOf((*MockDataStore)(nil).GetProcessVarKeys), arg0) +} + +// ListExecutionRefs mocks base method. +func (m *MockDataStore) ListExecutionRefs() ([]string, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ListExecutionRefs") + ret0, _ := ret[0].([]string) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ListExecutionRefs indicates an expected call of ListExecutionRefs. +func (mr *MockDataStoreMockRecorder) ListExecutionRefs() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListExecutionRefs", reflect.TypeOf((*MockDataStore)(nil).ListExecutionRefs)) +} + +// RecordExecution mocks base method. +func (m *MockDataStore) RecordExecution(arg0 store.ExecutionRecord) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "RecordExecution", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// RecordExecution indicates an expected call of RecordExecution. +func (mr *MockDataStoreMockRecorder) RecordExecution(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RecordExecution", reflect.TypeOf((*MockDataStore)(nil).RecordExecution), arg0) +} + +// SetCacheEntry mocks base method. +func (m *MockDataStore) SetCacheEntry(arg0 string, arg1 []byte) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SetCacheEntry", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// SetCacheEntry indicates an expected call of SetCacheEntry. +func (mr *MockDataStoreMockRecorder) SetCacheEntry(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetCacheEntry", reflect.TypeOf((*MockDataStore)(nil).SetCacheEntry), arg0, arg1) +} + +// SetProcessVar mocks base method. +func (m *MockDataStore) SetProcessVar(arg0, arg1, arg2 string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SetProcessVar", arg0, arg1, arg2) + ret0, _ := ret[0].(error) + return ret0 +} + +// SetProcessVar indicates an expected call of SetProcessVar. +func (mr *MockDataStoreMockRecorder) SetProcessVar(arg0, arg1, arg2 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetProcessVar", reflect.TypeOf((*MockDataStore)(nil).SetProcessVar), arg0, arg1, arg2) +} diff --git a/pkg/store/store.go b/pkg/store/store.go new file mode 100644 index 00000000..8b4df45a --- /dev/null +++ b/pkg/store/store.go @@ -0,0 +1,515 @@ +package store + +import ( + "encoding/binary" + "encoding/json" + "fmt" + "os" + "path/filepath" + "strings" + "time" + + bolt "go.etcd.io/bbolt" + boltErrors "go.etcd.io/bbolt/errors" + + "github.com/flowexec/flow/pkg/filesystem" +) + +const ( + cacheBucket = "cache" + historyBucket = "history" + processBucketName = "process" + storeFileName = "store.db" + + // BucketEnv is the environment variable used to identify the current process bucket. + BucketEnv = "FLOW_PROCESS_BUCKET" + // RootBucket is the name of the global (root) process bucket. + RootBucket = "root" + + openTimeout = 3 * time.Second +) + +// DataStore manages structured internal state (cache, execution history, and process env vars). +// It is intentionally in pkg/ so external consumers (e.g. pro wrapper) can import it. +// +//go:generate mockgen -destination=mocks/mock_data_store.go -package=mocks github.com/flowexec/flow/pkg/store DataStore +type DataStore interface { //nolint:interfacebloat // single backing store with cache, history, and process var concerns + SetCacheEntry(key string, value []byte) error + GetCacheEntry(key string) ([]byte, error) + DeleteCacheEntry(key string) error + + RecordExecution(record ExecutionRecord) error + GetExecutionHistory(ref string, limit int) ([]ExecutionRecord, error) + ListExecutionRefs() ([]string, error) + DeleteExecutionHistory(ref string) error + + // Process env var management (per-execution scoped key-value storage). + // bucketID identifies the execution scope; use EnvironmentBucket() to get the current scope. + CreateProcessBucket(id string) error + DeleteProcessBucket(id string) error + SetProcessVar(bucketID, key, value string) error + GetProcessVar(bucketID, key string) (string, error) + GetAllProcessVars(bucketID string) (map[string]string, error) + GetProcessVarKeys(bucketID string) ([]string, error) + DeleteProcessVar(bucketID, key string) error + + Close() error +} + +// ExecutionRecord holds metadata about a single executable run. +type ExecutionRecord struct { + Ref string `json:"ref"` + StartedAt time.Time `json:"startedAt"` + Duration time.Duration `json:"duration"` + ExitCode int `json:"exitCode"` + Error string `json:"error,omitempty"` + // LogArchiveID links this record to a tuikit log archive entry for cross-referencing. + LogArchiveID string `json:"logArchiveId,omitempty"` +} + +// BoltDataStore opens and closes the BBolt database for each operation, so the +// exclusive file lock is held only for the duration of a single transaction. +// This allows multiple flow processes to share the same store file safely. +type BoltDataStore struct { + dbPath string +} + +// NewDataStore creates a DataStore backed by the BBolt file at dbPath. +// If dbPath is empty, the default path is used. +// The database file is opened and closed per operation; no persistent lock is held. +func NewDataStore(dbPath string) (DataStore, error) { + if dbPath == "" { + dbPath = Path() + } + + // Verify the path is usable by opening and immediately closing. + db, err := bolt.Open(dbPath, 0666, &bolt.Options{Timeout: openTimeout}) + if err != nil { + return nil, fmt.Errorf("failed to open data store: %w", err) + } + _ = db.Close() + + return &BoltDataStore{dbPath: dbPath}, nil +} + +// open acquires the BBolt file lock, runs fn, and releases the lock. +func (s *BoltDataStore) open(fn func(db *bolt.DB) error) error { + db, err := bolt.Open(s.dbPath, 0666, &bolt.Options{Timeout: openTimeout}) + if err != nil { + return fmt.Errorf("failed to open data store: %w", err) + } + defer db.Close() + return fn(db) +} + +// Path returns the default store database path. +func Path() string { + return filepath.Join(filesystem.CachedDataDirPath(), storeFileName) +} + +// EnvironmentBucket returns the process bucket ID for the current execution scope, +// determined by the FLOW_PROCESS_BUCKET environment variable. Falls back to RootBucket. +func EnvironmentBucket() string { + id := RootBucket + if val, set := os.LookupEnv(BucketEnv); set && val != "" { + id = val + } + replacer := strings.NewReplacer(":", "_", "/", "_", " ", "_") + return replacer.Replace(id) +} + +// DestroyStore removes the store database file entirely. +func DestroyStore() error { + err := os.Remove(Path()) + if err != nil && !isNotExist(err) { + return fmt.Errorf("failed to destroy store: %w", err) + } + return nil +} + +// ---- cache bucket ---- + +func (s *BoltDataStore) SetCacheEntry(key string, value []byte) error { + return s.open(func(db *bolt.DB) error { + return db.Update(func(tx *bolt.Tx) error { + b, err := tx.CreateBucketIfNotExists([]byte(cacheBucket)) + if err != nil { + return fmt.Errorf("failed to open cache bucket: %w", err) + } + if err := b.Put([]byte(key), value); err != nil { + return fmt.Errorf("failed to set cache entry %s: %w", key, err) + } + return nil + }) + }) +} + +func (s *BoltDataStore) GetCacheEntry(key string) ([]byte, error) { + var value []byte + err := s.open(func(db *bolt.DB) error { + return db.View(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte(cacheBucket)) + if b == nil { + return nil + } + v := b.Get([]byte(key)) + if v != nil { + value = make([]byte, len(v)) + copy(value, v) + } + return nil + }) + }) + return value, err +} + +func (s *BoltDataStore) DeleteCacheEntry(key string) error { + return s.open(func(db *bolt.DB) error { + return db.Update(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte(cacheBucket)) + if b == nil { + return nil + } + return b.Delete([]byte(key)) + }) + }) +} + +// ---- history bucket ---- + +func (s *BoltDataStore) RecordExecution(record ExecutionRecord) error { + data, err := json.Marshal(record) + if err != nil { + return fmt.Errorf("failed to marshal execution record: %w", err) + } + return s.open(func(db *bolt.DB) error { + return db.Update(func(tx *bolt.Tx) error { + parent, err := tx.CreateBucketIfNotExists([]byte(historyBucket)) + if err != nil { + return fmt.Errorf("failed to open history bucket: %w", err) + } + refBucket, err := parent.CreateBucketIfNotExists([]byte(record.Ref)) + if err != nil { + return fmt.Errorf("failed to open ref bucket for %s: %w", record.Ref, err) + } + seq, err := refBucket.NextSequence() + if err != nil { + return fmt.Errorf("failed to generate sequence for %s: %w", record.Ref, err) + } + key := make([]byte, 8) + binary.BigEndian.PutUint64(key, seq) + return refBucket.Put(key, data) + }) + }) +} + +func (s *BoltDataStore) GetExecutionHistory(ref string, limit int) ([]ExecutionRecord, error) { + var records []ExecutionRecord + err := s.open(func(db *bolt.DB) error { + return db.View(func(tx *bolt.Tx) error { + parent := tx.Bucket([]byte(historyBucket)) + if parent == nil { + return nil + } + refBucket := parent.Bucket([]byte(ref)) + if refBucket == nil { + return nil + } + var all [][]byte + _ = refBucket.ForEach(func(_, v []byte) error { + cp := make([]byte, len(v)) + copy(cp, v) + all = append(all, cp) + return nil + }) + start := 0 + if limit > 0 && len(all) > limit { + start = len(all) - limit + } + for _, v := range all[start:] { + var rec ExecutionRecord + if err := json.Unmarshal(v, &rec); err != nil { + return fmt.Errorf("failed to unmarshal execution record: %w", err) + } + records = append(records, rec) + } + return nil + }) + }) + return records, err +} + +func (s *BoltDataStore) ListExecutionRefs() ([]string, error) { + var refs []string + err := s.open(func(db *bolt.DB) error { + return db.View(func(tx *bolt.Tx) error { + parent := tx.Bucket([]byte(historyBucket)) + if parent == nil { + return nil + } + return parent.ForEach(func(k, v []byte) error { + if v == nil { // sub-bucket, not a key-value pair + refs = append(refs, string(k)) + } + return nil + }) + }) + }) + return refs, err +} + +func (s *BoltDataStore) DeleteExecutionHistory(ref string) error { + return s.open(func(db *bolt.DB) error { + return db.Update(func(tx *bolt.Tx) error { + parent := tx.Bucket([]byte(historyBucket)) + if parent == nil { + return nil + } + err := parent.DeleteBucket([]byte(ref)) + if err != nil && !isNotFound(err) { + return fmt.Errorf("failed to delete history for ref %s: %w", ref, err) + } + return nil + }) + }) +} + +// ---- process bucket ---- + +// processBucketFor returns the sub-bucket for id nested under the top-level "process" bucket. +func processBucketFor(tx *bolt.Tx, id string, create bool) (*bolt.Bucket, error) { + var parent *bolt.Bucket + var err error + if create { + parent, err = tx.CreateBucketIfNotExists([]byte(processBucketName)) + if err != nil { + return nil, fmt.Errorf("failed to open process bucket: %w", err) + } + b, err := parent.CreateBucketIfNotExists([]byte(id)) + if err != nil { + return nil, fmt.Errorf("failed to create process sub-bucket %s: %w", id, err) + } + return b, nil + } + parent = tx.Bucket([]byte(processBucketName)) + if parent == nil { + return nil, nil //nolint:nilnil // nil bucket signals "not found" — callers handle it + } + return parent.Bucket([]byte(id)), nil +} + +func (s *BoltDataStore) CreateProcessBucket(id string) error { + return s.open(func(db *bolt.DB) error { + return db.Update(func(tx *bolt.Tx) error { + _, err := processBucketFor(tx, id, true) + return err + }) + }) +} + +func (s *BoltDataStore) DeleteProcessBucket(id string) error { + return s.open(func(db *bolt.DB) error { + return db.Update(func(tx *bolt.Tx) error { + parent := tx.Bucket([]byte(processBucketName)) + if parent == nil { + return nil + } + err := parent.DeleteBucket([]byte(id)) + if err != nil && !isNotFound(err) { + return fmt.Errorf("failed to delete process bucket %s: %w", id, err) + } + return nil + }) + }) +} + +func (s *BoltDataStore) SetProcessVar(bucketID, key, value string) error { + return s.open(func(db *bolt.DB) error { + return db.Update(func(tx *bolt.Tx) error { + bucket, err := processBucketFor(tx, bucketID, true) + if err != nil { + return err + } + if err := bucket.Put([]byte(key), []byte(value)); err != nil { + return fmt.Errorf("failed to set var %s in bucket %s: %w", key, bucketID, err) + } + return nil + }) + }) +} + +// GetProcessVar retrieves the value for key in bucketID, falling back to RootBucket if not found. +func (s *BoltDataStore) GetProcessVar(bucketID, key string) (string, error) { + var value []byte + err := s.open(func(db *bolt.DB) error { + return db.View(func(tx *bolt.Tx) error { + bucket, _ := processBucketFor(tx, bucketID, false) + if bucket != nil { + value = bucket.Get([]byte(key)) + } + if value == nil && bucketID != RootBucket { + rootBucket, _ := processBucketFor(tx, RootBucket, false) + if rootBucket != nil { + value = rootBucket.Get([]byte(key)) + } + } + if value == nil { + return fmt.Errorf("key %s not found in bucket %s", key, bucketID) + } + return nil + }) + }) + return string(value), err +} + +// GetAllProcessVars returns all key-value pairs from bucketID merged with RootBucket (bucketID wins on conflict). +func (s *BoltDataStore) GetAllProcessVars(bucketID string) (map[string]string, error) { + m := make(map[string]string) + err := s.open(func(db *bolt.DB) error { + return db.View(func(tx *bolt.Tx) error { + bucket, _ := processBucketFor(tx, bucketID, false) + if bucket != nil { + _ = bucket.ForEach(func(k, v []byte) error { + m[string(k)] = string(v) + return nil + }) + } + if bucketID != RootBucket { + rootBucket, _ := processBucketFor(tx, RootBucket, false) + if rootBucket != nil { + _ = rootBucket.ForEach(func(k, v []byte) error { + if _, exists := m[string(k)]; !exists { + m[string(k)] = string(v) + } + return nil + }) + } + } + return nil + }) + }) + return m, err +} + +// GetProcessVarKeys returns all keys from bucketID merged with RootBucket. +func (s *BoltDataStore) GetProcessVarKeys(bucketID string) ([]string, error) { + var keys []string + err := s.open(func(db *bolt.DB) error { + return db.View(func(tx *bolt.Tx) error { + processKeys := make(map[string]bool) + bucket, _ := processBucketFor(tx, bucketID, false) + if bucket != nil { + _ = bucket.ForEach(func(k, _ []byte) error { + key := string(k) + keys = append(keys, key) + processKeys[key] = true + return nil + }) + } + if bucketID != RootBucket { + rootBucket, _ := processBucketFor(tx, RootBucket, false) + if rootBucket != nil { + _ = rootBucket.ForEach(func(k, _ []byte) error { + if key := string(k); !processKeys[key] { + keys = append(keys, key) + } + return nil + }) + } + } + return nil + }) + }) + return keys, err +} + +func (s *BoltDataStore) DeleteProcessVar(bucketID, key string) error { + return s.open(func(db *bolt.DB) error { + return db.Update(func(tx *bolt.Tx) error { + bucket, err := processBucketFor(tx, bucketID, true) + if err != nil { + return err + } + return bucket.Delete([]byte(key)) + }) + }) +} + +// Close is a no-op for the per-operation store — each operation opens and closes the DB itself. +func (s *BoltDataStore) Close() error { + return nil +} + +// MigrateProcessBuckets moves any legacy top-level exec-ref buckets (created before the "process" +// parent bucket was introduced) into the nested structure under the "process" bucket. +// This is safe to call multiple times; already-migrated buckets are skipped. +func MigrateProcessBuckets(dbPath string) error { + if dbPath == "" { + dbPath = Path() + } + db, err := bolt.Open(dbPath, 0666, &bolt.Options{Timeout: openTimeout}) + if err != nil { + return fmt.Errorf("failed to open db for migration: %w", err) + } + defer db.Close() + + reserved := map[string]bool{ + processBucketName: true, + cacheBucket: true, + historyBucket: true, + } + + var legacy []string + err = db.View(func(tx *bolt.Tx) error { + return tx.ForEach(func(name []byte, _ *bolt.Bucket) error { + n := string(name) + if !reserved[n] { + legacy = append(legacy, n) + } + return nil + }) + }) + if err != nil { + return fmt.Errorf("failed to enumerate buckets for migration: %w", err) + } + if len(legacy) == 0 { + return nil + } + + return db.Update(func(tx *bolt.Tx) error { + parent, err := tx.CreateBucketIfNotExists([]byte(processBucketName)) + if err != nil { + return fmt.Errorf("failed to create process bucket: %w", err) + } + for _, name := range legacy { + src := tx.Bucket([]byte(name)) + if src == nil { + continue + } + if parent.Bucket([]byte(name)) != nil { + continue + } + dst, err := parent.CreateBucket([]byte(name)) + if err != nil { + return fmt.Errorf("failed to create nested bucket %s: %w", name, err) + } + if err := src.ForEach(func(k, v []byte) error { + return dst.Put(k, v) + }); err != nil { + return fmt.Errorf("failed to migrate bucket %s: %w", name, err) + } + if err := tx.DeleteBucket([]byte(name)); err != nil { + return fmt.Errorf("failed to delete legacy bucket %s: %w", name, err) + } + } + return nil + }) +} + +func isNotFound(err error) bool { + return err != nil && err.Error() == boltErrors.ErrBucketNotFound.Error() +} + +func isNotExist(err error) bool { + return os.IsNotExist(err) +} diff --git a/pkg/store/store_test.go b/pkg/store/store_test.go new file mode 100644 index 00000000..d87e7fb3 --- /dev/null +++ b/pkg/store/store_test.go @@ -0,0 +1,159 @@ +package store_test + +import ( + "fmt" + "path/filepath" + "testing" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "github.com/flowexec/flow/pkg/store" +) + +func TestStore(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "DataStore Suite") +} + +var _ = Describe("BoltDataStore", func() { + var ds store.DataStore + var err error + + BeforeEach(func() { + path := filepath.Join(GinkgoT().TempDir(), fmt.Sprintf("test_%s.db", GinkgoT().Name())) + ds, err = store.NewDataStore(path) + Expect(err).NotTo(HaveOccurred()) + Expect(ds).NotTo(BeNil()) + }) + + AfterEach(func() { + Expect(ds.Close()).To(Succeed()) + }) + + Describe("Cache operations", func() { + It("should set and get a cache entry", func() { + Expect(ds.SetCacheEntry("key", []byte("value"))).To(Succeed()) + + val, err := ds.GetCacheEntry("key") + Expect(err).NotTo(HaveOccurred()) + Expect(val).To(Equal([]byte("value"))) + }) + + It("should return nil for a missing cache entry", func() { + val, err := ds.GetCacheEntry("missing") + Expect(err).NotTo(HaveOccurred()) + Expect(val).To(BeNil()) + }) + + It("should overwrite an existing cache entry", func() { + Expect(ds.SetCacheEntry("key", []byte("v1"))).To(Succeed()) + Expect(ds.SetCacheEntry("key", []byte("v2"))).To(Succeed()) + + val, err := ds.GetCacheEntry("key") + Expect(err).NotTo(HaveOccurred()) + Expect(val).To(Equal([]byte("v2"))) + }) + + It("should delete a cache entry", func() { + Expect(ds.SetCacheEntry("key", []byte("value"))).To(Succeed()) + Expect(ds.DeleteCacheEntry("key")).To(Succeed()) + + val, err := ds.GetCacheEntry("key") + Expect(err).NotTo(HaveOccurred()) + Expect(val).To(BeNil()) + }) + + It("should not error when deleting a missing cache entry", func() { + Expect(ds.DeleteCacheEntry("missing")).To(Succeed()) + }) + }) + + Describe("Execution history operations", func() { + var ref = "ws/ns:exec" + + It("should record and retrieve an execution", func() { + rec := store.ExecutionRecord{ + Ref: ref, + StartedAt: time.Now().UTC().Truncate(time.Millisecond), + Duration: 500 * time.Millisecond, + ExitCode: 0, + LogArchiveID: "archive-123", + } + Expect(ds.RecordExecution(rec)).To(Succeed()) + + history, err := ds.GetExecutionHistory(ref, 0) + Expect(err).NotTo(HaveOccurred()) + Expect(history).To(HaveLen(1)) + Expect(history[0].Ref).To(Equal(ref)) + Expect(history[0].ExitCode).To(Equal(0)) + Expect(history[0].LogArchiveID).To(Equal("archive-123")) + }) + + It("should return empty history for an unknown ref", func() { + history, err := ds.GetExecutionHistory("unknown/ns:exec", 0) + Expect(err).NotTo(HaveOccurred()) + Expect(history).To(BeEmpty()) + }) + + It("should respect the limit parameter", func() { + for i := range 5 { + Expect(ds.RecordExecution(store.ExecutionRecord{ + Ref: ref, + ExitCode: i, + })).To(Succeed()) + } + + history, err := ds.GetExecutionHistory(ref, 3) + Expect(err).NotTo(HaveOccurred()) + Expect(history).To(HaveLen(3)) + // Most recent 3 entries (exit codes 2, 3, 4) + Expect(history[0].ExitCode).To(Equal(2)) + Expect(history[2].ExitCode).To(Equal(4)) + }) + + It("should record execution failure with error message", func() { + rec := store.ExecutionRecord{ + Ref: ref, + ExitCode: 1, + Error: "command not found", + } + Expect(ds.RecordExecution(rec)).To(Succeed()) + + history, err := ds.GetExecutionHistory(ref, 0) + Expect(err).NotTo(HaveOccurred()) + Expect(history).To(HaveLen(1)) + Expect(history[0].Error).To(Equal("command not found")) + }) + + It("should delete all history for a ref", func() { + Expect(ds.RecordExecution(store.ExecutionRecord{Ref: ref})).To(Succeed()) + Expect(ds.DeleteExecutionHistory(ref)).To(Succeed()) + + history, err := ds.GetExecutionHistory(ref, 0) + Expect(err).NotTo(HaveOccurred()) + Expect(history).To(BeEmpty()) + }) + + It("should not error when deleting history for an unknown ref", func() { + Expect(ds.DeleteExecutionHistory("unknown/ns:exec")).To(Succeed()) + }) + + It("should maintain separate history per ref", func() { + ref2 := "ws/ns:other" + Expect(ds.RecordExecution(store.ExecutionRecord{Ref: ref, ExitCode: 0})).To(Succeed()) + Expect(ds.RecordExecution(store.ExecutionRecord{Ref: ref2, ExitCode: 1})).To(Succeed()) + + h1, err := ds.GetExecutionHistory(ref, 0) + Expect(err).NotTo(HaveOccurred()) + Expect(h1).To(HaveLen(1)) + Expect(h1[0].ExitCode).To(Equal(0)) + + h2, err := ds.GetExecutionHistory(ref2, 0) + Expect(err).NotTo(HaveOccurred()) + Expect(h2).To(HaveLen(1)) + Expect(h2[0].ExitCode).To(Equal(1)) + }) + }) +}) diff --git a/tests/logs_cmds_e2e_test.go b/tests/logs_cmds_e2e_test.go index e27beb39..0d2f75c4 100644 --- a/tests/logs_cmds_e2e_test.go +++ b/tests/logs_cmds_e2e_test.go @@ -4,10 +4,12 @@ package tests_test import ( stdCtx "context" + "time" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + "github.com/flowexec/flow/pkg/store" "github.com/flowexec/flow/tests/utils" ) @@ -30,25 +32,65 @@ var _ = Describe("logs e2e", Ordered, func() { ctx.Finalize() }) - When("viewing logs (flow logs)", func() { - It("should display logs in yaml format", func() { + When("viewing logs with no history (flow logs)", func() { + It("should display empty history message", func() { stdOut := ctx.StdOut() - Expect(run.Run(ctx.Context, "logs")).To(Succeed()) + Expect(run.Run(ctx.Context, "logs", "-o", "yaml")).To(Succeed()) out, err := readFileContent(stdOut) Expect(err).NotTo(HaveOccurred()) - Expect(out).To(ContainSubstring("logs:")) + Expect(out).To(ContainSubstring("No execution history found.")) + }) + }) + + When("viewing logs with history (flow logs)", func() { + It("should display history in yaml format", func() { + record := store.ExecutionRecord{ + Ref: "default/examples:simple-print", + StartedAt: time.Now(), + Duration: 100 * time.Millisecond, + } + Expect(ctx.DataStore.RecordExecution(record)).To(Succeed()) + + stdOut := ctx.StdOut() + Expect(run.Run(ctx.Context, "logs", "-o", "yaml")).To(Succeed()) + out, err := readFileContent(stdOut) + Expect(err).NotTo(HaveOccurred()) + Expect(out).To(ContainSubstring("history:")) + Expect(out).To(ContainSubstring("default/examples:simple-print")) }) }) When("viewing last log entry (flow logs --last)", func() { - It("should display the last log entry", func() { - // TODO: test that log archiving works - Skip("e2e test does not include log archiving, so this will not return any logs") + It("should display the last execution record", func() { + record := store.ExecutionRecord{ + Ref: "default/examples:simple-print", + StartedAt: time.Now(), + Duration: 200 * time.Millisecond, + } + Expect(ctx.DataStore.RecordExecution(record)).To(Succeed()) + stdOut := ctx.StdOut() Expect(run.Run(ctx.Context, "logs", "--last")).To(Succeed()) out, err := readFileContent(stdOut) Expect(err).NotTo(HaveOccurred()) - Expect(out).To(Or(ContainSubstring("msg="), ContainSubstring("No log entries"))) + Expect(out).To(ContainSubstring("ref: default/examples:simple-print")) + }) + }) + + When("clearing logs (flow logs clear)", func() { + It("should clear history for a specific ref", func() { + record := store.ExecutionRecord{ + Ref: "default/examples:simple-print", + StartedAt: time.Now(), + Duration: 50 * time.Millisecond, + } + Expect(ctx.DataStore.RecordExecution(record)).To(Succeed()) + + stdOut := ctx.StdOut() + Expect(run.Run(ctx.Context, "logs", "clear", "default/examples:simple-print")).To(Succeed()) + out, err := readFileContent(stdOut) + Expect(err).NotTo(HaveOccurred()) + Expect(out).To(ContainSubstring("Cleared history and logs")) }) }) }) diff --git a/tests/utils/context.go b/tests/utils/context.go index 682a374d..b90b6ece 100644 --- a/tests/utils/context.go +++ b/tests/utils/context.go @@ -15,12 +15,12 @@ import ( "gopkg.in/yaml.v3" "github.com/flowexec/flow/internal/runner/mocks" - "github.com/flowexec/flow/internal/services/store" "github.com/flowexec/flow/pkg/cache" cacheMocks "github.com/flowexec/flow/pkg/cache/mocks" "github.com/flowexec/flow/pkg/context" "github.com/flowexec/flow/pkg/filesystem" "github.com/flowexec/flow/pkg/logger" + "github.com/flowexec/flow/pkg/store" "github.com/flowexec/flow/tests/utils/builder" "github.com/flowexec/flow/types/config" "github.com/flowexec/flow/types/workspace" @@ -81,7 +81,10 @@ type ContextWithMocks struct { // NewContextWithMocks creates a new context for testing runners. It initializes the context with // a mock logger and mock caches. The mock logger is set to expect debug calls. func NewContextWithMocks(ctx stdCtx.Context, tb testing.TB) *ContextWithMocks { - null := os.NewFile(0, os.DevNull) + null, err := os.OpenFile(os.DevNull, os.O_RDWR, 0) + if err != nil { + tb.Fatalf("unable to open devnull: %v", err) + } configDir, cacheDir, wsDir := initTestDirectories(tb) setTestEnv(tb, configDir, cacheDir) testWsCfg, err := testWsConfig(wsDir) @@ -165,7 +168,7 @@ func newTestContext( tb.Fatalf("unable to create user config: %v", err) } - wsCache, execCache := testCaches(tb) + wsCache, execCache, ds := testCaches(tb) cancel := func() { <-ctx.Done() @@ -176,6 +179,7 @@ func newTestContext( CurrentWorkspace: testWsCfg, WorkspacesCache: wsCache, ExecutableCache: execCache, + DataStore: ds, } ctxx.SetContext(ctx, cancel) ctxx.SetIO(stdIn, stdOut) @@ -263,9 +267,16 @@ func testWsConfig(wsDir string) (*workspace.Workspace, error) { } // testCaches must be called after the user and workspace configs have been created. -func testCaches(tb testing.TB) (cache.WorkspaceCache, cache.ExecutableCache) { - wsCache := cache.NewWorkspaceCache() - execCache := cache.NewExecutableCache(wsCache) +func testCaches(tb testing.TB) (cache.WorkspaceCache, cache.ExecutableCache, store.DataStore) { + dbPath := filepath.Join(tb.TempDir(), "test_store.db") + ds, err := store.NewDataStore(dbPath) + if err != nil { + tb.Fatalf("unable to open test data store: %v", err) + } + tb.Cleanup(func() { _ = ds.Close() }) + + wsCache := cache.NewWorkspaceCache(ds) + execCache := cache.NewExecutableCache(wsCache, ds) if err := wsCache.Update(); err != nil { tb.Fatalf("unable to update cache: %v", err) @@ -273,7 +284,7 @@ func testCaches(tb testing.TB) (cache.WorkspaceCache, cache.ExecutableCache) { if err := execCache.Update(); err != nil { tb.Fatalf("unable to update cache: %v", err) } - return wsCache, execCache + return wsCache, execCache, ds } func setTestEnv(tb testing.TB, configDir, cacheDir string) {