diff --git a/src/common/orpc/schemas/errors.ts b/src/common/orpc/schemas/errors.ts index 2846abd653..94924600a4 100644 --- a/src/common/orpc/schemas/errors.ts +++ b/src/common/orpc/schemas/errors.ts @@ -40,6 +40,7 @@ export const StreamErrorTypeSchema = z.enum([ "runtime_not_ready", // Container/runtime doesn't exist or failed to start (permanent) "runtime_start_failed", // Runtime is starting or temporarily unavailable (retryable) "empty_output", // Provider ended the stream without any assistant-visible output + "stream_truncated", // Provider stream closed before its terminal finish event "max_output_tokens", // Provider truncated the response at max_tokens (finishReason: "length") "unknown", // Catch-all ]); diff --git a/src/node/services/copilot/copilotResponsesLanguageModel.test.ts b/src/node/services/copilot/copilotResponsesLanguageModel.test.ts index 4fbe84e593..74d9a5df71 100644 --- a/src/node/services/copilot/copilotResponsesLanguageModel.test.ts +++ b/src/node/services/copilot/copilotResponsesLanguageModel.test.ts @@ -378,6 +378,98 @@ describe("CopilotResponsesLanguageModel", () => { }); }); + it("emits an error when the SSE stream closes before a terminal event", async () => { + restoreFetchers.push( + mockFetch(() => + Promise.resolve( + createSseResponse([ + { + event: "response.output_item.added", + data: { + type: "response.output_item.added", + output_index: 0, + content_index: 0, + item: { type: "message", id: "msg_1" }, + }, + }, + { + event: "response.output_text.delta", + data: { + type: "response.output_text.delta", + output_index: 0, + content_index: 0, + item_id: "msg_1", + delta: "partial", + }, + }, + ]) + ) + ) + ); + + const model = createModel(); + const result = await model.doStream({ + prompt: [{ role: "user", content: [{ type: "text", text: "Stream please" }] }], + }); + const parts = await collectStreamParts(result.stream); + + expect(parts.map((part) => part.type)).toEqual([ + "stream-start", + "text-start", + "text-delta", + "error", + ]); + const errorPart = parts.find( + (part): part is Extract => part.type === "error" + ); + expect(errorPart?.error).toBeInstanceOf(Error); + expect(errorPart?.error instanceof Error ? errorPart.error.message : "").toContain( + "stream closed before terminal event" + ); + }); + + it("treats response.incomplete as a terminal finish event", async () => { + restoreFetchers.push( + mockFetch(() => + Promise.resolve( + createSseResponse([ + { + event: "response.incomplete", + data: { + type: "response.incomplete", + response: { + incomplete_details: { reason: "max_output_tokens" }, + usage: { input_tokens: 3, output_tokens: 2, total_tokens: 5 }, + }, + }, + }, + ]) + ) + ) + ); + + const model = createModel(); + const result = await model.doStream({ + prompt: [{ role: "user", content: [{ type: "text", text: "Stream please" }] }], + }); + const parts = await collectStreamParts(result.stream); + + expect(parts).toEqual([ + { type: "stream-start", warnings: [] }, + { + type: "finish", + finishReason: "length", + usage: { + inputTokens: 3, + outputTokens: 2, + totalTokens: 5, + reasoningTokens: undefined, + cachedInputTokens: undefined, + }, + }, + ]); + }); + it("uses a stable synthetic text id even when item_id rotates", async () => { restoreFetchers.push( mockFetch(() => diff --git a/src/node/services/copilot/copilotResponsesLanguageModel.ts b/src/node/services/copilot/copilotResponsesLanguageModel.ts index 90417854ac..0b0045ac09 100644 --- a/src/node/services/copilot/copilotResponsesLanguageModel.ts +++ b/src/node/services/copilot/copilotResponsesLanguageModel.ts @@ -283,6 +283,7 @@ async function consumeSseStream( const reader = source.getReader(); const decoder = new TextDecoder(); let buffer = ""; + let sawTerminalEvent = false; try { for (;;) { @@ -294,13 +295,18 @@ async function consumeSseStream( while (boundary >= 0) { const frame = buffer.slice(0, boundary); buffer = buffer.slice(boundary + 2); - processFrame(frame, aliasRegistry, includeRawChunks, controller); + sawTerminalEvent = + processFrame(frame, aliasRegistry, includeRawChunks, controller) || sawTerminalEvent; boundary = buffer.indexOf("\n\n"); } if (done) { if (buffer.trim().length > 0) { - processFrame(buffer, aliasRegistry, includeRawChunks, controller); + sawTerminalEvent = + processFrame(buffer, aliasRegistry, includeRawChunks, controller) || sawTerminalEvent; + } + if (!sawTerminalEvent) { + controller.enqueue({ type: "error", error: createStreamTruncatedError() }); } break; } @@ -318,19 +324,23 @@ function processFrame( aliasRegistry: Map, includeRawChunks: boolean, controller: ReadableStreamDefaultController -) { +): boolean { const parsed = parseSseFrame(frame); if (parsed == null) { - return; + return false; } if (includeRawChunks) { controller.enqueue({ type: "raw", rawValue: parsed }); } + let sawTerminalEvent = false; for (const part of mapStreamEvent(parsed.event, parsed.data, aliasRegistry)) { + sawTerminalEvent = part.type === "finish" || part.type === "error" || sawTerminalEvent; controller.enqueue(part); } + + return sawTerminalEvent; } function parseSseFrame(frame: string) { @@ -393,6 +403,7 @@ function mapStreamEvent(event: string, data: JsonRecord, aliasRegistry: Map, @@ -479,6 +509,7 @@ function mapFinishReason(reason: unknown): LanguageModelV2FinishReason { case "stop": return "stop"; case "max_tokens": + case "max_output_tokens": return "length"; case "content_filter": return "content-filter"; diff --git a/src/node/services/streamManager.modelOnlyNotifications.test.ts b/src/node/services/streamManager.modelOnlyNotifications.test.ts index 9eb38db02b..602a24b470 100644 --- a/src/node/services/streamManager.modelOnlyNotifications.test.ts +++ b/src/node/services/streamManager.modelOnlyNotifications.test.ts @@ -52,6 +52,8 @@ describe("StreamManager - model-only tool notifications", () => { __mux_notifications: ["hello"], }, }; + + yield { type: "finish", finishReason: "stop" }; })(), totalUsage: Promise.resolve({ inputTokens: 0, outputTokens: 0, totalTokens: 0 }), usage: Promise.resolve({ inputTokens: 0, outputTokens: 0, totalTokens: 0 }), @@ -131,6 +133,8 @@ describe("StreamManager - model-only tool notifications", () => { ], }, }; + + yield { type: "finish", finishReason: "stop" }; })(), totalUsage: Promise.resolve({ inputTokens: 0, outputTokens: 0, totalTokens: 0 }), usage: Promise.resolve({ inputTokens: 0, outputTokens: 0, totalTokens: 0 }), diff --git a/src/node/services/streamManager.test.ts b/src/node/services/streamManager.test.ts index 49beb03db5..c60f3cdd9d 100644 --- a/src/node/services/streamManager.test.ts +++ b/src/node/services/streamManager.test.ts @@ -209,6 +209,7 @@ function createStreamInfoForTests( cumulativeUsage: { inputTokens: 0, outputTokens: 0, totalTokens: 0 }, cumulativeProviderMetadata: undefined, didRetryPreviousResponseIdAtStep: false, + receivedTerminalEvent: false, currentStepStartIndex: 0, stepTracker: {}, ...overrides, @@ -941,6 +942,12 @@ describe("StreamManager - language model cleanup", () => { workspaceId: "cleanup-workspace", messageId: "cleanup-message", streamInfoOverrides: () => ({ + streamResult: createStreamResultForTests( + (async function* () { + await Promise.resolve(); + yield { type: "finish", finishReason: "stop" }; + })() + ), parts: [{ type: "text" as const, text: "done", timestamp: Date.now() }], }), }, @@ -1583,6 +1590,70 @@ describe("StreamManager - empty stream completions", () => { expect(partial?.metadata?.metadataModel).toBe(KNOWN_MODELS.SONNET.id); expect(partial?.parts).toEqual([]); }); + + test("persists retryable partial error when a non-empty stream closes before finish", async () => { + const streamManager = new StreamManager(historyService); + const errorEvents: Array<{ messageId: string; error: string; errorType?: string }> = []; + const streamEndEvents: unknown[] = []; + + streamManager.on("error", (data) => { + errorEvents.push(data as { messageId: string; error: string; errorType?: string }); + }); + streamManager.on("stream-end", (data) => { + streamEndEvents.push(data); + }); + + const replaceTokenTrackerResult = Reflect.set(streamManager, "tokenTracker", { + setModel: () => Promise.resolve(undefined), + countTokens: () => Promise.resolve(0), + }); + expect(replaceTokenTrackerResult).toBe(true); + + const workspaceId = "truncated-stream-workspace"; + const messageId = "truncated-stream-message"; + const historySequence = 1; + + await appendPartialAssistantForTests(workspaceId, messageId, historySequence); + const processStreamWithCleanup = getProcessStreamWithCleanupForTests(streamManager); + const startTime = Date.now() - 250; + const streamInfo = createStreamInfoForTests({ + streamResult: createStreamResultForTests( + (async function* () { + await Promise.resolve(); + yield { type: "text-delta", text: "partial answer" }; + })(), + { inputTokens: 3, outputTokens: 2, totalTokens: 5 } + ), + messageId, + startTime, + lastPartTimestamp: startTime, + model: KNOWN_MODELS.SONNET.id, + metadataModel: KNOWN_MODELS.SONNET.id, + historySequence, + initialMetadata: { agentId: "plan" }, + runtime, + }); + + await processStreamWithCleanup.call(streamManager, workspaceId, streamInfo, historySequence); + + expect(streamEndEvents).toHaveLength(0); + expect(errorEvents).toHaveLength(1); + expect(errorEvents[0]).toMatchObject({ + messageId, + errorType: "stream_truncated", + }); + expect(errorEvents[0]?.error).toContain( + "Anthropic stream closed unexpectedly before the response completed" + ); + + const partial = await historyService.readPartial(workspaceId); + expect(partial?.metadata?.errorType).toBe("stream_truncated"); + expect(partial?.metadata?.error).toContain( + "Anthropic stream closed unexpectedly before the response completed" + ); + expect(partial?.metadata?.metadataModel).toBe(KNOWN_MODELS.SONNET.id); + expect(partial?.parts).toMatchObject([{ type: "text", text: "partial answer" }]); + }); }); describe("StreamManager - TTFT metadata persistence", () => { @@ -1702,7 +1773,9 @@ describe("StreamManager - TTFT metadata persistence", () => { const streamInfo = createStreamInfoForTests({ streamResult: createStreamResultForTests( (async function* () { - // No-op stream: tests verify stream-end finalization behavior from pre-populated parts. + // Tests pre-populate parts but still need the provider's terminal proof of completion. + await Promise.resolve(); + yield { type: "finish", finishReason: "stop" }; })(), usage ), @@ -2606,6 +2679,16 @@ describe("StreamManager - categorizeError", () => { }); const categorizeCases: Array<{ name: string; error: unknown; expected: string }> = [ + { + name: "classifies Anthropic missing message_stop as stream_truncated", + error: new Error("anthropic stream closed before message_stop"), + expected: "stream_truncated", + }, + { + name: "classifies OpenAI Responses missing terminal event as stream_truncated", + error: new Error("openai responses stream closed before terminal event"), + expected: "stream_truncated", + }, { name: "classifies model_not_found via message fallback", error: new Error("The model `gpt-5.2-codex` does not exist or you do not have access to it."), diff --git a/src/node/services/streamManager.ts b/src/node/services/streamManager.ts index bc69ad3763..fbd499454a 100644 --- a/src/node/services/streamManager.ts +++ b/src/node/services/streamManager.ts @@ -64,6 +64,7 @@ import { runLanguageModelCleanup } from "./languageModelCleanup"; import { shellQuote } from "@/common/utils/shell"; import { classify429Capacity } from "@/common/utils/errors/classify429Capacity"; import { extractChunkDeltaText } from "@/common/utils/ai/streamChunks"; +import { PROVIDER_DEFINITIONS } from "@/common/constants/providers"; // Disable noisy AI SDK warning logging. globalThis.AI_SDK_LOG_WARNINGS = false; @@ -74,6 +75,8 @@ const EMPTY_STREAM_OUTPUT_ERROR_MESSAGE = "The model ended the stream before producing any assistant-visible output. This usually means the upstream stream was dropped rather than completed normally. Mux will retry automatically when possible, and if retries keep failing you should try again or switch models."; const MAX_EMPTY_STREAM_RECOVERY_ATTEMPTS = 1; +const STREAM_TRUNCATED_MESSAGE_SUFFIX = + "stream closed unexpectedly before the response completed. Mux will retry automatically when possible, and if retries keep failing you should try again or switch models."; class EmptyStreamOutputError extends Error { constructor() { @@ -82,6 +85,16 @@ class EmptyStreamOutputError extends Error { } } +class StreamTruncatedError extends Error { + readonly providerDisplayName: string; + + constructor(providerDisplayName: string) { + super(`${providerDisplayName} ${STREAM_TRUNCATED_MESSAGE_SUFFIX}`); + this.name = "StreamTruncatedError"; + this.providerDisplayName = providerDisplayName; + } +} + // Type definitions for stream parts with extended properties interface ReasoningDeltaPart { type: "reasoning-delta"; @@ -131,6 +144,31 @@ interface StreamRequestConfig { toolPolicy?: ToolPolicy; } +function isKnownProviderName(provider: string): provider is keyof typeof PROVIDER_DEFINITIONS { + return Object.hasOwn(PROVIDER_DEFINITIONS, provider); +} + +function getStreamProviderDisplayName(model: string): string { + const canonicalModel = normalizeToCanonical(model); + const providerSeparatorIndex = canonicalModel.indexOf(":"); + const provider = + providerSeparatorIndex > 0 ? canonicalModel.slice(0, providerSeparatorIndex) : undefined; + if (provider && isKnownProviderName(provider)) { + return PROVIDER_DEFINITIONS[provider].displayName; + } + + return "The model"; +} + +function isStreamTruncatedMessage(message: string): boolean { + const lowerMessage = message.toLowerCase(); + return ( + lowerMessage.includes("stream closed before message_stop") || + lowerMessage.includes("stream closed before terminal event") || + lowerMessage.includes("stream closed unexpectedly before the response completed") + ); +} + function isRecord(value: unknown): value is Record { return typeof value === "object" && value !== null; } @@ -370,6 +408,10 @@ interface WorkspaceStreamInfo { // stream-end prefers cumulative usage across attempts instead of the final // attempt's totalUsage only. didRetryAfterEmptyOutput?: boolean; + // Provider streams must prove semantic completion with a terminal SDK finish part. + // A clean EOF can be a proxy/provider drop and must not finalize partial assistant text. + receivedTerminalEvent: boolean; + terminalFinishReason?: string; // Index into parts where the current step started (used to ensure safe retries) currentStepStartIndex: number; historySequence: number; @@ -1380,6 +1422,7 @@ export class StreamManager extends EventEmitter { didRetryPreviousResponseIdAtStep: false, didRetryAfterEmptyOutput: false, stepTracker, + receivedTerminalEvent: false, currentStepStartIndex: 0, request, historySequence, @@ -1686,6 +1729,39 @@ export class StreamManager extends EventEmitter { await this.handleStreamFailure(workspaceId, streamInfo, new EmptyStreamOutputError()); } + private async handleTruncatedStreamCompletion( + workspaceId: WorkspaceId, + streamInfo: WorkspaceStreamInfo + ): Promise { + const workspaceLog = this.getWorkspaceLogger(workspaceId, streamInfo); + const streamMeta = await this.getStreamMetadata(streamInfo); + const totalUsage = this.resolveTotalUsageForStreamEnd(streamInfo, streamMeta.totalUsage); + const contextUsage = streamMeta.contextUsage ?? streamInfo.lastStepUsage; + const previousResponseId = this.getOpenAIPreviousResponseId(streamInfo.request.providerOptions); + const providerDisplayName = getStreamProviderDisplayName(streamInfo.model); + + // Do not treat iterator EOF as success. Anthropic and OpenAI Responses both + // have semantic terminal events; without the SDK finish part, this may be a + // clean proxy/provider drop after partial text was already streamed. + workspaceLog.error("Stream ended without a terminal finish event", { + messageId: streamInfo.messageId, + model: streamInfo.model, + providerDisplayName, + durationMs: streamMeta.duration, + totalUsage, + contextUsage, + cumulativeUsage: streamInfo.cumulativeUsage, + previousResponseId, + partsCount: streamInfo.parts.length, + }); + + await this.handleStreamFailure( + workspaceId, + streamInfo, + new StreamTruncatedError(providerDisplayName) + ); + } + private async retryEmptyStreamBeforeFailure( workspaceId: WorkspaceId, streamInfo: WorkspaceStreamInfo, @@ -2038,10 +2114,18 @@ export class StreamManager extends EventEmitter { // Handle other event types as needed case "start": case "text-start": - case "finish": // These events can be logged or handled if needed break; + case "finish": { + const finishPart = part as { finishReason?: unknown }; + streamInfo.receivedTerminalEvent = true; + if (typeof finishPart.finishReason === "string") { + streamInfo.terminalFinishReason = finishPart.finishReason; + } + break; + } + case "finish-step": { // Emit usage-delta event with usage from this step const finishStepPart = part as { @@ -2115,6 +2199,11 @@ export class StreamManager extends EventEmitter { break; } + if (!streamInfo.receivedTerminalEvent) { + await this.handleTruncatedStreamCompletion(workspaceId, streamInfo); + break; + } + // Get all metadata from stream result in one call // - totalUsage: sum of all steps (for cost calculation) // - contextUsage: last step only (for context window display) @@ -2130,7 +2219,7 @@ export class StreamManager extends EventEmitter { const contextUsage = streamMeta.contextUsage ?? streamInfo.lastStepUsage; const contextProviderMetadata = streamMeta.contextProviderMetadata ?? streamInfo.lastStepProviderMetadata; - const finishReason = streamMeta.finishReason; + const finishReason = streamInfo.terminalFinishReason ?? streamMeta.finishReason; const duration = streamMeta.duration; const ttftMs = this.resolveTtftMsForStreamEnd(streamInfo); // Aggregated provider metadata across all steps (for cost calculation with cache tokens) @@ -2323,6 +2412,15 @@ export class StreamManager extends EventEmitter { }; } + if (error instanceof StreamTruncatedError) { + return { + messageId: streamInfo.messageId, + error: error.message, + errorType: "stream_truncated", + acpPromptId: streamInfo.initialMetadata?.acpPromptId, + }; + } + // Extract error message (errors thrown from 'error' parts already have the correct message) // Apply prefix stripping to remove noisy "undefined: " prefixes from provider errors let errorMessage: string = stripNoisyErrorPrefix(getErrorMessage(error)); @@ -2508,6 +2606,8 @@ export class StreamManager extends EventEmitter { if (!preserveParts) { streamInfo.parts = []; } + streamInfo.receivedTerminalEvent = false; + streamInfo.terminalFinishReason = undefined; streamInfo.lastPartialWriteTime = 0; if (!preserveUsage) { @@ -2651,6 +2751,10 @@ export class StreamManager extends EventEmitter { * Categorizes errors for better error handling (used for event emission) */ private categorizeError(error: unknown): StreamErrorType { + if (error instanceof StreamTruncatedError) { + return "stream_truncated"; + } + // Use AI SDK error type guards first if (LoadAPIKeyError.isInstance(error)) { return "authentication"; @@ -2769,6 +2873,10 @@ export class StreamManager extends EventEmitter { if (error instanceof Error) { const message = error.message.toLowerCase(); + if (isStreamTruncatedMessage(message)) { + return "stream_truncated"; + } + if (error.name === "AbortError" || message.includes("abort")) { return "aborted"; } else if (message.includes("network") || message.includes("fetch")) {