From 5ea2967ca0cda414d298e880dc0e487dc25d24e2 Mon Sep 17 00:00:00 2001 From: s-brez Date: Wed, 3 Jun 2026 17:52:54 +1000 Subject: [PATCH] Retry failed runner turns and preserve run output --- src/cli.rs | 28 ++++ src/config.rs | 44 ++++++ src/lib.rs | 12 ++ src/run/command.rs | 193 +++++++++++++++++++++++---- src/run/diagnostics.rs | 62 ++++++++- src/run/harnesses/codex.rs | 10 +- src/run/harnesses/mod.rs | 4 +- src/run/options.rs | 33 ++++- src/run/types.rs | 36 +++++ tests/history.rs | 22 +-- tests/render/save_out.rs | 4 +- tests/run/capture_output.rs | 22 +++ tests/run/failures.rs | 222 ++++++++++++++++++++++++++++++- tests/run/feedback/real_codex.rs | 34 +++++ tests/run/feedback/success.rs | 6 +- tests/run/runner_modes.rs | 134 ++++++++++++++++++- tests/store_lifecycle.rs | 14 +- 17 files changed, 819 insertions(+), 61 deletions(-) diff --git a/src/cli.rs b/src/cli.rs index 0c11fec..551e952 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -135,6 +135,34 @@ pub enum Command { #[arg(long, value_name = "N", help = "Run N iterations")] iterations: Option, + #[arg( + long, + value_name = "N", + conflicts_with = "no_retry", + help = "Retry each failed runner turn N times" + )] + retries: Option, + + #[arg( + long, + conflicts_with = "retries", + help = "Do not retry failed runner turns" + )] + no_retry: bool, + + #[arg( + long, + value_name = "MS", + help = "Delay MS milliseconds between runner retries" + )] + retry_delay_ms: Option, + + #[arg( + long, + help = "Inherit runner stdout/stderr without preserving bounded copies" + )] + no_preserve_output: bool, + #[arg( long, value_enum, diff --git a/src/config.rs b/src/config.rs index 0b3987a..3054828 100644 --- a/src/config.rs +++ b/src/config.rs @@ -8,6 +8,9 @@ use crate::error::AppError; use crate::store; pub(crate) const CONFIG_FILE: &str = "config.toml"; +pub(crate) const DEFAULT_RUN_RETRIES: usize = 2; +pub(crate) const DEFAULT_RUN_RETRY_DELAY_MS: u64 = 3000; +pub(crate) const DEFAULT_RUN_PRESERVE_OUTPUT: bool = true; #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(deny_unknown_fields)] @@ -15,10 +18,39 @@ pub(crate) struct ConfigFile { pub version: i64, #[serde(skip_serializing_if = "Option::is_none")] pub default_runner: Option, + #[serde(default, skip_serializing_if = "RunConfig::is_default")] + pub run: RunConfig, #[serde(default, skip_serializing_if = "BTreeMap::is_empty")] pub runners: BTreeMap, } +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(deny_unknown_fields)] +pub(crate) struct RunConfig { + #[serde(default = "default_run_retries")] + pub retries: usize, + #[serde(default = "default_run_retry_delay_ms")] + pub retry_delay_ms: u64, + #[serde(default = "default_run_preserve_output")] + pub preserve_output: bool, +} + +impl Default for RunConfig { + fn default() -> Self { + Self { + retries: DEFAULT_RUN_RETRIES, + retry_delay_ms: DEFAULT_RUN_RETRY_DELAY_MS, + preserve_output: DEFAULT_RUN_PRESERVE_OUTPUT, + } + } +} + +impl RunConfig { + fn is_default(&self) -> bool { + self == &Self::default() + } +} + #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(deny_unknown_fields)] pub(crate) struct RunnerConfig { @@ -127,6 +159,18 @@ fn validate_config(config: &ConfigFile) -> Result<(), AppError> { Ok(()) } +fn default_run_retries() -> usize { + DEFAULT_RUN_RETRIES +} + +fn default_run_retry_delay_ms() -> u64 { + DEFAULT_RUN_RETRY_DELAY_MS +} + +fn default_run_preserve_output() -> bool { + DEFAULT_RUN_PRESERVE_OUTPUT +} + pub(crate) fn validate_argv(argv: &[String], label: &str) -> Result<(), AppError> { if argv .first() diff --git a/src/lib.rs b/src/lib.rs index 407d9ab..b376890 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -123,6 +123,10 @@ pub fn run(cli: Cli) -> Result { variables_file, max_captured_output, iterations, + retries, + no_retry, + retry_delay_ms, + no_preserve_output, session_scope, feedback_from, feedback_var, @@ -141,6 +145,14 @@ pub fn run(cli: Cli) -> Result { max_captured_output: max_captured_output .unwrap_or(run::DEFAULT_MAX_CAPTURED_OUTPUT), iterations: iterations.unwrap_or(1), + retries, + no_retry, + retry_delay_ms, + preserve_output: if no_preserve_output { + Some(false) + } else { + None + }, session_scope: session_scope .map(session_scope_arg) .unwrap_or(run::SessionScope::Run), diff --git a/src/run/command.rs b/src/run/command.rs index 16265d0..48fbaf7 100644 --- a/src/run/command.rs +++ b/src/run/command.rs @@ -1,17 +1,19 @@ use std::path::Path; +use std::time::Duration; use crate::error::AppError; use crate::render; use crate::store; use super::diagnostics::{ - should_write_diagnostics, write_runner_failure_diagnostic, write_turn_diagnostic, + should_write_diagnostics, write_runner_failure_diagnostic, write_runner_retry_diagnostic, + write_turn_diagnostic, }; use super::harnesses::{RunnerHarnessSession, prepare_runner_command}; -use super::model::ProcessTurnOutput; +use super::model::{OutputMode, ProcessTermination, ProcessTurnOutput}; use super::options::{ - feedback_variable, load_base_variables, load_feedback_seed, output_mode, resolve_runner, - validate_options, + RunSettings, feedback_variable, load_base_variables, load_feedback_seed, output_mode, + resolve_run_settings, resolve_runner, validate_options, }; use super::types::*; @@ -24,6 +26,7 @@ pub fn run_sequence( validate_options(&options)?; let feedback_variable = feedback_variable(&options)?; + let run_settings = resolve_run_settings(store_path, &options)?; let runner = resolve_runner(store_path, &options)?; let base_variables = load_base_variables(&options, feedback_variable.as_deref())?; let mut previous_feedback = load_feedback_seed(&options)?; @@ -48,7 +51,10 @@ pub fn run_sequence( }; let turns_per_iteration = first_sequence.turns.len(); let total_turns = turns_per_iteration * options.iterations; - let output_mode = output_mode(&options); + let output_mode = output_mode(&options, run_settings); + let include_output = + options.capture_output || run_settings.preserve_output || options.feedback_from.is_some(); + let write_diagnostics = should_write_diagnostics(options.capture_output, options.quiet); let mut run_session = RunnerHarnessSession::new(); let mut turns = Vec::new(); @@ -78,7 +84,7 @@ pub fn run_sequence( SessionScope::Iteration => turn.index < sequence.turns.len(), }; let command = prepare_runner_command(runner.command_for_turn(scoped_turn_index))?; - if should_write_diagnostics(options.capture_output, options.quiet) { + if write_diagnostics { write_turn_diagnostic( iteration, options.iterations, @@ -93,13 +99,26 @@ pub fn run_sequence( SessionScope::Run => &mut run_session, SessionScope::Iteration => &mut iteration_session, }; - let (command, process) = runner_session.run_turn( - &command, - &turn.text, - output_mode, - options.max_captured_output, - has_later_turn, + let attempts = run_turn_attempts( + runner_session, + RunAttemptRequest { + command: &command, + prompt: &turn.text, + output_mode, + max_captured_output: options.max_captured_output, + has_later_turn, + settings: run_settings, + diagnostics: RetryDiagnosticContext { + enabled: write_diagnostics, + iteration, + turn: turn.index, + }, + }, )?; + let final_attempt = attempts + .last() + .expect("run_turn_attempts always returns one attempt"); + let process = &final_attempt.process; let is_feedback_turn = options.feedback_from.is_some() && turn.index == sequence.turns.len(); let feedback_stdout = if is_feedback_turn { @@ -113,16 +132,17 @@ pub fn run_sequence( options.iterations, iteration, turn, - command, - &process, - options.capture_output, + &final_attempt.command, + process, + include_output, + &attempts, )); if !process.success { failed_iteration = (options.iterations > 1).then_some(iteration); failed_turn = Some(turn.index); - if should_write_diagnostics(options.capture_output, options.quiet) { - write_runner_failure_diagnostic(iteration, turn.index, &process); + if write_diagnostics { + write_runner_failure_diagnostic(iteration, turn.index, process, attempts.len()); } break 'iterations; } @@ -169,30 +189,151 @@ pub fn run_sequence( )) } +#[derive(Debug)] +struct RunAttemptRecord { + attempt: usize, + command: Vec, + process: ProcessTurnOutput, + retryable: bool, +} + +#[derive(Debug, Clone, Copy)] +struct RetryDiagnosticContext { + enabled: bool, + iteration: usize, + turn: usize, +} + +struct RunAttemptRequest<'a> { + command: &'a [String], + prompt: &'a str, + output_mode: OutputMode, + max_captured_output: usize, + has_later_turn: bool, + settings: RunSettings, + diagnostics: RetryDiagnosticContext, +} + +fn run_turn_attempts( + runner_session: &mut RunnerHarnessSession, + request: RunAttemptRequest<'_>, +) -> Result, AppError> { + let settings = request.settings; + let max_attempts = settings.retries.saturating_add(1); + let mut attempts = Vec::new(); + + for attempt in 1..=max_attempts { + let (attempt_command, process) = runner_session.run_turn( + request.command, + request.prompt, + request.output_mode, + request.max_captured_output, + request.has_later_turn, + )?; + let retryable = is_retryable_runner_failure(&process); + let success = process.success; + attempts.push(RunAttemptRecord { + attempt, + command: attempt_command, + process, + retryable, + }); + + if success || attempt == max_attempts || !retryable { + break; + } + + if request.diagnostics.enabled { + write_runner_retry_diagnostic( + request.diagnostics.iteration, + request.diagnostics.turn, + attempt, + max_attempts, + settings.retry_delay_ms, + &attempts.last().expect("attempt was just pushed").process, + ); + } + if settings.retry_delay_ms > 0 { + std::thread::sleep(Duration::from_millis(settings.retry_delay_ms)); + } + } + + Ok(attempts) +} + +fn is_retryable_runner_failure(process: &ProcessTurnOutput) -> bool { + !process.success + && matches!( + process.termination, + ProcessTermination::Exit | ProcessTermination::Unknown + ) +} + fn run_turn_output( iterations: usize, iteration: usize, turn: &render::RenderedTurn, - command: Vec, + command: &[String], process: &ProcessTurnOutput, - capture_output: bool, + include_output: bool, + attempts: &[RunAttemptRecord], ) -> RunTurnOutput { RunTurnOutput { iteration: (iterations > 1).then_some(iteration), index: turn.index, fragment: turn.fragment.clone(), - command, + command: command.to_vec(), pid: process.pid, termination: process.termination.as_str().to_owned(), exit_code: process.exit_code, signal: process.signal, signal_name: process.signal_name.map(str::to_owned), core_dumped: process.core_dumped, - stdout: capture_output.then_some(process.stdout.clone()).flatten(), - stderr: capture_output.then_some(process.stderr.clone()).flatten(), - stdout_bytes: capture_output.then_some(process.stdout_bytes).flatten(), - stderr_bytes: capture_output.then_some(process.stderr_bytes).flatten(), - stdout_truncated: capture_output.then_some(process.stdout_truncated).flatten(), - stderr_truncated: capture_output.then_some(process.stderr_truncated).flatten(), + attempt_count: (attempts.len() > 1).then_some(attempts.len()), + attempts: (attempts.len() > 1).then(|| run_attempt_outputs(attempts, include_output)), + stdout: include_output.then_some(process.stdout.clone()).flatten(), + stderr: include_output.then_some(process.stderr.clone()).flatten(), + stdout_bytes: include_output.then_some(process.stdout_bytes).flatten(), + stderr_bytes: include_output.then_some(process.stderr_bytes).flatten(), + stdout_truncated: include_output.then_some(process.stdout_truncated).flatten(), + stderr_truncated: include_output.then_some(process.stderr_truncated).flatten(), } } + +fn run_attempt_outputs( + attempts: &[RunAttemptRecord], + include_output: bool, +) -> Vec { + attempts + .iter() + .map(|attempt| RunAttemptOutput { + attempt: attempt.attempt, + command: attempt.command.clone(), + pid: attempt.process.pid, + termination: attempt.process.termination.as_str().to_owned(), + exit_code: attempt.process.exit_code, + signal: attempt.process.signal, + signal_name: attempt.process.signal_name.map(str::to_owned), + core_dumped: attempt.process.core_dumped, + retryable: attempt.retryable, + stdout: include_output + .then_some(attempt.process.stdout.clone()) + .flatten(), + stderr: include_output + .then_some(attempt.process.stderr.clone()) + .flatten(), + stdout_bytes: include_output + .then_some(attempt.process.stdout_bytes) + .flatten(), + stderr_bytes: include_output + .then_some(attempt.process.stderr_bytes) + .flatten(), + stdout_truncated: include_output + .then_some(attempt.process.stdout_truncated) + .flatten(), + stderr_truncated: include_output + .then_some(attempt.process.stderr_truncated) + .flatten(), + }) + .collect() +} diff --git a/src/run/diagnostics.rs b/src/run/diagnostics.rs index a8e361b..8b3011d 100644 --- a/src/run/diagnostics.rs +++ b/src/run/diagnostics.rs @@ -13,10 +13,10 @@ pub(super) fn write_turn_diagnostic( fragment: &str, ) { if iterations == 1 { - eprintln!("pseq: running turn {turn}/{turns} with {runner}: {fragment}"); + eprintln!("\npseq: running turn {turn}/{turns} with {runner}: {fragment}"); } else { eprintln!( - "pseq: running iteration {iteration}/{iterations} turn {turn}/{turns} with {runner}: {fragment}" + "\npseq: running iteration {iteration}/{iterations} turn {turn}/{turns} with {runner}: {fragment}" ); } } @@ -25,11 +25,13 @@ pub(super) fn write_runner_failure_diagnostic( iteration: usize, turn: usize, process: &ProcessTurnOutput, + attempts: usize, ) { + let attempt_context = attempt_context(attempts); match process.termination { ProcessTermination::Exit => { eprintln!( - "pseq: runner exited unsuccessfully at iteration {iteration} turn {turn} with exit code {} (pid {})", + "\npseq: runner exited unsuccessfully{attempt_context} at iteration {iteration} turn {turn} with exit code {} (pid {})", process.exit_code, process.pid ); } @@ -43,15 +45,65 @@ pub(super) fn write_runner_failure_diagnostic( .map(|name| format!("{name} ({signal})")) .unwrap_or(signal); eprintln!( - "pseq: runner terminated by signal {signal_label} at iteration {iteration} turn {turn} (pid {}, exit code {})", + "\npseq: runner terminated by signal {signal_label}{attempt_context} at iteration {iteration} turn {turn} (pid {}, exit code {})", process.pid, process.exit_code ); } ProcessTermination::Unknown => { eprintln!( - "pseq: runner ended without an exit code or signal at iteration {iteration} turn {turn} (pid {}, exit code {})", + "\npseq: runner ended without an exit code or signal{attempt_context} at iteration {iteration} turn {turn} (pid {}, exit code {})", process.pid, process.exit_code ); } } } + +fn attempt_context(attempts: usize) -> String { + match attempts { + 0 | 1 => String::new(), + 2 => " after 2 attempts".to_owned(), + attempts => format!(" after {attempts} attempts"), + } +} + +pub(super) fn write_runner_retry_diagnostic( + iteration: usize, + turn: usize, + attempt: usize, + attempts: usize, + retry_delay_ms: u64, + process: &ProcessTurnOutput, +) { + eprintln!( + "\npseq: runner attempt {attempt}/{attempts} failed at iteration {iteration} turn {turn} with {}; retrying in {retry_delay_ms}ms", + process_status(process) + ); +} + +fn process_status(process: &ProcessTurnOutput) -> String { + match process.termination { + ProcessTermination::Exit => { + format!("exit code {} (pid {})", process.exit_code, process.pid) + } + ProcessTermination::Signal => { + let signal = process + .signal + .map(|signal| signal.to_string()) + .unwrap_or_else(|| "unknown".to_owned()); + let signal_label = process + .signal_name + .map(|name| format!("{name} ({signal})")) + .unwrap_or(signal); + format!( + "signal {signal_label} (pid {}, exit code {})", + process.pid, process.exit_code + ) + } + ProcessTermination::Unknown => { + format!( + "unknown process status (pid {}, exit code {})", + process.pid, process.exit_code + ) + } + } +} diff --git a/src/run/harnesses/codex.rs b/src/run/harnesses/codex.rs index ce1beef..e02165b 100644 --- a/src/run/harnesses/codex.rs +++ b/src/run/harnesses/codex.rs @@ -168,14 +168,20 @@ fn finalize_codex_turn_output( }; let _ = fs::remove_file(output_path); + let stdout_text = if process.success || !final_message.is_empty() { + final_message + } else { + process.stdout.clone().unwrap_or_default() + }; + if output_mode != OutputMode::Capture { - write_to_stdout(&final_message)?; + write_to_stdout(&stdout_text)?; if let Some(stderr) = &process.stderr { write_to_stderr(stderr)?; } } - let stdout_capture = capture_for_mode(&final_message, output_mode, max_captured_output); + let stdout_capture = capture_for_mode(&stdout_text, output_mode, max_captured_output); let stderr_capture = process .stderr .as_deref() diff --git a/src/run/harnesses/mod.rs b/src/run/harnesses/mod.rs index 82a2e43..02f5f7d 100644 --- a/src/run/harnesses/mod.rs +++ b/src/run/harnesses/mod.rs @@ -80,7 +80,9 @@ impl RunnerHarnessSession { RunnerHarness::Generic => generic::run_turn(&request), }?; - if let Some(next_harness) = outcome.next_harness { + if outcome.process.success + && let Some(next_harness) = outcome.next_harness + { self.active = next_harness; } diff --git a/src/run/options.rs b/src/run/options.rs index 785f5a0..d541575 100644 --- a/src/run/options.rs +++ b/src/run/options.rs @@ -2,6 +2,7 @@ use std::collections::BTreeMap; use std::io::{self, Read}; use std::path::Path; +use crate::config; use crate::error::AppError; use crate::render; use crate::runner::{self, ResolvedRunner}; @@ -31,10 +32,38 @@ pub(super) fn validate_options(options: &RunOptions<'_>) -> Result<(), AppError> message: "feedback seed can only be set with --feedback-from".to_owned(), }); } + if options.no_retry && options.retries.is_some() { + return Err(AppError::InvalidRunInvocation { + message: "--no-retry cannot be used with --retries".to_owned(), + }); + } Ok(()) } +#[derive(Debug, Clone, Copy)] +pub(super) struct RunSettings { + pub(super) retries: usize, + pub(super) retry_delay_ms: u64, + pub(super) preserve_output: bool, +} + +pub(super) fn resolve_run_settings( + store_path: &Path, + options: &RunOptions<'_>, +) -> Result { + let config = config::read_config(store_path)?.run; + Ok(RunSettings { + retries: if options.no_retry { + 0 + } else { + options.retries.unwrap_or(config.retries) + }, + retry_delay_ms: options.retry_delay_ms.unwrap_or(config.retry_delay_ms), + preserve_output: options.preserve_output.unwrap_or(config.preserve_output), + }) +} + pub(super) fn feedback_variable(options: &RunOptions<'_>) -> Result, AppError> { match options.feedback_from { Some(_) => { @@ -101,10 +130,10 @@ pub(super) fn load_feedback_seed(options: &RunOptions<'_>) -> Result) -> OutputMode { +pub(super) fn output_mode(options: &RunOptions<'_>, settings: RunSettings) -> OutputMode { if options.capture_output { OutputMode::Capture - } else if options.feedback_from.is_some() { + } else if options.feedback_from.is_some() || settings.preserve_output { OutputMode::Tee } else { OutputMode::Inherit diff --git a/src/run/types.rs b/src/run/types.rs index 59a7727..9eae2dc 100644 --- a/src/run/types.rs +++ b/src/run/types.rs @@ -24,6 +24,10 @@ pub struct RunOptions<'a> { pub capture_output: bool, pub max_captured_output: usize, pub iterations: usize, + pub retries: Option, + pub no_retry: bool, + pub retry_delay_ms: Option, + pub preserve_output: Option, pub session_scope: SessionScope, pub feedback_from: Option, pub feedback_var: Option<&'a str>, @@ -78,6 +82,38 @@ pub struct RunTurnOutput { #[serde(skip_serializing_if = "Option::is_none")] pub core_dumped: Option, #[serde(skip_serializing_if = "Option::is_none")] + pub attempt_count: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub attempts: Option>, + #[serde(skip_serializing_if = "Option::is_none")] + pub stdout: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub stderr: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub stdout_bytes: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub stderr_bytes: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub stdout_truncated: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub stderr_truncated: Option, +} + +#[derive(Debug, Serialize)] +pub struct RunAttemptOutput { + pub attempt: usize, + pub command: Vec, + pub pid: u32, + pub termination: String, + pub exit_code: i32, + #[serde(skip_serializing_if = "Option::is_none")] + pub signal: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub signal_name: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub core_dumped: Option, + pub retryable: bool, + #[serde(skip_serializing_if = "Option::is_none")] pub stdout: Option, #[serde(skip_serializing_if = "Option::is_none")] pub stderr: Option, diff --git a/tests/history.rs b/tests/history.rs index ce4850c..c96adda 100644 --- a/tests/history.rs +++ b/tests/history.rs @@ -9,7 +9,7 @@ use common::{ }; #[test] -fn log_exposes_git_backed_history_as_human_and_json() { +fn log_exposes_git_backed_history_as_non_json_and_json() { let store = TestStore::initialized("history-log"); assert_success(&pseq_with_stdin( &[ @@ -38,10 +38,10 @@ fn log_exposes_git_backed_history_as_human_and_json() { .any(|entry| entry["summary"] == "Initialize pseq store") ); - let human_output = pseq(&["log", "--store", path_str(store.path())]); - assert_success(&human_output); - assert_stdout_contains(&human_output, "Add fragment History"); - assert_stdout_contains(&human_output, "Initialize pseq store"); + let non_json_output = pseq(&["log", "--store", path_str(store.path())]); + assert_success(&non_json_output); + assert_stdout_contains(&non_json_output, "Add fragment History"); + assert_stdout_contains(&non_json_output, "Initialize pseq store"); assert_git_clean(store.path()); } @@ -106,12 +106,12 @@ fn diff_exposes_tracked_and_untracked_store_differences_without_mutating() { .any(|path| { path["status"] == "??" && path["path"] == "captures/manual.json" }) ); - let human_output = pseq(&["diff", "--store", path_str(store.path())]); - assert_success(&human_output); - assert_stdout_contains(&human_output, "fragments/tracked.md"); - assert_stdout_contains(&human_output, "-old body"); - assert_stdout_contains(&human_output, "+new body"); - assert_stdout_contains(&human_output, "?? captures/manual.json"); + let non_json_output = pseq(&["diff", "--store", path_str(store.path())]); + assert_success(&non_json_output); + assert_stdout_contains(&non_json_output, "fragments/tracked.md"); + assert_stdout_contains(&non_json_output, "-old body"); + assert_stdout_contains(&non_json_output, "+new body"); + assert_stdout_contains(&non_json_output, "?? captures/manual.json"); assert_eq!(git_status(store.path()), expected_status); } diff --git a/tests/render/save_out.rs b/tests/render/save_out.rs index 7a391a6..73fc064 100644 --- a/tests/render/save_out.rs +++ b/tests/render/save_out.rs @@ -51,8 +51,8 @@ fn render_out_annotate_and_save_write_requested_records() { } #[test] -fn render_out_suppresses_human_stdout() { - let store = TestStore::initialized("render-out-human"); +fn render_out_suppresses_non_json_stdout() { + let store = TestStore::initialized("render-out-non-json"); create_single_fragment_sequence(&store, "Combo", "Only", "body\n"); let out_path = store.path().with_extension("out.md"); diff --git a/tests/run/capture_output.rs b/tests/run/capture_output.rs index efb9af9..05e66d4 100644 --- a/tests/run/capture_output.rs +++ b/tests/run/capture_output.rs @@ -71,3 +71,25 @@ fn run_json_bounds_captured_runner_output_and_reports_truncation() { assert_eq!(turn["stderr_bytes"], 0); assert_eq!(turn["stderr_truncated"], false); } + +#[test] +fn run_no_preserve_output_inherits_runner_streams_in_non_json_mode() { + let store = TestStore::initialized("run-no-preserve-output"); + create_sequence_with_fragments(&store, "Workflow", &[("Only", "body\n")]); + + let output = pseq(&[ + "--store", + path_str(store.path()), + "--quiet", + "run", + "Workflow", + "--no-preserve-output", + "--", + "sh", + "-c", + "printf 'runner stdout\\n'; printf 'runner stderr\\n' >&2", + ]); + assert_success(&output); + assert_eq!(String::from_utf8_lossy(&output.stdout), "runner stdout\n"); + assert_eq!(String::from_utf8_lossy(&output.stderr), "runner stderr\n"); +} diff --git a/tests/run/failures.rs b/tests/run/failures.rs index ddc15ad..8082017 100644 --- a/tests/run/failures.rs +++ b/tests/run/failures.rs @@ -31,7 +31,7 @@ fn run_rejects_mixed_named_runner_and_ad_hoc_command_before_execution() { } #[test] -fn run_stops_after_first_unsuccessful_runner_exit() { +fn run_stops_after_unsuccessful_runner_turn_exhausts_retries() { let store = TestStore::initialized("run-failure"); let missing = TestStore::new("run-missing-store"); create_sequence_with_fragments(&store, "Workflow", &[("First", "A\n"), ("Second", "B\n")]); @@ -62,6 +62,186 @@ fn run_stops_after_first_unsuccessful_runner_exit() { assert_eq!(json["turns"][0]["exit_code"], 1); } +#[test] +fn run_retries_failed_runner_turn_by_default() { + let store = TestStore::initialized("run-default-retry"); + let scratch = TestStore::new("run-default-retry-scratch"); + fs::create_dir_all(scratch.path()).unwrap(); + let counter = scratch.path().join("attempts"); + create_sequence_with_fragments(&store, "Workflow", &[("Only", "body\n")]); + + let output = pseq(&[ + "--store", + path_str(store.path()), + "--json", + "run", + "Workflow", + "--retry-delay-ms", + "0", + "--", + "sh", + "-c", + r#"counter=$1 +count=$(cat "$counter" 2>/dev/null || printf 0) +count=$((count + 1)) +printf '%s\n' "$count" > "$counter" +printf 'stdout attempt %s\n' "$count" +printf 'stderr attempt %s\n' "$count" >&2 +if [ "$count" -lt 3 ]; then + exit 17 +fi +"#, + "sh", + path_str(&counter), + ]); + assert_success(&output); + assert!(output.stderr.is_empty()); + assert_eq!(fs::read_to_string(&counter).unwrap().trim(), "3"); + + let json = stdout_json(&output); + let turn = &json["turns"][0]; + assert_eq!(json["success"], true); + assert_eq!(json["completed_turns"], 1); + assert_eq!(turn["exit_code"], 0); + assert_eq!(turn["attempt_count"], 3); + assert_eq!(turn["attempts"].as_array().unwrap().len(), 3); + assert_eq!(turn["attempts"][0]["exit_code"], 17); + assert_eq!(turn["attempts"][0]["retryable"], true); + assert_eq!(turn["attempts"][0]["stdout"], "stdout attempt 1\n"); + assert_eq!(turn["attempts"][0]["stderr"], "stderr attempt 1\n"); + assert_eq!(turn["attempts"][2]["exit_code"], 0); + assert_eq!(turn["attempts"][2]["retryable"], false); + assert_eq!(turn["stdout"], "stdout attempt 3\n"); + assert_eq!(turn["stderr"], "stderr attempt 3\n"); +} + +#[test] +fn run_no_retry_preserves_one_shot_failure_behavior() { + let store = TestStore::initialized("run-no-retry"); + let scratch = TestStore::new("run-no-retry-scratch"); + fs::create_dir_all(scratch.path()).unwrap(); + let counter = scratch.path().join("attempts"); + create_sequence_with_fragments(&store, "Workflow", &[("Only", "body\n")]); + + let output = pseq(&[ + "--store", + path_str(store.path()), + "--json", + "run", + "Workflow", + "--no-retry", + "--", + "sh", + "-c", + r#"counter=$1 +count=$(cat "$counter" 2>/dev/null || printf 0) +count=$((count + 1)) +printf '%s\n' "$count" > "$counter" +printf 'failed once\n' +exit 9 +"#, + "sh", + path_str(&counter), + ]); + assert_eq!(output.status.code(), Some(1)); + assert!(output.stderr.is_empty()); + assert_eq!(fs::read_to_string(&counter).unwrap().trim(), "1"); + + let json = stdout_json(&output); + let turn = &json["turns"][0]; + assert_eq!(json["success"], false); + assert_eq!(json["completed_turns"], 0); + assert_eq!(turn["exit_code"], 9); + assert!(turn.get("attempt_count").is_none()); + assert!(turn.get("attempts").is_none()); + assert_eq!(turn["stdout"], "failed once\n"); +} + +#[test] +fn run_uses_configured_retry_defaults() { + let store = TestStore::initialized("run-config-retry"); + let scratch = TestStore::new("run-config-retry-scratch"); + fs::create_dir_all(scratch.path()).unwrap(); + let counter = scratch.path().join("attempts"); + create_sequence_with_fragments(&store, "Workflow", &[("Only", "body\n")]); + fs::write( + store.path().join("config.toml"), + "version = 1\n\n[run]\nretries = 1\nretry_delay_ms = 0\n", + ) + .unwrap(); + + let output = pseq(&[ + "--store", + path_str(store.path()), + "--json", + "run", + "Workflow", + "--", + "sh", + "-c", + r#"counter=$1 +count=$(cat "$counter" 2>/dev/null || printf 0) +count=$((count + 1)) +printf '%s\n' "$count" > "$counter" +exit 12 +"#, + "sh", + path_str(&counter), + ]); + assert_eq!(output.status.code(), Some(1)); + assert_eq!(fs::read_to_string(&counter).unwrap().trim(), "2"); + + let json = stdout_json(&output); + let turn = &json["turns"][0]; + assert_eq!(turn["attempt_count"], 2); + assert_eq!(turn["attempts"].as_array().unwrap().len(), 2); + assert_eq!(turn["attempts"][0]["retryable"], true); + assert_eq!(turn["attempts"][1]["retryable"], true); +} + +#[test] +fn run_retry_diagnostics_are_factual_and_concise() { + let store = TestStore::initialized("run-retry-diagnostic"); + let scratch = TestStore::new("run-retry-diagnostic-scratch"); + fs::create_dir_all(scratch.path()).unwrap(); + let counter = scratch.path().join("attempts"); + create_sequence_with_fragments(&store, "Workflow", &[("Only", "body\n")]); + + let output = pseq(&[ + "--store", + path_str(store.path()), + "run", + "Workflow", + "--retries", + "1", + "--retry-delay-ms", + "0", + "--", + "sh", + "-c", + r#"counter=$1 +count=$(cat "$counter" 2>/dev/null || printf 0) +count=$((count + 1)) +printf '%s\n' "$count" > "$counter" +if [ "$count" -eq 1 ]; then + exit 6 +fi +"#, + "sh", + path_str(&counter), + ]); + assert_success(&output); + let stderr = String::from_utf8_lossy(&output.stderr); + assert!( + stderr.contains("\npseq: runner attempt 1/2 failed at iteration 1 turn 1 with exit code 6"), + "stderr should include factual retry status, got {stderr:?}" + ); + assert!( + stderr.contains("retrying in 0ms"), + "stderr should include retry delay, got {stderr:?}" + ); +} + #[test] fn run_failure_uses_pseq_failure_exit_code_and_reports_runner_exit_code() { let store = TestStore::initialized("run-failure-exit-code"); @@ -89,6 +269,46 @@ fn run_failure_uses_pseq_failure_exit_code_and_reports_runner_exit_code() { assert_eq!(json["turns"][0]["exit_code"], 2); } +#[cfg(unix)] +#[test] +fn run_does_not_retry_signal_terminated_runner_process() { + let store = TestStore::initialized("run-signal-no-retry"); + let scratch = TestStore::new("run-signal-no-retry-scratch"); + fs::create_dir_all(scratch.path()).unwrap(); + let counter = scratch.path().join("attempts"); + create_sequence_with_fragments(&store, "Workflow", &[("Only", "body\n")]); + + let output = pseq(&[ + "--store", + path_str(store.path()), + "--json", + "run", + "Workflow", + "--retry-delay-ms", + "0", + "--", + "sh", + "-c", + r#"counter=$1 +count=$(cat "$counter" 2>/dev/null || printf 0) +count=$((count + 1)) +printf '%s\n' "$count" > "$counter" +kill -TERM $$ +"#, + "sh", + path_str(&counter), + ]); + assert_eq!(output.status.code(), Some(1)); + assert_eq!(fs::read_to_string(&counter).unwrap().trim(), "1"); + + let json = stdout_json(&output); + let turn = &json["turns"][0]; + assert_eq!(turn["termination"], "signal"); + assert_eq!(turn["signal"], 15); + assert!(turn.get("attempt_count").is_none()); + assert!(turn.get("attempts").is_none()); +} + #[cfg(unix)] #[test] fn run_reports_signal_terminated_runner_process() { diff --git a/tests/run/feedback/real_codex.rs b/tests/run/feedback/real_codex.rs index 54c801f..5bdbe0b 100644 --- a/tests/run/feedback/real_codex.rs +++ b/tests/run/feedback/real_codex.rs @@ -1,5 +1,39 @@ use super::super::*; +#[test] +#[ignore = "requires the real Codex CLI binary"] +fn run_retries_real_codex_cli_failure_and_preserves_stderr() { + let store = TestStore::initialized("run-codex-retry-failure"); + create_sequence_with_fragments(&store, "Workflow", &[("Only", "body\n")]); + + let output = pseq(&[ + "--store", + path_str(store.path()), + "--json", + "run", + "Workflow", + "--retry-delay-ms", + "0", + "--", + "codex", + "exec", + "--pseq-invalid-option", + "-", + ]); + assert_eq!(output.status.code(), Some(1)); + + let json = stdout_json(&output); + let turn = &json["turns"][0]; + assert_eq!(turn["attempt_count"], 3); + assert_eq!(turn["attempts"].as_array().unwrap().len(), 3); + assert!( + turn["attempts"][0]["stderr"] + .as_str() + .is_some_and(|stderr| !stderr.trim().is_empty()), + "real Codex parse failure should preserve stderr" + ); +} + #[test] #[ignore = "requires Codex CLI auth and spends model tokens"] fn run_feedback_loop_with_real_codex_cli() { diff --git a/tests/run/feedback/success.rs b/tests/run/feedback/success.rs index c9c7802..d96249e 100644 --- a/tests/run/feedback/success.rs +++ b/tests/run/feedback/success.rs @@ -236,8 +236,8 @@ fn run_feedback_seed_can_read_from_file() { } #[test] -fn run_feedback_human_mode_tees_output_while_retaining_feedback() { - let store = TestStore::initialized("run-feedback-human-tee"); +fn run_feedback_non_json_mode_tees_output_while_retaining_feedback() { + let store = TestStore::initialized("run-feedback-non-json-tee"); create_sequence_with_fragments( &store, "Workflow", @@ -268,7 +268,7 @@ fn run_feedback_human_mode_tees_output_while_retaining_feedback() { "EMPTY\nUPDATE\nSAW-FEEDBACK\nUPDATE\n" ); assert!( - String::from_utf8_lossy(&output.stderr).contains("pseq: running iteration 2/2 turn 1/2") + String::from_utf8_lossy(&output.stderr).contains("\npseq: running iteration 2/2 turn 1/2") ); assert_git_clean(store.path()); } diff --git a/tests/run/runner_modes.rs b/tests/run/runner_modes.rs index 9c4e48f..ecadab9 100644 --- a/tests/run/runner_modes.rs +++ b/tests/run/runner_modes.rs @@ -23,7 +23,7 @@ fn run_with_ad_hoc_command_feeds_each_fragment_as_one_turn() { assert_success(&output); assert_eq!(git_head(store.path()), before_head); assert!(String::from_utf8_lossy(&output.stdout).contains("created capture:")); - assert!(String::from_utf8_lossy(&output.stderr).contains("pseq: running turn 1/2")); + assert!(String::from_utf8_lossy(&output.stderr).contains("\npseq: running turn 1/2")); let texts = capture_texts(&sink); assert_eq!(texts.len(), 2); @@ -317,6 +317,138 @@ fi assert!(log.contains("resume session=fake-session-123 input=second prompt")); } +#[cfg(unix)] +#[test] +fn run_retries_codex_resume_turn_in_same_session() { + use std::os::unix::fs::PermissionsExt; + + let store = TestStore::initialized("run-codex-retry-same-session-store"); + let bin_dir = TestStore::new("run-codex-retry-same-session-bin"); + let log_path = bin_dir.path().join("codex.log"); + let retry_count_path = bin_dir.path().join("resume-attempts.txt"); + fs::create_dir_all(bin_dir.path()).unwrap(); + let fake_codex = bin_dir.path().join("codex"); + fs::write( + &fake_codex, + r#"#!/bin/sh +out="" +resume=0 +session="" +while [ "$#" -gt 0 ]; do + case "$1" in + --output-last-message|-o) + shift + out="$1" + ;; + resume) + resume=1 + ;; + --json|--color|--sandbox|-m|-s) + if [ "$1" != "--json" ]; then + shift + fi + ;; + --*) + ;; + -) + ;; + *) + if [ "$resume" = "1" ] && [ -z "$session" ]; then + session="$1" + fi + ;; + esac + shift +done +input=$(cat) +if [ "$resume" = "1" ]; then + count=$(cat "$PSEQ_FAKE_CODEX_RETRY_COUNT" 2>/dev/null || printf 0) + count=$((count + 1)) + printf '%s\n' "$count" > "$PSEQ_FAKE_CODEX_RETRY_COUNT" + printf 'resume attempt=%s session=%s input=%s\n' "$count" "$session" "$input" >> "$PSEQ_FAKE_CODEX_LOG" + if [ "$count" -eq 1 ]; then + printf 'transient resume failure\n' >&2 + exit 23 + fi + printf 'resumed %s after retry\n' "$session" > "$out" +else + printf 'first input=%s\n' "$input" >> "$PSEQ_FAKE_CODEX_LOG" + printf '{"type":"thread.started","thread_id":"fake-session-123"}\n' + printf 'started fake-session-123\n' > "$out" +fi +"#, + ) + .unwrap(); + fs::set_permissions(&fake_codex, fs::Permissions::from_mode(0o755)).unwrap(); + create_sequence_with_fragments( + &store, + "Workflow", + &[("First", "first prompt\n"), ("Second", "second prompt\n")], + ); + + let path = format!( + "{}:{}", + path_str(bin_dir.path()), + std::env::var("PATH").unwrap_or_default() + ); + let log_path_arg = path_str(&log_path); + let retry_count_path_arg = path_str(&retry_count_path); + let output = pseq_in_dir_with_env( + &[ + "--store", + path_str(store.path()), + "--json", + "run", + "Workflow", + "--retry-delay-ms", + "0", + "--", + "codex", + "exec", + "--sandbox", + "workspace-write", + "--color", + "never", + "-", + ], + store.path(), + &[ + ("PATH", path.as_str()), + ("PSEQ_FAKE_CODEX_LOG", log_path_arg), + ("PSEQ_FAKE_CODEX_RETRY_COUNT", retry_count_path_arg), + ], + ); + assert_success(&output); + + let json = stdout_json(&output); + let first_turn = &json["turns"][0]; + let second_turn = &json["turns"][1]; + assert!(first_turn.get("attempt_count").is_none()); + assert_eq!(second_turn["attempt_count"], 2); + assert_eq!( + second_turn["stdout"], + "resumed fake-session-123 after retry\n" + ); + let attempts = second_turn["attempts"].as_array().unwrap(); + assert_eq!(attempts.len(), 2); + for attempt in attempts { + let command = attempt["command"].as_array().unwrap(); + assert!( + command.iter().any(|arg| arg == "resume"), + "retry attempt should resume the established Codex session, got {command:?}" + ); + assert!( + command.iter().any(|arg| arg == "fake-session-123"), + "retry attempt should use the original session id, got {command:?}" + ); + } + + let log = fs::read_to_string(log_path).unwrap(); + assert!(log.contains("first input=first prompt")); + assert!(log.contains("resume attempt=1 session=fake-session-123 input=second prompt")); + assert!(log.contains("resume attempt=2 session=fake-session-123 input=second prompt")); +} + #[cfg(unix)] #[test] fn run_can_reset_codex_session_per_iteration_while_carrying_feedback() { diff --git a/tests/store_lifecycle.rs b/tests/store_lifecycle.rs index 7bf4167..6f2fa2d 100644 --- a/tests/store_lifecycle.rs +++ b/tests/store_lifecycle.rs @@ -500,7 +500,7 @@ rendered } #[test] -fn config_show_reports_config_as_human_and_json() { +fn config_show_reports_config_as_non_json_and_json() { let store = TestStore::initialized("config-show"); let json_output = pseq(&[ @@ -516,11 +516,11 @@ fn config_show_reports_config_as_human_and_json() { assert_eq!(json["version"], 1); assert_eq!(json["runner_count"], 0); - let human_output = pseq(&["config", "show", "--store", path_str(store.path())]); - assert_success(&human_output); - assert_stdout_contains(&human_output, "path: config.toml"); - assert_stdout_contains(&human_output, "version: 1"); - assert_stdout_contains(&human_output, "runners: 0"); + let non_json_output = pseq(&["config", "show", "--store", path_str(store.path())]); + assert_success(&non_json_output); + assert_stdout_contains(&non_json_output, "path: config.toml"); + assert_stdout_contains(&non_json_output, "version: 1"); + assert_stdout_contains(&non_json_output, "runners: 0"); assert_git_clean(store.path()); } @@ -611,7 +611,7 @@ fn blank_fragment_and_sequence_names_fail_before_mutation() { } #[test] -fn quiet_suppresses_human_success_output() { +fn quiet_suppresses_non_json_success_output() { let store = TestStore::new("quiet"); let output = pseq(&["init", "--store", path_str(store.path()), "--quiet"]);