From a29544bb84fcb39fe44216b4595393a002058afe Mon Sep 17 00:00:00 2001 From: Dwayne Charrington Date: Mon, 26 Jan 2026 23:05:36 +1000 Subject: [PATCH] feat(server): improve RPC reliability Adds in health checks, watchdog restarts and optional escalation functionality. Closes issue #8 --- README.md | 8 ++ app.js | 196 ++++++++++++++++++++++++++++++++++++--- config.example.json | 18 ++++ plugins/JsonRPCServer.js | 26 ++++++ 4 files changed, 234 insertions(+), 14 deletions(-) diff --git a/README.md b/README.md index bc586c7f..4168cb7a 100644 --- a/README.md +++ b/README.md @@ -49,6 +49,14 @@ E.g. pm2 start app.js --no-treekill --kill-timeout 10000 --no-autorestart --node-args="--no-node-snapshot" ``` +### RPC health check / auto-restart + +The RPC plugin now exposes a lightweight `GET /health` endpoint and the master process can probe it to +restart the JsonRPCServer if it becomes unresponsive. Configure it via `rpcConfig.healthCheck` in +`config.json` (see `config.example.json` for defaults). You can also enable escalation to restart the +entire node after repeated RPC restarts, and optional RPC monitoring logs for slow requests or large +batches. + ### DB Backup and Restore Backup current state (track current hive blpck in config) diff --git a/app.js b/app.js index 187f7c25..a9b53d37 100644 --- a/app.js +++ b/app.js @@ -1,5 +1,6 @@ require('dotenv').config(); const fs = require('fs-extra'); +const http = require('http'); const program = require('commander'); const { fork } = require('child_process'); const { createLogger, format, transports } = require('winston'); @@ -42,6 +43,25 @@ const plugins = {}; const jobs = new Map(); let currentJobId = 0; +let requestedPlugins = []; + +const defaultRpcHealthCheck = { + enabled: true, + intervalMs: 15000, + timeoutMs: 2000, + failuresBeforeRestart: 3, + restartDelayMs: 5000, + stopTimeoutMs: 5000, + killAfterMs: 10000, + escalateAfter: 0, + escalationWindowMs: 600000, + escalationSignal: 'SIGTERM', +}; + +const getRpcHealthConfig = () => ({ + ...defaultRpcHealthCheck, + ...(conf.rpcConfig?.healthCheck || {}), +}); // send an IPC message to a plugin with a promise in return const send = (plugin, message) => { @@ -65,6 +85,35 @@ const send = (plugin, message) => { }); }; +const sendWithTimeout = (plugin, message, timeoutMs) => { + const newMessage = { + ...message, + to: plugin.name, + from: 'MASTER', + type: 'request', + }; + currentJobId += 1; + if (currentJobId > Number.MAX_SAFE_INTEGER) { + currentJobId = 1; + } + const jobId = currentJobId; + newMessage.jobId = jobId; + plugin.cp.send(newMessage); + return new Promise((resolve) => { + const timeout = setTimeout(() => { + jobs.delete(jobId); + resolve({ timeout: true }); + }, timeoutMs); + jobs.set(jobId, { + message: newMessage, + resolve: (payload) => { + clearTimeout(timeout); + resolve(payload); + }, + }); + }); +}; + // function to route the IPC requests const route = (message) => { // console.log(message); @@ -101,8 +150,137 @@ const getPlugin = (plugin) => { return null; }; +const isPluginRunning = (plugin) => { + const plg = getPlugin(plugin); + return Boolean(plg && plg.cp && plg.cp.exitCode === null); +}; + +const delay = ms => new Promise(resolve => setTimeout(resolve, ms)); + +let rpcHealthTimer = null; +let rpcHealthFailures = 0; +let rpcHealthProbeInFlight = false; +let rpcRestartInProgress = false; +const rpcRestartHistory = []; + +const shouldEscalateRestart = (cfg) => { + if (!cfg.escalateAfter || cfg.escalateAfter <= 0) return false; + const now = Date.now(); + const windowMs = cfg.escalationWindowMs || defaultRpcHealthCheck.escalationWindowMs; + rpcRestartHistory.push(now); + while (rpcRestartHistory.length && now - rpcRestartHistory[0] > windowMs) { + rpcRestartHistory.shift(); + } + return rpcRestartHistory.length >= cfg.escalateAfter; +}; + +const probeRpcHealth = (cfg) => new Promise((resolve) => { + const req = http.get({ + hostname: '127.0.0.1', + port: conf.rpcNodePort, + path: '/health', + timeout: cfg.timeoutMs, + }, (res) => { + res.resume(); + resolve(res.statusCode === 200); + }); + + req.on('timeout', () => { + req.destroy(new Error('health check timeout')); + }); + req.on('error', () => { + resolve(false); + }); +}); + +const unloadPlugin = async (plugin, options = {}) => { + let res = null; + const plg = getPlugin(plugin); + if (plg) { + logger.info(`unloading plugin ${plugin.PLUGIN_NAME}`); + if (options.timeoutMs) { + res = await sendWithTimeout(plg, { action: 'stop' }, options.timeoutMs); + } else { + res = await send(plg, { action: 'stop' }); + } + plg.cp.kill('SIGINT'); + if (options.killAfterMs) { + const killTimer = setTimeout(() => { + if (plg.cp.exitCode === null) { + plg.cp.kill('SIGKILL'); + } + }, options.killAfterMs); + if (killTimer.unref) killTimer.unref(); + } + } + return res; +}; + +const restartJsonRpc = async (reason) => { + if (rpcRestartInProgress || shuttingDown) return; + if (!requestedPlugins.includes(jsonRPCServer.PLUGIN_NAME)) return; + const cfg = getRpcHealthConfig(); + if (shouldEscalateRestart(cfg)) { + logger.error(`[${jsonRPCServer.PLUGIN_NAME}] escalation threshold reached; signaling ${cfg.escalationSignal}`); + stopRpcHealthCheck(); + process.kill(process.pid, cfg.escalationSignal || 'SIGTERM'); + return; + } + rpcRestartInProgress = true; + logger.warn(`[${jsonRPCServer.PLUGIN_NAME}] restarting (${reason})`); + try { + await unloadPlugin(jsonRPCServer, { + timeoutMs: cfg.stopTimeoutMs, + killAfterMs: cfg.killAfterMs, + }); + } catch (error) { + logger.error(`[${jsonRPCServer.PLUGIN_NAME}] restart unload error: ${error}`); + } + rpcHealthFailures = 0; + await delay(cfg.restartDelayMs); + try { + await loadPlugin(jsonRPCServer, requestedPlugins); + } catch (error) { + logger.error(`[${jsonRPCServer.PLUGIN_NAME}] restart load error: ${error}`); + } + rpcRestartInProgress = false; +}; + +const startRpcHealthCheck = () => { + const cfg = getRpcHealthConfig(); + if (!cfg.enabled || !requestedPlugins.includes(jsonRPCServer.PLUGIN_NAME)) return; + if (rpcHealthTimer) clearInterval(rpcHealthTimer); + rpcHealthTimer = setInterval(async () => { + if (rpcHealthProbeInFlight || rpcRestartInProgress || shuttingDown) return; + if (!isPluginRunning(jsonRPCServer)) return; + rpcHealthProbeInFlight = true; + try { + const ok = await probeRpcHealth(cfg); + if (!ok) { + rpcHealthFailures += 1; + logger.warn(`[${jsonRPCServer.PLUGIN_NAME}] health check failed (${rpcHealthFailures}/${cfg.failuresBeforeRestart})`); + if (rpcHealthFailures >= cfg.failuresBeforeRestart) { + await restartJsonRpc('health check failure threshold reached'); + } + } else if (rpcHealthFailures > 0) { + rpcHealthFailures = 0; + } + } finally { + rpcHealthProbeInFlight = false; + } + }, cfg.intervalMs); + if (rpcHealthTimer.unref) rpcHealthTimer.unref(); +}; + +const stopRpcHealthCheck = () => { + if (rpcHealthTimer) { + clearInterval(rpcHealthTimer); + rpcHealthTimer = null; + } +}; + const loadPlugin = (newPlugin, requestedPlugins) => { - if (requestedPlugins.indexOf(newPlugin.PLUGIN_NAME) === -1) { + if (Array.isArray(requestedPlugins) && requestedPlugins.indexOf(newPlugin.PLUGIN_NAME) === -1) { return { payload: null }; } const plugin = {}; @@ -122,17 +300,6 @@ const loadPlugin = (newPlugin, requestedPlugins) => { return send(plugin, { action: 'init', payload: conf }); }; -const unloadPlugin = async (plugin) => { - let res = null; - const plg = getPlugin(plugin); - if (plg) { - logger.info(`unloading plugin ${plugin.PLUGIN_NAME}`); - res = await send(plg, { action: 'stop' }); - plg.cp.kill('SIGINT'); - } - return res; -}; - const stop = async () => { logger.info('Stopping node...'); await unloadPlugin(jsonRPCServer); @@ -160,6 +327,7 @@ const saveConfig = (lastBlockParsed) => { }; const stopApp = async (signal = 0) => { + stopRpcHealthCheck(); const lastBlockParsed = await stop(); saveConfig(lastBlockParsed); // calling process.exit() won't inform parent process of signal @@ -241,11 +409,11 @@ program .option('-p, --plugins ', 'which plugins to run. (Available plugins: Blockchain,Streamer,P2P,JsonRPCServer,LightNode', 'Blockchain,Streamer,P2P,JsonRPCServer,LightNode') .parse(process.argv); -const requestedPlugins = program.plugins.split(','); +requestedPlugins = program.plugins.split(','); if (program.replay !== undefined) { replayBlocksLog(); } else { - start(requestedPlugins); + start(requestedPlugins).then(() => startRpcHealthCheck()); } process.on('SIGTERM', () => { diff --git a/config.example.json b/config.example.json index 1f343cd6..1680899b 100644 --- a/config.example.json +++ b/config.example.json @@ -31,6 +31,24 @@ "disabledMethods" : { "blockchain" : ["getBlockRangeInfo"], "contracts" : [] + }, + "healthCheck" : { + "enabled" : true, + "intervalMs" : 15000, + "timeoutMs" : 2000, + "failuresBeforeRestart" : 3, + "restartDelayMs" : 5000, + "stopTimeoutMs" : 5000, + "killAfterMs" : 10000, + "escalateAfter" : 0, + "escalationWindowMs" : 600000, + "escalationSignal" : "SIGTERM" + }, + "monitoring" : { + "logSlowRequests" : false, + "slowRequestMs" : 2000, + "logBatchSizes" : false, + "logBatchMinSize" : 2 } }, "rpcWebsockets" : { diff --git a/plugins/JsonRPCServer.js b/plugins/JsonRPCServer.js index 4cf68270..fd033492 100644 --- a/plugins/JsonRPCServer.js +++ b/plugins/JsonRPCServer.js @@ -350,6 +350,29 @@ const init = async (conf, callback) => { serverRPC.use(cors({ methods: ['POST'] })); serverRPC.use(bodyParser.urlencoded({ extended: true })); serverRPC.use(bodyParser.json()); + const monitoringConfig = config.rpcConfig?.monitoring || {}; + const logSlowRequests = monitoringConfig.logSlowRequests === true; + const slowRequestMs = Number.isFinite(monitoringConfig.slowRequestMs) ? monitoringConfig.slowRequestMs : 0; + const logBatchSizes = monitoringConfig.logBatchSizes === true; + const logBatchMinSize = Number.isFinite(monitoringConfig.logBatchMinSize) ? monitoringConfig.logBatchMinSize : 2; + if (logSlowRequests || logBatchSizes) { + serverRPC.use((req, res, next) => { + const start = Date.now(); + const batchSize = Array.isArray(req.body) ? req.body.length : 0; + res.on('finish', () => { + if (logSlowRequests && slowRequestMs > 0) { + const duration = Date.now() - start; + if (duration >= slowRequestMs) { + console.warn(`RPC slow request ${req.method} ${req.originalUrl} ${duration}ms status=${res.statusCode}`); + } + } + if (logBatchSizes && batchSize >= logBatchMinSize) { + console.warn(`RPC batch request ${req.method} ${req.originalUrl} size=${batchSize} status=${res.statusCode}`); + } + }); + next(); + }); + } serverRPC.set('trust proxy', true); serverRPC.set('trust proxy', 'loopback'); if (config.rpcConfig.logRequests) { @@ -364,6 +387,9 @@ const init = async (conf, callback) => { console.error(err); res.status(500).json({ error: 'Error processing requests' }); }); + serverRPC.get('/health', (_req, res) => { + res.json({ status: 'ok', timestamp: Date.now() }); + }); serverRPC.get('/', async (_, res) => { try { const status = await generateStatus();