Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 32 additions & 6 deletions server/fresh-agent/adapters/opencode/serve-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { allocateLocalhostPort, type LoopbackServerEndpoint } from '../../../loc
import { logger } from '../../../logger.js'
import { parseServeEvent, type ParsedServeEvent } from './serve-events.js'

type OpencodeServeLogger = Pick<pino.Logger, 'warn' | 'error'>
type OpencodeServeLogger = Pick<pino.Logger, 'warn' | 'error' | 'debug' | 'info'>

const OWNERSHIP_ENV = 'FRESHELL_OPENCODE_SIDECAR_ID'
const DEFAULT_IDLE_POLL_MS = 500
Expand Down Expand Up @@ -84,6 +84,15 @@ class OpencodeServeRequestTimeoutError extends Error {
}
}

export class OpencodeServeLostError extends Error {
readonly sessionId: string
constructor(sessionId: string) {
super(`opencode serve sidecar was lost while waiting for session ${sessionId} to go idle.`)
this.name = 'OpencodeServeLostError'
this.sessionId = sessionId
}
}

export class OpencodeServeManager {
private readonly command: string
private readonly spawnFn: typeof spawn
Expand Down Expand Up @@ -141,14 +150,24 @@ export class OpencodeServeManager {
}

private forgetSessionsForCwd(cwdKey: string): void {
const lostSessions: string[] = []
for (const [sessionId, sessionCwdKey] of this.sessionCwdById.entries()) {
if (sessionCwdKey !== cwdKey) continue
this.sessionCwdById.delete(sessionId)
this.sessionEmitters.delete(sessionId)
lostSessions.push(sessionId)
}
if (cwdKey === DEFAULT_CWD_KEY) {
for (const sessionId of this.sessionEmitters.keys()) {
if (!this.sessionCwdById.has(sessionId)) this.sessionEmitters.delete(sessionId)
if (!this.sessionCwdById.has(sessionId)) {
if (!lostSessions.includes(sessionId)) lostSessions.push(sessionId)
}
}
}
for (const sessionId of lostSessions) {
const emitter = this.sessionEmitters.get(sessionId)
if (emitter) {
emitter.emit('lost', new OpencodeServeLostError(sessionId))
this.sessionEmitters.delete(sessionId)
}
}
}
Expand All @@ -166,10 +185,12 @@ export class OpencodeServeManager {
running.idleTimer = undefined
if (running.activeRequests > 0) return
if (this.runningByCwd.get(running.cwdKey) !== running) return
this.forgetSessionsForCwd(running.cwdKey)
this.runningByCwd.delete(running.cwdKey)
this.startPromiseByCwd.delete(running.cwdKey)
this.startAbortByCwd.delete(running.cwdKey)
try { running.stopEventStream() } catch { /* ignore */ }
this.log.info({ cwdKey: running.cwdKey }, 'killing idle opencode serve sidecar after idle timeout')
void killOwnedProcesses(running.child, running.ownershipId, this.log)
}, this.idleShutdownMs)
running.idleTimer.unref?.()
Expand Down Expand Up @@ -282,10 +303,12 @@ export class OpencodeServeManager {
// Drain stdout/stderr so the child's pipe buffers never back-pressure
// and stall the serve process. Diagnostics are captured below before health.
child.stdout?.on('data', () => {})
child.stderr?.on('data', () => {})
child.stderr?.on('data', (chunk) => {
this.log.debug({ chunk: chunk.toString().trim() }, 'opencode serve stderr')
})
child.on('error', (err) => this.log.error({ err }, 'opencode serve process error'))
child.on('close', (code) => {
this.log.warn({ code }, 'opencode serve exited')
child.on('close', (code, signal) => {
this.log.warn({ code, signal }, 'opencode serve exited')
const runningEntry = this.runningByCwd.get(route.cwdKey)
if (runningEntry && runningEntry.child === child) {
this.runningByCwd.delete(route.cwdKey)
Expand Down Expand Up @@ -540,6 +563,7 @@ export class OpencodeServeManager {
clearTimeout(timer)
clearInterval(pollTimer)
emitter.off('event', handler)
emitter.off('lost', onLost)
}
const finish = () => {
if (settled) return
Expand Down Expand Up @@ -602,7 +626,9 @@ export class OpencodeServeManager {
void checkStatusMap()
}
}
const onLost = (err: OpencodeServeLostError) => fail(err)
emitter.on('event', handler)
emitter.on('lost', onLost)
})
}

Expand Down
13 changes: 13 additions & 0 deletions test/unit/server/fresh-agent/opencode-serve-adapter.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,19 @@ describe('OpenCode serve adapter: create + send', () => {
}
})

it('emits idle status and rejects when onceIdle rejects with a lost-session error (sidecar died)', async () => {
const manager = makeFakeManager()
manager.onceIdle = vi.fn(() => Promise.reject(new Error('opencode serve sidecar was lost.')))
const adapter = makeAdapter(manager)
await adapter.create({ requestId: 'lost-1', sessionType: 'freshopencode', provider: 'opencode' })

const events: unknown[] = []
adapter.subscribe?.('freshopencode-lost-1', (e) => events.push(e))

await expect(adapter.send?.('freshopencode-lost-1', { text: 'hi' })).rejects.toThrow(/sidecar was lost/i)
expect(events).toContainEqual({ type: 'sdk.session.snapshot', sessionId: 'freshopencode-lost-1', status: 'idle' })
})

it('does not return to running when OpenCode emits a late message update after idle', async () => {
const manager = makeFakeManager()
const adapter = makeAdapter(manager)
Expand Down
229 changes: 228 additions & 1 deletion test/unit/server/fresh-agent/opencode-serve-manager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { PassThrough } from 'node:stream'
import { ReadableStream } from 'node:stream/web'
import { describe, expect, it, vi } from 'vitest'
import { parseServeEvent as parseEvt } from '../../../../server/fresh-agent/adapters/opencode/serve-events.js'
import { OpencodeServeManager } from '../../../../server/fresh-agent/adapters/opencode/serve-manager.js'
import { OpencodeServeManager, OpencodeServeLostError } from '../../../../server/fresh-agent/adapters/opencode/serve-manager.js'

function fakeChild() {
const child = new EventEmitter() as any
Expand Down Expand Up @@ -749,6 +749,55 @@ describe('OpencodeServeManager fan-out', () => {
expect((manager as any).sessionEmitters.get('ses_a')?.listenerCount('event') ?? 0).toBe(0)
})

it('onceIdle rejects promptly when the sidecar dies mid-turn instead of waiting for the full timeout', async () => {
const child = fakeChild()
const stopStream = vi.fn()
const spawnFn = vi.fn(() => child)
const manager = new OpencodeServeManager({
spawnFn: spawnFn as any,
fetchFn: vi.fn(async (url: string) => {
if (url.endsWith('/global/health')) return jsonResponse({ healthy: true })
return jsonResponse({})
}) as any,
allocatePort: async () => ({ hostname: '127.0.0.1', port: 47999 }),
connectEventStream: () => stopStream,
healthTimeoutMs: 1000,
})
await manager.ensureStarted()

const idle = manager.onceIdle('ses_dying', 600_000)
// Kill the sidecar mid-turn
child.emit('close', 1)

// Should reject within a short time, not wait 600 seconds
await expect(idle).rejects.toThrow(/sidecar|lost|exit|closed|unavailable/i)
})

it('onceIdle lost rejection cleans up its event listener', async () => {
const child = fakeChild()
const stopStream = vi.fn()
const spawnFn = vi.fn(() => child)
const manager = new OpencodeServeManager({
spawnFn: spawnFn as any,
fetchFn: vi.fn(async (url: string) => {
if (url.endsWith('/global/health')) return jsonResponse({ healthy: true })
return jsonResponse({})
}) as any,
allocatePort: async () => ({ hostname: '127.0.0.1', port: 47999 }),
connectEventStream: () => stopStream,
healthTimeoutMs: 1000,
})
await manager.ensureStarted()

const emitterBefore = (manager as any).emitterFor('ses_cleanup')
const listenerCountBefore = emitterBefore.listenerCount('event')
const idle = manager.onceIdle('ses_cleanup', 600_000)
child.emit('close', 1)
await expect(idle).rejects.toThrow()
// The onceIdle handler must have been removed from the emitter
expect(emitterBefore.listenerCount('event')).toBe(listenerCountBefore)
})

it('normalizes CRLF SSE boundaries', async () => {
let push!: (e: any) => void
const { manager } = makeManager({
Expand Down Expand Up @@ -960,4 +1009,182 @@ describe('OpencodeServeManager fan-out', () => {
closeStream?.()
expect(seen.map((e) => e.kind)).toEqual(['session.idle'])
})

it('idle-timer kill emits lost event and cleans up session emitters', async () => {
vi.useFakeTimers()
try {
const childDefault = fakeChild()
const childProject = fakeChild()
const spawnFn = vi.fn()
.mockReturnValueOnce(childDefault)
.mockReturnValueOnce(childProject)
const fetchFn = vi.fn(async (url: string, init: any) => {
if (url === 'http://127.0.0.1:47999/global/health') return jsonResponse({ healthy: true })
if (url === 'http://127.0.0.1:47999/session/ses_project' && init?.method === 'GET') {
return jsonResponse({ id: 'ses_project', directory: '/project-a' })
}
if (url === 'http://127.0.0.1:47999/session/ses_project/compact' && init?.method === 'POST') {
return jsonResponse({}, { status: 204 })
}
if (url === 'http://127.0.0.1:48000/global/health') return jsonResponse({ healthy: true })
if (url === 'http://127.0.0.1:48000/session/ses_project/summarize' && init?.method === 'POST') {
return jsonResponse({}, { status: 204 })
}
return jsonResponse({}, { status: 404 })
})
const manager = new OpencodeServeManager({
spawnFn: spawnFn as any,
fetchFn: fetchFn as any,
allocatePort: vi.fn()
.mockResolvedValueOnce({ hostname: '127.0.0.1', port: 47999 })
.mockResolvedValueOnce({ hostname: '127.0.0.1', port: 48000 }),
connectEventStream: () => () => {},
healthTimeoutMs: 1000,
idleShutdownMs: 50,
} as any)

await manager.ensureStarted()
await manager.getSession('ses_project')
await manager.compact('ses_project')
manager.subscribe('ses_project', () => {})

const lostHandler = vi.fn()
const emitter = (manager as any).sessionEmitters.get('ses_project')
emitter.on('lost', lostHandler)
expect((manager as any).sessionEmitters.has('ses_project')).toBe(true)

await vi.advanceTimersByTimeAsync(51)

expect(childProject.kill).toHaveBeenCalled()
expect(lostHandler).toHaveBeenCalledTimes(1)
expect(lostHandler.mock.calls[0][0]).toBeInstanceOf(OpencodeServeLostError)
expect((manager as any).sessionEmitters.has('ses_project')).toBe(false)
} finally {
vi.useRealTimers()
}
})

it('idle-timer kill rejects pending onceIdle with OpencodeServeLostError', async () => {
vi.useFakeTimers()
try {
const childDefault = fakeChild()
const childProject = fakeChild()
const spawnFn = vi.fn()
.mockReturnValueOnce(childDefault)
.mockReturnValueOnce(childProject)
const fetchFn = vi.fn(async (url: string, init: any) => {
if (url === 'http://127.0.0.1:47999/global/health') return jsonResponse({ healthy: true })
if (url === 'http://127.0.0.1:47999/session/ses_project' && init?.method === 'GET') {
return jsonResponse({ id: 'ses_project', directory: '/project-a' })
}
if (url === 'http://127.0.0.1:47999/session/ses_project/compact' && init?.method === 'POST') {
return jsonResponse({}, { status: 204 })
}
if (url === 'http://127.0.0.1:48000/global/health') return jsonResponse({ healthy: true })
if (url === 'http://127.0.0.1:48000/session/ses_project/summarize' && init?.method === 'POST') {
return jsonResponse({}, { status: 204 })
}
if (url === 'http://127.0.0.1:48000/session/status') return jsonResponse({})
return jsonResponse({}, { status: 404 })
})
const manager = new OpencodeServeManager({
spawnFn: spawnFn as any,
fetchFn: fetchFn as any,
allocatePort: vi.fn()
.mockResolvedValueOnce({ hostname: '127.0.0.1', port: 47999 })
.mockResolvedValueOnce({ hostname: '127.0.0.1', port: 48000 }),
connectEventStream: () => () => {},
healthTimeoutMs: 1000,
idleShutdownMs: 50,
idlePollMs: 10_000,
} as any)

await manager.ensureStarted()
await manager.getSession('ses_project')
await manager.compact('ses_project')
manager.subscribe('ses_project', () => {})

const idle = manager.onceIdle('ses_project', 200)
idle.catch(() => {})

await vi.advanceTimersByTimeAsync(201)

await expect(idle).rejects.toThrow(/opencode serve sidecar was lost/)
expect((manager as any).sessionEmitters.has('ses_project')).toBe(false)
} finally {
vi.useRealTimers()
}
})
})

describe('OpencodeServeManager diagnostics', () => {
it('close handler logs both exit code and signal', async () => {
const { manager, child } = makeManager()
const warnSpy = vi.spyOn((manager as any).log, 'warn')
await manager.ensureStarted()
child.emit('close', null, 'SIGTERM')
const exitLog = warnSpy.mock.calls.find((c) => c[1] === 'opencode serve exited')
expect(exitLog?.[0]).toEqual(expect.objectContaining({ code: null, signal: 'SIGTERM' }))
})

it('idle-timer kill logs a distinctive message before killing', async () => {
vi.useFakeTimers()
try {
const childDefault = fakeChild()
const childProject = fakeChild()
const spawnFn = vi.fn()
.mockReturnValueOnce(childDefault)
.mockReturnValueOnce(childProject)
const fetchFn = vi.fn(async (url: string, init: any) => {
if (url === 'http://127.0.0.1:47999/global/health') return jsonResponse({ healthy: true })
if (url === 'http://127.0.0.1:47999/session/ses_project' && init?.method === 'GET') {
return jsonResponse({ id: 'ses_project', directory: '/project-a' })
}
if (url === 'http://127.0.0.1:47999/session/ses_project/compact' && init?.method === 'POST') {
return jsonResponse({}, { status: 204 })
}
if (url === 'http://127.0.0.1:48000/global/health') return jsonResponse({ healthy: true })
if (url === 'http://127.0.0.1:48000/session/ses_project/summarize' && init?.method === 'POST') {
return jsonResponse({}, { status: 204 })
}
return jsonResponse({}, { status: 404 })
})
const manager = new OpencodeServeManager({
spawnFn: spawnFn as any,
fetchFn: fetchFn as any,
allocatePort: vi.fn()
.mockResolvedValueOnce({ hostname: '127.0.0.1', port: 47999 })
.mockResolvedValueOnce({ hostname: '127.0.0.1', port: 48000 }),
connectEventStream: () => () => {},
healthTimeoutMs: 1000,
idleShutdownMs: 50,
} as any)

const infoSpy = vi.spyOn((manager as any).log, 'info')
await manager.ensureStarted()
await manager.getSession('ses_project')
await manager.compact('ses_project')

await vi.advanceTimersByTimeAsync(51)

expect(childProject.kill).toHaveBeenCalled()
const killLog = infoSpy.mock.calls.find((c) =>
typeof c[1] === 'string' && c[1].includes('idle') && c[1].includes('kill'),
)
expect(killLog).toBeDefined()
} finally {
vi.useRealTimers()
}
})

it('sidecar stderr is captured at debug level', async () => {
const { manager, child } = makeManager()
const debugSpy = vi.spyOn((manager as any).log, 'debug')
await manager.ensureStarted()
child.stderr.emit('data', Buffer.from('panic: something went wrong'))
expect(debugSpy).toHaveBeenCalledWith(
expect.objectContaining({ chunk: 'panic: something went wrong' }),
'opencode serve stderr',
)
})
})
Loading