Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 109 additions & 0 deletions examples/checkpoint-demo/server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
import 'dotenv/config';
import IORedis from 'ioredis';

import { runServer } from '../../src/server/index';
import { makeLiteLLMProvider } from '../../src/providers/model';
import { createMemoryProviderFromEnv } from '../../src/memory/factory';
import { Agent } from '../../src/core/types';

async function startCheckpointDemo() {
// Configure LiteLLM provider
const litellmUrl = process.env.LITELLM_URL || 'http://localhost:4000';
const litellmKey = process.env.LITELLM_API_KEY || 'anything';

const modelProvider = makeLiteLLMProvider(litellmUrl, litellmKey);

// Configure Redis client for memory provider
const redisHost = process.env.JAF_REDIS_HOST || '127.0.0.1';
const redisPort = parseInt(process.env.JAF_REDIS_PORT || '6379', 10);
const redisPassword = process.env.JAF_REDIS_PASSWORD || undefined;
const redisDb = parseInt(process.env.JAF_REDIS_DB || '0', 10);

const redis = new IORedis({ host: redisHost, port: redisPort, password: redisPassword, db: redisDb });
const memory = await createMemoryProviderFromEnv({ redis });

// Simple demo agent
const agent: Agent<any, any> = {
name: 'DemoAgent',
instructions: () => 'You are a helpful assistant. Keep responses short.',
tools: [],
modelConfig: { name: process.env.DEMO_MODEL || 'gemini-2.5-pro', temperature: 0.2, maxTokens: 300 }
};

// Start server
const server = await runServer([agent], { modelProvider }, { port: parseInt(process.env.DEMO_PORT || '3000', 10), defaultMemoryProvider: memory });
const port = parseInt(process.env.DEMO_PORT || '3000', 10);
const base = `http://localhost:${port}`;

const conversationId = process.env.DEMO_CONV_ID || 'conv-checkpoint-demo-1';

// Helpful curl snippets
console.log('\n✅ Checkpoint Demo Server Ready');
console.log(` Base URL: ${base}`);
console.log(` LiteLLM: ${litellmUrl}`);
console.log(` Redis: ${redisHost}:${redisPort}/${redisDb}`);

console.log('\nTry these commands:');
console.log('\n1) Create conversation and store messages');
console.log(
`curl -X POST ${base}/chat \
-H 'Content-Type: application/json' \
-d '${JSON.stringify({
messages: [
{ id: 'msg_u1', role: 'user', content: 'Plan a 2-day trip to Kyoto.' }
],
agentName: agent.name,
conversationId,
memory: { autoStore: true, storeOnCompletion: true }
})}'`
);

console.log('\n2) Inspect stored conversation');
console.log(`curl ${base}/conversations/${conversationId}`);

console.log('\n3) Continue conversation');
console.log(
`curl -X POST ${base}/chat \
-H 'Content-Type: application/json' \
-d '${JSON.stringify({
messages: [
{ id: 'msg_u2', role: 'user', content: 'Add a tea ceremony on day 1.' }
],
agentName: agent.name,
conversationId,
memory: { autoStore: true, storeOnCompletion: true }
})}'`
);

console.log('\n4) Checkpoint to the first user message by ID (remove that and everything after)');
console.log(
`curl -X POST ${base}/conversations/${conversationId}/checkpoint \
-H 'Content-Type: application/json' \
-d '${JSON.stringify({ byMessageId: 'msg_u1' })}'`
);

console.log('\n5) Verify conversation after checkpoint');
console.log(`curl ${base}/conversations/${conversationId}`);

console.log('\n6) Continue again after checkpoint');
console.log(
`curl -X POST ${base}/chat \
-H 'Content-Type: application/json' \
-d '${JSON.stringify({
messages: [
{ id: 'msg_u3', role: 'user', content: 'Actually, plan a 1-day Kyoto itinerary instead.' }
],
agentName: agent.name,
conversationId,
memory: { autoStore: true, storeOnCompletion: true }
})}'`
);

console.log('\nTip: Use jq to pretty-print responses: add | jq');
}

startCheckpointDemo().catch((err) => {
console.error('❌ Failed to start demo:', err);
process.exit(1);
});

11 changes: 7 additions & 4 deletions src/adk/runners/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import {
GraphOptions,
GraphResult
} from '../../visualization/graphviz';

