Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions keeper/__tests__/fixtures/simple-resolver.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
class SimpleResolver {
async init(options) {
this.options = options || {};
}

async resolve(taskId, taskConfig) {
if (this.options.reject) {
return { isReady: false, reason: 'explicit_reject' };
}

if (this.options.timeout) {
await new Promise((resolve) => setTimeout(resolve, this.options.timeout));
}

return { isReady: true };
}
}

module.exports = SimpleResolver;
18 changes: 15 additions & 3 deletions keeper/__tests__/integration.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,18 @@ describe('Keeper Integration Tests', () => {
}));
}

const targetValue = taskConfig.target || 'CAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAD2KM';
mapEntries.push(new xdr.ScMapEntry({
key: xdr.ScVal.scvSymbol('target'),
val: xdr.ScVal.scvString(targetValue),
}));

const functionValue = taskConfig.function || 'run_task';
mapEntries.push(new xdr.ScMapEntry({
key: xdr.ScVal.scvSymbol('function'),
val: xdr.ScVal.scvString(functionValue),
}));

return xdr.ScVal.scvVec([xdr.ScVal.scvMap(mapEntries)]);
}

Expand Down Expand Up @@ -121,7 +133,7 @@ describe('Keeper Integration Tests', () => {
// Poll for due tasks
const dueTaskIds = await poller.pollDueTasks(registry.getTaskIds());

expect(dueTaskIds).toEqual([1]);
expect(dueTaskIds.map((item) => item.taskId)).toEqual([1]);
expect(poller.stats.tasksDue).toBe(1);
expect(poller.stats.tasksChecked).toBe(2);

Expand Down Expand Up @@ -200,7 +212,7 @@ describe('Keeper Integration Tests', () => {
});

dueTaskIds = await poller.pollDueTasks(registry.getTaskIds());
expect(dueTaskIds).toEqual([1]);
expect(dueTaskIds.map((item) => item.taskId)).toEqual([1]);
});
});

Expand Down Expand Up @@ -234,7 +246,7 @@ describe('Keeper Integration Tests', () => {

const dueTaskIds = await poller.pollDueTasks(registry.getTaskIds());

expect(dueTaskIds).toEqual([2]);
expect(dueTaskIds.map((item) => item.taskId)).toEqual([2]);
expect(poller.stats.errors).toBe(1);
expect(poller.stats.tasksDue).toBe(1);
});
Expand Down
54 changes: 49 additions & 5 deletions keeper/__tests__/poller.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ describe('TaskPoller', () => {

const result = await poller.pollDueTasks(taskIds);

expect(result).toEqual([1, 3]);
expect(result.map((item) => item.taskId)).toEqual([1, 3]);
expect(poller.stats.tasksDue).toBe(2);
});

Expand All @@ -126,7 +126,7 @@ describe('TaskPoller', () => {

const result = await poller.pollDueTasks(taskIds);

expect(result).toEqual([2]);
expect(result.map((item) => item.taskId)).toEqual([2]);
expect(poller.stats.errors).toBe(1);
expect(poller.stats.tasksDue).toBe(1);
});
Expand Down Expand Up @@ -168,7 +168,7 @@ describe('TaskPoller', () => {

const result = await poller.pollDueTasks([1, 2, 3]);

expect(result).toEqual([2, 3]);
expect(result.map((item) => item.taskId)).toEqual([2, 3]);
expect(poller.stats.tasksSmoothed).toBe(1);
expect(poller.stats.unacceptablyLate).toBe(1);
expect(poller.stats.tasksDue).toBe(2);
Expand All @@ -194,6 +194,9 @@ describe('TaskPoller', () => {
last_run: 500,
interval: 100,
gas_balance: 0,
target: 'CAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAD2KM',
function: 'run_task',
args: [],
});

const result = await poller.checkTask(1, 1000);
Expand All @@ -210,6 +213,9 @@ describe('TaskPoller', () => {
last_run: 500,
interval: 100,
gas_balance: -10,
target: 'CAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAD2KM',
function: 'run_task',
args: [],
});

const result = await poller.checkTask(1, 1000);
Expand All @@ -226,6 +232,9 @@ describe('TaskPoller', () => {
last_run: 500,
interval: 400,
gas_balance: 1000,
target: 'CAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAD2KM',
function: 'run_task',
args: [],
});

const result = await poller.checkTask(1, 1000);
Expand All @@ -237,11 +246,37 @@ describe('TaskPoller', () => {
});
});

it('should skip a task when the configured resolver declines execution', async () => {
poller.resolverManager = {
resolve: jest.fn().mockResolvedValue({ isReady: false, reason: 'resolver_false' }),
};
jest.spyOn(poller, 'getTaskConfig').mockResolvedValue({
last_run: 500,
interval: 400,
gas_balance: 1000,
resolver: 'simple-resolver',
target: 'CAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAD2KM',
function: 'run_task',
args: [],
});

const result = await poller.checkTask(1, 1000);

expect(result).toMatchObject({
isDue: false,
taskId: 1,
reason: 'resolver_false',
});
});

