Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
4c8fa7f
feat(gemini): support --resume flag to restore previous gemini session
junmo-kim Mar 8, 2026
851c535
fix(gemini): use session/load ACP call when resuming existing session
junmo-kim Mar 8, 2026
fa31111
tidy(gemini): extract findGeminiTranscriptPath and readGeminiTranscri…
junmo-kim Mar 8, 2026
10d69bc
test(gemini): add tests for findGeminiTranscriptPath and readGeminiTr…
junmo-kim Mar 8, 2026
8421d57
feat(gemini): replay historical messages in UI when resuming a session
junmo-kim Mar 8, 2026
89cbde9
fix(gemini): send historical messages to web client on session resume
junmo-kim Mar 8, 2026
c18e74f
fix(gemini): fall back to new session when loadSession fails on resume
junmo-kim Mar 8, 2026
c3974f5
fix(gemini): handle structured content in user transcript messages
junmo-kim Mar 8, 2026
e58d1fb
fix(gemini): prevent duplicate history replay on local-to-remote mode…
junmo-kim Mar 8, 2026
37a6077
fix(gemini): remove hub message replay on session resume
junmo-kim Mar 8, 2026
f055e60
fix(gemini): abort stuck ACP prompt via AbortSignal on session cancel
junmo-kim Mar 8, 2026
82db71a
fix(gemini): send historical messages to web UI on remote resume
junmo-kim Mar 8, 2026
f377aa1
fix(acp): fix TypeScript error in AcpStdioTransport sendRequest
junmo-kim Mar 8, 2026
6ee6eee
fix(gemini): suppress false 'prompt failed' error on user abort
junmo-kim Mar 9, 2026
f3264ad
fix(acp): address bot review comments
junmo-kim Mar 9, 2026
8901487
test: add TDD tests for abort cleanup and history replay fixes
junmo-kim Mar 9, 2026
e02a7da
fix(gemini): fix local→remote history gap and array content handling
junmo-kim Mar 9, 2026
1061603
fix(gemini): prevent duplicate history replay on remote→local→remote …
junmo-kim Mar 9, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions bun.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

45 changes: 45 additions & 0 deletions cli/src/agent/backends/acp/AcpSdkBackend.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,4 +101,49 @@ describe('AcpSdkBackend', () => {
'turn_complete'
]);
});

