Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 46 additions & 1 deletion connectrpc/src/deadline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,11 @@ impl<S> DeadlineStream<S> {
Self {
inner: Some(inner),
absolute: absolute.map(tokio::time::sleep),
per_item: inter_message.map(tokio::time::sleep),
// Do NOT arm the inter-message timer at construction. There is no
// prior message yet, so starting the timer here would measure
// stream-setup latency rather than the gap between messages. The
// lazy arm in `poll_next` starts the timer on the first poll.
per_item: None,
inter_message,
finished: false,
}
Expand All @@ -337,6 +341,15 @@ where
return Poll::Ready(None);
}

// Lazily arm the inter-message timer on the first poll so that
// stream-setup latency before the consumer starts reading is excluded
// from the first gap measurement.
if this.per_item.is_none() {
if let Some(d) = this.inter_message {
this.per_item.set(Some(tokio::time::sleep(*d)));
}
}

// Check the absolute deadline first — once it lapses the whole
// request is over regardless of per-item progress.
if let Some(sleep) = this.absolute.as_mut().as_pin_mut()
Expand Down Expand Up @@ -589,4 +602,36 @@ mod tests {
);
assert!(wrapped.next().await.is_none());
}

#[tokio::test(start_paused = true)]
async fn setup_latency_before_first_poll_does_not_trigger_timeout() {
let p = DeadlinePolicy::new().with_inter_message_timeout(ms(50));
let inner: BoxStream<Result<Bytes, ConnectError>> =
Box::pin(futures::stream::iter([Ok(Bytes::from_static(b"a"))]));
let mut wrapped = p.enforce_on_response_stream(inner, None);

tokio::time::advance(ms(100)).await;

let item = wrapped.next().await.unwrap();
assert!(item.is_ok(), "expected first item but got deadline error: {:?}", item);
assert_eq!(item.unwrap(), Bytes::from_static(b"a"));
}

#[tokio::test(start_paused = true)]
async fn stream_that_never_yields_still_times_out() {
let p = DeadlinePolicy::new().with_inter_message_timeout(ms(50));
let inner: BoxStream<Result<Bytes, ConnectError>> =
Box::pin(futures::stream::pending());
let mut wrapped = p.enforce_on_response_stream(inner, None);

let first = futures::poll!(wrapped.next());
assert!(first.is_pending());

tokio::time::advance(ms(100)).await;

let err = wrapped.next().await.unwrap().unwrap_err();
assert_eq!(err.code, crate::ErrorCode::DeadlineExceeded);
assert!(err.message.as_deref().unwrap().contains("inter-message"));
assert!(wrapped.next().await.is_none());
}
}