From 360642218179bc0ca58413cefc49668db700e7a0 Mon Sep 17 00:00:00 2001 From: Jussi Maki Date: Thu, 11 Dec 2025 15:04:49 +0100 Subject: [PATCH 1/6] cell/lifecycle: add HookDescriptiveInterface for logging --- cell/lifecycle.go | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/cell/lifecycle.go b/cell/lifecycle.go index c6db269..618cb03 100644 --- a/cell/lifecycle.go +++ b/cell/lifecycle.go @@ -28,6 +28,14 @@ type HookInterface interface { Stop(HookContext) error } +// HookDescriptiveInterface extends HookInterface with the HookInfo +// method for logging more details about what was started or stopped. +// This will be used instead of just showing the function name and module. +type HookDescriptiveInterface interface { + HookInterface + HookInfo() string +} + // Hook is a pair of start and stop callbacks. Both are optional. // They're paired up to make sure that on failed start all corresponding // stop hooks are executed. @@ -116,6 +124,10 @@ func (lc *DefaultLifecycle) Start(log *slog.Logger, ctx context.Context) error { } l := log.With("function", fnName) + desc, ok := hook.HookInterface.(HookDescriptiveInterface) + if ok { + l = l.With("detail", desc.HookInfo()) + } // Do not attempt to start already started hooks. 
if i < lc.numStarted { @@ -161,7 +173,11 @@ func (lc *DefaultLifecycle) Stop(log *slog.Logger, ctx context.Context) error { if !exists { continue } + l := log.With("function", fnName) + desc, ok := hook.HookInterface.(HookDescriptiveInterface) + if ok { + l = l.With("detail", desc.HookInfo()) + } l.Debug("Executing stop hook") t0 := time.Now() if err := hook.Stop(ctx); err != nil { From cc18d872e725873508506a9a3e51c6dd650a7287 Mon Sep 17 00:00:00 2001 From: Jussi Maki Date: Thu, 11 Dec 2025 15:05:13 +0100 Subject: [PATCH 2/6] cell/lifecycle: allow Start to run newly appended hooks --- cell/lifecycle.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/cell/lifecycle.go b/cell/lifecycle.go index 618cb03..4721ee1 100644 --- a/cell/lifecycle.go +++ b/cell/lifecycle.go @@ -114,7 +114,8 @@ func (lc *DefaultLifecycle) Start(log *slog.Logger, ctx context.Context) error { ctx, cancel := context.WithCancel(ctx) defer cancel() - for i, hook := range lc.hooks { + from := lc.numStarted + for i, hook := range lc.hooks[from:] { fnName, exists := getHookFuncName(hook, true) if !exists { @@ -130,7 +131,7 @@ func (lc *DefaultLifecycle) Start(log *slog.Logger, ctx context.Context) error { } // Do not attempt to start already started hooks. - if i < lc.numStarted { + if from+i < lc.numStarted { l.Error("Hook appears to be running. Skipping") continue } From 22db05c5ceb48a4b0b3bf81fc898f4861726786a Mon Sep 17 00:00:00 2001 From: Jussi Maki Date: Thu, 11 Dec 2025 15:06:19 +0100 Subject: [PATCH 3/6] job: move lifecycle management into registry Refactor job groups so registry owns the lifecycle. This ensures that jobs are always started in the order they're added and not in the order the job groups are created.
- move lifecycle hooks onto the registry type and keep start/stop logic centralized - drop group-level context/wait-group management and route Group.Add through the registry - adjust job implementations/tests and all callers to the simplified API --- example/events.go | 2 +- job/job.go | 208 ++++++++++++++++++++----------------- job/job_test.go | 16 ++-- job/observer.go | 8 +- job/observer_test.go | 6 +- job/oneshot.go | 8 +- job/oneshot_test.go | 33 ++++--- job/timer.go | 7 +- job/timer_test.go | 14 +-- shell/server.go | 2 +- shell/shell_test.go | 2 +- 11 files changed, 154 insertions(+), 152 deletions(-) diff --git a/example/events.go b/example/events.go index 79d1451..a2150d1 100644 --- a/example/events.go +++ b/example/events.go @@ -84,7 +84,7 @@ func newExampleEvents(lc cell.Lifecycle, jobs job.Registry, health cell.Health) es.Observable, es.emit, es.complete = stream.Multicast[ExampleEvent]() // Create a new job group and add emitter as a one-shot job. - g := jobs.NewGroup(health, lc) + g := jobs.NewGroup(health) g.Add(job.OneShot("emitter", es.emitter)) return es } diff --git a/job/job.go b/job/job.go index d46f4e8..c7a4ef5 100644 --- a/job/job.go +++ b/job/job.go @@ -34,33 +34,67 @@ type Registry interface { // NewGroup creates a new group of jobs which can be started and stopped together as part of the cells lifecycle. // The provided scope is used to report health status of the jobs. A `cell.Scope` can be obtained via injection // an object with the correct scope is provided by the closest `cell.Module`.
- NewGroup(health cell.Health, lc cell.Lifecycle, opts ...groupOpt) Group + NewGroup(health cell.Health, opts ...groupOpt) Group } type registry struct { logger *slog.Logger shutdowner hive.Shutdowner - mu sync.Mutex - groups []Group + mu sync.Mutex + groups []*group + started bool + + lifecycle cell.Lifecycle + dynamicLC *cell.DefaultLifecycle } +var _ cell.HookInterface = (*registry)(nil) + func newRegistry( logger *slog.Logger, shutdowner hive.Shutdowner, + lc cell.Lifecycle, ) Registry { - return ®istry{ + r := ®istry{ logger: logger, shutdowner: shutdowner, + lifecycle: lc, + } + lc.Append(r) + return r +} + +func (c *registry) Start(cell.HookContext) error { + c.mu.Lock() + defer c.mu.Unlock() + + if c.started { + return nil + } + c.started = true + c.dynamicLC = cell.NewDefaultLifecycle(nil, 0, 0) + return nil +} + +func (c *registry) Stop(ctx cell.HookContext) error { + c.mu.Lock() + defer c.mu.Unlock() + if !c.started { + return nil } + c.started = false + c.dynamicLC.Stop(c.logger, ctx) + return nil } // NewGroup creates a new Group with the given `opts` options, which allows you to customize the behavior for the // group as a whole. For example by allowing you to add pprof labels to the group or by customizing the logger. // -// Jobs added to the group before it is started will be appended to the provided lifecycle. Jobs added -// after starting are started immediately. -func (c *registry) NewGroup(health cell.Health, lc cell.Lifecycle, opts ...groupOpt) Group { +// Jobs added to the group before it is started will be appended to the registry's lifecycle. +// Jobs added after starting are started immediately and stopped sequentially in reverse order +// when registry is stopped. 
+func (c *registry) NewGroup(health cell.Health, opts ...groupOpt) Group { c.mu.Lock() defer c.mu.Unlock() @@ -73,21 +107,45 @@ func (c *registry) NewGroup(health cell.Health, lc cell.Lifecycle, opts ...group } g := &group{ - options: options, - lifecycle: lc, - wg: &sync.WaitGroup{}, - health: health, + registry: c, + options: options, + health: health, } - // Append the lifecycle hooks for the group. The start hook sets up a context - // for the dynamical jobs (jobs added after starting) and a stop hook to cancel - // the context and wait for the jobs to finish. - lc.Append((*groupHooks)(g)) c.groups = append(c.groups, g) return g } +func (c *registry) addJobs(health cell.Health, opts options, jobs ...Job) { + c.mu.Lock() + defer c.mu.Unlock() + + if !c.started { + for _, job := range jobs { + c.lifecycle.Append(&queuedJob{ + registry: c, + job: job, + health: health, + options: opts, + }) + } + return + } + + for _, job := range jobs { + qj := &queuedJob{ + registry: c, + job: job, + health: health, + options: opts, + } + c.dynamicLC.Append(qj) + // Start the newly appended job immediately. + c.dynamicLC.Start(c.logger, context.Background()) + } +} + // Group aims to streamline the management of work within a cell. // A group allows you to add multiple types of jobs with different kinds of logic. // No matter the job type, the function provided to is always called with a context which @@ -104,43 +162,44 @@ type Group interface { // Job in an interface that describes a unit of work which can be added to a Group. This interface contains unexported // methods and thus can only be implemented by functions in this package such as OneShot, Timer, or Observer. 
type Job interface { - start(ctx context.Context, wg *sync.WaitGroup, health cell.Health, options options) + start(ctx context.Context, health cell.Health, options options) + info() string } type queuedJob struct { - job Job - health cell.Health - options options - - wq sync.WaitGroup - cancel context.CancelFunc + registry *registry + job Job + health cell.Health + options options + cancel context.CancelFunc + done chan struct{} } // Start implements cell.HookInterface. func (qj *queuedJob) Start(cell.HookContext) error { - qj.wq.Add(1) - - var ctx context.Context - ctx, qj.cancel = context.WithCancel(context.Background()) + qj.done = make(chan struct{}) + ctx, cancel := context.WithCancel(context.Background()) + qj.cancel = cancel pprof.Do(ctx, qj.options.pprofLabels, func(ctx context.Context) { - go qj.job.start(ctx, &qj.wq, qj.health, qj.options) + go func() { + defer close(qj.done) + qj.job.start(ctx, qj.health, qj.options) + }() }) return nil } +func (qj *queuedJob) HookInfo() string { + return qj.job.info() +} + // Stop implements cell.HookInterface. func (qj *queuedJob) Stop(ctx cell.HookContext) error { qj.cancel() - - stopped := make(chan struct{}) - go func() { - qj.wq.Wait() - close(stopped) - }() select { case <-ctx.Done(): return ctx.Err() - case <-stopped: + case <-qj.done: return nil } } @@ -148,17 +207,9 @@ func (qj *queuedJob) Stop(ctx cell.HookContext) error { var _ cell.HookInterface = &queuedJob{} type group struct { - options options - lifecycle cell.Lifecycle - - // wg is a wait group for "dynamic" jobs added after starting. - wg *sync.WaitGroup - - mu sync.Mutex - ctx context.Context - cancel context.CancelFunc - - health cell.Health + registry *registry + options options + health cell.Health } type options struct { @@ -192,77 +243,12 @@ func WithMetrics(metrics Metrics) groupOpt { } } -var _ cell.HookInterface = (*groupHooks)(nil) - -// groupHooks implements the Hive start and stop hooks. Hidden as these -// are appended by NewGroup. 
-type groupHooks group - -// Start implements the cell.HookInterface interface -func (gh *groupHooks) Start(_ cell.HookContext) error { - jg := (*group)(gh) - jg.mu.Lock() - defer jg.mu.Unlock() - - // Create a context for the dynamically started jobs. - jg.ctx, jg.cancel = context.WithCancel(context.Background()) - return nil -} - -// Stop implements the cell.HookInterface interface -func (gh *groupHooks) Stop(stopCtx cell.HookContext) error { - jg := (*group)(gh) - jg.mu.Lock() - defer jg.mu.Unlock() - - // Stop all dynamically started jobs. - done := make(chan struct{}) - go func() { - jg.wg.Wait() - close(done) - }() - - jg.cancel() - - select { - case <-stopCtx.Done(): - jg.options.logger.Error("Stop hook context expired before job group was done") - case <-done: - } - - return nil -} - func (jg *group) Add(jobs ...Job) { jg.add(jg.health, jobs...) } func (jg *group) add(health cell.Health, jobs ...Job) { - jg.mu.Lock() - defer jg.mu.Unlock() - - // The context is only set once the group has been started. - // If we have not yet started append hooks for the jobs to be started as part - // of the normal lifecycle. This makes sure that the start order reflects the - // order in which the jobs are added and avoids e.g. starting a job before its - // dependencies. - if jg.ctx == nil { - for _, job := range jobs { - jg.lifecycle.Append(&queuedJob{ - job: job, - health: health, - options: jg.options, - }) - } - return - } - - for _, j := range jobs { - jg.wg.Add(1) - pprof.Do(jg.ctx, jg.options.pprofLabels, func(ctx context.Context) { - go j.start(ctx, jg.wg, health, jg.options) - }) - } + jg.registry.addJobs(health, jg.options, jobs...) 
} // Scoped creates a scoped group, jobs added to this scoped group will appear as a sub-scope in the health reporter diff --git a/job/job_test.go b/job/job_test.go index c1bffe1..3e03c40 100644 --- a/job/job_test.go +++ b/job/job_test.go @@ -5,10 +5,12 @@ package job import ( "context" + "fmt" "log/slog" "runtime" "runtime/pprof" "strings" + "sync" "sync/atomic" "testing" "time" @@ -57,15 +59,15 @@ func TestRegistry(t *testing.T) { h := fixture(func(r Registry, s cell.Health, l cell.Lifecycle) { r1 = r - g1 = r.NewGroup(s, l) - g2 = r.NewGroup(s, l) + g1 = r.NewGroup(s) + g2 = r.NewGroup(s) }) h.Populate(hivetest.Logger(t)) - if r1.(*registry).groups[0] != g1 { + if r1.(*registry).groups[0] != g1.(*group) { t.Fail() } - if r1.(*registry).groups[1] != g2 { + if r1.(*registry).groups[1] != g2.(*group) { t.Fail() } } @@ -81,7 +83,7 @@ func TestGroup_JobPreStart(t *testing.T) { } h := fixture(func(r Registry, s cell.Health, l cell.Lifecycle) { - g := r.NewGroup(s, l) + g := r.NewGroup(s) g.Add( OneShot("queued1", incFunc), OneShot("queued2", incFunc), @@ -110,7 +112,7 @@ func TestGroup_JobRuntime(t *testing.T) { ) h := fixture(func(r Registry, s cell.Health, l cell.Lifecycle) { - g = r.NewGroup(s, l) + g = r.NewGroup(s) }) h.Start(slog.Default(), context.Background()) @@ -133,7 +135,7 @@ func TestModuleDecoratedGroup(t *testing.T) { opts := hive.DefaultOptions() opts.ModulePrivateProviders = cell.ModulePrivateProviders{ func(r Registry, h cell.Health, modID cell.FullModuleID, l *slog.Logger, lc cell.Lifecycle) Group { - g := r.NewGroup(h, lc, + g := r.NewGroup(h, WithLogger(l), WithPprofLabels(pprof.Labels("module", modID.String()))) return g diff --git a/job/observer.go b/job/observer.go index aa29f57..0507f95 100644 --- a/job/observer.go +++ b/job/observer.go @@ -6,8 +6,8 @@ package job import ( "context" "errors" + "fmt" "strconv" - "sync" "time" "github.com/cilium/stream" @@ -55,9 +55,11 @@ type jobObserver[T any] struct { shutdown hive.Shutdowner } -func (jo 
*jobObserver[T]) start(ctx context.Context, wg *sync.WaitGroup, health cell.Health, options options) { - defer wg.Done() +func (jo *jobObserver[T]) info() string { + return fmt.Sprintf("%s (%s)", jo.name, internal.FuncNameAndLocation(jo.fn)) +} +func (jo *jobObserver[T]) start(ctx context.Context, health cell.Health, options options) { for _, opt := range jo.opts { opt(jo) } diff --git a/job/observer_test.go b/job/observer_test.go index aa0e10e..29c3085 100644 --- a/job/observer_test.go +++ b/job/observer_test.go @@ -26,7 +26,7 @@ func TestObserver_ShortStream(t *testing.T) { streamSlice := []string{"a", "b", "c"} h := fixture(func(r Registry, s cell.Health, l cell.Lifecycle) { - g = r.NewGroup(s, l) + g = r.NewGroup(s) g.Add( Observer("retry-fail", func(ctx context.Context, event string) error { @@ -66,7 +66,7 @@ func TestObserver_LongStream(t *testing.T) { inChan := make(chan struct{}) h := fixture(func(r Registry, s cell.Health, l cell.Lifecycle) { - g = r.NewGroup(s, l) + g = r.NewGroup(s) g.Add( Observer("retry-fail", func(ctx context.Context, _ struct{}) error { @@ -100,7 +100,7 @@ func TestObserver_CtxClose(t *testing.T) { streamSlice := []string{"a", "b", "c"} h := fixture(func(r Registry, s cell.Health, l cell.Lifecycle) { - g := r.NewGroup(s, l) + g := r.NewGroup(s) g.Add( Observer("retry-fail", func(ctx context.Context, event string) error { diff --git a/job/oneshot.go b/job/oneshot.go index 959bbe8..d363320 100644 --- a/job/oneshot.go +++ b/job/oneshot.go @@ -6,7 +6,7 @@ package job import ( "context" "errors" - "sync" + "fmt" "time" "github.com/cilium/hive" @@ -102,9 +102,11 @@ type jobOneShot struct { shutdownOnError bool } -func (jos *jobOneShot) start(ctx context.Context, wg *sync.WaitGroup, health cell.Health, options options) { - defer wg.Done() +func (jos *jobOneShot) info() string { + return fmt.Sprintf("%s (%s)", jos.name, internal.FuncNameAndLocation(jos.fn)) +} +func (jos *jobOneShot) start(ctx context.Context, health cell.Health, options 
options) { for _, opt := range jos.opts { opt(jos) } diff --git a/job/oneshot_test.go b/job/oneshot_test.go index 42c81b8..398fd72 100644 --- a/job/oneshot_test.go +++ b/job/oneshot_test.go @@ -24,7 +24,7 @@ func TestOneShot_ShortRun(t *testing.T) { stop := make(chan struct{}) h := fixture(func(r Registry, s cell.Health, l cell.Lifecycle) { - g := r.NewGroup(s, l) + g := r.NewGroup(s) g.Add( OneShot("short", func(ctx context.Context, health cell.Health) error { @@ -49,7 +49,7 @@ func TestOneShot_LongRun(t *testing.T) { stopped := make(chan struct{}) h := fixture(func(r Registry, s cell.Health, l cell.Lifecycle) { - g := r.NewGroup(s, l) + g := r.NewGroup(s) g.Add( OneShot("long", func(ctx context.Context, health cell.Health) error { @@ -82,7 +82,7 @@ func TestOneShot_RetryFail(t *testing.T) { rateLimiter := &ExponentialBackoff{Min: 10 * time.Millisecond, Max: 20 * time.Millisecond} h := fixture(func(r Registry, s cell.Health, l cell.Lifecycle) { - g = r.NewGroup(s, l) + g = r.NewGroup(s) g.Add( OneShot("retry-fail", func(ctx context.Context, health cell.Health) error { @@ -146,7 +146,7 @@ func testOneShot_RetryBackoff(t *testing.T) (bool, error) { rateLimiter := &ExponentialBackoff{Min: retryMin, Max: retryMax} h := fixture(func(r Registry, s cell.Health, l cell.Lifecycle) { - g = r.NewGroup(s, l) + g = r.NewGroup(s) g.Add( OneShot("retry-backoff", func(ctx context.Context, health cell.Health) error { @@ -161,8 +161,9 @@ func testOneShot_RetryBackoff(t *testing.T) (bool, error) { return true, err } - // Continue as soon as all jobs stopped - g.(*group).wg.Wait() + require.Eventually(t, func() bool { + return len(times) == retries+1 + }, timeout, tick) if err := h.Stop(log, context.Background()); err != nil { return true, err @@ -201,7 +202,7 @@ func TestOneShot_RetryRecover(t *testing.T) { rateLimiter := &ExponentialBackoff{Min: 10 * time.Millisecond, Max: 20 * time.Millisecond} h := fixture(func(r Registry, s cell.Health, l cell.Lifecycle) { - g = r.NewGroup(s, l) 
+ g = r.NewGroup(s) g.Add( OneShot("retry-recover", func(ctx context.Context, health cell.Health) error { @@ -239,7 +240,7 @@ func TestOneShot_Shutdown(t *testing.T) { targetErr := errors.New("Always error") h := fixture(func(r Registry, s cell.Health, l cell.Lifecycle) { - g := r.NewGroup(s, l) + g := r.NewGroup(s) g.Add( OneShot("shutdown", func(ctx context.Context, health cell.Health) error { @@ -266,7 +267,7 @@ func TestOneShot_RetryFailShutdown(t *testing.T) { targetErr := errors.New("Always error") h := fixture(func(r Registry, s cell.Health, l cell.Lifecycle) { - g := r.NewGroup(s, l) + g := r.NewGroup(s) g.Add( OneShot("retry-fail-shutdown", func(ctx context.Context, health cell.Health) error { @@ -301,7 +302,7 @@ func TestOneShot_RetryRecoverNoShutdown(t *testing.T) { const retries = 5 h := fixture(func(r Registry, s cell.Health, l cell.Lifecycle) { - g = r.NewGroup(s, l) + g = r.NewGroup(s) }) log := hivetest.Logger(t) @@ -309,9 +310,15 @@ func TestOneShot_RetryRecoverNoShutdown(t *testing.T) { require.NoError(t, h.Start(log, ctx)) // Add the job dynamically. 
+ jobDone := make(chan struct{}) + g.Add( OneShot("retry-recover-no-shutdown", func(ctx context.Context, health cell.Health) error { - defer func() { i.Add(1) }() + defer func() { + if i.Add(1) == 2 { + close(jobDone) + } + }() if i.Load() == 0 { close(started) @@ -327,7 +334,7 @@ func TestOneShot_RetryRecoverNoShutdown(t *testing.T) { // Manually trigger a shutdown after the group has no more running jobs, will exit the hive with a nil go func() { <-started - g.(*group).wg.Wait() + <-jobDone h.Shutdown() close(shutdown) }() @@ -352,7 +359,7 @@ func TestOneShot_RetryWhileShuttingDown(t *testing.T) { shutdown := make(chan struct{}) h := fixture(func(r Registry, s cell.Health, l cell.Lifecycle) { - g = r.NewGroup(s, l) + g = r.NewGroup(s) g.Add( OneShot("retry-context-closed", func(ctx context.Context, health cell.Health) error { diff --git a/job/timer.go b/job/timer.go index 4fe37f0..5da6fb3 100644 --- a/job/timer.go +++ b/job/timer.go @@ -6,6 +6,7 @@ package job import ( "context" "errors" + "fmt" "sync" "time" @@ -144,9 +145,11 @@ type jobTimer struct { shutdown hive.Shutdowner } -func (jt *jobTimer) start(ctx context.Context, wg *sync.WaitGroup, health cell.Health, options options) { - defer wg.Done() +func (jt *jobTimer) info() string { + return fmt.Sprintf("%s (%s)", jt.name, internal.FuncNameAndLocation(jt.fn)) +} +func (jt *jobTimer) start(ctx context.Context, health cell.Health, options options) { for _, opt := range jt.opts { opt(jt) } diff --git a/job/timer_test.go b/job/timer_test.go index a532d68..22685e2 100644 --- a/job/timer_test.go +++ b/job/timer_test.go @@ -24,7 +24,7 @@ func TestTimer_OnInterval(t *testing.T) { var i atomic.Int32 h := fixture(func(r Registry, s cell.Health, l cell.Lifecycle) { - g := r.NewGroup(s, l) + g := r.NewGroup(s) g.Add( Timer("on-interval", func(ctx context.Context) error { @@ -61,7 +61,7 @@ func TestTimer_Trigger(t *testing.T) { trigger := NewTrigger() h := fixture(func(r Registry, s cell.Health, l cell.Lifecycle) { - g 
:= r.NewGroup(s, l) + g := r.NewGroup(s) g.Add( Timer("on-interval", func(ctx context.Context) error { @@ -108,7 +108,7 @@ func TestTimer_DoubleTrigger(t *testing.T) { trigger := NewTrigger() h := fixture(func(r Registry, s cell.Health, l cell.Lifecycle) { - g := r.NewGroup(s, l) + g := r.NewGroup(s) g.Add( Timer("on-interval", func(ctx context.Context) error { @@ -157,7 +157,7 @@ func TestTimer_TriggerDebounce(t *testing.T) { trigger := NewTrigger(WithDebounce(time.Hour)) h := fixture(func(r Registry, s cell.Health, l cell.Lifecycle) { - g := r.NewGroup(s, l) + g := r.NewGroup(s) g.Add( Timer("on-interval", func(ctx context.Context) error { @@ -209,7 +209,7 @@ func TestTimer_TriggerOnly(t *testing.T) { trigger := NewTrigger() h := fixture(func(r Registry, s cell.Health, l cell.Lifecycle) { - g := r.NewGroup(s, l) + g := r.NewGroup(s) g.Add( Timer("on-interval", func(ctx context.Context) error { @@ -251,7 +251,7 @@ func TestTimer_ExitOnClose(t *testing.T) { var i atomic.Int32 h := fixture(func(r Registry, s cell.Health, l cell.Lifecycle) { - g := r.NewGroup(s, l) + g := r.NewGroup(s) g.Add( Timer("on-interval", func(ctx context.Context) error { @@ -283,7 +283,7 @@ func TestTimer_ExitOnCloseFnCtx(t *testing.T) { var i atomic.Int32 started := make(chan struct{}) h := fixture(func(r Registry, s cell.Health, l cell.Lifecycle) { - g := r.NewGroup(s, l) + g := r.NewGroup(s) g.Add( Timer("on-interval", func(ctx context.Context) error { diff --git a/shell/server.go b/shell/server.go index 3894106..f05ec0f 100644 --- a/shell/server.go +++ b/shell/server.go @@ -38,7 +38,7 @@ var defaultCmdsToInclude = []string{ } func registerShell(in hive.ScriptCmds, log *slog.Logger, lc cell.Lifecycle, jobs job.Registry, health cell.Health, c Config) { - jg := jobs.NewGroup(health, lc) + jg := jobs.NewGroup(health) if c.ShellSockPath == "" { log.Info("Shell socket path not set, not starting shell server") diff --git a/shell/shell_test.go b/shell/shell_test.go index 4c3654e..8d5cb30 100644 
--- a/shell/shell_test.go +++ b/shell/shell_test.go @@ -46,7 +46,7 @@ func fixture(t *testing.T, cfg Config) { cell.SimpleHealthCell, cell.Provide( func(r job.Registry, lc cell.Lifecycle, health cell.Health) job.Group { - return r.NewGroup(health, lc) + return r.NewGroup(health) }, ), ServerCell(cfg.ShellSockPath), From 8a45eb6ac6971b11dc9fc4aa898670faf5950372 Mon Sep 17 00:00:00 2001 From: Jussi Maki Date: Thu, 11 Dec 2025 15:24:29 +0100 Subject: [PATCH 4/6] job: add log-based lifecycle ordering test --- job/job_test.go | 156 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 156 insertions(+) diff --git a/job/job_test.go b/job/job_test.go index 3e03c40..130256b 100644 --- a/job/job_test.go +++ b/job/job_test.go @@ -131,6 +131,56 @@ func TestGroup_JobRuntime(t *testing.T) { h.Stop(slog.Default(), context.Background()) } +func TestJobLifecycleOrderingAcrossGroups(t *testing.T) { + t.Parallel() + + var ( + g1 Group + g2 Group + ) + + staticStarts := []string{"g1-first", "g1-second", "g2-first", "g1-third"} + expectStarts := append(append([]string{}, staticStarts...), "g2-dynamic", "g1-dynamic") + expectStops := []string{"g1-third", "g2-first", "g1-second", "g1-first", "g1-dynamic", "g2-dynamic"} + + addJob := func(g Group, name string) { + g.Add(&orderingJob{name: name}) + } + + h := fixture(func(r Registry, s cell.Health, l cell.Lifecycle) { + g1 = r.NewGroup(s) + g2 = r.NewGroup(s) + + addJob(g1, "g1-first") + addJob(g1, "g1-second") + addJob(g2, "g2-first") + addJob(g1, "g1-third") + }) + + log, logs := newRecordingLogger() + ctx := context.Background() + require.NoError(t, h.Start(log, ctx)) + + waitForDetails(t, logs, len(staticStarts)) + startDetails := logs.attrs("detail") + assert.Equal(t, staticStarts, startDetails) + + // Add jobs dynamically after the lifecycle has already started. 
+ addJob(g2, "g2-dynamic") + addJob(g1, "g1-dynamic") + + waitForDetails(t, logs, len(expectStarts)) + startDetails = logs.attrs("detail") + assert.Equal(t, expectStarts, startDetails) + logs.clear() + + require.NoError(t, h.Stop(log, ctx)) + + waitForDetails(t, logs, len(expectStops)) + stopDetails := logs.attrs("detail") + assert.Equal(t, expectStops, stopDetails) +} + func TestModuleDecoratedGroup(t *testing.T) { opts := hive.DefaultOptions() opts.ModulePrivateProviders = cell.ModulePrivateProviders{ @@ -228,3 +278,109 @@ func Test_sanitizeName(t *testing.T) { }) } } + +type orderingJob struct { + name string +} + +func (oj *orderingJob) start(ctx context.Context, _ cell.Health, _ options) { + <-ctx.Done() +} + +func (oj *orderingJob) info() string { + return oj.name +} + +type logRecord struct { + msg string + attrs map[string]string +} + +type logCollector struct { + mu sync.Mutex + records []logRecord +} + +func (lc *logCollector) add(rec logRecord) { + lc.mu.Lock() + defer lc.mu.Unlock() + lc.records = append(lc.records, rec) +} + +func (lc *logCollector) clear() { + lc.mu.Lock() + defer lc.mu.Unlock() + lc.records = nil +} + +func (lc *logCollector) attrs(attr string) []string { + lc.mu.Lock() + defer lc.mu.Unlock() + + var out []string + for _, rec := range lc.records { + value, ok := rec.attrs[attr] + if !ok { + continue + } + out = append(out, value) + } + return out +} + +type recordingHandler struct { + collector *logCollector + attrs []slog.Attr +} + +func newRecordingLogger() (*slog.Logger, *logCollector) { + collector := &logCollector{} + return slog.New(&recordingHandler{collector: collector}), collector +} + +func (rh *recordingHandler) Enabled(_ context.Context, lvl slog.Level) bool { + return lvl >= slog.LevelInfo +} + +func (rh *recordingHandler) Handle(_ context.Context, r slog.Record) error { + rec := logRecord{ + msg: r.Message, + attrs: make(map[string]string), + } + for _, attr := range rh.attrs { + rec.attrs[attr.Key] = 
fmt.Sprint(attr.Value) + } + r.Attrs(func(a slog.Attr) bool { + rec.attrs[a.Key] = fmt.Sprint(a.Value) + return true + }) + rh.collector.add(rec) + return nil +} + +func (rh *recordingHandler) WithAttrs(attrs []slog.Attr) slog.Handler { + nrh := &recordingHandler{ + collector: rh.collector, + attrs: append(append([]slog.Attr{}, rh.attrs...), attrs...), + } + return nrh +} + +func (rh *recordingHandler) WithGroup(string) slog.Handler { + return &recordingHandler{ + collector: rh.collector, + attrs: append([]slog.Attr{}, rh.attrs...), + } +} + +func waitForDetails(t *testing.T, logs *logCollector, expectedLen int) { + t.Helper() + deadline := time.Now().Add(timeout) + for time.Now().Before(deadline) { + if len(logs.attrs("detail")) >= expectedLen { + return + } + time.Sleep(tick) + } + t.Fatalf("timeout waiting for logs (expected %d); have %v", expectedLen, logs.attrs("detail")) +} From 0dcc9fa45642bc37841ae603482c608ca7fc29d0 Mon Sep 17 00:00:00 2001 From: Jussi Maki Date: Thu, 11 Dec 2025 15:45:58 +0100 Subject: [PATCH 5/6] job: Fix race in testOneShot_RetryBackoff "make test-race" was failing due to racy access to `times`. Use a mutex. Signed-off-by: Jussi Maki --- job/oneshot_test.go | 28 ++++++++++++++++++---------- 1 file changed, 18 insertions(+), 10 deletions(-) diff --git a/job/oneshot_test.go b/job/oneshot_test.go index 398fd72..00ea1b7 100644 --- a/job/oneshot_test.go +++ b/job/oneshot_test.go @@ -6,6 +6,7 @@ package job import ( "context" "errors" + "sync" "sync/atomic" "testing" "time" @@ -132,8 +133,9 @@ func TestOneShot_RetryBackoff(t *testing.T) { // This test asserts that the one shot jobs have a delay equal to the expected behavior of the passed in ratelimiter. 
func testOneShot_RetryBackoff(t *testing.T) (bool, error) { var ( - g Group - times []time.Time + g Group + times []time.Time + timesMu sync.Mutex ) failed := false @@ -148,12 +150,12 @@ func testOneShot_RetryBackoff(t *testing.T) (bool, error) { h := fixture(func(r Registry, s cell.Health, l cell.Lifecycle) { g = r.NewGroup(s) - g.Add( - OneShot("retry-backoff", func(ctx context.Context, health cell.Health) error { - times = append(times, time.Now()) - return errors.New("Always error") - }, WithRetry(retries, rateLimiter)), - ) + g.Add(OneShot("retry-backoff", func(ctx context.Context, health cell.Health) error { + timesMu.Lock() + times = append(times, time.Now()) + timesMu.Unlock() + return errors.New("Always error") + }, WithRetry(retries, rateLimiter))) }) log := hivetest.Logger(t) @@ -162,6 +164,8 @@ func testOneShot_RetryBackoff(t *testing.T) (bool, error) { } require.Eventually(t, func() bool { + timesMu.Lock() + defer timesMu.Unlock() return len(times) == retries+1 }, timeout, tick) @@ -169,9 +173,13 @@ func testOneShot_RetryBackoff(t *testing.T) (bool, error) { return true, err } + timesMu.Lock() + copied := append([]time.Time(nil), times...) + timesMu.Unlock() + var last time.Duration - for i := 1; i < len(times); i++ { - diff := times[i].Sub(times[i-1]) + for i := 1; i < len(copied); i++ { + diff := copied[i].Sub(copied[i-1]) if i > 2 { // Test that the rate of change is 2x (+- 50%, the 50% to account for CI time dilation). // The 10 factor is to add avoid integer rounding. From 6c85d8c61f03787ddf8bb94cad1883ca7c88fed4 Mon Sep 17 00:00:00 2001 From: Jussi Maki Date: Fri, 19 Dec 2025 08:11:12 +0100 Subject: [PATCH 6/6] go.mod: tidy Looks like this hadn't been updated when an indirect dependency became a direct dependency. 
Signed-off-by: Jussi Maki --- go.mod | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/go.mod b/go.mod index 95dbe35..9fbfe66 100644 --- a/go.mod +++ b/go.mod @@ -13,6 +13,8 @@ require ( github.com/stretchr/testify v1.8.4 go.uber.org/dig v1.17.1 go.uber.org/goleak v1.3.0 + golang.org/x/exp v0.0.0-20231206192017-f3f8817b8deb + golang.org/x/sys v0.15.0 golang.org/x/term v0.15.0 golang.org/x/tools v0.16.0 ) @@ -32,8 +34,6 @@ require ( github.com/spf13/afero v1.11.0 // indirect github.com/subosito/gotenv v1.6.0 // indirect go.uber.org/multierr v1.11.0 // indirect - golang.org/x/exp v0.0.0-20231206192017-f3f8817b8deb // indirect - golang.org/x/sys v0.15.0 // indirect golang.org/x/text v0.14.0 // indirect golang.org/x/time v0.5.0 // indirect gopkg.in/ini.v1 v1.67.0 // indirect