From a63f33abe526f19108003ae743953da82bcb9fcb Mon Sep 17 00:00:00 2001 From: Gozirimdev Date: Sat, 30 May 2026 00:03:50 +0100 Subject: [PATCH] Fix keeper resolver integration and update keeper tests to pass --- keeper/__tests__/fixtures/simple-resolver.js | 19 ++ keeper/__tests__/integration.test.js | 18 +- keeper/__tests__/poller.test.js | 54 +++++- keeper/__tests__/resolverManager.test.js | 59 +++++++ keeper/index.js | 35 +++- keeper/package-lock.json | 130 +++++++++++++- keeper/package.json | 3 +- keeper/src/config.js | 3 + keeper/src/metrics.js | 28 +++ keeper/src/poller.js | 39 ++++- keeper/src/queue.js | 4 + keeper/src/resolverManager.js | 173 +++++++++++++++++++ 12 files changed, 540 insertions(+), 25 deletions(-) create mode 100644 keeper/__tests__/fixtures/simple-resolver.js create mode 100644 keeper/__tests__/resolverManager.test.js create mode 100644 keeper/src/resolverManager.js diff --git a/keeper/__tests__/fixtures/simple-resolver.js b/keeper/__tests__/fixtures/simple-resolver.js new file mode 100644 index 00000000..cda16122 --- /dev/null +++ b/keeper/__tests__/fixtures/simple-resolver.js @@ -0,0 +1,19 @@ +class SimpleResolver { + async init(options) { + this.options = options || {}; + } + + async resolve(taskId, taskConfig) { + if (this.options.reject) { + return { isReady: false, reason: 'explicit_reject' }; + } + + if (this.options.timeout) { + await new Promise((resolve) => setTimeout(resolve, this.options.timeout)); + } + + return { isReady: true }; + } +} + +module.exports = SimpleResolver; diff --git a/keeper/__tests__/integration.test.js b/keeper/__tests__/integration.test.js index 33e5ea44..b25197b1 100644 --- a/keeper/__tests__/integration.test.js +++ b/keeper/__tests__/integration.test.js @@ -53,6 +53,18 @@ describe('Keeper Integration Tests', () => { })); } + const targetValue = taskConfig.target || 'CAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAD2KM'; + mapEntries.push(new xdr.ScMapEntry({ + key: xdr.ScVal.scvSymbol('target'), + val: xdr.ScVal.scvString(targetValue), + })); + + const functionValue = taskConfig.function || 'run_task'; + mapEntries.push(new xdr.ScMapEntry({ + key: xdr.ScVal.scvSymbol('function'), + val: xdr.ScVal.scvString(functionValue), + })); + return xdr.ScVal.scvVec([xdr.ScVal.scvMap(mapEntries)]); } @@ -121,7 +133,7 @@ describe('Keeper Integration Tests', () => { // Poll for due tasks const dueTaskIds = await poller.pollDueTasks(registry.getTaskIds()); - expect(dueTaskIds).toEqual([1]); + expect(dueTaskIds.map((item) => item.taskId)).toEqual([1]); expect(poller.stats.tasksDue).toBe(1); expect(poller.stats.tasksChecked).toBe(2); @@ -200,7 +212,7 @@ describe('Keeper Integration Tests', () => { }); dueTaskIds = await poller.pollDueTasks(registry.getTaskIds()); - expect(dueTaskIds).toEqual([1]); + expect(dueTaskIds.map((item) => item.taskId)).toEqual([1]); }); }); @@ -234,7 +246,7 @@ describe('Keeper Integration Tests', () => { const dueTaskIds = await poller.pollDueTasks(registry.getTaskIds()); - expect(dueTaskIds).toEqual([2]); + expect(dueTaskIds.map((item) => item.taskId)).toEqual([2]); expect(poller.stats.errors).toBe(1); expect(poller.stats.tasksDue).toBe(1); }); diff --git a/keeper/__tests__/poller.test.js b/keeper/__tests__/poller.test.js index ecbf70e8..1ec27875 100644 --- a/keeper/__tests__/poller.test.js +++ b/keeper/__tests__/poller.test.js @@ -100,7 +100,7 @@ describe('TaskPoller', () => { const result = await poller.pollDueTasks(taskIds); - expect(result).toEqual([1, 3]); + expect(result.map((item) => item.taskId)).toEqual([1, 3]); expect(poller.stats.tasksDue).toBe(2); }); @@ -126,7 +126,7 @@ describe('TaskPoller', () => { const result = await poller.pollDueTasks(taskIds); - expect(result).toEqual([2]); + expect(result.map((item) => item.taskId)).toEqual([2]); expect(poller.stats.errors).toBe(1); expect(poller.stats.tasksDue).toBe(1); }); @@ -168,7 +168,7 @@ describe('TaskPoller', () => { const result = await poller.pollDueTasks([1, 2, 3]); - expect(result).toEqual([2, 3]); + expect(result.map((item) => item.taskId)).toEqual([2, 3]); expect(poller.stats.tasksSmoothed).toBe(1); expect(poller.stats.unacceptablyLate).toBe(1); expect(poller.stats.tasksDue).toBe(2); @@ -194,6 +194,9 @@ describe('TaskPoller', () => { last_run: 500, interval: 100, gas_balance: 0, + target: 'CAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAD2KM', + function: 'run_task', + args: [], }); const result = await poller.checkTask(1, 1000); @@ -210,6 +213,9 @@ describe('TaskPoller', () => { last_run: 500, interval: 100, gas_balance: -10, + target: 'CAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAD2KM', + function: 'run_task', + args: [], }); const result = await poller.checkTask(1, 1000); @@ -226,6 +232,9 @@ describe('TaskPoller', () => { last_run: 500, interval: 400, gas_balance: 1000, + target: 'CAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAD2KM', + function: 'run_task', + args: [], }); const result = await poller.checkTask(1, 1000); @@ -237,11 +246,37 @@ describe('TaskPoller', () => { }); }); + it('should skip a task when the configured resolver declines execution', async () => { + poller.resolverManager = { + resolve: jest.fn().mockResolvedValue({ isReady: false, reason: 'resolver_false' }), + }; + jest.spyOn(poller, 'getTaskConfig').mockResolvedValue({ + last_run: 500, + interval: 400, + gas_balance: 1000, + resolver: 'simple-resolver', + target: 'CAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAD2KM', + function: 'run_task', + args: [], + }); + + const result = await poller.checkTask(1, 1000); + + expect(result).toMatchObject({ + isDue: false, + taskId: 1, + reason: 'resolver_false', + }); + }); + it('should return not due when last_run + interval > currentTimestamp', async () => { jest.spyOn(poller, 'getTaskConfig').mockResolvedValue({ last_run: 800, interval: 300, gas_balance: 1000, + target: 'CAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAD2KM', + function: 'run_task', + args: [], }); const result = await poller.checkTask(1, 1000); @@ -258,6 +293,9 @@ describe('TaskPoller', () => { last_run: 500, interval: 500, gas_balance: 1000, + target: 'CAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAD2KM', + function: 'run_task', + args: [], }); const result = await poller.checkTask(1, 1000); @@ -280,6 +318,9 @@ describe('TaskPoller', () => { last_run: 500, interval: 500, // nextRunTime = 1000 gas_balance: 1000, + target: 'CAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAD2KM', + function: 'run_task', + args: [], }); // For taskId=1, jitter = (1 * 2654435761) % 11 = 10 @@ -313,6 +354,9 @@ describe('TaskPoller', () => { last_run: 500, interval: 500, // nextRunTime = 1000 gas_balance: 1000, + target: 'CAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAD2KM', + function: 'run_task', + args: [], }); // For taskId=1, jitter = 10. effectiveNextRunTime = 1010 @@ -470,7 +514,7 @@ describe('TaskPoller with FilterChain', () => { .mockResolvedValueOnce({ isDue: true, taskId: 2 }); const due = await p.pollDueTasks([1, 2, 3]); - expect(due).toEqual([1, 2]); - expect(due).not.toContain(3); + expect(due.map((item) => item.taskId)).toEqual([1, 2]); + expect(due.map((item) => item.taskId)).not.toContain(3); }); }); diff --git a/keeper/__tests__/resolverManager.test.js b/keeper/__tests__/resolverManager.test.js new file mode 100644 index 00000000..199eaae6 --- /dev/null +++ b/keeper/__tests__/resolverManager.test.js @@ -0,0 +1,59 @@ +const path = require('path'); +const { ResolverManager } = require('../src/resolverManager'); + +describe('ResolverManager', () => { + const pluginPath = path.resolve(__dirname, 'fixtures', 'simple-resolver.js'); + const config = { + resolvers: { + simple: { + path: pluginPath, + options: { reject: false }, + }, + rejecter: { + path: pluginPath, + options: { reject: true }, + }, + }, + }; + + it('loads resolver plugins from config', async () => { + const manager = new ResolverManager({ timeoutMs: 1000, maxConcurrent: 2 }); + await manager.loadPluginsFromConfig(config, process.cwd()); + + expect(manager.getResolver('simple')).not.toBeNull(); + expect(manager.getResolver('rejecter')).not.toBeNull(); + }); + + it('resolves a positive plugin result', async () => { + const manager = new ResolverManager({ timeoutMs: 1000, maxConcurrent: 1 }); + await manager.loadPluginsFromConfig(config, process.cwd()); + + const result = await manager.resolve(1, 'simple', { target: 'T' }); + expect(result).toEqual({ isReady: true }); + }); + + it('propagates resolver reject reasons', async () => { + const manager = new ResolverManager({ timeoutMs: 1000, maxConcurrent: 1 }); + await manager.loadPluginsFromConfig(config, process.cwd()); + + const result = await manager.resolve(2, 'rejecter', { target: 'T' }); + expect(result).toEqual({ isReady: false, reason: 'explicit_reject' }); + }); + + it('returns false when a resolver times out', async () => { + const timeoutConfig = { + resolvers: { + slow: { + path: pluginPath, + options: { timeout: 200 }, + }, + }, + }; + + const manager = new ResolverManager({ timeoutMs: 50, maxConcurrent: 1 }); + await manager.loadPluginsFromConfig(timeoutConfig, process.cwd()); + + const result = await manager.resolve(3, 'slow', { target: 'T' }); + expect(result).toEqual({ isReady: false, reason: 'resolver_timeout' }); + }); +}); diff --git a/keeper/index.js b/keeper/index.js index 1b630508..44b2acd4 100644 --- a/keeper/index.js +++ b/keeper/index.js @@ -12,6 +12,7 @@ const { dryRunTask } = require("./src/dryRun"); const { executeTaskWithRetry } = require("./src/executor"); const { ExecutionIdempotencyGuard } = require("./src/idempotency"); const { MetricsServer } = require("./src/metrics"); +const { ResolverManager } = require("./src/resolverManager"); const HistoryManager = require("./src/history"); const { normalizeShardConfig, filterTasksForShard } = require("./src/sharding"); const { StartupValidator } = require("./src/validator"); @@ -97,6 +98,19 @@ async function main() { }); metricsServer.start(); + const resolverManager = new ResolverManager({ + logger: createLogger('resolver'), + timeoutMs: config.resolverTimeoutMs, + maxConcurrent: config.resolverMaxConcurrent, + metricsServer, + }); + + await resolverManager.loadPlugins(config.resolverPluginConfigPath, { + server, + config, + logger: createLogger('resolver'), + }); + // Perform startup validation to fail fast on configuration errors const validator = new StartupValidator( server, @@ -132,6 +146,7 @@ async function main() { simulationCacheMaxSize: process.env.SIMULATION_CACHE_MAX_SIZE, metricsServer, historyManager, + resolverManager, shardLabel: shardConfig.shardLabel, driftWarningSeconds: config.driftWarningSeconds, driftCriticalSeconds: config.driftCriticalSeconds, @@ -293,6 +308,13 @@ async function main() { // Server doesn't have explicit close, but we log it }); + shutdownManager.registerResource('resolver-manager', async () => { + logger.info('Shutting down resolver manager'); + if (resolverManager && typeof resolverManager.destroy === 'function') { + await resolverManager.destroy(); + } + }); + // Register idempotency guard persistence shutdownManager.registerResource("idempotency-guard", async () => { logger.info("Finalizing idempotency state"); @@ -391,18 +413,15 @@ async function main() { }); // Track tasks before enqueueing - dueTaskIds.forEach((taskId) => - shutdownManager.trackTask(taskId) + dueTaskIds.forEach((d) => + shutdownManager.trackTask(d.taskId) ); - await queue.enqueue(dueTaskIds, executeTask); - - // Transform the dueTask results to pass correlation IDs to the queue - const tasksToEnqueue = dueTaskIds.map(d => ({ + const tasksToEnqueue = dueTaskIds.map((d) => ({ taskId: d.taskId, - context: { pollCorrelationId: d.correlationId } + context: { pollCorrelationId: d.correlationId }, })); - + await queue.enqueue(tasksToEnqueue, executeTask); } else { logger.info("No tasks due for execution"); diff --git a/keeper/package-lock.json b/keeper/package-lock.json index af1b59f0..27d6914a 100644 --- a/keeper/package-lock.json +++ b/keeper/package-lock.json @@ -17,7 +17,8 @@ "pino": "^10.3.1", "prom-client": "^15.1.3", "socket.io": "^4.8.3", - "socket.io-client": "^4.8.3" + "socket.io-client": "^4.8.3", + "which-typed-array": "^1.1.20" }, "devDependencies": { "@babel/core": "^7.29.0", @@ -2741,6 +2742,12 @@ "@sinonjs/commons": "^3.0.1" } }, + "node_modules/@socket.io/component-emitter": { + "version": "3.1.2", + "resolved": "https://registry.npmjs.org/@socket.io/component-emitter/-/component-emitter-3.1.2.tgz", + "integrity": "sha512-9BCxFwvbGg/RsZK9tjXd8s4UcwR0MWeFQ1XEKIQVVvAGJyINdrqKMcTRyLoK8Rse1GjzLV9cwjWV1olXRWEXVA==", + "license": "MIT" + }, "node_modules/@stellar/js-xdr": { "version": "4.0.0", "resolved": "https://registry.npmjs.org/@stellar/js-xdr/-/js-xdr-4.0.0.tgz", @@ -2931,6 +2938,15 @@ "dev": true, "license": "MIT" }, + "node_modules/@types/ws": { + "version": "8.18.1", + "resolved": "https://registry.npmjs.org/@types/ws/-/ws-8.18.1.tgz", + "integrity": "sha512-ThVF6DCVhA8kUGy+aazFQ4kXQ7E1Ty7A3ypFOe0IcJV8O/M511G99AW24irKrW56Wt44yG9+ij8FaqoBGkuBXg==", + "license": "MIT", + "dependencies": { + "@types/node": "*" + } + }, "node_modules/@types/yargs": { "version": "17.0.35", "resolved": "https://registry.npmjs.org/@types/yargs/-/yargs-17.0.35.tgz", @@ -3587,6 +3603,15 @@ ], "license": "MIT" }, + "node_modules/base64id": { + "version": "2.0.0", + "resolved": "https://registry.npmjs.org/base64id/-/base64id-2.0.0.tgz", + "integrity": "sha512-lGe34o6EHj9y3Kts9R4ZYs/Gr+6N7MCaMlIFA3F1R2O5/m7K06AxfSeO5530PEERE6/WyEg3lsuyw4GHlPZHog==", + "license": "MIT", + "engines": { + "node": "^4.5.0 || >= 5.9" + } + }, "node_modules/baseline-browser-mapping": { "version": "2.10.0", "resolved": "https://registry.npmjs.org/baseline-browser-mapping/-/baseline-browser-mapping-2.10.0.tgz", @@ -3909,6 +3934,15 @@ "node": ">=12" } }, + "node_modules/cluster-key-slot": { + "version": "1.1.2", + "resolved": "https://registry.npmjs.org/cluster-key-slot/-/cluster-key-slot-1.1.2.tgz", + "integrity": "sha512-RMr0FhtfXemyinomL4hrWcYJxmX6deFdCxpJzhDttxgO1+bcCnkk+9drydLVDmAMG7NE6aN/fl4F7ucU/90gAA==", + "license": "Apache-2.0", + "engines": { + "node": ">=0.10.0" + } + }, "node_modules/co": { "version": "4.6.0", "resolved": "https://registry.npmjs.org/co/-/co-4.6.0.tgz", @@ -3999,6 +4033,15 @@ "dev": true, "license": "MIT" }, + "node_modules/cookie": { + "version": "0.7.2", + "resolved": "https://registry.npmjs.org/cookie/-/cookie-0.7.2.tgz", + "integrity": "sha512-yki5XnKuf750l50uGTllt6kKILY4nQ1eNIQatoXEByZ5dWgnKqbnqmTrBE5B4N7lrMJKQ2ytWMiTO2o0v6Ew/w==", + "license": "MIT", + "engines": { + "node": ">= 0.6" + } + }, "node_modules/core-js-compat": { "version": "3.48.0", "resolved": "https://registry.npmjs.org/core-js-compat/-/core-js-compat-3.48.0.tgz", @@ -4068,7 +4111,6 @@ "version": "4.4.3", "resolved": "https://registry.npmjs.org/debug/-/debug-4.4.3.tgz", "integrity": "sha512-RGwwWnwQvkVfavKVt22FGLw+xYSdzARwm0ru6DhTVA3umU5hZc28V3kO4stgYryrTlLpuvgI9GiijltAjNbcqA==", - "dev": true, "license": "MIT", "dependencies": { "ms": "^2.1.3" @@ -4140,6 +4182,15 @@ "node": ">=0.4.0" } }, + "node_modules/denque": { + "version": "2.1.0", + "resolved": "https://registry.npmjs.org/denque/-/denque-2.1.0.tgz", + "integrity": "sha512-HVQE3AAb/pxF8fQAoiqpvg9i3evqug3hoiwakOyZAwJm+6vZehbkYXZ0l4JxS+I3QxM97v5aaRNhj8v5oBhekw==", + "license": "Apache-2.0", + "engines": { + "node": ">=0.10" + } + }, "node_modules/detect-newline": { "version": "3.1.0", "resolved": "https://registry.npmjs.org/detect-newline/-/detect-newline-3.1.0.tgz", @@ -4765,6 +4816,35 @@ "is-retry-allowed": "^3.0.0" } }, + "node_modules/fengari": { + "version": "0.1.5", + "resolved": "https://registry.npmjs.org/fengari/-/fengari-0.1.5.tgz", + "integrity": "sha512-0DS4Nn4rV8qyFlQCpKK8brT61EUtswynrpfFTcgLErcilBIBskSMQ86fO2WVuybr14ywyKdRjv91FiRZwnEuvQ==", + "dev": true, + "license": "MIT", + "dependencies": { + "readline-sync": "^1.4.10", + "sprintf-js": "^1.1.3", + "tmp": "^0.2.5" + } + }, + "node_modules/fengari-interop": { + "version": "0.1.4", + "resolved": "https://registry.npmjs.org/fengari-interop/-/fengari-interop-0.1.4.tgz", + "integrity": "sha512-4/CW/3PJUo3ebD4ACgE1g/3NGEYSq7OQAyETyypsAl/WeySDBbxExikkayNkZzbpgyC9GyJp8v1DU2VOXxNq7Q==", + "dev": true, + "license": "MIT", + "peerDependencies": { + "fengari": "^0.1.0" + } + }, + "node_modules/fengari/node_modules/sprintf-js": { + "version": "1.1.3", + "resolved": "https://registry.npmjs.org/sprintf-js/-/sprintf-js-1.1.3.tgz", + "integrity": "sha512-Oo+0REFV59/rz3gfJNKQiBlwfHaSESl1pcGyABQsnnIfWOFt6JNj5gCog2U6MLZ//IGYD+nA8nI+mTShREReaA==", + "dev": true, + "license": "BSD-3-Clause" + }, "node_modules/fetch-blob": { "version": "3.2.0", "resolved": "https://registry.npmjs.org/fetch-blob/-/fetch-blob-3.2.0.tgz", @@ -5369,7 +5449,6 @@ "version": "2.1.1", "resolved": "https://registry.npmjs.org/is-extglob/-/is-extglob-2.1.1.tgz", "integrity": "sha512-SbKbANkN603Vi4jEZv49LeVJMn4yGwsbzZworEoyEiutsN3nJYdbO36zfhGJ6QEDpOZIFkDtnq5JRxmvl3jsoQ==", - "dev": true, "license": "MIT", "engines": { "node": ">=0.10.0" @@ -6539,6 +6618,12 @@ "dev": true, "license": "MIT" }, + "node_modules/lodash.defaults": { + "version": "4.2.0", + "resolved": "https://registry.npmjs.org/lodash.defaults/-/lodash.defaults-4.2.0.tgz", + "integrity": "sha512-qjxPLHd3r5DnsdGacqOMU6pb/avJzdh9tFX2ymgoZE27BmjXrNy/y4LoaiTeAb+O3gL8AfpJGtqfX/ae2leYYQ==", + "license": "MIT" + }, "node_modules/lodash.get": { "version": "4.4.2", "resolved": "https://registry.npmjs.org/lodash.get/-/lodash.get-4.4.2.tgz", @@ -6720,7 +6805,6 @@ "version": "2.1.3", "resolved": "https://registry.npmjs.org/ms/-/ms-2.1.3.tgz", "integrity": "sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA==", - "dev": true, "license": "MIT" }, "node_modules/napi-postinstall": { @@ -6746,6 +6830,15 @@ "dev": true, "license": "MIT" }, + "node_modules/negotiator": { + "version": "0.6.3", + "resolved": "https://registry.npmjs.org/negotiator/-/negotiator-0.6.3.tgz", + "integrity": "sha512-+EUsqGPLsM+j/zdChZjsnX51g4XrHFOIXwfnCVPGlQk/k5giakcKsuxCObBRu6DSm9opw/O6slWbJdghQM4bBg==", + "license": "MIT", + "engines": { + "node": ">= 0.6" + } + }, "node_modules/node-domexception": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/node-domexception/-/node-domexception-1.0.0.tgz", @@ -6821,6 +6914,15 @@ "node": ">=8" } }, + "node_modules/object-assign": { + "version": "4.1.1", + "resolved": "https://registry.npmjs.org/object-assign/-/object-assign-4.1.1.tgz", + "integrity": "sha512-rJgTQnkUnH1sFw8yT6VSU3zD3sWmu6sZhIseY8VX+GRu3P6F7Fu+JNDoXfklElbLJSnc3FUQHVe4cU5hj+BcUg==", + "license": "MIT", + "engines": { + "node": ">=0.10.0" + } + }, "node_modules/on-exit-leak-free": { "version": "2.1.2", "resolved": "https://registry.npmjs.org/on-exit-leak-free/-/on-exit-leak-free-2.1.2.tgz", @@ -7936,6 +8038,16 @@ "node": ">=20" } }, + "node_modules/tmp": { + "version": "0.2.5", + "resolved": "https://registry.npmjs.org/tmp/-/tmp-0.2.5.tgz", + "integrity": "sha512-voyz6MApa1rQGUxT3E+BK7/ROe8itEx7vD8/HEvt4xwXucvQ5G5oeEiHkmHZJuBO21RpOf+YYm9MOivj709jow==", + "dev": true, + "license": "MIT", + "engines": { + "node": ">=14.14" + } + }, "node_modules/tmpl": { "version": "1.0.5", "resolved": "https://registry.npmjs.org/tmpl/-/tmpl-1.0.5.tgz", @@ -8012,7 +8124,6 @@ "version": "7.18.2", "resolved": "https://registry.npmjs.org/undici-types/-/undici-types-7.18.2.tgz", "integrity": "sha512-AsuCzffGHJybSaRrmr5eHr81mwJU3kjw6M+uprWvCXiNeN9SOGwQ3Jn8jb8m3Z6izVgknn1R0FTCEAP2QrLY/w==", - "dev": true, "license": "MIT" }, "node_modules/unicode-canonical-property-names-ecmascript": { @@ -8166,6 +8277,15 @@ "node": ">=10.12.0" } }, + "node_modules/vary": { + "version": "1.1.2", + "resolved": "https://registry.npmjs.org/vary/-/vary-1.1.2.tgz", + "integrity": "sha512-BNGbWLfd0eUPabhkXUVm0j8uuvREyTh5ovRa/dyow/BqAbZJyC+5fU+IzQOzmAKzYqYRAISoRhdQr3eIZ/PXqg==", + "license": "MIT", + "engines": { + "node": ">= 0.8" + } + }, "node_modules/walker": { "version": "1.0.8", "resolved": "https://registry.npmjs.org/walker/-/walker-1.0.8.tgz", diff --git a/keeper/package.json b/keeper/package.json index 6edf4f6d..de53b743 100644 --- a/keeper/package.json +++ b/keeper/package.json @@ -44,7 +44,8 @@ "pino": "^10.3.1", "prom-client": "^15.1.3", "socket.io": "^4.8.3", - "socket.io-client": "^4.8.3" + "socket.io-client": "^4.8.3", + "which-typed-array": "^1.1.20" }, "devDependencies": { "@babel/core": "^7.29.0", diff --git a/keeper/src/config.js b/keeper/src/config.js index 69295b68..99d8c1dc 100644 --- a/keeper/src/config.js +++ b/keeper/src/config.js @@ -57,6 +57,9 @@ function loadConfig() { nodeEnv: process.env.NODE_ENV || 'production', metricsPort: parseInteger(process.env.METRICS_PORT, 3000), healthStaleThresholdMs: parseInteger(process.env.HEALTH_STALE_THRESHOLD_MS, 60000), + resolverPluginConfigPath: process.env.RESOLVER_PLUGIN_CONFIG_PATH || './plugins.json', + resolverTimeoutMs: parseInteger(process.env.RESOLVER_TIMEOUT_MS, 5000), + resolverMaxConcurrent: parseInteger(process.env.RESOLVER_MAX_CONCURRENT, 5), adminApiToken: process.env.KEEPER_ADMIN_TOKEN || null, shardIndex: parseInteger(process.env.KEEPER_SHARD_INDEX, 0), shardCount: parseInteger(process.env.KEEPER_SHARD_COUNT, 1), diff --git a/keeper/src/metrics.js b/keeper/src/metrics.js index f24e9836..33447581 100644 --- a/keeper/src/metrics.js +++ b/keeper/src/metrics.js @@ -40,6 +40,9 @@ class Metrics { throttledRequestsTotal: 0, retriesExecutedTotal: 0, retriesFailedTotal: 0, + resolverChecksTotal: 0, + resolverTimeoutsTotal: 0, + resolverFailuresTotal: 0, adminStateChangesTotal: 0, }; this.gauges = { @@ -47,6 +50,7 @@ class Metrics { lastCycleDurationMs: 0, lastRetryCycleDurationMs: 0, rpcCircuitState: 0, + lastResolverDurationMs: 0, }; this.feeSamples = []; } @@ -228,6 +232,26 @@ class MetricsServer { help: 'Total number of keeper admin state changes', registers: [this.register], }); + this.promResolverChecks = new promClient.Counter({ + name: 'keeper_resolver_checks_total', + help: 'Total number of resolver checks executed by the keeper', + registers: [this.register], + }); + this.promResolverTimeouts = new promClient.Counter({ + name: 'keeper_resolver_timeouts_total', + help: 'Total number of resolver checks that timed out', + registers: [this.register], + }); + this.promResolverFailures = new promClient.Counter({ + name: 'keeper_resolver_failures_total', + help: 'Total number of resolver checks that failed', + registers: [this.register], + }); + this.promResolverDuration = new promClient.Gauge({ + name: 'keeper_last_resolver_duration_ms', + help: 'Duration of the last resolver check in milliseconds', + registers: [this.register], + }); this.promAvgFee = new promClient.Gauge({ name: 'keeper_avg_fee_paid_xlm', help: 'Average transaction fee paid in XLM (rolling average)', @@ -368,6 +392,10 @@ class MetricsServer { this.promRpcConnected.set(this.metrics.rpcConnected ? 1 : 0); this.promRpcCircuitState.set(this.metrics.gauges.rpcCircuitState); this.promAdminPaused.set(this.metrics.adminState.paused ? 1 : 0); + this.promResolverChecks.inc(0); + this.promResolverTimeouts.inc(0); + this.promResolverFailures.inc(0); + this.promResolverDuration.set(this.metrics.gauges.lastResolverDurationMs); this.promShardOwnedTasks.set( { shard_label: String(this.metrics.shardState.shardLabel), diff --git a/keeper/src/poller.js b/keeper/src/poller.js index fb0d8aaf..911f9109 100644 --- a/keeper/src/poller.js +++ b/keeper/src/poller.js @@ -25,6 +25,7 @@ class TaskPoller { : null; this.metricsServer = options.metricsServer; this.historyManager = options.historyManager || null; + this.resolverManager = options.resolverManager || null; this.shardLabel = options.shardLabel || null; this.driftWarningSeconds = parseInt( options.driftWarningSeconds || process.env.DRIFT_WARNING_SECONDS || 60, @@ -110,7 +111,9 @@ class TaskPoller { */ async pollDueTasks(taskIds, options = {}) { const cycleId = crypto.randomBytes(4).toString('hex'); - const cycleLogger = this.logger.childWithTrace(`cycle-${cycleId}`); +const cycleLogger = typeof this.logger.childWithTrace === 'function' + ? this.logger.childWithTrace(`cycle-${cycleId}`) + : this.logger; const startTime = Date.now(); this.stats.lastPollTime = new Date().toISOString(); @@ -300,7 +303,7 @@ class TaskPoller { */ async checkTask(taskId, currentTimestamp, registry, options = {}) { const correlationId = options.correlationId; - const taskLogger = correlationId +const taskLogger = correlationId && typeof this.logger.childWithTrace === 'function' ? this.logger.childWithTrace(correlationId) : this.logger; @@ -391,7 +394,37 @@ class TaskPoller { driftSeconds, driftSeverity, }); - } else if (isStrictlyDue) { + } + + if (isDue && taskConfig.resolver && this.resolverManager) { + const resolverResult = await this.resolverManager.resolve(taskId, String(taskConfig.resolver), taskConfig); + if (resolverResult && resolverResult.isReady === false) { + taskLogger.info('Task skipped by resolver check', { + taskId, + resolver: taskConfig.resolver, + reason: resolverResult.reason || 'resolver_false', + }); + + return { + isDue: false, + taskId, + reason: resolverResult.reason || 'resolver_false', + correlationId, + secondsUntilDue: 0, + driftSeconds, + driftSeverity, + }; + } + } + + if (isDue && taskConfig.resolver && !this.resolverManager) { + taskLogger.debug('Resolver configured but no resolver manager is available. Execution will rely on on-chain gating.', { + taskId, + resolver: taskConfig.resolver, + }); + } + + if (!isDue && isStrictlyDue) { reason = 'jitter_smoothed'; this.logger.debug('Task execution smoothed by jitter', { taskId, diff --git a/keeper/src/queue.js b/keeper/src/queue.js index 5ac5d2c0..27973413 100644 --- a/keeper/src/queue.js +++ b/keeper/src/queue.js @@ -314,6 +314,10 @@ class ExecutionQueue extends EventEmitter { return summary; } + async drain(options = {}) { + return this.gracefulShutdown(options); + } + getInFlightStatus() { return { inFlight: this.inFlight, diff --git a/keeper/src/resolverManager.js b/keeper/src/resolverManager.js new file mode 100644 index 00000000..428e73a2 --- /dev/null +++ b/keeper/src/resolverManager.js @@ -0,0 +1,173 @@ +const fs = require('fs'); +const path = require('path'); +const { createLogger } = require('./logger'); + +const DEFAULT_TIMEOUT_MS = parseInt(process.env.RESOLVER_TIMEOUT_MS || '5000', 10); +const DEFAULT_MAX_CONCURRENT = parseInt(process.env.RESOLVER_MAX_CONCURRENT || '5', 10); + +class ResolverManager { + constructor(options = {}) { + this.logger = options.logger || createLogger('resolver'); + this.timeoutMs = Number.isFinite(options.timeoutMs) ? options.timeoutMs : DEFAULT_TIMEOUT_MS; + this.maxConcurrent = Number.isFinite(options.maxConcurrent) + ? options.maxConcurrent + : DEFAULT_MAX_CONCURRENT; + this.metricsServer = options.metricsServer || null; + this.resolvers = new Map(); + this.configPath = null; + } + + async loadPlugins(pluginConfigPath, context = {}) { + if (!pluginConfigPath) { + this.logger.info('No resolver plugin config path provided'); + return; + } + + const resolvedPath = path.isAbsolute(pluginConfigPath) + ? pluginConfigPath + : path.resolve(process.cwd(), pluginConfigPath); + + if (!fs.existsSync(resolvedPath)) { + this.logger.info('Resolver plugin config file not found', { path: resolvedPath }); + return; + } + + let config; + try { + config = JSON.parse(fs.readFileSync(resolvedPath, 'utf8')); + } catch (error) { + throw new Error(`Failed to parse resolver plugin config: ${error.message}`); + } + + this.configPath = resolvedPath; + await this.loadPluginsFromConfig(config, path.dirname(resolvedPath), context); + } + + async loadPluginsFromConfig(pluginConfig, baseDir = process.cwd(), context = {}) { + if (!pluginConfig || typeof pluginConfig !== 'object') { + this.logger.warn('Resolver plugin configuration is empty or malformed'); + return; + } + + const resolvers = pluginConfig.resolvers; + if (!resolvers || typeof resolvers !== 'object') { + this.logger.warn('Resolver plugin configuration missing resolvers section'); + return; + } + + for (const [name, entry] of Object.entries(resolvers)) { + if (!entry || typeof entry.path !== 'string' || entry.path.length === 0) { + this.logger.warn('Skipping resolver with missing path', { name }); + continue; + } + + try { + const modulePath = path.isAbsolute(entry.path) + ? entry.path + : path.resolve(baseDir, entry.path); + + const loaded = require(modulePath); + const PluginImpl = loaded && loaded.default ? loaded.default : loaded; + + const instance = typeof PluginImpl === 'function' + ? new PluginImpl() + : PluginImpl; + + if (!instance || typeof instance.resolve !== 'function') { + throw new Error('Resolver plugin must export an object or class with a resolve(taskId, taskConfig) method'); + } + + if (typeof instance.init === 'function') { + await instance.init(entry.options || {}, { logger: this.logger, ...context }); + } + + this.resolvers.set(name, instance); + this.logger.info('Loaded resolver plugin', { name, path: modulePath }); + } catch (error) { + this.logger.error('Failed to load resolver plugin', { + name, + path: entry.path, + error: error.message, + }); + } + } + } + + getResolver(name) { + return this.resolvers.get(name) || null; + } + + async resolve(taskId, resolverName, taskConfig) { + const resolver = this.getResolver(resolverName); + if (!resolver) { + this.logger.debug('No resolver plugin registered for task', { taskId, resolverName }); + return null; + } + + const runResolver = async () => { + const start = Date.now(); + if (this.metricsServer) { + this.metricsServer.increment('resolverChecksTotal', 1); + } + + let result = await Promise.resolve(resolver.resolve(taskId, taskConfig)); + const durationMs = Date.now() - start; + + if (this.metricsServer) { + this.metricsServer.record('lastResolverDurationMs', durationMs); + } + + if (typeof result === 'boolean') { + result = { isReady: result }; + } + + if (result == null || typeof result !== 'object') { + result = { isReady: Boolean(result) }; + } + + if (typeof result.isReady !== 'boolean') { + result.isReady = Boolean(result.isReady); + } + + return result; + }; + + const task = runResolver(); + const timeoutPromise = new Promise((_, reject) => { + setTimeout(() => reject(Object.assign(new Error('Resolver timed out'), { code: 'RESOLVER_TIMEOUT' })), this.timeoutMs); + }); + + try { + return await Promise.race([task, timeoutPromise]); + } catch (error) { + if (error && error.code === 'RESOLVER_TIMEOUT') { + this.logger.error('Resolver timeout', { taskId, resolverName, timeoutMs: this.timeoutMs }); + if (this.metricsServer) { + this.metricsServer.increment('resolverTimeoutsTotal', 1); + } + return { isReady: false, reason: 'resolver_timeout' }; + } + + this.logger.error('Resolver execution failed', { taskId, resolverName, error: error.message }); + if (this.metricsServer) { + this.metricsServer.increment('resolverFailuresTotal', 1); + } + return { isReady: false, reason: 'resolver_error' }; + } + } + + async destroy() { + for (const [name, resolver] of this.resolvers.entries()) { + if (resolver && typeof resolver.destroy === 'function') { + try { + await resolver.destroy(); + } catch (error) { + this.logger.debug('Resolver destroy failed', { name, error: error.message }); + } + } + } + this.resolvers.clear(); + } +} + +module.exports = { ResolverManager };