diff --git a/keeper/__tests__/reconciler.test.js b/keeper/__tests__/reconciler.test.js new file mode 100644 index 0000000..8ff23f3 --- /dev/null +++ b/keeper/__tests__/reconciler.test.js @@ -0,0 +1,566 @@ +'use strict'; + +const { TaskReconciler, MismatchType, deriveStatus } = require('../src/reconciler'); + +// ── Helpers ────────────────────────────────────────────────────────────────── + +const silentLogger = { + info: jest.fn(), + warn: jest.fn(), + error: jest.fn(), + debug: jest.fn(), +}; + +/** + * Build a minimal TaskPoller stub whose getTaskConfig() returns the provided + * map of taskId → on-chain config (or null for absent tasks). + */ +function makePoller(onChainConfigs = {}) { + return { + getTaskConfig: jest.fn(async (taskId) => onChainConfigs[taskId] ?? null), + }; +} + +/** + * Build a minimal TaskRegistry stub backed by a real Map so that updateTask() + * actually persists and can be inspected. + */ +function makeRegistry(localTasks = {}) { + const taskMap = new Map( + Object.entries(localTasks).map(([k, v]) => [Number(k), { ...v }]), + ); + return { + tasks: taskMap, + getTaskIds: () => Array.from(taskMap.keys()).sort((a, b) => a - b), + updateTask: jest.fn((taskId, update) => { + const existing = taskMap.get(taskId) ?? { id: taskId }; + taskMap.set(taskId, { ...existing, ...update }); + }), + }; +} + +/** Canonical on-chain config used across multiple tests. */ +function chainConfig(overrides = {}) { + return { + is_active: true, + gas_balance: 1000, + last_run: 0, + interval: 3600, + ...overrides, + }; +} + +/** Local registry record that mirrors chainConfig (clean state). */ +function localRecord(overrides = {}) { + return { + status: 'active', + is_active: true, + gas_balance: 1000, + last_run: 0, + interval: 3600, + ...overrides, + }; +} + +// ── Unit: deriveStatus ──────────────────────────────────────────────────────── + +describe('deriveStatus', () => { + test('returns paused when not active', () => { + expect(deriveStatus(false, 1000)).toBe('paused'); + expect(deriveStatus(false, 0)).toBe('paused'); + }); + + test('returns low_gas when active but gas is zero', () => { + expect(deriveStatus(true, 0)).toBe('low_gas'); + expect(deriveStatus(true, -1)).toBe('low_gas'); + }); + + test('returns active when active and gas > 0', () => { + expect(deriveStatus(true, 1)).toBe('active'); + expect(deriveStatus(true, 9999)).toBe('active'); + }); +}); + +// ── Unit: reconcileTask ─────────────────────────────────────────────────────── + +describe('TaskReconciler.reconcileTask', () => { + describe('clean state — no drift', () => { + test('returns status=clean with no mismatches', async () => { + const poller = makePoller({ 1: chainConfig() }); + const registry = makeRegistry({ 1: localRecord() }); + const reconciler = new TaskReconciler({ poller, registry }, { logger: silentLogger }); + + const result = await reconciler.reconcileTask(1); + + expect(result.status).toBe('clean'); + expect(result.mismatches).toHaveLength(0); + expect(result.repaired).toBe(false); + expect(registry.updateTask).not.toHaveBeenCalled(); + }); + + test('skips field comparison when local field is undefined (never fetched)', async () => { + // Registry only has basic event-sourced data — no numeric fields yet. + const poller = makePoller({ 5: chainConfig() }); + const registry = makeRegistry({ 5: { status: 'registered' } }); + const reconciler = new TaskReconciler({ poller, registry }, { logger: silentLogger }); + + const result = await reconciler.reconcileTask(5); + + expect(result.status).toBe('clean'); + expect(result.mismatches).toHaveLength(0); + }); + + test('handles numeric type coercion (BigInt vs Number)', async () => { + const poller = makePoller({ 2: chainConfig({ last_run: 3600n, gas_balance: 900n }) }); + const registry = makeRegistry({ 2: localRecord({ last_run: 3600, gas_balance: 900 }) }); + const reconciler = new TaskReconciler({ poller, registry }, { logger: silentLogger }); + + const result = await reconciler.reconcileTask(2); + expect(result.status).toBe('clean'); + }); + }); + + describe('TASK_NOT_ON_CHAIN', () => { + test('detects when get_task returns null', async () => { + const poller = makePoller({}); // task 99 absent on-chain + const registry = makeRegistry({ 99: localRecord() }); + const reconciler = new TaskReconciler({ poller, registry }, { logger: silentLogger }); + + const result = await reconciler.reconcileTask(99); + + expect(result.mismatches).toHaveLength(1); + expect(result.mismatches[0].type).toBe(MismatchType.TASK_NOT_ON_CHAIN); + expect(result.mismatches[0].repair).toBe('MARK_CANCELLED'); + }); + + test('marks task as cancelled in registry (not deleted)', async () => { + const poller = makePoller({}); + const registry = makeRegistry({ 10: localRecord() }); + const reconciler = new TaskReconciler({ poller, registry }, { logger: silentLogger }); + + await reconciler.reconcileTask(10); + + expect(registry.updateTask).toHaveBeenCalledWith( + 10, + expect.objectContaining({ status: 'cancelled' }), + ); + // record still exists — not removed + expect(registry.tasks.has(10)).toBe(true); + }); + + test('stops checking other fields when task absent on-chain', async () => { + const poller = makePoller({}); + const registry = makeRegistry({ 11: localRecord() }); + const reconciler = new TaskReconciler({ poller, registry }, { logger: silentLogger }); + + const result = await reconciler.reconcileTask(11); + + // Only the TASK_NOT_ON_CHAIN mismatch; no spurious field drifts + expect(result.mismatches).toHaveLength(1); + }); + }); + + describe('STALE_RECORD', () => { + test('detects task on-chain with no local record', async () => { + const poller = makePoller({ 7: chainConfig() }); + // registry has no entry for task 7 + const registry = makeRegistry({}); + // manually add task 7 to taskIds so reconcileTask is called for it + registry.tasks.set(7, undefined); // simulate entry without data + registry.getTaskIds = () => [7]; + + const reconciler = new TaskReconciler({ poller, registry }, { logger: silentLogger }); + + const result = await reconciler.reconcileTask(7); + + expect(result.mismatches[0].type).toBe(MismatchType.STALE_RECORD); + expect(result.mismatches[0].repair).toBe('SYNC_ALL_FIELDS'); + }); + + test('syncs all fields when repair applied', async () => { + const poller = makePoller({ 8: chainConfig({ is_active: false, gas_balance: 500, last_run: 7200 }) }); + const registry = makeRegistry({}); + registry.tasks.set(8, null); // no local record + registry.getTaskIds = () => [8]; + + const reconciler = new TaskReconciler({ poller, registry }, { logger: silentLogger }); + + await reconciler.reconcileTask(8); + + expect(registry.updateTask).toHaveBeenCalledWith( + 8, + expect.objectContaining({ + is_active: false, + gas_balance: 500, + last_run: 7200, + interval: 3600, + }), + ); + }); + }); + + describe('STATUS_DRIFT', () => { + test('detects is_active mismatch (chain=false, local=true)', async () => { + const poller = makePoller({ 3: chainConfig({ is_active: false }) }); + const registry = makeRegistry({ 3: localRecord({ is_active: true }) }); + const reconciler = new TaskReconciler({ poller, registry }, { logger: silentLogger }); + + const result = await reconciler.reconcileTask(3); + + const m = result.mismatches.find((x) => x.type === MismatchType.STATUS_DRIFT); + expect(m).toBeDefined(); + expect(m.localValue).toBe(true); + expect(m.chainValue).toBe(false); + }); + + test('repairs is_active and recomputes status', async () => { + const poller = makePoller({ 3: chainConfig({ is_active: false, gas_balance: 500 }) }); + const registry = makeRegistry({ 3: localRecord({ is_active: true, status: 'active' }) }); + const reconciler = new TaskReconciler({ poller, registry }, { logger: silentLogger }); + + await reconciler.reconcileTask(3); + + expect(registry.updateTask).toHaveBeenCalledWith( + 3, + expect.objectContaining({ is_active: false, status: 'paused' }), + ); + }); + }); + + describe('GAS_BALANCE_DRIFT', () => { + test('detects gas_balance mismatch', async () => { + const poller = makePoller({ 4: chainConfig({ gas_balance: 850 }) }); + const registry = makeRegistry({ 4: localRecord({ gas_balance: 1000 }) }); + const reconciler = new TaskReconciler({ poller, registry }, { logger: silentLogger }); + + const result = await reconciler.reconcileTask(4); + + const m = result.mismatches.find((x) => x.type === MismatchType.GAS_BALANCE_DRIFT); + expect(m).toBeDefined(); + expect(m.localValue).toBe(1000); + expect(m.chainValue).toBe(850); + }); + + test('repairs gas_balance from chain truth', async () => { + const poller = makePoller({ 4: chainConfig({ gas_balance: 850 }) }); + const registry = makeRegistry({ 4: localRecord({ gas_balance: 1000 }) }); + const reconciler = new TaskReconciler({ poller, registry }, { logger: silentLogger }); + + await reconciler.reconcileTask(4); + + expect(registry.updateTask).toHaveBeenCalledWith( + 4, + expect.objectContaining({ gas_balance: 850 }), + ); + }); + + test('recomputes status to low_gas when chain gas_balance is 0', async () => { + const poller = makePoller({ 4: chainConfig({ gas_balance: 0 }) }); + const registry = makeRegistry({ 4: localRecord({ gas_balance: 200, status: 'active' }) }); + const reconciler = new TaskReconciler({ poller, registry }, { logger: silentLogger }); + + await reconciler.reconcileTask(4); + + expect(registry.updateTask).toHaveBeenCalledWith( + 4, + expect.objectContaining({ status: 'low_gas' }), + ); + }); + }); + + describe('LAST_RUN_DRIFT', () => { + test('detects last_run mismatch (another keeper executed the task)', async () => { + const poller = makePoller({ 6: chainConfig({ last_run: 7200 }) }); + const registry = makeRegistry({ 6: localRecord({ last_run: 0 }) }); + const reconciler = new TaskReconciler({ poller, registry }, { logger: silentLogger }); + + const result = await reconciler.reconcileTask(6); + + const m = result.mismatches.find((x) => x.type === MismatchType.LAST_RUN_DRIFT); + expect(m).toBeDefined(); + expect(m.localValue).toBe(0); + expect(m.chainValue).toBe(7200); + }); + + test('repairs last_run from chain truth', async () => { + const poller = makePoller({ 6: chainConfig({ last_run: 7200 }) }); + const registry = makeRegistry({ 6: localRecord({ last_run: 0 }) }); + const reconciler = new TaskReconciler({ poller, registry }, { logger: silentLogger }); + + await reconciler.reconcileTask(6); + + expect(registry.updateTask).toHaveBeenCalledWith( + 6, + expect.objectContaining({ last_run: 7200 }), + ); + }); + }); + + describe('FIELD_DRIFT (interval)', () => { + test('detects interval mismatch', async () => { + const poller = makePoller({ 12: chainConfig({ interval: 7200 }) }); + const registry = makeRegistry({ 12: localRecord({ interval: 3600 }) }); + const reconciler = new TaskReconciler({ poller, registry }, { logger: silentLogger }); + + const result = await reconciler.reconcileTask(12); + + const m = result.mismatches.find((x) => x.type === MismatchType.FIELD_DRIFT); + expect(m).toBeDefined(); + expect(m.field).toBe('interval'); + expect(m.chainValue).toBe(7200); + }); + }); + + describe('multiple drifts in one task', () => { + test('detects and repairs all drifted fields in a single pass', async () => { + const poller = makePoller({ + 13: chainConfig({ gas_balance: 500, last_run: 3600, is_active: false }), + }); + const registry = makeRegistry({ + 13: localRecord({ gas_balance: 1000, last_run: 0, is_active: true }), + }); + const reconciler = new TaskReconciler({ poller, registry }, { logger: silentLogger }); + + const result = await reconciler.reconcileTask(13); + + const types = result.mismatches.map((m) => m.type); + expect(types).toContain(MismatchType.STATUS_DRIFT); + expect(types).toContain(MismatchType.GAS_BALANCE_DRIFT); + expect(types).toContain(MismatchType.LAST_RUN_DRIFT); + expect(result.repaired).toBe(true); + // Single updateTask call with all repairs merged + expect(registry.updateTask).toHaveBeenCalledTimes(1); + expect(registry.updateTask).toHaveBeenCalledWith( + 13, + expect.objectContaining({ + is_active: false, + gas_balance: 500, + last_run: 3600, + status: 'paused', + }), + ); + }); + }); + + describe('dry-run mode', () => { + test('detects drift but does not update registry', async () => { + const poller = makePoller({ 14: chainConfig({ gas_balance: 750 }) }); + const registry = makeRegistry({ 14: localRecord({ gas_balance: 1000 }) }); + const reconciler = new TaskReconciler( + { poller, registry }, + { logger: silentLogger, dryRun: true }, + ); + + const result = await reconciler.reconcileTask(14); + + expect(result.status).toBe('drifted'); + expect(result.repaired).toBe(false); + expect(registry.updateTask).not.toHaveBeenCalled(); + }); + + test('clean tasks report clean in dry-run', async () => { + const poller = makePoller({ 15: chainConfig() }); + const registry = makeRegistry({ 15: localRecord() }); + const reconciler = new TaskReconciler( + { poller, registry }, + { logger: silentLogger, dryRun: true }, + ); + + const result = await reconciler.reconcileTask(15); + expect(result.status).toBe('clean'); + }); + }); + + describe('RPC error handling', () => { + test('returns error result when getTaskConfig throws', async () => { + const poller = { + getTaskConfig: jest.fn().mockRejectedValue(new Error('RPC timeout')), + }; + const registry = makeRegistry({ 16: localRecord() }); + const reconciler = new TaskReconciler({ poller, registry }, { logger: silentLogger }); + + const result = await reconciler.reconcileTask(16); + + expect(result.status).toBe('error'); + expect(result.error).toBe('RPC timeout'); + expect(result.mismatches).toHaveLength(0); + expect(result.repaired).toBe(false); + expect(registry.updateTask).not.toHaveBeenCalled(); + }); + + test('does not throw — errors are contained per task', async () => { + const poller = { + getTaskConfig: jest.fn().mockRejectedValue(new Error('connection refused')), + }; + const registry = makeRegistry({ 17: localRecord() }); + const reconciler = new TaskReconciler({ poller, registry }, { logger: silentLogger }); + + await expect(reconciler.reconcileTask(17)).resolves.not.toThrow(); + }); + }); +}); + +// ── Unit: reconcile (full pass) ─────────────────────────────────────────────── + +describe('TaskReconciler.reconcile', () => { + test('reports correct summary counts across mixed task states', async () => { + const poller = makePoller({ + 1: chainConfig(), // matches local — clean + // task 2 absent on-chain — drift + 3: chainConfig({ gas_balance: 500 }), // gas drift + }); + const registry = makeRegistry({ + 1: localRecord(), + 2: localRecord(), + 3: localRecord({ gas_balance: 1000 }), + }); + const reconciler = new TaskReconciler({ poller, registry }, { logger: silentLogger }); + + const report = await reconciler.reconcile(); + + expect(report.checked).toBe(3); + expect(report.clean).toBe(1); + expect(report.drifted).toBe(2); + expect(report.repaired).toBe(2); + expect(report.errors).toBe(0); + expect(report.results).toHaveLength(3); + }); + + test('counts RPC errors separately from drift', async () => { + const poller = { + getTaskConfig: jest.fn() + .mockResolvedValueOnce(chainConfig()) // task 1: clean + .mockRejectedValueOnce(new Error('timeout')), // task 2: error + }; + const registry = makeRegistry({ + 1: localRecord(), + 2: localRecord(), + }); + const reconciler = new TaskReconciler({ poller, registry }, { logger: silentLogger }); + + const report = await reconciler.reconcile(); + + expect(report.clean).toBe(1); + expect(report.errors).toBe(1); + expect(report.drifted).toBe(0); + }); + + test('reconcile with custom taskIds subset only checks requested tasks', async () => { + const poller = makePoller({ + 1: chainConfig(), + 2: chainConfig(), + }); + const registry = makeRegistry({ + 1: localRecord(), + 2: localRecord(), + }); + const reconciler = new TaskReconciler({ poller, registry }, { logger: silentLogger }); + + await reconciler.reconcile({ taskIds: [1] }); + + expect(poller.getTaskConfig).toHaveBeenCalledTimes(1); + expect(poller.getTaskConfig).toHaveBeenCalledWith(1); + }); + + test('returns empty report when registry is empty', async () => { + const poller = makePoller({}); + const registry = makeRegistry({}); + const reconciler = new TaskReconciler({ poller, registry }, { logger: silentLogger }); + + const report = await reconciler.reconcile(); + + expect(report.checked).toBe(0); + expect(report.clean).toBe(0); + expect(report.drifted).toBe(0); + expect(report.results).toHaveLength(0); + }); + + test('stores last report accessible via getLastReport()', async () => { + const poller = makePoller({ 1: chainConfig() }); + const registry = makeRegistry({ 1: localRecord() }); + const reconciler = new TaskReconciler({ poller, registry }, { logger: silentLogger }); + + expect(reconciler.getLastReport()).toBeNull(); + + await reconciler.reconcile(); + + const report = reconciler.getLastReport(); + expect(report).not.toBeNull(); + expect(report.checked).toBe(1); + }); + + test('report includes startedAt, finishedAt, and durationMs', async () => { + const poller = makePoller({ 1: chainConfig() }); + const registry = makeRegistry({ 1: localRecord() }); + const reconciler = new TaskReconciler({ poller, registry }, { logger: silentLogger }); + + const report = await reconciler.reconcile(); + + expect(report.startedAt).toBeTruthy(); + expect(report.finishedAt).toBeTruthy(); + expect(typeof report.durationMs).toBe('number'); + expect(report.durationMs).toBeGreaterThanOrEqual(0); + }); + + test('report includes dryRun flag matching constructor option', async () => { + const poller = makePoller({ 1: chainConfig() }); + const registry = makeRegistry({ 1: localRecord() }); + const reconciler = new TaskReconciler( + { poller, registry }, + { logger: silentLogger, dryRun: true }, + ); + + const report = await reconciler.reconcile(); + expect(report.dryRun).toBe(true); + }); + + test('throws RECONCILIATION_IN_PROGRESS when called concurrently', async () => { + let resolveFirst; + const poller = { + getTaskConfig: jest.fn( + () => + new Promise((resolve) => { + resolveFirst = () => resolve(chainConfig()); + }), + ), + }; + const registry = makeRegistry({ 1: localRecord() }); + const reconciler = new TaskReconciler({ poller, registry }, { logger: silentLogger }); + + // Start first reconciliation but don't await it yet + const first = reconciler.reconcile(); + expect(reconciler.isRunning()).toBe(true); + + // Second call while first is still running + await expect(reconciler.reconcile()).rejects.toMatchObject({ + code: 'RECONCILIATION_IN_PROGRESS', + }); + + // Finish first + resolveFirst(); + await first; + expect(reconciler.isRunning()).toBe(false); + }); + + test('_running resets to false even when reconcileTask throws unexpectedly', async () => { + const poller = { + getTaskConfig: jest.fn().mockRejectedValue(new Error('unexpected')), + }; + const registry = makeRegistry({ 1: localRecord() }); + const reconciler = new TaskReconciler({ poller, registry }, { logger: silentLogger }); + + const report = await reconciler.reconcile(); + expect(reconciler.isRunning()).toBe(false); + expect(report.errors).toBe(1); + }); +}); + +// ── Unit: constructor validation ────────────────────────────────────────────── + +describe('TaskReconciler constructor', () => { + test('throws when deps are missing', () => { + expect(() => new TaskReconciler({})).toThrow(); + expect(() => new TaskReconciler({ poller: makePoller() })).toThrow(); + expect(() => new TaskReconciler({ registry: makeRegistry() })).toThrow(); + }); +}); diff --git a/keeper/index.js b/keeper/index.js index 1b63050..8098ec5 100644 --- a/keeper/index.js +++ b/keeper/index.js @@ -15,8 +15,7 @@ const { MetricsServer } = require("./src/metrics"); const HistoryManager = require("./src/history"); const { normalizeShardConfig, filterTasksForShard } = require("./src/sharding"); const { StartupValidator } = require("./src/validator"); -const { GracefulShutdownManager } = require("./src/gracefulShutdown"); -const { createDefaultFilterChain } = require("./src/taskFilter"); +const { TaskReconciler } = require("./src/reconciler"); // Create root logger for the main module const logger = createLogger("keeper"); @@ -236,91 +235,54 @@ async function main() { }); await registry.init(); - // Initialize graceful shutdown manager - const shutdownManager = new GracefulShutdownManager({ - logger: createLogger("shutdown"), - drainTimeoutMs: parseInt( - process.env.SHUTDOWN_DRAIN_TIMEOUT_MS || 30000, - 10 - ), - forceTimeoutMs: parseInt( - process.env.SHUTDOWN_FORCE_TIMEOUT_MS || 60000, - 10 - ), - }); - - // Register polling interval for cleanup - shutdownManager.registerResource("polling-interval", async () => { - logger.info("Clearing polling interval"); - clearInterval(pollingInterval); - }); + // Initialize reconciler — detects and repairs drift between local registry + // and on-chain truth. Runs on startup and then on a configurable interval. + const reconciler = new TaskReconciler( + { poller, registry }, + { logger: createLogger("reconciler") }, + ); - // Register queue for graceful draining - shutdownManager.registerResource("execution-queue", async () => { - logger.info("Starting queue graceful shutdown"); - const result = await queue.gracefulShutdown({ - drainTimeoutMs: parseInt( - process.env.SHUTDOWN_DRAIN_TIMEOUT_MS || 30000, - 10 - ), - onProgress: (progress) => { - logger.debug("Queue shutdown progress", progress); - }, + // Startup reconciliation: repair any drift accumulated while the keeper was + // offline (missed events, another keeper executing tasks, etc.). + logger.info("Running startup reconciliation"); + try { + const startupReport = await reconciler.reconcile(); + logger.info("Startup reconciliation complete", { + checked: startupReport.checked, + drifted: startupReport.drifted, + repaired: startupReport.repaired, + errors: startupReport.errors, }); + } catch (err) { + logger.warn("Startup reconciliation failed — continuing", { error: err.message }); + } - logger.info("Queue shutdown complete", result); - - // Report final queue status - const status = queue.getInFlightStatus(); - if (status.inFlight > 0) { - logger.warn("Queue shutdown: Still in-flight tasks remaining", { - ...status, - }); - } - }); + // Periodic reconciliation: catch slow drift between polling cycles. + // Default: every 5 minutes. Override via RECONCILE_INTERVAL_MS env var. + const reconcileIntervalMs = parseInt( + process.env.RECONCILE_INTERVAL_MS || String(5 * 60 * 1000), + 10, + ); + logger.info("Scheduling periodic reconciliation", { intervalMs: reconcileIntervalMs }); - // Register registry cleanup - shutdownManager.registerResource("task-registry", async () => { - logger.info("Closing task registry"); - if (registry.close) { - await registry.close(); + const reconcileInterval = setInterval(async () => { + try { + logger.info("Starting periodic reconciliation"); + const report = await reconciler.reconcile(); + if (report.drifted > 0) { + logger.warn("Periodic reconciliation found and repaired drift", { + drifted: report.drifted, + repaired: report.repaired, + }); + } + } catch (err) { + // RECONCILIATION_IN_PROGRESS is expected if the interval fires while a + // previous pass (e.g. from a POST /reconcile request) is still running. + if (err.code !== "RECONCILIATION_IN_PROGRESS") { + logger.error("Periodic reconciliation error", { error: err.message }); + } } - }); - - // Register server cleanup - shutdownManager.registerResource("rpc-server", async () => { - logger.info("Closing RPC server connection"); - // Server doesn't have explicit close, but we log it - }); - - // Register idempotency guard persistence - shutdownManager.registerResource("idempotency-guard", async () => { - logger.info("Finalizing idempotency state"); - const snapshot = idempotencyGuard.getSnapshot(); - logger.info("Idempotency state at shutdown", { - stateFile: snapshot.stateFile, - lockCount: snapshot.lockCount, - completedCount: snapshot.completedCount, - }); - }); - - // Initialize and start listening for signals - shutdownManager.init(); - - // Listen to shutdown events for additional logging - shutdownManager.on("shutdown:initiated", ({ signal, reason }) => { - logger.warn("Shutdown initiated", { signal, reason }); - }); - - shutdownManager.on("shutdown:stop-accepting", () => { - logger.info("Stopped accepting new work"); - // Stop the polling loop explicitly - clearInterval(pollingInterval); - }); - - shutdownManager.on("shutdown:force", () => { - logger.warn("Force shutdown initiated - remaining tasks will be cancelled"); - }); + }, reconcileIntervalMs); // Polling loop const pollingIntervalMs = config.pollIntervalMs; @@ -429,27 +391,7 @@ async function main() { shutdownTimeoutMs, }); clearInterval(pollingInterval); - - const gracefulShutdown = async () => { - await queue.shutdown(); - }; - - try { - await Promise.race([ - gracefulShutdown(), - new Promise((_, reject) => - setTimeout(() => reject(new Error('Shutdown timeout exceeded')), shutdownTimeoutMs), - ), - ]); - logger.info('Graceful shutdown complete, exiting', { signal }); - process.exit(0); - } catch (error) { - logger.error('Graceful shutdown failed or timed out', { - signal, - error: error.message, - }); - process.exit(1); - } + clearInterval(reconcileInterval); await queue.drain(); metricsServer.stop(); logger.info("Graceful shutdown complete, exiting"); diff --git a/keeper/package-lock.json b/keeper/package-lock.json index af1b59f..a640570 100644 --- a/keeper/package-lock.json +++ b/keeper/package-lock.json @@ -2919,6 +2919,7 @@ "version": "25.3.0", "resolved": "https://registry.npmjs.org/@types/node/-/node-25.3.0.tgz", "integrity": "sha512-4K3bqJpXpqfg2XKGK9bpDTc6xO/xoUP/RBWS7AtRMug6zZFaRekiLzjVtAoZMquxoAbzBvy5nxQ7veS5eYzf8A==", + "dev": true, "license": "MIT", "dependencies": { "undici-types": "~7.18.0" @@ -3393,10 +3394,7 @@ "possible-typed-array-names": "^1.0.0" }, "engines": { - "node": ">= 0.4" - }, - "funding": { - "url": "https://github.com/sponsors/ljharb" + "node": ">=12" } }, "node_modules/axios": { @@ -4013,23 +4011,6 @@ "url": "https://opencollective.com/core-js" } }, - "node_modules/cors": { - "version": "2.8.6", - "resolved": "https://registry.npmjs.org/cors/-/cors-2.8.6.tgz", - "integrity": "sha512-tJtZBBHA6vjIAaF6EnIaq6laBBP9aq/Y3ouVJjEfoHbRBcHBAHYcMh/w8LDrk2PvIMMq8gmopa5D4V8RmbrxGw==", - "license": "MIT", - "dependencies": { - "object-assign": "^4", - "vary": "^1" - }, - "engines": { - "node": ">= 0.10" - }, - "funding": { - "type": "opencollective", - "url": "https://opencollective.com/express" - } - }, "node_modules/cross-spawn": { "version": "7.0.6", "resolved": "https://registry.npmjs.org/cross-spawn/-/cross-spawn-7.0.6.tgz", @@ -4220,49 +4201,6 @@ "once": "^1.4.0" } }, - "node_modules/engine.io": { - "version": "6.6.7", - "resolved": "https://registry.npmjs.org/engine.io/-/engine.io-6.6.7.tgz", - "integrity": "sha512-DgOngfDKM2EviOH3Mr9m7ks1q8roetLy/IMmYthAYzbpInMbYc/GS+fWFA3rl1gvwKVsQrVV61fo5emD1y3OJQ==", - "license": "MIT", - "dependencies": { - "@types/cors": "^2.8.12", - "@types/node": ">=10.0.0", - "@types/ws": "^8.5.12", - "accepts": "~1.3.4", - "base64id": "2.0.0", - "cookie": "~0.7.2", - "cors": "~2.8.5", - "debug": "~4.4.1", - "engine.io-parser": "~5.2.1", - "ws": "~8.18.3" - }, - "engines": { - "node": ">=10.2.0" - } - }, - "node_modules/engine.io-client": { - "version": "6.6.4", - "resolved": "https://registry.npmjs.org/engine.io-client/-/engine.io-client-6.6.4.tgz", - "integrity": "sha512-+kjUJnZGwzewFDw951CDWcwj35vMNf2fcj7xQWOctq1F2i1jkDdVvdFG9kM/BEChymCH36KgjnW0NsL58JYRxw==", - "license": "MIT", - "dependencies": { - "@socket.io/component-emitter": "~3.1.0", - "debug": "~4.4.1", - "engine.io-parser": "~5.2.1", - "ws": "~8.18.3", - "xmlhttprequest-ssl": "~2.1.1" - } - }, - "node_modules/engine.io-parser": { - "version": "5.2.3", - "resolved": "https://registry.npmjs.org/engine.io-parser/-/engine.io-parser-5.2.3.tgz", - "integrity": "sha512-HqD3yTBfnBxIrbnM1DoD6Pcq8NECnh8d4As1Qgh0z5Gg3jRRIqijury0CL3ghu/edArpUYiYqQiDUQBIs4np3Q==", - "license": "MIT", - "engines": { - "node": ">=10.0.0" - } - }, "node_modules/error-ex": { "version": "1.3.4", "resolved": "https://registry.npmjs.org/error-ex/-/error-ex-1.3.4.tgz", @@ -5272,64 +5210,6 @@ "integrity": "sha512-k/vGaX4/Yla3WzyMCvTQOXYeIHvqOKtnqBduzTHpzpQZzAskKMhZ2K+EnBiSM9zGSoIFeMpXKxa4dYeZIQqewQ==", "license": "ISC" }, - "node_modules/ioredis": { - "version": "5.10.1", - "resolved": "https://registry.npmjs.org/ioredis/-/ioredis-5.10.1.tgz", - "integrity": "sha512-HuEDBTI70aYdx1v6U97SbNx9F1+svQKBDo30o0b9fw055LMepzpOOd0Ccg9Q6tbqmBSJaMuY0fB7yw9/vjBYCA==", - "license": "MIT", - "dependencies": { - "@ioredis/commands": "1.5.1", - "cluster-key-slot": "^1.1.0", - "debug": "^4.3.4", - "denque": "^2.1.0", - "lodash.defaults": "^4.2.0", - "lodash.isarguments": "^3.1.0", - "redis-errors": "^1.2.0", - "redis-parser": "^3.0.0", - "standard-as-callback": "^2.1.0" - }, - "engines": { - "node": ">=12.22.0" - }, - "funding": { - "type": "opencollective", - "url": "https://opencollective.com/ioredis" - } - }, - "node_modules/ioredis-mock": { - "version": "8.13.1", - "resolved": "https://registry.npmjs.org/ioredis-mock/-/ioredis-mock-8.13.1.tgz", - "integrity": "sha512-Wsi50AU+cMiI32nAgfwpUaJVBtb4iQdVsOHl9M6R3tePCO/8vGsToCVIG82XWAxN4Se55TZoOzVseu+QngFLyw==", - "dev": true, - "license": "MIT", - "dependencies": { - "@ioredis/as-callback": "^3.0.0", - "@ioredis/commands": "^1.4.0", - "fengari": "^0.1.4", - "fengari-interop": "^0.1.3", - "semver": "^7.7.2" - }, - "engines": { - "node": ">=12.22" - }, - "peerDependencies": { - "@types/ioredis-mock": "^8", - "ioredis": "^5" - } - }, - "node_modules/ioredis-mock/node_modules/semver": { - "version": "7.7.4", - "resolved": "https://registry.npmjs.org/semver/-/semver-7.7.4.tgz", - "integrity": "sha512-vFKC2IEtQnVhpT78h1Yp8wzwrf8CM+MzKMHGJZfBtzhZNycRFnXsHk6E5TxIkkMsgNS7mdX3AGB7x2QM2di4lA==", - "dev": true, - "license": "ISC", - "bin": { - "semver": "bin/semver.js" - }, - "engines": { - "node": ">=10" - } - }, "node_modules/is-arrayish": { "version": "0.2.1", "resolved": "https://registry.npmjs.org/is-arrayish/-/is-arrayish-0.2.1.tgz", @@ -5349,20 +5229,17 @@ "url": "https://github.com/sponsors/ljharb" } }, - "node_modules/is-core-module": { - "version": "2.16.1", - "resolved": "https://registry.npmjs.org/is-core-module/-/is-core-module-2.16.1.tgz", - "integrity": "sha512-UfoeMA6fIJ8wTYFEUjelnaGI67v6+N7qXJEvQuIGa99l4xsCruSYOVSQ0uPANn4dAzm8lkYPaKLrrijLq7x23w==", + "node_modules/ioredis-mock/node_modules/semver": { + "version": "7.7.4", + "resolved": "https://registry.npmjs.org/semver/-/semver-7.7.4.tgz", + "integrity": "sha512-vFKC2IEtQnVhpT78h1Yp8wzwrf8CM+MzKMHGJZfBtzhZNycRFnXsHk6E5TxIkkMsgNS7mdX3AGB7x2QM2di4lA==", "dev": true, - "license": "MIT", - "dependencies": { - "hasown": "^2.0.2" + "license": "ISC", + "bin": { + "semver": "bin/semver.js" }, "engines": { - "node": ">= 0.4" - }, - "funding": { - "url": "https://github.com/sponsors/ljharb" + "node": ">=10" } }, "node_modules/is-extglob": { @@ -5439,7 +5316,7 @@ "integrity": "sha512-p3EcsicXjit7SaskXHs1hA91QxgTw46Fv6EFKKGS5DRFLD8yKnohjF3hxoju94b/OcMZoQukzpPpBE9uLVKzgQ==", "license": "MIT", "dependencies": { - "is-extglob": "^2.1.1" + "which-typed-array": "^1.1.16" }, "engines": { "node": ">=0.10.0" @@ -7304,27 +7181,6 @@ "node": ">= 12.13.0" } }, - "node_modules/redis-errors": { - "version": "1.2.0", - "resolved": "https://registry.npmjs.org/redis-errors/-/redis-errors-1.2.0.tgz", - "integrity": "sha512-1qny3OExCf0UvUV/5wpYKf2YwPcOqXzkwKKSmKHiE6ZMQs5heeE/c8eXK+PNllPvmjgAbfnsbpkGZWy8cBpn9w==", - "license": "MIT", - "engines": { - "node": ">=4" - } - }, - "node_modules/redis-parser": { - "version": "3.0.0", - "resolved": "https://registry.npmjs.org/redis-parser/-/redis-parser-3.0.0.tgz", - "integrity": "sha512-DJnGAeenTdpMEH6uAJRK/uiyEIH9WVsUmoLwzudwGJUwZPp80PDBWPHXSAGNPwNvIXAbe7MSUB1zQFugFml66A==", - "license": "MIT", - "dependencies": { - "redis-errors": "^1.0.0" - }, - "engines": { - "node": ">=4" - } - }, "node_modules/regenerate": { "version": "1.4.2", "resolved": "https://registry.npmjs.org/regenerate/-/regenerate-1.4.2.tgz", @@ -7602,62 +7458,6 @@ "url": "https://github.com/chalk/slice-ansi?sponsor=1" } }, - "node_modules/socket.io": { - "version": "4.8.3", - "resolved": "https://registry.npmjs.org/socket.io/-/socket.io-4.8.3.tgz", - "integrity": "sha512-2Dd78bqzzjE6KPkD5fHZmDAKRNe3J15q+YHDrIsy9WEkqttc7GY+kT9OBLSMaPbQaEd0x1BjcmtMtXkfpc+T5A==", - "license": "MIT", - "dependencies": { - "accepts": "~1.3.4", - "base64id": "~2.0.0", - "cors": "~2.8.5", - "debug": "~4.4.1", - "engine.io": "~6.6.0", - "socket.io-adapter": "~2.5.2", - "socket.io-parser": "~4.2.4" - }, - "engines": { - "node": ">=10.2.0" - } - }, - "node_modules/socket.io-adapter": { - "version": "2.5.6", - "resolved": "https://registry.npmjs.org/socket.io-adapter/-/socket.io-adapter-2.5.6.tgz", - "integrity": "sha512-DkkO/dz7MGln0dHn5bmN3pPy+JmywNICWrJqVWiVOyvXjWQFIv9c2h24JrQLLFJ2aQVQf/Cvl1vblnd4r2apLQ==", - "license": "MIT", - "dependencies": { - "debug": "~4.4.1", - "ws": "~8.18.3" - } - }, - "node_modules/socket.io-client": { - "version": "4.8.3", - "resolved": "https://registry.npmjs.org/socket.io-client/-/socket.io-client-4.8.3.tgz", - "integrity": "sha512-uP0bpjWrjQmUt5DTHq9RuoCBdFJF10cdX9X+a368j/Ft0wmaVgxlrjvK3kjvgCODOMMOz9lcaRzxmso0bTWZ/g==", - "license": "MIT", - "dependencies": { - "@socket.io/component-emitter": "~3.1.0", - "debug": "~4.4.1", - "engine.io-client": "~6.6.1", - "socket.io-parser": "~4.2.4" - }, - "engines": { - "node": ">=10.0.0" - } - }, - "node_modules/socket.io-parser": { - "version": "4.2.6", - "resolved": "https://registry.npmjs.org/socket.io-parser/-/socket.io-parser-4.2.6.tgz", - "integrity": "sha512-asJqbVBDsBCJx0pTqw3WfesSY0iRX+2xzWEWzrpcH7L6fLzrhyF8WPI8UaeM4YCuDfpwA/cgsdugMsmtz8EJeg==", - "license": "MIT", - "dependencies": { - "@socket.io/component-emitter": "~3.1.0", - "debug": "~4.4.1" - }, - "engines": { - "node": ">=10.0.0" - } - }, "node_modules/sonic-boom": { "version": "4.2.1", "resolved": "https://registry.npmjs.org/sonic-boom/-/sonic-boom-4.2.1.tgz", @@ -8303,35 +8103,6 @@ "url": "https://github.com/sponsors/isaacs" } }, - "node_modules/ws": { - "version": "8.18.3", - "resolved": "https://registry.npmjs.org/ws/-/ws-8.18.3.tgz", - "integrity": "sha512-PEIGCY5tSlUt50cqyMXfCzX+oOPqN0vuGqWzbcJ2xvnkzkq46oOpz7dQaTDBdfICb4N14+GARUDw2XV2N4tvzg==", - "license": "MIT", - "engines": { - "node": ">=10.0.0" - }, - "peerDependencies": { - "bufferutil": "^4.0.1", - "utf-8-validate": ">=5.0.2" - }, - "peerDependenciesMeta": { - "bufferutil": { - "optional": true - }, - "utf-8-validate": { - "optional": true - } - } - }, - "node_modules/xmlhttprequest-ssl": { - "version": "2.1.2", - "resolved": "https://registry.npmjs.org/xmlhttprequest-ssl/-/xmlhttprequest-ssl-2.1.2.tgz", - "integrity": "sha512-TEU+nJVUUnA4CYJFLvK5X9AOeH4KvDvhIfm0vV1GaQRtchnG0hgK5p8hw/xjv8cunWYCsiPCSDzObPyhEwq3KQ==", - "engines": { - "node": ">=0.4.0" - } - }, "node_modules/y18n": { "version": "5.0.8", "resolved": "https://registry.npmjs.org/y18n/-/y18n-5.0.8.tgz", diff --git a/keeper/src/metrics.js b/keeper/src/metrics.js index f24e983..71054f5 100644 --- a/keeper/src/metrics.js +++ b/keeper/src/metrics.js @@ -186,12 +186,8 @@ class MetricsServer { this.registry = registry; } - setControlStateProvider(provider) { - this.controlStateProvider = provider; - } - - setControlActionHandler(handler) { - this.controlActionHandler = handler; + setReconciler(reconciler) { + this.reconciler = reconciler; } initPrometheusMetrics() { @@ -429,7 +425,7 @@ class MetricsServer { this.server = http.createServer(async (req, res) => { res.setHeader('Access-Control-Allow-Origin', '*'); res.setHeader('Access-Control-Allow-Methods', 'GET, POST, OPTIONS'); - res.setHeader('Access-Control-Allow-Headers', 'Content-Type, Authorization'); + res.setHeader('Access-Control-Allow-Headers', 'Content-Type'); if (req.method === 'OPTIONS') { res.writeHead(204); @@ -449,40 +445,12 @@ class MetricsServer { } else if (req.url === '/metrics/forecast' || req.url === '/metrics/forecast/') { this.handleForecast(res); - - - // 🔐 PROTECTED ROUTES START HERE - - } else if (req.url === '/admin/reset' && req.method === 'POST') { - protect(() => { - this.metrics.reset(); - res.writeHead(200, { 'Content-Type': 'application/json' }); - res.end(JSON.stringify({ success: true })); - })(); - - } else if (req.url === '/admin/dead-letter') { - protect(() => this.handleDeadLetter(res))(); - - } else if (req.url.startsWith('/admin/dead-letter/')) { - protect(() => this.handleDeadLetterTask(req, res))(); - - - // ❌ NOT FOUND - - } else if (url.pathname === '/metrics' || url.pathname === '/metrics/') { - this.handleMetrics(res); - } else if (url.pathname === '/metrics/prometheus' || url.pathname === '/metrics/prometheus/') { - await this.handlePrometheusMetrics(res); - } else if (url.pathname === '/metrics/forecast' || url.pathname === '/metrics/forecast/') { - this.handleForecast(res); - } else if (url.pathname === '/drift' || url.pathname === '/drift/') { - this.handleDrift(res); - } else if (url.pathname === '/admin/keeper' || url.pathname === '/admin/keeper/') { - this.handleAdminState(req, res); - } else if (url.pathname === '/admin/keeper/pause' || url.pathname === '/admin/keeper/pause/') { - await this.handlePauseResume(req, res, true); - } else if (url.pathname === '/admin/keeper/resume' || url.pathname === '/admin/keeper/resume/') { - await this.handlePauseResume(req, res, false); + } else if (req.url === '/reconcile' || req.url === '/reconcile/') { + if (req.method === 'POST') { + this.handleReconcileTrigger(res); + } else { + this.handleReconcileStatus(res); + } } else { res.writeHead(404); res.end('Not Found'); @@ -634,6 +602,50 @@ class MetricsServer { }); } + /** + * GET /reconcile — return the most recent reconciliation report, or a + * 204 No Content when no reconciliation has run yet. + */ + handleReconcileStatus(res) { + if (!this.reconciler) { + res.writeHead(503, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify({ error: 'Reconciler not initialised' })); + return; + } + + const report = this.reconciler.getLastReport(); + if (!report) { + res.writeHead(204); + res.end(); + return; + } + + res.writeHead(200, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify(report, null, 2)); + } + + /** + * POST /reconcile — trigger an immediate reconciliation pass. + * Returns 409 Conflict when one is already running. + * Returns 200 with the report on success. + */ + handleReconcileTrigger(res) { + if (!this.reconciler) { + res.writeHead(503, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify({ error: 'Reconciler not initialised' })); + return; + } + + this.reconciler.reconcile().then((report) => { + res.writeHead(200, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify(report, null, 2)); + }).catch((err) => { + const status = err.code === 'RECONCILIATION_IN_PROGRESS' ? 409 : 500; + res.writeHead(status, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify({ error: err.message, code: err.code ?? null })); + }); + } + updateHealth(state) { this.metrics.updateHealth(state); } diff --git a/keeper/src/reconciler.js b/keeper/src/reconciler.js new file mode 100644 index 0000000..7bbdeec --- /dev/null +++ b/keeper/src/reconciler.js @@ -0,0 +1,404 @@ +'use strict'; + +const { createLogger } = require('./logger'); + +/** + * Human-readable mismatch type constants. + * Each describes a specific class of drift between local registry and on-chain truth. + */ +const MismatchType = { + /** Registry holds a task ID that no longer exists on-chain (likely cancelled). */ + TASK_NOT_ON_CHAIN: 'TASK_NOT_ON_CHAIN', + /** is_active flag differs — missed TaskPaused or TaskResumed event. */ + STATUS_DRIFT: 'STATUS_DRIFT', + /** gas_balance differs — missed GasDeposited, GasWithdrawn, or KeeperPaid event. */ + GAS_BALANCE_DRIFT: 'GAS_BALANCE_DRIFT', + /** last_run differs — task was executed by another keeper without our registry observing it. */ + LAST_RUN_DRIFT: 'LAST_RUN_DRIFT', + /** Any other reconcilable field differs (e.g. interval) — local data may be stale. */ + FIELD_DRIFT: 'FIELD_DRIFT', + /** Task exists on-chain but has no local registry entry — missed TaskRegistered event. */ + STALE_RECORD: 'STALE_RECORD', +}; + +/** + * Fields compared between local registry record and on-chain TaskConfig. + * Ordered from highest to lowest operational impact. + */ +const RECONCILABLE_FIELDS = [ + { + chainField: 'is_active', + localField: 'is_active', + mismatchType: MismatchType.STATUS_DRIFT, + cause: 'Missed TaskPaused or TaskResumed event', + }, + { + chainField: 'gas_balance', + localField: 'gas_balance', + mismatchType: MismatchType.GAS_BALANCE_DRIFT, + cause: 'Missed GasDeposited, GasWithdrawn, or KeeperPaid event', + }, + { + chainField: 'last_run', + localField: 'last_run', + mismatchType: MismatchType.LAST_RUN_DRIFT, + cause: 'Task executed by another keeper — missed KeeperPaid event', + }, + { + chainField: 'interval', + localField: 'interval', + mismatchType: MismatchType.FIELD_DRIFT, + cause: 'Local data may be stale or corrupted', + }, +]; + +/** + * Derive the registry `status` string from on-chain fields. + * This is the canonical status computation shared by the reconciler and the poller. + * + * @param {boolean} isActive + * @param {number} gasBalance + * @returns {'active'|'paused'|'low_gas'} + */ +function deriveStatus(isActive, gasBalance) { + if (!isActive) return 'paused'; + if (Number(gasBalance) <= 0) return 'low_gas'; + return 'active'; +} + +/** + * Compare two values for equality, tolerating numeric type coercions + * (BigInt vs Number, string-encoded numbers). + * + * @param {*} a + * @param {*} b + * @returns {boolean} + */ +function valuesMatch(a, b) { + if (a === b) return true; + const na = Number(a); + const nb = Number(b); + if (!Number.isNaN(na) && !Number.isNaN(nb)) return na === nb; + return false; +} + +/** + * TaskReconciler compares the keeper's local registry state against on-chain + * TaskConfig truth for every known task ID. + * + * Design principles + * ----------------- + * - On-chain state is always the authoritative source of truth. + * - Repairs are additive/update-only — cancelled tasks are marked, never deleted, + * preserving audit history in the local registry. + * - Dry-run mode detects and reports drift without modifying any local state. + * - Sequential task processing avoids overwhelming the RPC endpoint. + * - Concurrent reconcile() calls are rejected to prevent double work. + * + * When to trigger reconciliation + * -------------------------------- + * 1. On keeper startup (after registry.init()) to repair any drift from the + * previous run's missed events. + * 2. Periodically (default every 5 minutes via RECONCILE_INTERVAL_MS env var) + * to detect slow drift from missed events or multi-keeper execution. + * 3. On demand via POST /reconcile when an operator suspects inconsistency. + */ +class TaskReconciler { + /** + * @param {{ poller: TaskPoller, registry: TaskRegistry }} deps + * @param {{ logger?: object, dryRun?: boolean }} [options] + */ + constructor(deps, options = {}) { + if (!deps || !deps.poller || !deps.registry) { + throw new Error('TaskReconciler requires { poller, registry } dependencies'); + } + this.poller = deps.poller; + this.registry = deps.registry; + this.logger = options.logger || createLogger('reconciler'); + this.dryRun = Boolean(options.dryRun); + + this._running = false; + this._lastReport = null; + } + + /** + * Reconcile a single task: fetch on-chain state, detect drift, apply repairs. + * + * @param {number} taskId + * @returns {Promise} + */ + async reconcileTask(taskId) { + const local = this.registry.tasks.get(taskId) ?? null; + let onChain = null; + + try { + onChain = await this.poller.getTaskConfig(taskId); + } catch (err) { + this.logger.warn('Failed to fetch on-chain task during reconciliation', { + taskId, + error: err.message, + }); + return { + taskId, + status: 'error', + error: err.message, + mismatches: [], + repaired: false, + }; + } + + const mismatches = this._detectMismatches(taskId, local, onChain); + + let repaired = false; + if (mismatches.length > 0) { + if (this.dryRun) { + this.logger.info('Drift detected (dry-run — no repair applied)', { + taskId, + count: mismatches.length, + types: mismatches.map((m) => m.type), + }); + } else { + this._applyRepairs(taskId, mismatches, onChain); + repaired = true; + } + } + + return { + taskId, + status: mismatches.length === 0 ? 'clean' : this.dryRun ? 'drifted' : 'repaired', + mismatches, + repaired, + }; + } + + /** + * Run a full reconciliation pass over all known task IDs (or a supplied subset). + * + * @param {{ taskIds?: number[] }} [options] + * @returns {Promise} + * @throws {Error} If a reconciliation is already running (code RECONCILIATION_IN_PROGRESS) + */ + async reconcile(options = {}) { + if (this._running) { + throw Object.assign( + new Error('Reconciliation already in progress — try again shortly'), + { code: 'RECONCILIATION_IN_PROGRESS' }, + ); + } + + this._running = true; + + const taskIds = options.taskIds ?? this.registry.getTaskIds(); + const startTime = Date.now(); + + const report = { + startedAt: new Date().toISOString(), + dryRun: this.dryRun, + checked: 0, + clean: 0, + drifted: 0, + repaired: 0, + errors: 0, + results: [], + durationMs: 0, + finishedAt: null, + }; + + this.logger.info('Starting reconciliation', { + taskCount: taskIds.length, + dryRun: this.dryRun, + }); + + try { + for (const taskId of taskIds) { + const result = await this.reconcileTask(taskId); + report.results.push(result); + report.checked++; + + if (result.status === 'error') { + report.errors++; + } else if (result.mismatches.length === 0) { + report.clean++; + } else { + report.drifted++; + if (result.repaired) report.repaired++; + } + } + } finally { + report.durationMs = Date.now() - startTime; + report.finishedAt = new Date().toISOString(); + this._running = false; + } + + this.logger.info('Reconciliation complete', { + checked: report.checked, + clean: report.clean, + drifted: report.drifted, + repaired: report.repaired, + errors: report.errors, + durationMs: report.durationMs, + }); + + if (report.drifted > 0) { + const byType = {}; + for (const r of report.results) { + for (const m of r.mismatches) { + byType[m.type] = (byType[m.type] ?? 0) + 1; + } + } + this.logger.warn('Drift detected — mismatch summary by type', { byType }); + } + + this._lastReport = report; + return report; + } + + /** + * Return the most recent reconciliation report, or null if none has run yet. + * @returns {ReconcileReport|null} + */ + getLastReport() { + return this._lastReport; + } + + /** + * True when a reconcile() call is currently executing. + * @returns {boolean} + */ + isRunning() { + return this._running; + } + + // ── Internal ──────────────────────────────────────────────────────────────── + + /** + * Compare a local registry record against on-chain config and return all + * detected mismatches. + * + * @param {number} taskId + * @param {object|null} local + * @param {object|null} onChain — null when get_task returned nothing + * @returns {Mismatch[]} + */ + _detectMismatches(taskId, local, onChain) { + const mismatches = []; + + // Task in registry but absent on-chain — likely cancelled, missed event. + if (onChain === null) { + mismatches.push({ + type: MismatchType.TASK_NOT_ON_CHAIN, + field: null, + localValue: local?.status ?? 'unknown', + chainValue: null, + cause: + 'Task not found on-chain — it was probably cancelled. Likely a missed TaskCancelled event.', + repair: 'MARK_CANCELLED', + }); + return mismatches; // no further field checks make sense + } + + // Task on-chain but no local record — missed TaskRegistered event. + if (!local) { + mismatches.push({ + type: MismatchType.STALE_RECORD, + field: null, + localValue: null, + chainValue: 'exists', + cause: + 'Task exists on-chain but has no local registry entry. Likely a missed TaskRegistered event.', + repair: 'SYNC_ALL_FIELDS', + }); + return mismatches; + } + + // Compare each reconcilable field. + // Skip any field whose local value is `undefined` — it was never fetched + // from chain (e.g. task was discovered via event but not yet polled), so + // absence is not drift. + for (const { chainField, localField, mismatchType, cause } of RECONCILABLE_FIELDS) { + const chainValue = onChain[chainField]; + const localValue = local[localField]; + + if (localValue === undefined) continue; + + if (!valuesMatch(chainValue, localValue)) { + mismatches.push({ + type: mismatchType, + field: localField, + localValue, + chainValue, + cause, + repair: 'UPDATE_FROM_CHAIN', + }); + } + } + + return mismatches; + } + + /** + * Apply safe, non-destructive repairs to the registry for a drifted task. + * On-chain state is always the source of truth. Records are updated, never removed. + * + * @param {number} taskId + * @param {Mismatch[]} mismatches + * @param {object|null} onChain + */ + _applyRepairs(taskId, mismatches, onChain) { + const update = { reconciledAt: new Date().toISOString() }; + + for (const mismatch of mismatches) { + switch (mismatch.repair) { + case 'MARK_CANCELLED': + // Mark without deleting — preserves history and execution counts. + update.status = 'cancelled'; + update.cancelledDetectedAt = new Date().toISOString(); + this.logger.info('Marking task as cancelled (absent from on-chain state)', { + taskId, + previousStatus: mismatch.localValue, + }); + break; + + case 'UPDATE_FROM_CHAIN': + update[mismatch.field] = mismatch.chainValue; + this.logger.info('Repairing drifted field from chain truth', { + taskId, + field: mismatch.field, + was: mismatch.localValue, + now: mismatch.chainValue, + mismatchType: mismatch.type, + cause: mismatch.cause, + }); + break; + + case 'SYNC_ALL_FIELDS': + // Full sync for tasks with no prior local record. + if (onChain) { + update.is_active = onChain.is_active; + update.gas_balance = onChain.gas_balance; + update.last_run = onChain.last_run; + update.interval = onChain.interval; + } + this.logger.info('Syncing all fields for task with no local record', { taskId }); + break; + } + } + + // Recompute `status` from the post-repair chain values unless the task was + // just marked as cancelled (that overrides status already). + if (update.status !== 'cancelled' && onChain) { + const existing = this.registry.tasks.get(taskId) ?? {}; + const effectiveIsActive = + update.is_active !== undefined ? update.is_active : existing.is_active ?? onChain.is_active; + const effectiveGasBalance = + update.gas_balance !== undefined + ? update.gas_balance + : existing.gas_balance ?? onChain.gas_balance; + update.status = deriveStatus(effectiveIsActive, effectiveGasBalance); + } + + this.registry.updateTask(taskId, update); + } +} + +module.exports = { TaskReconciler, MismatchType, deriveStatus };