it('rejects prompt() when AbortSignal fires before response arrives', async () => {
backendStatics.PRE_PROMPT_UPDATE_QUIET_PERIOD_MS = 0;
backendStatics.PRE_PROMPT_UPDATE_DRAIN_TIMEOUT_MS = 0;
backendStatics.UPDATE_QUIET_PERIOD_MS = 0;
backendStatics.UPDATE_DRAIN_TIMEOUT_MS = 0;

const backend = new AcpSdkBackend({ command: 'gemini' });
const backendInternal = backend as unknown as {
transport: {
sendRequest: (...args: unknown[]) => Promise<unknown>;
close: () => Promise<void>;
} | null;
};

// Transport that never resolves (simulates stuck Gemini CLI)
backendInternal.transport = {
sendRequest: (_method: unknown, _params: unknown, opts: unknown) => {
return new Promise<unknown>((_resolve, reject) => {
const signal = (opts as { signal?: AbortSignal })?.signal;
const onAbort = () => reject(new DOMException('ACP request aborted', 'AbortError'));
if (signal?.aborted) {
onAbort();
return;
}
signal?.addEventListener('abort', onAbort, { once: true });
});
},
close: async () => {}
};

const controller = new AbortController();
const promptPromise = backend.prompt(
'session-1',
[{ type: 'text', text: 'hello' }],
() => {},
controller.signal
);

// Let prompt() advance past its internal awaits and reach sendRequest()
await new Promise<void>((resolve) => setTimeout(resolve, 10));
controller.abort();

await expect(promptPromise).rejects.toMatchObject({ name: 'AbortError' });
});
});
5 changes: 3 additions & 2 deletions cli/src/agent/backends/acp/AcpSdkBackend.ts
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,8 @@ export class AcpSdkBackend implements AgentBackend {
async prompt(
sessionId: string,
content: PromptContent[],
onUpdate: (msg: AgentMessage) => void
onUpdate: (msg: AgentMessage) => void,
signal?: AbortSignal
): Promise<void> {
if (!this.transport) {
throw new Error('ACP transport not initialized');
Expand Down Expand Up @@ -167,7 +168,7 @@ export class AcpSdkBackend implements AgentBackend {
const response = await this.transport.sendRequest('session/prompt', {
sessionId,
prompt: content
}, { timeoutMs: Infinity });
}, { timeoutMs: Infinity, signal });

stopReason = isObject(response) ? asString(response.stopReason) : null;
} finally {
Expand Down
91 changes: 91 additions & 0 deletions cli/src/agent/backends/acp/AcpStdioTransport.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
import { describe, expect, it, vi, beforeEach } from 'vitest';
import { AcpStdioTransport } from './AcpStdioTransport';
import * as childProcess from 'node:child_process';
import { EventEmitter } from 'node:events';

vi.mock('node:child_process');

function makeFakeProcess() {
const stdin = { write: vi.fn(), end: vi.fn() };
const stdout = new EventEmitter() as EventEmitter & { setEncoding: (enc: string) => void };
stdout.setEncoding = vi.fn();
const stderr = new EventEmitter() as EventEmitter & { setEncoding: (enc: string) => void };
stderr.setEncoding = vi.fn();
const proc = new EventEmitter() as EventEmitter & {
stdin: typeof stdin;
stdout: typeof stdout;
stderr: typeof stderr;
pid: number;
};
proc.stdin = stdin;
proc.stdout = stdout;
proc.stderr = stderr;
proc.pid = 12345;
return proc;
}

describe('AcpStdioTransport.sendRequest', () => {
beforeEach(() => {
vi.clearAllMocks();
});

it('rejects immediately when signal is already aborted', async () => {
const fakeProc = makeFakeProcess();
vi.mocked(childProcess.spawn).mockReturnValue(fakeProc as unknown as ReturnType<typeof childProcess.spawn>);

const transport = new AcpStdioTransport({ command: 'gemini' });
const controller = new AbortController();
controller.abort();

await expect(
transport.sendRequest('session/prompt', {}, { timeoutMs: Infinity, signal: controller.signal })
).rejects.toMatchObject({ name: 'AbortError' });
});

it('clears the timeout timer when signal is already aborted', async () => {
const clearTimeoutSpy = vi.spyOn(globalThis, 'clearTimeout');

const fakeProc = makeFakeProcess();
vi.mocked(childProcess.spawn).mockReturnValue(fakeProc as unknown as ReturnType<typeof childProcess.spawn>);

const transport = new AcpStdioTransport({ command: 'gemini' });
const controller = new AbortController();
controller.abort();

await expect(
transport.sendRequest('session/prompt', {}, { timeoutMs: 1000, signal: controller.signal })
).rejects.toMatchObject({ name: 'AbortError' });

expect(clearTimeoutSpy).toHaveBeenCalled();
clearTimeoutSpy.mockRestore();
});

it('rejects when signal fires after request is sent', async () => {
const fakeProc = makeFakeProcess();
vi.mocked(childProcess.spawn).mockReturnValue(fakeProc as unknown as ReturnType<typeof childProcess.spawn>);

const transport = new AcpStdioTransport({ command: 'gemini' });
const controller = new AbortController();

const requestPromise = transport.sendRequest('session/prompt', {}, { timeoutMs: Infinity, signal: controller.signal });

controller.abort();

await expect(requestPromise).rejects.toMatchObject({ name: 'AbortError' });
});

it('resolves normally when response arrives before abort', async () => {
const fakeProc = makeFakeProcess();
vi.mocked(childProcess.spawn).mockReturnValue(fakeProc as unknown as ReturnType<typeof childProcess.spawn>);

const transport = new AcpStdioTransport({ command: 'gemini' });
const controller = new AbortController();

const requestPromise = transport.sendRequest('session/prompt', {}, { timeoutMs: Infinity, signal: controller.signal });

// Simulate Gemini CLI responding with id=1
fakeProc.stdout.emit('data', JSON.stringify({ jsonrpc: '2.0', id: 1, result: { stopReason: 'end_turn' } }) + '\n');

await expect(requestPromise).resolves.toEqual({ stopReason: 'end_turn' });
});
});
49 changes: 31 additions & 18 deletions cli/src/agent/backends/acp/AcpStdioTransport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ export class AcpStdioTransport {
/** Default timeout for requests in milliseconds (2 minutes) */
static readonly DEFAULT_TIMEOUT_MS = 120_000;

async sendRequest(method: string, params?: unknown, options?: { timeoutMs?: number }): Promise<unknown> {
async sendRequest(method: string, params?: unknown, options?: { timeoutMs?: number; signal?: AbortSignal }): Promise<unknown> {
const id = this.nextId++;
const payload: JsonRpcRequest = {
jsonrpc: '2.0',
Expand All @@ -111,32 +111,45 @@ export class AcpStdioTransport {
};

const timeoutMs = options?.timeoutMs ?? AcpStdioTransport.DEFAULT_TIMEOUT_MS;

// Skip timeout for infinite/no-timeout requests (e.g., long-running prompts)
if (!Number.isFinite(timeoutMs)) {
return new Promise<unknown>((resolve, reject) => {
this.pending.set(id, { resolve, reject });
this.writePayload(payload);
});
}
const signal = options?.signal;

return new Promise<unknown>((resolve, reject) => {
const timer = setTimeout(() => {
if (this.pending.has(id)) {
this.pending.delete(id);
let timer: ReturnType<typeof setTimeout> | null = null;

const cleanup = () => {
this.pending.delete(id);
if (timer !== null) clearTimeout(timer);
signal?.removeEventListener('abort', onAbort);
};

const onAbort = () => {
cleanup();
reject(new DOMException('ACP request aborted', 'AbortError'));
};

if (Number.isFinite(timeoutMs)) {
timer = setTimeout(() => {
cleanup();
reject(new Error(`ACP request '${method}' timed out after ${timeoutMs}ms`));
}
}, timeoutMs);
// Don't let timer keep Node alive if process wants to exit
timer.unref();
}, timeoutMs as number);
timer.unref?.();
}

if (signal?.aborted) {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[MINOR] signal.aborted check after timer setup leaves a pending timer

If the signal is already aborted, the timeout timer is still scheduled and left to fire later. Move the aborted check before creating the timer, or clear the timer before returning.

Suggested fix:

if (signal?.aborted) {
    reject(new DOMException('ACP request aborted', 'AbortError'));
    return;
}

if (Number.isFinite(timeoutMs)) {
    // existing timer setup
}

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in commit f3264ad — cleanup() is now called before rejecting on a pre-aborted signal, which clears the pending timer.

cleanup();
reject(new DOMException('ACP request aborted', 'AbortError'));
return;
}

signal?.addEventListener('abort', onAbort, { once: true });

this.pending.set(id, {
resolve: (value) => {
clearTimeout(timer);
cleanup();
resolve(value);
},
reject: (error) => {
clearTimeout(timer);
cleanup();
reject(error);
}
});
Expand Down
7 changes: 7 additions & 0 deletions cli/src/commands/gemini.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ export const geminiCommand: CommandDefinition = {
startingMode?: 'local' | 'remote'
permissionMode?: GeminiPermissionMode
model?: string
resumeGeminiSessionId?: string
} = {}

for (let i = 0; i < commandArgs.length; i++) {
Expand All @@ -36,6 +37,12 @@ export const geminiCommand: CommandDefinition = {
throw new Error('Missing --model value')
}
options.model = model
} else if (arg === '--resume') {
const sessionId = commandArgs[++i]
if (!sessionId) {
throw new Error('Missing --resume value')
}
options.resumeGeminiSessionId = sessionId
}
}

Expand Down
115 changes: 115 additions & 0 deletions cli/src/gemini/geminiLocalLauncher.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
import { describe, it, expect, vi, afterEach } from 'vitest';
import { geminiLocalLauncher } from './geminiLocalLauncher';

vi.mock('./geminiLocal', () => ({ geminiLocal: vi.fn().mockResolvedValue(undefined) }));

vi.mock('./utils/sessionScanner', async (importOriginal) => {
const actual = await importOriginal<typeof import('./utils/sessionScanner')>();
return {
...actual,
readGeminiTranscript: vi.fn(),
createGeminiSessionScanner: vi.fn().mockResolvedValue({
cleanup: vi.fn().mockResolvedValue(undefined),
onNewSession: vi.fn(),
}),
};
});

vi.mock('@/modules/common/launcher/BaseLocalLauncher', () => ({
// eslint-disable-next-line @typescript-eslint/no-explicit-any
BaseLocalLauncher: vi.fn().mockImplementation(function(this: any) {
this.run = vi.fn().mockResolvedValue('exit');
}),
}));

afterEach(() => {
vi.clearAllMocks();
});

function makeMockSession(opts: {
startingMode: 'local' | 'remote';
transcriptPath?: string;
historyReplayed?: boolean;
}) {
return {
path: '/test/path',
logPath: '/test/log',
sessionId: null,
transcriptPath: opts.transcriptPath ?? null,
historyReplayed: opts.historyReplayed ?? false,
historyReplayCutoff: 0,
startedBy: 'runner' as const,
startingMode: opts.startingMode,
queue: { waitForMessagesAndGetAsString: vi.fn(), size: vi.fn(), reset: vi.fn() },
client: { rpcHandlerManager: { registerHandler: vi.fn() } },
sendSessionEvent: vi.fn(),
sendUserMessage: vi.fn(),
sendCodexMessage: vi.fn(),
onSessionFound: vi.fn(),
onThinkingChange: vi.fn(),
getPermissionMode: vi.fn().mockReturnValue('auto'),
addTranscriptPathCallback: vi.fn(),
removeTranscriptPathCallback: vi.fn(),
recordLocalLaunchFailure: vi.fn(),
};
}

describe('geminiLocalLauncher', () => {
describe('historyReplayCutoff on ensureScanner', () => {
it('sets historyReplayed=true and historyReplayCutoff=0 when startingMode is remote (regardless of existing messages)', async () => {
const { readGeminiTranscript } = await import('./utils/sessionScanner');
vi.mocked(readGeminiTranscript).mockResolvedValue({
messages: [
{ type: 'user', content: 'msg1' },
{ type: 'gemini', content: 'reply1' },
]
});

const session = makeMockSession({ startingMode: 'remote', transcriptPath: '/some/path.json' });
await geminiLocalLauncher(session as never, {});

expect(session.historyReplayed).toBe(true);
expect(session.historyReplayCutoff).toBe(0);
});

it('sets historyReplayCutoff to existing count when startingMode is local and messages exist', async () => {
const { readGeminiTranscript } = await import('./utils/sessionScanner');
vi.mocked(readGeminiTranscript).mockResolvedValue({
messages: [
{ type: 'user', content: 'msg1' },
{ type: 'gemini', content: 'reply1' },
{ type: 'user', content: 'msg2' },
]
});

const session = makeMockSession({ startingMode: 'local', transcriptPath: '/some/path.json' });
await geminiLocalLauncher(session as never, {});

expect(session.historyReplayCutoff).toBe(3);
expect(session.historyReplayed).toBe(false);
});

it('does not overwrite historyReplayed or historyReplayCutoff if already replayed', async () => {
const { readGeminiTranscript } = await import('./utils/sessionScanner');
vi.mocked(readGeminiTranscript).mockResolvedValue({
messages: [
{ type: 'user', content: 'msg1' },
{ type: 'user', content: 'msg2' },
{ type: 'user', content: 'msg3' },
]
});

const session = makeMockSession({
startingMode: 'local',
transcriptPath: '/some/path.json',
historyReplayed: true,
});
session.historyReplayCutoff = 0;
await geminiLocalLauncher(session as never, {});

// historyReplayed was already true; should not be reset by scanner
expect(session.historyReplayed).toBe(true);
expect(session.historyReplayCutoff).toBe(0);
});
});
});
Loading