diff --git a/package-lock.json b/package-lock.json
index 218d23e..9de13d1 100644
--- a/package-lock.json
+++ b/package-lock.json
@@ -14,6 +14,7 @@
         "@types/yauzl": "^2.10.3",
         "@xynehq/jaf": "^0.1.12",
         "ai": "^5.0.35",
+        "cfb": "^1.2.2",
         "eventsource": "^2.0.2",
         "express": "^4.18.2",
         "fastify": "^4.29.1",
diff --git a/src/core/compaction.ts b/src/core/compaction.ts
new file mode 100644
index 0000000..c07b023
--- /dev/null
+++ b/src/core/compaction.ts
@@ -0,0 +1,258 @@
+import { Message, RunState, TraceEvent, CompactionConfig } from './types.js';
+
+/**
+ * Default compaction configuration values
+ */
+const DEFAULT_COMPACTION_CONFIG: Required<CompactionConfig> = {
+  enabled: true,
+  maxTokenLimit: 180_000,
+  targetTokenLimit: 100_000
+};
+
+/**
+ * Merges user-provided compaction config with defaults.
+ *
+ * The resolved targetTokenLimit is clamped to maxTokenLimit: with a target above
+ * the trigger threshold, compaction would fire on every turn yet never trim anything.
+ */
+export function resolveCompactionConfig(
+  userConfig?: CompactionConfig
+): Required<CompactionConfig> {
+  const maxTokenLimit = userConfig?.maxTokenLimit ?? DEFAULT_COMPACTION_CONFIG.maxTokenLimit;
+  const targetTokenLimit = userConfig?.targetTokenLimit ?? DEFAULT_COMPACTION_CONFIG.targetTokenLimit;
+
+  return {
+    enabled: userConfig?.enabled ?? DEFAULT_COMPACTION_CONFIG.enabled,
+    maxTokenLimit,
+    targetTokenLimit: Math.min(targetTokenLimit, maxTokenLimit)
+  };
+}
+
+/**
+ * Result of a compaction operation
+ */
+export interface CompactionResult {
+  readonly originalCount: number;
+  readonly compactedCount: number;
+  readonly removedCount: number;
+  readonly originalTokens: number;
+  readonly compactedTokens: number;
+  readonly removedMessages: readonly Message[];
+  readonly preservedMessages: readonly Message[];
+}
+
+/**
+ * Estimate token count for a message
+ * Uses a simple approximation: ~4 characters per token on average.
+ * Only string content, text parts and tool-call names/arguments are counted.
+ */
+export function estimateMessageTokens(message: Message): number {
+  let textLength = 0;
+
+  if (typeof message.content === 'string') {
+    textLength = message.content.length;
+  } else if (Array.isArray(message.content)) {
+    textLength = message.content
+      .filter(part => part.type === 'text')
+      .map(part => part.text.length)
+      .reduce((a, b) => a + b, 0);
+  }
+
+  // Add overhead for tool_calls
+  if (message.tool_calls) {
+    for (const tc of message.tool_calls) {
+      textLength += tc.function.name.length;
+      textLength += tc.function.arguments.length;
+    }
+  }
+
+  // Rough estimate: 4 chars per token
+  return Math.ceil(textLength / 4);
+}
+
+/**
+ * Estimate total tokens for all messages
+ */
+export function estimateTotalTokens(messages: readonly Message[]): number {
+  return messages.reduce((sum, msg) => sum + estimateMessageTokens(msg), 0);
+}
+
+/**
+ * Check if compaction should be triggered
+ */
+export function shouldCompact(
+  messages: readonly Message[],
+  config: Required<CompactionConfig>
+): boolean {
+  if (!config.enabled) return false;
+  const tokens = estimateTotalTokens(messages);
+  return tokens > config.maxTokenLimit;
+}
+
+/**
+ * Precomputed tool group info for efficient compaction
+ */
+interface ToolGroupInfo {
+  readonly groupId: string;
+  readonly indices: number[];
+  readonly tokens: number;
+}
+
+/**
+ * Precomputes tool interaction groups in a single pass (O(n))
+ * Returns ordered array of groups from earliest to latest
+ */
+function precomputeToolGroups(messages: readonly Message[]): ToolGroupInfo[] {
+  const messageToGroup = new Map<number, string>();
+  const pendingToolCalls = new Map<string, number>(); // tool_call_id -> assistant message index
+  const groupTokens = new Map<string, number>();
+  const groupIndices = new Map<string, number[]>();
+
+  for (let i = 0; i < messages.length; i++) {
+    const msg = messages[i];
+    const msgTokens = estimateMessageTokens(msg);
+
+    if (msg.role === 'assistant' && msg.tool_calls && msg.tool_calls.length > 0) {
+      // This is an assistant message initiating tool calls
+      const groupId = `group_${i}`;
+      messageToGroup.set(i, groupId);
+      groupIndices.set(groupId, [i]);
+      groupTokens.set(groupId, msgTokens);
+
+      // Track all tool_call_ids from this assistant message
+      for (const tc of msg.tool_calls) {
+        pendingToolCalls.set(tc.id, i);
+      }
+    } else if (msg.role === 'tool' && msg.tool_call_id) {
+      // This is a tool result - associate it with its originating assistant
+      const assistantIndex = pendingToolCalls.get(msg.tool_call_id);
+      if (assistantIndex !== undefined) {
+        const groupId = messageToGroup.get(assistantIndex);
+        if (groupId) {
+          messageToGroup.set(i, groupId);
+          groupIndices.get(groupId)!.push(i);
+          groupTokens.set(groupId, groupTokens.get(groupId)! + msgTokens);
+        }
+      }
+    }
+  }
+
+  // Convert to ordered array (groups naturally ordered by assistant index)
+  const groups: ToolGroupInfo[] = [];
+  const seenGroups = new Set<string>();
+  for (let i = 0; i < messages.length; i++) {
+    const groupId = messageToGroup.get(i);
+    if (groupId && !seenGroups.has(groupId)) {
+      seenGroups.add(groupId);
+      groups.push({
+        groupId,
+        indices: groupIndices.get(groupId)!,
+        tokens: groupTokens.get(groupId)!
+      });
+    }
+  }
+
+  return groups;
+}
+
+/**
+ * Trim messages from the start, removing complete tool interaction groups
+ * Stops when token count falls to or below the target limit or no groups remain.
+ *
+ * Removal is decided per group first and applied by message index afterwards, so
+ * messages that sit between an assistant tool call and its tool results (or that
+ * belong to an interleaved group) are never dropped without being accounted for.
+ * Messages outside any tool group are always preserved.
+ */
+export function trimToolMessages(
+  messages: readonly Message[],
+  targetTokenLimit: number
+): CompactionResult {
+  const originalTokens = estimateTotalTokens(messages);
+  let currentTokens = originalTokens;
+
+  // Pick groups oldest-first until the estimate is within the target
+  const removedIndices = new Set<number>();
+  for (const group of precomputeToolGroups(messages)) {
+    if (currentTokens <= targetTokenLimit) break;
+    for (const idx of group.indices) {
+      removedIndices.add(idx);
+    }
+    currentTokens -= group.tokens;
+  }
+
+  // Partition in original order so every message lands in exactly one list
+  const removedMessages: Message[] = [];
+  const preservedMessages: Message[] = [];
+  messages.forEach((msg, i) => {
+    if (removedIndices.has(i)) {
+      removedMessages.push(msg);
+    } else {
+      preservedMessages.push(msg);
+    }
+  });
+
+  return {
+    originalCount: messages.length,
+    compactedCount: preservedMessages.length,
+    removedCount: removedMessages.length,
+    originalTokens,
+    compactedTokens: currentTokens,
+    removedMessages,
+    preservedMessages
+  };
+}
+
+/**
+ * Compacts state by trimming tool interaction groups from the start
+ * Returns updated state and compaction result
+ */
+export function compactState<Ctx>(
+  state: RunState<Ctx>,
+  config: Required<CompactionConfig>
+): { readonly state: RunState<Ctx>; readonly result: CompactionResult } {
+  const result = trimToolMessages(state.messages, config.targetTokenLimit);
+
+  if (result.removedCount === 0) {
+    return { state, result };
+  }
+
+  return {
+    state: {
+      ...state,
+      messages: result.preservedMessages
+    },
+    result
+  };
+}
+
+/**
+ * Creates a compaction event for tracing
+ *
+ * conversationId defaults to runId so existing three-argument callers keep
+ * emitting the same event; pass it explicitly when the two identifiers differ.
+ */
+export function createCompactionEvent(
+  result: CompactionResult,
+  runId: string,
+  traceId: string,
+  conversationId: string = runId
+): TraceEvent {
+  return {
+    type: 'memory_operation',
+    data: {
+      operation: 'compact',
+      conversationId,
+      status: 'end',
+      messageCount: result.compactedCount,
+      metadata: {
+        originalCount: result.originalCount,
+        removedCount: result.removedCount,
+        originalTokens: result.originalTokens,
+        compactedTokens: result.compactedTokens,
+        runId,
+        traceId
+      }
+    }
+  };
+}
\ No newline at end of file
diff --git a/src/core/engine.ts b/src/core/engine.ts
index e24ca96..8b6db9c 100644
--- a/src/core/engine.ts
+++ b/src/core/engine.ts
@@ -18,6 +18,13 @@ import { setToolRuntime } from './tool-runtime.js';
 import { buildEffectiveGuardrails, executeInputGuardrailsParallel, executeInputGuardrailsSequential, executeOutputGuardrails } from './guardrails.js';
 import { safeConsole, isVerboseLogging } from '../utils/logger.js';
 import { DEFAULT_CLARIFICATION_DESCRIPTION } from '../utils/constants.js';
