Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 113 additions & 28 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,14 @@ shutdown complete
- [func EveryInterval\(d time.Duration, fn func\(WorkerContext\) error\) func\(WorkerContext\) error](<#EveryInterval>)
- [func Run\(ctx context.Context, workers \[\]\*Worker, opts ...RunOption\) error](<#Run>)
- [func RunWorker\(ctx context.Context, w \*Worker, opts ...RunOption\)](<#RunWorker>)
- [type BaseMetrics](<#BaseMetrics>)
- [func \(BaseMetrics\) ObserveRunDuration\(string, time.Duration\)](<#BaseMetrics.ObserveRunDuration>)
- [func \(BaseMetrics\) SetActiveWorkers\(int\)](<#BaseMetrics.SetActiveWorkers>)
- [func \(BaseMetrics\) WorkerFailed\(string, error\)](<#BaseMetrics.WorkerFailed>)
- [func \(BaseMetrics\) WorkerPanicked\(string\)](<#BaseMetrics.WorkerPanicked>)
- [func \(BaseMetrics\) WorkerRestarted\(string, int\)](<#BaseMetrics.WorkerRestarted>)
- [func \(BaseMetrics\) WorkerStarted\(string\)](<#BaseMetrics.WorkerStarted>)
- [func \(BaseMetrics\) WorkerStopped\(string\)](<#BaseMetrics.WorkerStopped>)
- [type Metrics](<#Metrics>)
- [func NewPrometheusMetrics\(namespace string\) Metrics](<#NewPrometheusMetrics>)
- [type RunOption](<#RunOption>)
Expand All @@ -210,7 +218,7 @@ shutdown complete


<a name="BatchChannelWorker"></a>
## func BatchChannelWorker
## func [BatchChannelWorker](<https://github.com/go-coldbrew/workers/blob/main/helpers.go#L49>)

```go
func BatchChannelWorker[T any](ch <-chan T, maxSize int, maxDelay time.Duration, fn func(WorkerContext, []T) error) func(WorkerContext) error
Expand Down Expand Up @@ -265,7 +273,7 @@ func main() {
</details>

<a name="ChannelWorker"></a>
## func ChannelWorker
## func [ChannelWorker](<https://github.com/go-coldbrew/workers/blob/main/helpers.go#L27>)

```go
func ChannelWorker[T any](ch <-chan T, fn func(WorkerContext, T) error) func(WorkerContext) error
Expand Down Expand Up @@ -321,7 +329,7 @@ world
</details>

<a name="EveryInterval"></a>
## func EveryInterval
## func [EveryInterval](<https://github.com/go-coldbrew/workers/blob/main/helpers.go#L8>)

```go
func EveryInterval(d time.Duration, fn func(WorkerContext) error) func(WorkerContext) error
Expand Down Expand Up @@ -373,7 +381,7 @@ tick 2
</details>

<a name="Run"></a>
## func Run
## func [Run](<https://github.com/go-coldbrew/workers/blob/main/run.go#L120>)

```go
func Run(ctx context.Context, workers []*Worker, opts ...RunOption) error
Expand Down Expand Up @@ -429,7 +437,7 @@ all workers stopped
</details>

<a name="RunWorker"></a>
## func RunWorker
## func [RunWorker](<https://github.com/go-coldbrew/workers/blob/main/run.go#L143>)

```go
func RunWorker(ctx context.Context, w *Worker, opts ...RunOption)
Expand Down Expand Up @@ -478,10 +486,93 @@ done
</p>
</details>

<a name="BaseMetrics"></a>
## type [BaseMetrics](<https://github.com/go-coldbrew/workers/blob/main/metrics.go#L43>)

BaseMetrics provides no\-op implementations of all Metrics methods. Embed it in custom Metrics implementations so that new methods added to the Metrics interface in future versions get safe no\-op defaults instead of breaking your build:

```
type myMetrics struct {
workers.BaseMetrics // forward-compatible
client *statsd.Client
}

func (m *myMetrics) WorkerStarted(name string) {
m.client.Incr("worker.started", []string{"worker:" + name}, 1)
}
```

```go
type BaseMetrics struct{}
```

<a name="BaseMetrics.ObserveRunDuration"></a>
### func \(BaseMetrics\) [ObserveRunDuration](<https://github.com/go-coldbrew/workers/blob/main/metrics.go#L50>)

```go
func (BaseMetrics) ObserveRunDuration(string, time.Duration)
```



<a name="BaseMetrics.SetActiveWorkers"></a>
### func \(BaseMetrics\) [SetActiveWorkers](<https://github.com/go-coldbrew/workers/blob/main/metrics.go#L51>)

```go
func (BaseMetrics) SetActiveWorkers(int)
```



<a name="BaseMetrics.WorkerFailed"></a>
### func \(BaseMetrics\) [WorkerFailed](<https://github.com/go-coldbrew/workers/blob/main/metrics.go#L48>)

```go
func (BaseMetrics) WorkerFailed(string, error)
```



<a name="BaseMetrics.WorkerPanicked"></a>
### func \(BaseMetrics\) [WorkerPanicked](<https://github.com/go-coldbrew/workers/blob/main/metrics.go#L47>)

```go
func (BaseMetrics) WorkerPanicked(string)
```



<a name="BaseMetrics.WorkerRestarted"></a>
### func \(BaseMetrics\) [WorkerRestarted](<https://github.com/go-coldbrew/workers/blob/main/metrics.go#L49>)

```go
func (BaseMetrics) WorkerRestarted(string, int)
```



<a name="BaseMetrics.WorkerStarted"></a>
### func \(BaseMetrics\) [WorkerStarted](<https://github.com/go-coldbrew/workers/blob/main/metrics.go#L45>)

```go
func (BaseMetrics) WorkerStarted(string)
```



<a name="BaseMetrics.WorkerStopped"></a>
### func \(BaseMetrics\) [WorkerStopped](<https://github.com/go-coldbrew/workers/blob/main/metrics.go#L46>)

```go
func (BaseMetrics) WorkerStopped(string)
```



<a name="Metrics"></a>
## type Metrics
## type [Metrics](<https://github.com/go-coldbrew/workers/blob/main/metrics.go#L20-L28>)

Metrics collects worker lifecycle metrics. Implement this interface to provide custom metrics \(e.g., Datadog, StatsD\). Use NoopMetrics to disable metrics, or NewPrometheusMetrics for the built\-in Prometheus implementation.
Metrics collects worker lifecycle metrics. Implement this interface to provide custom metrics \(e.g., Datadog, StatsD\). Use BaseMetrics\{\} to disable metrics, or NewPrometheusMetrics for the built\-in Prometheus implementation.

```go
type Metrics interface {
Expand All @@ -495,14 +586,8 @@ type Metrics interface {
}
```

<a name="NoopMetrics"></a>NoopMetrics is a no\-op implementation of Metrics. Used as the default when no metrics are configured via WithMetrics.

```go
var NoopMetrics Metrics = &noopMetrics{}
```

<a name="NewPrometheusMetrics"></a>
### func NewPrometheusMetrics
### func [NewPrometheusMetrics](<https://github.com/go-coldbrew/workers/blob/main/metrics.go#L71>)

```go
func NewPrometheusMetrics(namespace string) Metrics
Expand All @@ -511,7 +596,7 @@ func NewPrometheusMetrics(namespace string) Metrics
NewPrometheusMetrics creates a Metrics implementation backed by Prometheus. The namespace is prepended to all metric names \(e.g., "myapp" → "myapp\_worker\_started\_total"\). Metrics are auto\-registered with the default Prometheus registry. Safe to call multiple times with the same namespace — returns the cached instance. The cache is process\-global; use a small number of static namespaces \(not per\-request/tenant values\).

<a name="RunOption"></a>
## type RunOption
## type [RunOption](<https://github.com/go-coldbrew/workers/blob/main/run.go#L15>)

RunOption configures the behavior of Run.

Expand All @@ -520,16 +605,16 @@ type RunOption func(*runConfig)
```

<a name="WithMetrics"></a>
### func WithMetrics
### func [WithMetrics](<https://github.com/go-coldbrew/workers/blob/main/run.go#L24>)

```go
func WithMetrics(m Metrics) RunOption
```

WithMetrics sets the metrics implementation for all workers started by Run. Workers inherit this unless they override via Worker.WithMetrics. If not set, NoopMetrics is used.
WithMetrics sets the metrics implementation for all workers started by Run. Workers inherit this unless they override via Worker.WithMetrics. If not set, BaseMetrics\{\} is used.

<a name="Worker"></a>
## type Worker
## type [Worker](<https://github.com/go-coldbrew/workers/blob/main/worker.go#L127-L137>)

Worker represents a background goroutine managed by the framework. Create with NewWorker and configure with builder methods.

Expand All @@ -540,7 +625,7 @@ type Worker struct {
```

<a name="NewWorker"></a>
### func NewWorker
### func [NewWorker](<https://github.com/go-coldbrew/workers/blob/main/worker.go#L141>)

```go
func NewWorker(name string, run func(WorkerContext) error) *Worker
Expand Down Expand Up @@ -588,7 +673,7 @@ worker "greeter" started (attempt 0)
</details>

<a name="Worker.Every"></a>
### func \(\*Worker\) Every
### func \(\*Worker\) [Every](<https://github.com/go-coldbrew/workers/blob/main/worker.go#L198>)

```go
func (w *Worker) Every(d time.Duration) *Worker
Expand Down Expand Up @@ -638,7 +723,7 @@ tick 2
</details>

<a name="Worker.WithBackoffJitter"></a>
### func \(\*Worker\) WithBackoffJitter
### func \(\*Worker\) [WithBackoffJitter](<https://github.com/go-coldbrew/workers/blob/main/worker.go#L175>)

```go
func (w *Worker) WithBackoffJitter(jitter suture.Jitter) *Worker
Expand All @@ -647,7 +732,7 @@ func (w *Worker) WithBackoffJitter(jitter suture.Jitter) *Worker
WithBackoffJitter adds random jitter to the backoff duration to prevent thundering herd on coordinated restarts.

<a name="Worker.WithFailureBackoff"></a>
### func \(\*Worker\) WithFailureBackoff
### func \(\*Worker\) [WithFailureBackoff](<https://github.com/go-coldbrew/workers/blob/main/worker.go#L168>)

```go
func (w *Worker) WithFailureBackoff(d time.Duration) *Worker
Expand All @@ -656,7 +741,7 @@ func (w *Worker) WithFailureBackoff(d time.Duration) *Worker
WithFailureBackoff sets the duration to wait between restarts. Suture default is 15 seconds.

<a name="Worker.WithFailureDecay"></a>
### func \(\*Worker\) WithFailureDecay
### func \(\*Worker\) [WithFailureDecay](<https://github.com/go-coldbrew/workers/blob/main/worker.go#L154>)

```go
func (w *Worker) WithFailureDecay(decay float64) *Worker
Expand All @@ -665,7 +750,7 @@ func (w *Worker) WithFailureDecay(decay float64) *Worker
WithFailureDecay sets the rate at which failure count decays over time. A value of 1.0 means failures decay by one per second. Suture default is 1.0.

<a name="Worker.WithFailureThreshold"></a>
### func \(\*Worker\) WithFailureThreshold
### func \(\*Worker\) [WithFailureThreshold](<https://github.com/go-coldbrew/workers/blob/main/worker.go#L161>)

```go
func (w *Worker) WithFailureThreshold(threshold float64) *Worker
Expand All @@ -674,7 +759,7 @@ func (w *Worker) WithFailureThreshold(threshold float64) *Worker
WithFailureThreshold sets the number of failures allowed before the supervisor gives up restarting. Suture default is 5.

<a name="Worker.WithMetrics"></a>
### func \(\*Worker\) WithMetrics
### func \(\*Worker\) [WithMetrics](<https://github.com/go-coldbrew/workers/blob/main/worker.go#L189>)

```go
func (w *Worker) WithMetrics(m Metrics) *Worker
Expand All @@ -683,7 +768,7 @@ func (w *Worker) WithMetrics(m Metrics) *Worker
WithMetrics sets a per\-worker metrics implementation, overriding the metrics inherited from the parent WorkerContext or Run options.

<a name="Worker.WithRestart"></a>
### func \(\*Worker\) WithRestart
### func \(\*Worker\) [WithRestart](<https://github.com/go-coldbrew/workers/blob/main/worker.go#L147>)

```go
func (w *Worker) WithRestart(restart bool) *Worker
Expand Down Expand Up @@ -732,7 +817,7 @@ func main() {
</details>

<a name="Worker.WithTimeout"></a>
### func \(\*Worker\) WithTimeout
### func \(\*Worker\) [WithTimeout](<https://github.com/go-coldbrew/workers/blob/main/worker.go#L182>)

```go
func (w *Worker) WithTimeout(d time.Duration) *Worker
Expand All @@ -741,7 +826,7 @@ func (w *Worker) WithTimeout(d time.Duration) *Worker
WithTimeout sets the maximum time to wait for the worker to stop during graceful shutdown. Suture default is 10 seconds.

<a name="WorkerContext"></a>
## type WorkerContext
## type [WorkerContext](<https://github.com/go-coldbrew/workers/blob/main/worker.go#L52-L66>)

WorkerContext extends context.Context with worker metadata and dynamic child worker management. The framework creates these — users never need to implement this interface.

Expand Down
37 changes: 23 additions & 14 deletions metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ var (

// Metrics collects worker lifecycle metrics.
// Implement this interface to provide custom metrics (e.g., Datadog, StatsD).
// Use NoopMetrics to disable metrics, or NewPrometheusMetrics for the built-in
// Use BaseMetrics{} to disable metrics, or NewPrometheusMetrics for the built-in
// Prometheus implementation.
type Metrics interface {
WorkerStarted(name string)
Expand All @@ -27,19 +27,28 @@ type Metrics interface {
SetActiveWorkers(count int)
}

// NoopMetrics is a no-op implementation of Metrics. Used as the default
// when no metrics are configured via WithMetrics.
var NoopMetrics Metrics = &noopMetrics{}

type noopMetrics struct{}

func (n *noopMetrics) WorkerStarted(string) {}
func (n *noopMetrics) WorkerStopped(string) {}
func (n *noopMetrics) WorkerPanicked(string) {}
func (n *noopMetrics) WorkerFailed(string, error) {}
func (n *noopMetrics) WorkerRestarted(string, int) {}
func (n *noopMetrics) ObserveRunDuration(string, time.Duration) {}
func (n *noopMetrics) SetActiveWorkers(int) {}
// BaseMetrics provides no-op implementations of all Metrics methods.
// Embed it in custom Metrics implementations so that new methods added
// to the Metrics interface in future versions get safe no-op defaults
// instead of breaking your build:
//
// type myMetrics struct {
// workers.BaseMetrics // forward-compatible
// client *statsd.Client
// }
//
// func (m *myMetrics) WorkerStarted(name string) {
// m.client.Incr("worker.started", []string{"worker:" + name}, 1)
// }
type BaseMetrics struct{}

func (BaseMetrics) WorkerStarted(string) {}
func (BaseMetrics) WorkerStopped(string) {}
func (BaseMetrics) WorkerPanicked(string) {}
func (BaseMetrics) WorkerFailed(string, error) {}
func (BaseMetrics) WorkerRestarted(string, int) {}
func (BaseMetrics) ObserveRunDuration(string, time.Duration) {}
func (BaseMetrics) SetActiveWorkers(int) {}

// prometheusMetrics implements Metrics using Prometheus counters, histograms,
// and gauges registered via promauto.
Expand Down
10 changes: 6 additions & 4 deletions run.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@ type runConfig struct {

// WithMetrics sets the metrics implementation for all workers started by Run.
// Workers inherit this unless they override via Worker.WithMetrics.
// If not set, NoopMetrics is used.
// If not set, BaseMetrics{} is used.
func WithMetrics(m Metrics) RunOption {
return func(c *runConfig) {
c.metrics = m
if m != nil {
c.metrics = m
}
}
}

Expand Down Expand Up @@ -97,7 +99,7 @@ func resolveMetrics(w *Worker, parent Metrics) Metrics {
if parent != nil {
return parent
}
return NoopMetrics
return BaseMetrics{}
}

// addWorkerToSupervisor creates a child supervisor for the worker,
Expand All @@ -116,7 +118,7 @@ func addWorkerToSupervisor(parent *suture.Supervisor, w *Worker, metrics Metrics
// A worker exiting early (without restart) does not stop other workers.
// Returns nil on clean shutdown.
func Run(ctx context.Context, workers []*Worker, opts ...RunOption) error {
cfg := &runConfig{metrics: NoopMetrics}
cfg := &runConfig{metrics: BaseMetrics{}}
for _, opt := range opts {
opt(cfg)
}
Expand Down
2 changes: 1 addition & 1 deletion worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func (wc *workerContext) Children() []string {

func newWorkerContext(ctx context.Context, name string, attempt int, sup *suture.Supervisor, metrics Metrics, active *atomic.Int32) WorkerContext {
if metrics == nil {
metrics = NoopMetrics
metrics = BaseMetrics{}
}
return &workerContext{Context: ctx, name: name, attempt: attempt, sup: sup, metrics: metrics, active: active}
}
Expand Down
Loading