From f0848920f70abc56f0278e9e6b11f4e05e58b6f0 Mon Sep 17 00:00:00 2001 From: Justin Chapman Date: Sat, 9 May 2026 00:54:28 -0400 Subject: [PATCH 1/7] Recycle worker process after 128 tests to bound FD accumulation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Long-lived Python workers accumulate module-level state across imports — logging handlers, sqlite/ssl objects, atexit callbacks — that holds open file descriptors which `del sys.modules[name]` cannot reclaim because the FDs are owned by objects living outside the module dict. On macOS where the per-process FD soft limit is 256, this exhausts FDs when running tryke against large suites like homeassistant/core (~5,267 test files). WorkerProcess now tracks completed tests; the pool recycles a worker via the existing crash-recovery path (shutdown + ensure_worker respawn with cached hook replay) once the count reaches 128. Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/tryke_runner/src/pool.rs | 93 ++++++++++++++++++++++++++++++- crates/tryke_runner/src/worker.rs | 21 +++++++ 2 files changed, 113 insertions(+), 1 deletion(-) diff --git a/crates/tryke_runner/src/pool.rs b/crates/tryke_runner/src/pool.rs index 4987db4..5bfff52 100644 --- a/crates/tryke_runner/src/pool.rs +++ b/crates/tryke_runner/src/pool.rs @@ -11,7 +11,7 @@ use tryke_types::{HookItem, TestOutcome, TestResult}; use crate::protocol::RegisterHooksParams; use crate::schedule::WorkUnit; -use crate::worker::WorkerProcess; +use crate::worker::{MAX_TESTS_PER_WORKER, WorkerProcess}; /// Per-worker-task state: the (optional) live Python process plus a cache of /// the most recent `register_hooks` call per module. The cache exists so @@ -257,6 +257,21 @@ async fn run_single_test( }); } } + + // Recycle a worker that has run enough tests to be at risk of FD/state + // accumulation. The Err arm above already nulled state.process, so this + // only fires on the success path. The next test on this worker_task + // calls ensure_worker, which spawns a fresh process and replays cached + // hooks — same path as crash recovery. + if state + .process + .as_ref() + .is_some_and(WorkerProcess::should_recycle) + && let Some(mut proc) = state.process.take() + { + debug!("worker_task: recycling worker after {MAX_TESTS_PER_WORKER} tests"); + proc.shutdown().await; + } } /// Send `register_hooks` to the worker for each unique module in the work @@ -670,4 +685,80 @@ def test_noop() -> None: pool.shutdown(); } + + /// A worker process must be recycled after `MAX_TESTS_PER_WORKER` tests + /// so accumulated module-level state (and the FDs it owns) does not + /// grow unboundedly across long runs. We prove a recycle happened by + /// recording the worker pid in the test module body — which only runs + /// on a fresh interpreter — and asserting we see exactly the expected + /// number of distinct pids for the test count. + #[tokio::test] + async fn worker_recycles_after_max_tests() { + let dir = tempfile::tempdir().expect("tempdir"); + std::fs::write(dir.path().join("pyproject.toml"), "").expect("write pyproject.toml"); + + let pid_log = dir.path().join("PID_LOG"); + let pid_log_escaped = pid_log.to_string_lossy().replace('\\', "\\\\"); + let test_file = dir.path().join("test_recycle.py"); + let source = format!( + r#"import os + +# Module body runs once per fresh interpreter — record this worker's pid so +# the test can count distinct workers (one per recycle). +with open("{pid_log_escaped}", "a") as f: + f.write(str(os.getpid()) + "\n") + f.flush() + +from tryke import test, expect + +@test +def test_noop() -> None: + expect(1).to_equal(1) +"# + ); + std::fs::write(&test_file, source).expect("write test file"); + + let n_tests = usize::try_from(MAX_TESTS_PER_WORKER + 4).expect("test count fits in usize"); + let max_per_worker = + usize::try_from(MAX_TESTS_PER_WORKER).expect("MAX_TESTS_PER_WORKER fits in usize"); + let tests: Vec = (0..n_tests) + .map(|_| make_test_item("test_recycle", "test_noop", &test_file)) + .collect(); + let unit = WorkUnit { + tests, + hooks: vec![], + }; + + let pool = WorkerPool::with_python_path( + 1, + &test_python_bin(), + dir.path(), + &[dir.path().to_path_buf(), python_package_dir()], + LevelFilter::Off, + ); + pool.warm().await; + + let results: Vec = pool.run(vec![unit]).collect().await; + assert_eq!(results.len(), n_tests, "expected {n_tests} results"); + for r in &results { + assert!( + matches!(r.outcome, TestOutcome::Passed), + "test failed unexpectedly: {r:?}", + ); + } + + let pid_lines = std::fs::read_to_string(&pid_log).unwrap_or_default(); + let distinct_pids: std::collections::HashSet<&str> = + pid_lines.lines().filter(|l| !l.is_empty()).collect(); + let expected_workers = n_tests.div_ceil(max_per_worker); + assert_eq!( + distinct_pids.len(), + expected_workers, + "expected {expected_workers} distinct worker pid(s) (one per recycle); \ + got {} from log: {pid_lines:?}", + distinct_pids.len(), + ); + + pool.shutdown(); + } } diff --git a/crates/tryke_runner/src/worker.rs b/crates/tryke_runner/src/worker.rs index c406f73..0aa8368 100644 --- a/crates/tryke_runner/src/worker.rs +++ b/crates/tryke_runner/src/worker.rs @@ -19,6 +19,16 @@ use crate::protocol::{ /// without unbounded memory growth on workers that spew warnings. const STDERR_RETAIN_BYTES: usize = 1 << 20; // 1 MiB +/// Recycle a worker process after this many tests. Long-lived python +/// interpreters accumulate module-level state across imports — logging +/// handlers, sqlite/ssl objects, atexit callbacks — much of which holds +/// open file descriptors that `del sys.modules[name]` cannot reclaim +/// because the FDs are owned by objects living outside the module dict. +/// Recycling bounds growth: the only mechanism in `CPython` that reliably +/// frees module-level FDs is process exit. The cap is intentionally +/// hardcoded for now; revisit if it proves wrong on real workloads. +pub(crate) const MAX_TESTS_PER_WORKER: u64 = 128; + pub struct WorkerProcess { child: Child, stdin: BufWriter, @@ -28,6 +38,7 @@ pub struct WorkerProcess { /// the worker can't block on a stderr write mid-RPC. stderr_buf: Arc>>, next_id: u64, + tests_completed: u64, } impl WorkerProcess { @@ -88,9 +99,17 @@ impl WorkerProcess { stdout, stderr_buf, next_id: 1, + tests_completed: 0, }) } + /// Whether this worker has run enough tests to warrant recycling. + /// See `MAX_TESTS_PER_WORKER` for the rationale. + #[must_use] + pub fn should_recycle(&self) -> bool { + self.tests_completed >= MAX_TESTS_PER_WORKER + } + async fn call serde::Deserialize<'de>>( &mut self, method: &str, @@ -177,6 +196,7 @@ impl WorkerProcess { case_label: test.case_label.clone(), })?; let wire: RunTestResultWire = self.call("run_test", Some(params)).await?; + self.tests_completed = self.tests_completed.saturating_add(1); Ok(convert_result(test.clone(), wire)) } @@ -207,6 +227,7 @@ impl WorkerProcess { object_path: object_path.to_owned(), })?; let wire: RunTestResultWire = self.call("run_doctest", Some(params)).await?; + self.tests_completed = self.tests_completed.saturating_add(1); Ok(convert_result(test.clone(), wire)) } From f8ccedfdd955a1a22da9ea9dc8c08cee4e809b90 Mon Sep 17 00:00:00 2001 From: justin Date: Sat, 9 May 2026 01:11:20 -0400 Subject: [PATCH 2/7] Defer worker recycle until end of unit so scope teardown runs (#106) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Recycling inside `run_single_test` could fire mid-unit, dropping the live process before `handle_unit`'s `finalize_hooks` loop ran — and that loop is gated on `state.process.as_mut()`, so per="scope" fixture teardown was silently skipped (or run on a fresh worker's freshly re-imported fixtures, double-counting setup). Move the recycle check to after `finalize_hooks` in `handle_unit`. Adjusts the existing recycle test to submit one unit per test (so the end-of-unit check fires at the threshold) and adds a coverage test that crosses the threshold inside a single unit with a per="scope" fixture, asserting setup and teardown each run exactly once. Co-authored-by: Claude --- crates/tryke_runner/src/pool.rs | 151 ++++++++++++++++++++++++++------ 1 file changed, 125 insertions(+), 26 deletions(-) diff --git a/crates/tryke_runner/src/pool.rs b/crates/tryke_runner/src/pool.rs index 5bfff52..de9a1cd 100644 --- a/crates/tryke_runner/src/pool.rs +++ b/crates/tryke_runner/src/pool.rs @@ -257,21 +257,6 @@ async fn run_single_test( }); } } - - // Recycle a worker that has run enough tests to be at risk of FD/state - // accumulation. The Err arm above already nulled state.process, so this - // only fires on the success path. The next test on this worker_task - // calls ensure_worker, which spawns a fresh process and replays cached - // hooks — same path as crash recovery. - if state - .process - .as_ref() - .is_some_and(WorkerProcess::should_recycle) - && let Some(mut proc) = state.process.take() - { - debug!("worker_task: recycling worker after {MAX_TESTS_PER_WORKER} tests"); - proc.shutdown().await; - } } /// Send `register_hooks` to the worker for each unique module in the work @@ -399,6 +384,23 @@ async fn handle_unit( debug!("worker_task: finalize_hooks failed: {e}"); } } + + // Recycle a worker that has run enough tests to be at risk of FD/state + // accumulation. Deferred to the end of the unit (after finalize_hooks) + // so per="scope" fixture teardown is not skipped: recycling mid-unit + // would drop the live process before its scope fixtures got their + // teardown call. The next unit handed to this worker_task hits + // ensure_worker, which spawns a fresh process and replays cached + // hooks — same path as crash recovery. + if state + .process + .as_ref() + .is_some_and(WorkerProcess::should_recycle) + && let Some(mut proc) = state.process.take() + { + debug!("worker_task: recycling worker after {MAX_TESTS_PER_WORKER} tests"); + proc.shutdown().await; + } } async fn worker_task( @@ -688,10 +690,12 @@ def test_noop() -> None: /// A worker process must be recycled after `MAX_TESTS_PER_WORKER` tests /// so accumulated module-level state (and the FDs it owns) does not - /// grow unboundedly across long runs. We prove a recycle happened by - /// recording the worker pid in the test module body — which only runs - /// on a fresh interpreter — and asserting we see exactly the expected - /// number of distinct pids for the test count. + /// grow unboundedly across long runs. The recycle is deferred until + /// the end of a unit (so per="scope" teardown is not skipped — see + /// `recycle_does_not_skip_scope_fixture_teardown`), so we exercise it + /// here by submitting many single-test units. The module body records + /// the worker pid on every fresh import; we expect one distinct pid + /// per recycle boundary. #[tokio::test] async fn worker_recycles_after_max_tests() { let dir = tempfile::tempdir().expect("tempdir"); @@ -721,13 +725,14 @@ def test_noop() -> None: let n_tests = usize::try_from(MAX_TESTS_PER_WORKER + 4).expect("test count fits in usize"); let max_per_worker = usize::try_from(MAX_TESTS_PER_WORKER).expect("MAX_TESTS_PER_WORKER fits in usize"); - let tests: Vec = (0..n_tests) - .map(|_| make_test_item("test_recycle", "test_noop", &test_file)) + // One unit per test so the recycle check at the end of each unit + // gets a chance to fire as soon as the threshold is crossed. + let units: Vec = (0..n_tests) + .map(|_| WorkUnit { + tests: vec![make_test_item("test_recycle", "test_noop", &test_file)], + hooks: vec![], + }) .collect(); - let unit = WorkUnit { - tests, - hooks: vec![], - }; let pool = WorkerPool::with_python_path( 1, @@ -738,7 +743,7 @@ def test_noop() -> None: ); pool.warm().await; - let results: Vec = pool.run(vec![unit]).collect().await; + let results: Vec = pool.run(units).collect().await; assert_eq!(results.len(), n_tests, "expected {n_tests} results"); for r in &results { assert!( @@ -761,4 +766,98 @@ def test_noop() -> None: pool.shutdown(); } + + /// Recycling must never strand a `per="scope"` fixture without its + /// teardown. The risk is mid-unit recycling: if we drop the live + /// process before `finalize_hooks` runs, the unit's scope fixtures + /// die without their `yield`-after teardown, *and* a fresh worker + /// re-runs setup when hooks replay — so a buggy version would show + /// setup ran twice while teardown ran once (or zero times). We + /// submit enough tests in a single unit to cross the recycle + /// threshold and assert setup/teardown counts stay matched at one + /// each. + #[tokio::test] + async fn recycle_does_not_skip_scope_fixture_teardown() { + let dir = tempfile::tempdir().expect("tempdir"); + std::fs::write(dir.path().join("pyproject.toml"), "").expect("write pyproject.toml"); + + let setup_log = dir.path().join("SETUP_LOG"); + let teardown_log = dir.path().join("TEARDOWN_LOG"); + let setup_escaped = setup_log.to_string_lossy().replace('\\', "\\\\"); + let teardown_escaped = teardown_log.to_string_lossy().replace('\\', "\\\\"); + let test_file = dir.path().join("test_scope_recycle.py"); + let source = format!( + r#"import os +from tryke import test, fixture, expect, Depends + +@fixture(per="scope") +def scope_resource() -> int: + with open("{setup_escaped}", "a") as f: + f.write(str(os.getpid()) + "\n") + f.flush() + yield 42 + with open("{teardown_escaped}", "a") as f: + f.write(str(os.getpid()) + "\n") + f.flush() + +@test +def test_uses_scope(r: int = Depends(scope_resource)) -> None: + expect(r).to_equal(42) +"# + ); + std::fs::write(&test_file, source).expect("write test file"); + + let hook = HookItem { + name: "scope_resource".into(), + module_path: "test_scope_recycle".into(), + per: FixturePer::Scope, + groups: vec![], + depends_on: vec![], + line_number: None, + }; + let n_tests = usize::try_from(MAX_TESTS_PER_WORKER + 4).expect("test count fits in usize"); + let tests: Vec = (0..n_tests) + .map(|_| make_test_item("test_scope_recycle", "test_uses_scope", &test_file)) + .collect(); + let unit = WorkUnit { + tests, + hooks: vec![hook], + }; + + let pool = WorkerPool::with_python_path( + 1, + &test_python_bin(), + dir.path(), + &[dir.path().to_path_buf(), python_package_dir()], + LevelFilter::Off, + ); + pool.warm().await; + + let results: Vec = pool.run(vec![unit]).collect().await; + assert_eq!(results.len(), n_tests, "expected {n_tests} results"); + for r in &results { + assert!( + matches!(r.outcome, TestOutcome::Passed), + "test failed unexpectedly: {r:?}", + ); + } + + let setup_lines = std::fs::read_to_string(&setup_log).unwrap_or_default(); + let teardown_lines = std::fs::read_to_string(&teardown_log).unwrap_or_default(); + let setup_count = setup_lines.lines().filter(|l| !l.is_empty()).count(); + let teardown_count = teardown_lines.lines().filter(|l| !l.is_empty()).count(); + assert_eq!( + setup_count, 1, + "scope fixture setup must run exactly once for the unit; \ + got {setup_count} from log: {setup_lines:?}", + ); + assert_eq!( + teardown_count, 1, + "scope fixture teardown must run exactly once for the unit \ + (recycling must not strand teardown); got {teardown_count} \ + from log: {teardown_lines:?}", + ); + + pool.shutdown(); + } } From a43df6aa6994d8f059c7a2259a2058c1d736e3d2 Mon Sep 17 00:00:00 2001 From: justin Date: Mon, 11 May 2026 09:35:47 -0400 Subject: [PATCH 3/7] Recycle workers on memory/FD/age signals instead of test count (#108) Replaces the hardcoded `MAX_TESTS_PER_WORKER = 128` cap with self-reported resource snapshots: every `run_test`/`run_doctest` response carries a `WorkerHealthWire { rss_bytes, open_fds }` that the runner consults at unit boundaries alongside a wall-clock age check. Recycle decisions now name the tripped signal via a `RecycleReason` enum, so debug logs can attribute drops to memory pressure vs FD exhaustion vs slow drift. Soft ceilings live on `WorkerLimits` (1 GiB RSS / 200 FDs / 10 min by default, all `Option` so platforms without `/proc/self/fd` or `resource` simply skip that signal). Tests use the new `WorkerPool::with_python_path_and_limits` constructor with tiny caps (or `WorkerLimits::unlimited`) to exercise recycle behaviour deterministically; production call sites in `tryke`, `tryke_server`, and `tryke_dev` are unchanged because `with_python_path` defaults to `WorkerLimits::default()`. Test coverage: end-to-end age-based recycle test (replaces the test-count one); existing scope-fixture-teardown test now triggers via age cap; new unit tests for `evaluate_recycle` priority order and no-signal/no-cap fallbacks. https://claude.ai/code/session_01PMbxzSuASTEYbDFEu4SQqy Co-authored-by: Claude --- crates/tryke_runner/src/lib.rs | 2 +- crates/tryke_runner/src/pool.rs | 270 ++++++++++++++++----------- crates/tryke_runner/src/protocol.rs | 30 +++ crates/tryke_runner/src/worker.rs | 278 +++++++++++++++++++++++++--- python/tryke/worker.py | 72 +++++++ 5 files changed, 519 insertions(+), 133 deletions(-) diff --git a/crates/tryke_runner/src/lib.rs b/crates/tryke_runner/src/lib.rs index d3bd374..c5f034c 100644 --- a/crates/tryke_runner/src/lib.rs +++ b/crates/tryke_runner/src/lib.rs @@ -5,4 +5,4 @@ pub mod worker; pub use pool::{WorkerPool, path_to_module}; pub use schedule::{DistMode, WorkUnit, partition, partition_with_hooks}; -pub use worker::WorkerProcess; +pub use worker::{RecycleReason, WorkerHealth, WorkerLimits, WorkerProcess}; diff --git a/crates/tryke_runner/src/pool.rs b/crates/tryke_runner/src/pool.rs index de9a1cd..131b1c3 100644 --- a/crates/tryke_runner/src/pool.rs +++ b/crates/tryke_runner/src/pool.rs @@ -11,7 +11,21 @@ use tryke_types::{HookItem, TestOutcome, TestResult}; use crate::protocol::RegisterHooksParams; use crate::schedule::WorkUnit; -use crate::worker::{MAX_TESTS_PER_WORKER, WorkerProcess}; +use crate::worker::{WorkerLimits, WorkerProcess}; + +/// Bundle of inputs every spawn (and respawn) needs. These five values +/// never vary across the lifetime of a `worker_task`, so threading them +/// individually through every helper just inflates the signature and +/// trips `clippy::too_many_arguments`. Grouping them keeps the call +/// sites short and lets us add new spawn-time knobs later without +/// touching every helper signature. +struct WorkerSpawnCtx<'a> { + python_bin: &'a str, + path_refs: &'a [&'a Path], + root: &'a Path, + log_level: LevelFilter, + limits: WorkerLimits, +} /// Per-worker-task state: the (optional) live Python process plus a cache of /// the most recent `register_hooks` call per module. The cache exists so @@ -83,6 +97,29 @@ impl WorkerPool { root: &Path, python_path: &[PathBuf], log_level: LevelFilter, + ) -> Self { + Self::with_python_path_and_limits( + size, + python_bin, + root, + python_path, + log_level, + WorkerLimits::default(), + ) + } + + /// Like [`Self::with_python_path`] but with explicit recycle + /// thresholds. Tests use this to set tiny ceilings (or + /// [`WorkerLimits::unlimited`]) so they can observe — or suppress — + /// recycle behaviour without spinning up a real workload. + #[must_use] + pub fn with_python_path_and_limits( + size: usize, + python_bin: &str, + root: &Path, + python_path: &[PathBuf], + log_level: LevelFilter, + limits: WorkerLimits, ) -> Self { let size = size.max(1); let python_path = python_path.to_vec(); @@ -99,6 +136,7 @@ impl WorkerPool { python_path.clone(), root.clone(), log_level, + limits, work_rx, ctrl_rx, )); @@ -175,16 +213,19 @@ pub use tryke_types::path_to_module; /// metadata and silently skip `before_each` / `after_each`. async fn ensure_worker<'a>( state: &'a mut WorkerState, - python_bin: &str, - path_refs: &[&Path], - root: &Path, - log_level: LevelFilter, + ctx: &WorkerSpawnCtx<'_>, ) -> Option<&'a mut WorkerProcess> { if state.process.is_some() { return state.process.as_mut(); } trace!("worker_task: spawning process"); - let mut w = match WorkerProcess::spawn(python_bin, path_refs, root, log_level) { + let mut w = match WorkerProcess::spawn( + ctx.python_bin, + ctx.path_refs, + ctx.root, + ctx.log_level, + ctx.limits, + ) { Ok(w) => w, Err(e) => { debug!("worker_task: spawn failed: {e}"); @@ -212,14 +253,11 @@ async fn ensure_worker<'a>( /// `TestOutcome::Error` with the worker's stderr attached for diagnosis. async fn run_single_test( state: &mut WorkerState, - python_bin: &str, - path_refs: &[&Path], - root: &Path, - log_level: LevelFilter, + ctx: &WorkerSpawnCtx<'_>, test: tryke_types::TestItem, result_tx: &mpsc::UnboundedSender, ) { - let Some(w) = ensure_worker(state, python_bin, path_refs, root, log_level).await else { + let Some(w) = ensure_worker(state, ctx).await else { let _ = result_tx.send(TestResult { test, outcome: TestOutcome::Error { @@ -263,10 +301,7 @@ async fn run_single_test( /// unit, caching the call so any respawn later in the unit can replay it. async fn register_hooks_for_unit( state: &mut WorkerState, - python_bin: &str, - path_refs: &[&Path], - root: &Path, - log_level: LevelFilter, + ctx: &WorkerSpawnCtx<'_>, hooks: &[HookItem], tests: &[tryke_types::TestItem], ) { @@ -304,7 +339,7 @@ async fn register_hooks_for_unit( .hook_cache .insert(test.module_path.clone(), params.clone()); - let Some(w) = ensure_worker(state, python_bin, path_refs, root, log_level).await else { + let Some(w) = ensure_worker(state, ctx).await else { continue; }; if let Err(e) = w.register_hooks(params).await { @@ -316,18 +351,11 @@ async fn register_hooks_for_unit( } } -async fn handle_ctrl( - state: &mut WorkerState, - python_bin: &str, - path_refs: &[&Path], - root: &Path, - log_level: LevelFilter, - ctrl: WorkerCtrl, -) { +async fn handle_ctrl(state: &mut WorkerState, ctx: &WorkerSpawnCtx<'_>, ctrl: WorkerCtrl) { match ctrl { WorkerCtrl::Ping(ack_tx) => { trace!("worker_task: ping (pre-warm)"); - let _ = ensure_worker(state, python_bin, path_refs, root, log_level).await; + let _ = ensure_worker(state, ctx).await; let _ = ack_tx.send(()); } WorkerCtrl::Restart(ack_tx) => { @@ -338,7 +366,7 @@ async fn handle_ctrl( // Eagerly respawn so the next Unit doesn't pay Python startup // latency. ensure_worker replays cached register_hooks against // the fresh process, mirroring the crash-recovery path. - let _ = ensure_worker(state, python_bin, path_refs, root, log_level).await; + let _ = ensure_worker(state, ctx).await; let _ = ack_tx.send(()); } } @@ -346,24 +374,12 @@ async fn handle_ctrl( async fn handle_unit( state: &mut WorkerState, - python_bin: &str, - path_refs: &[&Path], - root: &Path, - log_level: LevelFilter, + ctx: &WorkerSpawnCtx<'_>, unit: WorkUnit, result_tx: mpsc::UnboundedSender, ) { if !unit.hooks.is_empty() { - register_hooks_for_unit( - state, - python_bin, - path_refs, - root, - log_level, - &unit.hooks, - &unit.tests, - ) - .await; + register_hooks_for_unit(state, ctx, &unit.hooks, &unit.tests).await; } let finalize_modules: std::collections::HashSet = if unit.hooks.is_empty() { std::collections::HashSet::new() @@ -372,10 +388,7 @@ async fn handle_unit( }; for test in unit.tests { trace!("worker_task: running test {}", test.name); - run_single_test( - state, python_bin, path_refs, root, log_level, test, &result_tx, - ) - .await; + run_single_test(state, ctx, test, &result_tx).await; } for module in finalize_modules { if let Some(w) = state.process.as_mut() @@ -385,20 +398,20 @@ async fn handle_unit( } } - // Recycle a worker that has run enough tests to be at risk of FD/state - // accumulation. Deferred to the end of the unit (after finalize_hooks) - // so per="scope" fixture teardown is not skipped: recycling mid-unit + // Recycle a worker that has tripped any of its soft resource caps. + // Deferred to the end of the unit (after finalize_hooks) so + // per="scope" fixture teardown is not skipped: recycling mid-unit // would drop the live process before its scope fixtures got their // teardown call. The next unit handed to this worker_task hits // ensure_worker, which spawns a fresh process and replays cached // hooks — same path as crash recovery. - if state + if let Some(reason) = state .process .as_ref() - .is_some_and(WorkerProcess::should_recycle) + .and_then(WorkerProcess::should_recycle) && let Some(mut proc) = state.process.take() { - debug!("worker_task: recycling worker after {MAX_TESTS_PER_WORKER} tests"); + debug!("worker_task: recycling worker ({reason})"); proc.shutdown().await; } } @@ -408,10 +421,18 @@ async fn worker_task( python_path: Vec, root: PathBuf, log_level: LevelFilter, + limits: WorkerLimits, work_rx: async_channel::Receiver, mut ctrl_rx: mpsc::UnboundedReceiver, ) { let path_refs: Vec<&Path> = python_path.iter().map(PathBuf::as_path).collect(); + let ctx = WorkerSpawnCtx { + python_bin: &python_bin, + path_refs: &path_refs, + root: &root, + log_level, + limits, + }; let mut state = WorkerState::new(); loop { @@ -424,21 +445,12 @@ async fn worker_task( biased; ctrl = ctrl_rx.recv() => { let Some(ctrl) = ctrl else { break }; - handle_ctrl(&mut state, &python_bin, &path_refs, &root, log_level, ctrl).await; + handle_ctrl(&mut state, &ctx, ctrl).await; } msg = work_rx.recv() => { match msg { Ok(WorkerMsg::Unit(unit, result_tx)) => { - handle_unit( - &mut state, - &python_bin, - &path_refs, - &root, - log_level, - unit, - result_tx, - ) - .await; + handle_unit(&mut state, &ctx, unit, result_tx).await; } Ok(WorkerMsg::Shutdown) | Err(_) => break, } @@ -688,16 +700,23 @@ def test_noop() -> None: pool.shutdown(); } - /// A worker process must be recycled after `MAX_TESTS_PER_WORKER` tests - /// so accumulated module-level state (and the FDs it owns) does not - /// grow unboundedly across long runs. The recycle is deferred until - /// the end of a unit (so per="scope" teardown is not skipped — see - /// `recycle_does_not_skip_scope_fixture_teardown`), so we exercise it - /// here by submitting many single-test units. The module body records - /// the worker pid on every fresh import; we expect one distinct pid - /// per recycle boundary. + /// A worker process must be recycled when its self-reported + /// resource snapshot crosses a configured ceiling so accumulated + /// module-level state (and the FDs it owns) does not grow + /// unboundedly across long runs. We use the wall-clock `max_age` + /// signal here because it is the easiest to drive deterministically + /// from a test (memory and FD growth would require platform- + /// specific allocations, and the priority logic is unit-tested + /// separately in `worker.rs`). + /// + /// The recycle is deferred until the end of a unit (so per="scope" + /// teardown is not skipped — see + /// `recycle_does_not_skip_scope_fixture_teardown`), so we exercise + /// it here by sleeping past the cap between units. The module body + /// records the worker pid on every fresh import; we expect one + /// distinct pid per recycle boundary. #[tokio::test] - async fn worker_recycles_after_max_tests() { + async fn worker_recycles_when_age_exceeds_limit() { let dir = tempfile::tempdir().expect("tempdir"); std::fs::write(dir.path().join("pyproject.toml"), "").expect("write pyproject.toml"); @@ -722,45 +741,57 @@ def test_noop() -> None: ); std::fs::write(&test_file, source).expect("write test file"); - let n_tests = usize::try_from(MAX_TESTS_PER_WORKER + 4).expect("test count fits in usize"); - let max_per_worker = - usize::try_from(MAX_TESTS_PER_WORKER).expect("MAX_TESTS_PER_WORKER fits in usize"); - // One unit per test so the recycle check at the end of each unit - // gets a chance to fire as soon as the threshold is crossed. - let units: Vec = (0..n_tests) - .map(|_| WorkUnit { - tests: vec![make_test_item("test_recycle", "test_noop", &test_file)], - hooks: vec![], - }) - .collect(); - - let pool = WorkerPool::with_python_path( + // Tight age cap — a sleep between units crosses it deterministically + // without making the test slow. + let limits = WorkerLimits { + max_rss_bytes: None, + max_open_fds: None, + max_age: Some(std::time::Duration::from_millis(100)), + }; + let pool = WorkerPool::with_python_path_and_limits( 1, &test_python_bin(), dir.path(), &[dir.path().to_path_buf(), python_package_dir()], LevelFilter::Off, + limits, ); pool.warm().await; - let results: Vec = pool.run(units).collect().await; - assert_eq!(results.len(), n_tests, "expected {n_tests} results"); - for r in &results { - assert!( - matches!(r.outcome, TestOutcome::Passed), - "test failed unexpectedly: {r:?}", - ); - } + let make_unit = || WorkUnit { + tests: vec![make_test_item("test_recycle", "test_noop", &test_file)], + hooks: vec![], + }; + + // First unit: worker is fresh, no recycle. + let r1: Vec = pool.run(vec![make_unit()]).collect().await; + assert_eq!(r1.len(), 1); + assert!(matches!(r1[0].outcome, TestOutcome::Passed)); + + // Sleep past the age cap so the next unit's end-of-unit check + // fires. 250ms gives comfortable margin over the 100ms cap on + // jittery CI without pushing test latency higher than necessary. + tokio::time::sleep(std::time::Duration::from_millis(250)).await; + + // Second unit: still on the same worker (recycle is checked at + // end-of-unit), but at *its* end the age check trips. + let r2: Vec = pool.run(vec![make_unit()]).collect().await; + assert_eq!(r2.len(), 1); + assert!(matches!(r2[0].outcome, TestOutcome::Passed)); + + // Third unit: forced to spawn a new worker (the previous one + // was recycled) so a second pid is logged. + let r3: Vec = pool.run(vec![make_unit()]).collect().await; + assert_eq!(r3.len(), 1); + assert!(matches!(r3[0].outcome, TestOutcome::Passed)); let pid_lines = std::fs::read_to_string(&pid_log).unwrap_or_default(); let distinct_pids: std::collections::HashSet<&str> = pid_lines.lines().filter(|l| !l.is_empty()).collect(); - let expected_workers = n_tests.div_ceil(max_per_worker); assert_eq!( distinct_pids.len(), - expected_workers, - "expected {expected_workers} distinct worker pid(s) (one per recycle); \ - got {} from log: {pid_lines:?}", + 2, + "expected 2 distinct worker pid(s) (one recycle); got {} from log: {pid_lines:?}", distinct_pids.len(), ); @@ -768,14 +799,17 @@ def test_noop() -> None: } /// Recycling must never strand a `per="scope"` fixture without its - /// teardown. The risk is mid-unit recycling: if we drop the live - /// process before `finalize_hooks` runs, the unit's scope fixtures - /// die without their `yield`-after teardown, *and* a fresh worker - /// re-runs setup when hooks replay — so a buggy version would show - /// setup ran twice while teardown ran once (or zero times). We - /// submit enough tests in a single unit to cross the recycle - /// threshold and assert setup/teardown counts stay matched at one - /// each. + /// teardown. The unit body's tests must still see their fixture + /// values, and the scope fixture's `yield`-after teardown must run + /// exactly once before the worker is recycled. A buggy version + /// that recycled mid-unit (before `finalize_hooks`) would either + /// strand teardown entirely or — if the fresh worker re-ran setup + /// — record setup twice with teardown still at one or zero. + /// + /// The recycle is forced via a tiny `max_age` so a short sleep + /// inside the unit's first test crosses the cap; the end-of-unit + /// check then trips as soon as the unit completes (after + /// finalize). We assert setup and teardown both ran exactly once. #[tokio::test] async fn recycle_does_not_skip_scope_fixture_teardown() { let dir = tempfile::tempdir().expect("tempdir"); @@ -788,6 +822,7 @@ def test_noop() -> None: let test_file = dir.path().join("test_scope_recycle.py"); let source = format!( r#"import os +import time from tryke import test, fixture, expect, Depends @fixture(per="scope") @@ -800,6 +835,14 @@ def scope_resource() -> int: f.write(str(os.getpid()) + "\n") f.flush() +@test +def test_sleep_then_age_out(r: int = Depends(scope_resource)) -> None: + # Sleep past the runner's tiny max_age so the end-of-unit recycle + # check trips. Teardown must still have a chance to run before the + # worker is killed. + time.sleep(0.25) + expect(r).to_equal(42) + @test def test_uses_scope(r: int = Depends(scope_resource)) -> None: expect(r).to_equal(42) @@ -815,26 +858,35 @@ def test_uses_scope(r: int = Depends(scope_resource)) -> None: depends_on: vec![], line_number: None, }; - let n_tests = usize::try_from(MAX_TESTS_PER_WORKER + 4).expect("test count fits in usize"); - let tests: Vec = (0..n_tests) - .map(|_| make_test_item("test_scope_recycle", "test_uses_scope", &test_file)) - .collect(); + // Two tests in one unit: first sleeps past max_age, second + // proves teardown wasn't stranded mid-unit (it would have been + // had recycle fired before finalize_hooks). + let tests: Vec = vec![ + make_test_item("test_scope_recycle", "test_sleep_then_age_out", &test_file), + make_test_item("test_scope_recycle", "test_uses_scope", &test_file), + ]; let unit = WorkUnit { tests, hooks: vec![hook], }; - let pool = WorkerPool::with_python_path( + let limits = WorkerLimits { + max_rss_bytes: None, + max_open_fds: None, + max_age: Some(std::time::Duration::from_millis(100)), + }; + let pool = WorkerPool::with_python_path_and_limits( 1, &test_python_bin(), dir.path(), &[dir.path().to_path_buf(), python_package_dir()], LevelFilter::Off, + limits, ); pool.warm().await; let results: Vec = pool.run(vec![unit]).collect().await; - assert_eq!(results.len(), n_tests, "expected {n_tests} results"); + assert_eq!(results.len(), 2, "expected 2 results, got {results:?}"); for r in &results { assert!( matches!(r.outcome, TestOutcome::Passed), diff --git a/crates/tryke_runner/src/protocol.rs b/crates/tryke_runner/src/protocol.rs index e30520f..8789031 100644 --- a/crates/tryke_runner/src/protocol.rs +++ b/crates/tryke_runner/src/protocol.rs @@ -121,6 +121,36 @@ pub struct RunDoctestParams { pub object_path: String, } +/// Worker self-reported resource snapshot, attached by the python side +/// to every `run_test` / `run_doctest` response. Each field is `Option` +/// because the underlying syscalls are not portable across every +/// platform we target (e.g. `/proc/self/fd` is linux-only; the +/// `resource` module is unavailable on windows). A missing signal is +/// not an error — it just means the runner cannot recycle on that +/// dimension for this worker. +#[derive(Debug, Default, Clone, Copy, Deserialize)] +pub struct WorkerHealthWire { + /// Resident set size in bytes, normalised across platforms. + #[serde(default)] + pub rss_bytes: Option, + /// Open file descriptor count. + #[serde(default)] + pub open_fds: Option, +} + +/// Wrapper around [`RunTestResultWire`] carrying the worker's health +/// snapshot alongside the test outcome. The python side merges the +/// outcome dict with a top-level `health` field; `#[serde(flatten)]` +/// pulls the outcome variant out of the same JSON object so the wire +/// format stays a single flat object. +#[derive(Debug, Deserialize)] +pub struct RunTestResponseWire { + #[serde(default)] + pub health: WorkerHealthWire, + #[serde(flatten)] + pub result: RunTestResultWire, +} + #[derive(Debug, Deserialize)] #[serde(tag = "outcome", rename_all = "snake_case")] pub enum RunTestResultWire { diff --git a/crates/tryke_runner/src/worker.rs b/crates/tryke_runner/src/worker.rs index 0aa8368..7f6c19c 100644 --- a/crates/tryke_runner/src/worker.rs +++ b/crates/tryke_runner/src/worker.rs @@ -1,7 +1,8 @@ use std::collections::VecDeque; +use std::fmt; use std::path::Path; use std::sync::{Arc, Mutex}; -use std::time::Duration; +use std::time::{Duration, Instant}; use anyhow::{Result, anyhow}; use log::{debug, trace}; @@ -11,7 +12,7 @@ use tryke_types::{Assertion, ExpectedAssertion, TestItem, TestOutcome, TestResul use crate::protocol::{ AssertionWire, FinalizeHooksParams, RegisterHooksParams, RpcRequest, RpcResponse, - RunDoctestParams, RunTestParams, RunTestResultWire, + RunDoctestParams, RunTestParams, RunTestResponseWire, RunTestResultWire, WorkerHealthWire, }; /// Cap on retained worker-stderr bytes. Beyond this we keep the most recent @@ -19,15 +20,117 @@ use crate::protocol::{ /// without unbounded memory growth on workers that spew warnings. const STDERR_RETAIN_BYTES: usize = 1 << 20; // 1 MiB -/// Recycle a worker process after this many tests. Long-lived python -/// interpreters accumulate module-level state across imports — logging -/// handlers, sqlite/ssl objects, atexit callbacks — much of which holds -/// open file descriptors that `del sys.modules[name]` cannot reclaim -/// because the FDs are owned by objects living outside the module dict. -/// Recycling bounds growth: the only mechanism in `CPython` that reliably -/// frees module-level FDs is process exit. The cap is intentionally -/// hardcoded for now; revisit if it proves wrong on real workloads. -pub(crate) const MAX_TESTS_PER_WORKER: u64 = 128; +/// Latest worker-reported resource snapshot. Each field is `Option` +/// because some signals are platform-conditional (see +/// [`WorkerHealthWire`]); a missing reading just means the matching +/// limit cannot fire for this worker. +#[derive(Debug, Default, Clone, Copy)] +pub struct WorkerHealth { + pub rss_bytes: Option, + pub open_fds: Option, +} + +impl From for WorkerHealth { + fn from(w: WorkerHealthWire) -> Self { + Self { + rss_bytes: w.rss_bytes, + open_fds: w.open_fds, + } + } +} + +/// Why a worker is being recycled. Carried in debug logs so post-mortem +/// triage can tell apart memory leaks, FD pressure, and slow drift. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum RecycleReason { + /// Resident set size crossed the configured ceiling. + MemoryBytes(u64), + /// Open file descriptor count crossed the configured ceiling. + OpenFds(u64), + /// Worker has been alive longer than the configured ceiling. + Age(Duration), +} + +impl fmt::Display for RecycleReason { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::MemoryBytes(b) => write!(f, "memory={b} bytes"), + Self::OpenFds(n) => write!(f, "open_fds={n}"), + Self::Age(d) => write!(f, "age={:.1}s", d.as_secs_f64()), + } + } +} + +/// Soft ceilings on a worker's resource footprint. Each is `Option` so +/// callers can opt out of a signal entirely (e.g. tests that exercise +/// only one dimension). `None` on a field means "never recycle on this +/// signal." The defaults are tuned for real test suites — see +/// [`WorkerLimits::default`]. +#[derive(Debug, Clone, Copy)] +pub struct WorkerLimits { + pub max_rss_bytes: Option, + pub max_open_fds: Option, + pub max_age: Option, +} + +impl WorkerLimits { + /// Disable every soft limit. Convenient for tests that only want to + /// exercise one signal at a time. + #[must_use] + pub fn unlimited() -> Self { + Self { + max_rss_bytes: None, + max_open_fds: None, + max_age: None, + } + } +} + +impl Default for WorkerLimits { + fn default() -> Self { + Self { + // 1 GiB RSS — comfortable headroom for suites that pull in + // heavy native deps (numpy, ssl, sqlite) on first import, + // while still bounding slow leaks across long runs. + max_rss_bytes: Some(1 << 30), + // Below the macOS 256-FD per-process soft limit, leaving + // headroom for the python interpreter's own baseline (~30 + // FDs on a fresh process) plus the runner's stdio pipes. + max_open_fds: Some(200), + // 10 minutes of wall time. Long suites still finish per + // worker without churning; pathologically slow leaks that + // never trip the memory/FD cap still get bounded. + max_age: Some(Duration::from_secs(600)), + } + } +} + +/// Pure recycle-decision helper, factored out of [`WorkerProcess`] so +/// it is unit-testable without spawning a subprocess. Signals are +/// checked in priority order: memory > FDs > age, so the strongest +/// available reason is what gets reported. +pub(crate) fn evaluate_recycle( + health: WorkerHealth, + age: Duration, + limits: WorkerLimits, +) -> Option { + if let (Some(cap), Some(rss)) = (limits.max_rss_bytes, health.rss_bytes) + && rss >= cap + { + return Some(RecycleReason::MemoryBytes(rss)); + } + if let (Some(cap), Some(fds)) = (limits.max_open_fds, health.open_fds) + && fds >= cap + { + return Some(RecycleReason::OpenFds(fds)); + } + if let Some(cap) = limits.max_age + && age >= cap + { + return Some(RecycleReason::Age(age)); + } + None +} pub struct WorkerProcess { child: Child, @@ -38,7 +141,19 @@ pub struct WorkerProcess { /// the worker can't block on a stderr write mid-RPC. stderr_buf: Arc>>, next_id: u64, - tests_completed: u64, + /// Wall-clock spawn time; drives `RecycleReason::Age`. Set once at + /// spawn and read on every `should_recycle` check — never mutated. + spawned_at: Instant, + /// Latest worker-reported resource snapshot. Updated after every + /// completed `run_test` / `run_doctest` reply. The default + /// (`Option::None` on every field) is what a fresh worker reads as + /// before its first reply lands, which matches "no signal, do not + /// recycle on it." + latest_health: WorkerHealth, + /// Soft ceilings consulted by [`Self::should_recycle`]. Owned by + /// the worker (rather than passed in on each check) so the recycle + /// decision is colocated with the data it depends on. + limits: WorkerLimits, } impl WorkerProcess { @@ -55,6 +170,7 @@ impl WorkerProcess { python_path: &[&Path], root: &Path, log_level: log::LevelFilter, + limits: WorkerLimits, ) -> Result { debug!("spawning worker: {python_bin} -m tryke.worker (log={log_level})"); let pythonpath = build_pythonpath(python_path); @@ -99,15 +215,25 @@ impl WorkerProcess { stdout, stderr_buf, next_id: 1, - tests_completed: 0, + spawned_at: Instant::now(), + latest_health: WorkerHealth::default(), + limits, }) } - /// Whether this worker has run enough tests to warrant recycling. - /// See `MAX_TESTS_PER_WORKER` for the rationale. + /// Whether this worker has tripped any of its [`WorkerLimits`]. + /// Returns the first reason found (memory > FDs > age priority), + /// or `None` if the worker is still within budget. Long-lived + /// python interpreters accumulate module-level state across + /// imports — logging handlers, sqlite/ssl objects, atexit + /// callbacks — much of which holds resources (FDs, memory) that + /// `del sys.modules[name]` cannot reclaim, because they are owned + /// by objects living outside the module dict. Recycling on the + /// reported snapshot bounds that growth; only process exit + /// reliably frees module-level state in `CPython`. #[must_use] - pub fn should_recycle(&self) -> bool { - self.tests_completed >= MAX_TESTS_PER_WORKER + pub fn should_recycle(&self) -> Option { + evaluate_recycle(self.latest_health, self.spawned_at.elapsed(), self.limits) } async fn call serde::Deserialize<'de>>( @@ -195,9 +321,9 @@ impl WorkerProcess { groups: test.groups.clone(), case_label: test.case_label.clone(), })?; - let wire: RunTestResultWire = self.call("run_test", Some(params)).await?; - self.tests_completed = self.tests_completed.saturating_add(1); - Ok(convert_result(test.clone(), wire)) + let response: RunTestResponseWire = self.call("run_test", Some(params)).await?; + self.latest_health = response.health.into(); + Ok(convert_result(test.clone(), response.result)) } /// Send hook metadata for a module to the Python worker. @@ -226,9 +352,9 @@ impl WorkerProcess { module: test.module_path.clone(), object_path: object_path.to_owned(), })?; - let wire: RunTestResultWire = self.call("run_doctest", Some(params)).await?; - self.tests_completed = self.tests_completed.saturating_add(1); - Ok(convert_result(test.clone(), wire)) + let response: RunTestResponseWire = self.call("run_doctest", Some(params)).await?; + self.latest_health = response.health.into(); + Ok(convert_result(test.clone(), response.result)) } #[expect(clippy::missing_errors_doc)] @@ -554,6 +680,112 @@ mod tests { } } + #[test] + fn evaluate_recycle_returns_none_when_no_signals_tripped() { + // A fresh worker (default health, zero age) under default + // limits has nothing to report — the runner must not recycle. + assert_eq!( + evaluate_recycle( + WorkerHealth::default(), + Duration::ZERO, + WorkerLimits::default(), + ), + None, + ); + } + + #[test] + fn evaluate_recycle_prioritises_memory_then_fds_then_age() { + // All three signals tripped at once: memory wins. This + // priority matters because memory pressure is the most + // user-visible failure mode (process death, swap thrash); the + // post-mortem log line should attribute that, not whichever + // signal happened to be checked last. + let limits = WorkerLimits { + max_rss_bytes: Some(1000), + max_open_fds: Some(10), + max_age: Some(Duration::from_secs(1)), + }; + let health = WorkerHealth { + rss_bytes: Some(2000), + open_fds: Some(20), + }; + assert_eq!( + evaluate_recycle(health, Duration::from_secs(2), limits), + Some(RecycleReason::MemoryBytes(2000)), + ); + + // Drop the memory signal: FDs win over age. + let health_no_mem = WorkerHealth { + rss_bytes: None, + open_fds: Some(20), + }; + assert_eq!( + evaluate_recycle(health_no_mem, Duration::from_secs(2), limits), + Some(RecycleReason::OpenFds(20)), + ); + + // Drop FDs too: age is the last fallback. + let health_age_only = WorkerHealth { + rss_bytes: None, + open_fds: None, + }; + assert_eq!( + evaluate_recycle(health_age_only, Duration::from_secs(2), limits), + Some(RecycleReason::Age(Duration::from_secs(2))), + ); + } + + #[test] + fn evaluate_recycle_skips_signals_with_no_limit() { + // `WorkerLimits::unlimited` opts out of every cap — even an + // OOM-scale RSS reading must not trigger a recycle. This is + // the contract tests rely on when exercising one signal in + // isolation. + let unlimited = WorkerLimits::unlimited(); + let health = WorkerHealth { + rss_bytes: Some(u64::MAX), + open_fds: Some(u64::MAX), + }; + assert_eq!( + evaluate_recycle(health, Duration::from_secs(86_400), unlimited), + None, + ); + } + + #[test] + fn evaluate_recycle_skips_signals_with_no_reading() { + // Worker on a platform without `/proc/self/fd` reports + // `open_fds: None` — the runner must not synthesize a value + // and must not recycle on that signal even with a tight cap. + let limits = WorkerLimits { + max_rss_bytes: Some(1000), + max_open_fds: Some(10), + max_age: None, + }; + let health = WorkerHealth { + rss_bytes: None, + open_fds: None, + }; + assert_eq!(evaluate_recycle(health, Duration::ZERO, limits), None); + } + + #[test] + fn recycle_reason_display_is_human_readable() { + // The Display impl lands in debug logs ("recycling worker + // (memory=...)") so it needs to be terse and parseable at a + // glance — not just Debug-derived noise. + assert_eq!( + RecycleReason::MemoryBytes(1024).to_string(), + "memory=1024 bytes", + ); + assert_eq!(RecycleReason::OpenFds(42).to_string(), "open_fds=42"); + assert_eq!( + RecycleReason::Age(Duration::from_millis(2_500)).to_string(), + "age=2.5s", + ); + } + #[test] fn worker_log_env_value_off_returns_none() { // `Off` means: don't set TRYKE_LOG on the child env, preserving diff --git a/python/tryke/worker.py b/python/tryke/worker.py index 184e6fd..1860f29 100644 --- a/python/tryke/worker.py +++ b/python/tryke/worker.py @@ -120,6 +120,7 @@ class _PassedResult(TypedDict): duration_ms: int stdout: str stderr: str + health: NotRequired[_HealthSnapshot] class _FailedResult(TypedDict): @@ -131,6 +132,7 @@ class _FailedResult(TypedDict): executed_lines: list[int] stdout: str stderr: str + health: NotRequired[_HealthSnapshot] class _SkippedResult(TypedDict): @@ -139,6 +141,7 @@ class _SkippedResult(TypedDict): reason: str | None stdout: str stderr: str + health: NotRequired[_HealthSnapshot] class _XFailedResult(TypedDict): @@ -147,6 +150,7 @@ class _XFailedResult(TypedDict): reason: str | None stdout: str stderr: str + health: NotRequired[_HealthSnapshot] class _XPassedResult(TypedDict): @@ -154,6 +158,7 @@ class _XPassedResult(TypedDict): duration_ms: int stdout: str stderr: str + health: NotRequired[_HealthSnapshot] class _TodoResult(TypedDict): @@ -162,6 +167,7 @@ class _TodoResult(TypedDict): description: str | None stdout: str stderr: str + health: NotRequired[_HealthSnapshot] type _TestResult = ( @@ -173,6 +179,21 @@ class _TodoResult(TypedDict): | _TodoResult ) + +class _HealthSnapshot(TypedDict): + """Worker self-reported resource snapshot. + + Mirrors ``WorkerHealthWire`` in + ``crates/tryke_runner/src/protocol.rs``. Both fields are ``None`` + when the underlying syscall is unavailable on this platform — the + runner treats a missing signal as "do not recycle on this + dimension," not as an error. + """ + + rss_bytes: int | None + open_fds: int | None + + type _DispatchResult = _TestResult | str | None @@ -273,6 +294,50 @@ def _todo( } +def _measure_rss_bytes() -> int | None: + """Resident set size in bytes, or ``None`` where unavailable. + + Uses :mod:`resource` (POSIX-only). Linux reports ``ru_maxrss`` in + KiB, macOS in bytes — normalise to bytes for the wire. Windows has + no :mod:`resource`; return ``None`` so the runner skips the memory + cap there rather than guessing wrong. + """ + try: + import resource # noqa: PLC0415 - lazy: optional POSIX-only import + except ImportError: + return None + try: + ru_maxrss = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss + except OSError: + return None + if sys.platform == "darwin": + return int(ru_maxrss) + return int(ru_maxrss) * 1024 + + +def _measure_open_fds() -> int | None: + """Open file descriptor count, or ``None`` where unavailable. + + Linux exposes ``/proc/self/fd``; macOS ships ``/dev/fd``. Windows + has neither and returns ``None`` — the runner then skips the FD + cap on this worker. + """ + for path in ("/proc/self/fd", "/dev/fd"): + try: + return sum(1 for _ in Path(path).iterdir()) + except OSError: + continue + return None + + +def _measure_health() -> _HealthSnapshot: + """Snapshot the worker's current resource footprint for the runner.""" + return { + "rss_bytes": _measure_rss_bytes(), + "open_fds": _measure_open_fds(), + } + + def _is_user_frame(frame: traceback.FrameSummary) -> bool: return not str( Path(frame.filename).resolve(), @@ -390,6 +455,13 @@ def run(self) -> None: try: result = self._dispatch(method, params) + # Attach the worker's resource snapshot to every test + # response so the runner can recycle on memory / FD + # pressure (see ``WorkerHealthWire`` on the rust side). + # Only test-shaped results (dict, not "pong"/None) carry + # health; ping/register_hooks/finalize_hooks don't. + if isinstance(result, dict): + result["health"] = _measure_health() self._write( { "jsonrpc": "2.0", From 1e4fc5e482acb56ac02021e793c121c1cc03906a Mon Sep 17 00:00:00 2001 From: Justin Chapman Date: Mon, 11 May 2026 10:01:46 -0400 Subject: [PATCH 4/7] fix(runner): de-flake worker_recycles_when_age_exceeds_limit on CI Three CI test failures (ubuntu/macos/windows on slower Python versions) traced to the same flake: the integration test pinned the recycle count to exactly 1 (= 2 distinct worker pids), but on slow runners the first unit's end-of-unit age check already trips the 100 ms cap, producing an extra recycle and 3 pids. The "age recycle eventually fires" property still holds in that scenario, so relax the assertion to >= 2 distinct pids and document why. The "no recycle when under cap" property is already covered by evaluate_recycle_returns_none_when_no_signals_tripped. Also address two of the Copilot review comments on c20ca3b: - _measure_rss_bytes returns peak RSS (ru_maxrss), not current. Update the docstring to call that out and explain why peak is the right signal for recycling (transient spikes leave fragmentation/state we can't free). - _measure_open_fds was off by one because iterating /proc/self/fd or /dev/fd holds an extra dir FD that gets counted. Subtract it so the reading reflects FDs held by the worker, not the measurement. Regenerated reporter doc samples (v0.0.27 -> v0.0.28 + new scheduler warning) caught up by the generate-reporter-examples hook. Co-Authored-By: Claude Opus 4.7 (1M context) --- README.md | 3 ++- crates/tryke_runner/src/pool.rs | 34 ++++++++++++++++++++------------- docs/guides/reporters.md | 12 ++++++++---- docs/index.md | 3 ++- python/tryke/worker.py | 32 ++++++++++++++++++++++++------- 5 files changed, 58 insertions(+), 26 deletions(-) diff --git a/README.md b/README.md index 628a45e..1c377e6 100644 --- a/README.md +++ b/README.md @@ -72,8 +72,9 @@ uvx tryke test ```text -tryke test v0.0.27 +tryke test v0.0.28 +warning: scheduler: upgrading --dist test → file for 1 module(s) because of per="scope" fixtures (sample). Move the fixture into a describe() to keep finer-grained distribution. sample.py: users get diff --git a/crates/tryke_runner/src/pool.rs b/crates/tryke_runner/src/pool.rs index 131b1c3..fb98dba 100644 --- a/crates/tryke_runner/src/pool.rs +++ b/crates/tryke_runner/src/pool.rs @@ -713,8 +713,16 @@ def test_noop() -> None: /// teardown is not skipped — see /// `recycle_does_not_skip_scope_fixture_teardown`), so we exercise /// it here by sleeping past the cap between units. The module body - /// records the worker pid on every fresh import; we expect one - /// distinct pid per recycle boundary. + /// records the worker pid on every fresh import; observing ≥ 2 + /// distinct pids proves the age recycle fired at least once. + /// + /// We deliberately do NOT pin the count to exactly 2: on slow CI + /// runners (cold Python startup, busy schedulers) the first unit's + /// end-of-unit check can already cross a tight cap, producing a + /// recycle before the sleep. That's not a correctness regression — + /// the property under test ("age cap eventually fires") still + /// holds. The complementary "no recycle when under cap" property + /// is covered by `evaluate_recycle_returns_none_when_no_signals_tripped`. #[tokio::test] async fn worker_recycles_when_age_exceeds_limit() { let dir = tempfile::tempdir().expect("tempdir"); @@ -741,8 +749,10 @@ def test_noop() -> None: ); std::fs::write(&test_file, source).expect("write test file"); - // Tight age cap — a sleep between units crosses it deterministically - // without making the test slow. + // Tight age cap — a sleep between units crosses it + // deterministically without making the test slow. Slow CI + // runners may also cross it within a single unit; see the + // assertion at the bottom for why that's acceptable. let limits = WorkerLimits { max_rss_bytes: None, max_open_fds: None, @@ -763,7 +773,6 @@ def test_noop() -> None: hooks: vec![], }; - // First unit: worker is fresh, no recycle. let r1: Vec = pool.run(vec![make_unit()]).collect().await; assert_eq!(r1.len(), 1); assert!(matches!(r1[0].outcome, TestOutcome::Passed)); @@ -773,14 +782,13 @@ def test_noop() -> None: // jittery CI without pushing test latency higher than necessary. tokio::time::sleep(std::time::Duration::from_millis(250)).await; - // Second unit: still on the same worker (recycle is checked at - // end-of-unit), but at *its* end the age check trips. let r2: Vec = pool.run(vec![make_unit()]).collect().await; assert_eq!(r2.len(), 1); assert!(matches!(r2[0].outcome, TestOutcome::Passed)); - // Third unit: forced to spawn a new worker (the previous one - // was recycled) so a second pid is logged. + // Third unit forces a fresh spawn (the prior worker was + // recycled at the end of unit 2 at latest), guaranteeing the + // pid log gains a new entry. let r3: Vec = pool.run(vec![make_unit()]).collect().await; assert_eq!(r3.len(), 1); assert!(matches!(r3[0].outcome, TestOutcome::Passed)); @@ -788,10 +796,10 @@ def test_noop() -> None: let pid_lines = std::fs::read_to_string(&pid_log).unwrap_or_default(); let distinct_pids: std::collections::HashSet<&str> = pid_lines.lines().filter(|l| !l.is_empty()).collect(); - assert_eq!( - distinct_pids.len(), - 2, - "expected 2 distinct worker pid(s) (one recycle); got {} from log: {pid_lines:?}", + assert!( + distinct_pids.len() >= 2, + "expected ≥ 2 distinct worker pid(s) (recycle fired at least once); \ + got {} from log: {pid_lines:?}", distinct_pids.len(), ); diff --git a/docs/guides/reporters.md b/docs/guides/reporters.md index 066db19..794398d 100644 --- a/docs/guides/reporters.md +++ b/docs/guides/reporters.md @@ -20,8 +20,9 @@ Sample output (with `-v` to surface per-assertion lines): ```ansi -tryke test v0.0.27 +tryke test v0.0.28 +warning: scheduler: upgrading --dist test → file for 1 module(s) because of per="scope" fixtures (sample). Move the fixture into a describe() to keep finer-grained distribution. sample.py: users get @@ -62,8 +63,9 @@ Sample output: ```ansi -tryke test v0.0.27 +tryke test v0.0.28 +warning: scheduler: upgrading --dist test → file for 1 module(s) because of per="scope" fixtures (sample). Move the fixture into a describe() to keep finer-grained distribution. .. Test Files 1 passed (1) @@ -150,8 +152,9 @@ Sample output: ```ansi -tryke test v0.0.27 +tryke test v0.0.28 +warning: scheduler: upgrading --dist test → file for 1 module(s) because of per="scope" fixtures (sample). Move the fixture into a describe() to keep finer-grained distribution. PASS  [ 0.000s] sample > users > get :: returns a stored user PASS  [ 0.000s] sample > users > set :: stores a new user @@ -180,8 +183,9 @@ Sample output: ```ansi -tryke test v0.0.27 +tryke test v0.0.28 +warning: scheduler: upgrading --dist test → file for 1 module(s) because of per="scope" fixtures (sample). Move the fixture into a describe() to keep finer-grained distribution. sample.py ✓✓ 2 100% ████████████ Test Files 1 passed (1) diff --git a/docs/index.md b/docs/index.md index 4cd8393..0f88717 100644 --- a/docs/index.md +++ b/docs/index.md @@ -105,8 +105,9 @@ uvx tryke test ```ansi -tryke test v0.0.27 +tryke test v0.0.28 +warning: scheduler: upgrading --dist test → file for 1 module(s) because of per="scope" fixtures (sample). Move the fixture into a describe() to keep finer-grained distribution. sample.py: users get diff --git a/python/tryke/worker.py b/python/tryke/worker.py index 1860f29..2314ac3 100644 --- a/python/tryke/worker.py +++ b/python/tryke/worker.py @@ -295,12 +295,23 @@ def _todo( def _measure_rss_bytes() -> int | None: - """Resident set size in bytes, or ``None`` where unavailable. - - Uses :mod:`resource` (POSIX-only). Linux reports ``ru_maxrss`` in - KiB, macOS in bytes — normalise to bytes for the wire. Windows has - no :mod:`resource`; return ``None`` so the runner skips the memory - cap there rather than guessing wrong. + """Peak resident set size in bytes, or ``None`` where unavailable. + + Uses :func:`resource.getrusage`'s ``ru_maxrss`` (POSIX-only), + which is the **maximum** RSS the process has reached so far — + monotonically non-decreasing for the worker's lifetime. We use the + peak (not current) because: + + 1. A worker that ever ballooned past the cap has likely fragmented + its allocator and accumulated module-level state we can't free; + recycling the process is the cheap way to reclaim that even if + RSS has since dropped. + 2. ``ru_maxrss`` requires no /proc parsing and is available on both + Linux and macOS via a single syscall. + + Linux reports ``ru_maxrss`` in KiB, macOS in bytes — normalise to + bytes for the wire. Windows has no :mod:`resource`; return ``None`` + so the runner skips the memory cap there rather than guessing wrong. """ try: import resource # noqa: PLC0415 - lazy: optional POSIX-only import @@ -321,12 +332,19 @@ def _measure_open_fds() -> int | None: Linux exposes ``/proc/self/fd``; macOS ships ``/dev/fd``. Windows has neither and returns ``None`` — the runner then skips the FD cap on this worker. + + The dir scan itself opens one FD on the dir handle; we subtract it + so the reading reflects FDs held by the *worker*, not the + measurement. Without this, a worker idling at N FDs would report + N+1 and could trip a tight cap before the underlying number + actually changed. """ for path in ("/proc/self/fd", "/dev/fd"): try: - return sum(1 for _ in Path(path).iterdir()) + count = sum(1 for _ in Path(path).iterdir()) except OSError: continue + return max(0, count - 1) return None From dbca0ac2ac5608e2b6e5be4caf6bcc60e09da138 Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 13 May 2026 15:50:25 +0000 Subject: [PATCH 5/7] Disable worker recycling by default; raise RLIMIT_NOFILE on startup MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The recycling thresholds (1 GiB RSS / 200 FDs / 600 s) were tuned to mitigate the macOS 256-FD per-process soft default. That mitigation has costs: surprise process churn on suites that legitimately run long or hold many FDs, plus the lost Python startup/import cache. Address the underlying FD ceiling directly instead. On startup tryke now raises its own RLIMIT_NOFILE soft limit toward the inherited hard limit, following the systemd convention (1024/524288) Home Assistant OS 16 adopted in mid-2025: ship modest soft defaults, generous hard defaults, and let applications raise their own at startup. Worker subprocesses inherit the bumped rlimit, so the FD-exhaustion failure mode disappears without churning interpreters. `WorkerLimits` defaults all three caps to `None` (opt-in only). The recycling machinery itself is unchanged — the same code path is still exercised by tests, just not on every `tryke test` invocation. macOS quirk: kern.maxfilesperproc caps per-process FDs below the reported hard limit; on EINVAL we fall back to a 10240 target, which still dwarfs the 256-FD default. Docs: new "Resource limits" section in guides/configuration.md covering both the FD bump and the opt-in recycling knobs. --- Cargo.lock | 1 + crates/tryke/Cargo.toml | 3 + crates/tryke/src/fdlimit.rs | 198 ++++++++++++++++++++++++++++++ crates/tryke/src/lib.rs | 1 + crates/tryke/src/main.rs | 20 ++- crates/tryke_runner/src/worker.rs | 50 +++----- docs/guides/configuration.md | 74 +++++++++++ 7 files changed, 315 insertions(+), 32 deletions(-) create mode 100644 crates/tryke/src/fdlimit.rs diff --git a/Cargo.lock b/Cargo.lock index 1a34937..c39dd07 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2160,6 +2160,7 @@ dependencies = [ "console 0.15.11", "env_logger", "insta", + "libc", "log", "notify-debouncer-mini", "serde_json", diff --git a/crates/tryke/Cargo.toml b/crates/tryke/Cargo.toml index eb83e5b..4eca67a 100644 --- a/crates/tryke/Cargo.toml +++ b/crates/tryke/Cargo.toml @@ -20,6 +20,9 @@ console = { workspace = true } env_logger = "0.11" log = "0.4" notify-debouncer-mini = { workspace = true } + +[target.'cfg(unix)'.dependencies] +libc = "0.2" tokio = { workspace = true } tokio-stream = { workspace = true } tryke_config = { workspace = true } diff --git a/crates/tryke/src/fdlimit.rs b/crates/tryke/src/fdlimit.rs new file mode 100644 index 0000000..e0d7bc2 --- /dev/null +++ b/crates/tryke/src/fdlimit.rs @@ -0,0 +1,198 @@ +//! Raise the process's `RLIMIT_NOFILE` soft limit on startup. +//! +//! Modern systemd ships a soft default of 1024 and a hard default of +//! 524288 open files per service (and inherits the same shape to user +//! shells on most distributions). The convention systemd encourages — +//! and what Home Assistant OS 16 moved to in mid-2025 — is for each +//! application to **explicitly raise its own soft limit at startup**, +//! up to whatever hard limit it was launched under, rather than rely +//! on the OS to ship a huge soft default.[^1] +//! +//! Tryke spawns one Python subprocess per worker and each subprocess +//! accumulates file descriptors over its lifetime (test runs open +//! sockets, sqlite handles, log files, doctest stdio pipes, ...). On +//! macOS the inherited soft limit is famously 256, which large suites +//! (e.g. ~5k tests) exhaust well before they finish — and the failure +//! mode is opaque: a Python `OSError: [Errno 24] Too many open files` +//! buried inside a fixture. Raising the runner's own soft limit on +//! startup lifts that ceiling for every spawned worker (children +//! inherit our rlimit), so the FD-exhaustion failure mode disappears +//! without having to recycle worker processes. +//! +//! Behaviour: +//! - Unix: read the current `(soft, hard)` via `getrlimit(RLIMIT_NOFILE)` +//! and call `setrlimit` to raise `soft` to `hard`. macOS has a +//! kernel-side ceiling that is often *lower* than the reported hard +//! limit (`kern.maxfilesperproc`); we don't try to detect that +//! directly — instead the setrlimit call fails cleanly and we fall +//! back to a conservative target (`OPEN_MAX_FALLBACK`) so the binary +//! still benefits from a meaningful bump. +//! - Windows: no-op. The platform has no `RLIMIT_NOFILE`; the C +//! runtime's `_setmaxstdio` ceiling (2048 by default) is the closest +//! analogue but it does not affect socket / pipe FDs. +//! +//! Errors are non-fatal: if we cannot raise the limit (e.g. running +//! inside a sandbox that pinned it) we log at `debug` and proceed — +//! the user can still override via `ulimit -n` before launching. +//! +//! [^1]: + +/// Conservative fallback target for `RLIMIT_NOFILE` when the kernel +/// rejects raising soft to the reported hard limit. macOS in +/// particular caps per-process FDs below `RLIM_INFINITY` via +/// `kern.maxfilesperproc` (default 24 576 on recent releases), so an +/// unconditional `soft = hard` setrlimit returns `EINVAL`. The value +/// chosen here is high enough to dwarf the 256-FD macOS soft default +/// that bites real test suites, while staying below historic macOS +/// per-process ceilings so the fallback itself succeeds. +#[cfg(unix)] +const OPEN_MAX_FALLBACK: libc::rlim_t = 10_240; + +/// Outcome of [`raise`]. Carries the before/after soft limit so the +/// caller can log meaningful telemetry without re-querying the +/// kernel. `Skipped` covers platforms (windows) where the operation +/// has no analogue, and the case where the inherited soft limit +/// already saturates the hard ceiling. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum RaiseOutcome { + /// The soft limit was raised from `from` to `to`. Both are FD + /// counts (matching the units reported by `ulimit -n`). + Raised { from: u64, to: u64 }, + /// No change applied. Either the platform has no `RLIMIT_NOFILE` + /// or the soft limit already matched (or exceeded) the hard + /// limit. `current` carries the existing soft limit for logging. + Skipped { current: u64 }, +} + +#[cfg(unix)] +pub fn raise() -> std::io::Result { + use std::io::Error; + + // SAFETY: `rlimit` is a POD struct populated by the kernel and + // `getrlimit` only touches the bytes we pass it. The address is + // stack-local and lives for the duration of the call. + let mut rlim = libc::rlimit { + rlim_cur: 0, + rlim_max: 0, + }; + let rc = unsafe { libc::getrlimit(libc::RLIMIT_NOFILE, &raw mut rlim) }; + if rc != 0 { + return Err(Error::last_os_error()); + } + + // Already at or above the hard limit — leave it. We never *lower* + // the soft limit (that would punish users who deliberately raised + // it via `ulimit -n` before launching) and we don't try to push + // the hard limit (that requires CAP_SYS_RESOURCE / root on Linux + // and adds a failure mode without a clear benefit). + if rlim.rlim_cur >= rlim.rlim_max { + #[expect( + clippy::useless_conversion, + reason = "rlim_t is u32 on some targets, u64 on others; cast unifies the wire type" + )] + return Ok(RaiseOutcome::Skipped { + current: u64::from(rlim.rlim_cur), + }); + } + + let original_soft = rlim.rlim_cur; + let target = rlim.rlim_max; + rlim.rlim_cur = target; + // SAFETY: `rlim` is a fully initialised POD on our stack; the + // kernel only reads from it. + let rc = unsafe { libc::setrlimit(libc::RLIMIT_NOFILE, &raw const rlim) }; + if rc == 0 { + #[expect( + clippy::useless_conversion, + reason = "rlim_t is u32 on some targets, u64 on others; cast unifies the wire type" + )] + return Ok(RaiseOutcome::Raised { + from: u64::from(original_soft), + to: u64::from(target), + }); + } + + // setrlimit failed at the hard ceiling — macOS's + // `kern.maxfilesperproc` is the usual culprit. Retry with a + // conservative fallback: the smaller of the reported hard limit + // and `OPEN_MAX_FALLBACK`. If the fallback target is still ≤ the + // existing soft limit (very unlikely), report `Skipped`. + let fallback_target = rlim.rlim_max.min(OPEN_MAX_FALLBACK); + if fallback_target <= original_soft { + #[expect( + clippy::useless_conversion, + reason = "rlim_t is u32 on some targets, u64 on others; cast unifies the wire type" + )] + return Ok(RaiseOutcome::Skipped { + current: u64::from(original_soft), + }); + } + rlim.rlim_cur = fallback_target; + // SAFETY: see above. + let rc = unsafe { libc::setrlimit(libc::RLIMIT_NOFILE, &raw const rlim) }; + if rc != 0 { + return Err(Error::last_os_error()); + } + #[expect( + clippy::useless_conversion, + reason = "rlim_t is u32 on some targets, u64 on others; cast unifies the wire type" + )] + Ok(RaiseOutcome::Raised { + from: u64::from(original_soft), + to: u64::from(fallback_target), + }) +} + +#[cfg(not(unix))] +pub fn raise() -> std::io::Result { + // No `RLIMIT_NOFILE` analogue on windows. The C runtime's + // `_setmaxstdio` only governs stdio streams (not sockets or + // pipes), so raising it would not help worker-spawned children. + Ok(RaiseOutcome::Skipped { current: 0 }) +} + +#[cfg(test)] +mod tests { + use super::*; + + /// On Unix the call must report either a successful raise or an + /// explicit skip — never an error under normal CI conditions. + /// Asserting on the exact `to` value would be flaky (it depends + /// on the host's hard limit) so we only check the shape. + #[cfg(unix)] + #[test] + fn raise_returns_meaningful_outcome_on_unix() { + let outcome = raise().expect("raise should not fail under default CI rlimits"); + match outcome { + RaiseOutcome::Raised { from, to } => { + assert!(to >= from, "raise must be monotonic: from={from} to={to}"); + } + RaiseOutcome::Skipped { current } => { + assert!( + current > 0, + "skipped outcomes must report the current limit" + ); + } + } + } + + /// Idempotency: calling `raise` twice in a row must not lower the + /// soft limit or error. The second call typically returns + /// `Skipped` (already at hard). + #[cfg(unix)] + #[test] + fn raise_is_idempotent() { + let first = raise().expect("first raise"); + let second = raise().expect("second raise"); + let first_to = match first { + RaiseOutcome::Raised { to, .. } | RaiseOutcome::Skipped { current: to } => to, + }; + let second_to = match second { + RaiseOutcome::Raised { to, .. } | RaiseOutcome::Skipped { current: to } => to, + }; + assert!( + second_to >= first_to, + "soft limit must not drop across calls: first={first_to} second={second_to}", + ); + } +} diff --git a/crates/tryke/src/lib.rs b/crates/tryke/src/lib.rs index 904316b..5fd1169 100644 --- a/crates/tryke/src/lib.rs +++ b/crates/tryke/src/lib.rs @@ -2,6 +2,7 @@ pub mod cli; pub mod cli_docs; pub mod discovery; pub mod execution; +pub mod fdlimit; pub mod git; pub mod graph; pub mod watch; diff --git a/crates/tryke/src/main.rs b/crates/tryke/src/main.rs index 87d7711..24112c8 100644 --- a/crates/tryke/src/main.rs +++ b/crates/tryke/src/main.rs @@ -2,12 +2,13 @@ use std::{env, time::Instant}; use anyhow::Result; use clap::Parser; -use log::debug; +use log::{debug, warn}; use tryke::cli::{Cli, Commands, ReporterFormat}; use tryke::discovery::{ discover_tests, discover_tests_changed_first, discover_tests_for_paths, resolved_excludes, }; use tryke::execution::run_tests; +use tryke::fdlimit::{RaiseOutcome, raise as raise_fd_limit}; use tryke::graph::{run_fixture_graph, run_graph}; use tryke::watch::run_watch; use tryke_reporter::{ @@ -65,6 +66,23 @@ fn main() -> Result<()> { .init(); debug!("{cli:?}"); + // Raise our own RLIMIT_NOFILE soft limit toward the inherited hard + // limit before any worker subprocesses are spawned. Children + // inherit our rlimit, so a single bump here lifts the FD ceiling + // for every Python worker — addressing the macOS 256-FD soft + // default that bites large suites. Failures are non-fatal: + // sandboxes and locked-down hosts may refuse the syscall and the + // user can still raise it manually via `ulimit -n`. + match raise_fd_limit() { + Ok(RaiseOutcome::Raised { from, to }) => { + debug!("RLIMIT_NOFILE soft limit: raised {from} -> {to}"); + } + Ok(RaiseOutcome::Skipped { current }) => { + debug!("RLIMIT_NOFILE soft limit unchanged (current={current})"); + } + Err(e) => warn!("could not raise RLIMIT_NOFILE soft limit: {e}"), + } + // Cross-language verbosity for spawned python workers. `RUST_LOG` is // intentionally not consulted (its per-module filter syntax doesn't // map onto a python log level); `TRYKE_LOG` is the umbrella knob and diff --git a/crates/tryke_runner/src/worker.rs b/crates/tryke_runner/src/worker.rs index 7f6c19c..ab068ae 100644 --- a/crates/tryke_runner/src/worker.rs +++ b/crates/tryke_runner/src/worker.rs @@ -62,11 +62,21 @@ impl fmt::Display for RecycleReason { } /// Soft ceilings on a worker's resource footprint. Each is `Option` so -/// callers can opt out of a signal entirely (e.g. tests that exercise -/// only one dimension). `None` on a field means "never recycle on this -/// signal." The defaults are tuned for real test suites — see -/// [`WorkerLimits::default`]. -#[derive(Debug, Clone, Copy)] +/// callers can opt out of a signal entirely. `None` on a field means +/// "never recycle on this signal." The default is fully opt-in: every +/// signal is `None`, so workers run for the full duration of +/// `tryke test` unless the caller wires explicit caps in. +/// +/// Earlier iterations of this code shipped 1 GiB RSS / 200 FDs / 600 s +/// defaults to mitigate the macOS 256-FD ceiling, but those bounds +/// surprised suites that legitimately ran long (or held many FDs) and +/// made the runner's behaviour depend on workload shape rather than +/// user intent. The upstream fix for the FD ceiling is to raise the +/// process's own `RLIMIT_NOFILE` on startup (see the `fdlimit` module +/// in the `tryke` binary), which removes the FD-exhaustion failure +/// mode without churning interpreters. Users who still want recycling +/// on a long-lived suite can opt in through this struct. +#[derive(Debug, Clone, Copy, Default)] pub struct WorkerLimits { pub max_rss_bytes: Option, pub max_open_fds: Option, @@ -74,34 +84,12 @@ pub struct WorkerLimits { } impl WorkerLimits { - /// Disable every soft limit. Convenient for tests that only want to - /// exercise one signal at a time. + /// Disable every soft limit. Same as [`Self::default`]; retained as + /// a self-documenting alias for tests and call sites that want to + /// be explicit about opting out of recycling. #[must_use] pub fn unlimited() -> Self { - Self { - max_rss_bytes: None, - max_open_fds: None, - max_age: None, - } - } -} - -impl Default for WorkerLimits { - fn default() -> Self { - Self { - // 1 GiB RSS — comfortable headroom for suites that pull in - // heavy native deps (numpy, ssl, sqlite) on first import, - // while still bounding slow leaks across long runs. - max_rss_bytes: Some(1 << 30), - // Below the macOS 256-FD per-process soft limit, leaving - // headroom for the python interpreter's own baseline (~30 - // FDs on a fresh process) plus the runner's stdio pipes. - max_open_fds: Some(200), - // 10 minutes of wall time. Long suites still finish per - // worker without churning; pathologically slow leaks that - // never trip the memory/FD cap still get bounded. - max_age: Some(Duration::from_secs(600)), - } + Self::default() } } diff --git a/docs/guides/configuration.md b/docs/guides/configuration.md index b91824a..1c278d1 100644 --- a/docs/guides/configuration.md +++ b/docs/guides/configuration.md @@ -131,6 +131,80 @@ TRYKE_LOG=debug tryke test TRYKE_LOG=info RUST_LOG=tryke=warn tryke test ``` +## Resource limits + +### File-descriptor ceiling (`RLIMIT_NOFILE`) + +Large suites can exhaust the per-process open-FD soft limit, surfacing as +opaque `OSError: [Errno 24] Too many open files` failures inside worker +subprocesses. macOS in particular ships a 256-FD soft default that bites +suites of a few thousand tests well before they finish. + +On startup, tryke raises its own `RLIMIT_NOFILE` soft limit toward the +inherited hard limit — the convention systemd has standardized on (and +that Home Assistant OS 16 adopted in mid-2025[^ha-os-16]): ship a modest +soft default (1024) and a generous hard default (524288), and let each +application raise its own soft limit at startup based on its needs. +Worker subprocesses inherit the bumped rlimit, so a single call lifts +the ceiling for every Python interpreter tryke spawns. + +The bump is **non-fatal**. If the syscall fails (locked-down sandbox, +cgroup-pinned limit, ...) tryke logs a warning and proceeds; the user +can still raise the limit manually: + +```bash +ulimit -n 524288 +tryke test +``` + +On macOS the kernel-side `kern.maxfilesperproc` ceiling (default 24576 +on recent releases) is below the reported hard limit; tryke detects the +`setrlimit` rejection and falls back to a conservative target (10240), +which still dwarfs the 256-FD soft default that causes the failure mode. + +Windows has no `RLIMIT_NOFILE` analogue — the bump is a no-op there. + +[^ha-os-16]: + +### Worker recycling + +Tryke can recycle a Python worker subprocess mid-run when its +self-reported resource footprint crosses a configured ceiling. Long-lived +interpreters accumulate module-level state across imports (logging +handlers, sqlite/ssl objects, atexit callbacks) that `del sys.modules[name]` +cannot reclaim; only process exit reliably frees it in CPython. Recycling +is the explicit lever for bounding that growth on suites that need it. + +**Defaults: all caps disabled.** Out of the box no worker is ever +recycled — the runner pairs the FD-limit bump above with the assumption +that most suites do not need process churn, and recycling has costs of +its own (Python interpreter startup latency, cached fixture loss). If +you have a suite that leaks memory or FDs over hours of runtime, the +hooks are still there: + +- **`max_rss_bytes`** — recycle when peak RSS (as reported by + `getrusage(RUSAGE_SELF).ru_maxrss`, normalised to bytes) crosses the + given threshold. `None` disables the signal. POSIX only — Windows + workers report `None` and never trip this cap. +- **`max_open_fds`** — recycle when the worker's `/proc/self/fd` or + `/dev/fd` count (minus one for the directory handle held during the + scan) crosses the threshold. `None` disables. Linux + macOS only. +- **`max_age`** — recycle when the worker has been alive longer than + the given `Duration`. `None` disables. Works on every platform. + +When multiple signals trip simultaneously the runner reports the +**strongest** one, in priority order `memory > fds > age`, so debug +logs attribute the recycle to the most user-visible failure mode. + +Recycling is deferred to the end of a unit (after `finalize_hooks`) +so `per="scope"` fixture teardown is never skipped. The next unit +handed to the worker task spawns a fresh interpreter and replays the +cached `register_hooks` call — the same path used for crash recovery. + +These knobs are currently library-only (`tryke_runner::WorkerLimits`) +and intended for embedded use cases. A CLI surface may follow once the +defaults stabilise. + ## Example A typical configuration for a project with benchmarks and generated code: From dd23aabf0da4f6c48bba68511ed0f0c7bd8ed83e Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 13 May 2026 15:59:19 +0000 Subject: [PATCH 6/7] fix(tryke): keep core deps unconditional; address fdlimit doc nit The previous commit accidentally inserted `[target.'cfg(unix)'.dependencies]` mid-stream in the `[dependencies]` table, sweeping tokio, tokio-stream, and the six tryke_* workspace crates into the unix-only section. `uv sync` (which builds the maturin wheel) on Windows then failed with "unresolved dependencies" because none of those crates were available on the windows target. Move the unix-gated table to the end so it only contains `libc`, restoring the cross-platform dependency surface. Also reword the fdlimit module preamble per Copilot review: this module returns an `io::Result` rather than logging directly; the caller (main.rs) is what logs at warn level. --- crates/tryke/Cargo.toml | 6 +++--- crates/tryke/src/fdlimit.rs | 8 +++++--- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/crates/tryke/Cargo.toml b/crates/tryke/Cargo.toml index 4eca67a..e64fd4b 100644 --- a/crates/tryke/Cargo.toml +++ b/crates/tryke/Cargo.toml @@ -20,9 +20,6 @@ console = { workspace = true } env_logger = "0.11" log = "0.4" notify-debouncer-mini = { workspace = true } - -[target.'cfg(unix)'.dependencies] -libc = "0.2" tokio = { workspace = true } tokio-stream = { workspace = true } tryke_config = { workspace = true } @@ -32,6 +29,9 @@ tryke_runner = { workspace = true } tryke_server = { workspace = true } tryke_types = { workspace = true } +[target.'cfg(unix)'.dependencies] +libc = "0.2" + [dev-dependencies] insta = { version = "1", features = ["filters"] } serde_json = "1" diff --git a/crates/tryke/src/fdlimit.rs b/crates/tryke/src/fdlimit.rs index e0d7bc2..ec9331f 100644 --- a/crates/tryke/src/fdlimit.rs +++ b/crates/tryke/src/fdlimit.rs @@ -31,9 +31,11 @@ //! runtime's `_setmaxstdio` ceiling (2048 by default) is the closest //! analogue but it does not affect socket / pipe FDs. //! -//! Errors are non-fatal: if we cannot raise the limit (e.g. running -//! inside a sandbox that pinned it) we log at `debug` and proceed — -//! the user can still override via `ulimit -n` before launching. +//! Errors are non-fatal: this module returns the `io::Result` so the +//! caller can choose how to surface a failure (in `main` we log it at +//! `warn` and proceed). The user can still override the limit +//! manually via `ulimit -n` before launching tryke if a sandbox or +//! cgroup pinned it. //! //! [^1]: From 7ac8e5e2830035b4d17af96d42faf1b5430245c4 Mon Sep 17 00:00:00 2001 From: Justin Chapman Date: Thu, 14 May 2026 22:08:50 -0700 Subject: [PATCH 7/7] refactor --- .vscode/settings.json | 15 +-- README.md | 3 +- crates/tryke/src/execution.rs | 184 ++++++++++++++++++++-------------- crates/tryke/src/main.rs | 24 +++-- crates/tryke/src/watch.rs | 18 ++-- docs/guides/reporters.md | 12 +-- docs/index.md | 3 +- rustfmt.toml | 2 - 8 files changed, 148 insertions(+), 113 deletions(-) diff --git a/.vscode/settings.json b/.vscode/settings.json index e665ce1..a714cbc 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -16,13 +16,9 @@ "rust-analyzer.checkOnSave": true, "rust-analyzer.restartServerOnConfigChange": true, "rust-analyzer.showDependenciesExplorer": true, - "rust-analyzer.showSyntaxTree": true, + "rust-analyzer.showSyntaxTree": false, "rust-analyzer.testExplorer": true, "rust-analyzer.debug.buildBeforeRestart": true, - "rust-analyzer.diagnostics.enable": true, - "rust-analyzer.diagnostics.experimental.enable": true, - "rust-analyzer.diagnostics.styleLints.enable": true, - "rust-analyzer.cachePriming.enable": true, "rust-analyzer.cargo.buildScripts.rebuildOnSave": true, "editor.formatOnSave": true, "ruff.enable": true, @@ -31,5 +27,12 @@ "ruff.showSyntaxErrors": true, "ruff.organizeImports": true, "explorer.excludeGitIgnore": true, - "ty.path": ["${workspaceFolder}/.venv/bin/ty"] + "ty.path": ["${workspaceFolder}/.venv/bin/ty"], + "rust-analyzer.cargo.allTargets": true, + "rust-analyzer.cargo.autoreload": true, + "rust-analyzer.cargo.buildScripts.enable": true, + "rust-analyzer.check.workspace": true, + "rust-analyzer.procMacro.enable": true, + "rust-analyzer.check.command": "clippy", + "rust-analyzer.check.allTargets": true } diff --git a/README.md b/README.md index 1c377e6..628a45e 100644 --- a/README.md +++ b/README.md @@ -72,9 +72,8 @@ uvx tryke test ```text -tryke test v0.0.28 +tryke test v0.0.27 -warning: scheduler: upgrading --dist test → file for 1 module(s) because of per="scope" fixtures (sample). Move the fixture into a describe() to keep finer-grained distribution. sample.py: users get diff --git a/crates/tryke/src/execution.rs b/crates/tryke/src/execution.rs index 829c2c7..d9a87e6 100644 --- a/crates/tryke/src/execution.rs +++ b/crates/tryke/src/execution.rs @@ -1,4 +1,7 @@ -use std::time::{Duration, Instant}; +use std::{ + path::{Path, PathBuf}, + time::{Duration, Instant}, +}; use anyhow::Result; use log::LevelFilter; @@ -11,32 +14,58 @@ pub fn worker_pool_size() -> usize { std::thread::available_parallelism().map_or(4, std::num::NonZero::get) } -#[expect(clippy::too_many_arguments)] +pub struct RunTestsRequest<'a> { + pub root: &'a Path, + pub python: &'a str, + pub log_level: LevelFilter, + pub tests: Vec, + pub hooks: &'a [HookItem], + pub maxfail: Option, + pub workers: Option, + pub dist: DistMode, + pub discovery_duration: Option, + pub changed_selection: Option, +} + +pub struct ReportCycleRequest<'a> { + pub tests: Vec, + pub hooks: &'a [HookItem], + pub pool: &'a WorkerPool, + pub maxfail: Option, + pub dist: DistMode, + pub discovery_duration: Option, + pub changed_selection: Option, +} + pub async fn run_tests( reporter: &mut dyn Reporter, - root: &std::path::Path, - python: &str, - log_level: LevelFilter, - tests: Vec, - hooks: &[HookItem], - maxfail: Option, - workers: Option, - dist: DistMode, - discovery_duration: Option, - changed_selection: Option, -) -> Result { - let pool_size = workers.unwrap_or_else(|| tests.len().min(worker_pool_size())); - let pool = WorkerPool::new(pool_size, python, root, log_level); - pool.warm().await; - let summary = report_cycle( - reporter, + RunTestsRequest { + root, + python, + log_level, tests, hooks, - &pool, maxfail, + workers, dist, discovery_duration, changed_selection, + }: RunTestsRequest<'_>, +) -> Result { + let pool_size = workers.unwrap_or_else(|| tests.len().min(worker_pool_size())); + let pool = WorkerPool::new(pool_size, python, root, log_level); + pool.warm().await; + let summary = report_cycle( + reporter, + ReportCycleRequest { + tests, + hooks, + pool: &pool, + maxfail, + dist, + discovery_duration, + changed_selection, + }, ) .await?; pool.shutdown(); @@ -44,11 +73,8 @@ pub async fn run_tests( } fn flush_buffer( - file: &Option, - buffers: &mut std::collections::HashMap< - Option, - Vec<(usize, tryke_types::TestResult)>, - >, + file: &Option, + buffers: &mut std::collections::HashMap, Vec<(usize, tryke_types::TestResult)>>, reporter: &mut dyn Reporter, ) { if let Some(mut buf) = buffers.remove(file) { @@ -59,19 +85,19 @@ fn flush_buffer( } } -#[expect(clippy::too_many_arguments)] pub async fn report_cycle( reporter: &mut dyn Reporter, - tests: Vec, - hooks: &[HookItem], - pool: &WorkerPool, - maxfail: Option, - dist: DistMode, - discovery_duration: Option, - changed_selection: Option, + ReportCycleRequest { + tests, + hooks, + pool, + maxfail, + dist, + discovery_duration, + changed_selection, + }: ReportCycleRequest<'_>, ) -> Result { use std::collections::{HashMap, HashSet}; - use std::path::PathBuf; let file_count = tests .iter() @@ -232,13 +258,15 @@ mod tests { ) -> anyhow::Result { report_cycle( reporter, - discoverer.rediscover(), - &[], - pool, - None, - DistMode::Test, - None, - None, + ReportCycleRequest { + tests: discoverer.rediscover(), + hooks: &[], + pool, + maxfail: None, + dist: DistMode::Test, + discovery_duration: None, + changed_selection: None, + }, ) .await } @@ -254,16 +282,18 @@ mod tests { let tests = discover_tests(dir.path(), false, None, &excludes).tests; let _ = run_tests( reporter, - dir.path(), - &test_python_bin(), - LevelFilter::Off, - tests, - &[], - None, - None, - DistMode::Test, - None, - None, + RunTestsRequest { + root: dir.path(), + python: &test_python_bin(), + log_level: LevelFilter::Off, + tests, + hooks: &[], + maxfail: None, + workers: None, + dist: DistMode::Test, + discovery_duration: None, + changed_selection: None, + }, ) .await; } @@ -357,16 +387,18 @@ mod tests { assert!( run_tests( &mut reporter, - dir.path(), - &test_python_bin(), - LevelFilter::Off, - tests, - &[], - None, - None, - DistMode::Test, - None, - None + RunTestsRequest { + root: dir.path(), + python: &test_python_bin(), + log_level: LevelFilter::Off, + tests, + hooks: &[], + maxfail: None, + workers: None, + dist: DistMode::Test, + discovery_duration: None, + changed_selection: None, + }, ) .await .is_ok() @@ -460,13 +492,15 @@ def test_failing(): ); let result = report_cycle( &mut reporter, - tests, - &[], - &pool, - None, - DistMode::Test, - None, - None, + ReportCycleRequest { + tests, + hooks: &[], + pool: &pool, + maxfail: None, + dist: DistMode::Test, + discovery_duration: None, + changed_selection: None, + }, ) .await; assert!( @@ -499,13 +533,15 @@ def test_failing(): ); let summary = report_cycle( &mut reporter, - tests, - &[], - &pool, - None, - DistMode::Test, - None, - None, + ReportCycleRequest { + tests, + hooks: &[], + pool: &pool, + maxfail: None, + dist: DistMode::Test, + discovery_duration: None, + changed_selection: None, + }, ) .await .expect("report_cycle should not error on test failures"); diff --git a/crates/tryke/src/main.rs b/crates/tryke/src/main.rs index 24112c8..ec32988 100644 --- a/crates/tryke/src/main.rs +++ b/crates/tryke/src/main.rs @@ -7,7 +7,7 @@ use tryke::cli::{Cli, Commands, ReporterFormat}; use tryke::discovery::{ discover_tests, discover_tests_changed_first, discover_tests_for_paths, resolved_excludes, }; -use tryke::execution::run_tests; +use tryke::execution::{RunTestsRequest, run_tests}; use tryke::fdlimit::{RaiseOutcome, raise as raise_fd_limit}; use tryke::graph::{run_fixture_graph, run_graph}; use tryke::watch::run_watch; @@ -207,16 +207,18 @@ fn main() -> Result<()> { let resolved_python = tryke_config::resolve_python(python.as_deref(), &config); let summary = rt.block_on(run_tests( &mut *rep, - root_path, - &resolved_python, - worker_log, - tests, - &discovered.hooks, - resolved_maxfail, - *workers, - (*dist).into(), - Some(discovery_duration), - changed_selection, + RunTestsRequest { + root: root_path, + python: &resolved_python, + log_level: worker_log, + tests, + hooks: &discovered.hooks, + maxfail: resolved_maxfail, + workers: *workers, + dist: (*dist).into(), + discovery_duration: Some(discovery_duration), + changed_selection, + }, ))?; if summary.failed > 0 || summary.errors > 0 { std::process::exit(1); diff --git a/crates/tryke/src/watch.rs b/crates/tryke/src/watch.rs index 7266870..f43866d 100644 --- a/crates/tryke/src/watch.rs +++ b/crates/tryke/src/watch.rs @@ -16,7 +16,7 @@ use tryke_reporter::{Reporter, reporter::WatchIdleInfo}; use tryke_runner::{DistMode, WorkerPool}; use tryke_types::{DiscoveryWarning, DiscoveryWarningKind, HookItem, filter::TestFilter}; -use crate::execution::{report_cycle, worker_pool_size}; +use crate::execution::{ReportCycleRequest, report_cycle, worker_pool_size}; /// How often the watch loop wakes up to check the quit flag while /// waiting for file events. Short enough that `q` feels responsive, @@ -98,13 +98,15 @@ async fn run_watch_cycle( ) { if let Err(e) = report_cycle( reporter, - tests, - hooks, - pool, - maxfail, - dist, - discovery_duration, - None, + ReportCycleRequest { + tests, + hooks, + pool, + maxfail, + dist, + discovery_duration, + changed_selection: None, + }, ) .await { diff --git a/docs/guides/reporters.md b/docs/guides/reporters.md index 794398d..066db19 100644 --- a/docs/guides/reporters.md +++ b/docs/guides/reporters.md @@ -20,9 +20,8 @@ Sample output (with `-v` to surface per-assertion lines): ```ansi -tryke test v0.0.28 +tryke test v0.0.27 -warning: scheduler: upgrading --dist test → file for 1 module(s) because of per="scope" fixtures (sample). Move the fixture into a describe() to keep finer-grained distribution. sample.py: users get @@ -63,9 +62,8 @@ Sample output: ```ansi -tryke test v0.0.28 +tryke test v0.0.27 -warning: scheduler: upgrading --dist test → file for 1 module(s) because of per="scope" fixtures (sample). Move the fixture into a describe() to keep finer-grained distribution. .. Test Files 1 passed (1) @@ -152,9 +150,8 @@ Sample output: ```ansi -tryke test v0.0.28 +tryke test v0.0.27 -warning: scheduler: upgrading --dist test → file for 1 module(s) because of per="scope" fixtures (sample). Move the fixture into a describe() to keep finer-grained distribution. PASS  [ 0.000s] sample > users > get :: returns a stored user PASS  [ 0.000s] sample > users > set :: stores a new user @@ -183,9 +180,8 @@ Sample output: ```ansi -tryke test v0.0.28 +tryke test v0.0.27 -warning: scheduler: upgrading --dist test → file for 1 module(s) because of per="scope" fixtures (sample). Move the fixture into a describe() to keep finer-grained distribution. sample.py ✓✓ 2 100% ████████████ Test Files 1 passed (1) diff --git a/docs/index.md b/docs/index.md index 0f88717..4cd8393 100644 --- a/docs/index.md +++ b/docs/index.md @@ -105,9 +105,8 @@ uvx tryke test ```ansi -tryke test v0.0.28 +tryke test v0.0.27 -warning: scheduler: upgrading --dist test → file for 1 module(s) because of per="scope" fixtures (sample). Move the fixture into a describe() to keep finer-grained distribution. sample.py: users get diff --git a/rustfmt.toml b/rustfmt.toml index d2a24e5..f3e454b 100644 --- a/rustfmt.toml +++ b/rustfmt.toml @@ -1,4 +1,2 @@ edition = "2024" style_edition = "2024" -merge_imports = true -imports_granularity = "Crate"