import { randomUUID } from 'crypto';
import { getOrCreateSession, addMessageToSession, addArtifactToSession } from '../sessions';
import { executeTool } from '../tools';
import { createModelMessage, getFunctionCalls, createUserMessage } from '../content';
Expand Down Expand Up @@ -1010,8 +1010,11 @@ const getMessageText = (content: Content): string => {
};

const generateRequestId = (): string => {
// Use crypto-based ID generation for pure functional approach
return `req_${Date.now()}_${Math.random().toString(36).substring(2, 11)}`;
try {
return `req_${randomUUID()}`;
} catch {
return `req_${Date.now()}_${Math.random().toString(36).substring(2, 11)}`;
}
};

const createAgentEvent = (
Expand Down Expand Up @@ -1173,4 +1176,4 @@ export const generateRunnerGraphPng = async (
};

return await generateRunnerVisualization(config, options);
};
};
47 changes: 33 additions & 14 deletions src/core/engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import {
Interruption,
getTextContent,
Guardrail,
generateMessageId,
} from './types.js';
import { setToolRuntime } from './tool-runtime.js';
import { buildEffectiveGuardrails, executeInputGuardrailsParallel, executeInputGuardrailsSequential, executeOutputGuardrails } from './guardrails.js';
Expand All @@ -20,19 +21,24 @@ export async function run<Ctx, Out>(
config: RunConfig<Ctx>
): Promise<RunResult<Out>> {
try {
// Ensure all existing messages have IDs
const initialStateWithIds: RunState<Ctx> = {
...initialState,
messages: initialState.messages.map(m => m.id ? m : { ...m, id: generateMessageId() })
};
config.onEvent?.({
type: 'run_start',
data: {
runId: initialState.runId,
traceId: initialState.traceId,
context: initialState.context,
userId: (initialState.context as any)?.userId,
sessionId: (initialState.context as any)?.sessionId || (initialState.context as any)?.conversationId,
messages: initialState.messages
runId: initialStateWithIds.runId,
traceId: initialStateWithIds.traceId,
context: initialStateWithIds.context,
userId: (initialStateWithIds.context as any)?.userId,
sessionId: (initialStateWithIds.context as any)?.sessionId || (initialStateWithIds.context as any)?.conversationId,
messages: initialStateWithIds.messages
}
});

let stateWithMemory = initialState;
let stateWithMemory = initialStateWithIds;
if (config.memory?.autoStore && config.conversationId) {
console.log(`[JAF:ENGINE] Loading conversation history for ${config.conversationId}`);
stateWithMemory = await loadConversationHistory(initialState, config);
Expand All @@ -59,7 +65,7 @@ export async function run<Ctx, Out>(

config.onEvent?.({
type: 'run_end',
data: { outcome: result.outcome, traceId: initialState.traceId, runId: initialState.runId }
data: { outcome: result.outcome, traceId: initialStateWithIds.traceId, runId: initialStateWithIds.runId }
});

return result;
Expand Down Expand Up @@ -474,6 +480,7 @@ async function runInternal<Ctx, Out>(
try {
streamingUsed = true;
const stream = config.modelProvider.getCompletionStream(state, currentAgent, config);
const assistantMessageId = generateMessageId();
let aggregatedText = '';
const toolCalls: Array<{ id?: string; type: 'function'; function: { name?: string; arguments: string } }> = [];

Expand All @@ -497,6 +504,7 @@ async function runInternal<Ctx, Out>(
if (chunk?.delta || chunk?.toolCallDelta) {
assistantEventStreamed = true;
const partialMessage: Message = {
id: assistantMessageId,
role: 'assistant',
content: aggregatedText,
...(toolCalls.length > 0
Expand All @@ -518,6 +526,7 @@ async function runInternal<Ctx, Out>(

llmResponse = {
message: {
id: assistantMessageId,
content: aggregatedText || undefined,
...(toolCalls.length > 0
? {
Expand Down Expand Up @@ -547,6 +556,7 @@ async function runInternal<Ctx, Out>(
try {
streamingUsed = true;
const stream = config.modelProvider.getCompletionStream(state, currentAgent, config);
const assistantMessageId = generateMessageId();
let aggregatedText = '';
const toolCalls: Array<{ id?: string; type: 'function'; function: { name?: string; arguments: string } }> = [];

Expand All @@ -570,6 +580,7 @@ async function runInternal<Ctx, Out>(
if (chunk?.delta || chunk?.toolCallDelta) {
assistantEventStreamed = true;
const partialMessage: Message = {
id: assistantMessageId,
role: 'assistant',
content: aggregatedText,
...(toolCalls.length > 0
Expand All @@ -591,6 +602,7 @@ async function runInternal<Ctx, Out>(

llmResponse = {
message: {
id: assistantMessageId,
content: aggregatedText || undefined,
...(toolCalls.length > 0
? {
Expand Down Expand Up @@ -667,6 +679,7 @@ async function runInternal<Ctx, Out>(
}

const assistantMessage: Message = {
id: llmResponse.message.id ?? generateMessageId(),
role: 'assistant',
content: llmResponse.message.content || '',
tool_calls: llmResponse.message.tool_calls
Expand Down Expand Up @@ -695,12 +708,12 @@ async function runInternal<Ctx, Out>(
config.onEvent?.({ type: 'tool_requests', data: { toolCalls: requests } });
} catch { /* ignore */ }

const toolResults = await executeToolCalls(
llmResponse.message.tool_calls,
currentAgent,
state,
config,
);
const toolResults = await executeToolCalls(
llmResponse.message.tool_calls,
currentAgent,
state,
config,
);

const interruptions = toolResults
.map(r => r.interruption)
Expand Down Expand Up @@ -1017,6 +1030,7 @@ async function executeToolCalls<Ctx>(

return {
message: {
id: generateMessageId(),
role: 'tool',
content: errorResult,
tool_call_id: toolCall.id
Expand Down Expand Up @@ -1086,6 +1100,7 @@ async function executeToolCalls<Ctx>(
sessionId: state.runId,
},
message: {
id: generateMessageId(),
role: 'tool',
content: JSON.stringify({
status: 'halted',
Expand All @@ -1102,6 +1117,7 @@ async function executeToolCalls<Ctx>(
const rejectionReason = additionalContext?.rejectionReason || 'User declined the action';
return {
message: {
id: generateMessageId(),
role: 'tool',
content: JSON.stringify({
status: 'approval_denied',
Expand Down Expand Up @@ -1162,6 +1178,7 @@ async function executeToolCalls<Ctx>(
if (handoffCheck && typeof handoffCheck === 'object' && 'handoff_to' in handoffCheck) {
return {
message: {
id: generateMessageId(),
role: 'tool',
content: resultString,
tool_call_id: toolCall.id
Expand Down Expand Up @@ -1198,6 +1215,7 @@ async function executeToolCalls<Ctx>(

return {
message: {
id: generateMessageId(),
role: 'tool',
content: finalContent,
tool_call_id: toolCall.id
Expand Down Expand Up @@ -1231,6 +1249,7 @@ async function executeToolCalls<Ctx>(

return {
message: {
id: generateMessageId(),
role: 'tool',
content: errorResult,
tool_call_id: toolCall.id
Expand Down
9 changes: 8 additions & 1 deletion src/core/types.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,19 @@
import { z } from 'zod';
import { randomUUID } from 'crypto';
import { MemoryConfig } from '../memory/types';
import type { ApprovalStorage } from '../memory/approval-storage';

export type TraceId = string & { readonly _brand: 'TraceId' };
export type RunId = string & { readonly _brand: 'RunId' };
export type MessageId = string & { readonly _brand: 'MessageId' };

export const createTraceId = (id: string): TraceId => id as TraceId;
export const createRunId = (id: string): RunId => id as RunId;
export const createMessageId = (id: string): MessageId => id as MessageId;

export const generateMessageId = (): MessageId => {
return (`msg_${randomUUID()}`) as MessageId;
};

export type ValidationResult =
| { readonly isValid: true }
Expand Down Expand Up @@ -37,6 +44,7 @@ export type MessageContentPart =
| { readonly type: 'file'; readonly file: { readonly file_id: string; readonly format?: string } };

export type Message = {
readonly id?: MessageId;
readonly role: 'user' | 'assistant' | 'tool';
readonly content: string | readonly MessageContentPart[];
readonly attachments?: readonly Attachment[]; // Optional structured attachments
Expand Down Expand Up @@ -266,4 +274,3 @@ export const jsonParseLLMOutput = (text: string): any => {
return null;
}
};

Loading