
Commit 7799cbe

improve examples: add basic/chunking/pool_completion, fix linter issues
- add basic example: minimal hello world for getting started
- add chunking example: demonstrates WithChunkFn for key-based routing
- add pool_completion example: shows pool completion callback usage
- add README.md for all new examples
- fix structural linter issues across all examples:
  - exitAfterDefer: use return instead of log.Fatal after defer
  - shadow: rename shadowed variables
  - intrange: use range over int (Go 1.22+)
  - modernize: use strings.FieldsSeq, interface{} -> any
  - prealloc: preallocate slices where size is known
- update examples/README.md with new examples
1 parent 566dcd6 commit 7799cbe

27 files changed: 594 additions & 77 deletions

examples/README.md

Lines changed: 30 additions & 2 deletions
@@ -6,6 +6,18 @@ This directory contains examples demonstrating various aspects of the [go-pkgz/p
 
 ## Available Examples
 
+### [basic](./basic)
+Minimal "hello world" example to get started quickly. Demonstrates:
+- Simplest pool creation and usage
+- Submitting work items
+- Basic metrics
+
+### [chunking](./chunking)
+Shows how to use WithChunkFn for consistent work distribution by key. Demonstrates:
+- Key-based routing (same key always goes to same worker)
+- Per-key aggregation without synchronization
+- Worker ID tracking via context
+
 ### [tokenizer_stateful](./tokenizer_stateful)
 Shows how to use stateful workers where each worker maintains its own independent state (word frequency counters). Demonstrates:
 - Worker state isolation
@@ -53,11 +65,23 @@ Shows how to handle and categorize errors in parallel processing. Demonstrates:
 - Timing information tracking
 - Statistical reporting on errors
 
+### [pool_completion](./pool_completion)
+Shows how to use the pool completion callback for final aggregation. Demonstrates:
+- Pool completion callback (WithPoolCompleteFn)
+- Final cleanup when all workers finish
+- Difference between worker and pool completion callbacks
+
 ## Running Examples
 
 Each example can be run from its directory:
 ```bash
-cd tokenizer_stateful
+cd basic
+go run main.go
+
+cd ../chunking
+go run main.go
+
+cd ../tokenizer_stateful
 go run main.go -file input.txt
 
 cd ../tokenizer_stateless
@@ -77,15 +101,19 @@ go run main.go
 
 cd ../collector_errors
 go run main.go -workers 8 -jobs 100 -error-rate 0.3
+
+cd ../pool_completion
+go run main.go
 ```
 
 ## Common Patterns
 
 While the examples are simplified, they showcase important pool package features:
+- Basic pool usage (basic)
+- Consistent work distribution by key (chunking)
 - Worker state management (stateful vs stateless)
 - Result collection strategies
 - Error handling approaches
 - Metrics and monitoring
-- Work distribution patterns
 - Middleware integration
 - Multi-stage processing pipelines

examples/basic/README.md

Lines changed: 66 additions & 0 deletions
@@ -0,0 +1,66 @@
+# Basic Example
+
+Minimal "hello world" example demonstrating the simplest pool usage.
+
+## What it demonstrates
+
+- Creating a pool with `pool.New[T]`
+- Using `pool.WorkerFunc` adapter for simple functions
+- Submitting work items with `Submit()`
+- Closing pool and waiting with `Close()`
+- Basic metrics via `Metrics().GetStats()`
+
+## Running
+
+```bash
+go run main.go
+```
+
+## Output
+
+```
+processing: date
+processing: apple
+processing: banana
+processing: cherry
+processing: elderberry
+
+done: processed 5 items in 102ms
+```
+
+## Code walkthrough
+
+### Create the pool
+
+```go
+p := pool.New[string](3, pool.WorkerFunc[string](func(_ context.Context, item string) error {
+	fmt.Printf("processing: %s\n", item)
+	time.Sleep(50 * time.Millisecond)
+	return nil
+}))
+```
+
+- `pool.New[string](3, ...)` creates a pool with 3 workers processing strings
+- `pool.WorkerFunc` adapts a function to the `Worker` interface
+
+### Start, submit, close
+
+```go
+p.Go(ctx)         // start workers
+p.Submit("apple") // submit work (not thread-safe)
+p.Close(ctx)      // close input and wait for completion
+```
+
+### Get metrics
+
+```go
+stats := p.Metrics().GetStats()
+fmt.Printf("processed %d items in %v\n", stats.Processed, stats.TotalTime)
+```
+
+## Next steps
+
+After understanding this basic example, explore:
+- [chunking](../chunking) - consistent work distribution by key
+- [tokenizer_stateful](../tokenizer_stateful) - workers with state
+- [middleware](../middleware) - adding retry, timeout, validation

examples/basic/go.mod

Lines changed: 9 additions & 0 deletions
@@ -0,0 +1,9 @@
+module examples/basic
+
+go 1.24.0
+
+require github.com/go-pkgz/pool v0.7.0
+
+require golang.org/x/sync v0.19.0 // indirect
+
+replace github.com/go-pkgz/pool => ../..

examples/basic/go.sum

Lines changed: 10 additions & 0 deletions
@@ -0,0 +1,10 @@
+github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
+github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
+github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
+github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
+golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4=
+golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
+gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
+gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

examples/basic/main.go

Lines changed: 42 additions & 0 deletions
@@ -0,0 +1,42 @@
+// Example basic demonstrates the simplest possible pool usage.
+// This is a minimal "hello world" example to get started quickly.
+package main
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"github.com/go-pkgz/pool"
+)
+
+func main() {
+	// create a pool with 3 workers using a simple function
+	p := pool.New[string](3, pool.WorkerFunc[string](func(_ context.Context, item string) error {
+		fmt.Printf("processing: %s\n", item)
+		time.Sleep(50 * time.Millisecond) // simulate work
+		return nil
+	}))
+
+	// start the pool
+	ctx := context.Background()
+	if err := p.Go(ctx); err != nil {
+		fmt.Printf("failed to start: %v\n", err)
+		return
+	}
+
+	// submit work items
+	items := []string{"apple", "banana", "cherry", "date", "elderberry"}
+	for _, item := range items {
+		p.Submit(item)
+	}
+
+	// close and wait for completion
+	if err := p.Close(ctx); err != nil {
+		fmt.Printf("error: %v\n", err)
+	}
+
+	// print stats
+	stats := p.Metrics().GetStats()
+	fmt.Printf("\ndone: processed %d items in %v\n", stats.Processed, stats.TotalTime.Round(time.Millisecond))
+}

examples/chunking/README.md

Lines changed: 88 additions & 0 deletions
@@ -0,0 +1,88 @@
+# Chunking Example
+
+Demonstrates `WithChunkFn` for consistent work distribution by key.
+
+## What it demonstrates
+
+- Key-based routing with `WithChunkFn`
+- Same key always goes to the same worker (consistent hashing)
+- Per-key aggregation without synchronization
+- Getting worker ID from context with `metrics.WorkerID(ctx)`
+
+## Use cases
+
+- Aggregating events by user/session ID
+- Processing messages by partition key
+- Any scenario where items with the same key must be handled by the same worker
+
+## Running
+
+```bash
+go run main.go
+```
+
+## Output
+
+```
+worker 1: user=charlie action=login
+worker 2: user=alice action=login
+worker 1: user=charlie action=view_page
+worker 2: user=bob action=login
+worker 2: user=alice action=view_page
+...
+
+worker assignment (each user always goes to same worker):
+ worker 1: charlie(4)
+ worker 2: bob(3) alice(4)
+
+processed 11 events in 76ms
+```
+
+Notice: charlie always goes to worker 1, alice and bob always go to worker 2.
+
+## Code walkthrough
+
+### Define chunk function
+
+```go
+p := pool.New[Event](3, worker).WithChunkFn(func(e Event) string {
+	return e.UserID // route by user ID
+})
+```
+
+The chunk function extracts a key from each item. Items with the same key are routed to the same worker using consistent hashing (FNV-1a).
+
+### Get worker ID in worker
+
+```go
+import "github.com/go-pkgz/pool/metrics"
+
+func worker(ctx context.Context, e Event) error {
+	workerID := metrics.WorkerID(ctx)
+	// workerID is consistent for this user
+}
+```
+
+### Benefits
+
+Without chunking:
+- Workers compete for items randomly
+- Aggregating by key requires synchronization (mutex/channel)
+
+With chunking:
+- Same key → same worker, always
+- Per-key state can be local to worker (no locks needed)
+- Useful for building per-user counters, session state, etc.
+
+## How it works
+
+1. `WithChunkFn` creates per-worker channels (instead of shared channel)
+2. On `Submit()`, the chunk function extracts the key
+3. Key is hashed with FNV-1a: `hash(key) % numWorkers`
+4. Item is sent to that worker's dedicated channel
+
+## Related examples
+
+- [basic](../basic) - simplest pool usage
+- [tokenizer_stateful](../tokenizer_stateful) - stateful workers
+- [direct_chain](../direct_chain) - multi-stage pipelines
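The hashing rule the chunking README describes (`hash(key) % numWorkers` with FNV-1a) can be reproduced with the standard library's `hash/fnv`. A minimal sketch, independent of the pool package; `route` is an illustrative name:

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// route picks a worker index for a key: FNV-1a hash modulo the worker
// count, mirroring the hash(key) % numWorkers rule described above.
func route(key string, numWorkers int) int {
	h := fnv.New32a()
	h.Write([]byte(key)) // Write on a hash.Hash never returns an error
	return int(h.Sum32()) % numWorkers
}

func main() {
	// the same key always maps to the same index, so per-key work is sticky
	for _, user := range []string{"alice", "bob", "charlie"} {
		fmt.Printf("%s -> worker %d\n", user, route(user, 3))
	}
}
```

Determinism is the whole point: as long as the worker count is fixed, every submission of a key lands on the same index.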

examples/chunking/go.mod

Lines changed: 9 additions & 0 deletions
@@ -0,0 +1,9 @@
+module examples/chunking
+
+go 1.24.0
+
+require github.com/go-pkgz/pool v0.7.0
+
+require golang.org/x/sync v0.19.0 // indirect
+
+replace github.com/go-pkgz/pool => ../..

examples/chunking/go.sum

Lines changed: 10 additions & 0 deletions
@@ -0,0 +1,10 @@
+github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
+github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
+github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
+github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
+golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4=
+golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
+gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
+gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

examples/chunking/main.go

Lines changed: 90 additions & 0 deletions
@@ -0,0 +1,90 @@
+// Example chunking demonstrates WithChunkFn for consistent work distribution.
+// Items with the same key always go to the same worker, enabling per-key
+// aggregation without synchronization. Useful for grouping events by user,
+// session, or any other key.
+package main
+
+import (
+	"context"
+	"fmt"
+	"sync"
+	"time"
+
+	"github.com/go-pkgz/pool"
+	"github.com/go-pkgz/pool/metrics"
+)
+
+// Event represents something happening for a user
+type Event struct {
+	UserID string
+	Action string
+}
+
+func main() {
+	// track which worker processes which users (for demonstration)
+	var mu sync.Mutex
+	workerUsers := make(map[int]map[string]int) // workerID -> userID -> count
+
+	// create pool with chunking by UserID
+	// this ensures all events for a user go to the same worker
+	p := pool.New[Event](3, pool.WorkerFunc[Event](func(ctx context.Context, e Event) error {
+		// get worker ID from context
+		workerID := metrics.WorkerID(ctx)
+
+		mu.Lock()
+		if workerUsers[workerID] == nil {
+			workerUsers[workerID] = make(map[string]int)
+		}
+		workerUsers[workerID][e.UserID]++
+		mu.Unlock()
+
+		fmt.Printf("worker %d: user=%s action=%s\n", workerID, e.UserID, e.Action)
+		time.Sleep(10 * time.Millisecond)
+		return nil
+	})).WithChunkFn(func(e Event) string {
+		return e.UserID // route by user ID
+	})
+
+	ctx := context.Background()
+	if err := p.Go(ctx); err != nil {
+		fmt.Printf("failed to start: %v\n", err)
+		return
+	}
+
+	// submit events for different users
+	// notice: same user's events will always go to the same worker
+	events := []Event{
+		{"alice", "login"},
+		{"bob", "login"},
+		{"charlie", "login"},
+		{"alice", "view_page"},
+		{"bob", "view_page"},
+		{"alice", "click_button"},
+		{"charlie", "view_page"},
+		{"bob", "logout"},
+		{"alice", "logout"},
+		{"charlie", "click_button"},
+		{"charlie", "logout"},
+	}
+
+	for _, e := range events {
+		p.Submit(e)
+	}
+
+	if err := p.Close(ctx); err != nil {
+		fmt.Printf("error: %v\n", err)
+	}
+
+	// show which worker handled which users
+	fmt.Printf("\nworker assignment (each user always goes to same worker):\n")
+	for workerID, users := range workerUsers {
+		fmt.Printf(" worker %d: ", workerID)
+		for user, count := range users {
+			fmt.Printf("%s(%d) ", user, count)
+		}
+		fmt.Println()
+	}
+
+	stats := p.Metrics().GetStats()
+	fmt.Printf("\nprocessed %d events in %v\n", stats.Processed, stats.TotalTime.Round(time.Millisecond))
+}
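The chunking example above still uses a mutex, but only to collect cross-worker statistics for display; the routing itself makes locks unnecessary for per-key state. The per-worker-channel design the README describes can be sketched without the pool package at all (`route` and `aggregate` are illustrative names, not library API):

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sync"
)

// route picks a worker index for a key: FNV-1a hash modulo worker count,
// the same rule the chunking README describes.
func route(key string, n int) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32()) % n
}

// aggregate fans events out over per-worker channels. Each worker goroutine
// owns its input channel and its counter map, so no mutex is needed; it
// returns the total number of events counted across all workers.
func aggregate(users []string, workers int) int {
	chans := make([]chan string, workers)
	counts := make([]map[string]int, workers)
	var wg sync.WaitGroup
	for i := range chans {
		chans[i] = make(chan string, len(users))
		counts[i] = make(map[string]int)
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for key := range chans[id] {
				counts[id][key]++ // only worker id touches counts[id]
			}
		}(i)
	}
	for _, u := range users {
		chans[route(u, workers)] <- u // same user -> same worker, every time
	}
	for _, ch := range chans {
		close(ch)
	}
	wg.Wait()
	total := 0
	for _, m := range counts {
		for _, c := range m {
			total += c
		}
	}
	return total
}

func main() {
	n := aggregate([]string{"alice", "bob", "alice", "charlie", "alice"}, 3)
	fmt.Println("total events:", n)
}
```

Because each map is written by exactly one goroutine, the aggregation is race-free by construction, which is the property `WithChunkFn` gives real workers.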
