Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,13 @@ with the [Rust 0.x convention](https://doc.rust-lang.org/cargo/reference/semver.
breaking changes increment the minor version (0.2 → 0.3), additive changes
increment the patch version.

## [Unreleased]

### Fixed

- Connect streaming clients now report an error when a response body ends
without the required END_STREAM envelope ([#140]).

## [0.6.1] - 2026-05-27

A patch release focused on the robustness of the streaming request and
Expand Down Expand Up @@ -46,6 +53,7 @@ now 1.6.
[#131]: https://github.com/anthropics/connect-rust/pull/131
[#132]: https://github.com/anthropics/connect-rust/pull/132
[#133]: https://github.com/anthropics/connect-rust/pull/133
[#140]: https://github.com/anthropics/connect-rust/issues/140

## [0.6.0] - 2026-05-20

Expand Down
47 changes: 47 additions & 0 deletions connectrpc/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1978,6 +1978,11 @@ where
if !self.poll_body().await? {
// Body exhausted — check buffer for remaining trailer data
self.done = true;
if matches!(self.protocol, Protocol::Connect) {
return Err(ConnectError::unavailable(
"Connect streaming response ended without END_STREAM envelope",
));
}
if matches!(self.protocol, Protocol::GrpcWeb)
&& !self.buf.is_empty()
&& self.buf[0] & 0x80 != 0
Expand Down Expand Up @@ -3651,6 +3656,48 @@ mod tests {
assert_debug::<HttpClient>();
}

#[tokio::test]
async fn connect_server_stream_truncated_after_data_errors() {
use buffa::Message;
use buffa_types::google::protobuf::__buffa::view::StringValueView;
use buffa_types::google::protobuf::StringValue;

let body = Full::new(Envelope::data(StringValue::from("hello").encode_to_bytes()).encode());
let mut stream: ServerStream<_, StringValueView<'static>> = ServerStream {
headers: http::HeaderMap::new(),
body,
buf: BytesMut::new(),
encoding: None,
compression: CompressionRegistry::new(),
codec_format: CodecFormat::Proto,
protocol: Protocol::Connect,
max_message_size: Some(1024),
deadline: None,
trailers: None,
error: None,
done: false,
_phantom: PhantomData,
};

let msg = stream
.message()
.await
.expect("first message should decode")
.expect("stream should yield the data envelope before EOF");
assert_eq!(msg.value, "hello");

let err = match stream.message().await {
Err(err) => err,
Ok(Some(_)) => panic!("truncated stream unexpectedly yielded another message"),
Ok(None) => panic!("truncated stream ended cleanly without END_STREAM"),
};
assert_eq!(err.code, ErrorCode::Unavailable);
assert!(
err.to_string().contains("END_STREAM"),
"unexpected error: {err}"
);
}

#[cfg(feature = "client")]
#[tokio::test]
async fn http_client_plaintext_rejects_https() {
Expand Down