diff --git a/contract/src/lib.rs b/contract/src/lib.rs
index e6227ff5..b4340e9a 100644
--- a/contract/src/lib.rs
+++ b/contract/src/lib.rs
@@ -20,10 +20,13 @@ pub enum Error {
CircularDependency = 10,
DependencyBlocked = 11,
AlreadyInitialized = 12,
+ UnauthorizedSlasher = 13,
+ KeeperStakeTooLow = 14,
+ OperatorAlreadySet = 15,
// Payload validation errors
- ArgsTooMany = 13,
- ArgsTooLarge = 14,
- InvalidPayload = 15,
+ ArgsTooMany = 16,
+ ArgsTooLarge = 17,
+ InvalidPayload = 18,
}
/// Maximum number of arguments allowed in a task payload
@@ -61,6 +64,8 @@ pub enum DataKey {
Counter,
ActiveTasks,
Token,
+ Operator,
+ KeeperStake(Address),
TaskDependencies(u64),
}
@@ -117,6 +122,30 @@ fn remove_active_task_id(env: &Env, task_id: u64) {
set_active_task_ids(env, &filtered);
}
+fn get_operator(env: &Env) -> Option
{
+ env.storage().persistent().get(&DataKey::Operator)
+}
+
+fn require_operator(env: &Env, signer: Address) {
+ let operator = get_operator(env).expect("Operator not configured");
+ if operator != signer {
+ panic_with_error!(&env, Error::UnauthorizedSlasher);
+ }
+}
+
+fn get_keeper_stake(env: &Env, keeper: &Address) -> i128 {
+ env.storage()
+ .persistent()
+ .get(&DataKey::KeeperStake(keeper.clone()))
+ .unwrap_or(0)
+}
+
+fn set_keeper_stake(env: &Env, keeper: &Address, amount: i128) {
+ env.storage()
+ .persistent()
+ .set(&DataKey::KeeperStake(keeper.clone()), &amount);
+}
+
#[contracttype]
#[derive(Clone, Debug)]
pub struct ExecutableTask {
diff --git a/keeper/SLA_MONITORING.md b/keeper/SLA_MONITORING.md
new file mode 100644
index 00000000..1ea90a57
--- /dev/null
+++ b/keeper/SLA_MONITORING.md
@@ -0,0 +1,38 @@
+# Keeper SLA Monitoring
+
+The Keeper SLA Monitor continuously evaluates keeper performance and can trigger on-chain slashing when an operator-defined SLA is violated.
+
+## How it works
+
+- The monitor reads recent execution history from `keeper/data/executions.ndjson`.
+- It computes keeper failure rates over the configured history window.
+- If a keeper exceeds the configured failure threshold, the monitor submits a `slash_keeper` transaction to the `SoroTask` contract.
+
+## Configuration
+
+Set the following environment variables in `keeper/.env` or your deployment environment:
+
+- `SLA_MONITOR_ENABLED=true`
+- `SLA_CHECK_INTERVAL_MS=60000`
+- `SLA_MIN_EVALUATION_WINDOW=10`
+- `SLA_FAILURE_THRESHOLD=0.5`
+- `SLA_SLASH_AMOUNT=100`
+- `SLA_OPERATOR_SECRET=`
+- `SLA_MAX_RECENT_HISTORY=200`
+
+The monitor uses `SLA_OPERATOR_SECRET` as the signing account for the on-chain slashing transaction.
+If it is not provided, the Keeper service private key is used by default.
+
+## Metrics
+
+The following Prometheus metrics are exposed by the keeper metrics endpoint:
+
+- `keeper_sla_checks_total`
+- `keeper_sla_violations_total`
+- `keeper_sla_slashed_total`
+- `keeper_sla_last_check_duration_ms`
+- `keeper_sla_last_slash_amount`
+
+## Shutdown behavior
+
+The SLA monitor registers with the keeper shutdown manager and stops cleanly during termination.
diff --git a/keeper/index.js b/keeper/index.js
index 60fe045c..17c0da9b 100644
--- a/keeper/index.js
+++ b/keeper/index.js
@@ -19,6 +19,7 @@ const { GracefulShutdownManager } = require("./src/gracefulShutdown");
const { createDefaultFilterChain } = require("./src/taskFilter");
const { WebhookAuthProtocol, InMemoryReplayStore } = require("./src/webhookAuth");
const { WebhookTriggerHandler } = require("./src/webhookTrigger");
+const { SLAMonitor } = require("./src/slaMonitor");
// Create root logger for the main module
const logger = createLogger("keeper");
@@ -100,6 +101,14 @@ async function main() {
metricsServer.start();
+ const slaMonitor = new SLAMonitor(server, config.contractId, config, {
+ historyManager,
+ metricsServer,
+ operatorKeypair: keypair,
+ logger: createLogger('sla-monitor'),
+ });
+ await slaMonitor.start();
+
// Perform startup validation to fail fast on configuration errors
const validator = new StartupValidator(
server,
@@ -152,12 +161,31 @@ async function main() {
attemptId: context?.attemptId || null,
}),
);
- queue.on("task:success", (taskId) => {
- queueLogger.info("Task executed successfully", { taskId });
+ queue.on("task:success", (taskId, context, result) => {
+ queueLogger.info("Task executed successfully", { taskId, txHash: result?.txHash || null });
+ if (historyManager) {
+ historyManager.record({
+ kind: 'execution',
+ taskId,
+ keeper: keypair.publicKey(),
+ status: 'SUCCESS',
+ txHash: result?.txHash || null,
+ feePaid: result?.feePaid || null,
+ });
+ }
shutdownManager.completeTask(taskId);
});
- queue.on("task:failed", (taskId, err) => {
+ queue.on("task:failed", (taskId, err, context) => {
queueLogger.error("Task failed", { taskId, error: err.message });
+ if (historyManager) {
+ historyManager.record({
+ kind: 'execution',
+ taskId,
+ keeper: keypair.publicKey(),
+ status: 'FAILED',
+ error: err.message,
+ });
+ }
shutdownManager.failTask(taskId, err);
poller.invalidateCache(taskId);
});
@@ -342,6 +370,12 @@ async function main() {
}
});
+ // Register SLA monitor cleanup
+ shutdownManager.registerResource("sla-monitor", async () => {
+ logger.info("Stopping SLA monitor");
+ await slaMonitor.stop();
+ });
+
// Register registry cleanup
shutdownManager.registerResource("task-registry", async () => {
logger.info("Closing task registry");
diff --git a/keeper/src/config.js b/keeper/src/config.js
index c300552e..26b16903 100644
--- a/keeper/src/config.js
+++ b/keeper/src/config.js
@@ -81,6 +81,13 @@ function loadConfig() {
shardLabel: process.env.KEEPER_SHARD_LABEL || null,
driftWarningSeconds: parseInteger(process.env.DRIFT_WARNING_SECONDS, 60),
driftCriticalSeconds: parseInteger(process.env.DRIFT_CRITICAL_SECONDS, 300),
+ slaMonitorEnabled: parseBoolean(process.env.SLA_MONITOR_ENABLED, false),
+ slaCheckIntervalMs: parseInteger(process.env.SLA_CHECK_INTERVAL_MS, 60000),
+ slaMinEvaluationWindow: parseInteger(process.env.SLA_MIN_EVALUATION_WINDOW, 10),
+ slaFailureThreshold: parseFloat(process.env.SLA_FAILURE_THRESHOLD) || 0.5,
+ slaSlashAmount: parseInteger(process.env.SLA_SLASH_AMOUNT, 100),
+ slaOperatorSecret: process.env.SLA_OPERATOR_SECRET || process.env.KEEPER_SECRET,
+ slaMaxRecentHistory: parseInteger(process.env.SLA_MAX_RECENT_HISTORY, 200),
metricsResetOnStart: parseBoolean(process.env.METRICS_RESET_ON_START, false),
inboundWebhooks: {
enabled: inboundWebhooksEnabled,
diff --git a/keeper/src/metrics.js b/keeper/src/metrics.js
index aaafadd9..2a399e7e 100644
--- a/keeper/src/metrics.js
+++ b/keeper/src/metrics.js
@@ -44,12 +44,17 @@ class Metrics {
webhookAcceptedTotal: 0,
webhookRejectedTotal: 0,
webhookReplayRejectedTotal: 0,
+ slaChecksTotal: 0,
+ slaViolationsTotal: 0,
+ slaSlashedTotal: 0,
};
this.gauges = {
avgFeePaidXlm: 0,
lastCycleDurationMs: 0,
lastRetryCycleDurationMs: 0,
rpcCircuitState: 0,
+ slaLastCheckDurationMs: 0,
+ slaLastSlashAmount: 0,
};
this.feeSamples = [];
}
@@ -262,6 +267,31 @@ class MetricsServer {
help: 'Number of tasks with low gas balance',
registers: [this.register],
});
+ this.promSlaChecks = new promClient.Counter({
+ name: 'keeper_sla_checks_total',
+ help: 'Total number of SLA evaluation cycles completed',
+ registers: [this.register],
+ });
+ this.promSlaViolations = new promClient.Counter({
+ name: 'keeper_sla_violations_total',
+ help: 'Total number of SLA violations detected',
+ registers: [this.register],
+ });
+ this.promSlaSlashed = new promClient.Counter({
+ name: 'keeper_sla_slashed_total',
+ help: 'Total number of keeper slashing actions submitted',
+ registers: [this.register],
+ });
+ this.promSlaLastCheckDuration = new promClient.Gauge({
+ name: 'keeper_sla_last_check_duration_ms',
+ help: 'Duration of the last SLA evaluation run in milliseconds',
+ registers: [this.register],
+ });
+ this.promSlaLastSlashAmount = new promClient.Gauge({
+ name: 'keeper_sla_last_slash_amount',
+ help: 'Amount slashed in the most recent SLA enforcement event',
+ registers: [this.register],
+ });
this.promUptime = new promClient.Gauge({
name: 'keeper_uptime_seconds',
help: 'Keeper service uptime in seconds',
@@ -381,6 +411,11 @@ class MetricsServer {
this.promCycleDuration.set(this.metrics.gauges.lastCycleDurationMs);
this.promRetryCycleDuration.set(this.metrics.gauges.lastRetryCycleDurationMs);
this.promLowGasCount.set(this.gasMonitor.getLowGasCount());
+ this.promSlaChecks.inc(0);
+ this.promSlaViolations.inc(0);
+ this.promSlaSlashed.inc(0);
+ this.promSlaLastCheckDuration.set(this.metrics.gauges.slaLastCheckDurationMs);
+ this.promSlaLastSlashAmount.set(this.metrics.gauges.slaLastSlashAmount);
this.promUptime.set(Math.floor((Date.now() - this.metrics.startTime) / 1000));
this.promRpcConnected.set(this.metrics.rpcConnected ? 1 : 0);
this.promRpcCircuitState.set(this.metrics.gauges.rpcCircuitState);
@@ -688,6 +723,12 @@ class MetricsServer {
);
} else if (key === 'adminStateChangesTotal') {
this.promAdminStateChanges.inc(typeof amount === 'number' ? amount : 1);
+ } else if (key === 'slaChecksTotal') {
+ this.promSlaChecks.inc(typeof amount === 'number' ? amount : 1);
+ } else if (key === 'slaViolationsTotal') {
+ this.promSlaViolations.inc(typeof amount === 'number' ? amount : 1);
+ } else if (key === 'slaSlashedTotal') {
+ this.promSlaSlashed.inc(typeof amount === 'number' ? amount : 1);
}
}
@@ -701,6 +742,10 @@ class MetricsServer {
this.promRetryCycleDuration.set(value);
} else if (key === 'rpcCircuitState') {
this.promRpcCircuitState.set(value);
+ } else if (key === 'slaLastCheckDurationMs') {
+ this.promSlaLastCheckDuration.set(value);
+ } else if (key === 'slaLastSlashAmount') {
+ this.promSlaLastSlashAmount.set(value);
}
}
diff --git a/keeper/src/queue.js b/keeper/src/queue.js
index 5ac5d2c0..b2c22f94 100644
--- a/keeper/src/queue.js
+++ b/keeper/src/queue.js
@@ -155,7 +155,7 @@ class ExecutionQueue extends EventEmitter {
}
}
- await executorFn(taskId, attemptContext);
+ const result = await executorFn(taskId, attemptContext);
this.completed++;
@@ -173,7 +173,7 @@ class ExecutionQueue extends EventEmitter {
});
}
- this.emit('task:success', taskId, attemptContext);
+ this.emit('task:success', taskId, attemptContext, result);
} catch (error) {
this.failedCount++;
this.failedTasks.add(taskId);
diff --git a/keeper/src/slaMonitor.js b/keeper/src/slaMonitor.js
new file mode 100644
index 00000000..41e0f2f5
--- /dev/null
+++ b/keeper/src/slaMonitor.js
@@ -0,0 +1,234 @@
+const {
+ Contract,
+ xdr,
+ TransactionBuilder,
+ BASE_FEE,
+ Networks,
+ rpc: SorobanRpc,
+} = require('@stellar/stellar-sdk');
+const { createLogger } = require('./logger');
+
+const DEFAULTS = {
+ checkIntervalMs: 60000,
+ minEvaluationWindow: 10,
+ failureThreshold: 0.5,
+ slashAmount: 100,
+ enforcementCooldownMs: 60 * 60 * 1000,
+ maxRecentHistory: 200,
+};
+
+class SLAMonitor {
+ constructor(server, contractId, config = {}, options = {}) {
+ this.server = server;
+ this.contractId = contractId;
+ this.config = config || {};
+ this.historyManager = options.historyManager || null;
+ this.metrics = options.metricsServer || null;
+ this.logger = options.logger || createLogger('sla-monitor');
+ this.keypair = options.operatorKeypair || null;
+ this.enabled = Boolean(this.config.slaMonitorEnabled);
+ this.intervalMs = this.config.slaCheckIntervalMs || DEFAULTS.checkIntervalMs;
+ this.minEvaluationWindow = this.config.slaMinEvaluationWindow || DEFAULTS.minEvaluationWindow;
+ this.failureThreshold = Number(this.config.slaFailureThreshold) || DEFAULTS.failureThreshold;
+ this.slashAmount = BigInt(this.config.slaSlashAmount || DEFAULTS.slashAmount);
+ this.maxRecentHistory = this.config.slaMaxRecentHistory || DEFAULTS.maxRecentHistory;
+ this.enforcementCooldownMs = this.config.slaEnforcementCooldownMs || DEFAULTS.enforcementCooldownMs;
+ this.violationCache = new Map();
+ this.timer = null;
+ }
+
+ async start() {
+ if (!this.enabled) {
+ this.logger.info('SLA monitor disabled by configuration');
+ return;
+ }
+
+ if (!this.server || !this.contractId) {
+ this.logger.error('SLA monitor requires an RPC server and contract ID');
+ return;
+ }
+ if (!this.keypair) {
+ this.logger.error('SLA monitor requires an operator keypair for slashing');
+ return;
+ }
+ if (!this.historyManager) {
+ this.logger.error('SLA monitor requires a HistoryManager instance to evaluate performance');
+ return;
+ }
+
+ this.logger.info('SLA monitor starting', {
+ contractId: this.contractId,
+ intervalMs: this.intervalMs,
+ minEvaluationWindow: this.minEvaluationWindow,
+ failureThreshold: this.failureThreshold,
+ slashAmount: this.slashAmount.toString(),
+ });
+
+ await this.run();
+ this.timer = setInterval(() => {
+ this.run().catch((err) => {
+ this.logger.error('SLA monitor cycle failed', { error: err.message });
+ });
+ }, this.intervalMs);
+ }
+
+ stop() {
+ if (this.timer) {
+ clearInterval(this.timer);
+ this.timer = null;
+ this.logger.info('SLA monitor stopped');
+ }
+ }
+
+ async run() {
+ const start = Date.now();
+ this.metrics?.increment('slaChecksTotal', 1);
+
+ const history = await this.historyManager.getRecent(this.maxRecentHistory);
+ const keeperStats = this.analyzeHistory(history);
+ const violations = Array.from(keeperStats.values()).filter((stats) => {
+ return stats.total >= this.minEvaluationWindow && stats.failureRate >= this.failureThreshold;
+ });
+
+ this.metrics?.record('slaLastCheckDurationMs', Date.now() - start);
+
+ if (violations.length === 0) {
+ this.logger.debug('No SLA violations found in current evaluation window', {
+ keepersEvaluated: keeperStats.size,
+ });
+ return;
+ }
+
+ for (const violation of violations) {
+ await this.enforceViolation(violation);
+ }
+ }
+
+ analyzeHistory(history) {
+ const statsByKeeper = new Map();
+
+ for (const record of history) {
+ if (!record || record.kind !== 'execution' || !record.keeper) {
+ continue;
+ }
+ const keeper = record.keeper;
+ const entry = statsByKeeper.get(keeper) || {
+ keeper,
+ total: 0,
+ failures: 0,
+ lastSeen: null,
+ };
+ entry.total += 1;
+ entry.lastSeen = record.timestamp || entry.lastSeen;
+ if (String(record.status).toUpperCase() !== 'SUCCESS') {
+ entry.failures += 1;
+ }
+ statsByKeeper.set(keeper, entry);
+ }
+
+ for (const entry of statsByKeeper.values()) {
+ entry.failureRate = entry.total > 0 ? entry.failures / entry.total : 0;
+ }
+
+ return statsByKeeper;
+ }
+
+ async enforceViolation(violation) {
+ const now = Date.now();
+ const lastEnforced = this.violationCache.get(violation.keeper);
+ if (lastEnforced && now - lastEnforced < this.enforcementCooldownMs) {
+ this.logger.debug('Skipping SLA enforcement due to cooldown', {
+ keeper: violation.keeper,
+ cooldownMs: this.enforcementCooldownMs,
+ });
+ return;
+ }
+
+ this.logger.warn('SLA violation detected', {
+ keeper: violation.keeper,
+ failureRate: violation.failureRate,
+ total: violation.total,
+ failures: violation.failures,
+ });
+
+ this.metrics?.increment('slaViolationsTotal', 1);
+
+ try {
+ const result = await this.submitSlash(
+ violation.keeper,
+ this.slashAmount,
+ 'keeper_sla_violation',
+ );
+ this.metrics?.increment('slaSlashedTotal', 1);
+ this.metrics?.record('slaLastSlashAmount', Number(this.slashAmount));
+ this.violationCache.set(violation.keeper, now);
+ this.logger.info('Submitted keeper slashing transaction', {
+ keeper: violation.keeper,
+ txHash: result.txHash,
+ feePaid: result.feePaid,
+ });
+ } catch (err) {
+ this.logger.error('Failed to submit keeper slashing transaction', {
+ keeper: violation.keeper,
+ error: err.message,
+ });
+ }
+ }
+
+ async submitSlash(keeper, amount, reason) {
+ const operatorPubKey = this.keypair.publicKey();
+ const account = await this.server.getAccount(operatorPubKey);
+ const contract = new Contract(this.contractId);
+ const reasonSymbol = xdr.ScVal.scvSymbol(reason);
+ const amountVal = xdr.ScVal.scvI128(xdr.Int128.fromString(String(amount)));
+
+ const tx = new TransactionBuilder(account, {
+ fee: BASE_FEE,
+ networkPassphrase: this.config.networkPassphrase || Networks.FUTURENET,
+ })
+ .addOperation(contract.call('slash_keeper', keeper, amountVal, reasonSymbol))
+ .setTimeout(30)
+ .build();
+
+ tx.sign(this.keypair);
+
+ const sendResult = await this.server.sendTransaction(tx);
+ if (sendResult.status === 'ERROR') {
+ const sendError = String(sendResult.errorResult || sendResult.error || 'Transaction submission error');
+ throw new Error(`Slashing transaction failed: ${sendError}`);
+ }
+
+ const { status, feePaid } = await this.pollTransaction(sendResult.hash);
+ if (status !== 'SUCCESS') {
+ throw new Error(`Slashing transaction did not complete successfully: ${status}`);
+ }
+
+ return { txHash: sendResult.hash, feePaid };
+ }
+
+ async pollTransaction(txHash) {
+ for (let i = 0; i < 30; i += 1) {
+ const response = await this.server.getTransaction(txHash);
+ if (response.status === SorobanRpc.Api.GetTransactionStatus.SUCCESS) {
+ const feePaid = response.resultMetaXdr
+ ? Number(
+ response.resultMetaXdr
+ ?.v3?.()
+ ?.sorobanMeta?.()
+ ?.ext?.()
+ ?.v1?.()
+ ?.totalNonRefundableResourceFeeCharged?.(),
+ ) || 0
+ : 0;
+ return { status: 'SUCCESS', feePaid };
+ }
+ if (response.status === SorobanRpc.GetTransactionStatus.FAILED) {
+ return { status: 'FAILED', feePaid: 0 };
+ }
+ await new Promise((resolve) => setTimeout(resolve, 2000));
+ }
+ return { status: 'TIMEOUT', feePaid: 0 };
+ }
+}
+
+module.exports = { SLAMonitor };