+import {
+  shouldCompact,
+  compactState,
+  createCompactionEvent,
+  estimateTotalTokens,
+  resolveCompactionConfig
+} from './compaction.js';
 
 type ClarificationTriggerMarker = {
   readonly _clarification_trigger: true;
@@ -429,6 +436,27 @@ async function runInternal(
     };
   }
 
+  // Token-based compaction: check and trim if messages exceed limit
+  const compactionConfig = resolveCompactionConfig(config.compaction);
+  let compactionState = state;
+
+  if (shouldCompact(state.messages, compactionConfig)) {
+    const currentTokens = estimateTotalTokens(state.messages);
+    safeConsole.log(`[JAF:ENGINE] Compaction triggered: ${currentTokens} tokens exceed ${compactionConfig.maxTokenLimit} limit`);
+
+    const { state: compactedState, result } = compactState(state, compactionConfig);
+    compactionState = compactedState;
+
+    safeConsole.log(`[JAF:ENGINE] Compaction complete: removed ${result.removedCount} messages, reduced from ${result.originalTokens} to ${result.compactedTokens} tokens`);
+
+    // Emit compaction event; the run id stays in metadata, the conversation id is passed separately
+    const conversationId = config.conversationId || (state.runId as string);
+    config.onEvent?.(createCompactionEvent(result, state.runId as string, state.traceId as string, conversationId));
+  }
+
+  // Use compacted state for all subsequent operations
+  state = compactionState;
+
   const currentAgent = config.agentRegistry.get(state.currentAgentName);
   if (!currentAgent) {
     return {
@@ -1508,7 +1536,7 @@ async function executeToolCalls(
             toolResult = modifiedResult;
           }
         } catch (callbackError) {
-          console.error(`[JAF:ENGINE] Error in onAfterToolExecution callback for ${toolCall.function.name}:`, callbackError);
+          safeConsole.error(`[JAF:ENGINE] Error in onAfterToolExecution callback for ${toolCall.function.name}:`, callbackError);
           // Continue with original result if callback fails
         }
       }
