diff --git a/examples/checkpoint-demo/server.ts b/examples/checkpoint-demo/server.ts new file mode 100644 index 0000000..ad58dcd --- /dev/null +++ b/examples/checkpoint-demo/server.ts @@ -0,0 +1,109 @@ +import 'dotenv/config'; +import IORedis from 'ioredis'; + +import { runServer } from '../../src/server/index'; +import { makeLiteLLMProvider } from '../../src/providers/model'; +import { createMemoryProviderFromEnv } from '../../src/memory/factory'; +import { Agent } from '../../src/core/types'; + +async function startCheckpointDemo() { + // Configure LiteLLM provider + const litellmUrl = process.env.LITELLM_URL || 'http://localhost:4000'; + const litellmKey = process.env.LITELLM_API_KEY || 'anything'; + + const modelProvider = makeLiteLLMProvider(litellmUrl, litellmKey); + + // Configure Redis client for memory provider + const redisHost = process.env.JAF_REDIS_HOST || '127.0.0.1'; + const redisPort = parseInt(process.env.JAF_REDIS_PORT || '6379', 10); + const redisPassword = process.env.JAF_REDIS_PASSWORD || undefined; + const redisDb = parseInt(process.env.JAF_REDIS_DB || '0', 10); + + const redis = new IORedis({ host: redisHost, port: redisPort, password: redisPassword, db: redisDb }); + const memory = await createMemoryProviderFromEnv({ redis }); + + // Simple demo agent + const agent: Agent = { + name: 'DemoAgent', + instructions: () => 'You are a helpful assistant. Keep responses short.', + tools: [], + modelConfig: { name: process.env.DEMO_MODEL || 'gemini-2.5-pro', temperature: 0.2, maxTokens: 300 } + }; + + // Start server + const server = await runServer([agent], { modelProvider }, { port: parseInt(process.env.DEMO_PORT || '3000', 10), defaultMemoryProvider: memory }); + const port = parseInt(process.env.DEMO_PORT || '3000', 10); + const base = `http://localhost:${port}`; + + const conversationId = process.env.DEMO_CONV_ID || 'conv-checkpoint-demo-1'; + + // Helpful curl snippets + console.log('\nāœ… Checkpoint Demo Server Ready'); + console.log(` Base URL: ${base}`); + console.log(` LiteLLM: ${litellmUrl}`); + console.log(` Redis: ${redisHost}:${redisPort}/${redisDb}`); + + console.log('\nTry these commands:'); + console.log('\n1) Create conversation and store messages'); + console.log( + `curl -X POST ${base}/chat \ + -H 'Content-Type: application/json' \ + -d '${JSON.stringify({ + messages: [ + { id: 'msg_u1', role: 'user', content: 'Plan a 2-day trip to Kyoto.' } + ], + agentName: agent.name, + conversationId, + memory: { autoStore: true, storeOnCompletion: true } + })}'` + ); + + console.log('\n2) Inspect stored conversation'); + console.log(`curl ${base}/conversations/${conversationId}`); + + console.log('\n3) Continue conversation'); + console.log( + `curl -X POST ${base}/chat \ + -H 'Content-Type: application/json' \ + -d '${JSON.stringify({ + messages: [ + { id: 'msg_u2', role: 'user', content: 'Add a tea ceremony on day 1.' } + ], + agentName: agent.name, + conversationId, + memory: { autoStore: true, storeOnCompletion: true } + })}'` + ); + + console.log('\n4) Checkpoint to the first user message by ID (remove that and everything after)'); + console.log( + `curl -X POST ${base}/conversations/${conversationId}/checkpoint \ + -H 'Content-Type: application/json' \ + -d '${JSON.stringify({ byMessageId: 'msg_u1' })}'` + ); + + console.log('\n5) Verify conversation after checkpoint'); + console.log(`curl ${base}/conversations/${conversationId}`); + + console.log('\n6) Continue again after checkpoint'); + console.log( + `curl -X POST ${base}/chat \ + -H 'Content-Type: application/json' \ + -d '${JSON.stringify({ + messages: [ + { id: 'msg_u3', role: 'user', content: 'Actually, plan a 1-day Kyoto itinerary instead.' } + ], + agentName: agent.name, + conversationId, + memory: { autoStore: true, storeOnCompletion: true } + })}'` + ); + + console.log('\nTip: Use jq to pretty-print responses: add | jq'); +} + +startCheckpointDemo().catch((err) => { + console.error('āŒ Failed to start demo:', err); + process.exit(1); +}); + diff --git a/src/adk/runners/index.ts b/src/adk/runners/index.ts index 08ad1e9..1e362e7 100644 --- a/src/adk/runners/index.ts +++ b/src/adk/runners/index.ts @@ -37,7 +37,7 @@ import { GraphOptions, GraphResult } from '../../visualization/graphviz'; - +import { randomUUID } from 'crypto'; import { getOrCreateSession, addMessageToSession, addArtifactToSession } from '../sessions'; import { executeTool } from '../tools'; import { createModelMessage, getFunctionCalls, createUserMessage } from '../content'; @@ -1010,8 +1010,11 @@ const getMessageText = (content: Content): string => { }; const generateRequestId = (): string => { - // Use crypto-based ID generation for pure functional approach - return `req_${Date.now()}_${Math.random().toString(36).substring(2, 11)}`; + try { + return `req_${randomUUID()}`; + } catch { + return `req_${Date.now()}_${Math.random().toString(36).substring(2, 11)}`; + } }; const createAgentEvent = ( @@ -1173,4 +1176,4 @@ export const generateRunnerGraphPng = async ( }; return await generateRunnerVisualization(config, options); -}; \ No newline at end of file +}; diff --git a/src/core/engine.ts b/src/core/engine.ts index ecefcaf..f9cc091 100644 --- a/src/core/engine.ts +++ b/src/core/engine.ts @@ -10,6 +10,7 @@ import { Interruption, getTextContent, Guardrail, + generateMessageId, } from './types.js'; import { setToolRuntime } from './tool-runtime.js'; import { buildEffectiveGuardrails, executeInputGuardrailsParallel, executeInputGuardrailsSequential, executeOutputGuardrails } from './guardrails.js'; @@ -20,19 +21,24 @@ export async function run( config: RunConfig ): Promise> { try { + // Ensure all existing messages have IDs + const initialStateWithIds: RunState = { + ...initialState, + messages: initialState.messages.map(m => m.id ? m : { ...m, id: generateMessageId() }) + }; config.onEvent?.({ type: 'run_start', data: { - runId: initialState.runId, - traceId: initialState.traceId, - context: initialState.context, - userId: (initialState.context as any)?.userId, - sessionId: (initialState.context as any)?.sessionId || (initialState.context as any)?.conversationId, - messages: initialState.messages + runId: initialStateWithIds.runId, + traceId: initialStateWithIds.traceId, + context: initialStateWithIds.context, + userId: (initialStateWithIds.context as any)?.userId, + sessionId: (initialStateWithIds.context as any)?.sessionId || (initialStateWithIds.context as any)?.conversationId, + messages: initialStateWithIds.messages } }); - let stateWithMemory = initialState; + let stateWithMemory = initialStateWithIds; if (config.memory?.autoStore && config.conversationId) { console.log(`[JAF:ENGINE] Loading conversation history for ${config.conversationId}`); stateWithMemory = await loadConversationHistory(initialState, config); @@ -59,7 +65,7 @@ export async function run( config.onEvent?.({ type: 'run_end', - data: { outcome: result.outcome, traceId: initialState.traceId, runId: initialState.runId } + data: { outcome: result.outcome, traceId: initialStateWithIds.traceId, runId: initialStateWithIds.runId } }); return result; @@ -474,6 +480,7 @@ async function runInternal( try { streamingUsed = true; const stream = config.modelProvider.getCompletionStream(state, currentAgent, config); + const assistantMessageId = generateMessageId(); let aggregatedText = ''; const toolCalls: Array<{ id?: string; type: 'function'; function: { name?: string; arguments: string } }> = []; @@ -497,6 +504,7 @@ async function runInternal( if (chunk?.delta || chunk?.toolCallDelta) { assistantEventStreamed = true; const partialMessage: Message = { + id: assistantMessageId, role: 'assistant', content: aggregatedText, ...(toolCalls.length > 0 @@ -518,6 +526,7 @@ async function runInternal( llmResponse = { message: { + id: assistantMessageId, content: aggregatedText || undefined, ...(toolCalls.length > 0 ? { @@ -547,6 +556,7 @@ async function runInternal( try { streamingUsed = true; const stream = config.modelProvider.getCompletionStream(state, currentAgent, config); + const assistantMessageId = generateMessageId(); let aggregatedText = ''; const toolCalls: Array<{ id?: string; type: 'function'; function: { name?: string; arguments: string } }> = []; @@ -570,6 +580,7 @@ async function runInternal( if (chunk?.delta || chunk?.toolCallDelta) { assistantEventStreamed = true; const partialMessage: Message = { + id: assistantMessageId, role: 'assistant', content: aggregatedText, ...(toolCalls.length > 0 @@ -591,6 +602,7 @@ async function runInternal( llmResponse = { message: { + id: assistantMessageId, content: aggregatedText || undefined, ...(toolCalls.length > 0 ? { @@ -667,6 +679,7 @@ async function runInternal( } const assistantMessage: Message = { + id: llmResponse.message.id ?? generateMessageId(), role: 'assistant', content: llmResponse.message.content || '', tool_calls: llmResponse.message.tool_calls @@ -695,12 +708,12 @@ async function runInternal( config.onEvent?.({ type: 'tool_requests', data: { toolCalls: requests } }); } catch { /* ignore */ } - const toolResults = await executeToolCalls( - llmResponse.message.tool_calls, - currentAgent, - state, - config, - ); + const toolResults = await executeToolCalls( + llmResponse.message.tool_calls, + currentAgent, + state, + config, + ); const interruptions = toolResults .map(r => r.interruption) @@ -1017,6 +1030,7 @@ async function executeToolCalls( return { message: { + id: generateMessageId(), role: 'tool', content: errorResult, tool_call_id: toolCall.id @@ -1086,6 +1100,7 @@ async function executeToolCalls( sessionId: state.runId, }, message: { + id: generateMessageId(), role: 'tool', content: JSON.stringify({ status: 'halted', @@ -1102,6 +1117,7 @@ async function executeToolCalls( const rejectionReason = additionalContext?.rejectionReason || 'User declined the action'; return { message: { + id: generateMessageId(), role: 'tool', content: JSON.stringify({ status: 'approval_denied', @@ -1162,6 +1178,7 @@ async function executeToolCalls( if (handoffCheck && typeof handoffCheck === 'object' && 'handoff_to' in handoffCheck) { return { message: { + id: generateMessageId(), role: 'tool', content: resultString, tool_call_id: toolCall.id @@ -1198,6 +1215,7 @@ async function executeToolCalls( return { message: { + id: generateMessageId(), role: 'tool', content: finalContent, tool_call_id: toolCall.id @@ -1231,6 +1249,7 @@ async function executeToolCalls( return { message: { + id: generateMessageId(), role: 'tool', content: errorResult, tool_call_id: toolCall.id diff --git a/src/core/types.ts b/src/core/types.ts index ab74cb2..6e91471 100644 --- a/src/core/types.ts +++ b/src/core/types.ts @@ -1,12 +1,19 @@ import { z } from 'zod'; +import { randomUUID } from 'crypto'; import { MemoryConfig } from '../memory/types'; import type { ApprovalStorage } from '../memory/approval-storage'; export type TraceId = string & { readonly _brand: 'TraceId' }; export type RunId = string & { readonly _brand: 'RunId' }; +export type MessageId = string & { readonly _brand: 'MessageId' }; export const createTraceId = (id: string): TraceId => id as TraceId; export const createRunId = (id: string): RunId => id as RunId; +export const createMessageId = (id: string): MessageId => id as MessageId; + +export const generateMessageId = (): MessageId => { + return (`msg_${randomUUID()}`) as MessageId; +}; export type ValidationResult = | { readonly isValid: true } @@ -37,6 +44,7 @@ export type MessageContentPart = | { readonly type: 'file'; readonly file: { readonly file_id: string; readonly format?: string } }; export type Message = { + readonly id?: MessageId; readonly role: 'user' | 'assistant' | 'tool'; readonly content: string | readonly MessageContentPart[]; readonly attachments?: readonly Attachment[]; // Optional structured attachments @@ -266,4 +274,3 @@ export const jsonParseLLMOutput = (text: string): any => { return null; } }; - diff --git a/src/memory/providers/in-memory.ts b/src/memory/providers/in-memory.ts index f163960..e6520a7 100644 --- a/src/memory/providers/in-memory.ts +++ b/src/memory/providers/in-memory.ts @@ -1,4 +1,4 @@ -import { Message, TraceId } from '../../core/types'; +import { Message, TraceId, getTextContent } from '../../core/types'; import { MemoryProvider, ConversationMemory, @@ -338,6 +338,75 @@ export function createInMemoryProvider(config: InMemoryConfig = { type: 'memory' } }; + const restoreToCheckpoint: MemoryProvider['restoreToCheckpoint'] = async (conversationId, criteria) => { + try { + const existingResult = await getConversation(conversationId); + if (!existingResult.success) { + return existingResult as unknown as Result<{ restored: boolean; removedMessagesCount: number; checkpointIndex: number; checkpointUserQuery?: string }>; + } + const conversation = existingResult.data; + if (!conversation) { + return createFailure(createMemoryNotFoundError(conversationId, 'InMemory')); + } + + const msgs = conversation.messages as Message[]; + let targetIdx: number | null = null; + + if (typeof criteria.byMessageId === 'string') { + targetIdx = msgs.findIndex(m => m.id === (criteria.byMessageId as any)); + } else if (typeof criteria.byIndex === 'number') { + targetIdx = criteria.byIndex; + } else if (typeof criteria.byUserMessageNumber === 'number') { + let count = 0; + for (let i = 0; i < msgs.length; i++) { + if (msgs[i].role === 'user') { + if (count === criteria.byUserMessageNumber) { targetIdx = i; break; } + count++; + } + } + } else if (typeof criteria.byText === 'string') { + const q = criteria.byText; + const mode = criteria.match || 'exact'; + for (let i = msgs.length - 1; i >= 0; i--) { + const m = msgs[i]; + if (m.role !== 'user') continue; + const text = getTextContent(m.content); + const match = mode === 'exact' ? (text === q) + : mode === 'startsWith' ? text.startsWith(q) + : text.includes(q); + if (match) { targetIdx = i; break; } + } + } + + if (targetIdx == null || targetIdx < 0 || targetIdx >= msgs.length || msgs[targetIdx].role !== 'user') { + return createFailure(createMemoryStorageError('restore checkpoint', 'InMemory', new Error('Checkpoint user message not found'))); + } + + const checkpointUserQuery = getTextContent(msgs[targetIdx].content); + const newMessages = msgs.slice(0, targetIdx); + const removedCount = msgs.length - newMessages.length; + + const now = new Date(); + const updatedConversation: ConversationMemory = { + ...conversation, + messages: newMessages, + metadata: { + ...(conversation.metadata || { createdAt: now, updatedAt: now, lastActivity: now, totalMessages: newMessages.length }), + updatedAt: now, + lastActivity: now, + totalMessages: newMessages.length + } + }; + + conversations.set(conversationId, updatedConversation); + + console.log(`[MEMORY:InMemory] Restored conversation ${conversationId} to checkpoint at index ${targetIdx} (removed ${removedCount} messages)`); + return createSuccess({ restored: true, removedMessagesCount: removedCount, checkpointIndex: targetIdx, checkpointUserQuery }); + } catch (error) { + return createFailure(createMemoryStorageError('restore checkpoint', 'InMemory', error as Error)); + } + }; + return { storeMessages, getConversation, @@ -348,6 +417,7 @@ export function createInMemoryProvider(config: InMemoryConfig = { type: 'memory' clearUserConversations, getStats, healthCheck, - close + close, + restoreToCheckpoint }; -} \ No newline at end of file +} diff --git a/src/memory/providers/postgres.ts b/src/memory/providers/postgres.ts index d9a96bf..2c0bd16 100644 --- a/src/memory/providers/postgres.ts +++ b/src/memory/providers/postgres.ts @@ -1,4 +1,4 @@ -import { Message, TraceId } from '../../core/types'; +import { Message, TraceId, getTextContent } from '../../core/types'; import { MemoryProvider, ConversationMemory, @@ -492,6 +492,82 @@ export async function createPostgresProvider(config: PostgresConfig, postgresCli } }; + const restoreToCheckpoint: MemoryProvider['restoreToCheckpoint'] = async (conversationId, criteria) => { + const client = ensureConnected(); + try { + const existingResult = await getConversation(conversationId); + if (!existingResult.success) { + return existingResult as unknown as Result<{ restored: boolean; removedMessagesCount: number; checkpointIndex: number; checkpointUserQuery?: string }>; + } + const conversation = existingResult.data; + if (!conversation) { + return createFailure(createMemoryNotFoundError(conversationId, 'PostgreSQL')); + } + + const msgs = conversation.messages as Message[]; + let targetIdx: number | null = null; + + if (typeof criteria.byMessageId === 'string') { + targetIdx = msgs.findIndex(m => m.id === (criteria.byMessageId as any)); + } else if (typeof criteria.byIndex === 'number') { + targetIdx = criteria.byIndex; + } else if (typeof criteria.byUserMessageNumber === 'number') { + let count = 0; + for (let i = 0; i < msgs.length; i++) { + if (msgs[i].role === 'user') { + if (count === criteria.byUserMessageNumber) { targetIdx = i; break; } + count++; + } + } + } else if (typeof criteria.byText === 'string') { + const q = criteria.byText; + const mode = criteria.match || 'exact'; + for (let i = msgs.length - 1; i >= 0; i--) { + const m = msgs[i]; + if (m.role !== 'user') continue; + const text = getTextContent(m.content); + const match = mode === 'exact' ? (text === q) + : mode === 'startsWith' ? text.startsWith(q) + : text.includes(q); + if (match) { targetIdx = i; break; } + } + } + + if (targetIdx == null || targetIdx < 0 || targetIdx >= msgs.length || msgs[targetIdx].role !== 'user') { + return createFailure(createMemoryStorageError('restore checkpoint', 'PostgreSQL', new Error('Checkpoint user message not found'))); + } + + const checkpointUserQuery = getTextContent(msgs[targetIdx].content); + const newMessages = msgs.slice(0, targetIdx); + const removedCount = msgs.length - newMessages.length; + + const now = new Date(); + const updatedMetadata = { + ...(conversation.metadata || {}), + totalMessages: newMessages.length, + updatedAt: now, + lastActivity: now + }; + + const sql = ` + UPDATE ${fullConfig.tableName} + SET messages = $1, metadata = $2, updated_at = $3, last_activity = $3 + WHERE conversation_id = $4 + `; + await client.query(sql, [ + JSON.stringify(newMessages), + JSON.stringify(updatedMetadata), + now, + conversationId + ]); + + console.log(`[MEMORY:Postgres] Restored conversation ${conversationId} to checkpoint at index ${targetIdx} (removed ${removedCount} messages)`); + return createSuccess({ restored: true, removedMessagesCount: removedCount, checkpointIndex: targetIdx, checkpointUserQuery }); + } catch (error) { + return createFailure(createMemoryStorageError('restore checkpoint', 'PostgreSQL', error as Error)); + } + }; + const cleanupOldConversations = async (olderThanDays: number): Promise> => { const client = ensureConnected(); @@ -568,7 +644,8 @@ export async function createPostgresProvider(config: PostgresConfig, postgresCli clearUserConversations, getStats, healthCheck, - close + close, + restoreToCheckpoint }; } @@ -608,4 +685,4 @@ async function initializeSchema(client: PostgresClient, config: PostgresConfig & } console.log(`[MEMORY:Postgres] Schema initialized for table ${config.tableName}`); -} \ No newline at end of file +} diff --git a/src/memory/providers/redis.ts b/src/memory/providers/redis.ts index 03be8a5..8729677 100644 --- a/src/memory/providers/redis.ts +++ b/src/memory/providers/redis.ts @@ -1,4 +1,4 @@ -import { Message, TraceId } from '../../core/types'; +import { Message, TraceId, getTextContent } from '../../core/types'; import { MemoryProvider, ConversationMemory, @@ -447,6 +447,80 @@ export async function createRedisProvider(config: RedisConfig, redisClient: Redi clearUserConversations, getStats, healthCheck, - close + close, + restoreToCheckpoint: async (conversationId, criteria) => { + try { + const existingResult = await getConversation(conversationId); + if (!existingResult.success) { + return existingResult as unknown as Result<{ restored: boolean; removedMessagesCount: number; checkpointIndex: number; checkpointUserQuery?: string }>; + } + const conversation = existingResult.data; + if (!conversation) { + return createFailure(createMemoryNotFoundError(conversationId, 'Redis')); + } + + const msgs = conversation.messages as Message[]; + let targetIdx: number | null = null; + + if (typeof criteria.byMessageId === 'string') { + targetIdx = msgs.findIndex(m => m.id === (criteria.byMessageId as any)); + } else if (typeof criteria.byIndex === 'number') { + targetIdx = criteria.byIndex; + } else if (typeof criteria.byUserMessageNumber === 'number') { + let count = 0; + for (let i = 0; i < msgs.length; i++) { + if (msgs[i].role === 'user') { + if (count === criteria.byUserMessageNumber) { targetIdx = i; break; } + count++; + } + } + } else if (typeof criteria.byText === 'string') { + const q = criteria.byText; + const mode = criteria.match || 'exact'; + for (let i = msgs.length - 1; i >= 0; i--) { + const m = msgs[i]; + if (m.role !== 'user') continue; + const text = getTextContent(m.content); + const match = mode === 'exact' ? (text === q) + : mode === 'startsWith' ? text.startsWith(q) + : text.includes(q); + if (match) { targetIdx = i; break; } + } + } + + if (targetIdx == null || targetIdx < 0 || targetIdx >= msgs.length || msgs[targetIdx].role !== 'user') { + return createFailure(createMemoryStorageError('restore checkpoint', 'Redis', new Error('Checkpoint user message not found'))); + } + + const checkpointUserQuery = getTextContent(msgs[targetIdx].content); + const newMessages = msgs.slice(0, targetIdx); + const removedCount = msgs.length - newMessages.length; + + const now = new Date(); + const updatedConversation: ConversationMemory = { + ...conversation, + messages: newMessages, + metadata: { + ...(conversation.metadata ? { ...conversation.metadata } : {}), + createdAt: conversation.metadata?.createdAt ?? now, + updatedAt: now, + lastActivity: now, + totalMessages: newMessages.length + } + }; + + const key = getKey(conversationId); + await ensureConnected().set(key, JSON.stringify(updatedConversation, null, 0)); + + if (fullConfig.ttl) { + await ensureConnected().expire(key, fullConfig.ttl); + } + + console.log(`[MEMORY:Redis] Restored conversation ${conversationId} to checkpoint at index ${targetIdx} (removed ${removedCount} messages)`); + return createSuccess({ restored: true, removedMessagesCount: removedCount, checkpointIndex: targetIdx, checkpointUserQuery }); + } catch (error) { + return createFailure(createMemoryStorageError('restore checkpoint', 'Redis', error as Error)); + } + } }; -} \ No newline at end of file +} diff --git a/src/memory/types.ts b/src/memory/types.ts index 145e5c9..27e4419 100644 --- a/src/memory/types.ts +++ b/src/memory/types.ts @@ -104,6 +104,38 @@ export type MemoryProvider = { * Close/cleanup the provider */ readonly close: () => Promise>; + + /** + * Restore a conversation to a checkpoint above a specific user message. + * Removes the targeted user message and all messages after it. + * Returns the targeted user query text. + */ + readonly restoreToCheckpoint: ( + conversationId: string, + criteria: CheckpointCriteria + ) => Promise>; +}; + +/** + * Criteria to identify the user message to checkpoint against. + * Provide exactly one of the selectors. If multiple are provided, + * precedence is: byMessageId > byIndex > byUserMessageNumber > byText. + * + * - byIndex: 0-based index into the messages array + * - byUserMessageNumber: 0-based index among only 'user' role messages + * - byText: match the message text using the specified match mode + */ +export type CheckpointCriteria = { + readonly byMessageId?: string; + readonly byIndex?: number; + readonly byUserMessageNumber?: number; + readonly byText?: string; + readonly match?: 'exact' | 'startsWith' | 'contains'; }; export interface MemoryConfig { @@ -276,4 +308,4 @@ export const isSuccess = (result: Result): result is { success: true export const isFailure = (result: Result): result is { success: false; error: E } => { return !result.success; -}; \ No newline at end of file +}; diff --git a/src/server/server.ts b/src/server/server.ts index 61a9eb8..d2f307c 100644 --- a/src/server/server.ts +++ b/src/server/server.ts @@ -11,7 +11,7 @@ import { ApprovalMessage } from './types.js'; import { run, runStream } from '../core/engine.js'; -import { RunState, Message, createRunId, createTraceId } from '../core/types.js'; +import { RunState, Message, createRunId, createTraceId, generateMessageId } from '../core/types.js'; import { v4 as uuidv4 } from 'uuid'; // Helper: stable stringify to create deterministic signatures @@ -75,6 +75,7 @@ const attachmentSchema = { const httpMessageSchema = { type: 'object', properties: { + id: { type: 'string' }, role: { type: 'string', enum: ['user', 'assistant', 'system'] }, content: { oneOf: [ @@ -274,6 +275,7 @@ export function createJAFServer(config: ServerConfig): { // Convert HTTP messages to JAF messages const jafMessages: Message[] = validatedRequest.messages.map(msg => ({ + id: (msg as any).id || generateMessageId(), role: msg.role === 'system' ? 'user' : msg.role as 'user' | 'assistant', content: msg.content, attachments: (msg as any).attachments @@ -557,6 +559,7 @@ export function createJAFServer(config: ServerConfig): { if (msg.role === 'tool') { // Include tool messages with special formatting return { + id: msg.id, role: 'tool', content: msg.content, tool_call_id: msg.tool_call_id @@ -564,6 +567,7 @@ export function createJAFServer(config: ServerConfig): { } else if (msg.role === 'assistant' && msg.tool_calls) { // Include assistant messages with tool calls return { + id: msg.id, role: msg.role, content: msg.content || '', tool_calls: msg.tool_calls.map(tc => ({ @@ -578,6 +582,7 @@ export function createJAFServer(config: ServerConfig): { } else { // Regular user/assistant messages return { + id: msg.id, role: msg.role as 'user' | 'assistant', content: msg.content, ...(msg.attachments ? { attachments: msg.attachments } : {}) @@ -739,6 +744,66 @@ export function createJAFServer(config: ServerConfig): { }); }); + // Restore conversation to a checkpoint above a user message + app.post('/conversations/:conversationId/checkpoint', { + schema: { + body: { + oneOf: [ + { + type: 'object', + properties: { byMessageId: { type: 'string' } }, + required: ['byMessageId'] + }, + { + type: 'object', + properties: { byIndex: { type: 'number' } }, + required: ['byIndex'] + }, + { + type: 'object', + properties: { byUserMessageNumber: { type: 'number' } }, + required: ['byUserMessageNumber'] + }, + { + type: 'object', + properties: { + byText: { type: 'string' }, + match: { type: 'string', enum: ['exact', 'startsWith', 'contains'] } + }, + required: ['byText'] + } + ] + } + } + }, async ( + request: FastifyRequest<{ Params: { conversationId: string }; Body: { byMessageId?: string; byIndex?: number; byUserMessageNumber?: number; byText?: string; match?: 'exact' | 'startsWith' | 'contains' } }>, + reply: FastifyReply + ) => { + if (!config.defaultMemoryProvider) { + return reply.code(503).send({ success: false, error: 'Memory provider not configured' }); + } + + const criteria = request.body || {}; + const selectors = [ + criteria.byMessageId ? 'byMessageId' : null, + typeof criteria.byIndex === 'number' ? 'byIndex' : null, + typeof criteria.byUserMessageNumber === 'number' ? 'byUserMessageNumber' : null, + typeof criteria.byText === 'string' ? 'byText' : null + ].filter(Boolean) as string[]; + + if (selectors.length !== 1) { + return reply.code(400).send({ success: false, error: `Provide exactly one selector. Received: ${selectors.join(', ') || 'none'}` }); + } + + request.log.info({ selectorApplied: selectors[0], conversationId: request.params.conversationId }, 'Applying checkpoint selector'); + const result = await config.defaultMemoryProvider.restoreToCheckpoint(request.params.conversationId, criteria); + if (!result.success) { + return reply.code(400).send({ success: false, error: result.error.message }); + } + + return reply.code(200).send({ success: true, data: result.data }); + }); + app.get('/memory/health', async (request: FastifyRequest, reply: FastifyReply) => { if (!config.defaultMemoryProvider) { return reply.code(503).send({ diff --git a/src/server/types.ts b/src/server/types.ts index 6113450..067c4d2 100644 --- a/src/server/types.ts +++ b/src/server/types.ts @@ -24,6 +24,7 @@ export const attachmentSchema = z.object({ }); export const httpMessageSchema = z.object({ + id: z.string().optional(), role: z.enum(['user', 'assistant', 'system']), content: z.string(), attachments: z.array(attachmentSchema).optional() @@ -62,6 +63,7 @@ export type ApprovalMessage = z.infer; export const fullMessageSchema = z.union([ httpMessageSchema, z.object({ + id: z.string().optional(), role: z.literal('assistant'), content: z.string(), tool_calls: z.array(z.object({ @@ -74,6 +76,7 @@ export const fullMessageSchema = z.union([ })).optional() }), z.object({ + id: z.string().optional(), role: z.literal('tool'), content: z.string(), tool_call_id: z.string().optional()