Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/agent/agent-impl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ import { ChannelManager } from './channel-manager.js';
import { GroupManager } from './group-manager.js';
import { TaskManager } from './task-manager.js';
import { MessageProcessor } from './message-processor.js';
import { registerBudgetActions } from './budget-actions.js';

export { type Agent };

Expand Down Expand Up @@ -442,6 +443,7 @@ export class AgentImpl
dataDir: this.config.dataDir,
assistantName: this.config.assistantName,
});
registerBudgetActions(this, this.db);
logger.info({ agent: this.name }, 'Database initialized');
this.groupMgr.loadState();

Expand Down
121 changes: 121 additions & 0 deletions src/agent/budget-actions.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
import { z } from 'zod';

import type { Agent } from '../api/agent.js';
import type { AgentDb } from '../db.js';

/** Compute the start of the current daily budget period (epoch ms). */
function getDailyPeriodStart(resetHour: number): number {
const now = new Date();
const start = new Date(
Date.UTC(
now.getUTCFullYear(),
now.getUTCMonth(),
now.getUTCDate(),
resetHour,
0,
0,
0,
),
);
if (start.getTime() > now.getTime()) {
start.setUTCDate(start.getUTCDate() - 1);
}
return start.getTime();
}

export function registerBudgetActions(agent: Agent, db: AgentDb): void {
// ── budget_set ──────────────────────────────────────────────────

agent.action(
'budget_set',
'Configure the token spending budget for an agent group. Pass null to remove a limit.',
{
group_jid: z.string().describe('The group/chat JID to configure'),
daily_limit_usd: z
.number()
.positive()
.nullable()
.optional()
.describe('Daily spend limit in USD. null removes the daily limit.'),
total_limit_usd: z
.number()
.positive()
.nullable()
.optional()
.describe('All-time spend limit in USD. null removes the total limit.'),
reset_hour: z
.number()
.int()
.min(0)
.max(23)
.optional()
.describe(
'UTC hour at which the daily counter resets (0–23, default 0).',
),
},
async (args) => {
db.setBudgetConfig(args.group_jid, {
daily_limit_usd: args.daily_limit_usd,
total_limit_usd: args.total_limit_usd,
reset_hour: args.reset_hour,
});
return { ok: true };
},
);

// ── budget_get ──────────────────────────────────────────────────

agent.action(
'budget_get',
'Query the current budget configuration, pause state, and spending for an agent group.',
{
group_jid: z.string().describe('The group/chat JID to query'),
},
async (args) => {
const config = db.getBudgetConfig(args.group_jid);
const state = db.getBudgetState(args.group_jid);
const resetHour = config?.reset_hour ?? 0;
const periodStart = getDailyPeriodStart(resetHour);
const dailyCostUsd = db.getDailyUsageUsd(args.group_jid, periodStart);
const totalCostUsd = db.getTotalUsageUsd(args.group_jid);

const dailyPct =
config?.daily_limit_usd != null && config.daily_limit_usd > 0
? dailyCostUsd / config.daily_limit_usd
: null;
const totalPct =
config?.total_limit_usd != null && config.total_limit_usd > 0
? totalCostUsd / config.total_limit_usd
: null;

return {
config: {
daily_limit_usd: config?.daily_limit_usd ?? null,
total_limit_usd: config?.total_limit_usd ?? null,
reset_hour: resetHour,
},
state,
usage: {
daily_cost_usd: dailyCostUsd,
total_cost_usd: totalCostUsd,
daily_pct: dailyPct,
total_pct: totalPct,
},
};
},
);

// ── budget_resume ───────────────────────────────────────────────

agent.action(
'budget_resume',
'Clear the budget-exceeded pause for an agent group, allowing it to run again.',
{
group_jid: z.string().describe('The group/chat JID to resume'),
},
async (args) => {
db.clearBudgetPaused(args.group_jid);
return { ok: true };
},
);
}
148 changes: 148 additions & 0 deletions src/agent/message-processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,26 @@ import { buildMcpRuntimeConfig } from './mcp-runtime.js';

export { buildMcpRuntimeConfig };

/** Compute the start of the current daily budget period (epoch ms). */
function getDailyPeriodStart(resetHour: number): number {
const now = new Date();
const start = new Date(
Date.UTC(
now.getUTCFullYear(),
now.getUTCMonth(),
now.getUTCDate(),
resetHour,
0,
0,
0,
),
);
if (start.getTime() > now.getTime()) {
start.setUTCDate(start.getUTCDate() - 1);
}
return start.getTime();
}

