Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 32 additions & 3 deletions contract/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@ pub enum Error {
CircularDependency = 10,
DependencyBlocked = 11,
AlreadyInitialized = 12,
UnauthorizedSlasher = 13,
KeeperStakeTooLow = 14,
OperatorAlreadySet = 15,
// Payload validation errors
ArgsTooMany = 13,
ArgsTooLarge = 14,
InvalidPayload = 15,
ArgsTooMany = 16,
ArgsTooLarge = 17,
InvalidPayload = 18,
}

/// Maximum number of arguments allowed in a task payload
Expand Down Expand Up @@ -61,6 +64,8 @@ pub enum DataKey {
Counter,
ActiveTasks,
Token,
Operator,
KeeperStake(Address),
TaskDependencies(u64),
}

Expand Down Expand Up @@ -117,6 +122,30 @@ fn remove_active_task_id(env: &Env, task_id: u64) {
set_active_task_ids(env, &filtered);
}

fn get_operator(env: &Env) -> Option<Address> {
env.storage().persistent().get(&DataKey::Operator)
}

fn require_operator(env: &Env, signer: Address) {
let operator = get_operator(env).expect("Operator not configured");
if operator != signer {
panic_with_error!(&env, Error::UnauthorizedSlasher);
}
}

fn get_keeper_stake(env: &Env, keeper: &Address) -> i128 {
env.storage()
.persistent()
.get(&DataKey::KeeperStake(keeper.clone()))
.unwrap_or(0)
}

fn set_keeper_stake(env: &Env, keeper: &Address, amount: i128) {
env.storage()
.persistent()
.set(&DataKey::KeeperStake(keeper.clone()), &amount);
}

