diff --git a/connectrpc/src/deadline.rs b/connectrpc/src/deadline.rs index 607af54..e282e1b 100644 --- a/connectrpc/src/deadline.rs +++ b/connectrpc/src/deadline.rs @@ -318,7 +318,11 @@ impl DeadlineStream { Self { inner: Some(inner), absolute: absolute.map(tokio::time::sleep), - per_item: inter_message.map(tokio::time::sleep), + // Do NOT arm the inter-message timer at construction. There is no + // prior message yet, so starting the timer here would measure + // stream-setup latency rather than the gap between messages. The + // lazy arm in `poll_next` starts the timer on the first poll. + per_item: None, inter_message, finished: false, } @@ -337,6 +341,15 @@ where return Poll::Ready(None); } + // Lazily arm the inter-message timer on the first poll so that + // stream-setup latency before the consumer starts reading is excluded + // from the first gap measurement. + if this.per_item.is_none() { + if let Some(d) = this.inter_message { + this.per_item.set(Some(tokio::time::sleep(*d))); + } + } + // Check the absolute deadline first — once it lapses the whole // request is over regardless of per-item progress. if let Some(sleep) = this.absolute.as_mut().as_pin_mut() @@ -589,4 +602,36 @@ mod tests { ); assert!(wrapped.next().await.is_none()); } + + #[tokio::test(start_paused = true)] + async fn setup_latency_before_first_poll_does_not_trigger_timeout() { + let p = DeadlinePolicy::new().with_inter_message_timeout(ms(50)); + let inner: BoxStream> = + Box::pin(futures::stream::iter([Ok(Bytes::from_static(b"a"))])); + let mut wrapped = p.enforce_on_response_stream(inner, None); + + tokio::time::advance(ms(100)).await; + + let item = wrapped.next().await.unwrap(); + assert!(item.is_ok(), "expected first item but got deadline error: {:?}", item); + assert_eq!(item.unwrap(), Bytes::from_static(b"a")); + } + + #[tokio::test(start_paused = true)] + async fn stream_that_never_yields_still_times_out() { + let p = DeadlinePolicy::new().with_inter_message_timeout(ms(50)); + let inner: BoxStream> = + Box::pin(futures::stream::pending()); + let mut wrapped = p.enforce_on_response_stream(inner, None); + + let first = futures::poll!(wrapped.next()); + assert!(first.is_pending()); + + tokio::time::advance(ms(100)).await; + + let err = wrapped.next().await.unwrap().unwrap_err(); + assert_eq!(err.code, crate::ErrorCode::DeadlineExceeded); + assert!(err.message.as_deref().unwrap().contains("inter-message")); + assert!(wrapped.next().await.is_none()); + } }