From e0b82f0ac973c1900c8d6e0eb235ed2fc2f92821 Mon Sep 17 00:00:00 2001 From: Jeet Dekivadia Date: Sat, 30 May 2026 01:06:44 -0700 Subject: [PATCH 1/3] Include body receipt in request deadlines Signed-off-by: Jeet Dekivadia --- connectrpc/src/deadline.rs | 6 +- connectrpc/src/service.rs | 265 +++++++++++++++++++++++-------------- docs/guide.md | 3 +- 3 files changed, 168 insertions(+), 106 deletions(-) diff --git a/connectrpc/src/deadline.rs b/connectrpc/src/deadline.rs index 607af54..54bb223 100644 --- a/connectrpc/src/deadline.rs +++ b/connectrpc/src/deadline.rs @@ -36,9 +36,9 @@ use crate::handler::BoxStream; /// mid-write — or an absurdly long one — holding server resources for /// hours. `DeadlinePolicy` clamps the client value to a server-controlled /// range, applies a server-side default when the client asserts nothing, -/// and can extend enforcement to streaming bodies (whose initial setup is -/// already bounded by the server's `tokio::time::timeout`, but whose item -/// stream is unbounded by default). +/// and can extend enforcement to streaming response bodies (whose request +/// receipt and initial setup are already bounded by the server timeout, but +/// whose item stream is unbounded by default). /// /// Construct via [`DeadlinePolicy::new`] and the `with_*` builders; the /// field set is `#[non_exhaustive]` so struct-literal construction is not diff --git a/connectrpc/src/service.rs b/connectrpc/src/service.rs index c809e10..a43083a 100644 --- a/connectrpc/src/service.rs +++ b/connectrpc/src/service.rs @@ -31,6 +31,7 @@ //! ``` use std::convert::Infallible; +use std::future::Future; use std::ops::ControlFlow; use std::pin::Pin; use std::sync::Arc; @@ -304,6 +305,39 @@ where } } +/// Run a request future within an absolute server-side deadline. +/// +/// Callers compute the deadline once after parsing headers so the same budget +/// covers body receipt and handler execution. +async fn with_request_deadline( + deadline: Option, + future: F, +) -> Result +where + F: Future>, +{ + match deadline { + Some(deadline) => tokio::time::timeout_at(tokio::time::Instant::from_std(deadline), future) + .await + .map_err(|_| ConnectError::deadline_exceeded("request timeout"))?, + None => future.await, + } +} + +fn absolute_deadline(timeout: Option) -> Option { + timeout.and_then(|t| std::time::Instant::now().checked_add(t)) +} + +fn deadline_from_headers( + headers: &http::HeaderMap, + protocol: Protocol, + path: &str, + deadline_policy: &DeadlinePolicy, +) -> Option { + let timeout = RequestMetadata::from_headers(headers, protocol).timeout; + absolute_deadline(deadline_policy.moderate(timeout, path)) +} + /// Decode the message from GET request query parameters. /// /// The message may be: @@ -1184,11 +1218,11 @@ impl ConnectRpcService { /// /// The default [`DeadlinePolicy::new`] is a no-op: the client's /// `Connect-Timeout-Ms` / `grpc-timeout` header is honored verbatim - /// and streaming bodies are not bounded by it (only the time-to- - /// first-response is). Set a policy to clamp client values to an - /// operationally sane range, supply a default when the client - /// asserts nothing, or extend enforcement to streaming bodies. See - /// [`DeadlinePolicy`] for details and recommendations. + /// for request receipt and handler execution, but streaming response + /// bodies are not bounded by it. Set a policy to clamp client values + /// to an operationally sane range, supply a default when the client + /// asserts nothing, or extend enforcement to streaming response + /// bodies. See [`DeadlinePolicy`] for details and recommendations. #[must_use] pub fn with_deadline_policy(mut self, policy: DeadlinePolicy) -> Self { self.deadline_policy = policy; @@ -1444,12 +1478,27 @@ where .and_then(|v| v.to_str().ok()) .unwrap_or("") .to_owned(); + let timeout_protocol = if ct.starts_with("application/grpc-web") { + Protocol::GrpcWeb + } else if ct.starts_with("application/grpc") { + Protocol::Grpc + } else { + Protocol::Connect + }; + let deadline = deadline_from_headers( + req.headers(), + timeout_protocol, + req.uri().path(), + deadline_policy, + ); // Drain the request body to avoid broken pipe on HTTP/1.1. - // Use Limited to cap the drain at max_request_body_size. let (_parts, body) = req.into_parts(); - let limited = http_body_util::Limited::new(body, limits.max_request_body_size); - let _ = limited.collect().await; + let _ = with_request_deadline( + deadline, + collect_body_limited(body, limits.max_request_body_size), + ) + .await; if ct.starts_with("application/grpc-web") { let err = ConnectError::internal("unsupported content type"); @@ -1484,9 +1533,18 @@ where "gRPC-Web text mode (application/grpc-web-text) is not supported", ); // Drain the body to preserve HTTP/1.1 keep-alive for the error response. + let deadline = deadline_from_headers( + req.headers(), + rp.protocol, + req.uri().path(), + deadline_policy, + ); let (_parts, body) = req.into_parts(); - let limited = http_body_util::Limited::new(body, limits.max_request_body_size); - let _ = limited.collect().await; + let _ = with_request_deadline( + deadline, + collect_body_limited(body, limits.max_request_body_size), + ) + .await; let response = grpc_error_response(&err, Protocol::GrpcWeb, rp.codec_format); return Ok(response.map(ConnectRpcBody::Streaming)); } @@ -1574,6 +1632,7 @@ where // Extract metadata from headers using the Connect protocol (unary is always Connect) let mut metadata = RequestMetadata::from_headers(req.headers(), Protocol::Connect); metadata.timeout = deadline_policy.moderate(metadata.timeout, &path); + let deadline = absolute_deadline(metadata.timeout); // Split request to consume the body let (parts, body) = req.into_parts(); @@ -1584,7 +1643,11 @@ where // "broken pipe" errors because the client is still sending data. // collect_body_limited bounds allocation *during* the read, so an // oversized body is rejected before it is fully buffered. - let post_body = collect_body_limited(body, limits.max_request_body_size).await?; + let post_body = with_request_deadline( + deadline, + collect_body_limited(body, limits.max_request_body_size), + ) + .await?; // Look up the method descriptor to check idempotency. // (Non-unary kinds are allowed through — they'll error at the dispatch call.) @@ -1674,11 +1737,6 @@ where }; // Create handler context with the request headers from metadata. - // Compute the absolute deadline once so handlers can propagate it; - // the tokio::time::timeout wrapper below still uses the relative duration. - let deadline = metadata - .timeout - .and_then(|t| std::time::Instant::now().checked_add(t)); let ctx = RequestContext::new(metadata.headers) .with_deadline(deadline) .with_extensions(extensions) @@ -1689,17 +1747,12 @@ where // and Spec::procedure. .with_path(format!("/{path}")); - // Call the handler with the appropriate codec format, applying timeout if specified - let resp: EncodedResponse = if let Some(timeout) = metadata.timeout { - tokio::time::timeout( - timeout, - call_unary_intercepted(dispatcher, interceptors, &path, ctx, body, codec_format), - ) - .await - .map_err(|_| ConnectError::deadline_exceeded("request timeout"))? - } else { - call_unary_intercepted(dispatcher, interceptors, &path, ctx, body, codec_format).await - }?; + // Call the handler with the appropriate codec format. + let resp: EncodedResponse = with_request_deadline( + deadline, + call_unary_intercepted(dispatcher, interceptors, &path, ctx, body, codec_format), + ) + .await?; // Negotiate response compression let response_encoding = compression.negotiate_encoding( @@ -1774,6 +1827,11 @@ where B: Body + Send + 'static, B::Error: std::error::Error + Send + Sync + 'static, { + // Extract metadata before any body reads, including error-path drains. + let mut metadata = RequestMetadata::from_headers(req.headers(), protocol); + metadata.timeout = deadline_policy.moderate(metadata.timeout, path); + let deadline = absolute_deadline(metadata.timeout); + // Helper to build a gRPC trailers-only error response using GrpcUnaryBody. let grpc_unary_error = |err: &ConnectError| -> Response { let grpc_trailers = build_grpc_trailers(Some(err), err.trailers()); @@ -1819,15 +1877,14 @@ where let err = ConnectError::internal(format!("invalid method for gRPC: {}", req.method())); // Drain the request body to avoid broken pipe on HTTP/1.1. let (_parts, body) = req.into_parts(); - let limited = http_body_util::Limited::new(body, limits.max_request_body_size); - let _ = limited.collect().await; + let _ = with_request_deadline( + deadline, + collect_body_limited(body, limits.max_request_body_size), + ) + .await; return grpc_unary_error(&err); } - // Extract metadata - let mut metadata = RequestMetadata::from_headers(req.headers(), protocol); - metadata.timeout = deadline_policy.moderate(metadata.timeout, path); - // Validate request compression if let Some(ref encoding) = metadata.streaming_encoding && encoding != "identity" @@ -1836,8 +1893,11 @@ where let err = ConnectError::unimplemented(format!("unsupported compression: {encoding}")); // Drain the request body to avoid broken pipe on HTTP/1.1. let (_parts, body) = req.into_parts(); - let limited = http_body_util::Limited::new(body, limits.max_request_body_size); - let _ = limited.collect().await; + let _ = with_request_deadline( + deadline, + collect_body_limited(body, limits.max_request_body_size), + ) + .await; return grpc_unary_error(&err); } @@ -1845,7 +1905,12 @@ where // read, so an oversized body is rejected before it is fully buffered. let (parts, body) = req.into_parts(); let extensions = parts.extensions; - let post_body = match collect_body_limited(body, limits.max_request_body_size).await { + let post_body = match with_request_deadline( + deadline, + collect_body_limited(body, limits.max_request_body_size), + ) + .await + { Ok(bytes) => bytes, Err(err) => return grpc_unary_error(&err), }; @@ -1898,9 +1963,6 @@ where }; // Create handler context - let deadline = metadata - .timeout - .and_then(|t| std::time::Instant::now().checked_add(t)); let ctx = RequestContext::new(metadata.headers) .with_deadline(deadline) .with_extensions(extensions) @@ -1908,28 +1970,9 @@ where .with_protocol(Some(protocol)) .with_path(format!("/{path}")); - // Call the handler with timeout if configured - let handler_result = if let Some(timeout) = metadata.timeout { - match tokio::time::timeout( - timeout, - call_unary_intercepted( - dispatcher, - interceptors, - path, - ctx, - request_body, - codec_format, - ), - ) - .await - { - Ok(result) => result, - Err(_) => { - let err = ConnectError::deadline_exceeded("request timeout"); - return grpc_unary_error(&err); - } - } - } else { + // Call the handler with the same deadline used while receiving the body. + let resp = match with_request_deadline( + deadline, call_unary_intercepted( dispatcher, interceptors, @@ -1937,11 +1980,10 @@ where ctx, request_body, codec_format, - ) - .await - }; - - let resp = match handler_result { + ), + ) + .await + { Ok(result) => result, Err(e) => return grpc_unary_error(&e), }; @@ -2045,20 +2087,24 @@ where let path = req.uri().path(); let path = path.strip_prefix('/').unwrap_or(path).to_owned(); + // Extract metadata before any body reads, including error-path drains. + let mut metadata = RequestMetadata::from_headers(req.headers(), protocol); + metadata.timeout = deadline_policy.moderate(metadata.timeout, &path); + // gRPC and gRPC-Web require POST method if matches!(protocol, Protocol::Grpc | Protocol::GrpcWeb) && req.method() != Method::POST { let err = ConnectError::internal(format!("invalid method for gRPC: {}", req.method())); // Drain the request body to avoid broken pipe on HTTP/1.1. let (_parts, body) = req.into_parts(); - let limited = http_body_util::Limited::new(body, limits.max_request_body_size); - let _ = limited.collect().await; + let deadline = absolute_deadline(metadata.timeout); + let _ = with_request_deadline( + deadline, + collect_body_limited(body, limits.max_request_body_size), + ) + .await; return streaming_error_response(&err, protocol, codec_format); } - // Extract metadata from headers using the detected protocol - let mut metadata = RequestMetadata::from_headers(req.headers(), protocol); - metadata.timeout = deadline_policy.moderate(metadata.timeout, &path); - // Validate request compression is supported (if specified) if let Some(ref encoding) = metadata.streaming_encoding && encoding != "identity" @@ -2067,8 +2113,12 @@ where let err = ConnectError::unimplemented(format!("unsupported compression: {encoding}")); // Drain the request body to avoid broken pipe on HTTP/1.1. let (_parts, body) = req.into_parts(); - let limited = http_body_util::Limited::new(body, limits.max_request_body_size); - let _ = limited.collect().await; + let deadline = absolute_deadline(metadata.timeout); + let _ = with_request_deadline( + deadline, + collect_body_limited(body, limits.max_request_body_size), + ) + .await; return streaming_error_response(&err, protocol, codec_format); } @@ -2118,9 +2168,16 @@ where .await; } + let deadline = absolute_deadline(metadata.timeout); + // For server streaming (or errors), read the full body (single envelope expected). // collect_body_limited bounds allocation during the read. - let post_body = match collect_body_limited(body, limits.max_request_body_size).await { + let post_body = match with_request_deadline( + deadline, + collect_body_limited(body, limits.max_request_body_size), + ) + .await + { Ok(bytes) => bytes, Err(err) => return streaming_error_response(&err, protocol, codec_format), }; @@ -2206,9 +2263,6 @@ where }; // Create handler context with the request headers from metadata - let deadline = metadata - .timeout - .and_then(|t| std::time::Instant::now().checked_add(t)); let ctx = RequestContext::new(metadata.headers) .with_deadline(deadline) .with_extensions(extensions) @@ -2228,18 +2282,7 @@ where request_body, codec_format, ); - let handler_result = if let Some(timeout) = metadata.timeout { - match tokio::time::timeout(timeout, fut).await { - Ok(result) => result, - Err(_) => { - let err = ConnectError::deadline_exceeded("request timeout"); - return streaming_error_response(&err, protocol, codec_format); - } - } - } else { - fut.await - }; - match handler_result { + match with_request_deadline(deadline, fut).await { Ok(result) => result, Err(e) => return streaming_error_response(&e, protocol, codec_format), } @@ -2253,18 +2296,7 @@ where request_body, codec_format, ); - let handler_result = if let Some(timeout) = metadata.timeout { - match tokio::time::timeout(timeout, fut).await { - Ok(result) => result, - Err(_) => { - let err = ConnectError::deadline_exceeded("request timeout"); - return streaming_error_response(&err, protocol, codec_format); - } - } - } else { - fut.await - }; - match handler_result { + match with_request_deadline(deadline, fut).await { // Wrap single response in a one-item stream Ok(r) => r.map_body(|bytes| -> BoxStream> { Box::pin(futures::stream::once(async move { Ok(bytes) })) @@ -3149,6 +3181,35 @@ mod tests { assert!(err.message.as_deref().unwrap().contains("limit 5")); } + #[tokio::test(start_paused = true)] + async fn test_unary_deadline_bounds_stalled_body_collection() { + let router = Router::new(); + let body = http_body_util::StreamBody::new(futures::stream::pending::< + Result, std::io::Error>, + >()); + let req = Request::builder() + .method(Method::POST) + .uri("/svc/Method") + .header(header::CONTENT_TYPE, "application/proto") + .body(body) + .unwrap(); + let deadline_policy = DeadlinePolicy::new().with_default_timeout(Duration::from_millis(1)); + + let err = handle_unary_request( + &router, + req, + Limits::default(), + Arc::new(CompressionRegistry::new()), + &CompressionPolicy::default(), + &deadline_policy, + &[], + ) + .await + .err() + .expect("stalled body must exceed the request deadline"); + assert_eq!(err.code, crate::error::ErrorCode::DeadlineExceeded); + } + #[test] fn test_parse_get_query_params_basic() { let params = parse_get_query_params(Some("message=%7B%7D&encoding=json&connect=v1")) diff --git a/docs/guide.md b/docs/guide.md index a9b55a5..ddefcd9 100644 --- a/docs/guide.md +++ b/docs/guide.md @@ -1010,7 +1010,8 @@ Why each knob: stays busy. Set it to your longest acceptable handler runtime. - **`with_default_timeout`** matters because the timeout header is optional. A request that omits it has no bound at all unless you set - one. Set it to your SLA. + one. Set it to your SLA. For unary and server-streaming RPCs, the + budget includes receiving the request body as well as handler execution. - **`with_min`** protects against a misbehaving or adversarial client cancelling the handler before it can do anything (e.g. mid-write on a streaming response). A few milliseconds is usually enough. From e302567d4605af3617bd2947b42ae1bf8896773a Mon Sep 17 00:00:00 2001 From: Jeet Dekivadia Date: Sat, 30 May 2026 12:55:13 -0700 Subject: [PATCH 2/3] Use expect_err in deadline regression test --- connectrpc/src/service.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/connectrpc/src/service.rs b/connectrpc/src/service.rs index a43083a..e91382f 100644 --- a/connectrpc/src/service.rs +++ b/connectrpc/src/service.rs @@ -3205,8 +3205,7 @@ mod tests { &[], ) .await - .err() - .expect("stalled body must exceed the request deadline"); + .expect_err("stalled body must exceed the request deadline"); assert_eq!(err.code, crate::error::ErrorCode::DeadlineExceeded); } From 277d0941da5882f3800c4b0716c962b67ae35fcc Mon Sep 17 00:00:00 2001 From: Jeet Dekivadia Date: Mon, 1 Jun 2026 22:59:10 -0700 Subject: [PATCH 3/3] Add server-streaming deadline receipt regression --- connectrpc/src/service.rs | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/connectrpc/src/service.rs b/connectrpc/src/service.rs index e91382f..f6377a6 100644 --- a/connectrpc/src/service.rs +++ b/connectrpc/src/service.rs @@ -3209,6 +3209,37 @@ mod tests { assert_eq!(err.code, crate::error::ErrorCode::DeadlineExceeded); } + #[tokio::test(start_paused = true)] + async fn test_server_streaming_deadline_bounds_stalled_body_collection() { + let router = Router::new(); + let body = http_body_util::StreamBody::new(futures::stream::pending::< + Result, std::io::Error>, + >()); + let req = Request::builder() + .method(Method::POST) + .uri("/svc/Method") + .body(body) + .unwrap(); + let deadline_policy = DeadlinePolicy::new().with_default_timeout(Duration::from_millis(1)); + + let resp = handle_streaming_request( + &router, + req, + Protocol::Grpc, + CodecFormat::Proto, + Limits::default(), + Arc::new(CompressionRegistry::new()), + &CompressionPolicy::default(), + &deadline_policy, + &[], + ) + .await; + assert_eq!( + resp.headers().get(&GRPC_STATUS).unwrap(), + &crate::ErrorCode::DeadlineExceeded.grpc_code().to_string() + ); + } + #[test] fn test_parse_get_query_params_basic() { let params = parse_get_query_params(Some("message=%7B%7D&encoding=json&connect=v1"))