#[contracttype]
#[derive(Clone, Debug)]
pub struct ExecutableTask {
Expand Down
38 changes: 38 additions & 0 deletions keeper/SLA_MONITORING.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# Keeper SLA Monitoring

The Keeper SLA Monitor continuously evaluates keeper performance and can trigger on-chain slashing when an operator-defined SLA is violated.

## How it works

- The monitor reads recent execution history from `keeper/data/executions.ndjson`.
- It computes keeper failure rates over the configured history window.
- If a keeper exceeds the configured failure threshold, the monitor submits a `slash_keeper` transaction to the `SoroTask` contract.

## Configuration

Set the following environment variables in `keeper/.env` or your deployment environment:

- `SLA_MONITOR_ENABLED=true`
- `SLA_CHECK_INTERVAL_MS=60000`
- `SLA_MIN_EVALUATION_WINDOW=10`
- `SLA_FAILURE_THRESHOLD=0.5`
- `SLA_SLASH_AMOUNT=100`
- `SLA_OPERATOR_SECRET=<operator_secret_key>`
- `SLA_MAX_RECENT_HISTORY=200`

The monitor uses `SLA_OPERATOR_SECRET` as the signing account for the on-chain slashing transaction.
If it is not provided, the Keeper service private key is used by default.

## Metrics

The following Prometheus metrics are exposed by the keeper metrics endpoint:

- `keeper_sla_checks_total`
- `keeper_sla_violations_total`
- `keeper_sla_slashed_total`
- `keeper_sla_last_check_duration_ms`
- `keeper_sla_last_slash_amount`

## Shutdown behavior

The SLA monitor registers with the keeper shutdown manager and stops cleanly during termination.
40 changes: 37 additions & 3 deletions keeper/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ const { GracefulShutdownManager } = require("./src/gracefulShutdown");
const { createDefaultFilterChain } = require("./src/taskFilter");
const { WebhookAuthProtocol, InMemoryReplayStore } = require("./src/webhookAuth");
const { WebhookTriggerHandler } = require("./src/webhookTrigger");
const { SLAMonitor } = require("./src/slaMonitor");

// Create root logger for the main module
const logger = createLogger("keeper");
Expand Down Expand Up @@ -100,6 +101,14 @@ async function main() {

metricsServer.start();

const slaMonitor = new SLAMonitor(server, config.contractId, config, {
historyManager,
metricsServer,
operatorKeypair: keypair,
logger: createLogger('sla-monitor'),
});
await slaMonitor.start();

// Perform startup validation to fail fast on configuration errors
const validator = new StartupValidator(
server,
Expand Down Expand Up @@ -152,12 +161,31 @@ async function main() {
attemptId: context?.attemptId || null,
}),
);
queue.on("task:success", (taskId) => {
queueLogger.info("Task executed successfully", { taskId });
queue.on("task:success", (taskId, context, result) => {
queueLogger.info("Task executed successfully", { taskId, txHash: result?.txHash || null });
if (historyManager) {
historyManager.record({
kind: 'execution',
taskId,
keeper: keypair.publicKey(),
status: 'SUCCESS',
txHash: result?.txHash || null,
feePaid: result?.feePaid || null,
});
}
shutdownManager.completeTask(taskId);
});
queue.on("task:failed", (taskId, err) => {
queue.on("task:failed", (taskId, err, context) => {
queueLogger.error("Task failed", { taskId, error: err.message });
if (historyManager) {
historyManager.record({
kind: 'execution',
taskId,
keeper: keypair.publicKey(),
status: 'FAILED',
error: err.message,
});
}
shutdownManager.failTask(taskId, err);
poller.invalidateCache(taskId);
});
Expand Down Expand Up @@ -342,6 +370,12 @@ async function main() {
}
});

// Register SLA monitor cleanup
shutdownManager.registerResource("sla-monitor", async () => {
logger.info("Stopping SLA monitor");
await slaMonitor.stop();
});

// Register registry cleanup
shutdownManager.registerResource("task-registry", async () => {
logger.info("Closing task registry");
Expand Down
7 changes: 7 additions & 0 deletions keeper/src/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,13 @@ function loadConfig() {
shardLabel: process.env.KEEPER_SHARD_LABEL || null,
driftWarningSeconds: parseInteger(process.env.DRIFT_WARNING_SECONDS, 60),
driftCriticalSeconds: parseInteger(process.env.DRIFT_CRITICAL_SECONDS, 300),
slaMonitorEnabled: parseBoolean(process.env.SLA_MONITOR_ENABLED, false),
slaCheckIntervalMs: parseInteger(process.env.SLA_CHECK_INTERVAL_MS, 60000),
slaMinEvaluationWindow: parseInteger(process.env.SLA_MIN_EVALUATION_WINDOW, 10),
slaFailureThreshold: parseFloat(process.env.SLA_FAILURE_THRESHOLD) || 0.5,
slaSlashAmount: parseInteger(process.env.SLA_SLASH_AMOUNT, 100),
slaOperatorSecret: process.env.SLA_OPERATOR_SECRET || process.env.KEEPER_SECRET,
slaMaxRecentHistory: parseInteger(process.env.SLA_MAX_RECENT_HISTORY, 200),
metricsResetOnStart: parseBoolean(process.env.METRICS_RESET_ON_START, false),
inboundWebhooks: {
enabled: inboundWebhooksEnabled,
Expand Down
45 changes: 45 additions & 0 deletions keeper/src/metrics.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,17 @@ class Metrics {
webhookAcceptedTotal: 0,
webhookRejectedTotal: 0,
webhookReplayRejectedTotal: 0,
slaChecksTotal: 0,
slaViolationsTotal: 0,
slaSlashedTotal: 0,
};
this.gauges = {
avgFeePaidXlm: 0,
lastCycleDurationMs: 0,
lastRetryCycleDurationMs: 0,
rpcCircuitState: 0,
slaLastCheckDurationMs: 0,
slaLastSlashAmount: 0,
};
this.feeSamples = [];
}
Expand Down Expand Up @@ -262,6 +267,31 @@ class MetricsServer {
help: 'Number of tasks with low gas balance',
registers: [this.register],
});
this.promSlaChecks = new promClient.Counter({
name: 'keeper_sla_checks_total',
help: 'Total number of SLA evaluation cycles completed',
registers: [this.register],
});
this.promSlaViolations = new promClient.Counter({
name: 'keeper_sla_violations_total',
help: 'Total number of SLA violations detected',
registers: [this.register],
});
this.promSlaSlashed = new promClient.Counter({
name: 'keeper_sla_slashed_total',
help: 'Total number of keeper slashing actions submitted',
registers: [this.register],
});
this.promSlaLastCheckDuration = new promClient.Gauge({
name: 'keeper_sla_last_check_duration_ms',
help: 'Duration of the last SLA evaluation run in milliseconds',
registers: [this.register],
});
this.promSlaLastSlashAmount = new promClient.Gauge({
name: 'keeper_sla_last_slash_amount',
help: 'Amount slashed in the most recent SLA enforcement event',
registers: [this.register],
});
this.promUptime = new promClient.Gauge({
name: 'keeper_uptime_seconds',
help: 'Keeper service uptime in seconds',
Expand Down Expand Up @@ -381,6 +411,11 @@ class MetricsServer {
this.promCycleDuration.set(this.metrics.gauges.lastCycleDurationMs);
this.promRetryCycleDuration.set(this.metrics.gauges.lastRetryCycleDurationMs);
this.promLowGasCount.set(this.gasMonitor.getLowGasCount());
this.promSlaChecks.inc(0);
this.promSlaViolations.inc(0);
this.promSlaSlashed.inc(0);
this.promSlaLastCheckDuration.set(this.metrics.gauges.slaLastCheckDurationMs);
this.promSlaLastSlashAmount.set(this.metrics.gauges.slaLastSlashAmount);
this.promUptime.set(Math.floor((Date.now() - this.metrics.startTime) / 1000));
this.promRpcConnected.set(this.metrics.rpcConnected ? 1 : 0);
this.promRpcCircuitState.set(this.metrics.gauges.rpcCircuitState);
Expand Down Expand Up @@ -688,6 +723,12 @@ class MetricsServer {
);
} else if (key === 'adminStateChangesTotal') {
this.promAdminStateChanges.inc(typeof amount === 'number' ? amount : 1);
} else if (key === 'slaChecksTotal') {
this.promSlaChecks.inc(typeof amount === 'number' ? amount : 1);
} else if (key === 'slaViolationsTotal') {
this.promSlaViolations.inc(typeof amount === 'number' ? amount : 1);
} else if (key === 'slaSlashedTotal') {
this.promSlaSlashed.inc(typeof amount === 'number' ? amount : 1);
}
}

Expand All @@ -701,6 +742,10 @@ class MetricsServer {
this.promRetryCycleDuration.set(value);
} else if (key === 'rpcCircuitState') {
this.promRpcCircuitState.set(value);
} else if (key === 'slaLastCheckDurationMs') {
this.promSlaLastCheckDuration.set(value);
} else if (key === 'slaLastSlashAmount') {
this.promSlaLastSlashAmount.set(value);
}
}

Expand Down
4 changes: 2 additions & 2 deletions keeper/src/queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ class ExecutionQueue extends EventEmitter {
}
}

await executorFn(taskId, attemptContext);
const result = await executorFn(taskId, attemptContext);

this.completed++;

Expand All @@ -173,7 +173,7 @@ class ExecutionQueue extends EventEmitter {
});
}

this.emit('task:success', taskId, attemptContext);
this.emit('task:success', taskId, attemptContext, result);
} catch (error) {
this.failedCount++;
this.failedTasks.add(taskId);
Expand Down
Loading