-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdoc.go
More file actions
61 lines (49 loc) · 1.38 KB
/
doc.go
File metadata and controls
61 lines (49 loc) · 1.38 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
/*
Package sqsprocessor contains an implementation of an SQS message processor, similar in design to the Pub/Sub client provided by the Google Cloud Go SDK.
The main structure is the Processor, which handles spawning, managing and feeding a pool of workers which execute a provided ProcessFunc over each message they receive.
# Basic Usage
c := sqs.NewFromConfig(cfg)
p := NewProcessor(c, ProcessorConfig{
NumWorkers: 10,
Backoff: time.Second,
Receive: sqs.ReceiveMessageInput{
QueueUrl: sqsQueueURL,
MaxNumberOfMessages: 10,
VisibilityTimeout: 2,
WaitTimeSeconds: 1,
},
})
ctx, cancel := context.WithCancel(context.Background())
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
p.Process(ctx, func(ctx context.Context, message types.Message) ProcessResult {
if message.Body == nil {
// Delete bad messages from queue
return ProcessResultAck
}
if *message.Body == "good" {
// Happy path
return ProcessResultAck
}
// Sad path
return ProcessResultNack
})
}()
wg.Add(1)
go func() {
defer wg.Done()
for err := range p.Errors() {
log.Printf("received error from processor, %v\n", err)
}
}()
sigC := make(chan os.Signal, 1)
signal.Notify(sigC, syscall.SIGTERM, syscall.SIGINT)
<-sigC
log.Print("Received signal to quit, stopping processor")
cancel()
wg.Wait()
log.Print("Processor stopped, exiting")
*/
package sqsprocessor