Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,14 @@ E.g.
pm2 start app.js --no-treekill --kill-timeout 10000 --no-autorestart --node-args="--no-node-snapshot"
```

### RPC health check / auto-restart

The RPC plugin now exposes a lightweight `GET /health` endpoint and the master process can probe it to
restart the JsonRPCServer if it becomes unresponsive. Configure it via `rpcConfig.healthCheck` in
`config.json` (see `config.example.json` for defaults). You can also enable escalation to restart the
entire node after repeated RPC restarts, and optional RPC monitoring logs for slow requests or large
batches.

### DB Backup and Restore

Backup current state (track current hive block in config)
Expand Down
196 changes: 182 additions & 14 deletions app.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
require('dotenv').config();
const fs = require('fs-extra');
const http = require('http');
const program = require('commander');
const { fork } = require('child_process');
const { createLogger, format, transports } = require('winston');
Expand Down Expand Up @@ -42,6 +43,25 @@ const plugins = {};

const jobs = new Map();
let currentJobId = 0;
let requestedPlugins = [];

/**
 * Built-in defaults for the master-side JsonRPCServer health check.
 * Values found under `rpcConfig.healthCheck` in the node configuration are
 * layered on top of these. The object is frozen so the shared defaults cannot
 * be altered by accident; consumers work on copies.
 */
const defaultRpcHealthCheck = Object.freeze({
  // whether the periodic probe is started at all
  enabled: true,
  // delay between two probes
  intervalMs: 15000,
  // socket timeout of a single GET /health probe
  timeoutMs: 2000,
  // consecutive failed probes that trigger a plugin restart
  failuresBeforeRestart: 3,
  // pause between unloading and reloading the plugin
  restartDelayMs: 5000,
  // how long to wait for the plugin to answer the 'stop' request
  stopTimeoutMs: 5000,
  // grace period after SIGINT before the child process is sent SIGKILL
  killAfterMs: 10000,
  // number of restarts inside the window that escalates to signaling the
  // master process itself; 0 (or less) disables escalation
  escalateAfter: 0,
  // sliding window used to count restarts for escalation
  escalationWindowMs: 600000,
  // signal sent to the master process when escalating
  escalationSignal: 'SIGTERM',
});

/**
 * Build the effective health-check settings.
 * Starts from the built-in defaults and lets any keys present under
 * `conf.rpcConfig.healthCheck` override them. A new object is returned on
 * every call.
 *
 * @returns {object} merged health-check configuration
 */
const getRpcHealthConfig = () => {
  const overrides = conf.rpcConfig?.healthCheck || {};
  return { ...defaultRpcHealthCheck, ...overrides };
};

// send an IPC message to a plugin with a promise in return
const send = (plugin, message) => {
Expand All @@ -65,6 +85,35 @@ const send = (plugin, message) => {
});
};

/**
 * Send an IPC request to a plugin and wait for its reply, giving up after a
 * deadline.
 *
 * @param {object} plugin - plugin record; `plugin.name` is the recipient and
 *   `plugin.cp.send` delivers the message
 * @param {object} message - request fields (copied, not mutated)
 * @param {number} timeoutMs - how long to wait for the reply
 * @returns {Promise<object>} the reply handed to the job's `resolve`, or
 *   `{ timeout: true }` when the deadline passes first
 */
const sendWithTimeout = (plugin, message, timeoutMs) => {
  const request = {
    ...message,
    to: plugin.name,
    from: 'MASTER',
    type: 'request',
  };

  // job ids wrap back to 1 once they leave the safe integer range
  const nextId = currentJobId + 1;
  currentJobId = nextId > Number.MAX_SAFE_INTEGER ? 1 : nextId;
  const jobId = currentJobId;
  request.jobId = jobId;

  plugin.cp.send(request);

  return new Promise((resolve) => {
    // deadline reached: forget the pending job and report the timeout
    const onDeadline = () => {
      jobs.delete(jobId);
      resolve({ timeout: true });
    };
    const timer = setTimeout(onDeadline, timeoutMs);

    // reply arrived in time: cancel the deadline before settling
    const onReply = (payload) => {
      clearTimeout(timer);
      resolve(payload);
    };
    jobs.set(jobId, { message: request, resolve: onReply });
  });
};

// function to route the IPC requests
const route = (message) => {
// console.log(message);
Expand Down Expand Up @@ -101,8 +150,137 @@ const getPlugin = (plugin) => {
return null;
};

/**
 * Tell whether the child process of a plugin is still alive.
 *
 * @param {object} plugin - plugin module, resolved through `getPlugin`
 * @returns {boolean} true only when the plugin is registered, has a child
 *   process, and that process has neither exited nor been terminated by a
 *   signal
 */
const isPluginRunning = (plugin) => {
  const plg = getPlugin(plugin);
  if (!plg || !plg.cp) return false;
  // exitCode stays null when the child is terminated by a signal (Node sets
  // signalCode instead), so both fields must be checked to detect a dead child
  return plg.cp.exitCode === null && !plg.cp.signalCode;
};

/**
 * Promise-based sleep.
 *
 * @param {number} ms - time to wait in milliseconds
 * @returns {Promise<undefined>} settles once the timer fires
 */
const delay = (ms) => new Promise((resolve) => {
  setTimeout(resolve, ms);
});

// --- JsonRPCServer health-check state (owned by the master process) ---
// handle of the periodic probe interval; null while the health check is stopped
let rpcHealthTimer = null;
// failed probes counted since the last successful probe or restart
let rpcHealthFailures = 0;
// true while a probe is awaiting its result, so interval ticks do not overlap
let rpcHealthProbeInFlight = false;
// true while the plugin is being unloaded and reloaded
let rpcRestartInProgress = false;
// timestamps (ms since epoch) of recent restart attempts, used for escalation
const rpcRestartHistory = [];

/**
 * Record a restart attempt and decide whether it should escalate.
 *
 * When escalation is enabled (`cfg.escalateAfter` > 0) the current time is
 * appended to `rpcRestartHistory`, leading entries older than the escalation
 * window are dropped, and the remaining count is compared to the threshold.
 * When escalation is disabled nothing is recorded.
 *
 * @param {object} cfg - effective health-check configuration
 * @returns {boolean} true when the number of restarts inside the window has
 *   reached `cfg.escalateAfter`
 */
const shouldEscalateRestart = (cfg) => {
  const threshold = cfg.escalateAfter;
  if (!threshold || threshold <= 0) return false;

  const now = Date.now();
  const windowMs = cfg.escalationWindowMs || defaultRpcHealthCheck.escalationWindowMs;
  rpcRestartHistory.push(now);

  // history is appended in time order, so expired entries form a prefix
  const firstLive = rpcRestartHistory.findIndex((stamp) => !(now - stamp > windowMs));
  const expiredCount = firstLive === -1 ? rpcRestartHistory.length : firstLive;
  rpcRestartHistory.splice(0, expiredCount);

  return rpcRestartHistory.length >= cfg.escalateAfter;
};

/**
 * Probe the local JsonRPCServer with `GET /health`.
 *
 * The request goes to 127.0.0.1 on `conf.rpcNodePort`. The returned promise
 * never rejects: connection errors and timeouts are reported as `false`.
 *
 * @param {object} cfg - health-check configuration; `cfg.timeoutMs` is the
 *   socket timeout of the request
 * @returns {Promise<boolean>} true only when the endpoint answers with
 *   HTTP 200
 */
const probeRpcHealth = (cfg) => new Promise((resolve) => {
  const target = {
    hostname: '127.0.0.1',
    port: conf.rpcNodePort,
    path: '/health',
    timeout: cfg.timeoutMs,
  };

  const onResponse = (res) => {
    // the body is irrelevant; drain it so the socket is released
    res.resume();
    resolve(res.statusCode === 200);
  };

  const req = http.get(target, onResponse);

  req.on('error', () => {
    resolve(false);
  });
  // the 'timeout' event only signals idleness; destroying the request makes
  // it fail through the 'error' handler above
  req.on('timeout', () => {
    req.destroy(new Error('health check timeout'));
  });
});

/**
 * Ask a plugin to stop and terminate its child process.
 *
 * @param {object} plugin - plugin module (looked up through `getPlugin`)
 * @param {object} [options]
 * @param {number} [options.timeoutMs] - when set, wait at most this long for
 *   the plugin to answer the 'stop' request instead of waiting indefinitely
 * @param {number} [options.killAfterMs] - when set, send SIGKILL if the child
 *   is still alive this long after SIGINT
 * @returns {Promise<object|null>} the plugin's reply to 'stop',
 *   `{ timeout: true }` when the reply did not arrive in time, or null when
 *   the plugin is not loaded
 */
const unloadPlugin = async (plugin, options = {}) => {
  const plg = getPlugin(plugin);
  if (!plg) return null;

  logger.info(`unloading plugin ${plugin.PLUGIN_NAME}`);
  let res;
  if (options.timeoutMs) {
    res = await sendWithTimeout(plg, { action: 'stop' }, options.timeoutMs);
    if (res && res.timeout) {
      // make an unresponsive plugin visible instead of silently moving on
      logger.warn(`plugin ${plugin.PLUGIN_NAME} did not acknowledge stop within ${options.timeoutMs}ms`);
    }
  } else {
    res = await send(plg, { action: 'stop' });
  }

  plg.cp.kill('SIGINT');

  if (options.killAfterMs) {
    const killTimer = setTimeout(() => {
      // a child terminated by a signal keeps exitCode === null and reports
      // signalCode instead, so both must be checked before force-killing
      const stillRunning = plg.cp.exitCode === null && !plg.cp.signalCode;
      if (stillRunning) {
        plg.cp.kill('SIGKILL');
      }
    }, options.killAfterMs);
    // do not keep the master process alive just for this timer
    if (killTimer.unref) killTimer.unref();
  }
  return res;
};

/**
 * Restart the JsonRPCServer plugin, or escalate when it has been restarted
 * too often.
 *
 * Does nothing while another restart is running, while the node is shutting
 * down, or when the plugin was not requested at startup. When the escalation
 * threshold is reached, the health check is stopped and the configured signal
 * is sent to the master process instead of restarting the plugin.
 *
 * @param {string} reason - short explanation written to the log
 * @returns {Promise<void>}
 */
const restartJsonRpc = async (reason) => {
  if (rpcRestartInProgress || shuttingDown) return;
  if (!requestedPlugins.includes(jsonRPCServer.PLUGIN_NAME)) return;

  const cfg = getRpcHealthConfig();
  if (shouldEscalateRestart(cfg)) {
    // resolve the signal once so the log line and the kill call always agree
    const signal = cfg.escalationSignal || 'SIGTERM';
    logger.error(`[${jsonRPCServer.PLUGIN_NAME}] escalation threshold reached; signaling ${signal}`);
    stopRpcHealthCheck();
    process.kill(process.pid, signal);
    return;
  }

  rpcRestartInProgress = true;
  try {
    logger.warn(`[${jsonRPCServer.PLUGIN_NAME}] restarting (${reason})`);
    try {
      await unloadPlugin(jsonRPCServer, {
        timeoutMs: cfg.stopTimeoutMs,
        killAfterMs: cfg.killAfterMs,
      });
    } catch (error) {
      logger.error(`[${jsonRPCServer.PLUGIN_NAME}] restart unload error: ${error}`);
    }
    rpcHealthFailures = 0;
    await delay(cfg.restartDelayMs);
    try {
      await loadPlugin(jsonRPCServer, requestedPlugins);
    } catch (error) {
      logger.error(`[${jsonRPCServer.PLUGIN_NAME}] restart load error: ${error}`);
    }
  } finally {
    // always release the guard; a stuck flag would disable every later
    // health probe and restart
    rpcRestartInProgress = false;
  }
};

/**
 * Start (or restart) the periodic JsonRPCServer health probe.
 *
 * Nothing is scheduled when the health check is disabled or the plugin was
 * not requested. The configuration is read once, when the timer is created.
 * Each tick probes the plugin, counts failed probes, and triggers a restart
 * once `failuresBeforeRestart` is reached; a successful probe resets the
 * counter. Ticks are skipped while a probe or restart is in progress, during
 * shutdown, or when the plugin process is not running.
 */
const startRpcHealthCheck = () => {
  const cfg = getRpcHealthConfig();
  if (!(cfg.enabled && requestedPlugins.includes(jsonRPCServer.PLUGIN_NAME))) return;

  const tick = async () => {
    const busy = rpcHealthProbeInFlight || rpcRestartInProgress || shuttingDown;
    if (busy || !isPluginRunning(jsonRPCServer)) return;

    rpcHealthProbeInFlight = true;
    try {
      const healthy = await probeRpcHealth(cfg);
      if (healthy) {
        if (rpcHealthFailures > 0) rpcHealthFailures = 0;
        return;
      }
      rpcHealthFailures += 1;
      logger.warn(`[${jsonRPCServer.PLUGIN_NAME}] health check failed (${rpcHealthFailures}/${cfg.failuresBeforeRestart})`);
      if (rpcHealthFailures >= cfg.failuresBeforeRestart) {
        await restartJsonRpc('health check failure threshold reached');
      }
    } finally {
      rpcHealthProbeInFlight = false;
    }
  };

  // replace any previous timer so only one probe loop exists
  if (rpcHealthTimer) clearInterval(rpcHealthTimer);
  rpcHealthTimer = setInterval(tick, cfg.intervalMs);
  // the probe loop alone must not keep the process alive
  if (rpcHealthTimer.unref) rpcHealthTimer.unref();
};

/**
 * Stop the periodic health probe, if one is scheduled.
 * Safe to call repeatedly; does nothing when no timer is active.
 */
const stopRpcHealthCheck = () => {
  if (!rpcHealthTimer) return;
  clearInterval(rpcHealthTimer);
  rpcHealthTimer = null;
};

const loadPlugin = (newPlugin, requestedPlugins) => {
if (requestedPlugins.indexOf(newPlugin.PLUGIN_NAME) === -1) {
if (Array.isArray(requestedPlugins) && requestedPlugins.indexOf(newPlugin.PLUGIN_NAME) === -1) {
return { payload: null };
}
const plugin = {};
Expand All @@ -122,17 +300,6 @@ const loadPlugin = (newPlugin, requestedPlugins) => {
return send(plugin, { action: 'init', payload: conf });
};

// Pre-change version of unloadPlugin (the hunk header above marks these lines as removed).
// Asks the plugin to stop over IPC, waits for its reply without any deadline,
// then sends SIGINT to the child process.
// Returns the plugin's reply, or null when the plugin is not loaded.
const unloadPlugin = async (plugin) => {
let res = null;
const plg = getPlugin(plugin);
if (plg) {
logger.info(`unloading plugin ${plugin.PLUGIN_NAME}`);
// no timeout here: an unresponsive plugin blocks the caller indefinitely
res = await send(plg, { action: 'stop' });
plg.cp.kill('SIGINT');
}
return res;
};

const stop = async () => {
logger.info('Stopping node...');
await unloadPlugin(jsonRPCServer);
Expand Down Expand Up @@ -160,6 +327,7 @@ const saveConfig = (lastBlockParsed) => {
};

const stopApp = async (signal = 0) => {
stopRpcHealthCheck();
const lastBlockParsed = await stop();
saveConfig(lastBlockParsed);
// calling process.exit() won't inform parent process of signal
Expand Down Expand Up @@ -241,11 +409,11 @@ program
.option('-p, --plugins <plugins>', 'which plugins to run. (Available plugins: Blockchain,Streamer,P2P,JsonRPCServer,LightNode', 'Blockchain,Streamer,P2P,JsonRPCServer,LightNode')
.parse(process.argv);

const requestedPlugins = program.plugins.split(',');
requestedPlugins = program.plugins.split(',');
if (program.replay !== undefined) {
replayBlocksLog();
} else {
start(requestedPlugins);
start(requestedPlugins).then(() => startRpcHealthCheck());
}

process.on('SIGTERM', () => {
Expand Down
18 changes: 18 additions & 0 deletions config.example.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,24 @@
"disabledMethods" : {
"blockchain" : ["getBlockRangeInfo"],
"contracts" : []
},
"healthCheck" : {
"enabled" : true,
"intervalMs" : 15000,
"timeoutMs" : 2000,
"failuresBeforeRestart" : 3,
"restartDelayMs" : 5000,
"stopTimeoutMs" : 5000,
"killAfterMs" : 10000,
"escalateAfter" : 0,
"escalationWindowMs" : 600000,
"escalationSignal" : "SIGTERM"
},
"monitoring" : {
"logSlowRequests" : false,
"slowRequestMs" : 2000,
"logBatchSizes" : false,
"logBatchMinSize" : 2
}
},
"rpcWebsockets" : {
Expand Down
26 changes: 26 additions & 0 deletions plugins/JsonRPCServer.js
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,29 @@ const init = async (conf, callback) => {
serverRPC.use(cors({ methods: ['POST'] }));
serverRPC.use(bodyParser.urlencoded({ extended: true }));
serverRPC.use(bodyParser.json());
const monitoringConfig = config.rpcConfig?.monitoring || {};
const logSlowRequests = monitoringConfig.logSlowRequests === true;
const slowRequestMs = Number.isFinite(monitoringConfig.slowRequestMs) ? monitoringConfig.slowRequestMs : 0;
const logBatchSizes = monitoringConfig.logBatchSizes === true;
const logBatchMinSize = Number.isFinite(monitoringConfig.logBatchMinSize) ? monitoringConfig.logBatchMinSize : 2;
if (logSlowRequests || logBatchSizes) {
serverRPC.use((req, res, next) => {
const start = Date.now();
const batchSize = Array.isArray(req.body) ? req.body.length : 0;
res.on('finish', () => {
if (logSlowRequests && slowRequestMs > 0) {
const duration = Date.now() - start;
if (duration >= slowRequestMs) {
console.warn(`RPC slow request ${req.method} ${req.originalUrl} ${duration}ms status=${res.statusCode}`);
}
}
if (logBatchSizes && batchSize >= logBatchMinSize) {
console.warn(`RPC batch request ${req.method} ${req.originalUrl} size=${batchSize} status=${res.statusCode}`);
}
});
next();
});
}
serverRPC.set('trust proxy', true);
serverRPC.set('trust proxy', 'loopback');
if (config.rpcConfig.logRequests) {
Expand All @@ -364,6 +387,9 @@ const init = async (conf, callback) => {
console.error(err);
res.status(500).json({ error: 'Error processing requests' });
});
serverRPC.get('/health', (_req, res) => {
res.json({ status: 'ok', timestamp: Date.now() });
});
serverRPC.get('/', async (_, res) => {
try {
const status = await generateStatus();
Expand Down