diff --git a/src/core/types.ts b/src/core/types.ts
index b512ec0..2049dd3 100644
--- a/src/core/types.ts
+++ b/src/core/types.ts
@@ -242,7 +242,7 @@ export type TraceEvent =
   | { type: 'handoff_denied'; data: { from: string; to: string; reason: string } }
   | { type: 'guardrail_violation'; data: { stage: 'input' | 'output'; reason: string } }
   | { type: 'guardrail_check'; data: { guardrailName: string; content: any; isValid?: boolean; errorMessage?: string; } }
-  | { type: 'memory_operation'; data: { operation: 'load' | 'store'; conversationId: string; status: 'start' | 'end' | 'fail'; error?: string; messageCount?: number; } }
+  | { type: 'memory_operation'; data: { operation: 'load' | 'store' | 'compact'; conversationId: string; status: 'start' | 'end' | 'fail'; error?: string; messageCount?: number; metadata?: Record<string, unknown>; } }
   | { type: 'output_parse'; data: { content: string; status: 'start' | 'end' | 'fail'; parsedOutput?: any; error?: string; } }
   | { type: 'decode_error'; data: { errors: z.ZodIssue[] } }
   | { type: 'turn_end'; data: { turn: number; agentName: string } }
@@ -438,6 +438,12 @@ export interface ModelProvider {
   ) => AsyncGenerator;
 }
 
+export interface CompactionConfig {
+  readonly enabled?: boolean;
+  readonly maxTokenLimit?: number;
+  readonly targetTokenLimit?: number;
+}
+
 export type RunConfig = {
   readonly agentRegistry: ReadonlyMap>;
   readonly modelProvider: ModelProvider;
@@ -470,6 +476,7 @@ export type RunConfig = {
   readonly defaultFastModel?: string;
   readonly allowClarificationRequests?: boolean;
   readonly clarificationDescription?: string;
+  readonly compaction?: CompactionConfig;
 };
 
 export const jsonParseLLMOutput = (text: string): any => {