From b7e8f4f93a616f0400b83e1a6037e4a15136596c Mon Sep 17 00:00:00 2001 From: Steven Zimmerman <15812269+EffortlessSteven@users.noreply.github.com> Date: Fri, 5 Jun 2026 01:32:53 -0400 Subject: [PATCH] service: surface streaming request body read errors to handlers `spawn_body_reader` logged `Body::poll_frame` errors and stopped the reader, so the handler-facing request stream saw a clean EOF when the HTTP request body failed mid-decode. A truncated or aborted client-streaming/bidi request was then indistinguishable from a complete client stream. Forward transport-level body read failures to the handler stream as `ConnectError::internal` while the reader is still decoding request messages (`BodyReader::on_body_error`), matching the unary body-read path. Once the reader is only draining trailing bytes (after END_STREAM, a decode error, or the handler dropping the request stream), the error stays diagnostic-only, preserving the END_STREAM-as-terminal drain contract. Adds regression coverage for both the surfaced (decoding) and suppressed (draining) paths. --- connectrpc/src/service.rs | 149 +++++++++++++++++++++++++++++++++++++- 1 file changed, 146 insertions(+), 3 deletions(-) diff --git a/connectrpc/src/service.rs b/connectrpc/src/service.rs index c809e10..6dffb74 100644 --- a/connectrpc/src/service.rs +++ b/connectrpc/src/service.rs @@ -2627,6 +2627,29 @@ impl BodyReader { pending_trailing_data_warn, }; } + + /// Handle a transport-level request body error. + /// + /// While the reader is still decoding request messages, the failure is + /// surfaced to the handler stream as an internal [`ConnectError`] so a + /// truncated or failed body is not mistaken for a complete client stream. + /// Once the reader is only draining trailing bytes (after END_STREAM, a + /// decode error, or the handler dropping the request stream), the error is + /// diagnostic-only — the handler has already seen the terminal outcome. + async fn on_body_error(&mut self, err: impl std::fmt::Display + Send) { + tracing::debug!(error = %err, "request body error, stopping reader"); + + // Only the decoding path allocates the message; the drain path logs + // via `Display` above and returns without touching the handler. + if matches!(self.mode, ReadMode::Decoding) { + let _ = self + .tx + .send(Err(ConnectError::internal(format!( + "failed to read request body: {err}" + )))) + .await; + } + } } /// Spawn a background task that reads envelope-framed messages from an HTTP @@ -2677,9 +2700,7 @@ where } } Some(Err(e)) => { - // Transport-level body error — the handler just sees the - // stream end; record the cause for diagnostics. - tracing::debug!(error = %e, "request body error, stopping reader"); + reader.on_body_error(e).await; break; } None => { @@ -4523,4 +4544,126 @@ mod tests { (expected at most {max_expected})" ); } + + /// Test body that yields a fixed sequence of data frames and then fails + /// with a single transport-level body error. + struct ErrorAfterFramesBody { + frames: std::collections::VecDeque, + errored: bool, + } + + impl Body for ErrorAfterFramesBody { + type Data = Bytes; + type Error = std::io::Error; + + fn poll_frame( + self: Pin<&mut Self>, + _cx: &mut TaskContext<'_>, + ) -> Poll, Self::Error>>> { + let this = self.get_mut(); + + if let Some(data) = this.frames.pop_front() { + return Poll::Ready(Some(Ok(Frame::data(data)))); + } + + if !this.errored { + this.errored = true; + return Poll::Ready(Some(Err(std::io::Error::new( + std::io::ErrorKind::UnexpectedEof, + "simulated body read failure", + )))); + } + + Poll::Ready(None) + } + } + + /// A request body that yields a valid streaming message and then fails at + /// the transport level (while the reader is still decoding) must surface + /// the failure to the handler as an internal error — not a clean EOF that + /// is indistinguishable from a complete client stream. + #[tokio::test] + async fn test_body_reader_surfaces_body_error_while_decoding() { + let mut frames = std::collections::VecDeque::new(); + frames.push_back(Envelope::data(Bytes::from_static(b"hello")).encode()); + + let body = ErrorAfterFramesBody { + frames, + errored: false, + }; + + let (mut request_stream, reader_task) = spawn_body_reader( + body, + DEFAULT_MAX_MESSAGE_SIZE, + None, + Arc::new(CompressionRegistry::new()), + ); + + let first = request_stream + .next() + .await + .expect("one message before the body error") + .expect("message decodes before the body error"); + assert_eq!(&first[..], b"hello"); + + let err = request_stream + .next() + .await + .expect("body error must be delivered") + .expect_err("body error must not be converted to clean EOF"); + + assert_eq!(err.code, crate::error::ErrorCode::Internal); + assert!( + err.message + .as_deref() + .unwrap_or_default() + .contains("failed to read request body: simulated body read failure"), + "unexpected error message: {:?}", + err.message + ); + + assert!( + request_stream.next().await.is_none(), + "no further items after the body error" + ); + + reader_task + .expect("tests run inside a tokio runtime") + .await + .expect("reader task must not panic"); + } + + /// Once the reader has finished decoding (here: a clean END_STREAM has put + /// it in drain mode), a subsequent transport-level body error is + /// diagnostic-only — the handler already observed the terminal end of the + /// stream and must not then receive a spurious error. + #[tokio::test] + async fn test_body_reader_body_error_after_end_stream_is_suppressed() { + let mut frames = std::collections::VecDeque::new(); + frames.push_back(Envelope::end_stream(Bytes::from_static(b"{}")).encode()); + + let body = ErrorAfterFramesBody { + frames, + errored: false, + }; + + let (mut request_stream, reader_task) = spawn_body_reader( + body, + DEFAULT_MAX_MESSAGE_SIZE, + None, + Arc::new(CompressionRegistry::new()), + ); + + // END_STREAM ended the stream cleanly; the body error that follows is + // suppressed, so the handler sees a clean end with no error item. + assert!( + request_stream.next().await.is_none(), + "a body error after END_STREAM must not surface to the handler" + ); + + reader_task + .expect("tests run inside a tokio runtime") + .await + .expect("reader task must not panic"); + } }