it('should return not due when last_run + interval > currentTimestamp', async () => {
jest.spyOn(poller, 'getTaskConfig').mockResolvedValue({
last_run: 800,
interval: 300,
gas_balance: 1000,
target: 'CAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAD2KM',
function: 'run_task',
args: [],
});

const result = await poller.checkTask(1, 1000);
Expand All @@ -258,6 +293,9 @@ describe('TaskPoller', () => {
last_run: 500,
interval: 500,
gas_balance: 1000,
target: 'CAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAD2KM',
function: 'run_task',
args: [],
});

const result = await poller.checkTask(1, 1000);
Expand All @@ -280,6 +318,9 @@ describe('TaskPoller', () => {
last_run: 500,
interval: 500, // nextRunTime = 1000
gas_balance: 1000,
target: 'CAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAD2KM',
function: 'run_task',
args: [],
});

// For taskId=1, jitter = (1 * 2654435761) % 11 = 10
Expand Down Expand Up @@ -313,6 +354,9 @@ describe('TaskPoller', () => {
last_run: 500,
interval: 500, // nextRunTime = 1000
gas_balance: 1000,
target: 'CAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAD2KM',
function: 'run_task',
args: [],
});

// For taskId=1, jitter = 10. effectiveNextRunTime = 1010
Expand Down Expand Up @@ -470,7 +514,7 @@ describe('TaskPoller with FilterChain', () => {
.mockResolvedValueOnce({ isDue: true, taskId: 2 });

const due = await p.pollDueTasks([1, 2, 3]);
expect(due).toEqual([1, 2]);
expect(due).not.toContain(3);
expect(due.map((item) => item.taskId)).toEqual([1, 2]);
expect(due.map((item) => item.taskId)).not.toContain(3);
});
});
59 changes: 59 additions & 0 deletions keeper/__tests__/resolverManager.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
const path = require('path');
const { ResolverManager } = require('../src/resolverManager');

describe('ResolverManager', () => {
const pluginPath = path.resolve(__dirname, 'fixtures', 'simple-resolver.js');
const config = {
resolvers: {
simple: {
path: pluginPath,
options: { reject: false },
},
rejecter: {
path: pluginPath,
options: { reject: true },
},
},
};

it('loads resolver plugins from config', async () => {
const manager = new ResolverManager({ timeoutMs: 1000, maxConcurrent: 2 });
await manager.loadPluginsFromConfig(config, process.cwd());

expect(manager.getResolver('simple')).not.toBeNull();
expect(manager.getResolver('rejecter')).not.toBeNull();
});

it('resolves a positive plugin result', async () => {
const manager = new ResolverManager({ timeoutMs: 1000, maxConcurrent: 1 });
await manager.loadPluginsFromConfig(config, process.cwd());

const result = await manager.resolve(1, 'simple', { target: 'T' });
expect(result).toEqual({ isReady: true });
});

it('propagates resolver reject reasons', async () => {
const manager = new ResolverManager({ timeoutMs: 1000, maxConcurrent: 1 });
await manager.loadPluginsFromConfig(config, process.cwd());

const result = await manager.resolve(2, 'rejecter', { target: 'T' });
expect(result).toEqual({ isReady: false, reason: 'explicit_reject' });
});

it('returns false when a resolver times out', async () => {
const timeoutConfig = {
resolvers: {
slow: {
path: pluginPath,
options: { timeout: 200 },
},
},
};

const manager = new ResolverManager({ timeoutMs: 50, maxConcurrent: 1 });
await manager.loadPluginsFromConfig(timeoutConfig, process.cwd());

const result = await manager.resolve(3, 'slow', { target: 'T' });
expect(result).toEqual({ isReady: false, reason: 'resolver_timeout' });
});
});
35 changes: 27 additions & 8 deletions keeper/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ const { dryRunTask } = require("./src/dryRun");
const { executeTaskWithRetry } = require("./src/executor");
const { ExecutionIdempotencyGuard } = require("./src/idempotency");
const { MetricsServer } = require("./src/metrics");
const { ResolverManager } = require("./src/resolverManager");
const HistoryManager = require("./src/history");
const { normalizeShardConfig, filterTasksForShard } = require("./src/sharding");
const { StartupValidator } = require("./src/validator");
Expand Down Expand Up @@ -100,6 +101,19 @@ async function main() {

metricsServer.start();

const resolverManager = new ResolverManager({
logger: createLogger('resolver'),
timeoutMs: config.resolverTimeoutMs,
maxConcurrent: config.resolverMaxConcurrent,
metricsServer,
});

await resolverManager.loadPlugins(config.resolverPluginConfigPath, {
server,
config,
logger: createLogger('resolver'),
});

// Perform startup validation to fail fast on configuration errors
const validator = new StartupValidator(
server,
Expand Down Expand Up @@ -135,6 +149,7 @@ async function main() {
simulationCacheMaxSize: process.env.SIMULATION_CACHE_MAX_SIZE,
metricsServer,
historyManager,
resolverManager,
shardLabel: shardConfig.shardLabel,
driftWarningSeconds: config.driftWarningSeconds,
driftCriticalSeconds: config.driftCriticalSeconds,
Expand Down Expand Up @@ -361,6 +376,13 @@ async function main() {
// Server doesn't have explicit close, but we log it
});

shutdownManager.registerResource('resolver-manager', async () => {
logger.info('Shutting down resolver manager');
if (resolverManager && typeof resolverManager.destroy === 'function') {
await resolverManager.destroy();
}
});

// Register idempotency guard persistence
shutdownManager.registerResource("idempotency-guard", async () => {
logger.info("Finalizing idempotency state");
Expand Down Expand Up @@ -472,18 +494,15 @@ async function main() {
});

// Track tasks before enqueueing
dueTaskIds.forEach((taskId) =>
shutdownManager.trackTask(taskId)
dueTaskIds.forEach((d) =>
shutdownManager.trackTask(d.taskId)
);

await queue.enqueue(dueTaskIds, executeTask);

// Transform the dueTask results to pass correlation IDs to the queue
const tasksToEnqueue = dueTaskIds.map(d => ({
const tasksToEnqueue = dueTaskIds.map((d) => ({
taskId: d.taskId,
context: { pollCorrelationId: d.correlationId }
context: { pollCorrelationId: d.correlationId },
}));

await queue.enqueue(tasksToEnqueue, executeTask);
} else {
logger.info("No tasks due for execution");
Expand Down
Loading
Loading