From 82ecab1b0419a8759994977c96b9f4eeceb9255c Mon Sep 17 00:00:00 2001 From: ethan Date: Thu, 7 May 2026 17:38:58 +1000 Subject: [PATCH 1/4] fix(providers/anthropic): require message_stop before yielding Finish The Anthropic Messages streaming protocol guarantees message_stop as the final SSE event of every successful stream. Today the adapter treats any clean EOF (Stream.Err() == nil or io.EOF) as a successful Finish, even when the upstream body was cut off mid-response. This silently truncates the assistant's reply and commits the partial text as if it were the model's complete answer. Track whether message_stop was observed during the SSE loop. On clean EOF without it, yield StreamPartTypeError wrapping io.EOF so the failure surfaces as a retryable transport error rather than a phantom success. Existing transport errors continue to flow through the unchanged else branch; the event: error path keeps yielding via Stream.Err(). Tests cover happy path, EOF before message_stop, empty stream, and malformed stream (existing error path preserved). Also picks up a one-line gofmt fix in TestComputerUseToolJSON; the test file was not gofmt-clean at HEAD without it. --- providers/anthropic/anthropic.go | 12 +++ providers/anthropic/anthropic_test.go | 116 +++++++++++++++++++++++++- 2 files changed, 127 insertions(+), 1 deletion(-) diff --git a/providers/anthropic/anthropic.go b/providers/anthropic/anthropic.go index 2f05b2f12..86486b83d 100644 --- a/providers/anthropic/anthropic.go +++ b/providers/anthropic/anthropic.go @@ -1244,6 +1244,7 @@ func (a languageModel) Stream(ctx context.Context, call fantasy.Call) (fantasy.S stream := a.client.Messages.NewStreaming(ctx, *params, reqOpts...) acc := anthropic.Message{} + var sawMessageStop bool return func(yield func(fantasy.StreamPart) bool) { if len(warnings) > 0 { if !yield(fantasy.StreamPart{ @@ -1448,11 +1449,22 @@ func (a languageModel) Stream(ctx context.Context, call fantasy.Call) (fantasy.S } } case "message_stop": + sawMessageStop = true } } err := stream.Err() if err == nil || errors.Is(err, io.EOF) { + if !sawMessageStop { + if err == nil { + err = io.EOF + } + yield(fantasy.StreamPart{ + Type: fantasy.StreamPartTypeError, + Error: fmt.Errorf("anthropic stream closed before message_stop: %w", err), + }) + return + } yield(fantasy.StreamPart{ Type: fantasy.StreamPartTypeFinish, ID: acc.ID, diff --git a/providers/anthropic/anthropic_test.go b/providers/anthropic/anthropic_test.go index 4387a34fa..5f6a3c168 100644 --- a/providers/anthropic/anthropic_test.go +++ b/providers/anthropic/anthropic_test.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "fmt" + "io" "math" "net/http" "net/http/httptest" @@ -504,6 +505,95 @@ func TestStream_SendsOutputConfigEffort(t *testing.T) { requireAnthropicEffort(t, call.body, EffortHigh) } +func TestStream_RequiresMessageStopBeforeFinish(t *testing.T) { + t.Parallel() + + completeTextStream := []string{ + anthropicSSEEvent("message_start", `{"type":"message_start","message":{"id":"msg_complete","type":"message","role":"assistant","model":"claude-sonnet-4-20250514","content":[],"stop_reason":null,"usage":{"input_tokens":1,"output_tokens":0}}}`), + anthropicSSEEvent("content_block_start", `{"type":"content_block_start","index":0,"content_block":{"type":"text","text":""}}`), + anthropicSSEEvent("content_block_delta", `{"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"hello"}}`), + anthropicSSEEvent("content_block_stop", `{"type":"content_block_stop","index":0}`), + anthropicSSEEvent("message_delta", `{"type":"message_delta","delta":{"stop_reason":"end_turn","stop_sequence":null},"usage":{"input_tokens":1,"output_tokens":1}}`), + anthropicSSEEvent("message_stop", `{"type":"message_stop"}`), + } + truncatedTextStream := completeTextStream[:len(completeTextStream)-1] + + tests := []struct { + name string + chunks []string + wantFinish bool + wantEOF bool + }{ + { + name: "complete stream finishes", + chunks: completeTextStream, + wantFinish: true, + }, + { + name: "eof before message stop returns retryable error", + chunks: truncatedTextStream, + wantEOF: true, + }, + { + name: "empty stream returns retryable error", + wantEOF: true, + }, + { + name: "malformed stream keeps existing error path", + chunks: []string{ + "event: message_start\n", + "data: {not-json}\n\n", + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + server, calls := newAnthropicStreamingServer(tt.chunks) + defer server.Close() + + provider, err := New( + WithAPIKey("test-api-key"), + WithBaseURL(server.URL), + ) + require.NoError(t, err) + + model, err := provider.LanguageModel(context.Background(), "claude-sonnet-4-20250514") + require.NoError(t, err) + + stream, err := model.Stream(context.Background(), fantasy.Call{ + Prompt: testPrompt(), + }) + require.NoError(t, err) + + parts := collectAnthropicStreamParts(stream) + _ = awaitAnthropicCall(t, calls) + + finishParts := streamPartsByType(parts, fantasy.StreamPartTypeFinish) + errorParts := streamPartsByType(parts, fantasy.StreamPartTypeError) + + if tt.wantFinish { + require.Len(t, finishParts, 1) + require.Empty(t, errorParts) + require.Equal(t, fantasy.FinishReasonStop, finishParts[0].FinishReason) + return + } + + require.Empty(t, finishParts) + require.Len(t, errorParts, 1) + require.Error(t, errorParts[0].Error) + if tt.wantEOF { + require.ErrorIs(t, errorParts[0].Error, io.EOF) + require.Contains(t, errorParts[0].Error.Error(), "message_stop") + } else { + require.NotContains(t, errorParts[0].Error.Error(), "message_stop") + } + }) + } +} + type anthropicCall struct { method string path string @@ -563,6 +653,29 @@ func newAnthropicStreamingServer(chunks []string) (*httptest.Server, <-chan anth return server, calls } +func anthropicSSEEvent(event, data string) string { + return fmt.Sprintf("event: %s\ndata: %s\n\n", event, data) +} + +func collectAnthropicStreamParts(stream fantasy.StreamResponse) []fantasy.StreamPart { + var parts []fantasy.StreamPart + stream(func(part fantasy.StreamPart) bool { + parts = append(parts, part) + return true + }) + return parts +} + +func streamPartsByType(parts []fantasy.StreamPart, typ fantasy.StreamPartType) []fantasy.StreamPart { + var matches []fantasy.StreamPart + for _, part := range parts { + if part.Type == typ { + matches = append(matches, part) + } + } + return matches +} + func awaitAnthropicCall(t *testing.T, calls <-chan anthropicCall) anthropicCall { t.Helper() @@ -1574,7 +1687,8 @@ func TestComputerUseToolJSON(t *testing.T) { } _, err := computerUseToolJSON(pdt) require.Error(t, err) - require.Contains(t, err.Error(), "tool_version arg is missing") }) + require.Contains(t, err.Error(), "tool_version arg is missing") + }) t.Run("returns error for unsupported version", func(t *testing.T) { t.Parallel() From 67c253350802065ca8c83dbef7702d90b7c8ead0 Mon Sep 17 00:00:00 2001 From: ethan Date: Thu, 7 May 2026 17:50:25 +1000 Subject: [PATCH 2/4] fix(providers/openai): require terminal Responses event before Finish The OpenAI Responses API emits terminal lifecycle events when a streamed response reaches its final state. The adapter currently yields Finish on any clean EOF, even if the stream ended before response.completed or response.incomplete. That has the same silent-truncation shape as the Anthropic message_stop bug in this PR. Track response.completed and response.incomplete before yielding Finish from both Stream and StreamObject. If the transport closes cleanly first, yield a StreamPartTypeError/ObjectStreamPartTypeError wrapping io.EOF so callers can retry instead of committing partial output. Also surface response.failed as an error event instead of falling through to Finish. Tests cover completed and incomplete terminal events, EOF before terminal event, empty streams, response.failed, malformed streams, and JSON-mode StreamObject truncation. Also fixes a pre-existing OpenAI test compile issue where one toResponsesPrompt call still expected two return values. --- providers/openai/openai_test.go | 193 ++++++++++++++++++- providers/openai/responses_language_model.go | 64 ++++++ 2 files changed, 256 insertions(+), 1 deletion(-) diff --git a/providers/openai/openai_test.go b/providers/openai/openai_test.go index db37452a7..5e22eaafd 100644 --- a/providers/openai/openai_test.go +++ b/providers/openai/openai_test.go @@ -5,6 +5,7 @@ import ( "encoding/base64" "encoding/json" "errors" + "io" "net/http" "net/http/httptest" "strings" @@ -3643,6 +3644,21 @@ func newResponsesProvider(t *testing.T, serverURL string) fantasy.LanguageModel return model } +func responsesSSEEvent(event, data string) string { + return "event: " + event + "\ndata: " + data + "\n\n" +} + +func collectObjectStreamParts(stream fantasy.ObjectStreamResponse) []fantasy.ObjectStreamPart { + var parts []fantasy.ObjectStreamPart + for part := range stream { + parts = append(parts, part) + if part.Type == fantasy.ObjectStreamPartTypeError || part.Type == fantasy.ObjectStreamPartTypeFinish { + break + } + } + return parts +} + func TestResponsesGenerate_WebSearchResponse(t *testing.T) { t.Parallel() @@ -4333,6 +4349,180 @@ func TestResponsesToPrompt_ReasoningWithFunctionCallCombined(t *testing.T) { require.Equal(t, functionCallID, input[3].OfFunctionCallOutput.CallID) } +func TestResponsesStream_RequiresTerminalEventBeforeFinish(t *testing.T) { + t.Parallel() + + textChunks := []string{ + responsesSSEEvent("response.output_item.added", `{"type":"response.output_item.added","output_index":0,"item":{"type":"message","id":"msg_01","role":"assistant","status":"in_progress","content":[]}}`), + responsesSSEEvent("response.output_text.delta", `{"type":"response.output_text.delta","output_index":0,"content_index":0,"item_id":"msg_01","delta":"hello"}`), + responsesSSEEvent("response.output_item.done", `{"type":"response.output_item.done","output_index":0,"item":{"type":"message","id":"msg_01","role":"assistant","status":"completed","content":[{"type":"output_text","text":"hello","annotations":[]}]}}`), + } + completedEvent := responsesSSEEvent("response.completed", `{"type":"response.completed","response":{"id":"resp_01","status":"completed","output":[],"usage":{"input_tokens":1,"output_tokens":1,"total_tokens":2}}}`) + incompleteEvent := responsesSSEEvent("response.incomplete", `{"type":"response.incomplete","response":{"id":"resp_02","status":"incomplete","output":[],"incomplete_details":{"reason":"max_output_tokens"},"usage":{"input_tokens":1,"output_tokens":1,"total_tokens":2}}}`) + failedEvent := responsesSSEEvent("response.failed", `{"type":"response.failed","response":{"id":"resp_03","status":"failed","error":{"code":"server_error","message":"boom"},"output":[]}}`) + + tests := []struct { + name string + chunks []string + wantFinishReason fantasy.FinishReason + wantEOF bool + wantError string + }{ + { + name: "completed stream finishes", + chunks: append(append([]string{}, textChunks...), completedEvent), + wantFinishReason: fantasy.FinishReasonStop, + }, + { + name: "incomplete stream is terminal", + chunks: append(append([]string{}, textChunks...), incompleteEvent), + wantFinishReason: fantasy.FinishReasonLength, + }, + { + name: "eof before terminal event returns retryable error", + chunks: textChunks, + wantEOF: true, + }, + { + name: "empty stream returns retryable error", + wantEOF: true, + }, + { + name: "failed event returns provider error", + chunks: []string{failedEvent}, + wantError: "boom", + }, + { + name: "malformed stream keeps existing error path", + chunks: []string{responsesSSEEvent("response.output_text.delta", `{not-json}`)}, + wantError: "invalid", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + sms := newStreamingMockServer() + defer sms.close() + sms.chunks = tt.chunks + + model := newResponsesProvider(t, sms.server.URL) + + stream, err := model.Stream(context.Background(), fantasy.Call{ + Prompt: testPrompt, + }) + require.NoError(t, err) + + parts, err := collectStreamParts(stream) + require.NoError(t, err) + + var finishes []fantasy.StreamPart + var errors []fantasy.StreamPart + for _, part := range parts { + switch part.Type { + case fantasy.StreamPartTypeFinish: + finishes = append(finishes, part) + case fantasy.StreamPartTypeError: + errors = append(errors, part) + } + } + + if tt.wantFinishReason != "" { + require.Len(t, finishes, 1) + require.Empty(t, errors) + require.Equal(t, tt.wantFinishReason, finishes[0].FinishReason) + return + } + + require.Empty(t, finishes) + require.Len(t, errors, 1) + require.Error(t, errors[0].Error) + if tt.wantEOF { + require.ErrorIs(t, errors[0].Error, io.EOF) + require.Contains(t, errors[0].Error.Error(), "terminal event") + } else { + require.NotContains(t, errors[0].Error.Error(), "terminal event") + require.Contains(t, errors[0].Error.Error(), tt.wantError) + } + }) + } +} + +func TestResponsesStreamObject_RequiresTerminalEventBeforeFinish(t *testing.T) { + t.Parallel() + + objectChunks := []string{ + responsesSSEEvent("response.output_text.delta", `{"type":"response.output_text.delta","output_index":0,"content_index":0,"item_id":"msg_01","delta":"{\"name\":\"Alice\"}"}`), + } + completedEvent := responsesSSEEvent("response.completed", `{"type":"response.completed","response":{"id":"resp_01","status":"completed","output":[],"usage":{"input_tokens":1,"output_tokens":1,"total_tokens":2}}}`) + + tests := []struct { + name string + chunks []string + wantFinish bool + }{ + { + name: "completed stream finishes", + chunks: append(append([]string{}, objectChunks...), completedEvent), + wantFinish: true, + }, + { + name: "eof before terminal event returns retryable error", + chunks: objectChunks, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + sms := newStreamingMockServer() + defer sms.close() + sms.chunks = tt.chunks + + model := newResponsesProvider(t, sms.server.URL) + stream, err := model.StreamObject(context.Background(), fantasy.ObjectCall{ + Prompt: fantasy.Prompt{fantasy.NewUserMessage("Generate a person.")}, + Schema: fantasy.Schema{ + Type: "object", + Properties: map[string]*fantasy.Schema{ + "name": {Type: "string"}, + }, + Required: []string{"name"}, + }, + SchemaName: "Person", + }) + require.NoError(t, err) + + parts := collectObjectStreamParts(stream) + + var finishes []fantasy.ObjectStreamPart + var errors []fantasy.ObjectStreamPart + for _, part := range parts { + switch part.Type { + case fantasy.ObjectStreamPartTypeFinish: + finishes = append(finishes, part) + case fantasy.ObjectStreamPartTypeError: + errors = append(errors, part) + } + } + + if tt.wantFinish { + require.Len(t, finishes, 1) + require.Empty(t, errors) + require.Equal(t, fantasy.FinishReasonStop, finishes[0].FinishReason) + return + } + + require.Empty(t, finishes) + require.Len(t, errors, 1) + require.ErrorIs(t, errors[0].Error, io.EOF) + require.Contains(t, errors[0].Error.Error(), "terminal event") + }) + } +} + func TestResponsesStream_WebSearchResponse(t *testing.T) { t.Parallel() @@ -4696,7 +4886,8 @@ func TestComputerUseGenerateRoundTrip_NonImageResult(t *testing.T) { }, } - input, warnings := toResponsesPrompt(prompt, "system", false) + input, warnings, err := toResponsesPrompt(prompt, "system", false) + require.NoError(t, err) // Should warn about non-image result. var foundWarning bool diff --git a/providers/openai/responses_language_model.go b/providers/openai/responses_language_model.go index d6d2db4b3..83ade60e1 100644 --- a/providers/openai/responses_language_model.go +++ b/providers/openai/responses_language_model.go @@ -6,6 +6,7 @@ import ( "encoding/json" "errors" "fmt" + "io" "reflect" "slices" "strings" @@ -1130,6 +1131,26 @@ func mapResponsesFinishReason(reason string, hasFunctionCall bool) fantasy.Finis } } +func responsesStreamClosedBeforeTerminalEventError(err error) error { + if err == nil { + err = io.EOF + } + return fmt.Errorf("openai responses stream closed before terminal event: %w", err) +} + +func responsesFailedStreamError(response responses.Response) error { + if response.Error.Message == "" && response.Error.Code == "" { + return fmt.Errorf("response failed") + } + if response.Error.Code == "" { + return fmt.Errorf("response failed: %s", response.Error.Message) + } + if response.Error.Message == "" { + return fmt.Errorf("response failed (code: %s)", response.Error.Code) + } + return fmt.Errorf("response failed: %s (code: %s)", response.Error.Message, response.Error.Code) +} + func (o responsesLanguageModel) Stream(ctx context.Context, call fantasy.Call) (fantasy.StreamResponse, error) { params, warnings, err := o.prepareParams(call) if err != nil { @@ -1148,6 +1169,7 @@ func (o responsesLanguageModel) Stream(ctx context.Context, call fantasy.Call) ( responseID := "" ongoingToolCalls := make(map[int64]*ongoingToolCall) hasFunctionCall := false + sawTerminalEvent := false activeReasoning := make(map[string]*reasoningState) return func(yield func(fantasy.StreamPart) bool) { @@ -1449,17 +1471,30 @@ func (o responsesLanguageModel) Stream(ctx context.Context, call fantasy.Call) ( } case "response.completed": + sawTerminalEvent = true completed := event.AsResponseCompleted() responseID = completed.Response.ID finishReason = mapResponsesFinishReason(completed.Response.IncompleteDetails.Reason, hasFunctionCall) usage = responsesUsage(completed.Response) case "response.incomplete": + sawTerminalEvent = true incomplete := event.AsResponseIncomplete() responseID = incomplete.Response.ID finishReason = mapResponsesFinishReason(incomplete.Response.IncompleteDetails.Reason, hasFunctionCall) usage = responsesUsage(incomplete.Response) + case "response.failed": + failed := event.AsResponseFailed() + responseID = failed.Response.ID + if !yield(fantasy.StreamPart{ + Type: fantasy.StreamPartTypeError, + Error: responsesFailedStreamError(failed.Response), + }) { + return + } + return + case "error": errorEvent := event.AsError() if !yield(fantasy.StreamPart{ @@ -1480,6 +1515,13 @@ func (o responsesLanguageModel) Stream(ctx context.Context, call fantasy.Call) ( }) return } + if !sawTerminalEvent { + yield(fantasy.StreamPart{ + Type: fantasy.StreamPartTypeError, + Error: responsesStreamClosedBeforeTerminalEventError(err), + }) + return + } yield(fantasy.StreamPart{ Type: fantasy.StreamPartTypeFinish, @@ -1757,6 +1799,7 @@ func (o responsesLanguageModel) streamObjectWithJSONMode(ctx context.Context, ca var responseID string var streamErr error hasFunctionCall := false + sawTerminalEvent := false for stream.Next() { event := stream.Current() @@ -1810,17 +1853,31 @@ func (o responsesLanguageModel) streamObjectWithJSONMode(ctx context.Context, ca } case "response.completed": + sawTerminalEvent = true completed := event.AsResponseCompleted() responseID = completed.Response.ID finishReason = mapResponsesFinishReason(completed.Response.IncompleteDetails.Reason, hasFunctionCall) usage = responsesUsage(completed.Response) case "response.incomplete": + sawTerminalEvent = true incomplete := event.AsResponseIncomplete() responseID = incomplete.Response.ID finishReason = mapResponsesFinishReason(incomplete.Response.IncompleteDetails.Reason, hasFunctionCall) usage = responsesUsage(incomplete.Response) + case "response.failed": + failed := event.AsResponseFailed() + responseID = failed.Response.ID + streamErr = responsesFailedStreamError(failed.Response) + if !yield(fantasy.ObjectStreamPart{ + Type: fantasy.ObjectStreamPartTypeError, + Error: streamErr, + }) { + return + } + return + case "error": errorEvent := event.AsError() streamErr = fmt.Errorf("response error: %s (code: %s)", errorEvent.Message, errorEvent.Code) @@ -1842,6 +1899,13 @@ func (o responsesLanguageModel) streamObjectWithJSONMode(ctx context.Context, ca }) return } + if !sawTerminalEvent { + yield(fantasy.ObjectStreamPart{ + Type: fantasy.ObjectStreamPartTypeError, + Error: responsesStreamClosedBeforeTerminalEventError(err), + }) + return + } // Final validation and emit if streamErr == nil && lastParsedObject != nil { From ee802ad680a6d56f888cd5a908c3b75e09ab2fe0 Mon Sep 17 00:00:00 2001 From: ethan Date: Thu, 7 May 2026 17:56:53 +1000 Subject: [PATCH 3/4] fix(providers/openai): align Responses EOF handling --- providers/openai/openai_test.go | 12 ++++++++++++ providers/openai/responses_language_model.go | 4 ++-- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/providers/openai/openai_test.go b/providers/openai/openai_test.go index 5e22eaafd..749ba75f9 100644 --- a/providers/openai/openai_test.go +++ b/providers/openai/openai_test.go @@ -4456,11 +4456,13 @@ func TestResponsesStreamObject_RequiresTerminalEventBeforeFinish(t *testing.T) { responsesSSEEvent("response.output_text.delta", `{"type":"response.output_text.delta","output_index":0,"content_index":0,"item_id":"msg_01","delta":"{\"name\":\"Alice\"}"}`), } completedEvent := responsesSSEEvent("response.completed", `{"type":"response.completed","response":{"id":"resp_01","status":"completed","output":[],"usage":{"input_tokens":1,"output_tokens":1,"total_tokens":2}}}`) + failedEvent := responsesSSEEvent("response.failed", `{"type":"response.failed","response":{"id":"resp_02","status":"failed","error":{"code":"server_error","message":"boom"},"output":[]}}`) tests := []struct { name string chunks []string wantFinish bool + wantError string }{ { name: "completed stream finishes", @@ -4471,6 +4473,11 @@ func TestResponsesStreamObject_RequiresTerminalEventBeforeFinish(t *testing.T) { name: "eof before terminal event returns retryable error", chunks: objectChunks, }, + { + name: "failed event returns provider error", + chunks: []string{failedEvent}, + wantError: "boom", + }, } for _, tt := range tests { @@ -4517,6 +4524,11 @@ func TestResponsesStreamObject_RequiresTerminalEventBeforeFinish(t *testing.T) { require.Empty(t, finishes) require.Len(t, errors, 1) + if tt.wantError != "" { + require.NotContains(t, errors[0].Error.Error(), "terminal event") + require.Contains(t, errors[0].Error.Error(), tt.wantError) + return + } require.ErrorIs(t, errors[0].Error, io.EOF) require.Contains(t, errors[0].Error.Error(), "terminal event") }) diff --git a/providers/openai/responses_language_model.go b/providers/openai/responses_language_model.go index 83ade60e1..713fdf0b8 100644 --- a/providers/openai/responses_language_model.go +++ b/providers/openai/responses_language_model.go @@ -1508,7 +1508,7 @@ func (o responsesLanguageModel) Stream(ctx context.Context, call fantasy.Call) ( } err := stream.Err() - if err != nil { + if err != nil && !errors.Is(err, io.EOF) { yield(fantasy.StreamPart{ Type: fantasy.StreamPartTypeError, Error: toProviderErr(err), @@ -1892,7 +1892,7 @@ func (o responsesLanguageModel) streamObjectWithJSONMode(ctx context.Context, ca } err := stream.Err() - if err != nil { + if err != nil && !errors.Is(err, io.EOF) { yield(fantasy.ObjectStreamPart{ Type: fantasy.ObjectStreamPartTypeError, Error: toProviderErr(err), From c2f479f4b45cf2aba360f440eb13792baf54972b Mon Sep 17 00:00:00 2001 From: ethan Date: Thu, 7 May 2026 18:24:17 +1000 Subject: [PATCH 4/4] test(providers): align stream truncation coverage --- providers/anthropic/anthropic_test.go | 12 +++++++----- providers/openai/openai_test.go | 19 +++++++++++++------ providers/openai/responses_language_model.go | 11 ----------- 3 files changed, 20 insertions(+), 22 deletions(-) diff --git a/providers/anthropic/anthropic_test.go b/providers/anthropic/anthropic_test.go index 5f6a3c168..0f91262a2 100644 --- a/providers/anthropic/anthropic_test.go +++ b/providers/anthropic/anthropic_test.go @@ -523,6 +523,7 @@ func TestStream_RequiresMessageStopBeforeFinish(t *testing.T) { chunks []string wantFinish bool wantEOF bool + wantError string }{ { name: "complete stream finishes", @@ -530,20 +531,20 @@ func TestStream_RequiresMessageStopBeforeFinish(t *testing.T) { wantFinish: true, }, { - name: "eof before message stop returns retryable error", + name: "eof before message_stop returns EOF error", chunks: truncatedTextStream, wantEOF: true, }, { - name: "empty stream returns retryable error", + name: "empty stream returns EOF error", wantEOF: true, }, { - name: "malformed stream keeps existing error path", + name: "error event keeps existing error path", chunks: []string{ - "event: message_start\n", - "data: {not-json}\n\n", + anthropicSSEEvent("error", `{"type":"error","error":{"type":"overloaded_error","message":"stream down"}}`), }, + wantError: "stream down", }, } @@ -589,6 +590,7 @@ func TestStream_RequiresMessageStopBeforeFinish(t *testing.T) { require.Contains(t, errorParts[0].Error.Error(), "message_stop") } else { require.NotContains(t, errorParts[0].Error.Error(), "message_stop") + require.Contains(t, errorParts[0].Error.Error(), tt.wantError) } }) } diff --git a/providers/openai/openai_test.go b/providers/openai/openai_test.go index 749ba75f9..c344e6fad 100644 --- a/providers/openai/openai_test.go +++ b/providers/openai/openai_test.go @@ -4360,6 +4360,7 @@ func TestResponsesStream_RequiresTerminalEventBeforeFinish(t *testing.T) { completedEvent := responsesSSEEvent("response.completed", `{"type":"response.completed","response":{"id":"resp_01","status":"completed","output":[],"usage":{"input_tokens":1,"output_tokens":1,"total_tokens":2}}}`) incompleteEvent := responsesSSEEvent("response.incomplete", `{"type":"response.incomplete","response":{"id":"resp_02","status":"incomplete","output":[],"incomplete_details":{"reason":"max_output_tokens"},"usage":{"input_tokens":1,"output_tokens":1,"total_tokens":2}}}`) failedEvent := responsesSSEEvent("response.failed", `{"type":"response.failed","response":{"id":"resp_03","status":"failed","error":{"code":"server_error","message":"boom"},"output":[]}}`) + errorEvent := responsesSSEEvent("error", `{"type":"error","message":"stream down","code":"server_error"}`) tests := []struct { name string @@ -4379,12 +4380,12 @@ func TestResponsesStream_RequiresTerminalEventBeforeFinish(t *testing.T) { wantFinishReason: fantasy.FinishReasonLength, }, { - name: "eof before terminal event returns retryable error", + name: "eof before terminal event returns EOF error", chunks: textChunks, wantEOF: true, }, { - name: "empty stream returns retryable error", + name: "empty stream returns EOF error", wantEOF: true, }, { @@ -4393,9 +4394,9 @@ func TestResponsesStream_RequiresTerminalEventBeforeFinish(t *testing.T) { wantError: "boom", }, { - name: "malformed stream keeps existing error path", - chunks: []string{responsesSSEEvent("response.output_text.delta", `{not-json}`)}, - wantError: "invalid", + name: "error event keeps existing error path", + chunks: []string{errorEvent}, + wantError: "stream down", }, } @@ -4457,6 +4458,7 @@ func TestResponsesStreamObject_RequiresTerminalEventBeforeFinish(t *testing.T) { } completedEvent := responsesSSEEvent("response.completed", `{"type":"response.completed","response":{"id":"resp_01","status":"completed","output":[],"usage":{"input_tokens":1,"output_tokens":1,"total_tokens":2}}}`) failedEvent := responsesSSEEvent("response.failed", `{"type":"response.failed","response":{"id":"resp_02","status":"failed","error":{"code":"server_error","message":"boom"},"output":[]}}`) + errorEvent := responsesSSEEvent("error", `{"type":"error","message":"stream down","code":"server_error"}`) tests := []struct { name string @@ -4470,7 +4472,7 @@ func TestResponsesStreamObject_RequiresTerminalEventBeforeFinish(t *testing.T) { wantFinish: true, }, { - name: "eof before terminal event returns retryable error", + name: "eof before terminal event returns EOF error", chunks: objectChunks, }, { @@ -4478,6 +4480,11 @@ func TestResponsesStreamObject_RequiresTerminalEventBeforeFinish(t *testing.T) { chunks: []string{failedEvent}, wantError: "boom", }, + { + name: "error event keeps existing error path", + chunks: []string{errorEvent}, + wantError: "stream down", + }, } for _, tt := range tests { diff --git a/providers/openai/responses_language_model.go b/providers/openai/responses_language_model.go index 713fdf0b8..077c27325 100644 --- a/providers/openai/responses_language_model.go +++ b/providers/openai/responses_language_model.go @@ -1139,15 +1139,6 @@ func responsesStreamClosedBeforeTerminalEventError(err error) error { } func responsesFailedStreamError(response responses.Response) error { - if response.Error.Message == "" && response.Error.Code == "" { - return fmt.Errorf("response failed") - } - if response.Error.Code == "" { - return fmt.Errorf("response failed: %s", response.Error.Message) - } - if response.Error.Message == "" { - return fmt.Errorf("response failed (code: %s)", response.Error.Code) - } return fmt.Errorf("response failed: %s (code: %s)", response.Error.Message, response.Error.Code) } @@ -1486,7 +1477,6 @@ func (o responsesLanguageModel) Stream(ctx context.Context, call fantasy.Call) ( case "response.failed": failed := event.AsResponseFailed() - responseID = failed.Response.ID if !yield(fantasy.StreamPart{ Type: fantasy.StreamPartTypeError, Error: responsesFailedStreamError(failed.Response), @@ -1868,7 +1858,6 @@ func (o responsesLanguageModel) streamObjectWithJSONMode(ctx context.Context, ca case "response.failed": failed := event.AsResponseFailed() - responseID = failed.Response.ID streamErr = responsesFailedStreamError(failed.Response) if !yield(fantasy.ObjectStreamPart{ Type: fantasy.ObjectStreamPartTypeError,