Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/common/orpc/schemas/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ export const StreamErrorTypeSchema = z.enum([
"runtime_not_ready", // Container/runtime doesn't exist or failed to start (permanent)
"runtime_start_failed", // Runtime is starting or temporarily unavailable (retryable)
"empty_output", // Provider ended the stream without any assistant-visible output
"stream_truncated", // Provider stream closed before its terminal finish event
"max_output_tokens", // Provider truncated the response at max_tokens (finishReason: "length")
"unknown", // Catch-all
]);
Expand Down
92 changes: 92 additions & 0 deletions src/node/services/copilot/copilotResponsesLanguageModel.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,98 @@ describe("CopilotResponsesLanguageModel", () => {
});
});

// Verifies that a stream which ends without any finish/error event is reported
// to the consumer as an "error" part rather than ending silently.
it("emits an error when the SSE stream closes before a terminal event", async () => {
// Fixture: two frames only (output item added + one text delta); the mocked
// response contains no response.completed / response.incomplete / response.failed frame.
restoreFetchers.push(
mockFetch(() =>
Promise.resolve(
createSseResponse([
{
event: "response.output_item.added",
data: {
type: "response.output_item.added",
output_index: 0,
content_index: 0,
item: { type: "message", id: "msg_1" },
},
},
{
event: "response.output_text.delta",
data: {
type: "response.output_text.delta",
output_index: 0,
content_index: 0,
item_id: "msg_1",
delta: "partial",
},
},
])
)
)
);

const model = createModel();
const result = await model.doStream({
prompt: [{ role: "user", content: [{ type: "text", text: "Stream please" }] }],
});
const parts = await collectStreamParts(result.stream);

// The partial text is still delivered; the final part must be "error", not "finish".
expect(parts.map((part) => part.type)).toEqual([
"stream-start",
"text-start",
"text-delta",
"error",
]);
// Type-guarded lookup so `errorPart.error` can be inspected below.
const errorPart = parts.find(
(part): part is Extract<LanguageModelV2StreamPart, { type: "error" }> => part.type === "error"
);
expect(errorPart?.error).toBeInstanceOf(Error);
// Only a substring is pinned, so the message prefix may vary.
expect(errorPart?.error instanceof Error ? errorPart.error.message : "").toContain(
"stream closed before terminal event"
);
});

// Verifies that a lone `response.incomplete` frame produces a "finish" part
// (and therefore no truncation error is appended after it).
it("treats response.incomplete as a terminal finish event", async () => {
// Fixture: a single response.incomplete frame whose incomplete_details.reason
// is "max_output_tokens", together with usage counters.
restoreFetchers.push(
mockFetch(() =>
Promise.resolve(
createSseResponse([
{
event: "response.incomplete",
data: {
type: "response.incomplete",
response: {
incomplete_details: { reason: "max_output_tokens" },
usage: { input_tokens: 3, output_tokens: 2, total_tokens: 5 },
},
},
},
])
)
)
);

const model = createModel();
const result = await model.doStream({
prompt: [{ role: "user", content: [{ type: "text", text: "Stream please" }] }],
});
const parts = await collectStreamParts(result.stream);

// Exact match: stream-start followed by exactly one finish part with
// finishReason "length" and the usage numbers from the fixture.
expect(parts).toEqual([
{ type: "stream-start", warnings: [] },
{
type: "finish",
finishReason: "length",
usage: {
inputTokens: 3,
outputTokens: 2,
totalTokens: 5,
reasoningTokens: undefined,
cachedInputTokens: undefined,
},
},
]);
});