function hasWakeTrigger(
messages: Array<{ content: string; sender: string; is_from_me?: boolean }>,
chatJid: string,
Expand Down Expand Up @@ -142,6 +162,11 @@ export class MessageProcessor {
'Processing messages',
);

// ── Budget enforcement ───────────────────────���────────────────
const budgetBlocked = await this.checkAndEnforceBudget(chatJid);
if (budgetBlocked) return false;
// ─────────────────────────────────────────────────────────────

let idleTimer: ReturnType<typeof setTimeout> | null = null;
const resetIdleTimer = () => {
if (idleTimer) clearTimeout(idleTimer);
Expand Down Expand Up @@ -327,6 +352,129 @@ export class MessageProcessor {
return true;
}

/**
* Check whether this group's token budget is exceeded or warning-level.
* Returns true if the agent should be blocked (budget exceeded), false otherwise.
* Emits 'budget.exceeded' or 'budget.warning' events as appropriate.
* Sends a user-facing message when pausing.
*/
private async checkAndEnforceBudget(chatJid: string): Promise<boolean> {
const config = this.ctx.db.getBudgetConfig(chatJid);
if (
!config ||
(config.daily_limit_usd == null && config.total_limit_usd == null)
) {
return false; // no budget configured
}

const state = this.ctx.db.getBudgetState(chatJid);
const now = new Date().toISOString();

// If already paused, block immediately.
if (state.paused) {
this.ctx.emit('budget.exceeded', {
agentId: this.ctx.id,
jid: chatJid,
limitType: state.paused_reason === 'total_limit' ? 'total' : 'daily',
limitUsd:
state.paused_reason === 'total_limit'
? (config.total_limit_usd ?? 0)
: (config.daily_limit_usd ?? 0),
usedUsd: 0,
timestamp: now,
});
const channel = findChannel(this.channelMgr.channelArray, chatJid);
await channel?.sendMessage?.(
chatJid,
'Agent paused: token budget exceeded. Resume from Dune settings.',
);
return true;
}

const periodStart = getDailyPeriodStart(config.reset_hour);
const dailyCost = this.ctx.db.getDailyUsageUsd(chatJid, periodStart);
const totalCost = this.ctx.db.getTotalUsageUsd(chatJid);

// Check hard limits.
if (
(config.daily_limit_usd != null && dailyCost >= config.daily_limit_usd) ||
(config.total_limit_usd != null && totalCost >= config.total_limit_usd)
) {
const isDaily =
config.daily_limit_usd != null && dailyCost >= config.daily_limit_usd;
const limitType: 'daily' | 'total' = isDaily ? 'daily' : 'total';
const limitUsd = isDaily
? config.daily_limit_usd!
: config.total_limit_usd!;
const usedUsd = isDaily ? dailyCost : totalCost;

this.ctx.db.setBudgetPaused(
chatJid,
`${limitType}_limit` as 'daily_limit' | 'total_limit',
);
this.ctx.emit('budget.exceeded', {
agentId: this.ctx.id,
jid: chatJid,
limitType,
limitUsd,
usedUsd,
timestamp: now,
});

const channel = findChannel(this.channelMgr.channelArray, chatJid);
await channel?.sendMessage?.(
chatJid,
`Agent paused: ${limitType} token budget exceeded ($${usedUsd.toFixed(4)} of $${limitUsd.toFixed(2)} used). Resume from Dune settings.`,
);

logger.warn(
{ chatJid, limitType, limitUsd, usedUsd, agent: this.ctx.name },
'Budget exceeded — agent paused',
);
return true;
}

// Check warning threshold (80%).
const warnDaily =
config.daily_limit_usd != null &&
dailyCost >= 0.8 * config.daily_limit_usd;
const warnTotal =
config.total_limit_usd != null &&
totalCost >= 0.8 * config.total_limit_usd;

if (warnDaily || warnTotal) {
const isDaily = warnDaily;
const limitType: 'daily' | 'total' = isDaily ? 'daily' : 'total';
const limitUsd = isDaily
? config.daily_limit_usd!
: config.total_limit_usd!;
const usedUsd = isDaily ? dailyCost : totalCost;
const pctUsed = usedUsd / limitUsd;

this.ctx.emit('budget.warning', {
agentId: this.ctx.id,
jid: chatJid,
pctUsed,
limitType,
limitUsd,
usedUsd,
timestamp: now,
});

logger.info(
{
chatJid,
limitType,
pctUsed: Math.round(pctUsed * 100),
agent: this.ctx.name,
},
'Budget warning — approaching limit',
);
}

return false; // proceed normally
}

/** Execute agent in a container for the given group. */
async runAgent(
group: InternalRegisteredGroup,
Expand Down
34 changes: 34 additions & 0 deletions src/api/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ export interface AgentEvents extends Record<string, any[]> {
'task.run.succeeded': [payload: TaskRunSucceededEvent];
'task.run.failed': [payload: TaskRunFailedEvent];
'task.run.skipped': [payload: TaskRunSkippedEvent];
'budget.exceeded': [payload: BudgetExceededEvent];
'budget.warning': [payload: BudgetWarningEvent];
started: [];
stopped: [];
}
Expand Down Expand Up @@ -302,3 +304,35 @@ export interface TaskRunSkippedEvent {
detail?: string;
timestamp: string;
}

// ── Budget events ─────────────────────────────────────────────────

/** Agent was paused because it exceeded its token budget. */
export interface BudgetExceededEvent {
agentId: string;
/** Group/chat JID that exceeded its budget. */
jid: string;
/** Which limit was hit. */
limitType: 'daily' | 'total';
/** The configured limit in USD. */
limitUsd: number;
/** Amount spent at time of enforcement. */
usedUsd: number;
timestamp: string;
}

/** Agent is approaching its token budget (≥80% used). */
export interface BudgetWarningEvent {
agentId: string;
/** Group/chat JID. */
jid: string;
/** Fraction of the limit used (e.g. 0.83 = 83%). */
pctUsed: number;
/** Which limit triggered the warning. */
limitType: 'daily' | 'total';
/** The configured limit in USD. */
limitUsd: number;
/** Amount spent so far. */
usedUsd: number;
timestamp: string;
}
Loading
Loading