-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathheartbeat.go
More file actions
247 lines (210 loc) · 6.03 KB
/
Copy pathheartbeat.go
File metadata and controls
247 lines (210 loc) · 6.03 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
package netpipe
import (
crand "crypto/rand"
"sync"
"time"
)
// Heartbeat / keepalive wire format.
//
// Two message flags carry no body:
//
//	flagPing (0x04): sent by the server to the client
//	flagPong (0x08): sent by the client back to the server
//
// On the server, a heartbeat goroutine pings every client in the metadata
// registry once per HeartbeatInterval. When a pong arrives, the client's
// lastPong timestamp is refreshed. A client whose last pong is older than
// HeartbeatTimeout (default: twice HeartbeatInterval) is disconnected.
//
// Clients answer pings automatically; application code does nothing.
const (
	flagPing = byte(0x04) // server → client keepalive probe
	flagPong = byte(0x08) // client → server keepalive reply
)
// ---------------------------------------------------------------------------
// Client metadata - tracks heartbeat and idle state per TCP client
// ---------------------------------------------------------------------------

// clientMeta is the liveness bookkeeping kept for one connected client.
type clientMeta struct {
	// conn is the connection heartbeat pings are written to.
	conn *safeConn
	// lastPong and lastData start at registration time. touchPong and
	// touchData refresh them respectively.
	lastPong, lastData time.Time
}
// clientMetaRegistry maps client IDs to their clientMeta. It has its own
// lock, so heartbeat bookkeeping does not contend with other server state.
type clientMetaRegistry struct {
	mu   sync.RWMutex           // guards meta and the entries it points to
	meta map[string]*clientMeta // keyed by client UUID
}
// newClientMetaRegistry returns an empty registry ready for use.
func newClientMetaRegistry() *clientMetaRegistry {
	reg := new(clientMetaRegistry)
	reg.meta = make(map[string]*clientMeta)
	return reg
}
// set registers (or replaces) the entry for id. It records sc as the
// client's connection and stamps both lastPong and lastData with the
// current time.
func (r *clientMetaRegistry) set(id string, sc *safeConn) {
	r.mu.Lock()
	defer r.mu.Unlock()
	stamp := time.Now()
	entry := &clientMeta{conn: sc}
	entry.lastPong, entry.lastData = stamp, stamp
	r.meta[id] = entry
}
// remove drops the entry for id. Unknown IDs are ignored.
func (r *clientMetaRegistry) remove(id string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.meta, id)
}
// touchPong sets lastPong for id to now. It is a no-op if id is not
// registered.
func (r *clientMetaRegistry) touchPong(id string) {
	// Exclusive lock: the entry is mutated, not just read.
	r.mu.Lock()
	defer r.mu.Unlock()
	entry, found := r.meta[id]
	if !found {
		return
	}
	entry.lastPong = time.Now()
}
// touchData sets lastData for id to now. It is a no-op if id is not
// registered.
func (r *clientMetaRegistry) touchData(id string) {
	// Exclusive lock: the entry is mutated, not just read.
	r.mu.Lock()
	defer r.mu.Unlock()
	entry, found := r.meta[id]
	if !found {
		return
	}
	entry.lastData = time.Now()
}
// snapshot returns a point-in-time copy of every entry. Callers can iterate
// it and call back into the registry (or kick clients) without holding the
// lock.
//
// Each entry is copied by value. The timestamps are therefore frozen, while
// conn still points at the live connection, which is what the sweepers use
// for pinging.
func (r *clientMetaRegistry) snapshot() map[string]*clientMeta {
	r.mu.RLock()
	defer r.mu.RUnlock()
	copies := make(map[string]*clientMeta, len(r.meta))
	for id, live := range r.meta {
		frozen := new(clientMeta)
		*frozen = *live
		copies[id] = frozen
	}
	return copies
}
// ---------------------------------------------------------------------------
// Server heartbeat loop
// ---------------------------------------------------------------------------

// startHeartbeat launches the background sweepers:
//   - the ping/pong heartbeat loop when HeartbeatInterval > 0
//   - the idle-timeout loop when IdleTimeout > 0
//
// It is intended to be called once from Listen() when any relevant config
// is set.
func (s *Server) startHeartbeat() {
	if interval := s.config.HeartbeatInterval; interval > 0 {
		go s.heartbeatLoop()
	}
	if idle := s.config.IdleTimeout; idle > 0 {
		go s.idleTimeoutLoop()
	}
}
// heartbeatLoop runs once per HeartbeatInterval tick. On each tick it:
//   - kicks every registered client whose last pong is older than the
//     heartbeat timeout
//   - sends a ping to every other registered client
//
// The timeout is HeartbeatTimeout, or twice HeartbeatInterval when
// HeartbeatTimeout is zero or negative. The loop returns on the first tick
// after both s.listener and s.udpConn are nil.
//
// It must only be started with a positive HeartbeatInterval, because
// time.NewTicker panics otherwise. startHeartbeat guarantees this.
func (s *Server) heartbeatLoop() {
	interval := s.config.HeartbeatInterval
	timeout := s.config.HeartbeatTimeout
	if timeout <= 0 {
		// Zero and negative values are treated alike. Either one would make
		// now.Sub(lastPong) > timeout true immediately and kick every client
		// on each sweep.
		timeout = interval * 2
	}
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for range ticker.C {
		if s.listener == nil && s.udpConn == nil {
			return // server shut down
		}
		now := time.Now()
		for id, m := range s.meta.snapshot() {
			if now.Sub(m.lastPong) > timeout {
				// Missed the pong deadline: treat the connection as dead.
				s.Kick(id)
				continue
			}
			// Fire-and-forget. If the write fails, no pong comes back, and
			// a later sweep's deadline check kicks the client.
			writeMessage(m.conn, flagPing, nil)
		}
	}
}
// idleTimeoutLoop kicks clients whose lastData is older than IdleTimeout.
//
// It sweeps every IdleTimeout/2, but never more often than once per second,
// so a client may stay idle somewhat longer than IdleTimeout before being
// kicked. The loop returns on the first tick after both s.listener and
// s.udpConn are nil.
func (s *Server) idleTimeoutLoop() {
	sweepEvery := s.config.IdleTimeout / 2
	if sweepEvery < time.Second {
		sweepEvery = time.Second
	}
	sweeper := time.NewTicker(sweepEvery)
	defer sweeper.Stop()
	for range sweeper.C {
		if s.listener == nil && s.udpConn == nil {
			return
		}
		now := time.Now()
		for id, m := range s.meta.snapshot() {
			if idleFor := now.Sub(m.lastData); idleFor > s.config.IdleTimeout {
				s.Kick(id)
			}
		}
	}
}
// ---------------------------------------------------------------------------
// Client auto-reconnect
// ---------------------------------------------------------------------------

