From 7d5c90e1a8552ef97e30978794ce84fcb073fa8a Mon Sep 17 00:00:00 2001 From: Chris Busillo Date: Sat, 30 May 2026 09:32:30 -0400 Subject: [PATCH] fix(core): retry transient agent provider failures --- code-rs/core/src/agent_tool.rs | 496 +++++++++++++++++++++++++--- code-rs/core/src/codex/streaming.rs | 7 + docs/agents.md | 7 + 3 files changed, 471 insertions(+), 39 deletions(-) diff --git a/code-rs/core/src/agent_tool.rs b/code-rs/core/src/agent_tool.rs index 6caf4a23bd5..da9b756006a 100644 --- a/code-rs/core/src/agent_tool.rs +++ b/code-rs/core/src/agent_tool.rs @@ -374,6 +374,8 @@ pub struct Agent { pub status: AgentStatus, pub result: Option, pub error: Option, + #[serde(default)] + pub retry: AgentRetryMetadata, pub created_at: DateTime, pub started_at: Option>, pub completed_at: Option>, @@ -394,6 +396,30 @@ pub struct Agent { pub last_activity: DateTime, } +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct AgentRetryMetadata { + #[serde(skip_serializing_if = "String::is_empty")] + pub original_model: String, + #[serde(skip_serializing_if = "String::is_empty")] + pub final_model: String, + pub retry_count: u32, + pub max_retries: u32, + #[serde(skip_serializing_if = "Option::is_none")] + pub last_retryable_error: Option, +} + +impl Default for AgentRetryMetadata { + fn default() -> Self { + Self { + original_model: String::new(), + final_model: String::new(), + retry_count: 0, + max_retries: DEFAULT_AGENT_PROVIDER_MAX_RETRIES as u32, + last_retryable_error: None, + } + } +} + // Global agent manager lazy_static::lazy_static! 
{ pub static ref AGENT_MANAGER: Arc> = Arc::new(RwLock::new(AgentManager::new())); @@ -443,8 +469,101 @@ const MAX_AGENT_PROGRESS_LINE_BYTES: usize = 2048; const MAX_AGENT_RESULT_BYTES: usize = 64 * 1024; const MAX_TRACKED_TERMINAL_AGENTS: usize = 512; const MAX_STATUS_TERMINAL_AGENTS: usize = 128; +const DEFAULT_AGENT_PROVIDER_MAX_RETRIES: usize = 2; +const AGENT_PROVIDER_RETRY_BASE_DELAY: StdDuration = StdDuration::from_secs(2); +const AGENT_PROVIDER_RETRY_MAX_DELAY: StdDuration = StdDuration::from_secs(10); pub(crate) const CODE_AGENT_SPAWN_DEPTH_ENV: &str = "CODE_AGENT_SPAWN_DEPTH"; +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum AgentProviderFailureClass { + Retryable, + NonRetryable, +} + +fn classify_agent_provider_failure(error: &str) -> AgentProviderFailureClass { + let lower = error.to_ascii_lowercase(); + let contains_any = |needles: &[&str]| needles.iter().any(|needle| lower.contains(needle)); + + if contains_any(&[ + "unauthorized", + "authentication", + "auth failed", + "invalid api key", + "invalid token", + "permission denied", + "forbidden", + "not found", + "could not be found", + "not installed", + "no such file", + "invalid config", + "misconfigured", + "cancelled", + "canceled", + "interrupted", + "policy", + ]) { + return AgentProviderFailureClass::NonRetryable; + } + + if contains_any(&[ + "overloaded", + "rate limit", + "rate_limit", + "too many requests", + "temporarily unavailable", + "temporary failure", + "transient", + "timeout", + "timed out", + "deadline exceeded", + "service unavailable", + "internal server error", + "bad gateway", + "gateway timeout", + "connection reset", + "connection refused", + "connection closed", + "connection aborted", + "broken pipe", + "transport error", + "stream disconnected", + "network error", + "http 408", + "http 409", + "http 429", + "http 500", + "http 502", + "http 503", + "http 504", + "status 408", + "status 409", + "status 429", + "status 500", + "status 502", + "status 503", + "status 504", 
+ "api error: 408", + "api error: 409", + "api error: 429", + "api error: 500", + "api error: 502", + "api error: 503", + "api error: 504", + ]) { + return AgentProviderFailureClass::Retryable; + } + + AgentProviderFailureClass::NonRetryable +} + +fn agent_retry_delay(attempt_index: usize) -> StdDuration { + let multiplier = 1_u32 << attempt_index.min(8); + AGENT_PROVIDER_RETRY_BASE_DELAY + .saturating_mul(multiplier) + .min(AGENT_PROVIDER_RETRY_MAX_DELAY) +} + pub(crate) fn current_agent_spawn_depth() -> i32 { std::env::var(CODE_AGENT_SPAWN_DEPTH_ENV) .ok() @@ -474,6 +593,10 @@ fn agent_belongs_to_session(agent: &Agent, owner_session_id: Option) -> bo } } +fn agent_is_owned_by_session(agent: &Agent, owner_session_id: Uuid) -> bool { + agent.owner_session_id == Some(owner_session_id) +} + fn agent_info_for_status(agent: &Agent, now: DateTime) -> AgentInfo { // Just show the model name - status provides the useful info. let name = agent @@ -1089,6 +1212,12 @@ impl AgentManager { _ => None, }; + let retry = AgentRetryMetadata { + original_model: model.clone(), + final_model: model.clone(), + ..AgentRetryMetadata::default() + }; + let agent = Agent { id: agent_id.clone(), owner_session_id: Some(owner_session_id), @@ -1103,6 +1232,7 @@ impl AgentManager { status: AgentStatus::Pending, result: None, error: None, + retry, created_at: Utc::now(), started_at: None, completed_at: None, @@ -1250,7 +1380,7 @@ impl AgentManager { ) -> bool { if self .get_agent(agent_id) - .is_some_and(|agent| agent_belongs_to_session(&agent, Some(owner_session_id))) + .is_some_and(|agent| agent_is_owned_by_session(&agent, owner_session_id)) { self.cancel_agent(agent_id).await } else { @@ -1284,7 +1414,7 @@ impl AgentManager { .agents .values() .filter(|agent| agent.batch_id.as_ref() == Some(&batch_id.to_string())) - .filter(|agent| agent_belongs_to_session(agent, Some(owner_session_id))) + .filter(|agent| agent_is_owned_by_session(agent, owner_session_id)) .map(|agent| agent.id.clone()) 
.collect(); @@ -1375,6 +1505,20 @@ impl AgentManager { } } + pub async fn update_agent_retry_metadata( + &mut self, + agent_id: &str, + retry_count: u32, + last_retryable_error: Option, + ) { + if let Some(agent) = self.agents.get_mut(agent_id) { + agent.retry.retry_count = retry_count; + agent.retry.last_retryable_error = last_retryable_error; + Self::record_activity(agent); + self.send_agent_status_update().await; + } + } + pub async fn add_progress(&mut self, agent_id: &str, message: String) { let debug_enabled = self.debug_log_root.is_some(); @@ -1620,38 +1764,44 @@ async fn execute_agent(agent_id: String, config: Option) { if !spec.is_enabled() { Err(gating_error_message(spec)) } else { + execute_agent_provider_with_retries(&agent_id, &model, || { + execute_cloud_built_in_streaming( + &agent_id, + &full_prompt, + Some(worktree_path.clone()), + config.clone(), + spec.slug, + ) + }) + .await + } + } else { + execute_agent_provider_with_retries(&agent_id, &model, || { execute_cloud_built_in_streaming( &agent_id, &full_prompt, - Some(worktree_path), + Some(worktree_path.clone()), config.clone(), - spec.slug, + model.as_str(), ) - .await - } - } else { - execute_cloud_built_in_streaming( + }) + .await + } + } else { + execute_agent_provider_with_retries(&agent_id, &model, || { + execute_model_with_permissions( &agent_id, + &model, &full_prompt, - Some(worktree_path), + false, + Some(worktree_path.clone()), config.clone(), - model.as_str(), + reasoning_effort, + review_output_json_path.as_ref(), + source_kind.clone(), + log_tag.as_deref(), ) - .await - } - } else { - execute_model_with_permissions( - &agent_id, - &model, - &full_prompt, - false, - Some(worktree_path), - config.clone(), - reasoning_effort, - review_output_json_path.as_ref(), - source_kind.clone(), - log_tag.as_deref(), - ) + }) .await } } @@ -1676,24 +1826,44 @@ async fn execute_agent(agent_id: String, config: Option) { if !spec.is_enabled() { Err(gating_error_message(spec)) } else { - 
execute_cloud_built_in_streaming(&agent_id, &full_prompt, None, config, spec.slug).await + execute_agent_provider_with_retries(&agent_id, &model, || { + execute_cloud_built_in_streaming( + &agent_id, + &full_prompt, + None, + config.clone(), + spec.slug, + ) + }) + .await } } else { - execute_cloud_built_in_streaming(&agent_id, &full_prompt, None, config, model.as_str()).await + execute_agent_provider_with_retries(&agent_id, &model, || { + execute_cloud_built_in_streaming( + &agent_id, + &full_prompt, + None, + config.clone(), + model.as_str(), + ) + }) + .await } } else { - execute_model_with_permissions( - &agent_id, - &model, - &full_prompt, - true, - None, - config, - reasoning_effort, - None, - source_kind, - log_tag.as_deref(), - ) + execute_agent_provider_with_retries(&agent_id, &model, || { + execute_model_with_permissions( + &agent_id, + &model, + &full_prompt, + true, + None, + config.clone(), + reasoning_effort, + None, + source_kind.clone(), + log_tag.as_deref(), + ) + }) .await } }; @@ -1704,6 +1874,95 @@ async fn execute_agent(agent_id: String, config: Option) { manager.update_agent_result(&agent_id, final_result).await; } +async fn execute_agent_provider_with_retries( + agent_id: &str, + model: &str, + mut run: F, +) -> Result +where + F: FnMut() -> Fut, + Fut: std::future::Future>, +{ + let mut retry_count = 0usize; + let mut last_retryable_error: Option = None; + + loop { + let result = run().await; + match result { + Ok(output) => { + if retry_count > 0 { + let mut manager = AGENT_MANAGER.write().await; + manager + .update_agent_retry_metadata( + agent_id, + retry_count as u32, + last_retryable_error.clone(), + ) + .await; + } + return Ok(output); + } + Err(error) + if retry_count < DEFAULT_AGENT_PROVIDER_MAX_RETRIES + && classify_agent_provider_failure(&error) + == AgentProviderFailureClass::Retryable => + { + retry_count += 1; + last_retryable_error = Some(error.clone()); + let delay = agent_retry_delay(retry_count - 1); + let error_preview = 
error.trim().replace('\n', " "); + + { + let mut manager = AGENT_MANAGER.write().await; + manager + .update_agent_retry_metadata( + agent_id, + retry_count as u32, + last_retryable_error.clone(), + ) + .await; + manager + .add_progress( + agent_id, + format!( + "Retrying {model} after transient provider failure ({retry_count}/{DEFAULT_AGENT_PROVIDER_MAX_RETRIES}) in {:.1}s: {}", + delay.as_secs_f32(), + AgentManager::trim_to_tail_utf8(&error_preview, 500) + ), + ) + .await; + } + + tokio::time::sleep(TokioDuration::from_secs_f32(delay.as_secs_f32())).await; + } + Err(error) => { + if retry_count > 0 { + let mut manager = AGENT_MANAGER.write().await; + manager + .update_agent_retry_metadata( + agent_id, + retry_count as u32, + last_retryable_error.clone(), + ) + .await; + if classify_agent_provider_failure(&error) == AgentProviderFailureClass::Retryable + { + manager + .add_progress( + agent_id, + format!( + "Exhausted {retry_count} automatic provider retries for {model}." + ), + ) + .await; + } + } + return Err(error); + } + } + } +} + fn prefer_json_result(path: Option<&PathBuf>, fallback: Result) -> Result { if let Some(p) = path { let json = std::fs::read_to_string(p).ok(); @@ -2991,25 +3250,35 @@ where mod tests { use super::Agent; use super::AgentManager; + use super::AgentProviderFailureClass; + use super::AgentRetryMetadata; use super::AgentStatus; + use super::AGENT_PROVIDER_RETRY_MAX_DELAY; use super::MAX_AGENT_PROGRESS_ENTRIES; use super::MAX_AGENT_RESULT_BYTES; use super::MAX_TRACKED_TERMINAL_AGENTS; + use super::classify_agent_provider_failure; use super::normalize_agent_name; use super::maybe_set_gemini_config_dir; use super::execute_model_with_permissions; + use super::execute_agent_provider_with_retries; use super::resolve_program_path; use super::should_use_current_exe_for_agent; use super::prefer_json_result; use super::current_code_binary_path; + use super::agent_retry_delay; + use super::AGENT_MANAGER; use crate::config_types::AgentConfig; use 
code_protocol::config_types::ReasoningEffort; + use serial_test::serial; use std::collections::HashMap; use std::ffi::OsString; use tempfile::tempdir; use std::path::Path; use std::path::PathBuf; + use std::sync::atomic::Ordering; use std::sync::{Mutex, OnceLock}; + use std::time::Duration as StdDuration; use uuid::Uuid; #[cfg(unix)] @@ -3131,6 +3400,11 @@ mod tests { status, result: None, error: None, + retry: AgentRetryMetadata { + original_model: "code-gpt-5.5".to_string(), + final_model: "code-gpt-5.5".to_string(), + ..AgentRetryMetadata::default() + }, created_at: now, started_at: Some(now), completed_at: None, @@ -3385,6 +3659,11 @@ mod tests { status: AgentStatus::Completed, result: Some("ok".to_string()), error: None, + retry: AgentRetryMetadata { + original_model: "code-gpt-5.5".to_string(), + final_model: "code-gpt-5.5".to_string(), + ..AgentRetryMetadata::default() + }, created_at: now, started_at: Some(now), completed_at: Some(now), @@ -3937,6 +4216,11 @@ exit 0 status: AgentStatus::Completed, result: Some("result".repeat(1024)), error: None, + retry: AgentRetryMetadata { + original_model: "code-gpt-5.5".to_string(), + final_model: "code-gpt-5.5".to_string(), + ..AgentRetryMetadata::default() + }, created_at: now, started_at: Some(now), completed_at: Some(now + chrono::Duration::seconds(idx as i64)), @@ -3987,6 +4271,11 @@ exit 0 status: AgentStatus::Completed, result: Some("result".repeat(32 * 1024)), error: None, + retry: AgentRetryMetadata { + original_model: "code-gpt-5.5".to_string(), + final_model: "code-gpt-5.5".to_string(), + ..AgentRetryMetadata::default() + }, created_at: now, started_at: Some(now), completed_at: Some(now), @@ -4054,6 +4343,11 @@ exit 0 status: AgentStatus::Completed, result: Some("ok".to_string()), error: None, + retry: AgentRetryMetadata { + original_model: "code-gpt-5.5".to_string(), + final_model: "code-gpt-5.5".to_string(), + ..AgentRetryMetadata::default() + }, created_at: now, started_at: Some(now), completed_at: 
Some(now + chrono::Duration::seconds(idx as i64)), @@ -4087,6 +4381,130 @@ exit 0 && agent.branch_name.as_deref() == Some("code-branch-0") })); } + + #[test] + fn classifies_retryable_agent_provider_failures() { + for error in [ + "API Error: Overloaded", + "HTTP 429: too many requests", + "request timed out while waiting for provider", + "upstream service unavailable", + "stream disconnected: connection reset", + "broken pipe while reading response", + ] { + assert_eq!( + classify_agent_provider_failure(error), + AgentProviderFailureClass::Retryable, + "expected retryable: {error}" + ); + } + } + + #[test] + fn classifies_non_retryable_agent_provider_failures() { + for error in [ + "unauthorized: invalid token", + "permission denied by provider", + "Agent 'claude' could not be found.", + "invalid config: missing command", + "run cancelled by user", + "policy violation", + ] { + assert_eq!( + classify_agent_provider_failure(error), + AgentProviderFailureClass::NonRetryable, + "expected non-retryable: {error}" + ); + } + } + + #[test] + fn retry_delay_is_bounded_exponential_backoff() { + assert_eq!(agent_retry_delay(0), StdDuration::from_secs(2)); + assert_eq!(agent_retry_delay(1), StdDuration::from_secs(4)); + assert_eq!(agent_retry_delay(2), StdDuration::from_secs(8)); + assert_eq!(agent_retry_delay(8), AGENT_PROVIDER_RETRY_MAX_DELAY); + } + + #[tokio::test] + #[serial] + async fn agent_provider_retry_wrapper_recovers_from_transient_failure() { + let agent_id = "retry-success".to_string(); + let owner_session_id = Uuid::new_v4(); + let mut manager = AgentManager::new(); + manager.agents.insert( + agent_id.clone(), + test_agent( + &agent_id, + owner_session_id, + "batch-retry", + AgentStatus::Running, + ), + ); + + let old_manager = { + let mut global = AGENT_MANAGER.write().await; + std::mem::replace(&mut *global, manager) + }; + + let attempts = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0)); + let attempts_for_run = attempts.clone(); + let result = 
execute_agent_provider_with_retries(&agent_id, "code-gpt-5.5", || { + let attempts_for_run = attempts_for_run.clone(); + async move { + let attempt = attempts_for_run.fetch_add(1, Ordering::SeqCst); + if attempt == 0 { + Err("API Error: Overloaded".to_string()) + } else { + Ok("ok".to_string()) + } + } + }) + .await; + + assert_eq!(result.as_deref(), Ok("ok")); + assert_eq!(attempts.load(Ordering::SeqCst), 2); + + let manager = { + let mut global = AGENT_MANAGER.write().await; + std::mem::replace(&mut *global, old_manager) + }; + let agent = manager + .agents + .get(&agent_id) + .expect("agent should remain tracked"); + assert_eq!(agent.retry.retry_count, 1); + assert_eq!( + agent.retry.last_retryable_error.as_deref(), + Some("API Error: Overloaded") + ); + assert!( + agent + .progress + .iter() + .any(|line| line.contains("Retrying code-gpt-5.5")) + ); + } + + #[tokio::test] + async fn agent_provider_retry_wrapper_does_not_retry_non_retryable_failure() { + let attempts = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0)); + let attempts_for_run = attempts.clone(); + let result = execute_agent_provider_with_retries("missing-agent", "claude", || { + let attempts_for_run = attempts_for_run.clone(); + async move { + attempts_for_run.fetch_add(1, Ordering::SeqCst); + Err("Agent 'claude' could not be found.".to_string()) + } + }) + .await; + + assert_eq!( + result.as_ref().map(String::as_str).map_err(String::as_str), + Err("Agent 'claude' could not be found.") + ); + assert_eq!(attempts.load(Ordering::SeqCst), 1); + } } #[derive(Debug, Clone, Serialize, Deserialize)] diff --git a/code-rs/core/src/codex/streaming.rs b/code-rs/core/src/codex/streaming.rs index 016323013fa..e7325a2d615 100644 --- a/code-rs/core/src/codex/streaming.rs +++ b/code-rs/core/src/codex/streaming.rs @@ -8992,6 +8992,7 @@ async fn handle_check_agent_status( "name": agent.name, "status": agent.status, "model": agent.model, + "retry": agent.retry, "batch_id": agent.batch_id, "created_at": 
agent.created_at, "started_at": agent.started_at, @@ -9088,6 +9089,7 @@ async fn handle_get_agent_result( "agent_id": params.agent_id, "batch_id": params.batch_id.clone(), "status": agent.status, + "retry": agent.retry, "output_preview": preview, "output_total_lines": total_lines, "output_file": file_path, @@ -9110,6 +9112,7 @@ async fn handle_get_agent_result( "agent_id": params.agent_id, "batch_id": params.batch_id.clone(), "status": agent.status, + "retry": agent.retry, "error_preview": preview, "error_total_lines": total_lines, "error_file": file_path, @@ -9355,6 +9358,7 @@ async fn handle_wait_for_agent( "agent_id": agent.id, "batch_id": batch_id, "status": agent.status, + "retry": agent.retry, "wait_time_seconds": start.elapsed().as_secs(), "total_lines": total_lines, "agent_result_hint": hint, @@ -9448,6 +9452,7 @@ async fn handle_wait_for_agent( let mut obj = serde_json::json!({ "agent_id": a.id, "status": a.status, + "retry": a.retry, "total_lines": total_lines, "agent_result_hint": hint, "agent_result_params": { "action": "result", "result": { "agent_id": a.id, "batch_id": batch_id } }, @@ -9549,6 +9554,7 @@ async fn handle_wait_for_agent( let mut response = serde_json::json!({ "agent_id": unseen.id, "status": unseen.status, + "retry": unseen.retry, "wait_time_seconds": start.elapsed().as_secs(), "total_lines": total_lines, "agent_result_hint": hint, @@ -9752,6 +9758,7 @@ async fn handle_list_agents( "name": t.name.clone(), "model": t.model, "status": t.status, + "retry": t.retry, "created_at": t.created_at, "batch_id": t.batch_id, "worktree_path": t.worktree_path, diff --git a/docs/agents.md b/docs/agents.md index 14f8f701263..2955a801b8e 100644 --- a/docs/agents.md +++ b/docs/agents.md @@ -65,6 +65,13 @@ The orchestrator fans out agents, waits for results, and merges reasoning accord - `show_raw_agent_reasoning = true` surfaces raw chains-of-thought when provided by the model. 
- Notification filtering is controlled via `/notifications` or `config.toml` `notify` / `tui.notifications`. +## Automatic retries +- Agent provider failures that look transient, such as overloads, rate limits, + timeouts, temporary upstream errors, and transport resets, are retried + automatically up to two times with exponential backoff (2s, then 4s). +- Auth, configuration, missing-command, policy, and cancellation failures fail + fast. Retries appear in agent progress and in each agent's `retry` metadata. + ## Headless `code exec` - `code exec --json` streams JSONL events (agent turns included). - `--output-schema ` enforces structured JSON output; combine with `--output-last-message` to capture only the final payload.