Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ envconfig = ["temporalio-common/envconfig"]
[dependencies]
anyhow = "1.0"
async-trait = "0.1"
backoff = "0.4"
backon = { version = "1.6", default-features = false }
base64 = "0.22"
bon = { version = "3", default-features = false, features = ["alloc"] }
derive_more = { workspace = true }
Expand Down
167 changes: 63 additions & 104 deletions crates/client/src/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,7 @@ use crate::{
grpc::IsUserLongPoll,
request_extensions::{IsWorkerTaskLongPoll, NoRetryOnMatching, RetryConfigForCall},
};
use backoff::{
Clock, SystemClock,
backoff::Backoff,
exponential::{self, ExponentialBackoff},
};
use backon::{BackoffBuilder, ExponentialBuilder};
use futures_retry::{ErrorHandler, FutureRetry, RetryPolicy};
use std::{
error::Error,
Expand Down Expand Up @@ -35,14 +31,13 @@ const LONG_POLL_FATAL_GRACE: Duration = Duration::from_secs(60);
pub struct RetryOptions {
/// initial wait time before the first retry.
pub initial_interval: Duration,
/// randomization jitter that is used as a multiplier for the current retry interval
/// and is added or subtracted from the interval length.
/// When greater than zero, retry delays include randomized jitter via [`backon`].
pub randomization_factor: f64,
/// rate at which retry time should be increased, until it reaches max_interval.
pub multiplier: f64,
/// maximum amount of time to wait between retries.
pub max_interval: Duration,
/// maximum total amount of time requests should be retried for, if None is set then no limit
/// maximum cumulative backoff time before retries stop, if None is set then no limit
/// will be used.
pub max_elapsed_time: Option<Duration>,
/// maximum number of retry attempts.
Expand All @@ -53,7 +48,7 @@ impl Default for RetryOptions {
fn default() -> Self {
Self {
initial_interval: Duration::from_millis(100), // 100 ms wait by default.
randomization_factor: 0.2, // +-20% jitter.
randomization_factor: 0.2,
multiplier: 1.7, // each next retry delay will increase by 70%
max_interval: Duration::from_secs(5), // until it reaches 5 seconds.
max_elapsed_time: Some(Duration::from_secs(10)), // 10 seconds total allocated time for all retries.
Expand Down Expand Up @@ -131,30 +126,24 @@ impl RetryOptions {
}
}

pub(crate) fn into_exp_backoff<C>(self, clock: C) -> exponential::ExponentialBackoff<C> {
exponential::ExponentialBackoff {
current_interval: self.initial_interval,
initial_interval: self.initial_interval,
randomization_factor: self.randomization_factor,
multiplier: self.multiplier,
max_interval: self.max_interval,
max_elapsed_time: self.max_elapsed_time,
clock,
start_time: Instant::now(),
fn exponential_builder(&self) -> ExponentialBuilder {
let mut builder = ExponentialBuilder::new()
.with_min_delay(self.initial_interval)
.with_factor(self.multiplier as f32)
.with_max_delay(self.max_interval)
.with_total_delay(self.max_elapsed_time)
.without_max_times();
if self.randomization_factor > 0.0 {
builder = builder.with_jitter();
}
}
}

impl From<RetryOptions> for backoff::ExponentialBackoff {
fn from(c: RetryOptions) -> Self {
c.into_exp_backoff(SystemClock::default())
builder
}
}

pub(crate) fn make_future_retry<R, F, Fut>(
info: CallInfo,
factory: F,
) -> FutureRetry<F, TonicErrorHandler<SystemClock>>
) -> FutureRetry<F, TonicErrorHandler>
where
F: FnMut() -> Fut + Unpin,
Fut: Future<Output = Result<R, tonic::Status>>,
Expand All @@ -166,40 +155,30 @@ where
}

#[derive(Debug)]
pub(crate) struct TonicErrorHandler<C: Clock> {
backoff: ExponentialBackoff<C>,
throttle_backoff: ExponentialBackoff<C>,
pub(crate) struct TonicErrorHandler {
backoff: backon::ExponentialBackoff,
throttle_backoff: backon::ExponentialBackoff,
max_interval: Duration,
retry_started_at: Instant,
max_retries: usize,
call_type: CallType,
call_name: &'static str,
retry_short_circuit: Option<NoRetryOnMatching>,
}
impl TonicErrorHandler<SystemClock> {

impl TonicErrorHandler {
fn new(call_info: CallInfo, throttle_cfg: RetryOptions) -> Self {
Self::new_with_clock(
call_info,
throttle_cfg,
SystemClock::default(),
SystemClock::default(),
)
}
}
impl<C> TonicErrorHandler<C>
where
C: Clock,
{
fn new_with_clock(
call_info: CallInfo,
throttle_cfg: RetryOptions,
clock: C,
throttle_clock: C,
) -> Self {
let backoff_builder = call_info.retry_cfg.exponential_builder();
let throttle_backoff_builder = throttle_cfg.exponential_builder();
let max_interval = call_info.retry_cfg.max_interval;
Self {
call_type: call_info.call_type,
call_name: call_info.call_name,
max_retries: call_info.retry_cfg.max_retries,
backoff: call_info.retry_cfg.into_exp_backoff(clock),
throttle_backoff: throttle_cfg.into_exp_backoff(throttle_clock),
backoff: backoff_builder.build(),
throttle_backoff: throttle_backoff_builder.build(),
max_interval,
retry_started_at: Instant::now(),
retry_short_circuit: call_info.retry_short_circuit,
}
}
Expand Down Expand Up @@ -250,10 +229,7 @@ impl CallType {
}
}

impl<C> ErrorHandler<tonic::Status> for TonicErrorHandler<C>
where
C: Clock,
{
impl ErrorHandler<tonic::Status> for TonicErrorHandler {
type OutError = tonic::Status;

fn handle(
Expand Down Expand Up @@ -286,7 +262,7 @@ where
|| e.message()
.starts_with("grpc: received message after decompression larger than max"))
{
// Leave a marker so we don't have duplicate detection logic in the workflow
// Leave a marker so we don't have to have duplicate detection logic in the workflow
e.metadata_mut().insert(
MESSAGE_TOO_LARGE_KEY,
tonic::metadata::MetadataValue::from(0),
Expand Down Expand Up @@ -316,26 +292,26 @@ where
self.maybe_log_retry(current_attempt, &e);
}

match self.backoff.next_backoff() {
None => RetryPolicy::ForwardError(e), // None is returned when we've ran out of time
match self.backoff.next() {
None => RetryPolicy::ForwardError(e),
Some(backoff) => {
// We treat ResourceExhausted as a special case and backoff more
// so we don't overload the server
if e.code() == Code::ResourceExhausted {
let extended_backoff =
backoff.max(self.throttle_backoff.next_backoff().unwrap_or_default());
backoff.max(self.throttle_backoff.next().unwrap_or_default());
RetryPolicy::WaitRetry(extended_backoff)
} else {
RetryPolicy::WaitRetry(backoff)
}
}
}
} else if self.call_type == CallType::TaskLongPoll
&& self.backoff.get_elapsed_time() <= LONG_POLL_FATAL_GRACE
&& self.retry_started_at.elapsed() <= LONG_POLL_FATAL_GRACE
{
// We permit "fatal" errors while long polling for a while, because some proxies return
// stupid error codes while getting ready, among other weird infra issues
RetryPolicy::WaitRetry(self.backoff.max_interval)
RetryPolicy::WaitRetry(self.max_interval)
} else {
RetryPolicy::ForwardError(e)
}
Expand All @@ -359,8 +335,6 @@ fn is_transport_cancelled(status: &tonic::Status) -> bool {
mod tests {
use super::*;
use assert_matches::assert_matches;
use backoff::Clock;
use std::{ops::Add, time::Instant};
use temporalio_common::protos::temporal::api::workflowservice::v1::{
PollActivityTaskQueueRequest, PollNexusTaskQueueRequest, PollWorkflowTaskQueueRequest,
};
Expand All @@ -380,11 +354,15 @@ mod tests {
const POLL_ACTIVITY_METH_NAME: &str = "poll_activity_task_queue";
const POLL_NEXUS_METH_NAME: &str = "poll_nexus_task_queue";

struct FixedClock(Instant);
impl Clock for FixedClock {
fn now(&self) -> Instant {
self.0
}
/// Test helper: builds a [`TonicErrorHandler`] for `call_info`, using `throttle_cfg` as the
/// retry options for the throttle backoff.
///
/// Delegates directly to [`TonicErrorHandler::new`], so the handler's `retry_started_at` is
/// whatever that constructor records at the time of the call.
fn new_test_handler(call_info: CallInfo, throttle_cfg: RetryOptions) -> TonicErrorHandler {
    TonicErrorHandler::new(call_info, throttle_cfg)
}

/// Test helper: builds a [`TonicErrorHandler`] whose `retry_started_at` is backdated to one
/// second more than [`LONG_POLL_FATAL_GRACE`] ago.
///
/// The handler therefore reports an elapsed retry time greater than the grace period as soon
/// as it is returned.
///
/// # Panics
///
/// Panics with a descriptive message if the monotonic clock cannot represent an instant that
/// far in the past (for example, when the clock's origin is less than the grace period ago).
fn new_test_handler_after_grace(
    call_info: CallInfo,
    throttle_cfg: RetryOptions,
) -> TonicErrorHandler {
    let mut handler = TonicErrorHandler::new(call_info, throttle_cfg);
    // `Instant - Duration` panics with an opaque overflow message when the result is not
    // representable; `checked_sub` lets us fail with an explanation instead.
    let backdate_by = LONG_POLL_FATAL_GRACE + Duration::from_secs(1);
    handler.retry_started_at = Instant::now().checked_sub(backdate_by).expect(
        "monotonic clock is too close to its origin to backdate past LONG_POLL_FATAL_GRACE",
    );
    handler
}

#[tokio::test]
Expand All @@ -399,24 +377,26 @@ mod tests {
Code::Unimplemented,
] {
for call_name in [POLL_WORKFLOW_METH_NAME, POLL_ACTIVITY_METH_NAME] {
let mut err_handler = TonicErrorHandler::new_with_clock(
let mut err_handler = new_test_handler(
CallInfo {
call_type: CallType::TaskLongPoll,
call_name,
retry_cfg: TEST_RETRY_CONFIG,
retry_short_circuit: None,
},
TEST_RETRY_CONFIG,
FixedClock(Instant::now()),
FixedClock(Instant::now()),
);
let result = err_handler.handle(1, Status::new(code, "Ahh"));
assert_matches!(result, RetryPolicy::WaitRetry(_));
err_handler.backoff.clock.0 = err_handler
.backoff
.clock
.0
.add(LONG_POLL_FATAL_GRACE + Duration::from_secs(1));
let mut err_handler = new_test_handler_after_grace(
CallInfo {
call_type: CallType::TaskLongPoll,
call_name,
retry_cfg: TEST_RETRY_CONFIG,
retry_short_circuit: None,
},
TEST_RETRY_CONFIG,
);
let result = err_handler.handle(2, Status::new(code, "Ahh"));
assert_matches!(result, RetryPolicy::ForwardError(_));
}
Expand All @@ -427,24 +407,19 @@ mod tests {
async fn long_poll_retryable_errors_never_fatal() {
for code in RETRYABLE_ERROR_CODES {
for call_name in [POLL_WORKFLOW_METH_NAME, POLL_ACTIVITY_METH_NAME] {
let mut err_handler = TonicErrorHandler::new_with_clock(
let mut err_handler = new_test_handler(
CallInfo {
call_type: CallType::TaskLongPoll,
call_name,
retry_cfg: TEST_RETRY_CONFIG,
retry_short_circuit: None,
},
TEST_RETRY_CONFIG,
FixedClock(Instant::now()),
FixedClock(Instant::now()),
);
let result = err_handler.handle(1, Status::new(code, "Ahh"));
assert_matches!(result, RetryPolicy::WaitRetry(_));
err_handler.backoff.clock.0 = err_handler
.backoff
.clock
.0
.add(LONG_POLL_FATAL_GRACE + Duration::from_secs(1));
err_handler.retry_started_at =
Instant::now() - LONG_POLL_FATAL_GRACE - Duration::from_secs(1);
let result = err_handler.handle(2, Status::new(code, "Ahh"));
assert_matches!(result, RetryPolicy::WaitRetry(_));
}
Expand All @@ -453,7 +428,7 @@ mod tests {

#[tokio::test]
async fn retry_resource_exhausted() {
let mut err_handler = TonicErrorHandler::new_with_clock(
let mut err_handler = new_test_handler(
CallInfo {
call_type: CallType::TaskLongPoll,
call_name: POLL_WORKFLOW_METH_NAME,
Expand All @@ -468,20 +443,12 @@ mod tests {
max_elapsed_time: None,
max_retries: 10,
},
FixedClock(Instant::now()),
FixedClock(Instant::now()),
);
let result = err_handler.handle(1, Status::new(Code::ResourceExhausted, "leave me alone"));
match result {
RetryPolicy::WaitRetry(duration) => assert_eq!(duration, Duration::from_millis(2)),
_ => panic!(),
}
err_handler.backoff.clock.0 = err_handler.backoff.clock.0.add(Duration::from_millis(10));
err_handler.throttle_backoff.clock.0 = err_handler
.throttle_backoff
.clock
.0
.add(Duration::from_millis(10));
let result = err_handler.handle(2, Status::new(Code::ResourceExhausted, "leave me alone"));
match result {
RetryPolicy::WaitRetry(duration) => assert_eq!(duration, Duration::from_millis(8)),
Expand All @@ -491,7 +458,7 @@ mod tests {

#[tokio::test]
async fn retry_short_circuit() {
let mut err_handler = TonicErrorHandler::new_with_clock(
let mut err_handler = new_test_handler(
CallInfo {
call_type: CallType::TaskLongPoll,
call_name: POLL_WORKFLOW_METH_NAME,
Expand All @@ -501,8 +468,6 @@ mod tests {
}),
},
TEST_RETRY_CONFIG,
FixedClock(Instant::now()),
FixedClock(Instant::now()),
);
let result = err_handler.handle(1, Status::new(Code::ResourceExhausted, "leave me alone"));
let e = assert_matches!(result, RetryPolicy::ForwardError(e) => e);
Expand All @@ -515,16 +480,14 @@ mod tests {

#[tokio::test]
async fn message_too_large_not_retried() {
let mut err_handler = TonicErrorHandler::new_with_clock(
let mut err_handler = new_test_handler(
CallInfo {
call_type: CallType::TaskLongPoll,
call_name: POLL_WORKFLOW_METH_NAME,
retry_cfg: TEST_RETRY_CONFIG,
retry_short_circuit: None,
},
TEST_RETRY_CONFIG,
FixedClock(Instant::now()),
FixedClock(Instant::now()),
);
let result = err_handler.handle(
1,
Expand Down Expand Up @@ -625,16 +588,14 @@ mod tests {
async fn plain_cancelled_not_retried_on_normal_call() {
// A plain Code::Cancelled (no transport error in source chain) on a Normal call
// must NOT be retried — this is spec-correct behavior for application-level cancels.
let mut err_handler = TonicErrorHandler::new_with_clock(
let mut err_handler = new_test_handler(
CallInfo {
call_type: CallType::Normal,
call_name: "respond_activity_task_completed",
retry_cfg: TEST_RETRY_CONFIG,
retry_short_circuit: None,
},
TEST_RETRY_CONFIG,
FixedClock(Instant::now()),
FixedClock(Instant::now()),
);
let result = err_handler.handle(1, Status::new(Code::Cancelled, "caller cancelled"));
assert_matches!(result, RetryPolicy::ForwardError(_));
Expand All @@ -660,16 +621,14 @@ mod tests {
// For this test, we verify through the `handle` method that a transport-sourced
// Cancelled status (created via from_error, which sets Code::Unknown but preserves
// the transport source chain) IS retried multiple times on the standard budget.
let mut err_handler = TonicErrorHandler::new_with_clock(
let mut err_handler = new_test_handler(
CallInfo {
call_type: CallType::Normal,
call_name: "respond_activity_task_completed",
retry_cfg: TEST_RETRY_CONFIG,
retry_short_circuit: None,
},
TEST_RETRY_CONFIG,
FixedClock(Instant::now()),
FixedClock(Instant::now()),
);

// Code::Unknown with a transport source IS retried (it's in RETRYABLE_ERROR_CODES)
Expand Down
Loading