Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

190 changes: 174 additions & 16 deletions src/pool/mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use std::borrow::Borrow;
use std::{borrow::Borrow, cell::RefCell, rc::Rc};

use futures::future::join_all;
use js_sys::wasm_bindgen::{prelude::wasm_bindgen, UnwrapThrowExt};
use scheduler::Scheduler;
pub use scheduler::Strategy;
use serde::{Deserialize, Serialize};
use wasm_bindgen::prelude::Closure;
use wasm_bindgen::JsCast;

use wasm_bindgen_futures::JsFuture;
use web_sys::window;
Expand Down Expand Up @@ -46,6 +48,10 @@ pub struct WorkerPoolOptions {
/// Whether to precompile and share the WASM module across workers for bandwidth optimization.
/// This reduces the number of WASM fetches from N (one per worker) to 1 (shared across all workers).
pub precompile_wasm: Option<bool>,
/// Idle timeout in milliseconds. Workers with no pending tasks will be terminated
/// after being idle for this duration. They are transparently recreated when new tasks arrive.
/// Default: `None` (no timeout, workers live for the pool's lifetime).
pub idle_timeout_ms: Option<u32>,
/// Pre-compiled WASM module to share across workers. Internal use only.
pub(crate) wasm_module: Option<js_sys::WebAssembly::Module>,
}
Expand Down Expand Up @@ -109,14 +115,45 @@ impl WorkerPoolOptions {
/// # }
/// # fn main() {}
/// ```
/// The state of a single worker slot in the pool.
///
/// Transitions: `Active` -> `Empty` when the idle checker terminates a worker
/// with zero load; `Empty` -> `Creating` while `acquire_worker` spawns a
/// replacement; `Creating` -> `Active` on success (or back to `Empty` on failure).
enum WorkerSlot {
    /// Worker is active and can accept tasks.
    Active(WebWorker),
    /// Worker is being created (prevents duplicate creation during async init).
    Creating,
    /// Worker was terminated by idle timeout and can be recreated.
    Empty,
}

/// A pool of web workers with optional idle-timeout termination and
/// transparent worker re-creation.
pub struct WebWorkerPool {
    /// The worker slots (per-slot RefCell for independent borrowing).
    slots: Rc<Vec<RefCell<WorkerSlot>>>,
    /// The total number of slots (pool capacity).
    num_slots: usize,
    /// The internal scheduler that is used to distribute the tasks.
    scheduler: Scheduler,
    /// Pre-compiled WASM module shared across workers (kept alive to prevent dropping)
    #[allow(dead_code)]
    wasm_module: Option<js_sys::WebAssembly::Module>,
    /// Config retained for worker re-creation.
    pool_path: Option<String>,
    pool_path_bg: Option<String>,
    /// Idle checker setInterval closure (prevent GC).
    _idle_checker_cb: Option<Closure<dyn FnMut()>>,
    /// Idle checker interval ID (for clearInterval on Drop).
    _idle_checker_id: Option<i32>,
    /// Notify waiting tasks when a worker becomes available after creation.
    worker_ready: tokio::sync::Notify,
}

impl Drop for WebWorkerPool {
    fn drop(&mut self) {
        // Stop the idle-check interval so its callback no longer fires
        // after the pool has been torn down.
        let Some(handle) = self._idle_checker_id else {
            return;
        };
        if let Some(win) = web_sys::window() {
            win.clear_interval_with_handle(handle);
        }
    }
}

impl WebWorkerPool {
Expand Down Expand Up @@ -172,7 +209,8 @@ impl WebWorkerPool {
options.wasm_module.take()
};

let worker_inits = (0..options.num_workers()).map(|_| {
let num_slots = options.num_workers().max(1);
let worker_inits = (0..num_slots).map(|_| {
// Do not impose a task limit.
WebWorker::with_path_and_module(
options.path(),
Expand All @@ -184,10 +222,51 @@ impl WebWorkerPool {
let workers = join_all(worker_inits).await;
let workers = workers.into_iter().collect::<Result<Vec<_>, _>>()?;

let slots: Rc<Vec<RefCell<WorkerSlot>>> = Rc::new(
workers
.into_iter()
.map(|w| RefCell::new(WorkerSlot::Active(w)))
.collect(),
);

// Set up idle timeout checker if configured.
let (idle_checker_cb, idle_checker_id) = if let Some(timeout) = options.idle_timeout_ms {
let slots_clone = Rc::clone(&slots);
let cb = Closure::<dyn FnMut()>::new(move || {
let now = js_sys::Date::now();
for i in 0..slots_clone.len() {
let should_terminate = {
let s = slots_clone[i].borrow();
matches!(&*s, WorkerSlot::Active(ref w)
if w.current_load() == 0 && (now - w.last_active()) >= timeout as f64)
};
if should_terminate {
*slots_clone[i].borrow_mut() = WorkerSlot::Empty;
}
}
});
let id = window()
.expect_throw("Window missing")
.set_interval_with_callback_and_timeout_and_arguments_0(
cb.as_ref().unchecked_ref(),
(timeout / 2).max(1).min(i32::MAX as u32) as i32,
)
.expect_throw("Could not set interval");
(Some(cb), Some(id))
} else {
(None, None)
};

Ok(Self {
workers,
slots,
num_slots,
scheduler: Scheduler::new(options.strategy()),
wasm_module,
pool_path: options.path.clone(),
pool_path_bg: options.path_bg.clone(),
_idle_checker_cb: idle_checker_cb,
_idle_checker_id: idle_checker_id,
worker_ready: tokio::sync::Notify::new(),
})
}

Expand Down Expand Up @@ -256,22 +335,85 @@ impl WebWorkerPool {
self.run_internal(func, arg).await
}

/// Acquires an active worker slot, recreating a terminated worker if needed.
///
/// Loops until the scheduler yields an `Active` slot or an `Empty` slot has
/// been successfully recreated; when every slot is `Creating`, parks on
/// `worker_ready` until a creation attempt finishes and retries.
///
/// # Panics
/// Panics if recreating a terminated worker fails.
async fn acquire_worker(&self) -> usize {
    loop {
        // Fast path: the scheduler found an active worker.
        let loads = self.compute_loads();
        if let Some(id) = self.scheduler.schedule(&loads) {
            return id;
        }

        // No active workers. Claim the first Empty slot: mark it Creating
        // synchronously (before any await) so concurrent callers cannot
        // pick the same slot, then recreate the worker.
        let empty_slot = self
            .slots
            .iter()
            .position(|slot| matches!(&*slot.borrow(), WorkerSlot::Empty));

        if let Some(slot_id) = empty_slot {
            *self.slots[slot_id].borrow_mut() = WorkerSlot::Creating;
            let worker_result = WebWorker::with_path_and_module(
                self.pool_path.as_deref(),
                self.pool_path_bg.as_deref(),
                None,
                self.wasm_module.clone(),
            )
            .await;
            match worker_result {
                Ok(worker) => {
                    *self.slots[slot_id].borrow_mut() = WorkerSlot::Active(worker);
                    // Wake tasks parked in `notified()` below.
                    self.worker_ready.notify_waiters();
                    return slot_id;
                }
                Err(_) => {
                    // Roll back to Empty so another caller could retry this
                    // slot, and wake waiters before aborting so they are not
                    // stuck forever.
                    *self.slots[slot_id].borrow_mut() = WorkerSlot::Empty;
                    self.worker_ready.notify_waiters();
                    panic!("Couldn't recreate worker");
                }
            }
        }

        // All slots are Creating — wait for one to finish.
        // NOTE(review): there is no await between the slot scan above and
        // this `notified()`, so on the single-threaded wasm event loop the
        // wakeup cannot be missed — confirm if this ever runs multithreaded.
        self.worker_ready.notified().await;
    }
}

/// Compute per-slot loads for the scheduler.
///
/// Yields `Some(current_load)` for each `Active` slot and `None` for
/// slots that are terminated or still being created.
fn compute_loads(&self) -> Vec<Option<usize>> {
    let mut loads = Vec::with_capacity(self.slots.len());
    for slot in self.slots.iter() {
        let entry = if let WorkerSlot::Active(worker) = &*slot.borrow() {
            Some(worker.current_load())
        } else {
            None
        };
        loads.push(entry);
    }
    loads
}

/// Determines the worker to run a simple task on using the scheduler
/// and runs the task.
// Per-slot RefCell: holding a borrow across await is safe because
// the idle checker only terminates slots with zero load (i.e., not borrowed).
#[allow(clippy::await_holding_refcell_ref)]
pub(crate) async fn run_internal<T, R, A>(&self, func: WebWorkerFn<T, R>, arg: A) -> R
where
    A: Borrow<T>,
    T: Serialize + for<'de> Deserialize<'de>,
    R: Serialize + for<'de> Deserialize<'de>,
{
    let worker_id = self.acquire_worker().await;
    let slot = self.slots[worker_id].borrow();
    match &*slot {
        WorkerSlot::Active(worker) => worker.run_internal(func, arg.borrow()).await,
        _ => unreachable!("acquire_worker guarantees Active slot"),
    }
}

/// Determines the worker to run a channel task on using the scheduler
/// and runs the task.
// Per-slot RefCell: holding a borrow across await is safe because
// the idle checker only terminates slots with zero load (i.e., not borrowed).
#[allow(clippy::await_holding_refcell_ref)]
pub(crate) async fn run_channel_internal<T, R>(
&self,
func: WebWorkerChannelFn<T, R>,
Expand All @@ -281,20 +423,36 @@ impl WebWorkerPool {
T: Serialize + for<'de> Deserialize<'de>,
R: Serialize + for<'de> Deserialize<'de>,
{
let worker_id = self.scheduler.schedule(self);
self.workers[worker_id]
.run_channel_internal(func, arg)
.await
let worker_id = self.acquire_worker().await;
let slot = self.slots[worker_id].borrow();
match &*slot {
WorkerSlot::Active(worker) => worker.run_channel_internal(func, arg).await,
_ => unreachable!("acquire_worker guarantees Active slot"),
}
}

/// Return the number of tasks currently queued to this worker pool.
///
/// Sums the load of every `Active` slot; terminated (`Empty`) and
/// in-creation (`Creating`) slots contribute zero.
pub fn current_load(&self) -> usize {
    self.slots
        .iter()
        .map(|slot| match &*slot.borrow() {
            WorkerSlot::Active(w) => w.current_load(),
            _ => 0,
        })
        .sum()
}

/// Return the number of workers in the pool.
/// Return the total number of worker slots in the pool (pool capacity).
pub fn num_workers(&self) -> usize {
self.workers.len()
self.num_slots
}

/// Return the number of currently active (non-terminated) workers.
pub fn num_active_workers(&self) -> usize {
    let mut active = 0;
    for slot in self.slots.iter() {
        if let WorkerSlot::Active(_) = &*slot.borrow() {
            active += 1;
        }
    }
    active
}

/// Create a worker pool with a pre-compiled WASM module for optimal bandwidth usage.
Expand Down
42 changes: 22 additions & 20 deletions src/pool/scheduler.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
use std::cell::Cell;

use wasm_bindgen::{prelude::wasm_bindgen, UnwrapThrowExt};

use super::WebWorkerPool;
use wasm_bindgen::prelude::wasm_bindgen;

/// This enumeration contains the supported strategies for distributing
/// tasks within the worker pool.
Expand Down Expand Up @@ -46,26 +44,30 @@ impl Scheduler {
}
}

/// Given the pool, apply the strategy and determine which worker
/// should receive the next task.
pub(super) fn schedule(&self, pool: &WebWorkerPool) -> usize {
/// Given per-slot loads, apply the strategy and determine which worker
/// should receive the next task. Returns `None` if no active workers exist.
///
/// Each entry in `loads` is `Some(current_load)` for active workers,
/// or `None` for terminated/creating slots.
pub(super) fn schedule(&self, loads: &[Option<usize>]) -> Option<usize> {
match self.strategy {
Strategy::RoundRobin => {
// Simply return the current worker and increment.
let worker_id = self.current_worker.get();
self.current_worker
.set((worker_id + 1) % pool.num_workers());
worker_id
}
Strategy::LoadBased => {
// Choose the worker with the minimum work load.
pool.workers
.iter()
.enumerate()
.min_by_key(|(_id, worker)| worker.current_load())
.expect_throw("WorkerPool does not have workers")
.0
let num = loads.len();
for _ in 0..num {
let id = self.current_worker.get();
self.current_worker.set((id + 1) % num);
if loads[id].is_some() {
return Some(id);
}
}
None
}
Strategy::LoadBased => loads
.iter()
.enumerate()
.filter_map(|(i, load)| load.map(|l| (i, l)))
.min_by_key(|(_i, load)| *load)
.map(|(i, _)| i),
}
}
}
Loading
Loading