Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 27 additions & 26 deletions server/fresh-agent/adapters/opencode/adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,11 @@ export function createOpencodeFreshAgentAdapter(options: CreateOpencodeFreshAgen
})
}

function emitStatus(state: OpencodeSessionState, status: 'running' | 'idle'): void {
state.status = status
state.events.emit('event', { type: 'sdk.session.snapshot', sessionId: state.placeholderId, status })
}

async function materializeOrSend(state: OpencodeSessionState, text: string, settings?: Partial<FreshAgentCreateRequest>): Promise<FreshAgentSendResult> {
const normalized = settings
? normalizeOpencodeInput({ requestId: state.placeholderId, sessionType: 'freshopencode', provider: 'opencode', ...settings } as FreshAgentCreateRequest)
Expand All @@ -159,40 +164,37 @@ export function createOpencodeFreshAgentAdapter(options: CreateOpencodeFreshAgen
const effort = normalized?.effort ?? state.effort
const effectiveCwd = normalized?.cwd ?? state.cwd

if (!state.realSessionId) {
const session = await serveManager.createSession({ title: undefined, ...(effectiveCwd ? { directory: effectiveCwd } : {}) })
state.realSessionId = session.id
if (typeof session.directory === 'string' && session.directory.length > 0) state.cwd = session.directory
else if (effectiveCwd) state.cwd = effectiveCwd
remember(state)
bindServeStream(state)
}

const realId = state.realSessionId!
state.status = 'running'
state.events.emit('event', { type: 'sdk.session.snapshot', sessionId: state.placeholderId, status: 'running' })
const idle = serveManager.onceIdle(realId, turnTimeoutMs)
// If promptAsync fails and we leave via the catch(), `idle` may still
// reject later on its timeout timer. Attach a no-op handler now so that
// later rejection cannot become an unhandled rejection.
void idle.catch(() => {})
emitStatus(state, 'running')
try {
if (!state.realSessionId) {
const session = await serveManager.createSession({ title: undefined, ...(effectiveCwd ? { directory: effectiveCwd } : {}) })
state.realSessionId = session.id
if (typeof session.directory === 'string' && session.directory.length > 0) state.cwd = session.directory
else if (effectiveCwd) state.cwd = effectiveCwd
remember(state)
bindServeStream(state)
}

const realId = state.realSessionId!
const idle = serveManager.onceIdle(realId, turnTimeoutMs)
// If promptAsync fails and we leave via the catch(), `idle` may still
// reject later on its timeout timer. Attach a no-op handler now so that
// later rejection cannot become an unhandled rejection.
void idle.catch(() => {})
await promptAsyncForState(state, realId, {
parts: [{ type: 'text', text }],
...(splitOpencodeModel(modelStr) ? { model: splitOpencodeModel(modelStr)! } : {}),
...(effort ? { variant: effort } : {}),
})
await idle
state.model = modelStr ?? state.model
state.effort = effort
emitStatus(state, 'idle')
return sendResult(state.realSessionId)
} catch (error) {
state.status = 'idle'
state.events.emit('event', { type: 'sdk.session.snapshot', sessionId: state.placeholderId, status: 'idle' })
emitStatus(state, 'idle')
throw error
}
state.model = modelStr ?? state.model
state.effort = effort
state.status = 'idle'
state.events.emit('event', { type: 'sdk.session.snapshot', sessionId: state.placeholderId, status: 'idle' })
return sendResult(state.realSessionId)
}

async function assembleExport(
Expand Down Expand Up @@ -316,8 +318,7 @@ export function createOpencodeFreshAgentAdapter(options: CreateOpencodeFreshAgen
async interrupt(sessionId) {
const state = requireState(sessionId)
await abortForState(state).catch((err) => log.warn({ err }, 'abort failed'))
state.status = 'idle'
state.events.emit('event', { type: 'sdk.session.snapshot', sessionId: state.placeholderId, status: 'idle' })
emitStatus(state, 'idle')
},

async compact(sessionId, input) {
Expand Down
85 changes: 76 additions & 9 deletions server/fresh-agent/adapters/opencode/serve-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ type OpencodeServeLogger = Pick<pino.Logger, 'warn' | 'error'>
const OWNERSHIP_ENV = 'FRESHELL_OPENCODE_SIDECAR_ID'
const DEFAULT_IDLE_POLL_MS = 500
const REQUIRED_IDLE_STATUS_POLLS = 2
const DEFAULT_REQUEST_TIMEOUT_MS = 30_000

export type OpencodeServeManagerOptions = {
command?: string
Expand All @@ -24,6 +25,7 @@ export type OpencodeServeManagerOptions = {
env?: NodeJS.ProcessEnv
idleShutdownMs?: number
idlePollMs?: number
requestTimeoutMs?: number
}

export type OpencodeServeMessage = { info: Record<string, any>; parts: Array<Record<string, any>> }
Expand Down Expand Up @@ -75,6 +77,13 @@ type ServeRoute = {

const DEFAULT_CWD_KEY = '<inherit-process-cwd>'

class OpencodeServeRequestTimeoutError extends Error {
constructor(method: string, requestPath: string, timeoutMs: number) {
super(`opencode serve ${method} ${requestPath} timed out after ${timeoutMs}ms`)
this.name = 'OpencodeServeRequestTimeoutError'
}
}

export class OpencodeServeManager {
private readonly command: string
private readonly spawnFn: typeof spawn
Expand All @@ -85,6 +94,7 @@ export class OpencodeServeManager {
private readonly env: NodeJS.ProcessEnv
private readonly idleShutdownMs: number
private readonly idlePollMs: number
private readonly requestTimeoutMs: number
private readonly log: OpencodeServeLogger = logger.child({ component: 'opencode-serve-manager' })
/** sessionId → emitter of ParsedServeEvent (and a synthetic 'idle' event). */
private readonly sessionEmitters = new Map<string, EventEmitter>()
Expand All @@ -105,6 +115,7 @@ export class OpencodeServeManager {
this.env = options.env ?? process.env
this.idleShutdownMs = options.idleShutdownMs ?? 15 * 60_000
this.idlePollMs = options.idlePollMs ?? DEFAULT_IDLE_POLL_MS
this.requestTimeoutMs = options.requestTimeoutMs ?? DEFAULT_REQUEST_TIMEOUT_MS
}

private routeFromCwd(cwd?: string): { cwdKey: string; cwd?: string } {
Expand Down Expand Up @@ -164,6 +175,55 @@ export class OpencodeServeManager {
running.idleTimer.unref?.()
}

private discardRunning(route: { cwdKey: string; cwd?: string }, reason: string): void {
const running = this.runningByCwd.get(route.cwdKey)
if (!running) return
this.runningByCwd.delete(route.cwdKey)
this.startPromiseByCwd.delete(route.cwdKey)
this.startAbortByCwd.delete(route.cwdKey)
this.forgetSessionsForCwd(route.cwdKey)
this.clearIdleTimer(running)
try { running.stopEventStream() } catch { /* ignore */ }
this.log.warn({ reason, cwd: route.cwd }, 'discarding opencode serve sidecar')
void killOwnedProcesses(running.child, running.ownershipId, this.log)
}

private async fetchWithRequestTimeout(
url: string,
requestPath: string,
init: RequestInit | undefined,
): Promise<Response> {
if (this.requestTimeoutMs <= 0) {
return await this.fetchFn(url, init)
}
const controller = new AbortController()
let timedOut = false
const upstreamSignal = init?.signal
const abortFromUpstream = () => controller.abort()
if (upstreamSignal?.aborted) {
controller.abort()
} else {
upstreamSignal?.addEventListener('abort', abortFromUpstream, { once: true })
}
const timeout = setTimeout(() => {
timedOut = true
controller.abort()
}, this.requestTimeoutMs)
timeout.unref?.()

try {
return await this.fetchFn(url, { ...init, signal: controller.signal })
} catch (error) {
if (timedOut) {
throw new OpencodeServeRequestTimeoutError(init?.method ?? 'GET', requestPath, this.requestTimeoutMs)
}
throw error
} finally {
clearTimeout(timeout)
upstreamSignal?.removeEventListener('abort', abortFromUpstream)
}
}

private async withRunning<T>(
route: { cwdKey: string; cwd?: string },
fn: (baseUrl: string) => Promise<T>,
Expand Down Expand Up @@ -322,16 +382,23 @@ export class OpencodeServeManager {
const resolved = route.sessionId
? this.routeForSession(route.sessionId, route)
: this.routeFromCwd(route.cwd)
return this.withRunning(resolved, async (base) => {
const res = await this.fetchFn(`${base}${requestPath}`, init)
if (!res.ok && res.status !== 204) {
if (res.status === 404 && init?.notFoundValue !== undefined) return init.notFoundValue
const text = await res.text().catch(() => '')
throw new Error(`opencode serve ${init?.method ?? 'GET'} ${requestPath} → ${res.status} ${text}`)
try {
return await this.withRunning(resolved, async (base) => {
const res = await this.fetchWithRequestTimeout(`${base}${requestPath}`, requestPath, init)
if (!res.ok && res.status !== 204) {
if (res.status === 404 && init?.notFoundValue !== undefined) return init.notFoundValue
const text = await res.text().catch(() => '')
throw new Error(`opencode serve ${init?.method ?? 'GET'} ${requestPath} → ${res.status} ${text}`)
}
if (res.status === 204) return undefined as T
return (await res.json()) as T
})
} catch (error) {
if (error instanceof OpencodeServeRequestTimeoutError) {
this.discardRunning(resolved, 'request_timeout')
}
if (res.status === 204) return undefined as T
return (await res.json()) as T
})
throw error
}
}

private async getSessionStatusMap(route: ServeRoute = {}, init?: RequestInit): Promise<OpencodeStatusMap> {
Expand Down
48 changes: 43 additions & 5 deletions src/components/fresh-agent/FreshAgentView.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,12 @@ type LocalEcho = {
submittedTurnId?: string
}

function sameLocalEcho(a: LocalEcho | null | undefined, b: LocalEcho | null | undefined): boolean {
return (a?.requestId ?? null) === (b?.requestId ?? null)
&& (a?.text ?? null) === (b?.text ?? null)
&& (a?.submittedTurnId ?? null) === (b?.submittedTurnId ?? null)
}

type PendingSendMetadata = {
cwd?: string
checkpointId?: string
Expand Down Expand Up @@ -414,7 +420,7 @@ export function FreshAgentView({
// Optimistic echo of the just-sent user message: the transcript renders
// snapshot turns only, which left a 2-10s blank gap after send
// (live-test finding). Cleared when a snapshot containing the turn lands.
const [localEcho, setLocalEcho] = useState<LocalEcho | null>(null)
const [localEcho, setLocalEchoState] = useState<LocalEcho | null>(() => paneContent.pendingLocalEcho ?? null)
const localEchoRef = useRef<LocalEcho | null>(null)
localEchoRef.current = localEcho
const pendingSendMetadataRef = useRef<Map<string, PendingSendMetadata>>(new Map())
Expand All @@ -431,6 +437,25 @@ export function FreshAgentView({
const paneContentRef = useRef(paneContent)
const composerRef = useRef<FreshAgentComposerHandle | null>(null)
paneContentRef.current = paneContent
const setLocalEcho = useCallback((next: LocalEcho | null) => {
setLocalEchoState(next)
const current = paneContentRef.current
if (sameLocalEcho(current.pendingLocalEcho, next)) return
dispatch(mergePaneContent({
tabId,
paneId,
updates: { pendingLocalEcho: next ?? undefined },
}))
}, [dispatch, paneId, tabId])
useEffect(() => {
const next = paneContent.pendingLocalEcho ?? null
if (sameLocalEcho(localEchoRef.current, next)) return
setLocalEchoState(next)
}, [
paneContent.pendingLocalEcho?.requestId,
paneContent.pendingLocalEcho?.submittedTurnId,
paneContent.pendingLocalEcho?.text,
])
const restoreTimeoutRef = useRef<number | null>(null)
const createSentRef = useRef(false)
// Session-scoped "always allow" tool names; reset with the pane, never persisted.
Expand Down Expand Up @@ -682,9 +707,10 @@ export function FreshAgentView({
restoreError: undefined,
createError: undefined,
status: 'creating',
pendingLocalEcho: undefined,
},
}))
}, [commitSnapshot, dispatch, paneId, sendFreshAgentMessage, tabId])
}, [commitSnapshot, dispatch, paneId, sendFreshAgentMessage, setLocalEcho, tabId])

const sendFork = useCallback((atTurnId?: string) => {
const current = paneContentRef.current
Expand Down Expand Up @@ -1026,9 +1052,11 @@ export function FreshAgentView({
commitSnapshot(displaySnapshot)
setSnapshotAutoTitleIdentity(snapshotIdentity)
const echo = localEchoRef.current
const landedEcho = echo
? localEchoLanded(displaySnapshot.turns, echo, pendingSendMetadataRef.current.get(echo.requestId))
: false
if (echo) {
const pending = pendingSendMetadataRef.current.get(echo.requestId)
if (localEchoLanded(displaySnapshot.turns, echo, pending)) setLocalEcho(null)
if (landedEcho) setLocalEcho(null)
}
const fresh = paneContentRef.current
const nextStatus = (resolved.status as FreshAgentPaneContent['status']) ?? fresh.status
Expand Down Expand Up @@ -1059,6 +1087,7 @@ export function FreshAgentView({
sessionRef: nextSessionRef,
status: nextStatus,
resumeSessionId: nextResumeSessionId,
pendingLocalEcho: landedEcho ? undefined : fresh.pendingLocalEcho,
},
}))
})
Expand Down Expand Up @@ -1288,6 +1317,7 @@ export function FreshAgentView({
firstMessage: text,
}))
}
const nextLocalEcho: LocalEcho = { text, requestId }
sendFreshAgentMessage({
type: 'freshAgent.send',
requestId,
Expand All @@ -1303,7 +1333,15 @@ export function FreshAgentView({
...(getEffectiveFreshAgentEffort(current) ? { effort: getEffectiveFreshAgentEffort(current) } : {}),
},
})
setLocalEcho({ text, requestId })
setLocalEchoState(nextLocalEcho)
dispatch(mergePaneContent({
tabId,
paneId,
updates: {
...(current.provider === 'opencode' ? { status: 'running' } : {}),
pendingLocalEcho: nextLocalEcho,
},
}))
}, [dispatch, paneId, recordPendingSendMetadata, sendFreshAgentMessage, snapshotConfirmsNoUserTurns, tabId])

// Flush queued messages when the turn ends. One flush per status change is
Expand Down
22 changes: 22 additions & 0 deletions src/store/paneTypes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,26 @@ export type FreshAgentCreateError = {
retryable?: boolean
}

export type FreshAgentPendingLocalEcho = {
requestId: string
text: string
submittedTurnId?: string
}

export function normalizeFreshAgentPendingLocalEcho(value: unknown): FreshAgentPendingLocalEcho | undefined {
if (!value || typeof value !== 'object' || Array.isArray(value)) return undefined
const record = value as Record<string, unknown>
if (typeof record.requestId !== 'string' || record.requestId.length === 0) return undefined
if (typeof record.text !== 'string' || record.text.length === 0) return undefined
return {
requestId: record.requestId,
text: record.text,
...(typeof record.submittedTurnId === 'string' && record.submittedTurnId.length > 0
? { submittedTurnId: record.submittedTurnId }
: {}),
}
}

export type FreshAgentPaneContent = {
kind: 'fresh-agent'
sessionType: FreshAgentSessionType
Expand Down Expand Up @@ -150,6 +170,8 @@ export type FreshAgentPaneContent = {
showThinking?: boolean
showTools?: boolean
showTimecodes?: boolean
/** Persisted optimistic user turn that has not yet appeared in a durable provider snapshot. */
pendingLocalEcho?: FreshAgentPendingLocalEcho
}

/**
Expand Down
Loading
Loading