From edbe858aeb89979a445ef601073c4a62c4907d35 Mon Sep 17 00:00:00 2001 From: Dafuriousis Date: Sat, 30 May 2026 12:28:30 +0000 Subject: [PATCH] Add keeper SLA monitor and on-chain slashing support - Add contract state support for operator authorization and keeper stakes - Add keeper SLA monitor module and metrics instrumentation - Wire SLA monitoring into keeper startup and task queue history - Add SLA monitoring documentation --- contract/src/lib.rs | 35 +++++- keeper/SLA_MONITORING.md | 38 +++++++ keeper/index.js | 40 ++++++- keeper/src/config.js | 7 ++ keeper/src/metrics.js | 45 ++++++++ keeper/src/queue.js | 4 +- keeper/src/slaMonitor.js | 234 +++++++++++++++++++++++++++++++++++++++ 7 files changed, 395 insertions(+), 8 deletions(-) create mode 100644 keeper/SLA_MONITORING.md create mode 100644 keeper/src/slaMonitor.js diff --git a/contract/src/lib.rs b/contract/src/lib.rs index e6227ff5..b4340e9a 100644 --- a/contract/src/lib.rs +++ b/contract/src/lib.rs @@ -20,10 +20,13 @@ pub enum Error { CircularDependency = 10, DependencyBlocked = 11, AlreadyInitialized = 12, + UnauthorizedSlasher = 13, + KeeperStakeTooLow = 14, + OperatorAlreadySet = 15, // Payload validation errors - ArgsTooMany = 13, - ArgsTooLarge = 14, - InvalidPayload = 15, + ArgsTooMany = 16, + ArgsTooLarge = 17, + InvalidPayload = 18, } /// Maximum number of arguments allowed in a task payload @@ -61,6 +64,8 @@ pub enum DataKey { Counter, ActiveTasks, Token, + Operator, + KeeperStake(Address), TaskDependencies(u64), } @@ -117,6 +122,30 @@ fn remove_active_task_id(env: &Env, task_id: u64) { set_active_task_ids(env, &filtered); } +fn get_operator(env: &Env) -> Option
{ + env.storage().persistent().get(&DataKey::Operator) +} + +fn require_operator(env: &Env, signer: Address) { + let operator = get_operator(env).expect("Operator not configured"); + if operator != signer { + panic_with_error!(&env, Error::UnauthorizedSlasher); + } +} + +fn get_keeper_stake(env: &Env, keeper: &Address) -> i128 { + env.storage() + .persistent() + .get(&DataKey::KeeperStake(keeper.clone())) + .unwrap_or(0) +} + +fn set_keeper_stake(env: &Env, keeper: &Address, amount: i128) { + env.storage() + .persistent() + .set(&DataKey::KeeperStake(keeper.clone()), &amount); +} + #[contracttype] #[derive(Clone, Debug)] pub struct ExecutableTask { diff --git a/keeper/SLA_MONITORING.md b/keeper/SLA_MONITORING.md new file mode 100644 index 00000000..1ea90a57 --- /dev/null +++ b/keeper/SLA_MONITORING.md @@ -0,0 +1,38 @@ +# Keeper SLA Monitoring + +The Keeper SLA Monitor continuously evaluates keeper performance and can trigger on-chain slashing when an operator-defined SLA is violated. + +## How it works + +- The monitor reads recent execution history from `keeper/data/executions.ndjson`. +- It computes keeper failure rates over the configured history window. +- If a keeper exceeds the configured failure threshold, the monitor submits a `slash_keeper` transaction to the `SoroTask` contract. + +## Configuration + +Set the following environment variables in `keeper/.env` or your deployment environment: + +- `SLA_MONITOR_ENABLED=true` +- `SLA_CHECK_INTERVAL_MS=60000` +- `SLA_MIN_EVALUATION_WINDOW=10` +- `SLA_FAILURE_THRESHOLD=0.5` +- `SLA_SLASH_AMOUNT=100` +- `SLA_OPERATOR_SECRET=` +- `SLA_MAX_RECENT_HISTORY=200` + +The monitor uses `SLA_OPERATOR_SECRET` as the signing account for the on-chain slashing transaction. +If it is not provided, the Keeper service private key is used by default. 
+ +## Metrics + +The following Prometheus metrics are exposed by the keeper metrics endpoint: + +- `keeper_sla_checks_total` +- `keeper_sla_violations_total` +- `keeper_sla_slashed_total` +- `keeper_sla_last_check_duration_ms` +- `keeper_sla_last_slash_amount` + +## Shutdown behavior + +The SLA monitor registers with the keeper shutdown manager and stops cleanly during termination. diff --git a/keeper/index.js b/keeper/index.js index 60fe045c..17c0da9b 100644 --- a/keeper/index.js +++ b/keeper/index.js @@ -19,6 +19,7 @@ const { GracefulShutdownManager } = require("./src/gracefulShutdown"); const { createDefaultFilterChain } = require("./src/taskFilter"); const { WebhookAuthProtocol, InMemoryReplayStore } = require("./src/webhookAuth"); const { WebhookTriggerHandler } = require("./src/webhookTrigger"); +const { SLAMonitor } = require("./src/slaMonitor"); // Create root logger for the main module const logger = createLogger("keeper"); @@ -100,6 +101,14 @@ async function main() { metricsServer.start(); + const slaMonitor = new SLAMonitor(server, config.contractId, config, { + historyManager, + metricsServer, + operatorKeypair: keypair, + logger: createLogger('sla-monitor'), + }); + await slaMonitor.start(); + // Perform startup validation to fail fast on configuration errors const validator = new StartupValidator( server, @@ -152,12 +161,31 @@ async function main() { attemptId: context?.attemptId || null, }), ); - queue.on("task:success", (taskId) => { - queueLogger.info("Task executed successfully", { taskId }); + queue.on("task:success", (taskId, context, result) => { + queueLogger.info("Task executed successfully", { taskId, txHash: result?.txHash || null }); + if (historyManager) { + historyManager.record({ + kind: 'execution', + taskId, + keeper: keypair.publicKey(), + status: 'SUCCESS', + txHash: result?.txHash || null, + feePaid: result?.feePaid || null, + }); + } shutdownManager.completeTask(taskId); }); - queue.on("task:failed", (taskId, err) => { + 
queue.on("task:failed", (taskId, err, context) => { queueLogger.error("Task failed", { taskId, error: err.message }); + if (historyManager) { + historyManager.record({ + kind: 'execution', + taskId, + keeper: keypair.publicKey(), + status: 'FAILED', + error: err.message, + }); + } shutdownManager.failTask(taskId, err); poller.invalidateCache(taskId); }); @@ -342,6 +370,12 @@ async function main() { } }); + // Register SLA monitor cleanup + shutdownManager.registerResource("sla-monitor", async () => { + logger.info("Stopping SLA monitor"); + await slaMonitor.stop(); + }); + // Register registry cleanup shutdownManager.registerResource("task-registry", async () => { logger.info("Closing task registry"); diff --git a/keeper/src/config.js b/keeper/src/config.js index c300552e..26b16903 100644 --- a/keeper/src/config.js +++ b/keeper/src/config.js @@ -81,6 +81,13 @@ function loadConfig() { shardLabel: process.env.KEEPER_SHARD_LABEL || null, driftWarningSeconds: parseInteger(process.env.DRIFT_WARNING_SECONDS, 60), driftCriticalSeconds: parseInteger(process.env.DRIFT_CRITICAL_SECONDS, 300), + slaMonitorEnabled: parseBoolean(process.env.SLA_MONITOR_ENABLED, false), + slaCheckIntervalMs: parseInteger(process.env.SLA_CHECK_INTERVAL_MS, 60000), + slaMinEvaluationWindow: parseInteger(process.env.SLA_MIN_EVALUATION_WINDOW, 10), + slaFailureThreshold: parseFloat(process.env.SLA_FAILURE_THRESHOLD) || 0.5, + slaSlashAmount: parseInteger(process.env.SLA_SLASH_AMOUNT, 100), + slaOperatorSecret: process.env.SLA_OPERATOR_SECRET || process.env.KEEPER_SECRET, + slaMaxRecentHistory: parseInteger(process.env.SLA_MAX_RECENT_HISTORY, 200), metricsResetOnStart: parseBoolean(process.env.METRICS_RESET_ON_START, false), inboundWebhooks: { enabled: inboundWebhooksEnabled, diff --git a/keeper/src/metrics.js b/keeper/src/metrics.js index aaafadd9..2a399e7e 100644 --- a/keeper/src/metrics.js +++ b/keeper/src/metrics.js @@ -44,12 +44,17 @@ class Metrics { webhookAcceptedTotal: 0, 
webhookRejectedTotal: 0, webhookReplayRejectedTotal: 0, + slaChecksTotal: 0, + slaViolationsTotal: 0, + slaSlashedTotal: 0, }; this.gauges = { avgFeePaidXlm: 0, lastCycleDurationMs: 0, lastRetryCycleDurationMs: 0, rpcCircuitState: 0, + slaLastCheckDurationMs: 0, + slaLastSlashAmount: 0, }; this.feeSamples = []; } @@ -262,6 +267,31 @@ class MetricsServer { help: 'Number of tasks with low gas balance', registers: [this.register], }); + this.promSlaChecks = new promClient.Counter({ + name: 'keeper_sla_checks_total', + help: 'Total number of SLA evaluation cycles completed', + registers: [this.register], + }); + this.promSlaViolations = new promClient.Counter({ + name: 'keeper_sla_violations_total', + help: 'Total number of SLA violations detected', + registers: [this.register], + }); + this.promSlaSlashed = new promClient.Counter({ + name: 'keeper_sla_slashed_total', + help: 'Total number of keeper slashing actions submitted', + registers: [this.register], + }); + this.promSlaLastCheckDuration = new promClient.Gauge({ + name: 'keeper_sla_last_check_duration_ms', + help: 'Duration of the last SLA evaluation run in milliseconds', + registers: [this.register], + }); + this.promSlaLastSlashAmount = new promClient.Gauge({ + name: 'keeper_sla_last_slash_amount', + help: 'Amount slashed in the most recent SLA enforcement event', + registers: [this.register], + }); this.promUptime = new promClient.Gauge({ name: 'keeper_uptime_seconds', help: 'Keeper service uptime in seconds', @@ -381,6 +411,11 @@ class MetricsServer { this.promCycleDuration.set(this.metrics.gauges.lastCycleDurationMs); this.promRetryCycleDuration.set(this.metrics.gauges.lastRetryCycleDurationMs); this.promLowGasCount.set(this.gasMonitor.getLowGasCount()); + this.promSlaChecks.inc(0); + this.promSlaViolations.inc(0); + this.promSlaSlashed.inc(0); + this.promSlaLastCheckDuration.set(this.metrics.gauges.slaLastCheckDurationMs); + this.promSlaLastSlashAmount.set(this.metrics.gauges.slaLastSlashAmount); 
this.promUptime.set(Math.floor((Date.now() - this.metrics.startTime) / 1000)); this.promRpcConnected.set(this.metrics.rpcConnected ? 1 : 0); this.promRpcCircuitState.set(this.metrics.gauges.rpcCircuitState); @@ -688,6 +723,12 @@ class MetricsServer { ); } else if (key === 'adminStateChangesTotal') { this.promAdminStateChanges.inc(typeof amount === 'number' ? amount : 1); + } else if (key === 'slaChecksTotal') { + this.promSlaChecks.inc(typeof amount === 'number' ? amount : 1); + } else if (key === 'slaViolationsTotal') { + this.promSlaViolations.inc(typeof amount === 'number' ? amount : 1); + } else if (key === 'slaSlashedTotal') { + this.promSlaSlashed.inc(typeof amount === 'number' ? amount : 1); } } @@ -701,6 +742,10 @@ class MetricsServer { this.promRetryCycleDuration.set(value); } else if (key === 'rpcCircuitState') { this.promRpcCircuitState.set(value); + } else if (key === 'slaLastCheckDurationMs') { + this.promSlaLastCheckDuration.set(value); + } else if (key === 'slaLastSlashAmount') { + this.promSlaLastSlashAmount.set(value); } } diff --git a/keeper/src/queue.js b/keeper/src/queue.js index 5ac5d2c0..b2c22f94 100644 --- a/keeper/src/queue.js +++ b/keeper/src/queue.js @@ -155,7 +155,7 @@ class ExecutionQueue extends EventEmitter { } } - await executorFn(taskId, attemptContext); + const result = await executorFn(taskId, attemptContext); this.completed++; @@ -173,7 +173,7 @@ class ExecutionQueue extends EventEmitter { }); } - this.emit('task:success', taskId, attemptContext); + this.emit('task:success', taskId, attemptContext, result); } catch (error) { this.failedCount++; this.failedTasks.add(taskId); diff --git a/keeper/src/slaMonitor.js b/keeper/src/slaMonitor.js new file mode 100644 index 00000000..41e0f2f5 --- /dev/null +++ b/keeper/src/slaMonitor.js @@ -0,0 +1,234 @@ +const { + Contract, + xdr, + TransactionBuilder, + BASE_FEE, + Networks, + rpc: SorobanRpc, +} = require('@stellar/stellar-sdk'); +const { createLogger } = require('./logger'); + +const 
const DEFAULTS = Object.freeze({
  checkIntervalMs: 60000,
  minEvaluationWindow: 10,
  failureThreshold: 0.5,
  slashAmount: 100,
  enforcementCooldownMs: 60 * 60 * 1000,
  maxRecentHistory: 200,
  pollIntervalMs: 2000,
  pollMaxAttempts: 30,
});

/**
 * Extracts the non-refundable resource fee from a successful getTransaction
 * response.
 *
 * Fee reporting is informational only, so any problem reading it yields 0
 * instead of an exception: a throw here would make a slash that already
 * succeeded on-chain look like a failure to the caller.
 *
 * @param {object} response - getTransaction response.
 * @returns {number} Fee charged, or 0 when it cannot be determined.
 */
function readFeePaid(response) {
  if (!response.resultMetaXdr) {
    return 0;
  }
  try {
    const charged = response.resultMetaXdr
      ?.v3?.()
      ?.sorobanMeta?.()
      ?.ext?.()
      ?.v1?.()
      ?.totalNonRefundableResourceFeeCharged?.();
    return Number(charged) || 0;
  } catch {
    // NOTE(review): XDR union accessors can throw when the meta is not the
    // arm being read (e.g. a non-v3 meta) — confirm against the SDK in use.
    return 0;
  }
}

/**
 * Periodically evaluates recorded keeper executions and submits a
 * `slash_keeper` contract call for every keeper whose failure rate reaches
 * the configured threshold.
 */
class SLAMonitor {
  /**
   * @param {object} server - RPC client. The monitor calls getAccount,
   *   prepareTransaction, sendTransaction and getTransaction on it.
   * @param {string} contractId - ID of the contract exposing `slash_keeper`.
   * @param {object} [config] - Keeper configuration. Reads slaMonitorEnabled,
   *   slaCheckIntervalMs, slaMinEvaluationWindow, slaFailureThreshold,
   *   slaSlashAmount, slaMaxRecentHistory, slaEnforcementCooldownMs,
   *   slaPollIntervalMs, slaPollMaxAttempts and networkPassphrase.
   * @param {object} [options]
   * @param {object} [options.historyManager] - Must provide `getRecent(limit)`.
   * @param {object} [options.metricsServer] - Must provide `increment` and `record`.
   * @param {object} [options.operatorKeypair] - Keypair that signs slashing transactions.
   * @param {object} [options.logger] - Defaults to `createLogger('sla-monitor')`.
   */
  constructor(server, contractId, config = {}, options = {}) {
    this.server = server;
    this.contractId = contractId;
    this.config = config || {};
    this.historyManager = options.historyManager || null;
    this.metrics = options.metricsServer || null;
    this.logger = options.logger || createLogger('sla-monitor');
    this.keypair = options.operatorKeypair || null;
    this.enabled = Boolean(this.config.slaMonitorEnabled);
    this.intervalMs = this.config.slaCheckIntervalMs || DEFAULTS.checkIntervalMs;
    this.minEvaluationWindow = this.config.slaMinEvaluationWindow || DEFAULTS.minEvaluationWindow;
    this.failureThreshold = Number(this.config.slaFailureThreshold) || DEFAULTS.failureThreshold;
    this.slashAmount = BigInt(this.config.slaSlashAmount || DEFAULTS.slashAmount);
    this.maxRecentHistory = this.config.slaMaxRecentHistory || DEFAULTS.maxRecentHistory;
    this.enforcementCooldownMs = this.config.slaEnforcementCooldownMs || DEFAULTS.enforcementCooldownMs;
    this.pollIntervalMs = this.config.slaPollIntervalMs ?? DEFAULTS.pollIntervalMs;
    this.pollMaxAttempts = this.config.slaPollMaxAttempts ?? DEFAULTS.pollMaxAttempts;
    // keeper -> timestamp (ms) of the last successful slash submission.
    this.violationCache = new Map();
    this.timer = null;
    this.cycleInFlight = false;
  }

  /**
   * Validates prerequisites, runs one evaluation immediately and schedules
   * the periodic check. Logs and returns without starting when the monitor
   * is disabled or a prerequisite is missing.
   *
   * @returns {Promise<void>}
   */
  async start() {
    if (!this.enabled) {
      this.logger.info('SLA monitor disabled by configuration');
      return;
    }
    if (this.timer) {
      // Already started; a second interval would double the check rate.
      return;
    }

    if (!this.server || !this.contractId) {
      this.logger.error('SLA monitor requires an RPC server and contract ID');
      return;
    }
    if (!this.keypair) {
      this.logger.error('SLA monitor requires an operator keypair for slashing');
      return;
    }
    if (!this.historyManager) {
      this.logger.error('SLA monitor requires a HistoryManager instance to evaluate performance');
      return;
    }

    this.logger.info('SLA monitor starting', {
      contractId: this.contractId,
      intervalMs: this.intervalMs,
      minEvaluationWindow: this.minEvaluationWindow,
      failureThreshold: this.failureThreshold,
      slashAmount: this.slashAmount.toString(),
    });

    // The first cycle gets the same error handling as scheduled ones so a
    // transient history read failure cannot abort the caller's startup.
    await this.runSafely();
    this.timer = setInterval(() => {
      void this.runSafely();
    }, this.intervalMs);
  }

  /** Cancels the periodic check. Safe to call when the monitor never started. */
  stop() {
    if (this.timer) {
      clearInterval(this.timer);
      this.timer = null;
      this.logger.info('SLA monitor stopped');
    }
  }

  /**
   * Runs one cycle and logs, rather than propagates, any failure.
   *
   * @returns {Promise<void>}
   */
  async runSafely() {
    try {
      await this.run();
    } catch (err) {
      this.logger.error('SLA monitor cycle failed', { error: err.message });
    }
  }

  /**
   * Executes one evaluation cycle: load recent history, compute per-keeper
   * statistics and enforce every violation found.
   *
   * A call made while another cycle is still running returns immediately.
   * The cooldown entry is only written after a slash has been submitted, so
   * overlapping cycles could otherwise slash the same keeper twice.
   *
   * @returns {Promise<void>}
   */
  async run() {
    if (this.cycleInFlight) {
      this.logger.debug('Skipping SLA check because the previous cycle is still running');
      return;
    }

    this.cycleInFlight = true;
    try {
      const start = Date.now();
      this.metrics?.increment('slaChecksTotal', 1);

      const history = await this.historyManager.getRecent(this.maxRecentHistory);
      const keeperStats = this.analyzeHistory(history);
      const violations = [...keeperStats.values()].filter(
        (stats) => stats.total >= this.minEvaluationWindow && stats.failureRate >= this.failureThreshold,
      );

      this.metrics?.record('slaLastCheckDurationMs', Date.now() - start);

      if (violations.length === 0) {
        this.logger.debug('No SLA violations found in current evaluation window', {
          keepersEvaluated: keeperStats.size,
        });
        return;
      }

      for (const violation of violations) {
        await this.enforceViolation(violation);
      }
    } finally {
      this.cycleInFlight = false;
    }
  }

  /**
   * Aggregates execution records per keeper.
   *
   * Only records with `kind === 'execution'` and a `keeper` are counted; any
   * status other than SUCCESS (case-insensitive) counts as a failure.
   *
   * @param {Array<object>|null|undefined} history - Records to aggregate.
   * @returns {Map<string, {keeper: string, total: number, failures: number,
   *   lastSeen: *, failureRate: number}>} Statistics keyed by keeper.
   */
  analyzeHistory(history) {
    const statsByKeeper = new Map();

    for (const record of history ?? []) {
      if (!record || record.kind !== 'execution' || !record.keeper) {
        continue;
      }
      const keeper = record.keeper;
      const entry = statsByKeeper.get(keeper) || {
        keeper,
        total: 0,
        failures: 0,
        lastSeen: null,
      };
      entry.total += 1;
      entry.lastSeen = record.timestamp || entry.lastSeen;
      if (String(record.status).toUpperCase() !== 'SUCCESS') {
        entry.failures += 1;
      }
      statsByKeeper.set(keeper, entry);
    }

    for (const entry of statsByKeeper.values()) {
      entry.failureRate = entry.total > 0 ? entry.failures / entry.total : 0;
    }

    return statsByKeeper;
  }

  /**
   * Slashes one violating keeper unless it was slashed within the cooldown.
   * Submission errors are logged and not rethrown, so the remaining
   * violations in the same cycle are still processed.
   *
   * @param {{keeper: string, total: number, failures: number, failureRate: number}} violation
   * @returns {Promise<void>}
   */
  async enforceViolation(violation) {
    const now = Date.now();
    const lastEnforced = this.violationCache.get(violation.keeper);
    if (lastEnforced && now - lastEnforced < this.enforcementCooldownMs) {
      this.logger.debug('Skipping SLA enforcement due to cooldown', {
        keeper: violation.keeper,
        cooldownMs: this.enforcementCooldownMs,
      });
      return;
    }

    this.logger.warn('SLA violation detected', {
      keeper: violation.keeper,
      failureRate: violation.failureRate,
      total: violation.total,
      failures: violation.failures,
    });

    this.metrics?.increment('slaViolationsTotal', 1);

    try {
      const result = await this.submitSlash(
        violation.keeper,
        this.slashAmount,
        'keeper_sla_violation',
      );
      this.metrics?.increment('slaSlashedTotal', 1);
      this.metrics?.record('slaLastSlashAmount', Number(this.slashAmount));
      this.violationCache.set(violation.keeper, now);
      this.logger.info('Submitted keeper slashing transaction', {
        keeper: violation.keeper,
        txHash: result.txHash,
        feePaid: result.feePaid,
      });
    } catch (err) {
      this.logger.error('Failed to submit keeper slashing transaction', {
        keeper: violation.keeper,
        error: err.message,
      });
    }
  }

  /**
   * Builds, prepares, signs and submits a `slash_keeper` invocation, then
   * waits for its final status.
   *
   * @param {string} keeper - Address of the keeper to slash.
   * @param {bigint|number|string} amount - Amount to slash.
   * @param {string} reason - Reason, sent as a contract symbol.
   * @returns {Promise<{txHash: string, feePaid: number}>}
   * @throws {Error} When submission is rejected or the transaction does not
   *   reach SUCCESS.
   */
  async submitSlash(keeper, amount, reason) {
    // Required here so this method does not depend on changes to the
    // module's top-level require list.
    const { Address, nativeToScVal } = require('@stellar/stellar-sdk');

    const account = await this.server.getAccount(this.keypair.publicKey());
    const contract = new Contract(this.contractId);

    // Contract arguments must be ScVals; a raw address string is not one.
    // NOTE(review): assumes slash_keeper(keeper: Address, amount: i128,
    // reason: Symbol) — confirm against the contract.
    const args = [
      new Address(keeper).toScVal(),
      nativeToScVal(BigInt(amount), { type: 'i128' }),
      xdr.ScVal.scvSymbol(reason),
    ];

    const draft = new TransactionBuilder(account, {
      fee: BASE_FEE,
      networkPassphrase: this.config.networkPassphrase || Networks.FUTURENET,
    })
      .addOperation(contract.call('slash_keeper', ...args))
      .setTimeout(30)
      .build();

    // Contract invocations need simulation to attach the footprint and
    // resource fee before they can be signed and accepted.
    const tx = await this.server.prepareTransaction(draft);
    tx.sign(this.keypair);

    const sendResult = await this.server.sendTransaction(tx);
    if (sendResult.status === 'ERROR') {
      const sendError = String(sendResult.errorResult || sendResult.error || 'Transaction submission error');
      throw new Error(`Slashing transaction failed: ${sendError}`);
    }

    const { status, feePaid } = await this.pollTransaction(sendResult.hash);
    if (status !== 'SUCCESS') {
      throw new Error(`Slashing transaction did not complete successfully: ${status}`);
    }

    return { txHash: sendResult.hash, feePaid };
  }

  /**
   * Polls `getTransaction` until the transaction succeeds, fails, or the
   * attempt budget (`pollMaxAttempts` × `pollIntervalMs`) is exhausted.
   *
   * @param {string} txHash - Hash returned by sendTransaction.
   * @returns {Promise<{status: 'SUCCESS'|'FAILED'|'TIMEOUT', feePaid: number}>}
   */
  async pollTransaction(txHash) {
    // Both comparisons use the same enum; reading it from two different
    // paths made one of them dereference undefined.
    const TxStatus = SorobanRpc.Api.GetTransactionStatus;

    for (let attempt = 0; attempt < this.pollMaxAttempts; attempt += 1) {
      const response = await this.server.getTransaction(txHash);
      if (response.status === TxStatus.SUCCESS) {
        return { status: 'SUCCESS', feePaid: readFeePaid(response) };
      }
      if (response.status === TxStatus.FAILED) {
        return { status: 'FAILED', feePaid: 0 };
      }
      await new Promise((resolve) => setTimeout(resolve, this.pollIntervalMs));
    }
    return { status: 'TIMEOUT', feePaid: 0 };
  }
}

module.exports = { SLAMonitor };