diff --git a/server/fresh-agent/adapters/opencode/serve-manager.ts b/server/fresh-agent/adapters/opencode/serve-manager.ts index a43c24356..b282bc975 100644 --- a/server/fresh-agent/adapters/opencode/serve-manager.ts +++ b/server/fresh-agent/adapters/opencode/serve-manager.ts @@ -7,7 +7,7 @@ import { allocateLocalhostPort, type LoopbackServerEndpoint } from '../../../loc import { logger } from '../../../logger.js' import { parseServeEvent, type ParsedServeEvent } from './serve-events.js' -type OpencodeServeLogger = Pick +type OpencodeServeLogger = Pick const OWNERSHIP_ENV = 'FRESHELL_OPENCODE_SIDECAR_ID' const DEFAULT_IDLE_POLL_MS = 500 @@ -84,6 +84,15 @@ class OpencodeServeRequestTimeoutError extends Error { } } +export class OpencodeServeLostError extends Error { + readonly sessionId: string + constructor(sessionId: string) { + super(`opencode serve sidecar was lost while waiting for session ${sessionId} to go idle.`) + this.name = 'OpencodeServeLostError' + this.sessionId = sessionId + } +} + export class OpencodeServeManager { private readonly command: string private readonly spawnFn: typeof spawn @@ -141,14 +150,24 @@ export class OpencodeServeManager { } private forgetSessionsForCwd(cwdKey: string): void { + const lostSessions: string[] = [] for (const [sessionId, sessionCwdKey] of this.sessionCwdById.entries()) { if (sessionCwdKey !== cwdKey) continue this.sessionCwdById.delete(sessionId) - this.sessionEmitters.delete(sessionId) + lostSessions.push(sessionId) } if (cwdKey === DEFAULT_CWD_KEY) { for (const sessionId of this.sessionEmitters.keys()) { - if (!this.sessionCwdById.has(sessionId)) this.sessionEmitters.delete(sessionId) + if (!this.sessionCwdById.has(sessionId)) { + if (!lostSessions.includes(sessionId)) lostSessions.push(sessionId) + } + } + } + for (const sessionId of lostSessions) { + const emitter = this.sessionEmitters.get(sessionId) + if (emitter) { + emitter.emit('lost', new OpencodeServeLostError(sessionId)) + this.sessionEmitters.delete(sessionId) } } } @@ -166,10 +185,12 @@ export class OpencodeServeManager { running.idleTimer = undefined if (running.activeRequests > 0) return if (this.runningByCwd.get(running.cwdKey) !== running) return + this.forgetSessionsForCwd(running.cwdKey) this.runningByCwd.delete(running.cwdKey) this.startPromiseByCwd.delete(running.cwdKey) this.startAbortByCwd.delete(running.cwdKey) try { running.stopEventStream() } catch { /* ignore */ } + this.log.info({ cwdKey: running.cwdKey }, 'killing idle opencode serve sidecar after idle timeout') void killOwnedProcesses(running.child, running.ownershipId, this.log) }, this.idleShutdownMs) running.idleTimer.unref?.() @@ -282,10 +303,12 @@ export class OpencodeServeManager { // Drain stdout/stderr so the child's pipe buffers never back-pressure // and stall the serve process. Diagnostics are captured below before health. child.stdout?.on('data', () => {}) - child.stderr?.on('data', () => {}) + child.stderr?.on('data', (chunk) => { + this.log.debug({ chunk: chunk.toString().trim() }, 'opencode serve stderr') + }) child.on('error', (err) => this.log.error({ err }, 'opencode serve process error')) - child.on('close', (code) => { - this.log.warn({ code }, 'opencode serve exited') + child.on('close', (code, signal) => { + this.log.warn({ code, signal }, 'opencode serve exited') const runningEntry = this.runningByCwd.get(route.cwdKey) if (runningEntry && runningEntry.child === child) { this.runningByCwd.delete(route.cwdKey) @@ -540,6 +563,7 @@ export class OpencodeServeManager { clearTimeout(timer) clearInterval(pollTimer) emitter.off('event', handler) + emitter.off('lost', onLost) } const finish = () => { if (settled) return @@ -602,7 +626,9 @@ export class OpencodeServeManager { void checkStatusMap() } } + const onLost = (err: OpencodeServeLostError) => fail(err) emitter.on('event', handler) + emitter.on('lost', onLost) }) } diff --git a/test/unit/server/fresh-agent/opencode-serve-adapter.test.ts b/test/unit/server/fresh-agent/opencode-serve-adapter.test.ts index 5f38fdf45..4ff9dbaef 100644 --- a/test/unit/server/fresh-agent/opencode-serve-adapter.test.ts +++ b/test/unit/server/fresh-agent/opencode-serve-adapter.test.ts @@ -254,6 +254,19 @@ describe('OpenCode serve adapter: create + send', () => { } }) + it('emits idle status and rejects when onceIdle rejects with a lost-session error (sidecar died)', async () => { + const manager = makeFakeManager() + manager.onceIdle = vi.fn(() => Promise.reject(new Error('opencode serve sidecar was lost.'))) + const adapter = makeAdapter(manager) + await adapter.create({ requestId: 'lost-1', sessionType: 'freshopencode', provider: 'opencode' }) + + const events: unknown[] = [] + adapter.subscribe?.('freshopencode-lost-1', (e) => events.push(e)) + + await expect(adapter.send?.('freshopencode-lost-1', { text: 'hi' })).rejects.toThrow(/sidecar was lost/i) + expect(events).toContainEqual({ type: 'sdk.session.snapshot', sessionId: 'freshopencode-lost-1', status: 'idle' }) + }) + it('does not return to running when OpenCode emits a late message update after idle', async () => { const manager = makeFakeManager() const adapter = makeAdapter(manager) diff --git a/test/unit/server/fresh-agent/opencode-serve-manager.test.ts b/test/unit/server/fresh-agent/opencode-serve-manager.test.ts index 102f8f87a..44f47157b 100644 --- a/test/unit/server/fresh-agent/opencode-serve-manager.test.ts +++ b/test/unit/server/fresh-agent/opencode-serve-manager.test.ts @@ -3,7 +3,7 @@ import { PassThrough } from 'node:stream' import { ReadableStream } from 'node:stream/web' import { describe, expect, it, vi } from 'vitest' import { parseServeEvent as parseEvt } from '../../../../server/fresh-agent/adapters/opencode/serve-events.js' -import { OpencodeServeManager } from '../../../../server/fresh-agent/adapters/opencode/serve-manager.js' +import { OpencodeServeManager, OpencodeServeLostError } from '../../../../server/fresh-agent/adapters/opencode/serve-manager.js' function fakeChild() { const child = new EventEmitter() as any @@ -749,6 +749,55 @@ describe('OpencodeServeManager fan-out', () => { expect((manager as any).sessionEmitters.get('ses_a')?.listenerCount('event') ?? 0).toBe(0) }) + it('onceIdle rejects promptly when the sidecar dies mid-turn instead of waiting for the full timeout', async () => { + const child = fakeChild() + const stopStream = vi.fn() + const spawnFn = vi.fn(() => child) + const manager = new OpencodeServeManager({ + spawnFn: spawnFn as any, + fetchFn: vi.fn(async (url: string) => { + if (url.endsWith('/global/health')) return jsonResponse({ healthy: true }) + return jsonResponse({}) + }) as any, + allocatePort: async () => ({ hostname: '127.0.0.1', port: 47999 }), + connectEventStream: () => stopStream, + healthTimeoutMs: 1000, + }) + await manager.ensureStarted() + + const idle = manager.onceIdle('ses_dying', 600_000) + // Kill the sidecar mid-turn + child.emit('close', 1) + + // Should reject within a short time, not wait 600 seconds + await expect(idle).rejects.toThrow(/sidecar|lost|exit|closed|unavailable/i) + }) + + it('onceIdle lost rejection cleans up its event listener', async () => { + const child = fakeChild() + const stopStream = vi.fn() + const spawnFn = vi.fn(() => child) + const manager = new OpencodeServeManager({ + spawnFn: spawnFn as any, + fetchFn: vi.fn(async (url: string) => { + if (url.endsWith('/global/health')) return jsonResponse({ healthy: true }) + return jsonResponse({}) + }) as any, + allocatePort: async () => ({ hostname: '127.0.0.1', port: 47999 }), + connectEventStream: () => stopStream, + healthTimeoutMs: 1000, + }) + await manager.ensureStarted() + + const emitterBefore = (manager as any).emitterFor('ses_cleanup') + const listenerCountBefore = emitterBefore.listenerCount('event') + const idle = manager.onceIdle('ses_cleanup', 600_000) + child.emit('close', 1) + await expect(idle).rejects.toThrow() + // The onceIdle handler must have been removed from the emitter + expect(emitterBefore.listenerCount('event')).toBe(listenerCountBefore) + }) + it('normalizes CRLF SSE boundaries', async () => { let push!: (e: any) => void const { manager } = makeManager({ @@ -960,4 +1009,182 @@ describe('OpencodeServeManager fan-out', () => { closeStream?.() expect(seen.map((e) => e.kind)).toEqual(['session.idle']) }) + + it('idle-timer kill emits lost event and cleans up session emitters', async () => { + vi.useFakeTimers() + try { + const childDefault = fakeChild() + const childProject = fakeChild() + const spawnFn = vi.fn() + .mockReturnValueOnce(childDefault) + .mockReturnValueOnce(childProject) + const fetchFn = vi.fn(async (url: string, init: any) => { + if (url === 'http://127.0.0.1:47999/global/health') return jsonResponse({ healthy: true }) + if (url === 'http://127.0.0.1:47999/session/ses_project' && init?.method === 'GET') { + return jsonResponse({ id: 'ses_project', directory: '/project-a' }) + } + if (url === 'http://127.0.0.1:47999/session/ses_project/compact' && init?.method === 'POST') { + return jsonResponse({}, { status: 204 }) + } + if (url === 'http://127.0.0.1:48000/global/health') return jsonResponse({ healthy: true }) + if (url === 'http://127.0.0.1:48000/session/ses_project/summarize' && init?.method === 'POST') { + return jsonResponse({}, { status: 204 }) + } + return jsonResponse({}, { status: 404 }) + }) + const manager = new OpencodeServeManager({ + spawnFn: spawnFn as any, + fetchFn: fetchFn as any, + allocatePort: vi.fn() + .mockResolvedValueOnce({ hostname: '127.0.0.1', port: 47999 }) + .mockResolvedValueOnce({ hostname: '127.0.0.1', port: 48000 }), + connectEventStream: () => () => {}, + healthTimeoutMs: 1000, + idleShutdownMs: 50, + } as any) + + await manager.ensureStarted() + await manager.getSession('ses_project') + await manager.compact('ses_project') + manager.subscribe('ses_project', () => {}) + + const lostHandler = vi.fn() + const emitter = (manager as any).sessionEmitters.get('ses_project') + emitter.on('lost', lostHandler) + expect((manager as any).sessionEmitters.has('ses_project')).toBe(true) + + await vi.advanceTimersByTimeAsync(51) + + expect(childProject.kill).toHaveBeenCalled() + expect(lostHandler).toHaveBeenCalledTimes(1) + expect(lostHandler.mock.calls[0][0]).toBeInstanceOf(OpencodeServeLostError) + expect((manager as any).sessionEmitters.has('ses_project')).toBe(false) + } finally { + vi.useRealTimers() + } + }) + + it('idle-timer kill rejects pending onceIdle with OpencodeServeLostError', async () => { + vi.useFakeTimers() + try { + const childDefault = fakeChild() + const childProject = fakeChild() + const spawnFn = vi.fn() + .mockReturnValueOnce(childDefault) + .mockReturnValueOnce(childProject) + const fetchFn = vi.fn(async (url: string, init: any) => { + if (url === 'http://127.0.0.1:47999/global/health') return jsonResponse({ healthy: true }) + if (url === 'http://127.0.0.1:47999/session/ses_project' && init?.method === 'GET') { + return jsonResponse({ id: 'ses_project', directory: '/project-a' }) + } + if (url === 'http://127.0.0.1:47999/session/ses_project/compact' && init?.method === 'POST') { + return jsonResponse({}, { status: 204 }) + } + if (url === 'http://127.0.0.1:48000/global/health') return jsonResponse({ healthy: true }) + if (url === 'http://127.0.0.1:48000/session/ses_project/summarize' && init?.method === 'POST') { + return jsonResponse({}, { status: 204 }) + } + if (url === 'http://127.0.0.1:48000/session/status') return jsonResponse({}) + return jsonResponse({}, { status: 404 }) + }) + const manager = new OpencodeServeManager({ + spawnFn: spawnFn as any, + fetchFn: fetchFn as any, + allocatePort: vi.fn() + .mockResolvedValueOnce({ hostname: '127.0.0.1', port: 47999 }) + .mockResolvedValueOnce({ hostname: '127.0.0.1', port: 48000 }), + connectEventStream: () => () => {}, + healthTimeoutMs: 1000, + idleShutdownMs: 50, + idlePollMs: 10_000, + } as any) + + await manager.ensureStarted() + await manager.getSession('ses_project') + await manager.compact('ses_project') + manager.subscribe('ses_project', () => {}) + + const idle = manager.onceIdle('ses_project', 200) + idle.catch(() => {}) + + await vi.advanceTimersByTimeAsync(201) + + await expect(idle).rejects.toThrow(/opencode serve sidecar was lost/) + expect((manager as any).sessionEmitters.has('ses_project')).toBe(false) + } finally { + vi.useRealTimers() + } + }) +}) + +describe('OpencodeServeManager diagnostics', () => { + it('close handler logs both exit code and signal', async () => { + const { manager, child } = makeManager() + const warnSpy = vi.spyOn((manager as any).log, 'warn') + await manager.ensureStarted() + child.emit('close', null, 'SIGTERM') + const exitLog = warnSpy.mock.calls.find((c) => c[1] === 'opencode serve exited') + expect(exitLog?.[0]).toEqual(expect.objectContaining({ code: null, signal: 'SIGTERM' })) + }) + + it('idle-timer kill logs a distinctive message before killing', async () => { + vi.useFakeTimers() + try { + const childDefault = fakeChild() + const childProject = fakeChild() + const spawnFn = vi.fn() + .mockReturnValueOnce(childDefault) + .mockReturnValueOnce(childProject) + const fetchFn = vi.fn(async (url: string, init: any) => { + if (url === 'http://127.0.0.1:47999/global/health') return jsonResponse({ healthy: true }) + if (url === 'http://127.0.0.1:47999/session/ses_project' && init?.method === 'GET') { + return jsonResponse({ id: 'ses_project', directory: '/project-a' }) + } + if (url === 'http://127.0.0.1:47999/session/ses_project/compact' && init?.method === 'POST') { + return jsonResponse({}, { status: 204 }) + } + if (url === 'http://127.0.0.1:48000/global/health') return jsonResponse({ healthy: true }) + if (url === 'http://127.0.0.1:48000/session/ses_project/summarize' && init?.method === 'POST') { + return jsonResponse({}, { status: 204 }) + } + return jsonResponse({}, { status: 404 }) + }) + const manager = new OpencodeServeManager({ + spawnFn: spawnFn as any, + fetchFn: fetchFn as any, + allocatePort: vi.fn() + .mockResolvedValueOnce({ hostname: '127.0.0.1', port: 47999 }) + .mockResolvedValueOnce({ hostname: '127.0.0.1', port: 48000 }), + connectEventStream: () => () => {}, + healthTimeoutMs: 1000, + idleShutdownMs: 50, + } as any) + + const infoSpy = vi.spyOn((manager as any).log, 'info') + await manager.ensureStarted() + await manager.getSession('ses_project') + await manager.compact('ses_project') + + await vi.advanceTimersByTimeAsync(51) + + expect(childProject.kill).toHaveBeenCalled() + const killLog = infoSpy.mock.calls.find((c) => + typeof c[1] === 'string' && c[1].includes('idle') && c[1].includes('kill'), + ) + expect(killLog).toBeDefined() + } finally { + vi.useRealTimers() + } + }) + + it('sidecar stderr is captured at debug level', async () => { + const { manager, child } = makeManager() + const debugSpy = vi.spyOn((manager as any).log, 'debug') + await manager.ensureStarted() + child.stderr.emit('data', Buffer.from('panic: something went wrong')) + expect(debugSpy).toHaveBeenCalledWith( + expect.objectContaining({ chunk: 'panic: something went wrong' }), + 'opencode serve stderr', + ) + }) })