Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 14 additions & 9 deletions server/fresh-agent/adapters/codex/adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -900,24 +900,29 @@ export function createCodexFreshAgentAdapter(deps: {
listener(makeCodexStatusEvent(sessionId, event.status))
})

// Server-authoritative turn-complete edge for the GREEN/SOUND pipeline. The
// app-server fires turn/completed for interrupts too, carrying the authoritative
// outcome inline at params.turn.status, so we chime only for a positive
// completion ('completed') and never on interrupt/failure.
// onTurnCompleted fires after the turn is committed to the app-server's
// thread history. thread_status_changed(idle) can fire BEFORE that commit,
// leaving the client with an empty transcript. Emit an idle snapshot here
// to make the client re-fetch the committed transcript (parity with
// freshopencode's post-idle emit).
const offTurnCompleted = runtime.onTurnCompleted?.((event) => {
if (event.threadId !== sessionId) return
// turn/completed fires for interrupts/failures too, so chime only on a positive
// completion. The authoritative status appears either inline at params.turn.status
// (codex-cli 0.142.0, probed live) or flat at params.status (the shape the
// app-server client tests model); accept either so neither version silently fails.
activeTurnByThread.delete(sessionId)
listener(makeCodexStatusEvent(sessionId, 'idle'))

// Server-authoritative turn-complete edge for the GREEN/SOUND pipeline.
// turn/completed fires for interrupts/failures too, so chime only on a
// positive completion. The authoritative status appears either inline at
// params.turn.status (codex-cli 0.142.0, probed live) or flat at
// params.status (the shape the app-server client tests model); accept
// either so neither version silently fails.
const params = event.params as { status?: unknown; turn?: { status?: unknown } } | undefined
const status = params?.turn?.status ?? params?.status
if (status !== 'completed') return
const at = nextMonotonicTurnCompleteAt(lastTurnCompleteAtByThread.get(sessionId), Date.now())
lastTurnCompleteAtByThread.set(sessionId, at)
listener({ type: 'sdk.turn.complete', sessionId, at })
})

