From 29c821c80bcaa3f005e3c0dc85040f68c1414f84 Mon Sep 17 00:00:00 2001
From: Contentrain
Date: Mon, 18 May 2026 13:08:54 +0300
Subject: [PATCH] feat(chat): durable internal tool trace persistence
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Before this PR, the chat loop stored exactly two rows per POST: the
user prompt and the final assistant message (collapsed text + the
final iteration's tool_calls jsonb). Intermediate assistant turns were
dropped on the floor and tool_result blocks were never persisted: the
engine streamed them to Anthropic, fed them back through the in-memory
`config.messages` array, then forgot them. Resume reads couldn't
reconstruct the Anthropic protocol shape Claude had seen on the prior
turn, so multi-iteration conversations effectively lost their tool
history.

This PR persists the full Anthropic-protocol trace under a single
`turn_id` per POST while keeping the user-facing transcript clean.

Schema (migration 009):

  ALTER TABLE messages
    ADD content_blocks jsonb NULL        -- structured Anthropic blocks
    ADD turn_id        uuid NOT NULL     -- groups all rows from one POST
    ADD turn_sequence  smallint NOT NULL -- deterministic order in batch
    ADD iteration      smallint NULL     -- engine iteration counter
    ADD internal       boolean NOT NULL DEFAULT false

  RLS rewrite:
    SELECT  internal = false AND user owns conversation
    INSERT  internal = false AND user owns conversation

Service-role queries bypass RLS and read/write the full trace. This is
a defense-in-depth boundary so a stray client query OR a buggy route
forgetting to pass `includeInternal: false` cannot leak internal trace
rows.

Two indexes: one ordering by (conversation, created_at, turn_sequence)
for the resume path, one partial on `internal = false` for the public
transcript hot path.

Persistence shape per POST:

  - 1 seed user row        internal=false, iteration=NULL, turn_sequence=0
  - N assistant rows       internal=true for intermediate, false for final
  - 0..N tool_result rows  internal=true, role='user'

All share one `turn_id`; `turn_sequence` increments per row. Token
columns land only on the final visible assistant row, because
Anthropic returns usage as a per-call total, not per-iteration.

Engine:

  - `runConversationLoop` accumulates an `IterationTrace[]` and
    surfaces it on the `done` event. The in-memory `config.messages`
    push for the next AI call is unchanged.

Persistence helpers (`saveChatResult` / `saveApiChatResult`):

  - Object-form args.
  - One `randomUUID()` per POST allocated as `turnId`.
  - `buildTraceRows` composes the row list deterministically and
    `db.insertMessages` writes them as a single batched INSERT: one
    round-trip instead of N, atomic at the batch level.

Provider surface:

  - `MessageInsertInput` exposes the new columns + `internal`.
  - `insertMessages(rows[])` is the batch path; `insertMessage` stays
    for the rare one-row sites.
  - `loadConversationMessages(..., { includeInternal })` defaults to
    false; the chat / Conversation API resume paths set true.

Turn-safe history budget (`buildPromptMessages`):

The single most protocol-critical change in this PR. The old row-level
cutoff could drop an assistant `tool_use` while keeping its matching
`tool_result` (or vice-versa), and Anthropic rejects that: orphaned
tool_use blocks invalidate the request, orphaned tool_result blocks
silently drift the conversation. The walker now:

  1. Groups rows by `turn_id` preserving DB order.
  2. Walks groups newest → oldest summing per-group estimates.
  3. Drops ENTIRE turns at the budget boundary, never half.
  4. If the DB row limit truncated mid-turn (rare), drops the leading
     partial group so a turn never starts with a tool_result missing
     its tool_use.

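The four walker steps above can be illustrated with a minimal,
self-contained sketch. It is not the code in this patch: `TraceRow`,
`groupByTurn`, `keepWholeTurns` and the sample `history` are
hypothetical names invented for the example, rows are reduced to plain
text, and step 4 is detected by assuming a complete turn always begins
at `turnSequence` 0. The 4-characters-per-token estimate mirrors the
heuristic used in `conversation-history.ts`.

```typescript
// Hypothetical, simplified row shape (the real code works on DatabaseRow).
interface TraceRow {
  turnId: string
  turnSequence: number
  role: 'user' | 'assistant'
  text: string
}

// Rough estimate: about 4 characters per token, summed over a whole turn.
function estimateTokens(rows: TraceRow[]): number {
  let total = 0
  for (const row of rows) total += Math.ceil(row.text.length / 4)
  return total
}

// Step 1: group consecutive rows that share a turnId, preserving order.
function groupByTurn(rows: TraceRow[]): TraceRow[][] {
  const groups: TraceRow[][] = []
  for (const row of rows) {
    const last = groups[groups.length - 1]
    if (last && last[0]!.turnId === row.turnId) last.push(row)
    else groups.push([row])
  }
  return groups
}

// Steps 2-4: keep only whole turns that fit the budget, newest first.
function keepWholeTurns(rows: TraceRow[], maxTokens: number): TraceRow[] {
  let groups = groupByTurn(rows)

  // Step 4 (assumption): a leading group that does not start at
  // turnSequence 0 was cut by the row limit, so drop it entirely.
  if (groups.length > 0 && groups[0]![0]!.turnSequence !== 0) {
    groups = groups.slice(1)
  }

  // Steps 2-3: walk newest to oldest; stop at the first turn that
  // would overflow the budget and drop it together with all older turns.
  let tokens = 0
  let cutoff = 0
  for (let i = groups.length - 1; i >= 0; i--) {
    const cost = estimateTokens(groups[i]!)
    if (tokens + cost > maxTokens) {
      cutoff = i + 1
      break
    }
    tokens += cost
  }

  // Flatten the kept groups back into chronological row order.
  const kept: TraceRow[] = []
  for (const group of groups.slice(cutoff)) kept.push(...group)
  return kept
}

// Sample history: turn A is plain chat, turn B contains a tool round-trip.
const history: TraceRow[] = [
  { turnId: 'A', turnSequence: 0, role: 'user', text: 'hello' },
  { turnId: 'A', turnSequence: 1, role: 'assistant', text: 'hi there' },
  { turnId: 'B', turnSequence: 0, role: 'user', text: 'list files' },
  { turnId: 'B', turnSequence: 1, role: 'assistant', text: '[tool_use ls]' },
  { turnId: 'B', turnSequence: 2, role: 'user', text: '[tool_result a.txt]' },
  { turnId: 'B', turnSequence: 3, role: 'assistant', text: 'found a.txt' },
]

// A budget that fits turn B but not A + B keeps all four rows of B.
console.log(keepWholeTurns(history, 16).map(r => `${r.turnId}:${r.turnSequence}`))
```

In this sketch a budget too small for the newest turn yields an empty
history rather than a half turn, which is the trade-off the walker
makes: a shorter prompt is recoverable, a tool_use without its
tool_result is not.
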
Legacy rows without `turn_id` fall back to single-row "turns" through
the helper's null-handling, which is protocol-safe by definition since
the legacy path never persisted tool blocks.

Read priority in `extractContent`:

  content_blocks (post-009) → tool_calls (legacy) → content (text)

Public transcript routes (Studio `/messages.get`, EE `/history.get`)
use the provider default `includeInternal: false`; internal rows stay
hidden at the DB layer (RLS) AND at the provider layer.

Quota unchanged: `agent_usage.message_count` and
`api_message_usage.message_count` increment once per POST via the
existing reservation step. Multiple persisted rows do NOT bill the
user multiple messages; cache and trace persistence are
Contentrain-side observability, not customer-facing meters.

Tests:

  - db: trace shape (seed user + assistant + tool_result rows under
    one turn_id), single batched insertMessages call, intermediate
    rows internal=true / final internal=false, cache token landing,
    Conversation API symmetric path.
  - conversation-history: content_blocks priority over legacy columns;
    turn-grouped budget keeps whole multi-row turns together
    (assistant tool_use + matching tool_result); whole older turns
    dropped when budget overflows, never half.
  - chat-route integration: resume path explicitly passes
    includeInternal: true; saveChatResult receives iterations array
    instead of the old assistantText/assistantContent pair.

Out of scope:

  - Backfill of pre-009 rows. Pre-launch system, no real data. Legacy
    rows get distinct turn_id defaults via gen_random_uuid() and live
    as single-row turns under the new walker.
  - block_kind discriminator column. Discriminator lives inside each
    block's `type`; query patterns can grow expression indexes when
    there's a real need.
  - Per-iteration token breakdown. Provider returns per-call totals
    only.
  - UI rendering of tool_use chips. Data is there; UI can adapt at its
    own pace.
--- ee/enterprise/conversation-api.ts | 12 +- .../projects/[projectId]/chat.post.ts | 27 ++- server/providers/database.ts | 82 +++++++-- server/providers/supabase-db/conversations.ts | 60 ++++-- server/utils/conversation-engine.ts | 31 +++- server/utils/conversation-history.ts | 102 +++++++++-- server/utils/db.ts | 172 ++++++++++++++---- .../009_message_trace_persistence.sql | 109 +++++++++++ .../chat-route.integration.test.ts | 17 +- tests/unit/conversation-history.test.ts | 103 +++++++++++ tests/unit/db.test.ts | 106 +++++++---- 11 files changed, 684 insertions(+), 137 deletions(-) create mode 100644 supabase/migrations/009_message_trace_persistence.sql diff --git a/ee/enterprise/conversation-api.ts b/ee/enterprise/conversation-api.ts index ce8ab31..aadbe6f 100644 --- a/ee/enterprise/conversation-api.ts +++ b/ee/enterprise/conversation-api.ts @@ -297,10 +297,14 @@ async function runConversationMessage( const model = keyData.aiModel const budget = selectHistoryBudget({ plan, model, source: 'api' }) + // Same resume contract as the Studio path — load the full + // trace (intermediate assistant + tool_result rows) so the + // prompt builder can reconstruct Anthropic protocol shape. const historyRows = await db.loadConversationMessages( conversationId, budget.rowLimit, - 'role, content, tool_calls', + undefined, + { includeInternal: true }, ) const messages = buildPromptMessages({ history: historyRows ?? [], @@ -319,6 +323,7 @@ async function runConversationMessage( let totalCacheCreationInputTokens = 0 let totalCacheReadInputTokens = 0 let lastAssistantContent: AIContentBlock[] = [] + let iterations: Array<{ iteration: number, assistantBlocks: AIContentBlock[], toolResultBlocks: AIContentBlock[] }> = [] for await (const evt of runConversationLoop( { model, apiKey, systemPrompt, messages, tools: aiTools }, @@ -359,6 +364,7 @@ async function runConversationMessage( totalCacheCreationInputTokens = u?.cacheCreationInputTokens ?? 
0 totalCacheReadInputTokens = u?.cacheReadInputTokens ?? 0 lastAssistantContent = (evt.lastContent as AIContentBlock[]) ?? [] + iterations = (evt.iterations as typeof iterations) ?? [] break } } @@ -367,8 +373,8 @@ async function runConversationMessage( await saveApiChatResult({ conversationId, userMessage: body.message, - assistantText: responseText, - assistantContent: lastAssistantContent, + iterations, + lastAssistantContent, model, inputTokens: totalInputTokens, outputTokens: totalOutputTokens, diff --git a/server/api/workspaces/[workspaceId]/projects/[projectId]/chat.post.ts b/server/api/workspaces/[workspaceId]/projects/[projectId]/chat.post.ts index 4b70fbd..34f3788 100644 --- a/server/api/workspaces/[workspaceId]/projects/[projectId]/chat.post.ts +++ b/server/api/workspaces/[workspaceId]/projects/[projectId]/chat.post.ts @@ -153,7 +153,15 @@ export default defineEventHandler(async (event) => { // === HISTORY === const budget = selectHistoryBudget({ plan, model, source: usageSource }) - const historyRows = await db.loadConversationMessages(conversationId, budget.rowLimit) + // Resume reads need the full Anthropic-protocol trace, including + // intermediate assistant turns and tool_result blocks — those rows + // are RLS-hidden from end users but allowed for service-role reads. + const historyRows = await db.loadConversationMessages( + conversationId, + budget.rowLimit, + undefined, + { includeInternal: true }, + ) const messages = buildPromptMessages({ history: historyRows ?? 
[], newUserMessage: body.message, @@ -232,6 +240,7 @@ export default defineEventHandler(async (event) => { let totalCacheCreationInputTokens = 0 let totalCacheReadInputTokens = 0 let lastAssistantContent: AIContentBlock[] = [] + let iterations: Array<{ iteration: number, assistantBlocks: AIContentBlock[], toolResultBlocks: AIContentBlock[] }> = [] try { for await (const evt of runConversationLoop( @@ -263,6 +272,7 @@ export default defineEventHandler(async (event) => { totalCacheCreationInputTokens = u?.cacheCreationInputTokens ?? 0 totalCacheReadInputTokens = u?.cacheReadInputTokens ?? 0 lastAssistantContent = (evt.lastContent as AIContentBlock[]) ?? [] + iterations = (evt.iterations as typeof iterations) ?? [] // Forward the done event without lastContent (not needed by client) await eventStream.push(JSON.stringify({ @@ -277,16 +287,17 @@ export default defineEventHandler(async (event) => { } // === SAVE TO DB === - const assistantText = lastAssistantContent - .filter(b => b.type === 'text') - .map(b => (b as { text: string }).text) - .join('') - + // `saveChatResult` writes the full iteration trace as a + // single batched INSERT — seed user row, every assistant + // iteration, every tool_result iteration — under one + // `turn_id`. Intermediate rows land with `internal=true` + // (RLS-hidden from the user transcript); only the final + // assistant row stays visible. await saveChatResult({ conversationId, userMessage: body.message, - assistantText, - assistantContent: lastAssistantContent, + iterations, + lastAssistantContent, model, inputTokens: totalInputTokens, outputTokens: totalOutputTokens, diff --git a/server/providers/database.ts b/server/providers/database.ts index 4f85c80..42a2c42 100644 --- a/server/providers/database.ts +++ b/server/providers/database.ts @@ -48,6 +48,42 @@ export interface PaginationOptions { limit?: number } +/** + * Shape of one row in the `messages` table for write operations. 
+ * + * `content_blocks` is the structured Anthropic-protocol payload + * (text + tool_use + tool_result discriminated array) and takes + * precedence over the legacy `content` + `toolCalls` pair on read. + * + * `turnId` groups all rows produced by a single chat POST — seed + * user row, every assistant iteration, every tool_result iteration. + * `turnSequence` orders rows within the turn deterministically; + * `iteration` is the engine's per-turn iteration counter (NULL for + * the seed user row). + * + * `internal=true` rows are part of the protocol-replay trace but + * MUST NOT appear in the user-facing transcript — RLS enforces this + * for client-side queries; the resume path uses the service role to + * read across the boundary. + */ +export interface MessageInsertInput { + conversationId: string + role: 'user' | 'assistant' + content: string + contentBlocks?: unknown[] | null + /** Legacy column kept for backward compat with old single-row writes. */ + toolCalls?: unknown[] | null + turnId: string + turnSequence: number + iteration?: number | null + internal: boolean + tokenCountInput?: number + tokenCountOutput?: number + cacheCreationInputTokens?: number + cacheReadInputTokens?: number + model?: string +} + export interface DatabaseProvider { // ═══════════════════════════════════════════════════ // PROFILES @@ -277,18 +313,40 @@ export interface DatabaseProvider { // MESSAGES // ═══════════════════════════════════════════════════ - loadConversationMessages: (conversationId: string, limit?: number, fields?: string) => Promise - insertMessage: (input: { - conversationId: string - role: 'user' | 'assistant' - content: string - toolCalls?: unknown[] | null - tokenCountInput?: number - tokenCountOutput?: number - cacheCreationInputTokens?: number - cacheReadInputTokens?: number - model?: string - }) => Promise + /** + * Load conversation messages for the engine's resume path or the + * public transcript route. 
+ * + * `options.includeInternal` defaults to `false` — the public route + * MUST NOT pass `true`. Only the chat / Conversation API resume + * paths set it to load the full Anthropic-protocol shape + * (intermediate assistant turns + tool_result blocks) needed to + * reconstruct prior multi-iteration loops. + */ + loadConversationMessages: ( + conversationId: string, + limit?: number, + fields?: string, + options?: { includeInternal?: boolean }, + ) => Promise + + /** + * Insert a single message row. Convenience wrapper around the batch + * path used in places that only persist one row (e.g. an event + * stream side-effect). For the chat persistence path use + * `insertMessages` so the entire turn lands atomically in one + * round-trip. + */ + insertMessage: (input: MessageInsertInput) => Promise + + /** + * Atomic batch insert for the full assistant/tool_result trace a + * single chat POST produces. All rows in one batch share the same + * `turnId` so resume reads can group them deterministically; the + * caller assigns `turnSequence` in protocol order so two rows that + * land at the same `created_at` still resolve consistently. + */ + insertMessages: (rows: MessageInsertInput[]) => Promise // ═══════════════════════════════════════════════════ // AGENT USAGE diff --git a/server/providers/supabase-db/conversations.ts b/server/providers/supabase-db/conversations.ts index f06dd6b..7e634bb 100644 --- a/server/providers/supabase-db/conversations.ts +++ b/server/providers/supabase-db/conversations.ts @@ -2,9 +2,31 @@ * Conversation, message, and agent usage methods * for the Supabase DatabaseProvider. 
*/ -import type { DatabaseProvider, DatabaseRow } from '../database' +import type { DatabaseProvider, DatabaseRow, MessageInsertInput } from '../database' import { getAdmin, getUser } from './helpers' +function toMessageRow(input: MessageInsertInput): Record { + const row: Record = { + conversation_id: input.conversationId, + role: input.role, + content: input.content, + turn_id: input.turnId, + turn_sequence: input.turnSequence, + internal: input.internal, + } + + if (input.contentBlocks && input.contentBlocks.length > 0) row.content_blocks = input.contentBlocks + if (input.toolCalls) row.tool_calls = input.toolCalls + if (input.iteration != null) row.iteration = input.iteration + if (input.tokenCountInput) row.token_count_input = input.tokenCountInput + if (input.tokenCountOutput) row.token_count_output = input.tokenCountOutput + if (input.cacheCreationInputTokens) row.cache_creation_input_tokens = input.cacheCreationInputTokens + if (input.cacheReadInputTokens) row.cache_read_input_tokens = input.cacheReadInputTokens + if (input.model) row.model = input.model + + return row +} + type ConversationMethods = Pick< DatabaseProvider, | 'createConversation' @@ -15,6 +37,7 @@ type ConversationMethods = Pick< | 'updateConversationTimestamp' | 'loadConversationMessages' | 'insertMessage' + | 'insertMessages' | 'getAgentUsage' | 'upsertAgentUsage' | 'getMonthlyUsageSummary' @@ -111,34 +134,35 @@ export function conversationMethods(): ConversationMethods { .eq('id', conversationId) }, - async loadConversationMessages(conversationId, limit = 20, fields = 'role, content, tool_calls') { + async loadConversationMessages(conversationId, limit = 20, fields = 'role, content, tool_calls, content_blocks, turn_id, turn_sequence, iteration, internal', options) { const admin = getAdmin() - const { data } = await admin + let query = admin .from('messages') .select(fields) .eq('conversation_id', conversationId) - .order('created_at', { ascending: true }) - .limit(limit) + // Ordering by 
`(created_at, turn_sequence)` keeps batch-inserted + // rows with identical timestamps in protocol order. + query = query.order('created_at', { ascending: true }) + query = query.order('turn_sequence', { ascending: true }) + + if (!options?.includeInternal) { + query = query.eq('internal', false) + } + + const { data } = await query.limit(limit) return (data ?? []) as unknown as DatabaseRow[] }, async insertMessage(input) { const admin = getAdmin() - const row: Record = { - conversation_id: input.conversationId, - role: input.role, - content: input.content, - } - - if (input.toolCalls) row.tool_calls = input.toolCalls - if (input.tokenCountInput) row.token_count_input = input.tokenCountInput - if (input.tokenCountOutput) row.token_count_output = input.tokenCountOutput - if (input.cacheCreationInputTokens) row.cache_creation_input_tokens = input.cacheCreationInputTokens - if (input.cacheReadInputTokens) row.cache_read_input_tokens = input.cacheReadInputTokens - if (input.model) row.model = input.model + await admin.from('messages').insert(toMessageRow(input)) + }, - await admin.from('messages').insert(row) + async insertMessages(rows) { + if (rows.length === 0) return + const admin = getAdmin() + await admin.from('messages').insert(rows.map(toMessageRow)) }, // ─── Agent Usage ─── diff --git a/server/utils/conversation-engine.ts b/server/utils/conversation-engine.ts index 7fe3254..e4e5206 100644 --- a/server/utils/conversation-engine.ts +++ b/server/utils/conversation-engine.ts @@ -24,6 +24,21 @@ export interface ConversationEvent { [key: string]: unknown } +/** + * One slice of the tool-use loop. `assistantBlocks` is whatever the + * provider produced this iteration (text + tool_use blocks in order). + * `toolResultBlocks` is the synthesized tool_result content fed back + * to Anthropic as the next "user" message — empty for the final + * iteration when the model stops with `end_turn`. 
The persister + * writes each entry as `assistant` + (optional) `user` row pair + * sharing one `turn_id`. + */ +export interface IterationTrace { + iteration: number + assistantBlocks: AIContentBlock[] + toolResultBlocks: AIContentBlock[] +} + // ─── Configuration ─── export interface ConversationConfig { @@ -94,6 +109,12 @@ export async function* runConversationLoop( let totalCacheReadInputTokens = 0 let lastAssistantContent: AIContentBlock[] = [] let accumulatedAffected: AffectedResources = emptyAffected() + // Full iteration-by-iteration trace surfaced on `done` so the + // caller can persist the actual Anthropic-protocol shape Claude + // saw — intermediate assistant turns AND tool_result blocks — + // and reconstruct it on resume. Empty for the seed user message; + // populated for every loop iteration regardless of stop reason. + const trace: IterationTrace[] = [] let iteration = 0 @@ -189,7 +210,13 @@ export async function* runConversationLoop( lastAssistantContent = assistantBlocks - if (stopReason !== 'tool_use' || currentToolCalls.length === 0) break + if (stopReason !== 'tool_use' || currentToolCalls.length === 0) { + // Final iteration — no tool execution this turn. Persist the + // assistant blocks alone (no tool_result row will exist for + // this iteration). 
+ trace.push({ iteration, assistantBlocks, toolResultBlocks: [] }) + break + } // === TOOL EXECUTION with state guard + workflow-aware auto-merge === const toolResultBlocks: AIContentBlock[] = [] @@ -231,6 +258,7 @@ export async function* runConversationLoop( config.messages.push({ role: 'assistant', content: assistantBlocks }) config.messages.push({ role: 'user', content: toolResultBlocks }) lastAssistantContent = assistantBlocks + trace.push({ iteration, assistantBlocks, toolResultBlocks }) } // === DONE with affected resources === @@ -244,6 +272,7 @@ export async function* runConversationLoop( }, affected: accumulatedAffected, lastContent: lastAssistantContent, + iterations: trace, } } diff --git a/server/utils/conversation-history.ts b/server/utils/conversation-history.ts index 0edb170..50e25e2 100644 --- a/server/utils/conversation-history.ts +++ b/server/utils/conversation-history.ts @@ -108,47 +108,109 @@ export function selectHistoryBudget(input: { return { maxTokens, rowLimit } } +/** + * Build the AI message list from persisted trace rows. + * + * Critical Anthropic protocol invariant: a `tool_use` block in an + * assistant turn MUST be followed by a matching `tool_result` block + * in the next user turn (or vice versa). If the row-level budget + * walker drops a tool_use but keeps its tool_result — or worse, the + * other way around — Anthropic rejects the request (or silently + * mis-routes the conversation). So the budget cutoff operates at the + * **turn** boundary, not the row boundary: + * + * 1. Group rows by `turn_id` (preserving DB order). + * 2. Walk groups newest → oldest, summing per-group token estimates. + * 3. Drop entire turns when the budget is exceeded — never half a turn. + * 4. If the DB row_limit truncated mid-turn (rare), drop the + * partial leading turn so we never feed Anthropic a turn that + * starts with a tool_result without its matching tool_use. + * 5. 
Materialize the kept groups in chronological order and append + * the current user message. + * + * Legacy rows (pre-009 migration) get distinct turn_ids via the + * column's `gen_random_uuid()` default — each becomes a one-row + * "turn" of its own. That's protocol-safe by definition (no tool + * blocks were persisted on the legacy path). + */ export function buildPromptMessages(input: { history: DatabaseRow[] newUserMessage: string budget: HistoryBudget }): AIMessage[] { + const groups = groupRowsByTurn(input.history) + const kept = selectTurnsWithinBudget(groups, input.budget.maxTokens) + const messages: AIMessage[] = [] - const cutoff = findBudgetCutoff(input.history, input.budget.maxTokens) - for (let i = cutoff; i < input.history.length; i++) { - const row = input.history[i]! - messages.push({ - role: row.role as 'user' | 'assistant', - content: extractContent(row), - }) + for (const group of kept) { + for (const row of group) { + messages.push({ + role: row.role as 'user' | 'assistant', + content: extractContent(row), + }) + } } messages.push({ role: 'user', content: input.newUserMessage }) return messages } /** - * Tolerates both `tool_calls` (Studio path — `db.loadConversationMessages` - * returns snake_case rows) and `toolCalls` (EE handler's pre-refactor - * wrapper renamed it). Once that wrapper is gone the second branch is - * dead — leave it as a safety net for any external caller. + * Content extraction priority: structured `content_blocks` jsonb + * first (post-009 trace rows), then the legacy `tool_calls` / + * `toolCalls` wrapper (only the final assistant turn ever wrote it), + * finally plain text `content`. */ function extractContent(row: DatabaseRow): string | AIContentBlock[] { - const blocks = (row.tool_calls ?? row.toolCalls) as AIContentBlock[] | null | undefined + const blocks + = (row.content_blocks ?? row.contentBlocks ?? row.tool_calls ?? 
row.toolCalls) as AIContentBlock[] | null | undefined if (blocks && Array.isArray(blocks) && blocks.length > 0) return blocks return row.content as string | AIContentBlock[] } -function findBudgetCutoff(history: DatabaseRow[], maxTokens: number): number { - if (maxTokens <= 0) return history.length +function groupRowsByTurn(rows: DatabaseRow[]): DatabaseRow[][] { + const groups: DatabaseRow[][] = [] + let current: DatabaseRow[] = [] + let currentTurn: string | null = null + for (const row of rows) { + const turn = (row.turn_id ?? row.turnId) as string | null | undefined + // Treat null/undefined as the row's own group so legacy rows + // (without turn_id) and rows with distinct ids both behave + // protocol-safely. + const key = (turn ?? `__row_${groups.length}_${current.length}`) as string + if (key !== currentTurn) { + if (current.length > 0) groups.push(current) + current = [] + currentTurn = key + } + current.push(row) + } + if (current.length > 0) groups.push(current) + return groups +} + +function selectTurnsWithinBudget(groups: DatabaseRow[][], maxTokens: number): DatabaseRow[][] { + if (maxTokens <= 0) return [] + if (groups.length === 0) return groups let tokens = 0 - for (let i = history.length - 1; i >= 0; i--) { - const row = history[i]! + let cutoff = 0 + for (let i = groups.length - 1; i >= 0; i--) { + const groupTokens = estimateGroupTokens(groups[i]!) + if (tokens + groupTokens > maxTokens) { + cutoff = i + 1 + break + } + tokens += groupTokens + } + return groups.slice(cutoff) +} + +function estimateGroupTokens(group: DatabaseRow[]): number { + let total = 0 + for (const row of group) { const content = extractContent(row) - const estimate = typeof content === 'string' + total += typeof content === 'string' ? 
Math.ceil(content.length / 4) : Math.ceil(JSON.stringify(content).length / 4) - tokens += estimate - if (tokens > maxTokens) return i + 1 } - return 0 + return total } diff --git a/server/utils/db.ts b/server/utils/db.ts index d98b798..b83c737 100644 --- a/server/utils/db.ts +++ b/server/utils/db.ts @@ -1,6 +1,13 @@ import type { GitProvider } from '../providers/git' import { normalizeContentRoot } from './content-paths' +// ─── Cross-domain: Chat Persistence ─── + +import { randomUUID } from 'node:crypto' +import type { MessageInsertInput } from '../providers/database' +import type { AIContentBlock } from '../providers/ai' +import type { IterationTrace } from './conversation-engine' + /** * Cross-provider database utilities. * @@ -217,39 +224,110 @@ export async function inviteOrLookupUser( } } -// ─── Cross-domain: Chat Persistence ─── - -async function persistChatMessages(input: { +/** + * Compose the row trace for a single chat POST and write it as one + * batched INSERT. + * + * Shape per POST: + * - 1 seed user row (the prompt). `internal=false`, `iteration=NULL`. + * - For each engine iteration: + * - 1 assistant row carrying `content_blocks` (text + tool_use). + * - 0 or 1 tool_result row (role='user', content_blocks=[tool_result...]) + * when the iteration triggered tool execution. + * - All rows share a single freshly-minted `turn_id`. `turn_sequence` + * starts at 0 (seed user) and increments by 1 per row so two rows + * landing at the same `created_at` tick still resolve in order. + * + * Visibility (`internal=true`): + * - Intermediate assistant rows AND every tool_result row are + * internal; they live in the protocol replay but should not + * appear in the user-facing transcript. + * - The LAST assistant row in the trace is visible — usually the + * final `end_turn` text reply; on max-iteration cutoff it's the + * last tool-use turn (UI sees the `[tool calls]` placeholder + * content text, same fallback the pre-trace path used). 
+ * + * Token columns land only on the final visible assistant row — + * Anthropic's `usage` is per-call total, not per-iteration. + */ +function buildTraceRows(input: { conversationId: string userMessage: string - assistantText: string - assistantContent: unknown[] + trace: IterationTrace[] + lastAssistantContent: AIContentBlock[] model: string inputTokens: number outputTokens: number cacheCreationInputTokens: number cacheReadInputTokens: number -}) { - const db = useDatabaseProvider() - await db.insertMessage({ conversationId: input.conversationId, role: 'user', content: input.userMessage }) - try { - await db.insertMessage({ +}): MessageInsertInput[] { + const turnId = randomUUID() + const rows: MessageInsertInput[] = [] + let seq = 0 + + rows.push({ + conversationId: input.conversationId, + role: 'user', + content: input.userMessage, + turnId, + turnSequence: seq++, + iteration: null, + internal: false, + }) + + const trace = input.trace.length > 0 + ? input.trace + // Defensive fallback: the engine should always emit at least one + // iteration before yielding `done`, but if the loop bailed before + // pushing a trace entry (e.g. abort between provider start and + // first event) we still write the last known assistant content + // so the transcript isn't blank. + : [{ iteration: 1, assistantBlocks: input.lastAssistantContent, toolResultBlocks: [] }] + + const lastIndex = trace.length - 1 + for (let i = 0; i < trace.length; i++) { + const iter = trace[i]! + const isFinal = i === lastIndex + const assistantText = iter.assistantBlocks + .filter(b => b.type === 'text') + .map(b => (b as { text: string }).text) + .join('') + + rows.push({ conversationId: input.conversationId, role: 'assistant', - content: input.assistantText || '[tool calls]', - toolCalls: input.assistantContent.length > 0 ? 
input.assistantContent : null, - tokenCountInput: input.inputTokens, - tokenCountOutput: input.outputTokens, - cacheCreationInputTokens: input.cacheCreationInputTokens, - cacheReadInputTokens: input.cacheReadInputTokens, - model: input.model, + content: assistantText || '[tool calls]', + contentBlocks: iter.assistantBlocks, + turnId, + turnSequence: seq++, + iteration: iter.iteration, + internal: !isFinal, + ...(isFinal + ? { + tokenCountInput: input.inputTokens, + tokenCountOutput: input.outputTokens, + cacheCreationInputTokens: input.cacheCreationInputTokens, + cacheReadInputTokens: input.cacheReadInputTokens, + model: input.model, + } + : {}), }) + + if (iter.toolResultBlocks.length > 0) { + rows.push({ + conversationId: input.conversationId, + role: 'user', + content: '[tool results]', + contentBlocks: iter.toolResultBlocks, + turnId, + turnSequence: seq++, + iteration: iter.iteration, + internal: true, + }) + } } - catch (err) { - // eslint-disable-next-line no-console - console.error('[chat-persist] Failed to insert assistant message:', err) - throw err - } + + return rows } /** @@ -259,14 +337,22 @@ async function persistChatMessages(input: { * persists messages and bumps the token metadata on the row that the * reservation created. * - * Conversation API has its own actor model (key-keyed, not user-keyed) - * and a dedicated `api_message_usage` table — use `saveApiChatResult`. + * Persistence is per-turn batched: every iteration of the tool loop + * becomes its own row pair (assistant + tool_result) sharing the + * single `turn_id` allocated below, so resume reads can reconstruct + * the Anthropic protocol shape Claude saw on prior turns. Quota is + * unaffected — `agent_usage.message_count` increments exactly once + * per POST via the reservation step. + * + * Conversation API has its own actor model (key-keyed, not + * user-keyed) and a dedicated `api_message_usage` table — use + * `saveApiChatResult`. 
*/ export async function saveChatResult(input: { conversationId: string userMessage: string - assistantText: string - assistantContent: unknown[] + iterations: IterationTrace[] + lastAssistantContent: AIContentBlock[] model: string inputTokens: number outputTokens: number @@ -279,11 +365,11 @@ export async function saveChatResult(input: { }) { const db = useDatabaseProvider() - await persistChatMessages({ + const rows = buildTraceRows({ conversationId: input.conversationId, userMessage: input.userMessage, - assistantText: input.assistantText, - assistantContent: input.assistantContent, + trace: input.iterations, + lastAssistantContent: input.lastAssistantContent, model: input.model, inputTokens: input.inputTokens, outputTokens: input.outputTokens, @@ -291,6 +377,15 @@ export async function saveChatResult(input: { cacheReadInputTokens: input.cacheReadInputTokens, }) + try { + await db.insertMessages(rows) + } + catch (err) { + // eslint-disable-next-line no-console + console.error('[chat-persist] Failed to insert chat trace:', err) + throw err + } + await db.updateAgentUsageTokens({ workspaceId: input.workspaceId, userId: input.userId, @@ -313,8 +408,8 @@ export async function saveChatResult(input: { export async function saveApiChatResult(input: { conversationId: string userMessage: string - assistantText: string - assistantContent: unknown[] + iterations: IterationTrace[] + lastAssistantContent: AIContentBlock[] model: string inputTokens: number outputTokens: number @@ -326,11 +421,11 @@ export async function saveApiChatResult(input: { }) { const db = useDatabaseProvider() - await persistChatMessages({ + const rows = buildTraceRows({ conversationId: input.conversationId, userMessage: input.userMessage, - assistantText: input.assistantText, - assistantContent: input.assistantContent, + trace: input.iterations, + lastAssistantContent: input.lastAssistantContent, model: input.model, inputTokens: input.inputTokens, outputTokens: input.outputTokens, @@ -338,6 +433,15 
@@ export async function saveApiChatResult(input: { cacheReadInputTokens: input.cacheReadInputTokens, }) + try { + await db.insertMessages(rows) + } + catch (err) { + // eslint-disable-next-line no-console + console.error('[chat-persist] Failed to insert API chat trace:', err) + throw err + } + await db.updateAPIUsageTokens({ workspaceId: input.workspaceId, apiKeyId: input.apiKeyId, diff --git a/supabase/migrations/009_message_trace_persistence.sql b/supabase/migrations/009_message_trace_persistence.sql new file mode 100644 index 0000000..c099341 --- /dev/null +++ b/supabase/migrations/009_message_trace_persistence.sql @@ -0,0 +1,109 @@ +-- Durable internal tool-trace persistence for chat conversations. +-- +-- Before this migration, `messages` stored exactly two rows per chat +-- POST: one user row (the prompt) and one assistant row (the final +-- iteration's collapsed text + the final iteration's `tool_calls` +-- jsonb). Multi-iteration tool loops lost intermediate assistant +-- narration entirely, and tool_result blocks were never persisted — +-- the engine streamed them to Anthropic, then dropped them on the +-- floor. Resume reads couldn't reconstruct the actual Anthropic +-- protocol shape Claude saw on the prior turn. +-- +-- This migration restructures `messages` so a single chat POST writes +-- a deterministic trace: +-- +-- seed user row internal=false, iteration=NULL, turn_sequence=0 +-- assistant iteration 1 internal=true, iteration=1, turn_sequence=1 +-- tool_result iteration 1 internal=true, iteration=1, turn_sequence=2 +-- assistant iteration 2 internal=true, iteration=2, turn_sequence=3 +-- tool_result iteration 2 internal=true, iteration=2, turn_sequence=4 +-- final assistant internal=false, iteration=N, turn_sequence=K +-- +-- All rows in the trace share a single `turn_id` (UUID generated +-- once per POST in `saveChatResult`). 
`turn_sequence` makes ordering +-- deterministic even when a batch INSERT lands every row at the same +-- `created_at` timestamp. +-- +-- `internal=true` keeps intermediate trace out of the user-facing +-- transcript. The RLS policy below enforces this at the database +-- layer — a Supabase user client (or a buggy app route that forgets +-- to pass `includeInternal: false`) physically cannot read those +-- rows. Server-side service-role queries bypass RLS and load the +-- full trace for resume. +-- +-- Schema choices revisited during review: +-- - `iteration` alone was insufficient: it resets per POST so we +-- can't group cross-conversation. `turn_id` is the actual group +-- key, `iteration` is now a per-turn sequence number used for +-- debug/inspection. +-- - No `block_kind` column. The discriminator lives inside the +-- `content_blocks` jsonb (`type: 'text'|'tool_use'|'tool_result'`) +-- and any indexing need can be served by partial expression +-- indexes later — premature without a real query pattern. +-- - Legacy rows (zero in pre-launch) get `turn_id` defaults from +-- `gen_random_uuid()` — each old row becomes its own turn of one. +-- For pre-launch data that's a non-issue; if this migration ever +-- ran against historical data, the worst case is each historical +-- row standing alone in budget grouping, which still preserves +-- Anthropic protocol shape per-turn. + +-- ───────────────────────────────────────────────────────────────── +-- 1. Columns +-- ───────────────────────────────────────────────────────────────── + +ALTER TABLE public.messages + ADD COLUMN content_blocks jsonb NULL, + ADD COLUMN turn_id uuid NOT NULL DEFAULT gen_random_uuid(), + ADD COLUMN turn_sequence smallint NOT NULL DEFAULT 0, + ADD COLUMN iteration smallint NULL, + ADD COLUMN internal boolean NOT NULL DEFAULT false; + +-- Resume reads always order by created_at then turn_sequence so a +-- batch INSERT with identical `created_at` timestamps stays deterministic. 
+CREATE INDEX idx_messages_conversation_turn + ON public.messages (conversation_id, created_at, turn_sequence); + +-- Public transcript queries always filter `internal = false`; a +-- partial index keeps that hot path narrow even as the trace table +-- grows with internal rows. +CREATE INDEX idx_messages_conversation_visible + ON public.messages (conversation_id, created_at) + WHERE internal = false; + +-- ───────────────────────────────────────────────────────────────── +-- 2. RLS — hide internal trace from end-user SELECT/INSERT +-- ───────────────────────────────────────────────────────────────── +-- +-- Route-level `includeInternal: false` filters protect app code, but +-- a direct Supabase user client query (or a future bug forgetting to +-- pass the flag) would still leak trace rows. Push the boundary into +-- RLS so service-role writes and reads are the ONLY way internal +-- rows are touched. + +DROP POLICY IF EXISTS "Users can view own messages" ON public.messages; +DROP POLICY IF EXISTS "Users can insert own messages" ON public.messages; + +CREATE POLICY "Users can view own visible messages" ON public.messages + FOR SELECT + USING ( + internal = false + AND EXISTS ( + SELECT 1 FROM public.conversations c + WHERE c.id = messages.conversation_id + AND c.user_id = auth.uid() + ) + ); + +-- Users may only create non-internal rows directly. Internal trace +-- writes go through the service-role path (which bypasses RLS) used by +-- `insertMessages` in the chat handlers. 
+CREATE POLICY "Users can insert own visible messages" ON public.messages + FOR INSERT + WITH CHECK ( + internal = false + AND EXISTS ( + SELECT 1 FROM public.conversations c + WHERE c.id = messages.conversation_id + AND c.user_id = auth.uid() + ) + ); diff --git a/tests/integration/chat-route.integration.test.ts b/tests/integration/chat-route.integration.test.ts index 5af805c..9a71524 100644 --- a/tests/integration/chat-route.integration.test.ts +++ b/tests/integration/chat-route.integration.test.ts @@ -195,13 +195,22 @@ describe('chat route integration', () => { expect(mockCreateConversation).toHaveBeenCalledWith('project-1', 'user-1', 'hello') // rowLimit is derived from selectHistoryBudget(plan, model, source); // budget math is covered by conversation-history.test.ts. Here we only - // check that the handler asked for some bounded history slice. - expect(mockLoadMessages).toHaveBeenCalledWith('conversation-new', expect.any(Number)) + // check that the handler asked for some bounded history slice AND + // that resume opts include the internal trace (intermediate + // assistant + tool_result rows that RLS hides from end users). + expect(mockLoadMessages).toHaveBeenCalledWith( + 'conversation-new', + expect.any(Number), + undefined, + { includeInternal: true }, + ) expect(saveChatResult).toHaveBeenCalledWith(expect.objectContaining({ conversationId: 'conversation-new', userMessage: 'hello', - assistantText: 'Hello from the agent.', - assistantContent: [{ type: 'text', text: 'Hello from the agent.' }], + // Iteration trace replaced the flat assistantText/assistantContent + // pair; the seed user row + assistant row land inside saveChatResult. + lastAssistantContent: [{ type: 'text', text: 'Hello from the agent.' 
}], + iterations: expect.any(Array), model: expect.any(String), inputTokens: 12, outputTokens: 24, diff --git a/tests/unit/conversation-history.test.ts b/tests/unit/conversation-history.test.ts index b4fbf94..6811735 100644 --- a/tests/unit/conversation-history.test.ts +++ b/tests/unit/conversation-history.test.ts @@ -176,4 +176,107 @@ describe('buildPromptMessages', () => { expect(joined).toContain('NEWEST') expect(joined).not.toContain('OLDEST') }) + + it('reads structured content_blocks first when present', () => { + const blocks = [{ type: 'tool_use', id: 't1', name: 'get_content', input: {} }] + const messages = buildPromptMessages({ + history: [ + { + role: 'assistant', + content: '[tool calls]', + content_blocks: blocks, + tool_calls: null, + turn_id: 'turn-A', + turn_sequence: 0, + }, + ], + newUserMessage: 'next', + budget, + }) + expect(messages[0]).toEqual({ role: 'assistant', content: blocks }) + }) +}) + +describe('buildPromptMessages — turn-safe Anthropic protocol invariant', () => { + const budget = { maxTokens: 10_000, rowLimit: 100 } + + it('keeps whole multi-row turns together (assistant tool_use + matching tool_result)', () => { + // One conversation with two turns: T1 = (user prompt, assistant + // text+tool_use, user tool_result), T2 = (user prompt, assistant + // text). The budget walker must never drop the tool_result while + // keeping the tool_use — Anthropic rejects orphaned tool_use blocks. + const t1Assistant = [ + { type: 'text', text: 'I will check.' 
}, + { type: 'tool_use', id: 't1', name: 'get_content', input: {} }, + ] + const t1ToolResult = [{ type: 'tool_result', toolUseId: 't1', content: '{}' }] + const messages = buildPromptMessages({ + history: [ + { role: 'user', content: 'prompt 1', turn_id: 'T1', turn_sequence: 0 }, + { role: 'assistant', content: '[tool calls]', content_blocks: t1Assistant, turn_id: 'T1', turn_sequence: 1 }, + { role: 'user', content: '[tool results]', content_blocks: t1ToolResult, turn_id: 'T1', turn_sequence: 2 }, + { role: 'user', content: 'prompt 2', turn_id: 'T2', turn_sequence: 0 }, + { role: 'assistant', content: 'Done.', turn_id: 'T2', turn_sequence: 1 }, + ], + newUserMessage: 'prompt 3', + budget, + }) + // All 5 history rows + the new user message. + expect(messages).toHaveLength(6) + expect((messages[1]!.content as Array<{ type: string }>)[1]!.type).toBe('tool_use') + expect((messages[2]!.content as Array<{ type: string }>)[0]!.type).toBe('tool_result') + }) + + it('drops whole older turns when budget overflows; never splits a turn', () => { + // OLD turn is heavy (3 rows × ~500 tokens each ≈ 1500 tokens); + // NEW turn is small enough to fit in a 600-token budget. The + // walker must keep all of NEW and discard all of OLD — never + // half-OLD, which would leak an orphan tool_use/tool_result pair. 
+ const longText = 'x'.repeat(2000) // ~500 tokens + const heavyAssistant = [ + { type: 'text', text: longText }, + { type: 'tool_use', id: 't', name: 'get_content', input: {} }, + ] + const heavyToolResult = [{ type: 'tool_result', toolUseId: 't', content: longText }] + const messages = buildPromptMessages({ + history: [ + { role: 'user', content: `OLD-USER ${longText}`, turn_id: 'OLD', turn_sequence: 0 }, + { role: 'assistant', content: '[tool calls]', content_blocks: heavyAssistant, turn_id: 'OLD', turn_sequence: 1 }, + { role: 'user', content: '[tool results]', content_blocks: heavyToolResult, turn_id: 'OLD', turn_sequence: 2 }, + { role: 'user', content: 'NEW-USER short', turn_id: 'NEW', turn_sequence: 0 }, + { role: 'assistant', content: 'NEW-REPLY short', turn_id: 'NEW', turn_sequence: 1 }, + ], + newUserMessage: 'next', + budget: { maxTokens: 600, rowLimit: 100 }, + }) + expect(messages).toHaveLength(3) + expect(messages[0]!.content).toBe('NEW-USER short') + expect(messages[1]!.content).toBe('NEW-REPLY short') + // OLD turn fully dropped — no orphaned tool_use, no orphaned tool_result. + const joined = messages.map(m => JSON.stringify(m.content)).join('|') + expect(joined).not.toContain('OLD-USER') + expect(joined).not.toMatch(/tool_use|tool_result/) + }) + + it('drops the entire oldest turn rather than half of it when budget is tight', () => { + const longText = 'y'.repeat(2000) + const messages = buildPromptMessages({ + history: [ + // Old turn has 3 rows — none should survive partial inclusion. + { role: 'user', content: `OLD-1 ${longText}`, turn_id: 'OLD', turn_sequence: 0 }, + { role: 'assistant', content: `OLD-2 ${longText}`, turn_id: 'OLD', turn_sequence: 1 }, + { role: 'user', content: `OLD-3 ${longText}`, turn_id: 'OLD', turn_sequence: 2 }, + // Newer turn is small and fits. 
+ { role: 'user', content: 'NEW', turn_id: 'NEW', turn_sequence: 0 }, + { role: 'assistant', content: 'OK', turn_id: 'NEW', turn_sequence: 1 }, + ], + // Budget large enough for the small NEW turn but not for any + // single row of the OLD turn — proves "all or none" per turn. + budget: { maxTokens: 200, rowLimit: 100 }, + newUserMessage: 'next', + }) + expect(messages).toHaveLength(3) + expect(messages[0]!.content).toBe('NEW') + expect(messages[1]!.content).toBe('OK') + }) }) diff --git a/tests/unit/db.test.ts b/tests/unit/db.test.ts index 17155c4..7c99c7e 100644 --- a/tests/unit/db.test.ts +++ b/tests/unit/db.test.ts @@ -45,6 +45,7 @@ describe('db helpers', () => { createConversation: vi.fn().mockResolvedValue('conv-1'), loadConversationMessages: vi.fn().mockResolvedValue([{ role: 'user', content: 'Hello', tool_calls: null }]), insertMessage: vi.fn().mockResolvedValue(undefined), + insertMessages: vi.fn().mockResolvedValue(undefined), upsertAgentUsage: vi.fn().mockResolvedValue(undefined), updateAgentUsageTokens: vi.fn().mockResolvedValue(undefined), decrementAgentUsage: vi.fn().mockResolvedValue(undefined), @@ -104,13 +105,15 @@ describe('db helpers', () => { }) }) - it('saves chat results via provider methods', async () => { + it('saves chat results as a single batched trace insert', async () => { const { saveChatResult } = await loadDbModule() await saveChatResult({ conversationId: 'conv-1', userMessage: 'Hello', - assistantText: 'World', - assistantContent: [{ type: 'text', text: 'World' }], + iterations: [ + { iteration: 1, assistantBlocks: [{ type: 'text', text: 'World' }], toolResultBlocks: [] }, + ], + lastAssistantContent: [{ type: 'text', text: 'World' }], model: 'claude-sonnet-4-20250514', inputTokens: 7, outputTokens: 3, @@ -122,9 +125,16 @@ describe('db helpers', () => { usageMonth: '2026-04', }) - expect(mockDb.insertMessage).toHaveBeenCalledTimes(2) - expect(mockDb.insertMessage).toHaveBeenCalledWith(expect.objectContaining({ role: 'user', content: 
'Hello' })) - expect(mockDb.insertMessage).toHaveBeenCalledWith(expect.objectContaining({ role: 'assistant', content: 'World' })) + // Single round-trip for the whole turn. + expect(mockDb.insertMessages).toHaveBeenCalledTimes(1) + const rows = mockDb.insertMessages.mock.calls[0]![0] as Array<Record<string, unknown>> + expect(rows).toHaveLength(2) + expect(rows[0]).toMatchObject({ role: 'user', content: 'Hello', internal: false, turnSequence: 0, iteration: null }) + expect(rows[1]).toMatchObject({ role: 'assistant', content: 'World', internal: false, turnSequence: 1 }) + // All rows in one batch share the same turn_id. + expect(rows[0]!.turnId).toBe(rows[1]!.turnId) + expect(typeof rows[0]!.turnId).toBe('string') + expect(mockDb.updateAgentUsageTokens).toHaveBeenCalledWith(expect.objectContaining({ workspaceId: 'workspace-1', userId: 'user-1', @@ -138,42 +148,64 @@ describe('db helpers', () => { expect(mockDb.updateConversationTimestamp).toHaveBeenCalledWith('conv-1') }) - it('updates token counts on the usage row reserved by the atomic limit check', async () => { + it('writes intermediate iterations as internal rows and the final as visible', async () => { const { saveChatResult } = await loadDbModule() await saveChatResult({ conversationId: 'conv-1', - userMessage: 'Hello', - assistantText: '', - assistantContent: [], - model: 'claude-haiku-4-5-20251001', - inputTokens: 4, - outputTokens: 2, + userMessage: 'do X then Y', + iterations: [ + { + iteration: 1, + assistantBlocks: [ + { type: 'text', text: 'I will check first.' }, + { type: 'tool_use', id: 't1', name: 'get_content', input: {} }, + ], + toolResultBlocks: [{ type: 'tool_result', toolUseId: 't1', content: '{"ok":true}' }], + }, + { iteration: 2, assistantBlocks: [{ type: 'text', text: 'Done.' }], toolResultBlocks: [] }, + ], + lastAssistantContent: [{ type: 'text', text: 'Done.' 
}], + model: 'claude-sonnet-4-5', + inputTokens: 50, + outputTokens: 10, cacheCreationInputTokens: 0, cacheReadInputTokens: 0, workspaceId: 'workspace-1', userId: 'user-1', - usageSource: 'byoa', + usageSource: 'studio', usageMonth: '2026-04', }) - expect(mockDb.updateAgentUsageTokens).toHaveBeenCalledWith(expect.objectContaining({ - source: 'byoa', - month: '2026-04', - inputTokens: 4, - outputTokens: 2, - })) + const rows = mockDb.insertMessages.mock.calls[0]![0] as Array<Record<string, unknown>> + // 1 seed user + 2 assistant + 1 tool_result = 4 rows + expect(rows).toHaveLength(4) + expect(rows[0]).toMatchObject({ role: 'user', internal: false, iteration: null }) + expect(rows[1]).toMatchObject({ role: 'assistant', internal: true, iteration: 1 }) + expect(rows[2]).toMatchObject({ role: 'user', internal: true, iteration: 1 }) + expect(rows[3]).toMatchObject({ role: 'assistant', internal: false, iteration: 2 }) + // turnSequence is monotonically increasing within the turn so a + // batch insert with identical created_at still resolves deterministically. + expect(rows.map(r => r.turnSequence)).toEqual([0, 1, 2, 3]) + // All four rows share the single turn_id. + expect(new Set(rows.map(r => r.turnId)).size).toBe(1) + // Token counts land only on the final visible assistant row. + expect(rows[1]).not.toHaveProperty('tokenCountInput') + expect(rows[3]).toMatchObject({ + tokenCountInput: 50, + tokenCountOutput: 10, + model: 'claude-sonnet-4-5', + }) }) - it('propagates cache token buckets through saveChatResult', async () => { - // Second turn of a cached conversation — most input is cache_read - // (cheap), a small slice is cache_creation, base input/output are - // small. All four buckets must land on agent_usage row + message row. 
+ it('propagates cache token buckets onto the final assistant row', async () => { const { saveChatResult } = await loadDbModule() await saveChatResult({ conversationId: 'conv-1', userMessage: 'follow up', - assistantText: 'short', - assistantContent: [{ type: 'text', text: 'short' }], + iterations: [ + { iteration: 1, assistantBlocks: [{ type: 'text', text: 'short' }], toolResultBlocks: [] }, + ], + lastAssistantContent: [{ type: 'text', text: 'short' }], model: 'claude-sonnet-4-5', inputTokens: 80, outputTokens: 12, @@ -195,20 +227,23 @@ describe('db helpers', () => { cacheCreationInputTokens: 150, cacheReadInputTokens: 9000, }) - expect(mockDb.insertMessage).toHaveBeenCalledWith(expect.objectContaining({ + const rows = mockDb.insertMessages.mock.calls[0]![0] as Array<Record<string, unknown>> + expect(rows.at(-1)).toMatchObject({ role: 'assistant', cacheCreationInputTokens: 150, cacheReadInputTokens: 9000, - })) + }) }) - it('saveApiChatResult writes to api_message_usage, not agent_usage', async () => { + it('saveApiChatResult writes the trace and points tokens at api_message_usage', async () => { const { saveApiChatResult } = await loadDbModule() await saveApiChatResult({ conversationId: 'conv-2', userMessage: 'Hello', - assistantText: 'World', - assistantContent: [{ type: 'text', text: 'World' }], + iterations: [ + { iteration: 1, assistantBlocks: [{ type: 'text', text: 'World' }], toolResultBlocks: [] }, + ], + lastAssistantContent: [{ type: 'text', text: 'World' }], model: 'claude-sonnet-4-5', inputTokens: 11, outputTokens: 5, @@ -219,12 +254,9 @@ describe('db helpers', () => { usageMonth: '2026-04', }) - // Messages persist into the shared messages table the same way as - // the Studio path; the difference is only in which usage table the - // token counters land on. 
- expect(mockDb.insertMessage).toHaveBeenCalledTimes(2) - expect(mockDb.insertMessage).toHaveBeenCalledWith(expect.objectContaining({ role: 'user', content: 'Hello' })) - expect(mockDb.insertMessage).toHaveBeenCalledWith(expect.objectContaining({ role: 'assistant', content: 'World' })) + expect(mockDb.insertMessages).toHaveBeenCalledTimes(1) + const rows = mockDb.insertMessages.mock.calls[0]![0] as Array<Record<string, unknown>> + expect(rows.map(r => r.role)).toEqual(['user', 'assistant']) expect(mockDb.updateAPIUsageTokens).toHaveBeenCalledWith({ workspaceId: 'workspace-1',