// reconnectLoop repeatedly reconnects the client and runs its read loop.
//
// Between attempts it sleeps using exponential backoff with ±25% jitter:
//   - the wait starts at ReconnectInterval (2s when that is zero or negative)
//   - it doubles after each failed Connect
//   - it is capped at 30s, or at ReconnectInterval itself if that is larger
//
// The jitter keeps many clients that dropped together from retrying in
// lockstep (thundering herd). A successful Connect resets both the backoff
// and the attempt counter, then blocks in readLoop until the connection
// drops again.
//
// When MaxReconnectAttempts > 0 and that many consecutive attempts fail,
// onDisconnect (if set) is called and the loop returns.
// NOTE(review): there is no visible stop signal otherwise — confirm how
// callers shut this goroutine down.
func (c *Client) reconnectLoop() {
	const defaultInterval = 2 * time.Second
	const backoffCap = 30 * time.Second

	baseInterval := c.config.ReconnectInterval
	if baseInterval <= 0 {
		// A negative interval would make every sleep a no-op, and doubling
		// would never reach the cap. The result would be a tight retry loop
		// against the server.
		baseInterval = defaultInterval
	}
	maxInterval := backoffCap
	if baseInterval > maxInterval {
		// Without this, the first failure would shrink the wait below the
		// configured base instead of doubling it.
		maxInterval = baseInterval
	}
	currentInterval := baseInterval
	maxAttempts := c.config.MaxReconnectAttempts
	attempt := 0
	for {
		if maxAttempts > 0 && attempt >= maxAttempts {
			if c.onDisconnect != nil {
				c.onDisconnect()
			}
			return
		}
		// Sleep for a duration in [0.75, 1.25) × currentInterval.
		jitter := time.Duration(randInt63n(int64(currentInterval) / 2))
		time.Sleep(currentInterval - currentInterval/4 + jitter)
		attempt++
		if err := c.Connect(); err != nil {
			// Exponential backoff: double the interval, bounded by the cap.
			currentInterval *= 2
			if currentInterval > maxInterval {
				currentInterval = maxInterval
			}
			continue
		}
		// Reconnected: reset the backoff and the attempt counter.
		attempt = 0
		currentInterval = baseInterval
		// Run the read loop inline rather than via Listen, to avoid a
		// recursive reconnect. It returns when the connection drops, and
		// the outer loop then retries.
		c.readLoop()
	}
}
// randInt63n returns a random int64 in [0, n), drawn from crypto/rand.
//
// It returns 0 when n <= 0, or when the system random source fails. A
// caller using it for backoff jitter then simply gets no jitter rather than
// an error.
//
// The modulo reduction introduces a bias that is negligible for n far below
// 2^63, which is always the case for sleep jitter.
func randInt63n(n int64) int64 {
	if n <= 0 {
		return 0
	}
	var buf [8]byte // fixed-size array: no per-call heap allocation
	if _, err := crand.Read(buf[:]); err != nil {
		return 0
	}
	var u uint64
	for _, x := range buf {
		u = u<<8 | uint64(x)
	}
	// Clear the sign bit so the value is a non-negative int63. Negating a
	// signed value instead fails for math.MinInt64, whose negation is
	// itself. That produced a negative result outside [0, n).
	return int64(u&(1<<63-1)) % n
}
// ---------------------------------------------------------------------------
// Client pong responder - called from the read loop when a ping arrives
// ---------------------------------------------------------------------------

// sendPong answers a server ping with an empty pong message.
//   - If the client has no connection, nothing is sent.
//   - For the "udp" protocol, the pong is sent through sendUDP.
//   - Otherwise, it is written directly to c.conn.
//
// Any send failure is not surfaced to the caller.
func (c *Client) sendPong() {
	if c.conn == nil {
		return
	}
	switch c.config.Protocol {
	case "udp":
		c.sendUDP(flagPong, nil)
	default:
		writeMessage(c.conn, flagPong, nil)
	}
}