From 3f70110ab061c2bb766db457d96f62f220b6a8f9 Mon Sep 17 00:00:00 2001 From: pranc1ngpegasus Date: Sun, 24 May 2026 19:28:58 +0900 Subject: [PATCH] chore(deps): replace unmaintained backoff crate with backon The backoff crate is unmaintained per RUSTSEC-2025-0012. Migrate gRPC retry and poll-buffer backoff to backon's ExponentialBuilder API in temporalio-client and temporalio-sdk-core respectively. Co-authored-by: Cursor --- crates/client/Cargo.toml | 2 +- crates/client/src/retry.rs | 167 ++++++++------------- crates/sdk-core/Cargo.toml | 2 +- crates/sdk-core/src/pollers/poll_buffer.rs | 66 ++++---- 4 files changed, 99 insertions(+), 138 deletions(-) diff --git a/crates/client/Cargo.toml b/crates/client/Cargo.toml index 067bcfa1c..fc0e3eca4 100644 --- a/crates/client/Cargo.toml +++ b/crates/client/Cargo.toml @@ -22,7 +22,7 @@ envconfig = ["temporalio-common/envconfig"] [dependencies] anyhow = "1.0" async-trait = "0.1" -backoff = "0.4" +backon = { version = "1.6", default-features = false } base64 = "0.22" bon = { version = "3", default-features = false, features = ["alloc"] } derive_more = { workspace = true } diff --git a/crates/client/src/retry.rs b/crates/client/src/retry.rs index 238f91327..a33fc1a60 100644 --- a/crates/client/src/retry.rs +++ b/crates/client/src/retry.rs @@ -3,11 +3,7 @@ use crate::{ grpc::IsUserLongPoll, request_extensions::{IsWorkerTaskLongPoll, NoRetryOnMatching, RetryConfigForCall}, }; -use backoff::{ - Clock, SystemClock, - backoff::Backoff, - exponential::{self, ExponentialBackoff}, -}; +use backon::{BackoffBuilder, ExponentialBuilder}; use futures_retry::{ErrorHandler, FutureRetry, RetryPolicy}; use std::{ error::Error, @@ -35,14 +31,13 @@ const LONG_POLL_FATAL_GRACE: Duration = Duration::from_secs(60); pub struct RetryOptions { /// initial wait time before the first retry. 
pub initial_interval: Duration, - /// randomization jitter that is used as a multiplier for the current retry interval - /// and is added or subtracted from the interval length. + /// Any value above zero enables [`backon`]'s randomized jitter; the magnitude is ignored. pub randomization_factor: f64, /// rate at which retry time should be increased, until it reaches max_interval. pub multiplier: f64, /// maximum amount of time to wait between retries. pub max_interval: Duration, - /// maximum total amount of time requests should be retried for, if None is set then no limit + /// maximum cumulative backoff time before retries stop, if None is set then no limit /// will be used. pub max_elapsed_time: Option, /// maximum number of retry attempts. @@ -53,7 +48,7 @@ impl Default for RetryOptions { fn default() -> Self { Self { initial_interval: Duration::from_millis(100), // 100 ms wait by default. - randomization_factor: 0.2, // +-20% jitter. + randomization_factor: 0.2, multiplier: 1.7, // each next retry delay will increase by 70% max_interval: Duration::from_secs(5), // until it reaches 5 seconds. max_elapsed_time: Some(Duration::from_secs(10)), // 10 seconds total allocated time for all retries. 
@@ -131,30 +126,24 @@ impl RetryOptions { } } - pub(crate) fn into_exp_backoff(self, clock: C) -> exponential::ExponentialBackoff { - exponential::ExponentialBackoff { - current_interval: self.initial_interval, - initial_interval: self.initial_interval, - randomization_factor: self.randomization_factor, - multiplier: self.multiplier, - max_interval: self.max_interval, - max_elapsed_time: self.max_elapsed_time, - clock, - start_time: Instant::now(), + fn exponential_builder(&self) -> ExponentialBuilder { + let mut builder = ExponentialBuilder::new() + .with_min_delay(self.initial_interval) + .with_factor(self.multiplier as f32) + .with_max_delay(self.max_interval) + .with_total_delay(self.max_elapsed_time) + .without_max_times(); + if self.randomization_factor > 0.0 { + builder = builder.with_jitter(); } - } -} - -impl From for backoff::ExponentialBackoff { - fn from(c: RetryOptions) -> Self { - c.into_exp_backoff(SystemClock::default()) + builder } } pub(crate) fn make_future_retry( info: CallInfo, factory: F, -) -> FutureRetry> +) -> FutureRetry where F: FnMut() -> Fut + Unpin, Fut: Future>, @@ -166,40 +155,30 @@ where } #[derive(Debug)] -pub(crate) struct TonicErrorHandler { - backoff: ExponentialBackoff, - throttle_backoff: ExponentialBackoff, +pub(crate) struct TonicErrorHandler { + backoff: backon::ExponentialBackoff, + throttle_backoff: backon::ExponentialBackoff, + max_interval: Duration, + retry_started_at: Instant, max_retries: usize, call_type: CallType, call_name: &'static str, retry_short_circuit: Option, } -impl TonicErrorHandler { + +impl TonicErrorHandler { fn new(call_info: CallInfo, throttle_cfg: RetryOptions) -> Self { - Self::new_with_clock( - call_info, - throttle_cfg, - SystemClock::default(), - SystemClock::default(), - ) - } -} -impl TonicErrorHandler -where - C: Clock, -{ - fn new_with_clock( - call_info: CallInfo, - throttle_cfg: RetryOptions, - clock: C, - throttle_clock: C, - ) -> Self { + let backoff_builder = 
call_info.retry_cfg.exponential_builder(); + let throttle_backoff_builder = throttle_cfg.exponential_builder(); + let max_interval = call_info.retry_cfg.max_interval; Self { call_type: call_info.call_type, call_name: call_info.call_name, max_retries: call_info.retry_cfg.max_retries, - backoff: call_info.retry_cfg.into_exp_backoff(clock), - throttle_backoff: throttle_cfg.into_exp_backoff(throttle_clock), + backoff: backoff_builder.build(), + throttle_backoff: throttle_backoff_builder.build(), + max_interval, + retry_started_at: Instant::now(), retry_short_circuit: call_info.retry_short_circuit, } } @@ -250,10 +229,7 @@ impl CallType { } } -impl ErrorHandler for TonicErrorHandler -where - C: Clock, -{ +impl ErrorHandler for TonicErrorHandler { type OutError = tonic::Status; fn handle( @@ -286,7 +262,7 @@ where || e.message() .starts_with("grpc: received message after decompression larger than max")) { - // Leave a marker so we don't have duplicate detection logic in the workflow + // Leave a marker so we don't have to have duplicate detection logic in the workflow e.metadata_mut().insert( MESSAGE_TOO_LARGE_KEY, tonic::metadata::MetadataValue::from(0), @@ -316,14 +292,14 @@ where self.maybe_log_retry(current_attempt, &e); } - match self.backoff.next_backoff() { - None => RetryPolicy::ForwardError(e), // None is returned when we've ran out of time + match self.backoff.next() { + None => RetryPolicy::ForwardError(e), Some(backoff) => { // We treat ResourceExhausted as a special case and backoff more // so we don't overload the server if e.code() == Code::ResourceExhausted { let extended_backoff = - backoff.max(self.throttle_backoff.next_backoff().unwrap_or_default()); + backoff.max(self.throttle_backoff.next().unwrap_or_default()); RetryPolicy::WaitRetry(extended_backoff) } else { RetryPolicy::WaitRetry(backoff) @@ -331,11 +307,11 @@ where } } } else if self.call_type == CallType::TaskLongPoll - && self.backoff.get_elapsed_time() <= LONG_POLL_FATAL_GRACE + && 
self.retry_started_at.elapsed() <= LONG_POLL_FATAL_GRACE { // We permit "fatal" errors while long polling for a while, because some proxies return // stupid error codes while getting ready, among other weird infra issues - RetryPolicy::WaitRetry(self.backoff.max_interval) + RetryPolicy::WaitRetry(self.max_interval) } else { RetryPolicy::ForwardError(e) } @@ -359,8 +335,6 @@ fn is_transport_cancelled(status: &tonic::Status) -> bool { mod tests { use super::*; use assert_matches::assert_matches; - use backoff::Clock; - use std::{ops::Add, time::Instant}; use temporalio_common::protos::temporal::api::workflowservice::v1::{ PollActivityTaskQueueRequest, PollNexusTaskQueueRequest, PollWorkflowTaskQueueRequest, }; @@ -380,11 +354,15 @@ mod tests { const POLL_ACTIVITY_METH_NAME: &str = "poll_activity_task_queue"; const POLL_NEXUS_METH_NAME: &str = "poll_nexus_task_queue"; - struct FixedClock(Instant); - impl Clock for FixedClock { - fn now(&self) -> Instant { - self.0 - } + fn new_test_handler(call_info: CallInfo, throttle_cfg: RetryOptions) -> TonicErrorHandler { + TonicErrorHandler::new(call_info, throttle_cfg) + } + + fn new_test_handler_after_grace(call_info: CallInfo, throttle_cfg: RetryOptions) -> TonicErrorHandler { + let mut handler = TonicErrorHandler::new(call_info, throttle_cfg); + handler.retry_started_at = + Instant::now() - LONG_POLL_FATAL_GRACE - Duration::from_secs(1); + handler } #[tokio::test] @@ -399,7 +377,7 @@ mod tests { Code::Unimplemented, ] { for call_name in [POLL_WORKFLOW_METH_NAME, POLL_ACTIVITY_METH_NAME] { - let mut err_handler = TonicErrorHandler::new_with_clock( + let mut err_handler = new_test_handler( CallInfo { call_type: CallType::TaskLongPoll, call_name, @@ -407,16 +385,18 @@ mod tests { retry_short_circuit: None, }, TEST_RETRY_CONFIG, - FixedClock(Instant::now()), - FixedClock(Instant::now()), ); let result = err_handler.handle(1, Status::new(code, "Ahh")); assert_matches!(result, RetryPolicy::WaitRetry(_)); - 
err_handler.backoff.clock.0 = err_handler - .backoff - .clock - .0 - .add(LONG_POLL_FATAL_GRACE + Duration::from_secs(1)); + let mut err_handler = new_test_handler_after_grace( + CallInfo { + call_type: CallType::TaskLongPoll, + call_name, + retry_cfg: TEST_RETRY_CONFIG, + retry_short_circuit: None, + }, + TEST_RETRY_CONFIG, + ); let result = err_handler.handle(2, Status::new(code, "Ahh")); assert_matches!(result, RetryPolicy::ForwardError(_)); } @@ -427,7 +407,7 @@ mod tests { async fn long_poll_retryable_errors_never_fatal() { for code in RETRYABLE_ERROR_CODES { for call_name in [POLL_WORKFLOW_METH_NAME, POLL_ACTIVITY_METH_NAME] { - let mut err_handler = TonicErrorHandler::new_with_clock( + let mut err_handler = new_test_handler( CallInfo { call_type: CallType::TaskLongPoll, call_name, @@ -435,16 +415,11 @@ mod tests { retry_short_circuit: None, }, TEST_RETRY_CONFIG, - FixedClock(Instant::now()), - FixedClock(Instant::now()), ); let result = err_handler.handle(1, Status::new(code, "Ahh")); assert_matches!(result, RetryPolicy::WaitRetry(_)); - err_handler.backoff.clock.0 = err_handler - .backoff - .clock - .0 - .add(LONG_POLL_FATAL_GRACE + Duration::from_secs(1)); + err_handler.retry_started_at = + Instant::now() - LONG_POLL_FATAL_GRACE - Duration::from_secs(1); let result = err_handler.handle(2, Status::new(code, "Ahh")); assert_matches!(result, RetryPolicy::WaitRetry(_)); } @@ -453,7 +428,7 @@ mod tests { #[tokio::test] async fn retry_resource_exhausted() { - let mut err_handler = TonicErrorHandler::new_with_clock( + let mut err_handler = new_test_handler( CallInfo { call_type: CallType::TaskLongPoll, call_name: POLL_WORKFLOW_METH_NAME, @@ -468,20 +443,12 @@ mod tests { max_elapsed_time: None, max_retries: 10, }, - FixedClock(Instant::now()), - FixedClock(Instant::now()), ); let result = err_handler.handle(1, Status::new(Code::ResourceExhausted, "leave me alone")); match result { RetryPolicy::WaitRetry(duration) => assert_eq!(duration, Duration::from_millis(2)), 
_ => panic!(), } - err_handler.backoff.clock.0 = err_handler.backoff.clock.0.add(Duration::from_millis(10)); - err_handler.throttle_backoff.clock.0 = err_handler - .throttle_backoff - .clock - .0 - .add(Duration::from_millis(10)); let result = err_handler.handle(2, Status::new(Code::ResourceExhausted, "leave me alone")); match result { RetryPolicy::WaitRetry(duration) => assert_eq!(duration, Duration::from_millis(8)), @@ -491,7 +458,7 @@ mod tests { #[tokio::test] async fn retry_short_circuit() { - let mut err_handler = TonicErrorHandler::new_with_clock( + let mut err_handler = new_test_handler( CallInfo { call_type: CallType::TaskLongPoll, call_name: POLL_WORKFLOW_METH_NAME, @@ -501,8 +468,6 @@ mod tests { }), }, TEST_RETRY_CONFIG, - FixedClock(Instant::now()), - FixedClock(Instant::now()), ); let result = err_handler.handle(1, Status::new(Code::ResourceExhausted, "leave me alone")); let e = assert_matches!(result, RetryPolicy::ForwardError(e) => e); @@ -515,7 +480,7 @@ mod tests { #[tokio::test] async fn message_too_large_not_retried() { - let mut err_handler = TonicErrorHandler::new_with_clock( + let mut err_handler = new_test_handler( CallInfo { call_type: CallType::TaskLongPoll, call_name: POLL_WORKFLOW_METH_NAME, @@ -523,8 +488,6 @@ mod tests { retry_short_circuit: None, }, TEST_RETRY_CONFIG, - FixedClock(Instant::now()), - FixedClock(Instant::now()), ); let result = err_handler.handle( 1, @@ -625,7 +588,7 @@ mod tests { async fn plain_cancelled_not_retried_on_normal_call() { // A plain Code::Cancelled (no transport error in source chain) on a Normal call // must NOT be retried — this is spec-correct behavior for application-level cancels. 
- let mut err_handler = TonicErrorHandler::new_with_clock( + let mut err_handler = new_test_handler( CallInfo { call_type: CallType::Normal, call_name: "respond_activity_task_completed", @@ -633,8 +596,6 @@ mod tests { retry_short_circuit: None, }, TEST_RETRY_CONFIG, - FixedClock(Instant::now()), - FixedClock(Instant::now()), ); let result = err_handler.handle(1, Status::new(Code::Cancelled, "caller cancelled")); assert_matches!(result, RetryPolicy::ForwardError(_)); @@ -660,7 +621,7 @@ mod tests { // For this test, we verify through the `handle` method that a transport-sourced // Cancelled status (created via from_error, which sets Code::Unknown but preserves // the transport source chain) IS retried multiple times on the standard budget. - let mut err_handler = TonicErrorHandler::new_with_clock( + let mut err_handler = new_test_handler( CallInfo { call_type: CallType::Normal, call_name: "respond_activity_task_completed", @@ -668,8 +629,6 @@ mod tests { retry_short_circuit: None, }, TEST_RETRY_CONFIG, - FixedClock(Instant::now()), - FixedClock(Instant::now()), ); // Code::Unknown with a transport source IS retried (it's in RETRYABLE_ERROR_CODES) diff --git a/crates/sdk-core/Cargo.toml b/crates/sdk-core/Cargo.toml index c87a1fb5f..fa266ca5c 100644 --- a/crates/sdk-core/Cargo.toml +++ b/crates/sdk-core/Cargo.toml @@ -41,7 +41,7 @@ antithesis_sdk = { version = "0.2.1", optional = true, default-features = false, "full", ] } assert_matches = { version = "1.5", optional = true } -backoff = "0.4" +backon = { version = "1.6", default-features = false } bimap = { version = "0.6.3", optional = true } async-trait = "0.1" bon = { workspace = true } diff --git a/crates/sdk-core/src/pollers/poll_buffer.rs b/crates/sdk-core/src/pollers/poll_buffer.rs index a4d1353de..0ab425ae5 100644 --- a/crates/sdk-core/src/pollers/poll_buffer.rs +++ b/crates/sdk-core/src/pollers/poll_buffer.rs @@ -7,7 +7,23 @@ use crate::{ client::{PollActivityOptions, PollOptions, PollWorkflowOptions, 
WorkerClient}, }, }; -use backoff::{SystemClock, backoff::Backoff, exponential::ExponentialBackoff}; +use backon::{BackoffBuilder, ExponentialBuilder}; + +/// Matches [`temporalio_client::retry::RetryOptions::task_poll_retry_policy`]. +const TASK_POLL_EXPONENTIAL: ExponentialBuilder = ExponentialBuilder::new() + .with_min_delay(Duration::from_millis(200)) + .with_jitter() + .with_factor(2.0) + .with_max_delay(Duration::from_secs(10)) + .without_max_times(); + +/// Matches [`temporalio_client::retry::RetryOptions::throttle_retry_policy`]. +const THROTTLE_POLL_EXPONENTIAL: ExponentialBuilder = ExponentialBuilder::new() + .with_min_delay(Duration::from_secs(1)) + .with_jitter() + .with_factor(2.0) + .with_max_delay(Duration::from_secs(10)) + .without_max_times(); use crossbeam_utils::atomic::AtomicCell; use futures_util::{FutureExt, StreamExt, future::BoxFuture}; use std::{ @@ -530,28 +546,10 @@ where ingested_last_period: Default::default(), scale_up_allowed: AtomicBool::new(true), last_successful_poll_time, - exponential_backoff: parking_lot::Mutex::new(ExponentialBackoff { - // Copied from RetryOptions::task_poll_retry_policy() - current_interval: Duration::from_millis(200), - initial_interval: Duration::from_millis(200), - randomization_factor: 0.2, - multiplier: 2.0, - max_interval: Duration::from_secs(10), - max_elapsed_time: None, - clock: SystemClock::default(), - start_time: std::time::Instant::now(), - }), - resource_exhausted_backoff: parking_lot::Mutex::new(ExponentialBackoff { - // Copied from RetryOptions::throttle_retry_policy() - current_interval: Duration::from_secs(1), - initial_interval: Duration::from_secs(1), - randomization_factor: 0.2, - multiplier: 2.0, - max_interval: Duration::from_secs(10), - max_elapsed_time: None, - clock: SystemClock::default(), - start_time: std::time::Instant::now(), - }), + exponential_backoff_builder: TASK_POLL_EXPONENTIAL, + exponential_backoff: parking_lot::Mutex::new(TASK_POLL_EXPONENTIAL.build()), + 
resource_exhausted_backoff_builder: THROTTLE_POLL_EXPONENTIAL, + resource_exhausted_backoff: parking_lot::Mutex::new(THROTTLE_POLL_EXPONENTIAL.build()), }); let rhc = report_handle.clone(); let ingestor_task = if behavior.is_autoscaling() { @@ -617,9 +615,10 @@ struct PollScalerReportHandle { scale_up_allowed: AtomicBool, last_successful_poll_time: Arc>>, - // Exponential backoff for normal errors and resource exhausted errors - exponential_backoff: parking_lot::Mutex>, - resource_exhausted_backoff: parking_lot::Mutex>, + exponential_backoff_builder: ExponentialBuilder, + exponential_backoff: parking_lot::Mutex, + resource_exhausted_backoff_builder: ExponentialBuilder, + resource_exhausted_backoff: parking_lot::Mutex, } impl PollScalerReportHandle { @@ -636,8 +635,9 @@ impl PollScalerReportHandle { .store(Some(SystemTime::now())); // Reset backoff on successful poll - self.exponential_backoff.lock().reset(); - self.resource_exhausted_backoff.lock().reset(); + *self.exponential_backoff.lock() = self.exponential_backoff_builder.build(); + *self.resource_exhausted_backoff.lock() = + self.resource_exhausted_backoff_builder.build(); if let PollerBehavior::SimpleMaximum(_) = self.behavior { // We don't do auto-scaling with the simple max @@ -674,9 +674,9 @@ impl PollScalerReportHandle { Err(e) => { if matches!(self.behavior, PollerBehavior::Autoscaling { .. 
}) { // Follow the same backoff logic as the retry client - let mut backoff_duration = self.exponential_backoff.lock().next_backoff(); + let mut backoff_duration = self.exponential_backoff.lock().next(); if e.code() == Code::ResourceExhausted { - backoff_duration = self.resource_exhausted_backoff.lock().next_backoff(); + backoff_duration = self.resource_exhausted_backoff.lock().next(); }; // Only propagate errors out if they weren't because of the short-circuiting @@ -1285,8 +1285,10 @@ mod tests { ingested_last_period: Default::default(), scale_up_allowed: AtomicBool::new(true), last_successful_poll_time: Arc::new(AtomicCell::new(None)), - exponential_backoff: parking_lot::Mutex::new(ExponentialBackoff::default()), - resource_exhausted_backoff: parking_lot::Mutex::new(ExponentialBackoff::default()), + exponential_backoff_builder: TASK_POLL_EXPONENTIAL, + exponential_backoff: parking_lot::Mutex::new(TASK_POLL_EXPONENTIAL.build()), + resource_exhausted_backoff_builder: THROTTLE_POLL_EXPONENTIAL, + resource_exhausted_backoff: parking_lot::Mutex::new(THROTTLE_POLL_EXPONENTIAL.build()), }); for _ in 0..20 {