Commit ab83b51

fix documentation issues and add CLAUDE.md
README.md fixes:
- fix Go version requirement (1.23+ -> 1.24+)
- fix trailing comma in Quick Start example
- fix WithCompleteFn -> WithWorkerCompleteFn with correct signature
- fix dependencies claim (now mentions golang.org/x packages)
- clarify work distribution (shared channel vs random accumulator)
- clarify completion callback behavior (skipped on context.Canceled only)
- clarify Dropped metric is user-incrementable
- clarify AvgLatency uses max ProcessingTime across workers

CLAUDE.md:
- add comprehensive project documentation for AI assistants
1 parent 0b5f6fd commit ab83b51

2 files changed

Lines changed: 192 additions & 18 deletions

File tree

CLAUDE.md

Lines changed: 173 additions & 0 deletions
@@ -0,0 +1,173 @@
# CLAUDE.md

This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.

## Build and Test Commands

```bash
# run all tests with race detection and coverage
go test -timeout=60s -race -covermode=atomic -coverprofile=profile.cov ./...

# run a single test
go test -run TestName

# run tests with verbose output
go test -v ./...

# run benchmarks
go test -bench=. -run=Bench

# lint (required for CI)
golangci-lint run

# format code
gofmt -s -w .
goimports -w .
```
## Architecture Overview

Generic worker pool library for Go 1.24+ with batching, work distribution, and metrics.

### Package Structure

