Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,9 @@ workspace/tmp/
tests/js/node_modules/
tests/js/package-lock.json
tests/js/*.log

# User direction submissions (runtime data, keep out of the repo)
research_agendas/inbox/*.yaml
research_agendas/inbox/*.yml
research_agendas/inbox/processed/
research_agendas/inbox/failed/
242 changes: 242 additions & 0 deletions agents/agenda_budget.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,242 @@
"""Per-agenda LLM token accounting and budget enforcement.

Each research agenda carries a token budget (research_agendas.token_budget,
falling back to config.AGENDA_TOKEN_BUDGET_DEFAULT). A budget of 0, NULL
(with default 0) or a negative value means "no cap": check_budget always
passes and the agenda is never flipped to 'paused_budget', but record_usage
still writes the agenda_token_ledger row and bumps token_spent, so accounting
stays complete either way. Work performed on behalf of an agenda runs inside
`agenda_scope(agenda_id, operation)`; the LLM client then:

1. calls `check_budget(agenda_id)` BEFORE issuing the provider request — if the
budget is exhausted the call fails with AgendaBudgetExceededError before any
tokens are spent, so the caller can stop cleanly without partial writes;
2. calls `record_usage(...)` after a successful response — one ledger row per
call plus an atomic increment of research_agendas.token_spent.

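When token_spent crosses the budget the agenda is flipped to status
'paused_budget'. Raising the budget above token_spent re-enables it: the next
`check_budget` reactivates the agenda automatically, and `resume_agenda` does
so explicitly (optionally setting the new budget in the same call). Calling
`resume_agenda` while token_spent is still at or above a positive budget only
lasts until the next `check_budget`, which pauses the agenda again.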

Note: the check-then-record pair is not a distributed lock; concurrent calls
may overshoot the budget by roughly one call's worth of tokens. That is an
accepted tolerance for a soft cost cap.
"""

from __future__ import annotations

import contextvars
from contextlib import contextmanager
from typing import Any, Iterator

from config import AGENDA_TOKEN_BUDGET_DEFAULT
from db import database as db

# Values of research_agendas.status that this module reads and writes.
# 'paused_budget' is set once token_spent reaches a positive budget; 'active'
# is the fallback used when a row has no status.
AGENDA_STATUS_PAUSED_BUDGET = "paused_budget"
AGENDA_STATUS_ACTIVE = "active"

# (agenda_id, operation) for the work currently running in this context.
# Set by agenda_scope(), read by current_scope(); None outside any scope.
_scope_var: contextvars.ContextVar[tuple[int, str] | None] = contextvars.ContextVar(
    "agenda_budget_scope", default=None
)


class AgendaBudgetExceededError(RuntimeError):
    """Raised before an LLM call when the agenda's token budget is spent.

    Attributes:
        agenda_id: id of the agenda whose budget is exhausted (int).
        token_spent: tokens recorded against the agenda so far (int).
        token_budget: the enforced budget that was reached (int).

    Instances survive ``copy.copy`` / ``copy.deepcopy`` / ``pickle`` round
    trips, so the error can cross process or executor boundaries intact.
    """

    def __init__(self, agenda_id: int, token_spent: int, token_budget: int):
        self.agenda_id = int(agenda_id)
        self.token_spent = int(token_spent)
        self.token_budget = int(token_budget)
        # Build the message from the normalized ints so it always agrees with
        # the attributes, whatever numeric-like types the caller passed in.
        super().__init__(
            f"agenda {self.agenda_id} token budget exhausted "
            f"({self.token_spent}/{self.token_budget} tokens); raise token_budget or call "
            f"POST /api/research_agenda/{self.agenda_id}/resume to continue"
        )

    def __reduce__(self):
        """Rebuild from the three constructor arguments instead of ``self.args``.

        The default BaseException reduction replays ``self.args`` (here a
        single message string) into ``__init__``, which requires three
        arguments and would raise TypeError on copy/unpickle.
        """
        return (
            type(self),
            (self.agenda_id, self.token_spent, self.token_budget),
            dict(self.__dict__),
        )


@contextmanager
def agenda_scope(agenda_id: int, operation: str = "llm_call") -> Iterator[None]:
    """Run the enclosed block with (agenda_id, operation) as the current scope.

    The pair is stored in the module's context variable after coercing
    agenda_id to int and operation to str. On exit (normal or via exception)
    the context variable is reset to whatever it held before entry.
    """
    scope = (int(agenda_id), str(operation))
    reset_handle = _scope_var.set(scope)
    try:
        yield
    finally:
        # Restore the previous value rather than clearing, so nested scopes unwind correctly.
        _scope_var.reset(reset_handle)


def current_scope() -> tuple[int, str] | None:
    """Return the (agenda_id, operation) pair set by agenda_scope, or None outside any scope."""
    active_scope = _scope_var.get()
    return active_scope


def effective_budget(token_budget: Any) -> int:
    """Turn a stored token_budget into the integer value that is enforced.

    None (a NULL column) resolves to AGENDA_TOKEN_BUDGET_DEFAULT; any other
    value is used as given. The result is coerced with int() and may be
    <= 0, in which case callers treat the agenda as uncapped.
    """
    resolved = AGENDA_TOKEN_BUDGET_DEFAULT if token_budget is None else token_budget
    return int(resolved)


def get_budget_state(agenda_id: int) -> dict[str, Any] | None:
    """Read one agenda's budget counters and status from research_agendas.

    Returns None when no row matches agenda_id. Otherwise returns a dict with:
    agenda_id, token_budget (resolved through effective_budget),
    token_budget_raw (the stored value, possibly None), token_spent (missing
    or NULL counts as 0), status (defaults to 'active') and exhausted (True
    only when the resolved budget is positive and token_spent has reached it).
    """
    record = db.fetchone(
        "SELECT id, token_budget, token_spent, status FROM research_agendas WHERE id=?",
        (int(agenda_id),),
    )
    if not record:
        return None

    raw_budget = record.get("token_budget")
    cap = effective_budget(raw_budget)
    used = int(record.get("token_spent") or 0)
    return {
        "agenda_id": int(record["id"]),
        "token_budget": cap,
        "token_budget_raw": raw_budget,
        "token_spent": used,
        "status": str(record.get("status") or AGENDA_STATUS_ACTIVE),
        # A non-positive cap means "no cap", so it can never be exhausted.
        "exhausted": 0 < cap <= used,
    }


def check_budget(agenda_id: int) -> None:
    """Raise AgendaBudgetExceededError if the agenda may not spend more tokens.

    A budget <= 0 (including the NULL -> default fallback) disables the cap:
    the check always passes and the agenda is never paused for budget reasons,
    while record_usage keeps writing the ledger and token_spent.

    A 'paused_budget' agenda that is no longer blocked — because its budget
    was raised above token_spent, or because the cap was removed by setting
    the budget to <= 0 — is automatically reactivated, so changing the budget
    alone is enough to continue and the stored status never stays stale.

    Args:
        agenda_id: id of the agenda about to issue an LLM call.

    Raises:
        AgendaBudgetExceededError: the budget is positive and token_spent has
            reached it. The agenda is set to 'paused_budget' first if it was
            not already.
    """
    state = get_budget_state(agenda_id)
    if state is None:
        # Unknown agenda: nothing to enforce. Scoped callers validate
        # existence elsewhere; do not block the call on accounting state.
        return
    budget = state["token_budget"]
    spent = state["token_spent"]
    paused = state["status"] == AGENDA_STATUS_PAUSED_BUDGET

    if budget > 0 and spent >= budget:
        if not paused:
            _set_status(agenda_id, AGENDA_STATUS_PAUSED_BUDGET)
        raise AgendaBudgetExceededError(agenda_id, spent, budget)

    if paused:
        # The call is allowed (cap removed, or budget raised above current
        # spend), so a leftover budget pause must not keep the agenda parked.
        _set_status(agenda_id, AGENDA_STATUS_ACTIVE)


def record_usage(
    agenda_id: int,
    operation: str,
    tokens: int,
    cost_usd: float | None = None,
) -> dict[str, Any] | None:
    """Log one LLM call against an agenda and add its tokens to token_spent.

    Steps, all issued before a single commit:
    1. insert a row into agenda_token_ledger (a falsy operation is stored as
       'llm_call', falsy tokens as 0);
    2. increment research_agendas.token_spent by the token count;
    3. re-read the budget state and, if it is now exhausted and the agenda is
       not already paused, set status to 'paused_budget'.

    Returns the budget state read in step 3 (with status reflecting a pause
    applied here), or None when the agenda row does not exist.
    """
    token_count = int(tokens or 0)
    agenda_key = int(agenda_id)
    op_label = str(operation or "llm_call")

    db.execute(
        """
        INSERT INTO agenda_token_ledger (agenda_id, operation, tokens, cost_usd)
        VALUES (?, ?, ?, ?)
        """,
        (agenda_key, op_label, token_count, cost_usd),
    )
    db.execute(
        "UPDATE research_agendas SET token_spent = COALESCE(token_spent, 0) + ?, "
        "updated_at=CURRENT_TIMESTAMP WHERE id=?",
        (token_count, agenda_key),
    )

    state = get_budget_state(agenda_key)
    if state:
        if state["exhausted"] and state["status"] != AGENDA_STATUS_PAUSED_BUDGET:
            # Written inline (not via _set_status) so the pause shares the
            # commit below with the ledger insert and the counter update.
            db.execute(
                "UPDATE research_agendas SET status=?, updated_at=CURRENT_TIMESTAMP WHERE id=?",
                (AGENDA_STATUS_PAUSED_BUDGET, agenda_key),
            )
            state["status"] = AGENDA_STATUS_PAUSED_BUDGET
    db.commit()
    return state


def resume_agenda(agenda_id: int, *, token_budget: int | None = None) -> dict[str, Any] | None:
    """Set a 'paused_budget' agenda back to 'active', optionally storing a new budget.

    When token_budget is given it is written first (coerced to int). The
    status update only touches rows currently in 'paused_budget'; agendas in
    any other status keep it. Both statements are committed together.

    Returns the agenda's budget state after the change, or None if the agenda
    does not exist.
    """
    agenda_key = int(agenda_id)

    if token_budget is not None:
        new_budget = int(token_budget)
        db.execute(
            "UPDATE research_agendas SET token_budget=?, updated_at=CURRENT_TIMESTAMP WHERE id=?",
            (new_budget, agenda_key),
        )

    reactivate_params = (AGENDA_STATUS_ACTIVE, agenda_key, AGENDA_STATUS_PAUSED_BUDGET)
    db.execute(
        "UPDATE research_agendas SET status=?, updated_at=CURRENT_TIMESTAMP WHERE id=? AND status=?",
        reactivate_params,
    )
    db.commit()
    return get_budget_state(agenda_key)


def _set_status(agenda_id: int, status: str) -> None:
    """Write status (and a fresh updated_at) for one agenda and commit immediately."""
    params = (status, int(agenda_id))
    db.execute(
        "UPDATE research_agendas SET status=?, updated_at=CURRENT_TIMESTAMP WHERE id=?",
        params,
    )
    db.commit()


def usage_summary() -> dict[str, Any]:
    """Summarise token usage for every agenda plus overall totals.

    Each agenda is left-joined to its ledger rows, so agendas without ledger
    entries still appear (0 tokens, 0 entries, cost None).

    Returns a dict with:
    - "agendas": one dict per agenda, ordered by id, holding agenda_id, name,
      status, submitter, token_budget (resolved through effective_budget),
      token_spent, ledger_tokens, ledger_cost_usd, ledger_entries and
      remaining (None when the resolved budget is <= 0, otherwise the unspent
      tokens, never below 0);
    - "totals": tokens (sum of ledger tokens), cost_usd (sum of ledger costs,
      or None when no agenda has any recorded cost) and agenda_count.
    """
    ledger_rows = db.fetchall(
        """
        SELECT ra.id AS agenda_id, ra.name, ra.status, ra.submitter,
               ra.token_budget, ra.token_spent,
               COALESCE(SUM(l.tokens), 0) AS ledger_tokens,
               SUM(l.cost_usd) AS ledger_cost_usd,
               COUNT(l.id) AS ledger_entries
        FROM research_agendas ra
        LEFT JOIN agenda_token_ledger l ON l.agenda_id = ra.id
        GROUP BY ra.id, ra.name, ra.status, ra.submitter, ra.token_budget, ra.token_spent
        ORDER BY ra.id
        """
    )

    agendas: list[dict[str, Any]] = []
    token_total = 0
    cost_total = 0.0
    saw_cost = False

    for record in ledger_rows:
        used = int(record.get("token_spent") or 0)
        cap = effective_budget(record.get("token_budget"))
        ledger_tokens = int(record.get("ledger_tokens") or 0)
        ledger_cost = record.get("ledger_cost_usd")

        # SUM(cost_usd) is NULL when no ledger row carries a cost; such
        # agendas do not contribute to (or enable) the overall cost total.
        if ledger_cost is not None:
            saw_cost = True
            cost_total += float(ledger_cost)
        token_total += ledger_tokens

        if cap > 0:
            remaining = max(0, cap - used)
        else:
            remaining = None  # uncapped agenda: no meaningful remainder

        agendas.append(
            {
                "agenda_id": record.get("agenda_id"),
                "name": record.get("name"),
                "status": record.get("status") or AGENDA_STATUS_ACTIVE,
                "submitter": record.get("submitter"),
                "token_budget": cap,
                "token_spent": used,
                "ledger_tokens": ledger_tokens,
                "ledger_cost_usd": ledger_cost,
                "ledger_entries": int(record.get("ledger_entries") or 0),
                "remaining": remaining,
            }
        )

    totals = {
        "tokens": token_total,
        "cost_usd": cost_total if saw_cost else None,
        "agenda_count": len(agendas),
    }
    return {"agendas": agendas, "totals": totals}
61 changes: 44 additions & 17 deletions agents/agenda_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,13 @@
- save_agenda(agenda) -> int # insert; returns agenda_id
- update_agenda(agenda_id, agenda) -> None
- get_agenda(agenda_id) -> ResearchAgenda | None
- get_active_agenda() -> ResearchAgenda | None
- get_active_agenda() -> ResearchAgenda | None # newest active (several may be active)
- list_agendas(*, only_active=False) -> list[ResearchAgenda]
- set_active_agenda(agenda_id) -> None # exclusive active flag
- set_active_agenda(agenda_id) -> None # mark one agenda active

Multiple agendas may be active at the same time; callers that operate on a
specific agenda should pass agenda_id explicitly. get_active_agenda() is kept
as a convenience for single-agenda deployments and returns the newest active row.
"""

from __future__ import annotations
Expand Down Expand Up @@ -54,6 +58,10 @@ def _decode(field_name: str, default: Any) -> Any:
required_output=ensure_dict(_decode("required_output_json", {})),
raw_config=ensure_dict(raw_config_obj),
is_active=bool(row.get("is_active", 1)),
submitter=str(row.get("submitter") or ""),
token_budget=row.get("token_budget"),
token_spent=int(row.get("token_spent") or 0),
status=str(row.get("status") or "active"),
)
agenda.validate()
return agenda
Expand All @@ -75,6 +83,10 @@ def parse_agenda(payload: Mapping[str, Any], *, agenda_id: int | None = None) ->
required_output=ensure_dict(payload.get("required_output") or {}),
raw_config=dict(payload),
is_active=bool(payload.get("is_active", True)),
submitter=str(payload.get("submitter") or "").strip(),
token_budget=payload.get("token_budget"),
token_spent=int(payload.get("token_spent") or 0),
status=str(payload.get("status") or "active"),
)
agenda.validate()
return agenda
Expand All @@ -99,14 +111,19 @@ def load_agenda_from_file(path: str | Path) -> ResearchAgenda:


def save_agenda(agenda: ResearchAgenda) -> int:
"""Insert a new agenda. Returns the new agenda_id."""
"""Insert a new agenda. Returns the new agenda_id.

Does not deactivate other agendas: several agendas may run concurrently,
each isolated by agenda_id and its own token budget.
"""
agenda.validate()
new_id = db.insert_returning_id(
"""
INSERT INTO research_agendas
(version, name, description, focus_json, prefer_json, reject_json,
required_output_json, raw_config_json, is_active)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
required_output_json, raw_config_json, is_active,
submitter, token_budget, token_spent, status)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
RETURNING id
""",
(
Expand All @@ -119,10 +136,12 @@ def save_agenda(agenda: ResearchAgenda) -> int:
json.dumps(agenda.required_output, ensure_ascii=False),
json.dumps(agenda.raw_config, ensure_ascii=False),
1 if agenda.is_active else 0,
agenda.submitter or None,
agenda.token_budget,
int(agenda.token_spent or 0),
agenda.status,
),
)
if agenda.is_active:
set_active_agenda(new_id)
db.commit()
agenda.agenda_id = new_id
return new_id
Expand All @@ -136,7 +155,8 @@ def update_agenda(agenda_id: int, agenda: ResearchAgenda) -> None:
UPDATE research_agendas
SET version=?, name=?, description=?, focus_json=?, prefer_json=?,
reject_json=?, required_output_json=?, raw_config_json=?,
is_active=?, updated_at=CURRENT_TIMESTAMP
is_active=?, submitter=?, token_budget=?, status=?,
updated_at=CURRENT_TIMESTAMP
WHERE id=?
""",
(
Expand All @@ -149,11 +169,12 @@ def update_agenda(agenda_id: int, agenda: ResearchAgenda) -> None:
json.dumps(agenda.required_output, ensure_ascii=False),
json.dumps(agenda.raw_config, ensure_ascii=False),
1 if agenda.is_active else 0,
agenda.submitter or None,
agenda.token_budget,
agenda.status,
agenda_id,
),
)
if agenda.is_active:
set_active_agenda(agenda_id)
db.commit()


Expand All @@ -168,8 +189,13 @@ def get_agenda(agenda_id: int) -> ResearchAgenda | None:


def get_active_agenda() -> ResearchAgenda | None:
"""Return the newest active agenda.

Several agendas may be active at once; this helper exists for
single-agenda deployments and callers without an explicit agenda_id.
"""
row = db.fetchone(
"SELECT * FROM research_agendas WHERE is_active=1 ORDER BY created_at DESC LIMIT 1",
"SELECT * FROM research_agendas WHERE is_active=1 ORDER BY created_at DESC, id DESC LIMIT 1",
(),
)
if not row:
Expand All @@ -192,13 +218,14 @@ def list_agendas(*, only_active: bool = False) -> list[ResearchAgenda]:


def set_active_agenda(agenda_id: int) -> None:
"""Mark a single agenda active; clear is_active on all others."""
db.execute(
"UPDATE research_agendas SET is_active=0 WHERE id<>?",
(agenda_id,),
)
"""Mark an agenda active.

Historically this cleared is_active on every other agenda (single-active
model). Agendas are now isolated per agenda_id, so activating one no
longer deactivates the rest.
"""
db.execute(
"UPDATE research_agendas SET is_active=1 WHERE id=?",
"UPDATE research_agendas SET is_active=1, updated_at=CURRENT_TIMESTAMP WHERE id=?",
(agenda_id,),
)
db.commit()
Loading
Loading