Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 6 additions & 15 deletions src-tauri/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/openacp/api/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ export function createApiClient(server: ServerInfo, workspaceId?: string) {

/** Cancel/abort the current prompt in a session */
async cancelPrompt(sessionID: string): Promise<void> {
await api(`/sessions/${encodeURIComponent(sessionID)}/cancel`, {
await api(`/sse/sessions/${encodeURIComponent(sessionID)}/cancel`, {
method: "POST",
})
},
Expand Down
70 changes: 53 additions & 17 deletions src/openacp/context/chat.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -195,8 +195,8 @@ export function ChatProvider({ children, onPermissionRequest, onPermissionResolv
const messagesRef = useRef<Record<string, Message[]>>({})

const abortedSessions = useRef(new Set<string>())
/** turnId of the turn that was aborted — used to allow next queued turn to proceed */
const abortedTurnId = useRef<string | undefined>(undefined)
/** Per-session turnId of the turn that was aborted — used to allow next queued turn to proceed */
const abortedTurnIds = useRef(new Map<string, string | undefined>())
/** Cached messageMode setting — read on mount + settings change, avoids async in critical path */
const messageModeRef = useRef<"queue" | "instant">("queue")
const assistantMsgId = useRef(new Map<string, string>())
Expand Down Expand Up @@ -319,13 +319,36 @@ export function ChatProvider({ children, onPermissionRequest, onPermissionResolv
const localAstBlocks = local.filter((m) => m.role === "assistant").reduce((n, m) => n + m.blocks.length, 0)

if (serverAstBlocks > 0 && serverAstBlocks >= localAstBlocks) {
// Server is authoritative — interrupted turns already have stopReason: "interrupted"
// which turnToMessage() maps to interrupted: true. No cache merge needed.
if (assistantMsgId.current.get(sessionID) === streamingPlaceholderAtStart) {
assistantMsgId.current.delete(sessionID)
setStore((draft) => { draft.streaming = false; draft.streamingSession = undefined })
}

// Preserve client-side interrupted flags: the client is the source of truth
// for interruption since the server may not have persisted the stopReason yet.
// Local turnIds (random UUID) differ from server turnIds (turn index), so match
// by position: find the Nth interrupted assistant message locally, mark the Nth
// assistant message from the server.
const localInterruptedIndices = new Set<number>()
let localAstIdx = 0
for (const m of local) {
if (m.role === "assistant") {
if (m.interrupted) localInterruptedIndices.add(localAstIdx)
localAstIdx++
}
}
if (localInterruptedIndices.size > 0) {
let serverAstIdx = 0
for (const msg of serverMessages) {
if (msg.role === "assistant") {
if (localInterruptedIndices.has(serverAstIdx) && !msg.interrupted) {
msg.interrupted = true
}
serverAstIdx++
}
}
}

const lastServerTime = new Date(history.turns[history.turns.length - 1].timestamp).getTime()
const serverUserTexts = new Set(
serverMessages
Expand Down Expand Up @@ -570,9 +593,12 @@ export function ChatProvider({ children, onPermissionRequest, onPermissionResolv
const sessionID = event.sessionId
if (!sessionID) return
// Drop events belonging to the aborted turn; allow events from subsequent turns.
// If abortedTurnId is unknown (turn completed before abort), block ALL events for safety.
if (abortedSessions.current.has(sessionID)) {
if (!abortedTurnId.current || event.turnId === abortedTurnId.current) return
const blockedTurnId = abortedTurnIds.current.get(sessionID)
// If we know which turnId was aborted, only block that specific turn.
// If turnId is unknown (undefined), block all events for this session
// until handleMessageProcessing clears the guard on the next turn.
if (blockedTurnId === undefined || event.turnId === blockedTurnId) return
}

// Broadcast for consumers outside chat context (file tree, notifications, etc.)
Expand Down Expand Up @@ -845,10 +871,11 @@ export function ChatProvider({ children, onPermissionRequest, onPermissionResolv

// If this session was aborted, check if this is the aborted turn or a new one
if (abortedSessions.current.has(sid)) {
if (ev.turnId === abortedTurnId.current) return // still the aborted turn
const blockedTurnId = abortedTurnIds.current.get(sid)
if (blockedTurnId !== undefined && ev.turnId === blockedTurnId) return // still the aborted turn
// New turn from queue — clear abort guard so it can proceed
abortedSessions.current.delete(sid)
abortedTurnId.current = undefined
abortedTurnIds.current.delete(sid)
}

const processingStartedAt = new Date(ev.timestamp).getTime()
Expand Down Expand Up @@ -1097,9 +1124,10 @@ export function ChatProvider({ children, onPermissionRequest, onPermissionResolv
}

// Instant mode: interrupt current turn before sending new message.
// Uses cached ref (no async) so abort happens synchronously before any events slip through.
// Await abort so the server has acknowledged cancellation before we send the new prompt,
// preventing the race where cancel arrives after the new prompt starts processing.
if (store.streaming && store.activeSession && messageModeRef.current === "instant") {
abort()
await abort()
// Do NOT clear the abort guard here — handleMessageProcessing
// clears it when the NEW turn's message:processing event arrives.
}
Expand Down Expand Up @@ -1226,10 +1254,13 @@ export function ChatProvider({ children, onPermissionRequest, onPermissionResolv
void loadHistory(id)
}, [workspace.client])

const abort = useCallback(() => {
const abort = useCallback(async () => {
const sessionID = store.activeSession
if (!sessionID) return

// Already aborting this session — avoid double-fire
if (abortedSessions.current.has(sessionID)) return

// Find the turnId of the currently streaming turn so we can block only its events
const currentMsgId = assistantMsgId.current.get(sessionID)
let currentTurnId: string | undefined
Expand All @@ -1240,7 +1271,7 @@ export function ChatProvider({ children, onPermissionRequest, onPermissionResolv
}

abortedSessions.current.add(sessionID)
abortedTurnId.current = currentTurnId
abortedTurnIds.current.set(sessionID, currentTurnId)

// Discard unrevealed content — do NOT flush buffers to the UI.
// Text already rendered (up to charStream cursor) stays; everything
Expand All @@ -1266,16 +1297,21 @@ export function ChatProvider({ children, onPermissionRequest, onPermissionResolv
assistantMsgId.current.delete(sessionID)
setStore((draft) => { draft.streaming = false; draft.streamingSession = undefined })
// Tell server to cancel only the current prompt (queue preserved).
// Await so instant-mode callers know the server has acknowledged before sending a new prompt.
// Guard stays active until:
// - handleMessageProcessing receives a NEW turn (queue drained)
// - user sends a new message (doSendPrompt clears it)
// - 30s fallback timeout
workspace.client.cancelPrompt(sessionID).catch(() => {})
// Fallback: clear guard after 30s even if server never responds
// - 10s fallback timeout
try {
await workspace.client.cancelPrompt(sessionID)
} catch (e) {
console.warn("[Chat] cancelPrompt failed:", e)
}
// Fallback: clear guard after 10s even if server never responds
setTimeout(() => {
abortedSessions.current.delete(sessionID)
abortedTurnId.current = undefined
}, 30_000)
abortedTurnIds.current.delete(sessionID)
}, 10_000)
}, [store.activeSession, workspace.client])

// Load messageMode setting on mount
Expand Down
Loading