From 62ce1602effba6dbf9849217ebb4cbe7d89fabbb Mon Sep 17 00:00:00 2001 From: Dev-X25874 <283057883+Dev-X25874@users.noreply.github.com> Date: Wed, 20 May 2026 13:44:05 +0530 Subject: [PATCH 1/2] deadline: don't arm inter-message timer before first item is yielded --- connectrpc/src/deadline.rs | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/connectrpc/src/deadline.rs b/connectrpc/src/deadline.rs index 607af54..687c56a 100644 --- a/connectrpc/src/deadline.rs +++ b/connectrpc/src/deadline.rs @@ -318,7 +318,12 @@ impl DeadlineStream { Self { inner: Some(inner), absolute: absolute.map(tokio::time::sleep), - per_item: inter_message.map(tokio::time::sleep), + // Do NOT arm the inter-message timer at construction. There is no + // prior message yet, so starting the timer here would measure + // stream-setup latency rather than the gap between items. The + // re-arm in `poll_next` starts the timer after each yielded item, + // which is the correct measurement point. + per_item: None, inter_message, finished: false, } @@ -326,6 +331,8 @@ impl DeadlineStream { } impl Stream for DeadlineStream +where + S: Stream>, where S: Stream>, { From f30604dd07155ee8776b2726a5636b01a21ed731 Mon Sep 17 00:00:00 2001 From: Dev-X25874 <283057883+Dev-X25874@users.noreply.github.com> Date: Wed, 27 May 2026 08:15:12 +0530 Subject: [PATCH 2/2] =?UTF-8?q?deadline:=20arm=20inter-message=20timer=20o?= =?UTF-8?q?n=20first=20poll,=20not=20at=20construction=EE=81=96=EE=80=BB?= =?UTF-8?q?=EE=83=BB=EE=83=B9=EE=83=8E?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- connectrpc/src/deadline.rs | 48 ++++++++++++++++++++++++++++++++++---- 1 file changed, 43 insertions(+), 5 deletions(-) diff --git a/connectrpc/src/deadline.rs b/connectrpc/src/deadline.rs index 687c56a..e282e1b 100644 --- a/connectrpc/src/deadline.rs +++ b/connectrpc/src/deadline.rs @@ -320,9 +320,8 @@ impl DeadlineStream { absolute: absolute.map(tokio::time::sleep), // Do NOT arm the inter-message timer at construction. There is no // prior message yet, so starting the timer here would measure - // stream-setup latency rather than the gap between items. The - // re-arm in `poll_next` starts the timer after each yielded item, - // which is the correct measurement point. + // stream-setup latency rather than the gap between messages. The + // lazy arm in `poll_next` starts the timer on the first poll. per_item: None, inter_message, finished: false, @@ -331,8 +330,6 @@ impl DeadlineStream { } impl Stream for DeadlineStream -where - S: Stream>, where S: Stream>, { @@ -344,6 +341,15 @@ where return Poll::Ready(None); } + // Lazily arm the inter-message timer on the first poll so that + // stream-setup latency before the consumer starts reading is excluded + // from the first gap measurement. + if this.per_item.is_none() { + if let Some(d) = this.inter_message { + this.per_item.set(Some(tokio::time::sleep(*d))); + } + } + // Check the absolute deadline first — once it lapses the whole // request is over regardless of per-item progress. if let Some(sleep) = this.absolute.as_mut().as_pin_mut() @@ -596,4 +602,36 @@ mod tests { ); assert!(wrapped.next().await.is_none()); } + + #[tokio::test(start_paused = true)] + async fn setup_latency_before_first_poll_does_not_trigger_timeout() { + let p = DeadlinePolicy::new().with_inter_message_timeout(ms(50)); + let inner: BoxStream> = + Box::pin(futures::stream::iter([Ok(Bytes::from_static(b"a"))])); + let mut wrapped = p.enforce_on_response_stream(inner, None); + + tokio::time::advance(ms(100)).await; + + let item = wrapped.next().await.unwrap(); + assert!(item.is_ok(), "expected first item but got deadline error: {:?}", item); + assert_eq!(item.unwrap(), Bytes::from_static(b"a")); + } + + #[tokio::test(start_paused = true)] + async fn stream_that_never_yields_still_times_out() { + let p = DeadlinePolicy::new().with_inter_message_timeout(ms(50)); + let inner: BoxStream> = + Box::pin(futures::stream::pending()); + let mut wrapped = p.enforce_on_response_stream(inner, None); + + let first = futures::poll!(wrapped.next()); + assert!(first.is_pending()); + + tokio::time::advance(ms(100)).await; + + let err = wrapped.next().await.unwrap().unwrap_err(); + assert_eq!(err.code, crate::ErrorCode::DeadlineExceeded); + assert!(err.message.as_deref().unwrap().contains("inter-message")); + assert!(wrapped.next().await.is_none()); + } }