diff --git a/src/agent/agent-impl.ts b/src/agent/agent-impl.ts index 00c40ae67f2..80fa8d83170 100644 --- a/src/agent/agent-impl.ts +++ b/src/agent/agent-impl.ts @@ -61,6 +61,7 @@ import { ChannelManager } from './channel-manager.js'; import { GroupManager } from './group-manager.js'; import { TaskManager } from './task-manager.js'; import { MessageProcessor } from './message-processor.js'; +import { registerBudgetActions } from './budget-actions.js'; export { type Agent }; @@ -442,6 +443,7 @@ export class AgentImpl dataDir: this.config.dataDir, assistantName: this.config.assistantName, }); + registerBudgetActions(this, this.db); logger.info({ agent: this.name }, 'Database initialized'); this.groupMgr.loadState(); diff --git a/src/agent/budget-actions.ts b/src/agent/budget-actions.ts new file mode 100644 index 00000000000..38113916580 --- /dev/null +++ b/src/agent/budget-actions.ts @@ -0,0 +1,121 @@ +import { z } from 'zod'; + +import type { Agent } from '../api/agent.js'; +import type { AgentDb } from '../db.js'; + +/** Compute the start of the current daily budget period (epoch ms). */ +function getDailyPeriodStart(resetHour: number): number { + const now = new Date(); + const start = new Date( + Date.UTC( + now.getUTCFullYear(), + now.getUTCMonth(), + now.getUTCDate(), + resetHour, + 0, + 0, + 0, + ), + ); + if (start.getTime() > now.getTime()) { + start.setUTCDate(start.getUTCDate() - 1); + } + return start.getTime(); +} + +export function registerBudgetActions(agent: Agent, db: AgentDb): void { + // ── budget_set ────────────────────────────────────────────────── + + agent.action( + 'budget_set', + 'Configure the token spending budget for an agent group. Pass null to remove a limit.', + { + group_jid: z.string().describe('The group/chat JID to configure'), + daily_limit_usd: z + .number() + .positive() + .nullable() + .optional() + .describe('Daily spend limit in USD. null removes the daily limit.'), + total_limit_usd: z + .number() + .positive() + .nullable() + .optional() + .describe('All-time spend limit in USD. null removes the total limit.'), + reset_hour: z + .number() + .int() + .min(0) + .max(23) + .optional() + .describe( + 'UTC hour at which the daily counter resets (0–23, default 0).', + ), + }, + async (args) => { + db.setBudgetConfig(args.group_jid, { + daily_limit_usd: args.daily_limit_usd, + total_limit_usd: args.total_limit_usd, + reset_hour: args.reset_hour, + }); + return { ok: true }; + }, + ); + + // ── budget_get ────────────────────────────────────────────────── + + agent.action( + 'budget_get', + 'Query the current budget configuration, pause state, and spending for an agent group.', + { + group_jid: z.string().describe('The group/chat JID to query'), + }, + async (args) => { + const config = db.getBudgetConfig(args.group_jid); + const state = db.getBudgetState(args.group_jid); + const resetHour = config?.reset_hour ?? 0; + const periodStart = getDailyPeriodStart(resetHour); + const dailyCostUsd = db.getDailyUsageUsd(args.group_jid, periodStart); + const totalCostUsd = db.getTotalUsageUsd(args.group_jid); + + const dailyPct = + config?.daily_limit_usd != null && config.daily_limit_usd > 0 + ? dailyCostUsd / config.daily_limit_usd + : null; + const totalPct = + config?.total_limit_usd != null && config.total_limit_usd > 0 + ? totalCostUsd / config.total_limit_usd + : null; + + return { + config: { + daily_limit_usd: config?.daily_limit_usd ?? null, + total_limit_usd: config?.total_limit_usd ?? null, + reset_hour: resetHour, + }, + state, + usage: { + daily_cost_usd: dailyCostUsd, + total_cost_usd: totalCostUsd, + daily_pct: dailyPct, + total_pct: totalPct, + }, + }; + }, + ); + + // ── budget_resume ─────────────────────────────────────────────── + + agent.action( + 'budget_resume', + 'Clear the budget-exceeded pause for an agent group, allowing it to run again.', + { + group_jid: z.string().describe('The group/chat JID to resume'), + }, + async (args) => { + db.clearBudgetPaused(args.group_jid); + return { ok: true }; + }, + ); +} diff --git a/src/agent/message-processor.ts b/src/agent/message-processor.ts index bf9ceb8ca04..0e30abf52fc 100644 --- a/src/agent/message-processor.ts +++ b/src/agent/message-processor.ts @@ -25,6 +25,26 @@ import { buildMcpRuntimeConfig } from './mcp-runtime.js'; export { buildMcpRuntimeConfig }; +/** Compute the start of the current daily budget period (epoch ms). */ +function getDailyPeriodStart(resetHour: number): number { + const now = new Date(); + const start = new Date( + Date.UTC( + now.getUTCFullYear(), + now.getUTCMonth(), + now.getUTCDate(), + resetHour, + 0, + 0, + 0, + ), + ); + if (start.getTime() > now.getTime()) { + start.setUTCDate(start.getUTCDate() - 1); + } + return start.getTime(); +} + function hasWakeTrigger( messages: Array<{ content: string; sender: string; is_from_me?: boolean }>, chatJid: string, @@ -142,6 +162,11 @@ export class MessageProcessor { 'Processing messages', ); + // ── Budget enforcement ───────────────────────���──────────────── + const budgetBlocked = await this.checkAndEnforceBudget(chatJid); + if (budgetBlocked) return false; + // ───────────────────────────────────────────────────────────── + let idleTimer: ReturnType | null = null; const resetIdleTimer = () => { if (idleTimer) clearTimeout(idleTimer); @@ -327,6 +352,129 @@ export class MessageProcessor { return true; } + /** + * Check whether this group's token budget is exceeded or warning-level. + * Returns true if the agent should be blocked (budget exceeded), false otherwise. + * Emits 'budget.exceeded' or 'budget.warning' events as appropriate. + * Sends a user-facing message when pausing. + */ + private async checkAndEnforceBudget(chatJid: string): Promise { + const config = this.ctx.db.getBudgetConfig(chatJid); + if ( + !config || + (config.daily_limit_usd == null && config.total_limit_usd == null) + ) { + return false; // no budget configured + } + + const state = this.ctx.db.getBudgetState(chatJid); + const now = new Date().toISOString(); + + // If already paused, block immediately. + if (state.paused) { + this.ctx.emit('budget.exceeded', { + agentId: this.ctx.id, + jid: chatJid, + limitType: state.paused_reason === 'total_limit' ? 'total' : 'daily', + limitUsd: + state.paused_reason === 'total_limit' + ? (config.total_limit_usd ?? 0) + : (config.daily_limit_usd ?? 0), + usedUsd: 0, + timestamp: now, + }); + const channel = findChannel(this.channelMgr.channelArray, chatJid); + await channel?.sendMessage?.( + chatJid, + 'Agent paused: token budget exceeded. Resume from Dune settings.', + ); + return true; + } + + const periodStart = getDailyPeriodStart(config.reset_hour); + const dailyCost = this.ctx.db.getDailyUsageUsd(chatJid, periodStart); + const totalCost = this.ctx.db.getTotalUsageUsd(chatJid); + + // Check hard limits. + if ( + (config.daily_limit_usd != null && dailyCost >= config.daily_limit_usd) || + (config.total_limit_usd != null && totalCost >= config.total_limit_usd) + ) { + const isDaily = + config.daily_limit_usd != null && dailyCost >= config.daily_limit_usd; + const limitType: 'daily' | 'total' = isDaily ? 'daily' : 'total'; + const limitUsd = isDaily + ? config.daily_limit_usd! + : config.total_limit_usd!; + const usedUsd = isDaily ? dailyCost : totalCost; + + this.ctx.db.setBudgetPaused( + chatJid, + `${limitType}_limit` as 'daily_limit' | 'total_limit', + ); + this.ctx.emit('budget.exceeded', { + agentId: this.ctx.id, + jid: chatJid, + limitType, + limitUsd, + usedUsd, + timestamp: now, + }); + + const channel = findChannel(this.channelMgr.channelArray, chatJid); + await channel?.sendMessage?.( + chatJid, + `Agent paused: ${limitType} token budget exceeded ($${usedUsd.toFixed(4)} of $${limitUsd.toFixed(2)} used). Resume from Dune settings.`, + ); + + logger.warn( + { chatJid, limitType, limitUsd, usedUsd, agent: this.ctx.name }, + 'Budget exceeded — agent paused', + ); + return true; + } + + // Check warning threshold (80%). + const warnDaily = + config.daily_limit_usd != null && + dailyCost >= 0.8 * config.daily_limit_usd; + const warnTotal = + config.total_limit_usd != null && + totalCost >= 0.8 * config.total_limit_usd; + + if (warnDaily || warnTotal) { + const isDaily = warnDaily; + const limitType: 'daily' | 'total' = isDaily ? 'daily' : 'total'; + const limitUsd = isDaily + ? config.daily_limit_usd! + : config.total_limit_usd!; + const usedUsd = isDaily ? dailyCost : totalCost; + const pctUsed = usedUsd / limitUsd; + + this.ctx.emit('budget.warning', { + agentId: this.ctx.id, + jid: chatJid, + pctUsed, + limitType, + limitUsd, + usedUsd, + timestamp: now, + }); + + logger.info( + { + chatJid, + limitType, + pctUsed: Math.round(pctUsed * 100), + agent: this.ctx.name, + }, + 'Budget warning — approaching limit', + ); + } + + return false; // proceed normally + } + /** Execute agent in a container for the given group. */ async runAgent( group: InternalRegisteredGroup, diff --git a/src/api/events.ts b/src/api/events.ts index 701b7e8dd87..78d1b2fc617 100644 --- a/src/api/events.ts +++ b/src/api/events.ts @@ -29,6 +29,8 @@ export interface AgentEvents extends Record { 'task.run.succeeded': [payload: TaskRunSucceededEvent]; 'task.run.failed': [payload: TaskRunFailedEvent]; 'task.run.skipped': [payload: TaskRunSkippedEvent]; + 'budget.exceeded': [payload: BudgetExceededEvent]; + 'budget.warning': [payload: BudgetWarningEvent]; started: []; stopped: []; } @@ -302,3 +304,35 @@ export interface TaskRunSkippedEvent { detail?: string; timestamp: string; } + +// ── Budget events ───────────────────────────────────────────────── + +/** Agent was paused because it exceeded its token budget. */ +export interface BudgetExceededEvent { + agentId: string; + /** Group/chat JID that exceeded its budget. */ + jid: string; + /** Which limit was hit. */ + limitType: 'daily' | 'total'; + /** The configured limit in USD. */ + limitUsd: number; + /** Amount spent at time of enforcement. */ + usedUsd: number; + timestamp: string; +} + +/** Agent is approaching its token budget (≥80% used). */ +export interface BudgetWarningEvent { + agentId: string; + /** Group/chat JID. */ + jid: string; + /** Fraction of the limit used (e.g. 0.83 = 83%). */ + pctUsed: number; + /** Which limit triggered the warning. */ + limitType: 'daily' | 'total'; + /** The configured limit in USD. */ + limitUsd: number; + /** Amount spent so far. */ + usedUsd: number; + timestamp: string; +} diff --git a/src/db.ts b/src/db.ts index 5965c9cd244..9b9c4bd2c40 100644 --- a/src/db.ts +++ b/src/db.ts @@ -82,6 +82,32 @@ export function createSchema( container_config TEXT, requires_trigger INTEGER DEFAULT 1 ); + CREATE TABLE IF NOT EXISTS token_usage ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + group_jid TEXT NOT NULL, + session_id TEXT, + model TEXT NOT NULL, + prompt_tokens INTEGER NOT NULL DEFAULT 0, + completion_tokens INTEGER NOT NULL DEFAULT 0, + total_tokens INTEGER NOT NULL DEFAULT 0, + cache_read_tokens INTEGER NOT NULL DEFAULT 0, + cache_write_tokens INTEGER NOT NULL DEFAULT 0, + cost_usd REAL, + latency_ms INTEGER, + ts INTEGER NOT NULL + ); + CREATE TABLE IF NOT EXISTS token_budget_configs ( + group_jid TEXT PRIMARY KEY, + daily_limit_usd REAL, + total_limit_usd REAL, + reset_hour INTEGER NOT NULL DEFAULT 0 + ); + CREATE TABLE IF NOT EXISTS token_budget_state ( + group_jid TEXT PRIMARY KEY, + paused INTEGER NOT NULL DEFAULT 0, + paused_at INTEGER, + paused_reason TEXT + ); `); @@ -142,6 +168,11 @@ export function createSchema( } catch { /* columns already exist */ } + + database.exec(` + CREATE INDEX IF NOT EXISTS idx_token_usage_group_jid ON token_usage(group_jid); + CREATE INDEX IF NOT EXISTS idx_token_usage_ts ON token_usage(ts); + `); } export function initDatabase(opts: { @@ -730,4 +761,132 @@ export class AgentDb { } } } + + // ─── Token budget helpers ───────────────────────────────────────── + + getBudgetConfig(groupJid: string): { + daily_limit_usd: number | null; + total_limit_usd: number | null; + reset_hour: number; + } | null { + const row = this.db + .prepare( + `SELECT daily_limit_usd, total_limit_usd, reset_hour FROM token_budget_configs WHERE group_jid = ?`, + ) + .get(groupJid) as + | { + daily_limit_usd: number | null; + total_limit_usd: number | null; + reset_hour: number; + } + | undefined; + return row ?? null; + } + + setBudgetConfig( + groupJid: string, + opts: { + daily_limit_usd?: number | null; + total_limit_usd?: number | null; + reset_hour?: number; + }, + ): void { + const existing = this.getBudgetConfig(groupJid); + const daily = + opts.daily_limit_usd !== undefined + ? opts.daily_limit_usd + : (existing?.daily_limit_usd ?? null); + const total = + opts.total_limit_usd !== undefined + ? opts.total_limit_usd + : (existing?.total_limit_usd ?? null); + const hour = + opts.reset_hour !== undefined + ? opts.reset_hour + : (existing?.reset_hour ?? 0); + + this.db + .prepare( + `INSERT INTO token_budget_configs (group_jid, daily_limit_usd, total_limit_usd, reset_hour) + VALUES (?, ?, ?, ?) + ON CONFLICT(group_jid) DO UPDATE SET + daily_limit_usd = excluded.daily_limit_usd, + total_limit_usd = excluded.total_limit_usd, + reset_hour = excluded.reset_hour`, + ) + .run(groupJid, daily, total, hour); + } + + getBudgetState(groupJid: string): { + paused: boolean; + paused_at: number | null; + paused_reason: string | null; + } { + const row = this.db + .prepare( + `SELECT paused, paused_at, paused_reason FROM token_budget_state WHERE group_jid = ?`, + ) + .get(groupJid) as + | { + paused: number; + paused_at: number | null; + paused_reason: string | null; + } + | undefined; + if (!row) return { paused: false, paused_at: null, paused_reason: null }; + return { + paused: row.paused === 1, + paused_at: row.paused_at, + paused_reason: row.paused_reason, + }; + } + + setBudgetPaused( + groupJid: string, + reason: 'daily_limit' | 'total_limit', + ): void { + this.db + .prepare( + `INSERT INTO token_budget_state (group_jid, paused, paused_at, paused_reason) + VALUES (?, 1, ?, ?) + ON CONFLICT(group_jid) DO UPDATE SET + paused = 1, + paused_at = excluded.paused_at, + paused_reason = excluded.paused_reason`, + ) + .run(groupJid, Date.now(), reason); + } + + clearBudgetPaused(groupJid: string): void { + this.db + .prepare( + `INSERT INTO token_budget_state (group_jid, paused, paused_at, paused_reason) + VALUES (?, 0, NULL, NULL) + ON CONFLICT(group_jid) DO UPDATE SET + paused = 0, + paused_at = NULL, + paused_reason = NULL`, + ) + .run(groupJid); + } + + /** Sum cost_usd for a group since the given epoch-ms timestamp. */ + getDailyUsageUsd(groupJid: string, periodStartMs: number): number { + const row = this.db + .prepare( + `SELECT COALESCE(SUM(cost_usd), 0) AS total FROM token_usage WHERE group_jid = ? AND ts >= ?`, + ) + .get(groupJid, periodStartMs) as { total: number }; + return row.total; + } + + /** Sum cost_usd for a group across all time. */ + getTotalUsageUsd(groupJid: string): number { + const row = this.db + .prepare( + `SELECT COALESCE(SUM(cost_usd), 0) AS total FROM token_usage WHERE group_jid = ?`, + ) + .get(groupJid) as { total: number }; + return row.total; + } }