From dc4d45422c4c1f7406b44ba67393e883ea6331eb Mon Sep 17 00:00:00 2001 From: Pradeesh S Date: Wed, 22 Apr 2026 20:08:00 +0530 Subject: [PATCH 1/4] feat: added token level compaction in agent loop --- package-lock.json | 1 + src/core/compaction.ts | 177 +++++++++++++++++++++++++++++++++++++++++ src/core/engine.ts | 26 +++++- src/core/types.ts | 2 +- 4 files changed, 203 insertions(+), 3 deletions(-) create mode 100644 src/core/compaction.ts diff --git a/package-lock.json b/package-lock.json index 218d23e..9de13d1 100644 --- a/package-lock.json +++ b/package-lock.json @@ -14,6 +14,7 @@ "@types/yauzl": "^2.10.3", "@xynehq/jaf": "^0.1.12", "ai": "^5.0.35", + "cfb": "^1.2.2", "eventsource": "^2.0.2", "express": "^4.18.2", "fastify": "^4.29.1", diff --git a/src/core/compaction.ts b/src/core/compaction.ts new file mode 100644 index 0000000..7172808 --- /dev/null +++ b/src/core/compaction.ts @@ -0,0 +1,177 @@ +import { Message, RunState, TraceEvent, getTextContent } from './types.js'; + +/** + * Configuration for token-based message compaction + */ +export interface CompactionConfig { + readonly enabled: boolean; + readonly maxTokenLimit: number; // Trigger compaction when token count exceeds this + readonly targetTokenLimit: number; // Compact until token count is below this +} + +/** + * Default compaction configuration + */ +export const defaultCompactionConfig: CompactionConfig = { + enabled: true, + maxTokenLimit: 180_000, + targetTokenLimit: 100_000 +}; + +/** + * Result of a compaction operation + */ +export interface CompactionResult { + readonly originalCount: number; + readonly compactedCount: number; + readonly removedCount: number; + readonly originalTokens: number; + readonly compactedTokens: number; + readonly removedMessages: readonly Message[]; + readonly preservedMessages: readonly Message[]; +} + +/** + * Estimate token count for a message + * Uses a simple approximation: ~4 characters per token on average + */ +export function estimateMessageTokens(message: Message): number { + let textLength = 0; + + if (typeof message.content === 'string') { + textLength = message.content.length; + } else if (Array.isArray(message.content)) { + textLength = message.content + .filter(part => part.type === 'text') + .map(part => part.text.length) + .reduce((a, b) => a + b, 0); + } + + // Add overhead for tool_calls + if (message.tool_calls) { + for (const tc of message.tool_calls) { + textLength += tc.function.name.length; + textLength += tc.function.arguments.length; + } + } + + // Rough estimate: 4 chars per token + return Math.ceil(textLength / 4); +} + +/** + * Estimate total tokens for all messages + */ +export function estimateTotalTokens(messages: readonly Message[]): number { + return messages.reduce((sum, msg) => sum + estimateMessageTokens(msg), 0); +} + +/** + * Check if compaction should be triggered + */ +export function shouldCompact( + messages: readonly Message[], + config: Pick +): boolean { + if (!config.enabled) return false; + const tokens = estimateTotalTokens(messages); + return tokens > config.maxTokenLimit; +} + +/** + * Trim messages from the start, removing only tool messages (tool calls and results) + * Stops when token count falls below target limit or no more tool messages to trim + */ +export function trimToolMessages( + messages: readonly Message[], + targetTokenLimit: number +): CompactionResult { + const originalTokens = estimateTotalTokens(messages); + let currentTokens = originalTokens; + const removedMessages: Message[] = []; + const preservedMessages: Message[] = []; + + // Scan from start, skip user messages, remove tool messages until under limit + for (const msg of messages) { + if (currentTokens <= targetTokenLimit) { + preservedMessages.push(msg); + continue; + } + + // Only remove tool messages (role: 'tool' or assistant messages with tool_calls) + const isToolMessage = + msg.role === 'tool' || + (msg.role === 'assistant' && msg.tool_calls && msg.tool_calls.length > 0); + + if (isToolMessage) { + const msgTokens = estimateMessageTokens(msg); + removedMessages.push(msg); + currentTokens -= msgTokens; + } else { + // Keep user messages and non-tool assistant messages + preservedMessages.push(msg); + } + } + + return { + originalCount: messages.length, + compactedCount: preservedMessages.length, + removedCount: removedMessages.length, + originalTokens, + compactedTokens: currentTokens, + removedMessages, + preservedMessages + }; +} + +/** + * Compacts state by trimming tool messages from the start + * Returns updated state and compaction result + */ +export function compactState( + state: RunState, + config: Pick +): { readonly state: RunState; readonly result: CompactionResult } { + const result = trimToolMessages(state.messages, config.targetTokenLimit); + + if (result.removedCount === 0) { + return { state, result }; + } + + return { + state: { + ...state, + messages: result.removedMessages.length > 0 ? state.messages.filter( + msg => !result.removedMessages.includes(msg) + ) : state.messages + }, + result + }; +} + +/** + * Creates a compaction event for tracing + */ +export function createCompactionEvent( + result: CompactionResult, + runId: string, + traceId: string +): TraceEvent { + return { + type: 'memory_operation', + data: { + operation: 'compact', + conversationId: runId, + status: 'end', + messageCount: result.compactedCount, + metadata: { + originalCount: result.originalCount, + removedCount: result.removedCount, + originalTokens: result.originalTokens, + compactedTokens: result.compactedTokens, + runId, + traceId + } + } + }; +} \ No newline at end of file diff --git a/src/core/engine.ts b/src/core/engine.ts index e24ca96..d549dbd 100644 --- a/src/core/engine.ts +++ b/src/core/engine.ts @@ -18,6 +18,13 @@ import { setToolRuntime } from './tool-runtime.js'; import { buildEffectiveGuardrails, executeInputGuardrailsParallel, executeInputGuardrailsSequential, executeOutputGuardrails } from './guardrails.js'; import { safeConsole, isVerboseLogging } from '../utils/logger.js'; import { DEFAULT_CLARIFICATION_DESCRIPTION } from '../utils/constants.js'; +import { + shouldCompact, + compactState, + createCompactionEvent, + estimateTotalTokens, + defaultCompactionConfig +} from './compaction.js'; type ClarificationTriggerMarker = { readonly _clarification_trigger: true; @@ -429,7 +436,22 @@ async function runInternal( }; } - const currentAgent = config.agentRegistry.get(state.currentAgentName); + // Token-based compaction: check and trim if messages exceed limit + let compactionState = state; + if (shouldCompact(state.messages, defaultCompactionConfig)) { + const currentTokens = estimateTotalTokens(state.messages); + safeConsole.log(`[JAF:ENGINE] Compaction triggered: ${currentTokens} tokens exceed ${defaultCompactionConfig.maxTokenLimit} limit`); + + const { state: compactedState, result } = compactState(state, defaultCompactionConfig); + compactionState = compactedState; + + safeConsole.log(`[JAF:ENGINE] Compaction complete: removed ${result.removedCount} messages, reduced from ${result.originalTokens} to ${result.compactedTokens} tokens`); + + // Emit compaction event + config.onEvent?.(createCompactionEvent(result, state.runId as string, state.traceId as string)); + } + + const currentAgent = config.agentRegistry.get(compactionState.currentAgentName); if (!currentAgent) { return { finalState: state, @@ -1508,7 +1530,7 @@ async function executeToolCalls( toolResult = modifiedResult; } } catch (callbackError) { - console.error(`[JAF:ENGINE] Error in onAfterToolExecution callback for ${toolCall.function.name}:`, callbackError); + safeConsole.error(`[JAF:ENGINE] Error in onAfterToolExecution callback for ${toolCall.function.name}:`, callbackError); // Continue with original result if callback fails } } diff --git a/src/core/types.ts b/src/core/types.ts index b512ec0..57f1dea 100644 --- a/src/core/types.ts +++ b/src/core/types.ts @@ -242,7 +242,7 @@ export type TraceEvent = | { type: 'handoff_denied'; data: { from: string; to: string; reason: string } } | { type: 'guardrail_violation'; data: { stage: 'input' | 'output'; reason: string } } | { type: 'guardrail_check'; data: { guardrailName: string; content: any; isValid?: boolean; errorMessage?: string; } } - | { type: 'memory_operation'; data: { operation: 'load' | 'store'; conversationId: string; status: 'start' | 'end' | 'fail'; error?: string; messageCount?: number; } } + | { type: 'memory_operation'; data: { operation: 'load' | 'store' | 'compact'; conversationId: string; status: 'start' | 'end' | 'fail'; error?: string; messageCount?: number; metadata?: Record; } } | { type: 'output_parse'; data: { content: string; status: 'start' | 'end' | 'fail'; parsedOutput?: any; error?: string; } } | { type: 'decode_error'; data: { errors: z.ZodIssue[] } } | { type: 'turn_end'; data: { turn: number; agentName: string } } From 2675a80d5561823eeec790e587781a88461969f5 Mon Sep 17 00:00:00 2001 From: Pradeesh S Date: Wed, 22 Apr 2026 20:18:15 +0530 Subject: [PATCH 2/4] fix: pair wise removal --- src/core/compaction.ts | 100 ++++++++++++++++++++++++++++++++++------- 1 file changed, 83 insertions(+), 17 deletions(-) diff --git a/src/core/compaction.ts b/src/core/compaction.ts index 7172808..b27f2ee 100644 --- a/src/core/compaction.ts +++ b/src/core/compaction.ts @@ -79,8 +79,43 @@ export function shouldCompact( } /** - * Trim messages from the start, removing only tool messages (tool calls and results) - * Stops when token count falls below target limit or no more tool messages to trim + * Identifies tool interaction groups (assistant with tool_calls + their tool results) + * Returns a map of message indices to their group IDs + */ +function identifyToolGroups(messages: readonly Message[]): Map { + const messageToGroup = new Map(); + const pendingToolCalls = new Map(); // tool_call_id -> assistant message index + + for (let i = 0; i < messages.length; i++) { + const msg = messages[i]; + + if (msg.role === 'assistant' && msg.tool_calls && msg.tool_calls.length > 0) { + // This is an assistant message initiating tool calls + const groupId = `group_${i}`; + messageToGroup.set(i, groupId); + + // Track all tool_call_ids from this assistant message + for (const tc of msg.tool_calls) { + pendingToolCalls.set(tc.id, i); + } + } else if (msg.role === 'tool' && msg.tool_call_id) { + // This is a tool result - associate it with its originating assistant + const assistantIndex = pendingToolCalls.get(msg.tool_call_id); + if (assistantIndex !== undefined) { + const groupId = messageToGroup.get(assistantIndex); + if (groupId) { + messageToGroup.set(i, groupId); + } + } + } + } + + return messageToGroup; +} + +/** + * Trim messages from the start, removing complete tool interaction groups + * Stops when token count falls below target limit or no more complete groups to trim */ export function trimToolMessages( messages: readonly Message[], @@ -91,24 +126,57 @@ export function trimToolMessages( const removedMessages: Message[] = []; const preservedMessages: Message[] = []; - // Scan from start, skip user messages, remove tool messages until under limit - for (const msg of messages) { + // Identify tool interaction groups + const toolGroups = identifyToolGroups(messages); + + // Track which groups we've decided to remove + const removedGroups = new Set(); + + // Scan from start + for (let i = 0; i < messages.length; i++) { + const msg = messages[i]; + + // If we're under the limit, preserve everything remaining if (currentTokens <= targetTokenLimit) { preservedMessages.push(msg); continue; } - // Only remove tool messages (role: 'tool' or assistant messages with tool_calls) - const isToolMessage = - msg.role === 'tool' || - (msg.role === 'assistant' && msg.tool_calls && msg.tool_calls.length > 0); + const groupId = toolGroups.get(i); + + if (groupId) { + // This message is part of a tool interaction group + if (removedGroups.has(groupId)) { + // Already decided to remove this group, skip counting tokens again + removedMessages.push(msg); + continue; + } + + // Calculate tokens for the entire group + let groupTokens = 0; + const groupIndices: number[] = []; + + for (let j = i; j < messages.length; j++) { + if (toolGroups.get(j) === groupId) { + groupTokens += estimateMessageTokens(messages[j]); + groupIndices.push(j); + } + } + + // Decide whether to remove this group + // Always remove complete groups from the start until under target + removedGroups.add(groupId); + for (const idx of groupIndices) { + removedMessages.push(messages[idx]); + } + currentTokens -= groupTokens; - if (isToolMessage) { - const msgTokens = estimateMessageTokens(msg); - removedMessages.push(msg); - currentTokens -= msgTokens; + // Skip ahead past this group (will be handled in subsequent iterations) + // Actually, we need to skip in the outer loop + const lastGroupIndex = Math.max(...groupIndices); + i = lastGroupIndex; // Will be incremented by the for loop } else { - // Keep user messages and non-tool assistant messages + // Not a tool message - preserve it (user messages, plain assistant messages) preservedMessages.push(msg); } } @@ -125,7 +193,7 @@ export function trimToolMessages( } /** - * Compacts state by trimming tool messages from the start + * Compacts state by trimming tool interaction groups from the start * Returns updated state and compaction result */ export function compactState( @@ -141,9 +209,7 @@ export function compactState( return { state: { ...state, - messages: result.removedMessages.length > 0 ? state.messages.filter( - msg => !result.removedMessages.includes(msg) - ) : state.messages + messages: result.preservedMessages }, result }; From 5374553c612928ed5c8360f9f92927988b721aea Mon Sep 17 00:00:00 2001 From: Pradeesh S Date: Wed, 22 Apr 2026 20:50:54 +0530 Subject: [PATCH 3/4] fix: comments resolved --- src/core/compaction.ts | 100 +++++++++++++++++++++++------------------ src/core/engine.ts | 10 +++-- 2 files changed, 64 insertions(+), 46 deletions(-) diff --git a/src/core/compaction.ts b/src/core/compaction.ts index b27f2ee..dc7e139 100644 --- a/src/core/compaction.ts +++ b/src/core/compaction.ts @@ -1,4 +1,4 @@ -import { Message, RunState, TraceEvent, getTextContent } from './types.js'; +import { Message, RunState, TraceEvent } from './types.js'; /** * Configuration for token-based message compaction @@ -79,20 +79,34 @@ export function shouldCompact( } /** - * Identifies tool interaction groups (assistant with tool_calls + their tool results) - * Returns a map of message indices to their group IDs + * Precomputed tool group info for efficient compaction */ -function identifyToolGroups(messages: readonly Message[]): Map { +interface ToolGroupInfo { + readonly groupId: string; + readonly indices: number[]; + readonly tokens: number; +} + +/** + * Precomputes tool interaction groups in a single pass (O(n)) + * Returns ordered array of groups from earliest to latest + */ +function precomputeToolGroups(messages: readonly Message[]): ToolGroupInfo[] { const messageToGroup = new Map(); const pendingToolCalls = new Map(); // tool_call_id -> assistant message index + const groupTokens = new Map(); + const groupIndices = new Map(); for (let i = 0; i < messages.length; i++) { const msg = messages[i]; + const msgTokens = estimateMessageTokens(msg); if (msg.role === 'assistant' && msg.tool_calls && msg.tool_calls.length > 0) { // This is an assistant message initiating tool calls const groupId = `group_${i}`; messageToGroup.set(i, groupId); + groupIndices.set(groupId, [i]); + groupTokens.set(groupId, msgTokens); // Track all tool_call_ids from this assistant message for (const tc of msg.tool_calls) { @@ -105,12 +119,29 @@ function identifyToolGroups(messages: readonly Message[]): Map { const groupId = messageToGroup.get(assistantIndex); if (groupId) { messageToGroup.set(i, groupId); + groupIndices.get(groupId)!.push(i); + groupTokens.set(groupId, groupTokens.get(groupId)! + msgTokens); } } } } - return messageToGroup; + // Convert to ordered array (groups naturally ordered by assistant index) + const groups: ToolGroupInfo[] = []; + const seenGroups = new Set(); + for (let i = 0; i < messages.length; i++) { + const groupId = messageToGroup.get(i); + if (groupId && !seenGroups.has(groupId)) { + seenGroups.add(groupId); + groups.push({ + groupId, + indices: groupIndices.get(groupId)!, + tokens: groupTokens.get(groupId)! + }); + } + } + + return groups; } /** @@ -126,11 +157,10 @@ export function trimToolMessages( const removedMessages: Message[] = []; const preservedMessages: Message[] = []; - // Identify tool interaction groups - const toolGroups = identifyToolGroups(messages); - - // Track which groups we've decided to remove + // Precompute tool groups in single pass (O(n)) + const toolGroups = precomputeToolGroups(messages); const removedGroups = new Set(); + let nextGroupIndex = 0; // Scan from start for (let i = 0; i < messages.length; i++) { @@ -142,43 +172,27 @@ export function trimToolMessages( continue; } - const groupId = toolGroups.get(i); - - if (groupId) { - // This message is part of a tool interaction group - if (removedGroups.has(groupId)) { - // Already decided to remove this group, skip counting tokens again - removedMessages.push(msg); - continue; - } - - // Calculate tokens for the entire group - let groupTokens = 0; - const groupIndices: number[] = []; - - for (let j = i; j < messages.length; j++) { - if (toolGroups.get(j) === groupId) { - groupTokens += estimateMessageTokens(messages[j]); - groupIndices.push(j); - } - } - - // Decide whether to remove this group - // Always remove complete groups from the start until under target - removedGroups.add(groupId); - for (const idx of groupIndices) { + // Check if this message is the start of the next removable group + const nextGroup = toolGroups[nextGroupIndex]; + if (nextGroup && nextGroup.indices[0] === i && !removedGroups.has(nextGroup.groupId)) { + // Remove this complete group + removedGroups.add(nextGroup.groupId); + for (const idx of nextGroup.indices) { removedMessages.push(messages[idx]); } - currentTokens -= groupTokens; - - // Skip ahead past this group (will be handled in subsequent iterations) - // Actually, we need to skip in the outer loop - const lastGroupIndex = Math.max(...groupIndices); - i = lastGroupIndex; // Will be incremented by the for loop - } else { - // Not a tool message - preserve it (user messages, plain assistant messages) - preservedMessages.push(msg); + currentTokens -= nextGroup.tokens; + i = nextGroup.indices[nextGroup.indices.length - 1]; // Skip to end of group + nextGroupIndex++; + continue; } + + // If current message is part of an already-removed group, skip it + if (nextGroup && nextGroup.indices.includes(i) && removedGroups.has(nextGroup.groupId)) { + continue; + } + + // Not a tool message or part of removed group - preserve it + preservedMessages.push(msg); } return { diff --git a/src/core/engine.ts b/src/core/engine.ts index d549dbd..7958bfa 100644 --- a/src/core/engine.ts +++ b/src/core/engine.ts @@ -447,11 +447,15 @@ async function runInternal( safeConsole.log(`[JAF:ENGINE] Compaction complete: removed ${result.removedCount} messages, reduced from ${result.originalTokens} to ${result.compactedTokens} tokens`); - // Emit compaction event - config.onEvent?.(createCompactionEvent(result, state.runId as string, state.traceId as string)); + // Emit compaction event with proper conversationId + const conversationId = config.conversationId || (state.runId as string); + config.onEvent?.(createCompactionEvent(result, conversationId, state.traceId as string)); } - const currentAgent = config.agentRegistry.get(compactionState.currentAgentName); + // Use compacted state for all subsequent operations + state = compactionState; + + const currentAgent = config.agentRegistry.get(state.currentAgentName); if (!currentAgent) { return { finalState: state, From 4577291f44b5aeebafe7b317f76df4f474afdd09 Mon Sep 17 00:00:00 2001 From: Pradeesh S Date: Wed, 22 Apr 2026 21:22:37 +0530 Subject: [PATCH 4/4] feat: added compaction config --- src/core/compaction.ts | 32 ++++++++++++++++++-------------- src/core/engine.ts | 10 ++++++---- src/core/types.ts | 7 +++++++ 3 files changed, 31 insertions(+), 18 deletions(-) diff --git a/src/core/compaction.ts b/src/core/compaction.ts index dc7e139..c07b023 100644 --- a/src/core/compaction.ts +++ b/src/core/compaction.ts @@ -1,23 +1,27 @@ -import { Message, RunState, TraceEvent } from './types.js'; +import { Message, RunState, TraceEvent, CompactionConfig } from './types.js'; /** - * Configuration for token-based message compaction + * Default compaction configuration values */ -export interface CompactionConfig { - readonly enabled: boolean; - readonly maxTokenLimit: number; // Trigger compaction when token count exceeds this - readonly targetTokenLimit: number; // Compact until token count is below this -} - -/** - * Default compaction configuration - */ -export const defaultCompactionConfig: CompactionConfig = { +const DEFAULT_COMPACTION_CONFIG: Required = { enabled: true, maxTokenLimit: 180_000, targetTokenLimit: 100_000 }; +/** + * Merges user-provided compaction config with defaults + */ +export function resolveCompactionConfig( + userConfig?: CompactionConfig +): Required { + return { + enabled: userConfig?.enabled ?? DEFAULT_COMPACTION_CONFIG.enabled, + maxTokenLimit: userConfig?.maxTokenLimit ?? DEFAULT_COMPACTION_CONFIG.maxTokenLimit, + targetTokenLimit: userConfig?.targetTokenLimit ?? DEFAULT_COMPACTION_CONFIG.targetTokenLimit + }; +} + /** * Result of a compaction operation */ @@ -71,7 +75,7 @@ export function estimateTotalTokens(messages: readonly Message[]): number { */ export function shouldCompact( messages: readonly Message[], - config: Pick + config: Required> ): boolean { if (!config.enabled) return false; const tokens = estimateTotalTokens(messages); @@ -212,7 +216,7 @@ export function trimToolMessages( */ export function compactState( state: RunState, - config: Pick + config: Required> ): { readonly state: RunState; readonly result: CompactionResult } { const result = trimToolMessages(state.messages, config.targetTokenLimit); diff --git a/src/core/engine.ts b/src/core/engine.ts index 7958bfa..8b6db9c 100644 --- a/src/core/engine.ts +++ b/src/core/engine.ts @@ -23,7 +23,7 @@ import { compactState, createCompactionEvent, estimateTotalTokens, - defaultCompactionConfig + resolveCompactionConfig } from './compaction.js'; type ClarificationTriggerMarker = { @@ -437,12 +437,14 @@ async function runInternal( } // Token-based compaction: check and trim if messages exceed limit + const compactionConfig = resolveCompactionConfig(config.compaction); let compactionState = state; - if (shouldCompact(state.messages, defaultCompactionConfig)) { + + if (shouldCompact(state.messages, compactionConfig)) { const currentTokens = estimateTotalTokens(state.messages); - safeConsole.log(`[JAF:ENGINE] Compaction triggered: ${currentTokens} tokens exceed ${defaultCompactionConfig.maxTokenLimit} limit`); + safeConsole.log(`[JAF:ENGINE] Compaction triggered: ${currentTokens} tokens exceed ${compactionConfig.maxTokenLimit} limit`); - const { state: compactedState, result } = compactState(state, defaultCompactionConfig); + const { state: compactedState, result } = compactState(state, compactionConfig); compactionState = compactedState; safeConsole.log(`[JAF:ENGINE] Compaction complete: removed ${result.removedCount} messages, reduced from ${result.originalTokens} to ${result.compactedTokens} tokens`); diff --git a/src/core/types.ts b/src/core/types.ts index 57f1dea..2049dd3 100644 --- a/src/core/types.ts +++ b/src/core/types.ts @@ -438,6 +438,12 @@ export interface ModelProvider { ) => AsyncGenerator; } +export interface CompactionConfig { + readonly enabled?: boolean; + readonly maxTokenLimit?: number; + readonly targetTokenLimit?: number; +} + export type RunConfig = { readonly agentRegistry: ReadonlyMap>; readonly modelProvider: ModelProvider; @@ -470,6 +476,7 @@ export type RunConfig = { readonly defaultFastModel?: string; readonly allowClarificationRequests?: boolean; readonly clarificationDescription?: string; + readonly compaction?: CompactionConfig; }; export const jsonParseLLMOutput = (text: string): any => {