-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmicroservice_batch_consumer.go
More file actions
135 lines (112 loc) · 2.47 KB
/
microservice_batch_consumer.go
File metadata and controls
135 lines (112 loc) · 2.47 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
// Create and maintain by Chaiyapong Lapliengtrakul (chaiyapong@3dsinteractive.com), All right reserved (2021 - Present)
package main
import (
"os"
"os/signal"
"syscall"
"time"
"github.com/confluentinc/confluent-kafka-go/kafka"
)
func (ms *Microservice) consumeBatch(
servers string,
topic string,
groupID string,
readTimeout time.Duration,
batchSize int,
batchTimeout time.Duration,
h ServiceHandleFunc) error {
// Batch Filler
fill := func(b *Batch, payload interface{}) error {
p := payload.(string)
b.Add(p)
return nil
}
// Batch Executer
exec := func(b *Batch) error {
messages := make([]string, 0)
for {
item := b.Read()
if item == nil {
break
}
message := item.(string)
messages = append(messages, message)
}
if len(messages) == 0 {
return nil
}
// Execute Handler
h(NewBatchConsumerContext(ms, messages))
return nil
}
// Payloads loader
payload := make(chan interface{})
quit := make(chan bool, 1)
go func() {
c, err := ms.newKafkaConsumer(servers, groupID)
if err != nil {
quit <- true
return
}
// This will close kafka consumer before exit function
defer c.Close()
c.Subscribe(topic, nil)
for {
if readTimeout <= 0 {
// readtimeout -1 indicates no timeout
readTimeout = -1
}
msg, err := c.ReadMessage(readTimeout)
if err != nil {
kafkaErr, ok := err.(kafka.Error)
if ok {
if kafkaErr.Code() == kafka.ErrTimedOut {
if readTimeout == -1 {
// No timeout just continue to read message again
continue
}
}
}
return
}
message := string(msg.Value)
payload <- message
}
}()
go func() {
// Gracefull shutdown routine
osQuit := make(chan os.Signal, 1)
signal.Notify(osQuit, syscall.SIGTERM, syscall.SIGINT)
select {
case <-quit:
close(payload)
case <-osQuit:
close(payload)
}
}()
// Error listener
errc := make(chan error)
defer close(errc)
go func() {
for err := range errc {
if err != nil {
ms.Log("BatchConsumer", err.Error())
}
}
}()
be := NewBatchEvent(batchSize, batchTimeout, fill, exec, payload, errc)
be.Start()
return nil
}
// ConsumeBatch register service endpoint for Batch Consumer service
func (ms *Microservice) ConsumeBatch(
servers string,
topic string,
groupID string,
readTimeout time.Duration,
batchSize int,
batchTimeout time.Duration,
h ServiceHandleFunc) error {
go ms.consumeBatch(servers, topic, groupID, readTimeout, batchSize, batchTimeout, h)
return nil
}