- **pool.go** - core `WorkerGroup[T]` with builder-pattern configuration
- **collector.go** - `Collector[V]` for async result gathering
- **metrics/** - stats tracking (processed, errors, timings, custom counters)
- **middleware/** - built-in middleware (Retry, Timeout, Recovery, Validator, RateLimiter)
- **examples/** - usage examples (separate go.mod, excluded from tests)

### Core Types

```go
// Worker interface - implement Do() or use the WorkerFunc adapter
type Worker[T any] interface {
	Do(ctx context.Context, v T) error
}

// WorkerFunc adapts a plain function to the Worker interface
type WorkerFunc[T any] func(ctx context.Context, v T) error

// Middleware wraps a Worker with extra behavior
type Middleware[T any] func(Worker[T]) Worker[T]
```
### Pool Modes

1. **Stateless** (`New[T](size, worker)`) - single shared worker instance used by all goroutines
2. **Stateful** (`NewStateful[T](size, maker)`) - each goroutine gets its own worker via the maker function
### Channel Architecture

WorkerGroup maintains two sets of channels:

**Shared channels** (used when no `WithChunkFn`):
- `sharedCh chan T` - all workers compete for items
- `sharedBatchCh chan []T` - for batch mode

**Per-worker channels** (used with `WithChunkFn`):
- `workersCh []chan T` - dedicated channel per worker
- `workerBatchCh []chan []T` - batch channel per worker

Channel selection in `Go()`:
```go
workerCh := p.sharedCh
batchCh := p.sharedBatchCh
if p.chunkFn != nil {
	workerCh = p.workersCh[i]
	batchCh = p.workerBatchCh[i]
}
```

**Important**: `WithWorkerChanSize()` recreates ALL channels (both shared and per-worker).
### Work Distribution

**Direct mode (batchSize=0):**
- Without `WithChunkFn`: shared channel - workers compete for items
- With `WithChunkFn`: per-worker channels with FNV-1a hash routing

**Batching mode (batchSize>0):**
- Without `WithChunkFn`: random accumulator slot selection via `rand.Intn(poolSize)`
- With `WithChunkFn`: consistent hashing using the FNV-1a hash:
```go
h := fnv.New32a()
h.Write([]byte(p.chunkFn(v)))
id := int(h.Sum32()) % p.poolSize
```
### Batching

When `batchSize > 0`:
1. Items accumulate in `accumulators[id]` (one per worker slot)
2. When an accumulator reaches `batchSize`, the batch is sent to the channel
3. `Close()` flushes any remaining partial batches
### Lifecycle

1. Create pool: `New[T]()` or `NewStateful[T]()`
2. Configure: chain `With*()` methods (must be before `Go()`)
3. Start: `Go(ctx)` - starts worker goroutines via errgroup
4. Submit: `Submit(v)` (single producer) or `Send(v)` (concurrent-safe)
5. Finish: `Close(ctx)` - flushes batches, closes channels, waits for workers
### Metrics System

`metrics.Value` tracks per-worker stats without locks (each worker writes to its own slot):
- `workerStats []Stats` - per-worker counters (Processed, Errors, Dropped, timings)
- `userData map[string]int` - custom counters (mutex-protected)

Workers access metrics via context:
```go
m := metrics.Get(ctx)
m.Inc("custom-counter")
```

Timer types: `TimerProc`, `TimerWait`, `TimerInit`, `TimerWrap`
### Middleware Pattern

Middleware is applied in reverse order (first = outermost):
```go
// For stateless: wraps p.worker directly
// For stateful: wraps the maker function
for i := len(middlewares) - 1; i >= 0; i-- {
	wrapped = middlewares[i](wrapped)
}
```

Built-in middleware in `middleware/`:
- `Retry(attempts, baseDelay)` - exponential backoff with jitter
- `Timeout(duration)` - per-operation timeout
- `Recovery(handler)` - panic recovery
- `Validator(fn)` - input validation
- `RateLimiter(rate, burst)` - token bucket rate limiting (global, not per-worker)
### Collector

Simple async-to-sync bridge:
```go
type Collector[V any] struct {
	ch  chan V
	ctx context.Context
}
```

- `Submit(v)` - blocks if the buffer is full
- `Iter()` - returns `iter.Seq2[V, error]` (Go 1.23 range-over-func)
- `All()` - collects all results into a slice
### Error Handling

- Default: the first error stops the pool (errgroup behavior)
- `WithContinueOnError()`: accumulates errors, returns the last error with the total count
- Worker completion callbacks only run if there was no error or `continueOnError` is set
- The pool completion callback runs when the last worker finishes (skipped only on `context.Canceled`; it still runs on `context.DeadlineExceeded`)
### Key Implementation Details

1. The `activated` flag prevents double `Go()` calls
2. The `activeWorkers` atomic counter tracks live workers for the pool completion callback
3. The `sendMu` mutex makes `Send()` concurrent-safe (it wraps `Submit()`)
4. errgroup manages worker goroutines and error propagation
5. The context from errgroup is used for cancellation propagation

README.md

Lines changed: 19 additions & 18 deletions
@@ -1,6 +1,6 @@
 # pool [![Build Status](https://github.com/go-pkgz/pool/workflows/build/badge.svg)](https://github.com/go-pkgz/pool/actions) [![Coverage Status](https://coveralls.io/repos/github/go-pkgz/pool/badge.svg?branch=master)](https://coveralls.io/github/go-pkgz/pool?branch=master) [![godoc](https://godoc.org/github.com/go-pkgz/pool?status.svg)](https://godoc.org/github.com/go-pkgz/pool)
 
-`pool` is a Go package that provides a generic, efficient worker pool implementation for parallel task processing. Built for Go 1.23+, it offers a flexible API with features like batching, work distribution strategies, and comprehensive metrics collection.
+`pool` is a Go package that provides a generic, efficient worker pool implementation for parallel task processing. Built for Go 1.24+, it offers a flexible API with features like batching, work distribution strategies, and comprehensive metrics collection.
 
 ## Features
 
@@ -16,7 +16,7 @@
 - Optional completion callbacks
 - Extensible middleware system for custom functionality
 - Built-in middlewares for common tasks
-- No external dependencies except for the testing framework
+- Minimal dependencies (only `golang.org/x/sync` and `golang.org/x/time` at runtime)
 
 ## Quick Start
 
@@ -52,8 +52,8 @@ func main() {
 		return nil
 	})
 
-	// create a pool with 5 workers
-	p := pool.New[string](5, worker).WithContinueOnError(), // don't stop on errors
+	// create a pool with 5 workers
+	p := pool.New[string](5, worker).WithContinueOnError() // don't stop on errors
 
 	// start the pool
 	if err := p.Go(context.Background()); err != nil {
@@ -233,13 +233,14 @@ p := pool.New[Task](3, worker).WithChunkFn(func(t Task) string {
 
 How distribution works:
 1. Without chunk function:
-   - Items are distributed randomly among workers
+   - Direct mode: Items go to a shared channel, workers compete for items
+   - Batching mode: Items are assigned to random accumulator slots
    - Good for independent tasks
 
 2. With chunk function:
    - Function returns string key for each item
    - Items with the same key always go to the same worker
-   - Uses consistent hashing to map keys to workers
+   - Uses FNV-1a hash to map keys to workers deterministically
 
 When to use custom distribution:
 - Maintain ordering for related items
@@ -425,7 +426,7 @@ The `GetStats()` method returns a comprehensive `Stats` structure with the follo
 **Counters:**
 - `Processed` - total number of successfully processed items
 - `Errors` - total number of items that returned errors
-- `Dropped` - total number of items dropped (e.g., due to context cancellation)
+- `Dropped` - total number of items dropped (user-incrementable via `m.IncDropped(workerID)` where `m` is the metrics object)
 
 **Timing Metrics:**
 - `ProcessingTime` - cumulative time spent processing items (max across workers)
@@ -436,7 +437,7 @@ The `GetStats()` method returns a comprehensive `Stats` structure with the follo
 **Derived Statistics:**
 - `RatePerSec` - items processed per second (Processed / TotalTime)
-- `AvgLatency` - average processing time per item (ProcessingTime / Processed)
+- `AvgLatency` - average wall-clock time per item (max ProcessingTime across workers / Processed)
 - `ErrorRate` - percentage of items that failed (Errors / Total)
 - `DroppedRate` - percentage of items dropped (Dropped / Total)
 - `Utilization` - percentage of time spent processing vs waiting (ProcessingTime / (ProcessingTime + WaitTime))
@@ -523,7 +524,7 @@ p := pool.New[string](5, worker).
 The completion callback executes when:
 - All workers have completed processing
 - Errors occurred but pool continued (`WithContinueOnError()`)
-- Does not execute on context cancellation
+- Skipped only on `context.Canceled` (still runs on `context.DeadlineExceeded`)
 
 Important notes:
 - Use `Submit` when sending items from a single goroutine
@@ -538,21 +539,21 @@ configuration and must be called before `Go()`; calling them after `Go()` is
 unsupported and may lead to deadlocks or no-ops.
 
 ```go
-p := pool.New[string](2, worker).  // pool with 2 workers
-    WithBatchSize(10).             // process items in batches
-    WithWorkerChanSize(5).         // set worker channel buffer size
-    WithChunkFn(chunkFn).          // control work distribution
-    WithContinueOnError().         // don't stop on errors
-    WithCompleteFn(completeFn)     // called when worker finishes
+p := pool.New[string](2, worker).        // pool with 2 workers
+    WithBatchSize(10).                   // process items in batches
+    WithWorkerChanSize(5).               // set worker channel buffer size
+    WithChunkFn(chunkFn).                // control work distribution
+    WithContinueOnError().               // don't stop on errors
+    WithWorkerCompleteFn(workerComplete) // called when each worker finishes
 ```
 
 Available options:
 - `WithBatchSize(size int)` - enables batch processing, accumulating items before sending to workers (default: 10)
 - `WithWorkerChanSize(size int)` - sets buffer size for worker channels (default: 1)
-- `WithChunkFn(fn func(T) string)` - controls work distribution by key (default: none, random distribution)
+- `WithChunkFn(fn func(T) string)` - controls work distribution by key (default: none, shared channel)
 - `WithContinueOnError()` - continues processing on errors (default: false)
-- `WithWorkerCompleteFn(fn func(ctx, id, worker))` - called on worker completion (default: none)
-- `WithPoolCompleteFn(fn func(ctx))` - called on pool completion, i.e., when all workers have completed (default: none)
+- `WithWorkerCompleteFn(fn func(ctx context.Context, id int, worker Worker[T]) error)` - called on worker completion (default: none)
+- `WithPoolCompleteFn(fn func(ctx context.Context) error)` - called on pool completion, i.e., when all workers have completed (default: none)
 
 ## Collector
 