From 84929d786a958cb27a227c3bdd6bfea7b8a1d2e5 Mon Sep 17 00:00:00 2001 From: Pascal Berrang Date: Sun, 8 Mar 2026 15:48:26 +0000 Subject: [PATCH] Add idle timeout for WebWorkerPool with per-slot RefCell Workers with no pending tasks are automatically terminated after a configurable idle_timeout_ms duration and transparently recreated when new tasks arrive. Uses per-slot RefCell instead of a single RefCell around the whole Vec, allowing independent borrowing so the idle checker and task execution never conflict. --- Cargo.lock | 1 + src/pool/mod.rs | 190 ++++++++++++++++++++++++++++++++++++---- src/pool/scheduler.rs | 42 ++++----- src/webworker/worker.rs | 18 +++- test/Cargo.toml | 3 +- test/src/lib.rs | 3 + test/src/raw.rs | 61 ++++++++++++- 7 files changed, 278 insertions(+), 40 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8faf616..0fd273f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -573,6 +573,7 @@ name = "wasmworker-test" version = "0.1.0" dependencies = [ "futures", + "js-sys", "serde", "wasm-bindgen", "wasm-bindgen-futures", diff --git a/src/pool/mod.rs b/src/pool/mod.rs index 7f6788b..bea81cd 100644 --- a/src/pool/mod.rs +++ b/src/pool/mod.rs @@ -1,10 +1,12 @@ -use std::borrow::Borrow; +use std::{borrow::Borrow, cell::RefCell, rc::Rc}; use futures::future::join_all; use js_sys::wasm_bindgen::{prelude::wasm_bindgen, UnwrapThrowExt}; use scheduler::Scheduler; pub use scheduler::Strategy; use serde::{Deserialize, Serialize}; +use wasm_bindgen::prelude::Closure; +use wasm_bindgen::JsCast; use wasm_bindgen_futures::JsFuture; use web_sys::window; @@ -46,6 +48,10 @@ pub struct WorkerPoolOptions { /// Whether to precompile and share the WASM module across workers for bandwidth optimization. /// This reduces the number of WASM fetches from N (one per worker) to 1 (shared across all workers). pub precompile_wasm: Option, + /// Idle timeout in milliseconds. Workers with no pending tasks will be terminated + /// after being idle for this duration. They are transparently recreated when new tasks arrive. + /// Default: `None` (no timeout, workers live for the pool's lifetime). + pub idle_timeout_ms: Option, /// Pre-compiled WASM module to share across workers. Internal use only. pub(crate) wasm_module: Option, } @@ -109,14 +115,45 @@ impl WorkerPoolOptions { /// # } /// # fn main() {} /// ``` +/// The state of a single worker slot in the pool. +enum WorkerSlot { + /// Worker is active and can accept tasks. + Active(WebWorker), + /// Worker is being created (prevents duplicate creation during async init). + Creating, + /// Worker was terminated by idle timeout and can be recreated. + Empty, +} + pub struct WebWorkerPool { - /// The workers that have been spawned. - workers: Vec, + /// The worker slots (per-slot RefCell for independent borrowing). + slots: Rc>>, + /// The total number of slots (pool capacity). + num_slots: usize, /// The internal scheduler that is used to distribute the tasks. scheduler: Scheduler, /// Pre-compiled WASM module shared across workers (kept alive to prevent dropping) #[allow(dead_code)] wasm_module: Option, + /// Config retained for worker re-creation. + pool_path: Option, + pool_path_bg: Option, + /// Idle checker setInterval closure (prevent GC). + _idle_checker_cb: Option>, + /// Idle checker interval ID (for clearInterval on Drop). + _idle_checker_id: Option, + /// Notify waiting tasks when a worker becomes available after creation. + worker_ready: tokio::sync::Notify, +} + +impl Drop for WebWorkerPool { + fn drop(&mut self) { + if let Some(id) = self._idle_checker_id { + if let Some(w) = web_sys::window() { + w.clear_interval_with_handle(id); + } + } + } } impl WebWorkerPool { @@ -172,7 +209,8 @@ impl WebWorkerPool { options.wasm_module.take() }; - let worker_inits = (0..options.num_workers()).map(|_| { + let num_slots = options.num_workers().max(1); + let worker_inits = (0..num_slots).map(|_| { // Do not impose a task limit. WebWorker::with_path_and_module( options.path(), @@ -184,10 +222,51 @@ impl WebWorkerPool { let workers = join_all(worker_inits).await; let workers = workers.into_iter().collect::, _>>()?; + let slots: Rc>> = Rc::new( + workers + .into_iter() + .map(|w| RefCell::new(WorkerSlot::Active(w))) + .collect(), + ); + + // Set up idle timeout checker if configured. + let (idle_checker_cb, idle_checker_id) = if let Some(timeout) = options.idle_timeout_ms { + let slots_clone = Rc::clone(&slots); + let cb = Closure::::new(move || { + let now = js_sys::Date::now(); + for i in 0..slots_clone.len() { + let should_terminate = { + let s = slots_clone[i].borrow(); + matches!(&*s, WorkerSlot::Active(ref w) + if w.current_load() == 0 && (now - w.last_active()) >= timeout as f64) + }; + if should_terminate { + *slots_clone[i].borrow_mut() = WorkerSlot::Empty; + } + } + }); + let id = window() + .expect_throw("Window missing") + .set_interval_with_callback_and_timeout_and_arguments_0( + cb.as_ref().unchecked_ref(), + (timeout / 2).max(1).min(i32::MAX as u32) as i32, + ) + .expect_throw("Could not set interval"); + (Some(cb), Some(id)) + } else { + (None, None) + }; + Ok(Self { - workers, + slots, + num_slots, scheduler: Scheduler::new(options.strategy()), wasm_module, + pool_path: options.path.clone(), + pool_path_bg: options.path_bg.clone(), + _idle_checker_cb: idle_checker_cb, + _idle_checker_id: idle_checker_id, + worker_ready: tokio::sync::Notify::new(), }) } @@ -256,22 +335,85 @@ impl WebWorkerPool { self.run_internal(func, arg).await } + /// Acquires an active worker slot, recreating a terminated worker if needed. + async fn acquire_worker(&self) -> usize { + loop { + let loads = self.compute_loads(); + if let Some(id) = self.scheduler.schedule(&loads) { + return id; + } + + // No active workers. Find first Empty slot and recreate. + let empty_slot = self + .slots + .iter() + .position(|slot| matches!(&*slot.borrow(), WorkerSlot::Empty)); + if let Some(i) = empty_slot { + *self.slots[i].borrow_mut() = WorkerSlot::Creating; + } + + if let Some(slot_id) = empty_slot { + let worker_result = WebWorker::with_path_and_module( + self.pool_path.as_deref(), + self.pool_path_bg.as_deref(), + None, + self.wasm_module.clone(), + ) + .await; + match worker_result { + Ok(worker) => { + *self.slots[slot_id].borrow_mut() = WorkerSlot::Active(worker); + self.worker_ready.notify_waiters(); + return slot_id; + } + Err(_) => { + *self.slots[slot_id].borrow_mut() = WorkerSlot::Empty; + self.worker_ready.notify_waiters(); + panic!("Couldn't recreate worker"); + } + } + } + + // All slots are Creating — wait for one to finish. + self.worker_ready.notified().await; + } + } + + /// Compute per-slot loads for the scheduler. + fn compute_loads(&self) -> Vec> { + self.slots + .iter() + .map(|slot| match &*slot.borrow() { + WorkerSlot::Active(w) => Some(w.current_load()), + _ => None, + }) + .collect() + } + /// Determines the worker to run a simple task on using the scheduler /// and runs the task. + // Per-slot RefCell: holding a borrow across await is safe because + // the idle checker only terminates slots with zero load (i.e., not borrowed). + #[allow(clippy::await_holding_refcell_ref)] pub(crate) async fn run_internal(&self, func: WebWorkerFn, arg: A) -> R where A: Borrow, T: Serialize + for<'de> Deserialize<'de>, R: Serialize + for<'de> Deserialize<'de>, { - let worker_id = self.scheduler.schedule(self); - self.workers[worker_id] - .run_internal(func, arg.borrow()) - .await + let worker_id = self.acquire_worker().await; + let slot = self.slots[worker_id].borrow(); + match &*slot { + WorkerSlot::Active(worker) => worker.run_internal(func, arg.borrow()).await, + _ => unreachable!("acquire_worker guarantees Active slot"), + } } /// Determines the worker to run a channel task on using the scheduler /// and runs the task. + // Per-slot RefCell: holding a borrow across await is safe because + // the idle checker only terminates slots with zero load (i.e., not borrowed). + #[allow(clippy::await_holding_refcell_ref)] pub(crate) async fn run_channel_internal( &self, func: WebWorkerChannelFn, @@ -281,20 +423,36 @@ impl WebWorkerPool { T: Serialize + for<'de> Deserialize<'de>, R: Serialize + for<'de> Deserialize<'de>, { - let worker_id = self.scheduler.schedule(self); - self.workers[worker_id] - .run_channel_internal(func, arg) - .await + let worker_id = self.acquire_worker().await; + let slot = self.slots[worker_id].borrow(); + match &*slot { + WorkerSlot::Active(worker) => worker.run_channel_internal(func, arg).await, + _ => unreachable!("acquire_worker guarantees Active slot"), + } } /// Return the number of tasks currently queued to this worker pool. pub fn current_load(&self) -> usize { - self.workers.iter().map(WebWorker::current_load).sum() + self.slots + .iter() + .map(|slot| match &*slot.borrow() { + WorkerSlot::Active(w) => w.current_load(), + _ => 0, + }) + .sum() } - /// Return the number of workers in the pool. + /// Return the total number of worker slots in the pool (pool capacity). pub fn num_workers(&self) -> usize { - self.workers.len() + self.num_slots + } + + /// Return the number of currently active (non-terminated) workers. + pub fn num_active_workers(&self) -> usize { + self.slots + .iter() + .filter(|s| matches!(&*RefCell::borrow(s), WorkerSlot::Active(_))) + .count() } /// Create a worker pool with a pre-compiled WASM module for optimal bandwidth usage. diff --git a/src/pool/scheduler.rs b/src/pool/scheduler.rs index ac4b694..ea9835b 100644 --- a/src/pool/scheduler.rs +++ b/src/pool/scheduler.rs @@ -1,8 +1,6 @@ use std::cell::Cell; -use wasm_bindgen::{prelude::wasm_bindgen, UnwrapThrowExt}; - -use super::WebWorkerPool; +use wasm_bindgen::prelude::wasm_bindgen; /// This enumeration contains the supported strategies for distributing /// tasks within the worker pool. @@ -46,26 +44,30 @@ impl Scheduler { } } - /// Given the pool, apply the strategy and determine which worker - /// should receive the next task. - pub(super) fn schedule(&self, pool: &WebWorkerPool) -> usize { + /// Given per-slot loads, apply the strategy and determine which worker + /// should receive the next task. Returns `None` if no active workers exist. + /// + /// Each entry in `loads` is `Some(current_load)` for active workers, + /// or `None` for terminated/creating slots. + pub(super) fn schedule(&self, loads: &[Option]) -> Option { match self.strategy { Strategy::RoundRobin => { - // Simply return the current worker and increment. - let worker_id = self.current_worker.get(); - self.current_worker - .set((worker_id + 1) % pool.num_workers()); - worker_id - } - Strategy::LoadBased => { - // Choose the worker with the minimum work load. - pool.workers - .iter() - .enumerate() - .min_by_key(|(_id, worker)| worker.current_load()) - .expect_throw("WorkerPool does not have workers") - .0 + let num = loads.len(); + for _ in 0..num { + let id = self.current_worker.get(); + self.current_worker.set((id + 1) % num); + if loads[id].is_some() { + return Some(id); + } + } + None } + Strategy::LoadBased => loads + .iter() + .enumerate() + .filter_map(|(i, load)| load.map(|l| (i, l))) + .min_by_key(|(_i, load)| *load) + .map(|(i, _)| i), } } } diff --git a/src/webworker/worker.rs b/src/webworker/worker.rs index 2dde5f0..2a1dee6 100644 --- a/src/webworker/worker.rs +++ b/src/webworker/worker.rs @@ -1,5 +1,5 @@ use std::{ - cell::RefCell, + cell::{Cell, RefCell}, collections::HashMap, rc::Rc, sync::atomic::{AtomicU32, Ordering}, @@ -60,6 +60,8 @@ pub struct WebWorker { open_tasks: Rc>>>, /// The callback handle for the worker. _callback: Closure, + /// Timestamp (ms since epoch) of the last completed task, used for idle timeout tracking. + last_active: Rc>, } impl WebWorker { @@ -182,8 +184,9 @@ impl WebWorker { } let tasks = Rc::new(RefCell::new(HashMap::new())); + let last_active = Rc::new(Cell::new(js_sys::Date::now())); - let callback_handle = Self::callback(Rc::clone(&tasks)); + let callback_handle = Self::callback(Rc::clone(&tasks), Rc::clone(&last_active)); worker.set_onmessage(Some(callback_handle.as_ref().unchecked_ref())); Ok(WebWorker { @@ -192,12 +195,14 @@ impl WebWorker { current_task: AtomicU32::new(0), open_tasks: tasks, _callback: callback_handle, + last_active, }) } /// Function to be called when a result is ready. fn callback( tasks: Rc>>>, + last_active: Rc>, ) -> Closure { Closure::new(move |event: MessageEvent| { let data = event.data(); @@ -210,6 +215,9 @@ impl WebWorker { // Ignore if receiver is already closed. let _ = channel.send(response); } + + // Update idle tracking timestamp. + last_active.set(js_sys::Date::now()); }) } @@ -516,6 +524,12 @@ impl WebWorker { pub fn current_load(&self) -> usize { self.open_tasks.borrow().len() } + + /// Return the timestamp (ms since epoch) of the last completed task. + /// Used for idle timeout tracking. + pub fn last_active(&self) -> f64 { + self.last_active.get() + } } impl Drop for WebWorker { diff --git a/test/Cargo.toml b/test/Cargo.toml index 65414ea..1c1f437 100644 --- a/test/Cargo.toml +++ b/test/Cargo.toml @@ -19,6 +19,7 @@ crate-type = ["cdylib"] futures = "0.3" serde = { version = "1.0", features = ["derive"] } wasm-bindgen = "0.2" +js-sys = "0.3" wasm-bindgen-futures = "0.4" -web-sys = { version = "0.3", features = ["MessagePort"] } +web-sys = { version = "0.3", features = ["MessagePort", "Window"] } wasmworker = { workspace = true } diff --git a/test/src/lib.rs b/test/src/lib.rs index 3a94d6e..554aa65 100644 --- a/test/src/lib.rs +++ b/test/src/lib.rs @@ -43,4 +43,7 @@ pub async fn run_tests() { // Pool configuration tests can_use_precompiled_wasm().await; can_use_custom_pool_options().await; + + // Idle timeout test + can_use_idle_timeout().await; } diff --git a/test/src/raw.rs b/test/src/raw.rs index db0ec9a..cdd93d3 100644 --- a/test/src/raw.rs +++ b/test/src/raw.rs @@ -1,4 +1,5 @@ -use wasm_bindgen::throw_str; +use wasm_bindgen::{throw_str, UnwrapThrowExt}; +use wasm_bindgen_futures::JsFuture; use wasmworker::webworker_fn; use wasmworker::{ error::InitError, webworker, worker_pool, WebWorker, WebWorkerPool, WorkerPoolOptions, @@ -6,6 +7,16 @@ use wasmworker::{ use crate::js_assert_eq; +async fn sleep_ms(ms: u32) { + let promise = js_sys::Promise::new(&mut |resolve, _| { + web_sys::window() + .unwrap_throw() + .set_timeout_with_callback_and_timeout_and_arguments_0(&resolve, ms as i32) + .unwrap_throw(); + }); + JsFuture::from(promise).await.unwrap_throw(); +} + #[webworker_fn] pub fn sort(mut v: Box<[u8]>) -> Box<[u8]> { v.sort(); @@ -114,3 +125,51 @@ pub(crate) async fn can_use_custom_pool_options() { let res = pool.run_bytes(webworker!(sort), &vec).await; js_assert_eq!(res, sorted_vec, "Custom options run failed"); } + +/// Test that idle timeout terminates workers and transparently recreates them. +pub(crate) async fn can_use_idle_timeout() { + let mut options = WorkerPoolOptions::new(); + options.num_workers = Some(2); + options.idle_timeout_ms = Some(300); + + let pool = WebWorkerPool::with_options(options) + .await + .expect("Couldn't create pool with idle timeout"); + + // All workers should be active after creation. + js_assert_eq!( + pool.num_active_workers(), + 2, + "Should start with 2 active workers" + ); + + // Run a task to verify workers work. + let vec: Box<[u8]> = vec![3, 1, 2].into(); + let sorted: Box<[u8]> = vec![1, 2, 3].into(); + let res = pool.run_bytes(webworker!(sort), &vec).await; + js_assert_eq!(res, sorted, "Task should succeed"); + + // Poll until all workers are idle-terminated (avoids flakiness from browser timer throttling). + let deadline = js_sys::Date::now() + 10_000.0; + while pool.num_active_workers() > 0 && js_sys::Date::now() < deadline { + sleep_ms(50).await; + } + + // Workers should be terminated. + js_assert_eq!( + pool.num_active_workers(), + 0, + "Workers should be terminated after idle" + ); + + // Run another task — should transparently recreate a worker. + let res = pool.run_bytes(webworker!(sort), &vec).await; + js_assert_eq!(res, sorted, "Task should succeed after recreation"); + + // At least one worker should be active now. + js_assert_eq!( + pool.num_active_workers() >= 1, + true, + "Should have at least one active worker after recreation" + ); +}