it("uses a stable synthetic text id even when item_id rotates", async () => {
restoreFetchers.push(
mockFetch(() =>
Expand Down
39 changes: 35 additions & 4 deletions src/node/services/copilot/copilotResponsesLanguageModel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ async function consumeSseStream(
const reader = source.getReader();
const decoder = new TextDecoder();
let buffer = "";
let sawTerminalEvent = false;

try {
for (;;) {
Expand All @@ -294,13 +295,18 @@ async function consumeSseStream(
while (boundary >= 0) {
const frame = buffer.slice(0, boundary);
buffer = buffer.slice(boundary + 2);
processFrame(frame, aliasRegistry, includeRawChunks, controller);
sawTerminalEvent =
processFrame(frame, aliasRegistry, includeRawChunks, controller) || sawTerminalEvent;
boundary = buffer.indexOf("\n\n");
}

if (done) {
if (buffer.trim().length > 0) {
processFrame(buffer, aliasRegistry, includeRawChunks, controller);
sawTerminalEvent =
processFrame(buffer, aliasRegistry, includeRawChunks, controller) || sawTerminalEvent;
}
if (!sawTerminalEvent) {
controller.enqueue({ type: "error", error: createStreamTruncatedError() });
}
break;
}
Expand All @@ -318,19 +324,23 @@ function processFrame(
aliasRegistry: Map<string, string>,
includeRawChunks: boolean,
controller: ReadableStreamDefaultController<LanguageModelV2StreamPart>
) {
): boolean {
const parsed = parseSseFrame(frame);
if (parsed == null) {
return;
return false;
}

if (includeRawChunks) {
controller.enqueue({ type: "raw", rawValue: parsed });
}

let sawTerminalEvent = false;
for (const part of mapStreamEvent(parsed.event, parsed.data, aliasRegistry)) {
sawTerminalEvent = part.type === "finish" || part.type === "error" || sawTerminalEvent;
controller.enqueue(part);
}

return sawTerminalEvent;
}

function parseSseFrame(frame: string) {
Expand Down Expand Up @@ -393,20 +403,40 @@ function mapStreamEvent(event: string, data: JsonRecord, aliasRegistry: Map<stri
return id ? ([{ type: "text-end", id }] satisfies LanguageModelV2StreamPart[]) : [];
}
case "response.completed":
case "response.incomplete":
return [
{
type: "finish",
usage: mapUsage((data.response as JsonRecord | undefined)?.usage),
finishReason: mapFinishReason(getRawFinishReason(data.response)),
},
] satisfies LanguageModelV2StreamPart[];
case "response.failed":
return [
{ type: "error", error: createResponseFailedError(data) },
] satisfies LanguageModelV2StreamPart[];
case "error":
return [{ type: "error", error: data }] satisfies LanguageModelV2StreamPart[];
default:
return [];
}
}

/**
 * Creates the error enqueued when the SSE source is exhausted without any
 * finish or error part having been produced.
 *
 * @returns A plain `Error` carrying the fixed truncation message.
 */
function createStreamTruncatedError() {
  const truncationMessage = "Copilot Responses stream closed before terminal event";
  return new Error(truncationMessage);
}

/**
 * Builds the `Error` emitted for a `response.failed` stream event.
 *
 * Reads `data.response.error.message` and `data.response.error.code` through
 * `getString`. When no message is obtained, "Response failed" is used; the
 * code is appended only when it is truthy.
 *
 * @param data - Parsed payload of the `response.failed` event.
 * @returns An `Error` whose message starts with "response failed: ".
 */
function createResponseFailedError(data: JsonRecord) {
  const failure = (data.response as JsonRecord | undefined)?.error as JsonRecord | undefined;
  const detail = getString(failure?.message) ?? "Response failed";
  const failureCode = getString(failure?.code);

  // Suffix is omitted entirely when the provider supplied no usable code.
  const codeSuffix = failureCode ? ` (code: ${failureCode})` : "";
  return new Error(`response failed: ${detail}${codeSuffix}`);
}

function resolveStableTextId(
data: JsonRecord,
aliasRegistry: Map<string, string>,
Expand Down Expand Up @@ -479,6 +509,7 @@ function mapFinishReason(reason: unknown): LanguageModelV2FinishReason {
case "stop":
return "stop";
case "max_tokens":
case "max_output_tokens":
return "length";
case "content_filter":
return "content-filter";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ describe("StreamManager - model-only tool notifications", () => {
__mux_notifications: ["<notification>hello</notification>"],
},
};

yield { type: "finish", finishReason: "stop" };
})(),
totalUsage: Promise.resolve({ inputTokens: 0, outputTokens: 0, totalTokens: 0 }),
usage: Promise.resolve({ inputTokens: 0, outputTokens: 0, totalTokens: 0 }),
Expand Down Expand Up @@ -131,6 +133,8 @@ describe("StreamManager - model-only tool notifications", () => {
],
},
};

yield { type: "finish", finishReason: "stop" };
})(),
totalUsage: Promise.resolve({ inputTokens: 0, outputTokens: 0, totalTokens: 0 }),
usage: Promise.resolve({ inputTokens: 0, outputTokens: 0, totalTokens: 0 }),
Expand Down
85 changes: 84 additions & 1 deletion src/node/services/streamManager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ function createStreamInfoForTests(
cumulativeUsage: { inputTokens: 0, outputTokens: 0, totalTokens: 0 },
cumulativeProviderMetadata: undefined,
didRetryPreviousResponseIdAtStep: false,
receivedTerminalEvent: false,
currentStepStartIndex: 0,
stepTracker: {},
...overrides,
Expand Down Expand Up @@ -941,6 +942,12 @@ describe("StreamManager - language model cleanup", () => {
workspaceId: "cleanup-workspace",
messageId: "cleanup-message",
streamInfoOverrides: () => ({
streamResult: createStreamResultForTests(
(async function* () {
await Promise.resolve();
yield { type: "finish", finishReason: "stop" };
})()
),
parts: [{ type: "text" as const, text: "done", timestamp: Date.now() }],
}),
},
Expand Down Expand Up @@ -1583,6 +1590,70 @@ describe("StreamManager - empty stream completions", () => {
expect(partial?.metadata?.metadataModel).toBe(KNOWN_MODELS.SONNET.id);
expect(partial?.parts).toEqual([]);
});

// Verifies that a stream which yields content but never yields a "finish" part
// is surfaced as a "stream_truncated" error and persisted as a partial message,
// instead of being finalized through "stream-end".
test("persists retryable partial error when a non-empty stream closes before finish", async () => {
const streamManager = new StreamManager(historyService);
// Capture emitted events so both the presence of "error" and the absence of
// "stream-end" can be asserted afterwards.
const errorEvents: Array<{ messageId: string; error: string; errorType?: string }> = [];
const streamEndEvents: unknown[] = [];

streamManager.on("error", (data) => {
errorEvents.push(data as { messageId: string; error: string; errorType?: string });
});
streamManager.on("stream-end", (data) => {
streamEndEvents.push(data);
});

// Swap the manager's tokenTracker for a stub that resolves immediately.
const replaceTokenTrackerResult = Reflect.set(streamManager, "tokenTracker", {
setModel: () => Promise.resolve(undefined),
countTokens: () => Promise.resolve(0),
});
expect(replaceTokenTrackerResult).toBe(true);

const workspaceId = "truncated-stream-workspace";
const messageId = "truncated-stream-message";
const historySequence = 1;

await appendPartialAssistantForTests(workspaceId, messageId, historySequence);
const processStreamWithCleanup = getProcessStreamWithCleanupForTests(streamManager);
const startTime = Date.now() - 250;
const streamInfo = createStreamInfoForTests({
streamResult: createStreamResultForTests(
// The generator yields one text delta and then returns: no "finish" part is ever produced.
(async function* () {
await Promise.resolve();
yield { type: "text-delta", text: "partial answer" };
})(),
{ inputTokens: 3, outputTokens: 2, totalTokens: 5 }
),
messageId,
startTime,
lastPartTimestamp: startTime,
model: KNOWN_MODELS.SONNET.id,
metadataModel: KNOWN_MODELS.SONNET.id,
historySequence,
initialMetadata: { agentId: "plan" },
runtime,
});

await processStreamWithCleanup.call(streamManager, workspaceId, streamInfo, historySequence);

// Exactly one error event and no stream-end event.
expect(streamEndEvents).toHaveLength(0);
expect(errorEvents).toHaveLength(1);
expect(errorEvents[0]).toMatchObject({
messageId,
errorType: "stream_truncated",
});
expect(errorEvents[0]?.error).toContain(
"Anthropic stream closed unexpectedly before the response completed"
);

// The persisted partial carries the same error metadata and keeps the text received so far.
const partial = await historyService.readPartial(workspaceId);
expect(partial?.metadata?.errorType).toBe("stream_truncated");
expect(partial?.metadata?.error).toContain(
"Anthropic stream closed unexpectedly before the response completed"
);
expect(partial?.metadata?.metadataModel).toBe(KNOWN_MODELS.SONNET.id);
expect(partial?.parts).toMatchObject([{ type: "text", text: "partial answer" }]);
});
});

describe("StreamManager - TTFT metadata persistence", () => {
Expand Down Expand Up @@ -1702,7 +1773,9 @@ describe("StreamManager - TTFT metadata persistence", () => {
const streamInfo = createStreamInfoForTests({
streamResult: createStreamResultForTests(
(async function* () {
// No-op stream: tests verify stream-end finalization behavior from pre-populated parts.
// Tests pre-populate parts but still need the provider's terminal proof of completion.
await Promise.resolve();
yield { type: "finish", finishReason: "stop" };
})(),
usage
),
Expand Down Expand Up @@ -2606,6 +2679,16 @@ describe("StreamManager - categorizeError", () => {
});

const categorizeCases: Array<{ name: string; error: unknown; expected: string }> = [
{
name: "classifies Anthropic missing message_stop as stream_truncated",
error: new Error("anthropic stream closed before message_stop"),
expected: "stream_truncated",
},
{
name: "classifies OpenAI Responses missing terminal event as stream_truncated",
error: new Error("openai responses stream closed before terminal event"),
expected: "stream_truncated",
},
{
name: "classifies model_not_found via message fallback",
error: new Error("The model `gpt-5.2-codex` does not exist or you do not have access to it."),
Expand Down
Loading
Loading