Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 146 additions & 3 deletions connectrpc/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2627,6 +2627,29 @@ impl BodyReader {
pending_trailing_data_warn,
};
}

/// Handle a transport-level request body error.
///
/// While the reader is still decoding request messages, the failure is
/// surfaced to the handler stream as an internal [`ConnectError`] so a
/// truncated or failed body is not mistaken for a complete client stream.
/// Once the reader is only draining trailing bytes (after END_STREAM, a
/// decode error, or the handler dropping the request stream), the error is
/// diagnostic-only — the handler has already seen the terminal outcome.
async fn on_body_error(&mut self, err: impl std::fmt::Display + Send) {
tracing::debug!(error = %err, "request body error, stopping reader");

// Only the decoding path allocates the message; the drain path logs
// via `Display` above and returns without touching the handler.
if matches!(self.mode, ReadMode::Decoding) {
let _ = self
.tx
.send(Err(ConnectError::internal(format!(
"failed to read request body: {err}"
))))
.await;
}
}
}

/// Spawn a background task that reads envelope-framed messages from an HTTP
Expand Down Expand Up @@ -2677,9 +2700,7 @@ where
}
}
Some(Err(e)) => {
// Transport-level body error — the handler just sees the
// stream end; record the cause for diagnostics.
tracing::debug!(error = %e, "request body error, stopping reader");
reader.on_body_error(e).await;
break;
}
None => {
Expand Down Expand Up @@ -4523,4 +4544,126 @@ mod tests {
(expected at most {max_expected})"
);
}

/// Test body that yields a fixed sequence of data frames and then fails
/// with a single transport-level body error.
struct ErrorAfterFramesBody {
frames: std::collections::VecDeque<Bytes>,
errored: bool,
}

impl Body for ErrorAfterFramesBody {
type Data = Bytes;
type Error = std::io::Error;

fn poll_frame(
self: Pin<&mut Self>,
_cx: &mut TaskContext<'_>,
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
let this = self.get_mut();

if let Some(data) = this.frames.pop_front() {
return Poll::Ready(Some(Ok(Frame::data(data))));
}

if !this.errored {
this.errored = true;
return Poll::Ready(Some(Err(std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
"simulated body read failure",
))));
}

Poll::Ready(None)
}
}

/// A request body that yields a valid streaming message and then fails at
/// the transport level (while the reader is still decoding) must surface
/// the failure to the handler as an internal error — not a clean EOF that
/// is indistinguishable from a complete client stream.
#[tokio::test]
async fn test_body_reader_surfaces_body_error_while_decoding() {
let mut frames = std::collections::VecDeque::new();
frames.push_back(Envelope::data(Bytes::from_static(b"hello")).encode());

let body = ErrorAfterFramesBody {
frames,
errored: false,
};

let (mut request_stream, reader_task) = spawn_body_reader(
body,
DEFAULT_MAX_MESSAGE_SIZE,
None,
Arc::new(CompressionRegistry::new()),
);

let first = request_stream
.next()
.await
.expect("one message before the body error")
.expect("message decodes before the body error");
assert_eq!(&first[..], b"hello");

let err = request_stream
.next()
.await
.expect("body error must be delivered")
.expect_err("body error must not be converted to clean EOF");

assert_eq!(err.code, crate::error::ErrorCode::Internal);
assert!(
err.message
.as_deref()
.unwrap_or_default()
.contains("failed to read request body: simulated body read failure"),
"unexpected error message: {:?}",
err.message
);

assert!(
request_stream.next().await.is_none(),
"no further items after the body error"
);

reader_task
.expect("tests run inside a tokio runtime")
.await
.expect("reader task must not panic");
}

/// Once the reader has finished decoding (here: a clean END_STREAM has put
/// it in drain mode), a subsequent transport-level body error is
/// diagnostic-only — the handler already observed the terminal end of the
/// stream and must not then receive a spurious error.
#[tokio::test]
async fn test_body_reader_body_error_after_end_stream_is_suppressed() {
let mut frames = std::collections::VecDeque::new();
frames.push_back(Envelope::end_stream(Bytes::from_static(b"{}")).encode());

let body = ErrorAfterFramesBody {
frames,
errored: false,
};

let (mut request_stream, reader_task) = spawn_body_reader(
body,
DEFAULT_MAX_MESSAGE_SIZE,
None,
Arc::new(CompressionRegistry::new()),
);

// END_STREAM ended the stream cleanly; the body error that follows is
// suppressed, so the handler sees a clean end with no error item.
assert!(
request_stream.next().await.is_none(),
"a body error after END_STREAM must not surface to the handler"
);

reader_task
.expect("tests run inside a tokio runtime")
.await
.expect("reader task must not panic");
}
}