Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 21 additions & 2 deletions cell/lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,14 @@ type HookInterface interface {
Stop(HookContext) error
}

// HookDescriptiveInterface extends HookInterface with the HookInfo
// method for logging more details about what was started or stopped.
// This will be used instead of just showing the function name and module.
type HookDescriptiveInterface interface {
HookInterface
HookInfo() string
}

// Hook is a pair of start and stop callbacks. Both are optional.
// They're paired up to make sure that on failed start all corresponding
// stop hooks are executed.
Expand Down Expand Up @@ -106,7 +114,8 @@ func (lc *DefaultLifecycle) Start(log *slog.Logger, ctx context.Context) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

for i, hook := range lc.hooks {
from := lc.numStarted
for i, hook := range lc.hooks[from:] {
fnName, exists := getHookFuncName(hook, true)

if !exists {
Expand All @@ -116,9 +125,13 @@ func (lc *DefaultLifecycle) Start(log *slog.Logger, ctx context.Context) error {
}

l := log.With("function", fnName)
desc, ok := hook.HookInterface.(HookDescriptiveInterface)
if ok {
l = l.With("detail", desc.HookInfo())
}

// Do not attempt to start already started hooks.
if i < lc.numStarted {
if from+i < lc.numStarted {
l.Error("Hook appears to be running. Skipping")
continue
}
Expand Down Expand Up @@ -161,7 +174,13 @@ func (lc *DefaultLifecycle) Stop(log *slog.Logger, ctx context.Context) error {
if !exists {
continue
}

l := log.With("function", fnName)
desc, ok := hook.HookInterface.(HookDescriptiveInterface)
if ok {
l = l.With("detail", desc.HookInfo())
} else {
}
l.Debug("Executing stop hook")
t0 := time.Now()
if err := hook.Stop(ctx); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion example/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func newExampleEvents(lc cell.Lifecycle, jobs job.Registry, health cell.Health)
es.Observable, es.emit, es.complete = stream.Multicast[ExampleEvent]()

// Create a new job group and add emitter as a one-shot job.
g := jobs.NewGroup(health, lc)
g := jobs.NewGroup(health)
g.Add(job.OneShot("emitter", es.emitter))
return es
}
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ require (
github.com/stretchr/testify v1.8.4
go.uber.org/dig v1.17.1
go.uber.org/goleak v1.3.0
golang.org/x/exp v0.0.0-20231206192017-f3f8817b8deb
golang.org/x/sys v0.15.0
golang.org/x/term v0.15.0
golang.org/x/tools v0.16.0
)
Expand All @@ -32,8 +34,6 @@ require (
github.com/spf13/afero v1.11.0 // indirect
github.com/subosito/gotenv v1.6.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/exp v0.0.0-20231206192017-f3f8817b8deb // indirect
golang.org/x/sys v0.15.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.5.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
Expand Down
208 changes: 97 additions & 111 deletions job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,33 +34,67 @@ type Registry interface {
// NewGroup creates a new group of jobs which can be started and stopped together as part of the cells lifecycle.
// The provided scope is used to report health status of the jobs. A `cell.Scope` can be obtained via injection
// an object with the correct scope is provided by the closest `cell.Module`.
NewGroup(health cell.Health, lc cell.Lifecycle, opts ...groupOpt) Group
NewGroup(health cell.Health, opts ...groupOpt) Group
}

type registry struct {
logger *slog.Logger
shutdowner hive.Shutdowner

mu sync.Mutex
groups []Group
mu sync.Mutex
groups []*group
started bool

lifecycle cell.Lifecycle
dynamicLC *cell.DefaultLifecycle
}

var _ cell.HookInterface = (*registry)(nil)

func newRegistry(
logger *slog.Logger,
shutdowner hive.Shutdowner,
lc cell.Lifecycle,
) Registry {
return &registry{
r := &registry{
logger: logger,
shutdowner: shutdowner,
lifecycle: lc,
}
lc.Append(r)
return r
}

func (c *registry) Start(cell.HookContext) error {
c.mu.Lock()
defer c.mu.Unlock()

if c.started {
return nil
}
c.started = true
c.dynamicLC = cell.NewDefaultLifecycle(nil, 0, 0)
return nil
}

func (c *registry) Stop(ctx cell.HookContext) error {
c.mu.Lock()
defer c.mu.Unlock()
if !c.started {
return nil
}
c.started = false
c.dynamicLC.Stop(c.logger, ctx)
return nil
}

// NewGroup creates a new Group with the given `opts` options, which allows you to customize the behavior for the
// group as a whole. For example by allowing you to add pprof labels to the group or by customizing the logger.
//
// Jobs added to the group before it is started will be appended to the provided lifecycle. Jobs added
// after starting are started immediately.
func (c *registry) NewGroup(health cell.Health, lc cell.Lifecycle, opts ...groupOpt) Group {
// Jobs added to the group before it is started will be appended to the registry's lifecycle.
// Jobs added after starting are started immediately and stopped sequentially in reverse order
// when registry is stopped.
func (c *registry) NewGroup(health cell.Health, opts ...groupOpt) Group {
c.mu.Lock()
defer c.mu.Unlock()

Expand All @@ -73,21 +107,45 @@ func (c *registry) NewGroup(health cell.Health, lc cell.Lifecycle, opts ...group
}

g := &group{
options: options,
lifecycle: lc,
wg: &sync.WaitGroup{},
health: health,
registry: c,
options: options,
health: health,
}
// Append the lifecycle hooks for the group. The start hook sets up a context
// for the dynamical jobs (jobs added after starting) and a stop hook to cancel
// the context and wait for the jobs to finish.
lc.Append((*groupHooks)(g))

c.groups = append(c.groups, g)

return g
}

func (c *registry) addJobs(health cell.Health, opts options, jobs ...Job) {
c.mu.Lock()
defer c.mu.Unlock()

if !c.started {
for _, job := range jobs {
c.lifecycle.Append(&queuedJob{
registry: c,
job: job,
health: health,
options: opts,
})
}
return
}

for _, job := range jobs {
qj := &queuedJob{
registry: c,
job: job,
health: health,
options: opts,
}
c.dynamicLC.Append(qj)
// Start the newly appended job immediately.
c.dynamicLC.Start(c.logger, context.Background())
}
}

// Group aims to streamline the management of work within a cell.
// A group allows you to add multiple types of jobs with different kinds of logic.
// No matter the job type, the function provided to is always called with a context which
Expand All @@ -104,61 +162,54 @@ type Group interface {
// Job in an interface that describes a unit of work which can be added to a Group. This interface contains unexported
// methods and thus can only be implemented by functions in this package such as OneShot, Timer, or Observer.
type Job interface {
start(ctx context.Context, wg *sync.WaitGroup, health cell.Health, options options)
start(ctx context.Context, health cell.Health, options options)
info() string
}

type queuedJob struct {
job Job
health cell.Health
options options

wq sync.WaitGroup
cancel context.CancelFunc
registry *registry
job Job
health cell.Health
options options
cancel context.CancelFunc
done chan struct{}
}

// Start implements cell.HookInterface.
func (qj *queuedJob) Start(cell.HookContext) error {
qj.wq.Add(1)

var ctx context.Context
ctx, qj.cancel = context.WithCancel(context.Background())
qj.done = make(chan struct{})
ctx, cancel := context.WithCancel(context.Background())
qj.cancel = cancel
pprof.Do(ctx, qj.options.pprofLabels, func(ctx context.Context) {
go qj.job.start(ctx, &qj.wq, qj.health, qj.options)
go func() {
defer close(qj.done)
qj.job.start(ctx, qj.health, qj.options)
}()
})
return nil
}

func (qj *queuedJob) HookInfo() string {
return qj.job.info()
}

// Stop implements cell.HookInterface.
func (qj *queuedJob) Stop(ctx cell.HookContext) error {
qj.cancel()

stopped := make(chan struct{})
go func() {
qj.wq.Wait()
close(stopped)
}()
select {
case <-ctx.Done():
return ctx.Err()
case <-stopped:
case <-qj.done:
return nil
}
}

var _ cell.HookInterface = &queuedJob{}

type group struct {
options options
lifecycle cell.Lifecycle

// wg is a wait group for "dynamic" jobs added after starting.
wg *sync.WaitGroup

mu sync.Mutex
ctx context.Context
cancel context.CancelFunc

health cell.Health
registry *registry
options options
health cell.Health
}

type options struct {
Expand Down Expand Up @@ -192,77 +243,12 @@ func WithMetrics(metrics Metrics) groupOpt {
}
}

var _ cell.HookInterface = (*groupHooks)(nil)

// groupHooks implements the Hive start and stop hooks. Hidden as these
// are appended by NewGroup.
type groupHooks group

// Start implements the cell.HookInterface interface
func (gh *groupHooks) Start(_ cell.HookContext) error {
jg := (*group)(gh)
jg.mu.Lock()
defer jg.mu.Unlock()

// Create a context for the dynamically started jobs.
jg.ctx, jg.cancel = context.WithCancel(context.Background())
return nil
}

// Stop implements the cell.HookInterface interface
func (gh *groupHooks) Stop(stopCtx cell.HookContext) error {
jg := (*group)(gh)
jg.mu.Lock()
defer jg.mu.Unlock()

// Stop all dynamically started jobs.
done := make(chan struct{})
go func() {
jg.wg.Wait()
close(done)
}()

jg.cancel()

select {
case <-stopCtx.Done():
jg.options.logger.Error("Stop hook context expired before job group was done")
case <-done:
}

return nil
}

func (jg *group) Add(jobs ...Job) {
jg.add(jg.health, jobs...)
}

func (jg *group) add(health cell.Health, jobs ...Job) {
jg.mu.Lock()
defer jg.mu.Unlock()

// The context is only set once the group has been started.
// If we have not yet started append hooks for the jobs to be started as part
// of the normal lifecycle. This makes sure that the start order reflects the
// order in which the jobs are added and avoids e.g. starting a job before its
// dependencies.
if jg.ctx == nil {
for _, job := range jobs {
jg.lifecycle.Append(&queuedJob{
job: job,
health: health,
options: jg.options,
})
}
return
}

for _, j := range jobs {
jg.wg.Add(1)
pprof.Do(jg.ctx, jg.options.pprofLabels, func(ctx context.Context) {
go j.start(ctx, jg.wg, health, jg.options)
})
}
jg.registry.addJobs(health, jg.options, jobs...)
}

// Scoped creates a scoped group, jobs added to this scoped group will appear as a sub-scope in the health reporter
Expand Down
Loading