-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathindex.js
More file actions
108 lines (95 loc) · 2.56 KB
/
index.js
File metadata and controls
108 lines (95 loc) · 2.56 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
const assert = require('assert');
const redis = require('redis');
/**
 * Validates the heartbeat-related options and fills in defaults for the ones
 * that are missing.
 *
 * The object is modified in place, so the caller can pass the very same object
 * on to the Redis client afterwards. Values supplied by the caller are kept;
 * a default is only used for a key whose value is `undefined`.
 *
 * @param {Object} options Client options. `heartbeat_timeout` and
 *     `heartbeat_interval` (both in milliseconds) are read and, when missing, set.
 * @throws {AssertionError} If either value is not a positive integer, or if
 *     `heartbeat_interval` is not larger than `heartbeat_timeout`.
 */
function processOptions(options) {
    const defaultOptions = {
        heartbeat_timeout: 1000,
        heartbeat_interval: 5000
    };
    const keys = Object.keys(defaultOptions);

    // Only fall back to a default when the caller did not provide a value;
    // copying the defaults over unconditionally would discard user settings.
    for (const key of keys) {
        if (options[key] === undefined) {
            options[key] = defaultOptions[key];
        }
    }

    for (const key of keys) {
        assert(Number.isInteger(options[key]), `"${key}" must have an integer value in milliseconds`);
        assert(options[key] > 0, `"${key}" must be a non-zero positive number`);
    }

    // A ping must be able to time out before the next one is sent.
    assert(options.heartbeat_interval > options.heartbeat_timeout, '"heartbeat_interval" must be larger than "heartbeat_timeout"');
}
/**
 * Starts the heartbeat process for a client.
 *
 * Every `client.options.heartbeat_interval` milliseconds a PING is sent. If no
 * successful reply arrives within `client.options.heartbeat_timeout`
 * milliseconds, the client's underlying stream is destroyed with an error whose
 * `code` is `'HEARTBEAT_TIMEOUT'`.
 *
 * Calling this more than once for the same client is a no-op: the
 * `heartbeatStarted` flag set on the client guards against a second interval.
 *
 * @param {RedisClient} client The client to monitor.
 * @return {RedisClient} The same client, whether or not the heartbeat was
 *     already running.
 */
function init(client) {
    if (client.heartbeatStarted) {
        // Hand the client back here as well, so callers that do
        // `return init(client)` never end up returning undefined.
        return client;
    }
    client.heartbeatStarted = true;

    setInterval(() => {
        let running = true;

        // Consider the connection dead after 'heartbeat_timeout' milliseconds without a response
        const timeout = setTimeout(() => {
            running = false;
            // A real Error (instead of a plain object) carries a stack trace
            // while exposing the same `message` and `code` to listeners.
            const error = new Error('Server did not respond to heartbeat in time.');
            error.code = 'HEARTBEAT_TIMEOUT';
            // Forces a Redis reconnection by destroying its underlying socket
            client.stream.destroy(error);
        }, client.options.heartbeat_timeout);

        client.ping(error => {
            // A failed ping leaves the timer armed on purpose; a reply that
            // arrives after the timer fired has nothing left to cancel.
            if (!error && running) {
                clearTimeout(timeout);
            }
        });
    }, client.options.heartbeat_interval);

    return client;
}
/**
 * Redis client that additionally runs a heartbeat against the server.
 *
 * Accepts the base client's options plus `heartbeat_timeout` and
 * `heartbeat_interval` (milliseconds), which are validated by `processOptions`.
 */
class RedisClient extends redis.RedisClient {
    /**
     * Validates the heartbeat options, builds the base client and starts the
     * heartbeat.
     *
     * @param {Object} [options] Client options; modified in place with the
     *     heartbeat defaults.
     * @param {*} [stream] Passed through unchanged to the base constructor.
     */
    constructor(options = {}, stream) {
        // Must not touch `this` before super(); processOptions only uses its argument.
        processOptions(options);
        super(options, stream);
        init(this);
    }

    /**
     * Duplicates this client and starts a heartbeat on the duplicate.
     *
     * Heartbeat settings not given in `options` are taken from this client, so
     * a duplicate keeps its parent's configuration.
     *
     * @param {Object|Function} [options] Option overrides for the duplicate,
     *     or the callback when no overrides are needed.
     * @param {Function} [callback] Optional `(err, client)` callback.
     * @return {RedisClient|undefined} Whatever the base `duplicate` returns;
     *     without a callback that is the duplicate itself.
     */
    duplicate(options = {}, callback) {
        let overrides = options;
        let done = callback;
        // NOTE(review): node_redis documents duplicate([options][, callback]),
        // i.e. the callback may be the first argument — confirm against the
        // installed redis version.
        if (typeof overrides === 'function') {
            done = overrides;
            overrides = {};
        }

        // Inherit this client's heartbeat settings unless explicitly overridden.
        for (const key of ['heartbeat_timeout', 'heartbeat_interval']) {
            if (overrides[key] === undefined && this.options[key] !== undefined) {
                overrides[key] = this.options[key];
            }
        }
        processOptions(overrides);

        if (typeof done !== 'function') {
            const client = super.duplicate(overrides);
            // The base implementation builds a plain redis.RedisClient; upgrade it.
            Object.setPrototypeOf(client, RedisClient.prototype);
            init(client);
            return client;
        }

        // With a callback the base implementation is expected to deliver the
        // client through the callback rather than the return value, so the
        // upgrade has to happen there (TODO confirm for the redis version in use).
        return super.duplicate(overrides, (err, client) => {
            if (err) {
                done(err);
                return;
            }
            Object.setPrototypeOf(client, RedisClient.prototype);
            init(client);
            done(null, client);
        });
    }

    /**
     * Handles connection errors. A heartbeat timeout is announced through the
     * `heartbeat-timeout` event and then treated as a lost connection; every
     * other error is left to the base implementation.
     *
     * @param {Error} err The error reported for the connection.
     */
    on_error(err) {
        if (err.code === 'HEARTBEAT_TIMEOUT') {
            this.emit('heartbeat-timeout', err);
            return this.connection_gone('error', err);
        }
        return super.on_error(err);
    }
}
/**
* @inheritdoc
*/
module.exports = Object.assign({}, redis, {
createClient: (...args) => {
if (!args.length || typeof args[args.length - 1] !== 'object') {
args.push({});
}
const options = args[args.length - 1];
processOptions(options);
const client = redis.createClient(...args);
Object.setPrototypeOf(client, RedisClient.prototype);
return init(client);
},
RedisClient
});