diff --git a/CHANGELOG.md b/CHANGELOG.md index 003d84d..d4eaeff 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,13 @@ with the [Rust 0.x convention](https://doc.rust-lang.org/cargo/reference/semver. breaking changes increment the minor version (0.2 → 0.3), additive changes increment the patch version. +## [Unreleased] + +### Fixed + +- Connect streaming clients now report an error when a response body ends + without the required END_STREAM envelope ([#140]). + ## [0.6.1] - 2026-05-27 A patch release focused on the robustness of the streaming request and @@ -46,6 +53,7 @@ now 1.6. [#131]: https://github.com/anthropics/connect-rust/pull/131 [#132]: https://github.com/anthropics/connect-rust/pull/132 [#133]: https://github.com/anthropics/connect-rust/pull/133 +[#140]: https://github.com/anthropics/connect-rust/issues/140 ## [0.6.0] - 2026-05-20 diff --git a/connectrpc/src/client/mod.rs b/connectrpc/src/client/mod.rs index 9694e53..b03deca 100644 --- a/connectrpc/src/client/mod.rs +++ b/connectrpc/src/client/mod.rs @@ -1978,6 +1978,11 @@ where if !self.poll_body().await? { // Body exhausted — check buffer for remaining trailer data self.done = true; + if matches!(self.protocol, Protocol::Connect) { + return Err(ConnectError::unavailable( + "Connect streaming response ended without END_STREAM envelope", + )); + } if matches!(self.protocol, Protocol::GrpcWeb) && !self.buf.is_empty() && self.buf[0] & 0x80 != 0 @@ -3651,6 +3656,48 @@ mod tests { assert_debug::(); } + #[tokio::test] + async fn connect_server_stream_truncated_after_data_errors() { + use buffa::Message; + use buffa_types::google::protobuf::__buffa::view::StringValueView; + use buffa_types::google::protobuf::StringValue; + + let body = Full::new(Envelope::data(StringValue::from("hello").encode_to_bytes()).encode()); + let mut stream: ServerStream<_, StringValueView<'static>> = ServerStream { + headers: http::HeaderMap::new(), + body, + buf: BytesMut::new(), + encoding: None, + compression: CompressionRegistry::new(), + codec_format: CodecFormat::Proto, + protocol: Protocol::Connect, + max_message_size: Some(1024), + deadline: None, + trailers: None, + error: None, + done: false, + _phantom: PhantomData, + }; + + let msg = stream + .message() + .await + .expect("first message should decode") + .expect("stream should yield the data envelope before EOF"); + assert_eq!(msg.value, "hello"); + + let err = match stream.message().await { + Err(err) => err, + Ok(Some(_)) => panic!("truncated stream unexpectedly yielded another message"), + Ok(None) => panic!("truncated stream ended cleanly without END_STREAM"), + }; + assert_eq!(err.code, ErrorCode::Unavailable); + assert!( + err.to_string().contains("END_STREAM"), + "unexpected error: {err}" + ); + } + #[cfg(feature = "client")] #[tokio::test] async fn http_client_plaintext_rejects_https() {