return () => {
offLifecycle()
offTurnCompleted?.()
Expand Down
7 changes: 6 additions & 1 deletion server/fresh-agent/adapters/opencode/adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,12 @@ export function createOpencodeFreshAgentAdapter(options: CreateOpencodeFreshAgen
}
try {
const status = await getSessionStatus.call(serveManager, realId, cwdRoute(state.cwd) ?? {})
if (!status || typeof status !== 'object' || Array.isArray(status) || typeof status.type !== 'string') {
// The opencode /session/status map only reports active (busy/retry) sessions,
// so an idle session is absent (undefined). Treat a missing entry as idle —
// consistent with the serve manager's onceIdle treatment of absence as idle —
// rather than logging a false-positive malformed warning.
if (status == null) return
if (typeof status !== 'object' || Array.isArray(status) || typeof status.type !== 'string') {
log.warn({
...logContext,
reason: 'malformed_session_status',
Expand Down
4 changes: 2 additions & 2 deletions server/fresh-agent/adapters/opencode/serve-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,13 +112,13 @@ export class OpencodeServeManager {
private shutdownRequested = false

constructor(options: OpencodeServeManagerOptions = {}) {
this.command = options.command ?? 'opencode'
this.env = options.env ?? process.env
this.command = options.command ?? (this.env.OPENCODE_CMD || 'opencode')
this.spawnFn = options.spawnFn ?? spawn
this.fetchFn = options.fetchFn ?? fetch
this.allocatePort = options.allocatePort ?? allocateLocalhostPort
this.connectEventStream = options.connectEventStream
this.healthTimeoutMs = options.healthTimeoutMs ?? 20_000
this.env = options.env ?? process.env
this.idlePollMs = options.idlePollMs ?? DEFAULT_IDLE_POLL_MS
this.requestTimeoutMs = options.requestTimeoutMs ?? DEFAULT_REQUEST_TIMEOUT_MS
}
Expand Down
2 changes: 1 addition & 1 deletion server/fresh-agent/runtime-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ export class FreshAgentRuntimeManager {
}

async subscribe(locator: FreshAgentSessionLocator, listener: (message: unknown) => void) {
const record = this.requireSession(locator)
const record = await this.requireOrRecoverSession(locator)
if (!record.adapter.subscribe) {
throw new FreshAgentUnsupportedCapabilityError(`Subscribe is not supported for ${record.sessionType}`)
}
Expand Down
61 changes: 61 additions & 0 deletions test/unit/server/fresh-agent/codex-adapter.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1132,6 +1132,67 @@ describe('Codex fresh-agent adapter', () => {
expect(offTurnCompleted).toHaveBeenCalledTimes(1)
})

it('emits a snapshot event after a codex turn completes so the client re-fetches the committed transcript', async () => {
let lifecycleHandler: ((event: any) => void) | undefined
let turnCompletedHandler: ((event: any) => void) | undefined
const offLifecycle = vi.fn()
const offTurnCompleted = vi.fn()
const runtime = {
startThread: vi.fn(),
resumeThread: vi.fn(),
onThreadLifecycle: vi.fn((handler: any) => {
lifecycleHandler = handler
return offLifecycle
}),
onTurnCompleted: vi.fn((handler: any) => {
turnCompletedHandler = handler
return offTurnCompleted
}),
readThread: vi.fn(),
listThreadTurns: vi.fn(),
readThreadTurn: vi.fn(),
}
const adapter = createCodexFreshAgentAdapter({ runtime: runtime as any })
const listener = vi.fn()

const unsubscribe = await adapter.subscribe?.('thread-new-1', listener)

expect(runtime.onThreadLifecycle).toHaveBeenCalledWith(expect.any(Function))
expect(runtime.onTurnCompleted).toHaveBeenCalledWith(expect.any(Function))

// thread_status_changed(idle) fires BEFORE the completed turn is committed
// to the app-server's thread history, so the client re-fetches but gets
// an empty transcript. This produces one idle snapshot.
lifecycleHandler?.({
kind: 'thread_status_changed',
threadId: 'thread-new-1',
status: { type: 'idle' },
})

const idleSnapshotsBeforeCompletion = listener.mock.calls.filter(
([event]: any[]) => event?.type === 'sdk.session.snapshot' && event?.status === 'idle',
)
expect(idleSnapshotsBeforeCompletion).toHaveLength(1)

// onTurnCompleted fires AFTER the turn is committed to the thread history.
// The adapter must emit another snapshot-invalidating event so the client
// re-fetches and renders the committed transcript (parity with freshopencode).
turnCompletedHandler?.({ threadId: 'thread-new-1', turnId: 'turn-1', params: {} })

const idleSnapshotsAfterCompletion = listener.mock.calls.filter(
([event]: any[]) => event?.type === 'sdk.session.snapshot' && event?.status === 'idle',
)
expect(idleSnapshotsAfterCompletion).toHaveLength(2)

// Turn-completed events for other threads must not trigger emission.
turnCompletedHandler?.({ threadId: 'other-thread', turnId: 'turn-2', params: {} })
expect(idleSnapshotsAfterCompletion).toHaveLength(2)

unsubscribe?.()
expect(offLifecycle).toHaveBeenCalledTimes(1)
expect(offTurnCompleted).toHaveBeenCalledTimes(1)
})

it('chimes for a flat params.status completion shape and skips a flat interrupted', async () => {
// The app-server client passes the notification params straight through, and the
// repo's own client tests model turn/completed as a FLAT { threadId, turnId, status }
Expand Down
60 changes: 60 additions & 0 deletions test/unit/server/fresh-agent/opencode-serve-adapter.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,37 @@ describe('OpenCode serve adapter: create + send', () => {
expect(manager.onceIdle).toHaveBeenCalledWith('ses_real_1', expect.any(Number), { cwd: '/repo' })
})

it('attach during an in-flight send reuses the materialized state (no duplicate serve subscription)', async () => {
const idle = createDeferred<void>()
const manager = makeFakeManager()
manager.onceIdle = vi.fn(() => idle.promise)
const adapter = makeAdapter(manager)
await adapter.create({ requestId: 'req-race', sessionType: 'freshopencode', provider: 'opencode', cwd: '/repo' })

// Start the send: it materializes (remember + bindServeStream subscribes once),
// emits freshAgent.session.materialized, then parks at `await idle`.
const sendPromise = adapter.send?.('freshopencode-req-race', { text: 'go' })
// Wait until materialization is done and the send is in-flight at await idle
// (promptAsync called => past emitMaterialized, before onceIdle resolves).
await vi.waitFor(() => expect(manager.promptAsync).toHaveBeenCalledWith('ses_real_1', expect.anything(), expect.anything()))

// Concurrently attach the real id while the send is still in-flight. attach
// MUST find the already-remembered state (existing-branch) and NOT bind a
// second serve stream. This pins concurrent attach idempotency: exactly one
// serve subscription for the real id, regardless of when attach arrives
// during the send lifecycle.
const attached = await adapter.attach?.({
sessionId: 'ses_real_1', sessionType: 'freshopencode', provider: 'opencode', cwd: '/repo',
})
expect(attached).toEqual({ sessionId: 'ses_real_1', sessionRef: { provider: 'opencode', sessionId: 'ses_real_1' } })
expect(manager.subscribe).toHaveBeenCalledTimes(1)
expect(manager.subscribe).toHaveBeenCalledWith('ses_real_1', expect.any(Function))

// The in-flight send still completes with the correct result once idle resolves.
idle.resolve()
await expect(sendPromise).resolves.toEqual({ sessionId: 'ses_real_1', sessionRef: { provider: 'opencode', sessionId: 'ses_real_1' } })
})

it('continues a materialized session on later sends without re-creating it', async () => {
const manager = makeFakeManager()
const adapter = makeAdapter(manager)
Expand Down Expand Up @@ -595,6 +626,35 @@ describe('OpenCode serve adapter: create + send', () => {
)
})

it('treats a session absent from the status map as idle (no malformed warning)', async () => {
loggerMocks.logger.warn.mockClear()
const manager = makeFakeManager()
// The opencode /session/status map only reports active (busy/retry) sessions;
// an idle session is absent (undefined). This must NOT be treated as malformed
// (it matches the serve manager's onceIdle semantics).
manager.getSessionStatus = vi.fn(async () => undefined)
const adapter = makeAdapter(manager)

await adapter.attach?.({
sessionId: 'ses_idle_absent',
sessionType: 'freshopencode',
provider: 'opencode',
cwd: '/repo/safe',
})

await expect(adapter.getSnapshot?.({
threadId: 'ses_idle_absent',
sessionType: 'freshopencode',
provider: 'opencode',
cwd: '/repo/safe',
})).resolves.toMatchObject({ status: 'idle' })
expect(manager.getSessionStatus).toHaveBeenCalledWith('ses_idle_absent', { cwd: '/repo/safe' })
expect(loggerMocks.logger.warn).not.toHaveBeenCalledWith(
expect.objectContaining({ reason: 'malformed_session_status' }),
expect.any(String),
)
})

it('keeps recovered sessions idle and warns when getSessionStatus is missing', async () => {
loggerMocks.logger.warn.mockClear()
const manager = makeFakeManager()
Expand Down
25 changes: 25 additions & 0 deletions test/unit/server/fresh-agent/opencode-serve-manager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ function makeManager(overrides: Partial<Parameters<typeof OpencodeServeManager>[
return jsonResponse({})
})
const manager = new OpencodeServeManager({
// Isolate from host process.env so tests asserting the 'opencode' default
// command don't break when a developer has OPENCODE_CMD set in their shell.
env: {},
spawnFn: spawnFn as any,
fetchFn: fetchFn as any,
allocatePort: async () => ({ hostname: '127.0.0.1', port: 47999 }),
Expand All @@ -58,6 +61,26 @@ describe('OpencodeServeManager lifecycle', () => {
expect(fetchFn).toHaveBeenCalledWith('http://127.0.0.1:47999/global/health', expect.anything())
})

it('honors the OPENCODE_CMD env var to override the serve binary (parity with CODEX_CMD/CLAUDE_CMD)', async () => {
const { manager, spawnFn } = makeManager({ env: { OPENCODE_CMD: '/custom/opencode-bin' } })
await manager.ensureStarted()
expect(spawnFn).toHaveBeenCalledWith(
'/custom/opencode-bin',
['serve', '--hostname', '127.0.0.1', '--port', '47999'],
expect.objectContaining({ env: expect.objectContaining({ FRESHELL_OPENCODE_SIDECAR_ID: expect.any(String) }) }),
)
})

it('falls back to the default opencode command when OPENCODE_CMD is unset', async () => {
const { manager, spawnFn } = makeManager({ env: {} })
await manager.ensureStarted()
expect(spawnFn).toHaveBeenCalledWith(
'opencode',
['serve', '--hostname', '127.0.0.1', '--port', '47999'],
expect.objectContaining({ env: expect.objectContaining({ FRESHELL_OPENCODE_SIDECAR_ID: expect.any(String) }) }),
)
})

it('routes the requested session directory without changing the serve process cwd', async () => {
const calls: Array<{ url: string; init: any }> = []
const fetchFn = vi.fn(async (url: string, init: any) => {
Expand Down Expand Up @@ -125,6 +148,7 @@ describe('OpencodeServeManager lifecycle', () => {
return jsonResponse({})
})
const manager = new OpencodeServeManager({
env: {},
spawnFn: spawnFn as any,
fetchFn: fetchFn as any,
allocatePort: vi.fn().mockResolvedValue({ hostname: '127.0.0.1', port: 47999 }),
Expand Down Expand Up @@ -435,6 +459,7 @@ describe('OpencodeServeManager HTTP client', () => {
return jsonResponse({}, { status: 404 })
})
const manager = new OpencodeServeManager({
env: {},
spawnFn: spawnFn as any,
fetchFn: fetchFn as any,
allocatePort: vi.fn()
Expand Down
34 changes: 34 additions & 0 deletions test/unit/server/fresh-agent/runtime-manager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,40 @@ describe('FreshAgentRuntimeManager', () => {
expect(opencodeAdapter.compact).toHaveBeenCalledWith('opencode-restored-1', { instructions: 'keep decisions' })
})

it('recovers (attaches) a not-yet-registered FreshOpenCode session on subscribe instead of throwing (materialization race)', async () => {
const listener = vi.fn()
const off = vi.fn()
const opencodeAdapter = {
create: vi.fn().mockResolvedValue({ sessionId: 'freshopencode-req-1' }),
attach: vi.fn().mockResolvedValue({ sessionId: 'ses_materialized_1' }),
subscribe: vi.fn().mockReturnValue(off),
send: vi.fn().mockResolvedValue(undefined),
}
const registry = createFreshAgentProviderRegistry([{
sessionType: 'freshopencode',
runtimeProvider: 'opencode',
adapter: opencodeAdapter as any,
}])
const manager = new FreshAgentRuntimeManager({ registry })

// Simulate the materialization race: the real session id is not yet registered
// with the runtime manager (adapter.send hasn't resolved) when subscribe is
// called for the materialized real id. This must recover via attach rather
// than throwing "is not tracked" (which would leak to the client as an error).
await expect(manager.subscribe(
{ sessionId: 'ses_materialized_1', sessionType: 'freshopencode', provider: 'opencode', cwd: '/repo/safe' },
listener,
)).resolves.toBe(off)

expect(opencodeAdapter.attach).toHaveBeenCalledWith(expect.objectContaining({
sessionId: 'ses_materialized_1',
sessionType: 'freshopencode',
provider: 'opencode',
cwd: '/repo/safe',
}))
expect(opencodeAdapter.subscribe).toHaveBeenCalledWith('ses_materialized_1', listener)
})

it('recovers a missing FreshOpenCode durable session with cwd before mutation', async () => {
const opencodeAdapter = {
create: vi.fn(),
Expand Down
Loading