From aceac2a659f1a477ff7f655a4a0a586e8f53d5b8 Mon Sep 17 00:00:00 2001 From: dorianzheng Date: Fri, 24 Apr 2026 07:06:49 +0800 Subject: [PATCH 1/2] feat(budget): add per-agent token budget enforcement MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds token budget controls to prevent runaway agents from consuming excessive API costs. Checks remaining budget before each LLM call, pauses the agent on limit exceeded, and emits warning/exceeded events. - New SQLite tables: token_budget_configs and token_budget_state - New budget-actions.ts: budget_set, budget_get, budget_resume actions - message-processor.ts: checkAndEnforceBudget() called before runAgent - api/events.ts: BudgetExceededEvent and BudgetWarningEvent types - Budget resets daily at configurable reset_hour (default midnight UTC) - Emits budget.warning at ≥80% usage (non-blocking) - Emits budget.exceeded and pauses agent when limit is hit - Depends on token_usage table from PR #59 (table creation is idempotent) Co-Authored-By: Claude Sonnet 4.6 --- src/agent/agent-impl.ts | 2 + src/agent/budget-actions.ts | 119 ++++++++++++++++++++++++++ src/agent/message-processor.ts | 133 +++++++++++++++++++++++++++++ src/api/events.ts | 34 ++++++++ src/db.ts | 150 +++++++++++++++++++++++++++++++++ 5 files changed, 438 insertions(+) create mode 100644 src/agent/budget-actions.ts diff --git a/src/agent/agent-impl.ts b/src/agent/agent-impl.ts index 00c40ae67f2..80fa8d83170 100644 --- a/src/agent/agent-impl.ts +++ b/src/agent/agent-impl.ts @@ -61,6 +61,7 @@ import { ChannelManager } from './channel-manager.js'; import { GroupManager } from './group-manager.js'; import { TaskManager } from './task-manager.js'; import { MessageProcessor } from './message-processor.js'; +import { registerBudgetActions } from './budget-actions.js'; export { type Agent }; @@ -442,6 +443,7 @@ export class AgentImpl dataDir: this.config.dataDir, assistantName: this.config.assistantName, }); + registerBudgetActions(this, this.db); logger.info({ agent: this.name }, 'Database initialized'); this.groupMgr.loadState(); diff --git a/src/agent/budget-actions.ts b/src/agent/budget-actions.ts new file mode 100644 index 00000000000..80a1aba6fe9 --- /dev/null +++ b/src/agent/budget-actions.ts @@ -0,0 +1,119 @@ +import { z } from 'zod'; + +import type { Agent } from '../api/agent.js'; +import type { AgentDb } from '../db.js'; + +/** Compute the start of the current daily budget period (epoch ms). */ +function getDailyPeriodStart(resetHour: number): number { + const now = new Date(); + const start = new Date( + Date.UTC( + now.getUTCFullYear(), + now.getUTCMonth(), + now.getUTCDate(), + resetHour, + 0, + 0, + 0, + ), + ); + if (start.getTime() > now.getTime()) { + start.setUTCDate(start.getUTCDate() - 1); + } + return start.getTime(); +} + +export function registerBudgetActions(agent: Agent, db: AgentDb): void { + // ── budget_set ────────────────────────────────────────────────── + + agent.action( + 'budget_set', + 'Configure the token spending budget for an agent group. Pass null to remove a limit.', + { + group_jid: z.string().describe('The group/chat JID to configure'), + daily_limit_usd: z + .number() + .positive() + .nullable() + .optional() + .describe('Daily spend limit in USD. null removes the daily limit.'), + total_limit_usd: z + .number() + .positive() + .nullable() + .optional() + .describe('All-time spend limit in USD. null removes the total limit.'), + reset_hour: z + .number() + .int() + .min(0) + .max(23) + .optional() + .describe('UTC hour at which the daily counter resets (0–23, default 0).'), + }, + async (args) => { + db.setBudgetConfig(args.group_jid, { + daily_limit_usd: args.daily_limit_usd, + total_limit_usd: args.total_limit_usd, + reset_hour: args.reset_hour, + }); + return { ok: true }; + }, + ); + + // ── budget_get ────────────────────────────────────────────────── + + agent.action( + 'budget_get', + 'Query the current budget configuration, pause state, and spending for an agent group.', + { + group_jid: z.string().describe('The group/chat JID to query'), + }, + async (args) => { + const config = db.getBudgetConfig(args.group_jid); + const state = db.getBudgetState(args.group_jid); + const resetHour = config?.reset_hour ?? 0; + const periodStart = getDailyPeriodStart(resetHour); + const dailyCostUsd = db.getDailyUsageUsd(args.group_jid, periodStart); + const totalCostUsd = db.getTotalUsageUsd(args.group_jid); + + const dailyPct = + config?.daily_limit_usd != null && config.daily_limit_usd > 0 + ? dailyCostUsd / config.daily_limit_usd + : null; + const totalPct = + config?.total_limit_usd != null && config.total_limit_usd > 0 + ? totalCostUsd / config.total_limit_usd + : null; + + return { + config: { + daily_limit_usd: config?.daily_limit_usd ?? null, + total_limit_usd: config?.total_limit_usd ?? null, + reset_hour: resetHour, + }, + state, + usage: { + daily_cost_usd: dailyCostUsd, + total_cost_usd: totalCostUsd, + daily_pct: dailyPct, + total_pct: totalPct, + }, + }; + }, + ); + + // ── budget_resume ─────────────────────────────────────────────── + + agent.action( + 'budget_resume', + 'Clear the budget-exceeded pause for an agent group, allowing it to run again.', + { + group_jid: z.string().describe('The group/chat JID to resume'), + }, + async (args) => { + db.clearBudgetPaused(args.group_jid); + return { ok: true }; + }, + ); +} diff --git a/src/agent/message-processor.ts b/src/agent/message-processor.ts index bf9ceb8ca04..5f3c39fa81a 100644 --- a/src/agent/message-processor.ts +++ b/src/agent/message-processor.ts @@ -25,6 +25,26 @@ import { buildMcpRuntimeConfig } from './mcp-runtime.js'; export { buildMcpRuntimeConfig }; +/** Compute the start of the current daily budget period (epoch ms). */ +function getDailyPeriodStart(resetHour: number): number { + const now = new Date(); + const start = new Date( + Date.UTC( + now.getUTCFullYear(), + now.getUTCMonth(), + now.getUTCDate(), + resetHour, + 0, + 0, + 0, + ), + ); + if (start.getTime() > now.getTime()) { + start.setUTCDate(start.getUTCDate() - 1); + } + return start.getTime(); +} + function hasWakeTrigger( messages: Array<{ content: string; sender: string; is_from_me?: boolean }>, chatJid: string, @@ -142,6 +162,11 @@ export class MessageProcessor { 'Processing messages', ); + // ── Budget enforcement ───────────────────────���──────────────── + const budgetBlocked = await this.checkAndEnforceBudget(chatJid); + if (budgetBlocked) return false; + // ───────────────────────────────────────────────────────────── + let idleTimer: ReturnType | null = null; const resetIdleTimer = () => { if (idleTimer) clearTimeout(idleTimer); @@ -327,6 +352,114 @@ export class MessageProcessor { return true; } + /** + * Check whether this group's token budget is exceeded or warning-level. + * Returns true if the agent should be blocked (budget exceeded), false otherwise. + * Emits 'budget.exceeded' or 'budget.warning' events as appropriate. + * Sends a user-facing message when pausing. + */ + private async checkAndEnforceBudget(chatJid: string): Promise { + const config = this.ctx.db.getBudgetConfig(chatJid); + if (!config || (config.daily_limit_usd == null && config.total_limit_usd == null)) { + return false; // no budget configured + } + + const state = this.ctx.db.getBudgetState(chatJid); + const now = new Date().toISOString(); + + // If already paused, block immediately. + if (state.paused) { + this.ctx.emit('budget.exceeded', { + agentId: this.ctx.id, + jid: chatJid, + limitType: state.paused_reason === 'total_limit' ? 'total' : 'daily', + limitUsd: + state.paused_reason === 'total_limit' + ? (config.total_limit_usd ?? 0) + : (config.daily_limit_usd ?? 0), + usedUsd: 0, + timestamp: now, + }); + const channel = findChannel(this.channelMgr.channelArray, chatJid); + await channel?.sendMessage?.( + chatJid, + 'Agent paused: token budget exceeded. Resume from Dune settings.', + ); + return true; + } + + const periodStart = getDailyPeriodStart(config.reset_hour); + const dailyCost = this.ctx.db.getDailyUsageUsd(chatJid, periodStart); + const totalCost = this.ctx.db.getTotalUsageUsd(chatJid); + + // Check hard limits. + if ( + (config.daily_limit_usd != null && dailyCost >= config.daily_limit_usd) || + (config.total_limit_usd != null && totalCost >= config.total_limit_usd) + ) { + const isDaily = + config.daily_limit_usd != null && dailyCost >= config.daily_limit_usd; + const limitType: 'daily' | 'total' = isDaily ? 'daily' : 'total'; + const limitUsd = isDaily ? config.daily_limit_usd! : config.total_limit_usd!; + const usedUsd = isDaily ? dailyCost : totalCost; + + this.ctx.db.setBudgetPaused(chatJid, `${limitType}_limit` as 'daily_limit' | 'total_limit'); + this.ctx.emit('budget.exceeded', { + agentId: this.ctx.id, + jid: chatJid, + limitType, + limitUsd, + usedUsd, + timestamp: now, + }); + + const channel = findChannel(this.channelMgr.channelArray, chatJid); + await channel?.sendMessage?.( + chatJid, + `Agent paused: ${limitType} token budget exceeded ($${usedUsd.toFixed(4)} of $${limitUsd.toFixed(2)} used). Resume from Dune settings.`, + ); + + logger.warn( + { chatJid, limitType, limitUsd, usedUsd, agent: this.ctx.name }, + 'Budget exceeded — agent paused', + ); + return true; + } + + // Check warning threshold (80%). + const warnDaily = + config.daily_limit_usd != null && + dailyCost >= 0.8 * config.daily_limit_usd; + const warnTotal = + config.total_limit_usd != null && + totalCost >= 0.8 * config.total_limit_usd; + + if (warnDaily || warnTotal) { + const isDaily = warnDaily; + const limitType: 'daily' | 'total' = isDaily ? 'daily' : 'total'; + const limitUsd = isDaily ? config.daily_limit_usd! : config.total_limit_usd!; + const usedUsd = isDaily ? dailyCost : totalCost; + const pctUsed = usedUsd / limitUsd; + + this.ctx.emit('budget.warning', { + agentId: this.ctx.id, + jid: chatJid, + pctUsed, + limitType, + limitUsd, + usedUsd, + timestamp: now, + }); + + logger.info( + { chatJid, limitType, pctUsed: Math.round(pctUsed * 100), agent: this.ctx.name }, + 'Budget warning — approaching limit', + ); + } + + return false; // proceed normally + } + /** Execute agent in a container for the given group. */ async runAgent( group: InternalRegisteredGroup, diff --git a/src/api/events.ts b/src/api/events.ts index 701b7e8dd87..78d1b2fc617 100644 --- a/src/api/events.ts +++ b/src/api/events.ts @@ -29,6 +29,8 @@ export interface AgentEvents extends Record { 'task.run.succeeded': [payload: TaskRunSucceededEvent]; 'task.run.failed': [payload: TaskRunFailedEvent]; 'task.run.skipped': [payload: TaskRunSkippedEvent]; + 'budget.exceeded': [payload: BudgetExceededEvent]; + 'budget.warning': [payload: BudgetWarningEvent]; started: []; stopped: []; } @@ -302,3 +304,35 @@ export interface TaskRunSkippedEvent { detail?: string; timestamp: string; } + +// ── Budget events ───────────────────────────────────────────────── + +/** Agent was paused because it exceeded its token budget. */ +export interface BudgetExceededEvent { + agentId: string; + /** Group/chat JID that exceeded its budget. */ + jid: string; + /** Which limit was hit. */ + limitType: 'daily' | 'total'; + /** The configured limit in USD. */ + limitUsd: number; + /** Amount spent at time of enforcement. */ + usedUsd: number; + timestamp: string; +} + +/** Agent is approaching its token budget (≥80% used). */ +export interface BudgetWarningEvent { + agentId: string; + /** Group/chat JID. */ + jid: string; + /** Fraction of the limit used (e.g. 0.83 = 83%). */ + pctUsed: number; + /** Which limit triggered the warning. */ + limitType: 'daily' | 'total'; + /** The configured limit in USD. */ + limitUsd: number; + /** Amount spent so far. */ + usedUsd: number; + timestamp: string; +} diff --git a/src/db.ts b/src/db.ts index 5965c9cd244..e4ab3cf342e 100644 --- a/src/db.ts +++ b/src/db.ts @@ -82,6 +82,32 @@ export function createSchema( container_config TEXT, requires_trigger INTEGER DEFAULT 1 ); + CREATE TABLE IF NOT EXISTS token_usage ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + group_jid TEXT NOT NULL, + session_id TEXT, + model TEXT NOT NULL, + prompt_tokens INTEGER NOT NULL DEFAULT 0, + completion_tokens INTEGER NOT NULL DEFAULT 0, + total_tokens INTEGER NOT NULL DEFAULT 0, + cache_read_tokens INTEGER NOT NULL DEFAULT 0, + cache_write_tokens INTEGER NOT NULL DEFAULT 0, + cost_usd REAL, + latency_ms INTEGER, + ts INTEGER NOT NULL + ); + CREATE TABLE IF NOT EXISTS token_budget_configs ( + group_jid TEXT PRIMARY KEY, + daily_limit_usd REAL, + total_limit_usd REAL, + reset_hour INTEGER NOT NULL DEFAULT 0 + ); + CREATE TABLE IF NOT EXISTS token_budget_state ( + group_jid TEXT PRIMARY KEY, + paused INTEGER NOT NULL DEFAULT 0, + paused_at INTEGER, + paused_reason TEXT + ); `); @@ -142,6 +168,11 @@ export function createSchema( } catch { /* columns already exist */ } + + database.exec(` + CREATE INDEX IF NOT EXISTS idx_token_usage_group_jid ON token_usage(group_jid); + CREATE INDEX IF NOT EXISTS idx_token_usage_ts ON token_usage(ts); + `); } export function initDatabase(opts: { @@ -730,4 +761,123 @@ export class AgentDb { } } } + + // ─── Token budget helpers ───────────────────────────────────────── + + getBudgetConfig(groupJid: string): { + daily_limit_usd: number | null; + total_limit_usd: number | null; + reset_hour: number; + } | null { + const row = this.db + .prepare( + `SELECT daily_limit_usd, total_limit_usd, reset_hour FROM token_budget_configs WHERE group_jid = ?`, + ) + .get(groupJid) as { + daily_limit_usd: number | null; + total_limit_usd: number | null; + reset_hour: number; + } | undefined; + return row ?? null; + } + + setBudgetConfig( + groupJid: string, + opts: { + daily_limit_usd?: number | null; + total_limit_usd?: number | null; + reset_hour?: number; + }, + ): void { + const existing = this.getBudgetConfig(groupJid); + const daily = + opts.daily_limit_usd !== undefined + ? opts.daily_limit_usd + : (existing?.daily_limit_usd ?? null); + const total = + opts.total_limit_usd !== undefined + ? opts.total_limit_usd + : (existing?.total_limit_usd ?? null); + const hour = + opts.reset_hour !== undefined ? opts.reset_hour : (existing?.reset_hour ?? 0); + + this.db + .prepare( + `INSERT INTO token_budget_configs (group_jid, daily_limit_usd, total_limit_usd, reset_hour) + VALUES (?, ?, ?, ?) + ON CONFLICT(group_jid) DO UPDATE SET + daily_limit_usd = excluded.daily_limit_usd, + total_limit_usd = excluded.total_limit_usd, + reset_hour = excluded.reset_hour`, + ) + .run(groupJid, daily, total, hour); + } + + getBudgetState(groupJid: string): { + paused: boolean; + paused_at: number | null; + paused_reason: string | null; + } { + const row = this.db + .prepare( + `SELECT paused, paused_at, paused_reason FROM token_budget_state WHERE group_jid = ?`, + ) + .get(groupJid) as { + paused: number; + paused_at: number | null; + paused_reason: string | null; + } | undefined; + if (!row) return { paused: false, paused_at: null, paused_reason: null }; + return { + paused: row.paused === 1, + paused_at: row.paused_at, + paused_reason: row.paused_reason, + }; + } + + setBudgetPaused(groupJid: string, reason: 'daily_limit' | 'total_limit'): void { + this.db + .prepare( + `INSERT INTO token_budget_state (group_jid, paused, paused_at, paused_reason) + VALUES (?, 1, ?, ?) + ON CONFLICT(group_jid) DO UPDATE SET + paused = 1, + paused_at = excluded.paused_at, + paused_reason = excluded.paused_reason`, + ) + .run(groupJid, Date.now(), reason); + } + + clearBudgetPaused(groupJid: string): void { + this.db + .prepare( + `INSERT INTO token_budget_state (group_jid, paused, paused_at, paused_reason) + VALUES (?, 0, NULL, NULL) + ON CONFLICT(group_jid) DO UPDATE SET + paused = 0, + paused_at = NULL, + paused_reason = NULL`, + ) + .run(groupJid); + } + + /** Sum cost_usd for a group since the given epoch-ms timestamp. */ + getDailyUsageUsd(groupJid: string, periodStartMs: number): number { + const row = this.db + .prepare( + `SELECT COALESCE(SUM(cost_usd), 0) AS total FROM token_usage WHERE group_jid = ? AND ts >= ?`, + ) + .get(groupJid, periodStartMs) as { total: number }; + return row.total; + } + + /** Sum cost_usd for a group across all time. */ + getTotalUsageUsd(groupJid: string): number { + const row = this.db + .prepare( + `SELECT COALESCE(SUM(cost_usd), 0) AS total FROM token_usage WHERE group_jid = ?`, + ) + .get(groupJid) as { total: number }; + return row.total; + } } From 157521e0b5999fb5db4bc8cf81150b29fd009754 Mon Sep 17 00:00:00 2001 From: dorianzheng Date: Fri, 24 Apr 2026 07:20:55 +0800 Subject: [PATCH 2/2] style(budget): apply prettier formatting to budget enforcement files Co-Authored-By: Claude Sonnet 4.6 --- src/agent/budget-actions.ts | 4 +++- src/agent/message-processor.ts | 25 ++++++++++++++++++++----- src/db.ts | 33 +++++++++++++++++++++------------ 3 files changed, 44 insertions(+), 18 deletions(-) diff --git a/src/agent/budget-actions.ts b/src/agent/budget-actions.ts index 80a1aba6fe9..38113916580 100644 --- a/src/agent/budget-actions.ts +++ b/src/agent/budget-actions.ts @@ -49,7 +49,9 @@ export function registerBudgetActions(agent: Agent, db: AgentDb): void { .min(0) .max(23) .optional() - .describe('UTC hour at which the daily counter resets (0–23, default 0).'), + .describe( + 'UTC hour at which the daily counter resets (0–23, default 0).', + ), }, async (args) => { db.setBudgetConfig(args.group_jid, { diff --git a/src/agent/message-processor.ts b/src/agent/message-processor.ts index 5f3c39fa81a..0e30abf52fc 100644 --- a/src/agent/message-processor.ts +++ b/src/agent/message-processor.ts @@ -360,7 +360,10 @@ export class MessageProcessor { */ private async checkAndEnforceBudget(chatJid: string): Promise { const config = this.ctx.db.getBudgetConfig(chatJid); - if (!config || (config.daily_limit_usd == null && config.total_limit_usd == null)) { + if ( + !config || + (config.daily_limit_usd == null && config.total_limit_usd == null) + ) { return false; // no budget configured } @@ -400,10 +403,15 @@ export class MessageProcessor { const isDaily = config.daily_limit_usd != null && dailyCost >= config.daily_limit_usd; const limitType: 'daily' | 'total' = isDaily ? 'daily' : 'total'; - const limitUsd = isDaily ? config.daily_limit_usd! : config.total_limit_usd!; + const limitUsd = isDaily + ? config.daily_limit_usd! + : config.total_limit_usd!; const usedUsd = isDaily ? dailyCost : totalCost; - this.ctx.db.setBudgetPaused(chatJid, `${limitType}_limit` as 'daily_limit' | 'total_limit'); + this.ctx.db.setBudgetPaused( + chatJid, + `${limitType}_limit` as 'daily_limit' | 'total_limit', + ); this.ctx.emit('budget.exceeded', { agentId: this.ctx.id, jid: chatJid, @@ -437,7 +445,9 @@ export class MessageProcessor { if (warnDaily || warnTotal) { const isDaily = warnDaily; const limitType: 'daily' | 'total' = isDaily ? 'daily' : 'total'; - const limitUsd = isDaily ? config.daily_limit_usd! : config.total_limit_usd!; + const limitUsd = isDaily + ? config.daily_limit_usd! + : config.total_limit_usd!; const usedUsd = isDaily ? dailyCost : totalCost; const pctUsed = usedUsd / limitUsd; @@ -452,7 +462,12 @@ export class MessageProcessor { }); logger.info( - { chatJid, limitType, pctUsed: Math.round(pctUsed * 100), agent: this.ctx.name }, + { + chatJid, + limitType, + pctUsed: Math.round(pctUsed * 100), + agent: this.ctx.name, + }, 'Budget warning — approaching limit', ); } diff --git a/src/db.ts b/src/db.ts index e4ab3cf342e..9b9c4bd2c40 100644 --- a/src/db.ts +++ b/src/db.ts @@ -773,11 +773,13 @@ export class AgentDb { .prepare( `SELECT daily_limit_usd, total_limit_usd, reset_hour FROM token_budget_configs WHERE group_jid = ?`, ) - .get(groupJid) as { - daily_limit_usd: number | null; - total_limit_usd: number | null; - reset_hour: number; - } | undefined; + .get(groupJid) as + | { + daily_limit_usd: number | null; + total_limit_usd: number | null; + reset_hour: number; + } + | undefined; return row ?? null; } @@ -799,7 +801,9 @@ export class AgentDb { ? opts.total_limit_usd : (existing?.total_limit_usd ?? null); const hour = - opts.reset_hour !== undefined ? opts.reset_hour : (existing?.reset_hour ?? 0); + opts.reset_hour !== undefined + ? opts.reset_hour + : (existing?.reset_hour ?? 0); this.db .prepare( @@ -822,11 +826,13 @@ export class AgentDb { .prepare( `SELECT paused, paused_at, paused_reason FROM token_budget_state WHERE group_jid = ?`, ) - .get(groupJid) as { - paused: number; - paused_at: number | null; - paused_reason: string | null; - } | undefined; + .get(groupJid) as + | { + paused: number; + paused_at: number | null; + paused_reason: string | null; + } + | undefined; if (!row) return { paused: false, paused_at: null, paused_reason: null }; return { paused: row.paused === 1, @@ -835,7 +841,10 @@ export class AgentDb { }; } - setBudgetPaused(groupJid: string, reason: 'daily_limit' | 'total_limit'): void { + setBudgetPaused( + groupJid: string, + reason: 'daily_limit' | 'total_limit', + ): void { this.db .prepare( `INSERT INTO token_budget_state (group_jid, paused, paused_at, paused_reason)