From 8aa8018d47a59eab43d9df9d804e501d19de7f9a Mon Sep 17 00:00:00 2001 From: hulxv Date: Wed, 18 Mar 2026 22:32:42 +0200 Subject: [PATCH 1/4] feat(mill-io): implement timer-wheel and scheduled events --- mill-io/src/lib.rs | 84 +++- mill-io/src/reactor.rs | 19 +- mill-io/src/timer_wheel.rs | 779 +++++++++++++++++++++++++++++++++++++ 3 files changed, 874 insertions(+), 8 deletions(-) create mode 100644 mill-io/src/timer_wheel.rs diff --git a/mill-io/src/lib.rs b/mill-io/src/lib.rs index 0313110..7e2002c 100644 --- a/mill-io/src/lib.rs +++ b/mill-io/src/lib.rs @@ -101,13 +101,16 @@ pub mod object_pool; pub mod poll; pub mod reactor; pub mod thread_pool; +pub mod timer_wheel; pub use handler::EventHandler; pub use mio::event::Event; pub use object_pool::{ObjectPool, PooledObject}; pub use thread_pool::{ComputePoolMetrics, TaskPriority}; +pub use timer_wheel::TimerId; use crate::{error::Result, reactor::ReactorOptions}; +use std::time::Duration; /// A convenient prelude module that re-exports commonly used types and traits. /// @@ -127,6 +130,7 @@ pub mod prelude { pub use crate::object_pool::{ObjectPool, PooledObject}; pub use crate::reactor::{self, Reactor}; pub use crate::thread_pool::{self, ComputePoolMetrics, TaskPriority, ThreadPool}; + pub use crate::timer_wheel::{TimerId, TimerWheel}; } /// The main event loop structure for registering I/O sources and handling events. @@ -448,19 +452,85 @@ impl EventLoop { self.reactor.get_compute_metrics() } + /// Schedule a one-shot timer that fires after `delay`. + /// + /// The callback runs on the reactor thread during timer processing. + /// Keep callbacks fast to avoid blocking the event loop. + /// + /// ## Example + /// + /// ```rust,no_run + /// use mill_io::EventLoop; + /// use std::time::Duration; + /// + /// let event_loop = EventLoop::default(); + /// let timer_id = event_loop.schedule_once(Duration::from_secs(5), || { + /// println!("Timer fired!"); + /// }); + /// ``` + pub fn schedule_once(&self, delay: Duration, callback: F) -> TimerId + where + F: FnOnce() + Send + 'static, + { + let id = self.reactor.timer_wheel.schedule_once(delay, callback); + // Wake the reactor so it can recalculate its poll timeout. + let _ = self.reactor.poll_handle.wake(); + id + } + + /// Schedule a repeating timer that fires every `interval`. + /// The first firing happens after one `interval` elapses. + /// + /// ## Example + /// + /// ```rust,no_run + /// use mill_io::EventLoop; + /// use std::time::Duration; + /// + /// let event_loop = EventLoop::default(); + /// let timer_id = event_loop.schedule_repeating(Duration::from_secs(1), || { + /// println!("Tick!"); + /// }); + /// ``` + pub fn schedule_repeating(&self, interval: Duration, callback: F) -> TimerId + where + F: Fn() + Send + Sync + 'static, + { + let id = self.reactor.timer_wheel.schedule_repeating(interval, callback); + let _ = self.reactor.poll_handle.wake(); + id + } + + /// Cancel a pending timer. Returns true if the timer was found and cancelled. + /// + /// ## Example + /// + /// ```rust,no_run + /// use mill_io::EventLoop; + /// use std::time::Duration; + /// + /// let event_loop = EventLoop::default(); + /// let timer_id = event_loop.schedule_once(Duration::from_secs(10), || {}); + /// event_loop.cancel_timer(timer_id); + /// ``` + pub fn cancel_timer(&self, timer_id: TimerId) -> bool { + self.reactor.timer_wheel.cancel(timer_id) + } + + /// Returns the number of currently scheduled timers. + pub fn pending_timers(&self) -> usize { + self.reactor.timer_wheel.pending_count() + } + /// Signals the event loop to stop gracefully. /// - /// This method initiates a graceful shutdown of the event loop. It sends a shutdown - /// signal to the reactor, which will cause the main loop to exit after finishing - /// the current polling cycle. + /// Sends a shutdown signal to the reactor, which will cause the main loop + /// to exit after finishing the current polling cycle. /// /// This method is non-blocking and returns immediately. The actual shutdown happens /// asynchronously, and [`run()`](Self::run) will return once the shutdown is complete. /// - /// # Thread Safety - /// - /// This method is thread-safe and can be called from any thread, making it suitable - /// for use in signal handlers or from other threads. + /// Thread-safe: can be called from any thread. /// /// ## Example /// diff --git a/mill-io/src/reactor.rs b/mill-io/src/reactor.rs index ca5fc4c..b7e92d2 100644 --- a/mill-io/src/reactor.rs +++ b/mill-io/src/reactor.rs @@ -2,6 +2,7 @@ use crate::{ error::Result, poll::PollHandle, thread_pool::{ComputePoolMetrics, ComputeThreadPool, TaskPriority, ThreadPool}, + timer_wheel::TimerWheel, }; use mio::{event::Event, Events}; use std::{ @@ -27,6 +28,7 @@ pub struct Reactor { running: AtomicBool, poll_timeout_ms: u64, options: ReactorOptions, + pub(crate) timer_wheel: Arc, } impl Default for Reactor { @@ -45,6 +47,7 @@ impl Default for Reactor { options: ReactorOptions { direct_dispatch: false, }, + timer_wheel: Arc::new(TimerWheel::new()), } } } @@ -61,6 +64,7 @@ impl Reactor { options: ReactorOptions { direct_dispatch: false, }, + timer_wheel: Arc::new(TimerWheel::new()), }) } @@ -78,6 +82,7 @@ impl Reactor { running: AtomicBool::new(false), poll_timeout_ms, options, + timer_wheel: Arc::new(TimerWheel::new()), }) } @@ -85,14 +90,26 @@ impl Reactor { self.running.store(true, Ordering::SeqCst); while self.running.load(Ordering::SeqCst) { + // Shorten the poll timeout if a timer is due soon. + let poll_timeout = match self.timer_wheel.time_until_next() { + Some(until_next) => { + let configured = Duration::from_millis(self.poll_timeout_ms); + until_next.min(configured) + } + None => Duration::from_millis(self.poll_timeout_ms), + }; + let _ = self.poll_handle.poll( &mut self.events.write().unwrap(), - Some(Duration::from_millis(self.poll_timeout_ms)), + Some(poll_timeout), )?; for event in self.events.read().unwrap().iter() { self.dispatch_event(event.clone())?; } + + // Advance the timer wheel and fire expired timers. + self.timer_wheel.tick(); } Ok(()) } diff --git a/mill-io/src/timer_wheel.rs b/mill-io/src/timer_wheel.rs new file mode 100644 index 0000000..7fcf204 --- /dev/null +++ b/mill-io/src/timer_wheel.rs @@ -0,0 +1,779 @@ +use parking_lot::Mutex; +use std::collections::HashMap; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::time::{Duration, Instant}; + +/// Unique identifier for a scheduled timer. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub struct TimerId(pub(crate) u64); + +// 4-level hierarchical timer wheel. +// +// Each level has a fixed number of slots. A timer is placed in the lowest +// level that can represent its delay. When a lower level wraps, timers +// cascade down from the next level, getting re-sorted into finer slots. +// +// Level 0: 256 slots, 1 ms/slot (256 ms range) +// Level 1: 64 slots, 256 ms/slot (~16 s range) +// Level 2: 64 slots, ~16 s/slot (~17 min range) +// Level 3: 64 slots, ~17 min/slot (~18 hr range) +// +// Timers beyond ~18 hours are stored in an overflow list. + +const NUM_LEVELS: usize = 4; +const LEVEL_BITS: [u32; NUM_LEVELS] = [8, 6, 6, 6]; +const LEVEL_SIZE: [usize; NUM_LEVELS] = [256, 64, 64, 64]; + +// Bit shifts and masks for extracting the slot index at each level +// from an absolute tick value. +const SHIFTS: [u32; NUM_LEVELS] = [0, 8, 14, 20]; +const MASKS: [u64; NUM_LEVELS] = [0xFF, 0x3F, 0x3F, 0x3F]; + +// Total addressable ticks: 2^26 = 67,108,864 (~18.6 hours at 1ms/tick) +const MAX_RANGE_TICKS: u64 = 1 << 26; + +enum TimerKind { + Once(Option>), + Repeating(Box, Duration), +} + +struct TimerEntry { + id: TimerId, + deadline_tick: u64, + kind: TimerKind, +} + +struct WheelInner { + levels: [Vec>; NUM_LEVELS], + current_tick: u64, + start_time: Instant, + /// All live timers keyed by raw ID. Cancelled timers are removed from here; + /// stale IDs left in wheel slots are skipped on fire. + timers: HashMap, + /// Timers whose deadline exceeds the wheel's addressable range. + overflow: Vec, + timer_count: usize, +} + +impl WheelInner { + fn new() -> Self { + let levels = [ + vec![Vec::new(); LEVEL_SIZE[0]], + vec![Vec::new(); LEVEL_SIZE[1]], + vec![Vec::new(); LEVEL_SIZE[2]], + vec![Vec::new(); LEVEL_SIZE[3]], + ]; + + WheelInner { + levels, + current_tick: 0, + start_time: Instant::now(), + timers: HashMap::new(), + overflow: Vec::new(), + timer_count: 0, + } + } + + fn instant_to_tick(&self, instant: Instant) -> u64 { + instant + .saturating_duration_since(self.start_time) + .as_millis() as u64 + } + + /// Place a timer ID into the correct wheel slot based on its deadline. + fn insert_into_wheel(&mut self, id: u64, deadline_tick: u64) { + let delta = deadline_tick.saturating_sub(self.current_tick); + + if delta >= MAX_RANGE_TICKS { + self.overflow.push(id); + return; + } + + let level = self.level_for_delta(delta); + let slot = (deadline_tick >> SHIFTS[level]) & MASKS[level]; + self.levels[level][slot as usize].push(id); + } + + /// Find the lowest wheel level whose span covers `delta` ticks. + fn level_for_delta(&self, delta: u64) -> usize { + let mut cumulative_bits = 0u32; + for level in 0..NUM_LEVELS { + cumulative_bits += LEVEL_BITS[level]; + if delta < (1u64 << cumulative_bits) { + return level; + } + } + NUM_LEVELS - 1 + } + + /// Move timers from a higher level slot down into lower levels. + /// Called when a lower level's cursor wraps around. + fn cascade(&mut self, level: usize) { + let slot = (self.current_tick >> SHIFTS[level]) & MASKS[level]; + let ids: Vec = self.levels[level][slot as usize].drain(..).collect(); + for id in ids { + if self.timers.contains_key(&id) { + let deadline = self.timers[&id].deadline_tick; + self.insert_into_wheel(id, deadline); + } + } + } + + /// Move overflow timers into the wheel if they are now within range. + fn reinsert_overflow(&mut self) { + let overflow = std::mem::take(&mut self.overflow); + for id in overflow { + if let Some(entry) = self.timers.get(&id) { + let deadline = entry.deadline_tick; + self.insert_into_wheel(id, deadline); + } + } + } + + /// Advance the wheel to `now` and return all expired timer entries. + fn advance_to(&mut self, now: Instant) -> Vec { + let target_tick = self.instant_to_tick(now); + let mut expired = Vec::new(); + + // If the time jump is larger than the wheel can represent (e.g. after + // a long suspend), drain everything instead of iterating tick-by-tick. + if target_tick > self.current_tick + MAX_RANGE_TICKS { + self.drain_all_into(&mut expired); + self.current_tick = target_tick; + self.reinsert_overflow(); + return expired; + } + + while self.current_tick < target_tick { + let slot = (self.current_tick & MASKS[0]) as usize; + + // Fire all timers in the current level-0 slot. + let ids: Vec = self.levels[0][slot].drain(..).collect(); + for id in ids { + if let Some(entry) = self.timers.remove(&id) { + self.timer_count -= 1; + expired.push(entry); + } + } + + self.current_tick += 1; + + // Cascade from higher levels when lower levels wrap. + let mut shift_acc = LEVEL_BITS[0]; + for level in 1..NUM_LEVELS { + if (self.current_tick & ((1u64 << shift_acc) - 1)) == 0 { + self.cascade(level); + } else { + break; + } + shift_acc += LEVEL_BITS[level]; + } + } + + if !self.overflow.is_empty() { + self.reinsert_overflow(); + } + + expired + } + + /// Drain every timer from every slot and overflow (used for large time jumps). + fn drain_all_into(&mut self, expired: &mut Vec) { + for level in 0..NUM_LEVELS { + for slot in self.levels[level].iter_mut() { + for id in slot.drain(..) { + if let Some(entry) = self.timers.remove(&id) { + self.timer_count -= 1; + expired.push(entry); + } + } + } + } + let overflow = std::mem::take(&mut self.overflow); + for id in overflow { + if let Some(entry) = self.timers.remove(&id) { + self.timer_count -= 1; + expired.push(entry); + } + } + } + + /// Duration until the nearest pending timer, or None if empty. + /// Uses wall-clock time so the result is accurate even if tick() + /// hasn't been called recently. + fn time_until_next(&self) -> Option { + if self.timer_count == 0 { + return None; + } + + let now_tick = self.instant_to_tick(Instant::now()); + + let earliest = self + .timers + .values() + .map(|e| e.deadline_tick) + .min() + .unwrap_or(u64::MAX); + + if earliest <= now_tick { + Some(Duration::ZERO) + } else { + Some(Duration::from_millis(earliest - now_tick)) + } + } +} + +/// Hierarchical timer wheel for O(1) timer scheduling and cancellation. +/// +/// Timers are placed into one of four wheel levels depending on how far +/// in the future they fire. As time advances, higher-level timers cascade +/// down into finer-grained levels until they land in level 0 and fire. +/// +/// Thread-safe: all methods can be called from any thread. +pub struct TimerWheel { + inner: Mutex, + next_id: AtomicU64, +} + +impl TimerWheel { + pub fn new() -> Self { + TimerWheel { + inner: Mutex::new(WheelInner::new()), + next_id: AtomicU64::new(1), + } + } + + /// Schedule a one-shot timer that fires once after `delay`. + pub fn schedule_once(&self, delay: Duration, callback: F) -> TimerId + where + F: FnOnce() + Send + 'static, + { + let id = self.next_id.fetch_add(1, Ordering::Relaxed); + let timer_id = TimerId(id); + + let mut inner = self.inner.lock(); + let deadline_tick = inner.instant_to_tick(Instant::now()) + delay.as_millis() as u64; + + let entry = TimerEntry { + id: timer_id, + deadline_tick, + kind: TimerKind::Once(Some(Box::new(callback))), + }; + + inner.timers.insert(id, entry); + inner.timer_count += 1; + inner.insert_into_wheel(id, deadline_tick); + + timer_id + } + + /// Schedule a repeating timer that fires every `interval`. + /// The first firing happens after one `interval` elapses. + pub fn schedule_repeating(&self, interval: Duration, callback: F) -> TimerId + where + F: Fn() + Send + Sync + 'static, + { + let id = self.next_id.fetch_add(1, Ordering::Relaxed); + let timer_id = TimerId(id); + + let mut inner = self.inner.lock(); + let deadline_tick = inner.instant_to_tick(Instant::now()) + interval.as_millis() as u64; + + let entry = TimerEntry { + id: timer_id, + deadline_tick, + kind: TimerKind::Repeating(Box::new(callback), interval), + }; + + inner.timers.insert(id, entry); + inner.timer_count += 1; + inner.insert_into_wheel(id, deadline_tick); + + timer_id + } + + /// Cancel a pending timer. Returns true if the timer existed and was removed. + pub fn cancel(&self, timer_id: TimerId) -> bool { + let mut inner = self.inner.lock(); + if inner.timers.remove(&timer_id.0).is_some() { + inner.timer_count -= 1; + true + } else { + false + } + } + + /// Advance the wheel to the current time, firing all expired timers. + /// + /// Repeating timers are automatically re-scheduled after firing. + /// Callbacks run outside the wheel lock so they can safely schedule + /// or cancel other timers. + pub fn tick(&self) { + let now = Instant::now(); + + let expired = { + let mut inner = self.inner.lock(); + inner.advance_to(now) + }; + + // Fire callbacks without holding the lock. + let mut to_reschedule = Vec::new(); + + for mut entry in expired { + match &mut entry.kind { + TimerKind::Once(cb) => { + if let Some(callback) = cb.take() { + callback(); + } + } + TimerKind::Repeating(cb, _) => { + cb(); + to_reschedule.push(entry); + } + } + } + + if !to_reschedule.is_empty() { + let mut inner = self.inner.lock(); + let now_tick = inner.instant_to_tick(Instant::now()); + + for entry in to_reschedule { + if let TimerKind::Repeating(_, interval) = &entry.kind { + let next_deadline = now_tick + interval.as_millis() as u64; + let id = entry.id.0; + + let new_entry = TimerEntry { + id: entry.id, + deadline_tick: next_deadline, + kind: entry.kind, + }; + + inner.timers.insert(id, new_entry); + inner.timer_count += 1; + inner.insert_into_wheel(id, next_deadline); + } + } + } + } + + /// Duration until the nearest pending timer, or None if no timers exist. + /// Used by the reactor to shorten the poll timeout when a timer is due soon. + pub fn time_until_next(&self) -> Option { + self.inner.lock().time_until_next() + } + + /// Number of currently scheduled timers. + pub fn pending_count(&self) -> usize { + self.inner.lock().timer_count + } +} + +impl Default for TimerWheel { + fn default() -> Self { + Self::new() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::atomic::{AtomicUsize, Ordering}; + use std::sync::Arc; + + #[test] + fn test_schedule_once_fires() { + let wheel = TimerWheel::new(); + let fired = Arc::new(AtomicUsize::new(0)); + let fired_clone = fired.clone(); + + wheel.schedule_once(Duration::from_millis(5), move || { + fired_clone.fetch_add(1, Ordering::SeqCst); + }); + + assert_eq!(wheel.pending_count(), 1); + + std::thread::sleep(Duration::from_millis(10)); + wheel.tick(); + + assert_eq!(fired.load(Ordering::SeqCst), 1); + assert_eq!(wheel.pending_count(), 0); + } + + #[test] + fn test_cancel_timer() { + let wheel = TimerWheel::new(); + let fired = Arc::new(AtomicUsize::new(0)); + let fired_clone = fired.clone(); + + let id = wheel.schedule_once(Duration::from_millis(50), move || { + fired_clone.fetch_add(1, Ordering::SeqCst); + }); + + assert!(wheel.cancel(id)); + assert_eq!(wheel.pending_count(), 0); + + std::thread::sleep(Duration::from_millis(60)); + wheel.tick(); + + assert_eq!(fired.load(Ordering::SeqCst), 0); + } + + #[test] + fn test_cancel_returns_false_for_unknown() { + let wheel = TimerWheel::new(); + assert!(!wheel.cancel(TimerId(999))); + } + + #[test] + fn test_repeating_timer() { + let wheel = TimerWheel::new(); + let count = Arc::new(AtomicUsize::new(0)); + let count_clone = count.clone(); + + wheel.schedule_repeating(Duration::from_millis(10), move || { + count_clone.fetch_add(1, Ordering::SeqCst); + }); + + for _ in 0..5 { + std::thread::sleep(Duration::from_millis(15)); + wheel.tick(); + } + + let final_count = count.load(Ordering::SeqCst); + assert!( + final_count >= 3, + "Expected at least 3 firings, got {final_count}" + ); + assert_eq!( + wheel.pending_count(), + 1, + "Repeating timer should remain scheduled" + ); + } + + #[test] + fn test_cancel_repeating_timer() { + let wheel = TimerWheel::new(); + let count = Arc::new(AtomicUsize::new(0)); + let count_clone = count.clone(); + + let id = wheel.schedule_repeating(Duration::from_millis(10), move || { + count_clone.fetch_add(1, Ordering::SeqCst); + }); + + std::thread::sleep(Duration::from_millis(15)); + wheel.tick(); + let after_first = count.load(Ordering::SeqCst); + assert!(after_first >= 1); + + wheel.cancel(id); + + std::thread::sleep(Duration::from_millis(30)); + wheel.tick(); + assert_eq!(count.load(Ordering::SeqCst), after_first); + } + + #[test] + fn test_timer_does_not_fire_early() { + let wheel = TimerWheel::new(); + let fired = Arc::new(AtomicUsize::new(0)); + let fired_clone = fired.clone(); + + wheel.schedule_once(Duration::from_millis(100), move || { + fired_clone.fetch_add(1, Ordering::SeqCst); + }); + + wheel.tick(); + assert_eq!(fired.load(Ordering::SeqCst), 0); + + std::thread::sleep(Duration::from_millis(50)); + wheel.tick(); + assert_eq!(fired.load(Ordering::SeqCst), 0); + } + + #[test] + fn test_many_timers() { + let wheel = TimerWheel::new(); + let count = Arc::new(AtomicUsize::new(0)); + + for i in 0..1000 { + let count_clone = count.clone(); + wheel.schedule_once(Duration::from_millis(5 + (i % 20)), move || { + count_clone.fetch_add(1, Ordering::SeqCst); + }); + } + + assert_eq!(wheel.pending_count(), 1000); + + std::thread::sleep(Duration::from_millis(30)); + wheel.tick(); + + assert_eq!(count.load(Ordering::SeqCst), 1000); + assert_eq!(wheel.pending_count(), 0); + } + + #[test] + fn test_level_assignment() { + let wheel = TimerWheel::new(); + let inner = wheel.inner.lock(); + + assert_eq!(inner.level_for_delta(0), 0); + assert_eq!(inner.level_for_delta(255), 0); + assert_eq!(inner.level_for_delta(256), 1); + assert_eq!(inner.level_for_delta(16383), 1); + assert_eq!(inner.level_for_delta(16384), 2); + assert_eq!(inner.level_for_delta(1048575), 2); + assert_eq!(inner.level_for_delta(1048576), 3); + } + + #[test] + fn test_time_until_next() { + let wheel = TimerWheel::new(); + assert!(wheel.time_until_next().is_none()); + + wheel.schedule_once(Duration::from_millis(100), || {}); + let until = wheel.time_until_next().unwrap(); + assert!(until.as_millis() <= 100); + assert!(until.as_millis() >= 90); + } + + #[test] + fn test_higher_level_timers() { + let wheel = TimerWheel::new(); + let fired = Arc::new(AtomicUsize::new(0)); + let fired_clone = fired.clone(); + + // 300ms falls into level 1 (>= 256ms) + wheel.schedule_once(Duration::from_millis(300), move || { + fired_clone.fetch_add(1, Ordering::SeqCst); + }); + + std::thread::sleep(Duration::from_millis(200)); + wheel.tick(); + assert_eq!(fired.load(Ordering::SeqCst), 0); + + std::thread::sleep(Duration::from_millis(150)); + wheel.tick(); + assert_eq!(fired.load(Ordering::SeqCst), 1); + } + + #[test] + fn test_concurrent_schedule_and_tick() { + let wheel = Arc::new(TimerWheel::new()); + let count = Arc::new(AtomicUsize::new(0)); + + let mut handles = Vec::new(); + for _ in 0..4 { + let wheel = wheel.clone(); + let count = count.clone(); + handles.push(std::thread::spawn(move || { + for _ in 0..250 { + let count = count.clone(); + wheel.schedule_once(Duration::from_millis(5), move || { + count.fetch_add(1, Ordering::SeqCst); + }); + } + })); + } + + for h in handles { + h.join().unwrap(); + } + + std::thread::sleep(Duration::from_millis(20)); + wheel.tick(); + + assert_eq!(count.load(Ordering::SeqCst), 1000); + } + + #[test] + fn test_zero_duration_fires_on_next_tick() { + let wheel = TimerWheel::new(); + let fired = Arc::new(AtomicUsize::new(0)); + let f = fired.clone(); + + wheel.schedule_once(Duration::ZERO, move || { + f.fetch_add(1, Ordering::SeqCst); + }); + + // Even Duration::ZERO needs a tick() call to fire. + assert_eq!(fired.load(Ordering::SeqCst), 0); + + std::thread::sleep(Duration::from_millis(1)); + wheel.tick(); + assert_eq!(fired.load(Ordering::SeqCst), 1); + } + + #[test] + fn test_tick_on_empty_wheel() { + let wheel = TimerWheel::new(); + // Must not panic. + wheel.tick(); + wheel.tick(); + assert_eq!(wheel.pending_count(), 0); + } + + #[test] + fn test_one_shot_fires_exactly_once() { + let wheel = TimerWheel::new(); + let count = Arc::new(AtomicUsize::new(0)); + let c = count.clone(); + + wheel.schedule_once(Duration::from_millis(5), move || { + c.fetch_add(1, Ordering::SeqCst); + }); + + std::thread::sleep(Duration::from_millis(10)); + wheel.tick(); + wheel.tick(); + wheel.tick(); + + assert_eq!(count.load(Ordering::SeqCst), 1); + assert_eq!(wheel.pending_count(), 0); + } + + #[test] + fn test_cancel_after_fire_returns_false() { + let wheel = TimerWheel::new(); + let id = wheel.schedule_once(Duration::from_millis(1), || {}); + + std::thread::sleep(Duration::from_millis(5)); + wheel.tick(); + + assert!(!wheel.cancel(id)); + } + + #[test] + fn test_callback_schedules_new_timer() { + // Proves the lock is not held during callback execution. + let wheel = Arc::new(TimerWheel::new()); + let inner_fired = Arc::new(AtomicUsize::new(0)); + + let w = wheel.clone(); + let f = inner_fired.clone(); + wheel.schedule_once(Duration::from_millis(1), move || { + w.schedule_once(Duration::from_millis(1), move || { + f.fetch_add(1, Ordering::SeqCst); + }); + }); + + std::thread::sleep(Duration::from_millis(5)); + wheel.tick(); + // The inner timer was just scheduled; it hasn't fired yet. + assert_eq!(inner_fired.load(Ordering::SeqCst), 0); + assert_eq!(wheel.pending_count(), 1); + + std::thread::sleep(Duration::from_millis(5)); + wheel.tick(); + assert_eq!(inner_fired.load(Ordering::SeqCst), 1); + assert_eq!(wheel.pending_count(), 0); + } + + #[test] + fn test_callback_cancels_another_timer() { + let wheel = Arc::new(TimerWheel::new()); + let victim_fired = Arc::new(AtomicUsize::new(0)); + let vf = victim_fired.clone(); + + let victim_id = wheel.schedule_once(Duration::from_millis(50), move || { + vf.fetch_add(1, Ordering::SeqCst); + }); + + let w = wheel.clone(); + wheel.schedule_once(Duration::from_millis(1), move || { + w.cancel(victim_id); + }); + + std::thread::sleep(Duration::from_millis(5)); + wheel.tick(); + + // Victim was cancelled by the first callback. + std::thread::sleep(Duration::from_millis(60)); + wheel.tick(); + assert_eq!(victim_fired.load(Ordering::SeqCst), 0); + } + + #[test] + fn test_time_until_next_returns_zero_for_overdue() { + let wheel = TimerWheel::new(); + wheel.schedule_once(Duration::from_millis(1), || {}); + + std::thread::sleep(Duration::from_millis(5)); + + let until = wheel.time_until_next().unwrap(); + assert_eq!(until, Duration::ZERO); + } + + #[test] + fn test_mixed_oneshot_and_repeating() { + let wheel = TimerWheel::new(); + let once_count = Arc::new(AtomicUsize::new(0)); + let repeat_count = Arc::new(AtomicUsize::new(0)); + + let oc = once_count.clone(); + wheel.schedule_once(Duration::from_millis(5), move || { + oc.fetch_add(1, Ordering::SeqCst); + }); + + let rc = repeat_count.clone(); + wheel.schedule_repeating(Duration::from_millis(10), move || { + rc.fetch_add(1, Ordering::SeqCst); + }); + + assert_eq!(wheel.pending_count(), 2); + + // After enough time, the one-shot fires once and the repeater fires multiple times. + for _ in 0..5 { + std::thread::sleep(Duration::from_millis(15)); + wheel.tick(); + } + + assert_eq!(once_count.load(Ordering::SeqCst), 1); + assert!(repeat_count.load(Ordering::SeqCst) >= 3); + // One-shot is gone, repeater remains. + assert_eq!(wheel.pending_count(), 1); + } + + #[test] + fn test_overflow_timer() { + // A timer beyond the wheel's ~18hr range goes into overflow. + let wheel = TimerWheel::new(); + let fired = Arc::new(AtomicUsize::new(0)); + let f = fired.clone(); + + // MAX_RANGE_TICKS = 2^26 = 67,108,864 ms. Schedule beyond that. + wheel.schedule_once(Duration::from_millis(MAX_RANGE_TICKS + 1000), move || { + f.fetch_add(1, Ordering::SeqCst); + }); + + assert_eq!(wheel.pending_count(), 1); + + // Ticking now should not fire it. + wheel.tick(); + assert_eq!(fired.load(Ordering::SeqCst), 0); + assert_eq!(wheel.pending_count(), 1); + } + + #[test] + fn test_pending_count_tracks_lifecycle() { + let wheel = TimerWheel::new(); + assert_eq!(wheel.pending_count(), 0); + + let id1 = wheel.schedule_once(Duration::from_millis(5), || {}); + assert_eq!(wheel.pending_count(), 1); + + let id2 = wheel.schedule_once(Duration::from_millis(5), || {}); + assert_eq!(wheel.pending_count(), 2); + + wheel.cancel(id1); + assert_eq!(wheel.pending_count(), 1); + + std::thread::sleep(Duration::from_millis(10)); + wheel.tick(); + assert_eq!(wheel.pending_count(), 0); + + // Cancelling already-fired timer doesn't affect count. + wheel.cancel(id2); + assert_eq!(wheel.pending_count(), 0); + } +} From faef16126ca0b6153af343540813c33700d216a4 Mon Sep 17 00:00:00 2001 From: hulxv Date: Wed, 18 Mar 2026 22:51:14 +0200 Subject: [PATCH 2/4] fix: clippy --- mill-io/src/lib.rs | 5 ++++- mill-io/src/reactor.rs | 7 +++---- mill-io/src/timer_wheel.rs | 12 ++++++------ 3 files changed, 13 insertions(+), 11 deletions(-) diff --git a/mill-io/src/lib.rs b/mill-io/src/lib.rs index 7e2002c..7298696 100644 --- a/mill-io/src/lib.rs +++ b/mill-io/src/lib.rs @@ -496,7 +496,10 @@ impl EventLoop { where F: Fn() + Send + Sync + 'static, { - let id = self.reactor.timer_wheel.schedule_repeating(interval, callback); + let id = self + .reactor + .timer_wheel + .schedule_repeating(interval, callback); let _ = self.reactor.poll_handle.wake(); id } diff --git a/mill-io/src/reactor.rs b/mill-io/src/reactor.rs index b7e92d2..67f8225 100644 --- a/mill-io/src/reactor.rs +++ b/mill-io/src/reactor.rs @@ -99,10 +99,9 @@ impl Reactor { None => Duration::from_millis(self.poll_timeout_ms), }; - let _ = self.poll_handle.poll( - &mut self.events.write().unwrap(), - Some(poll_timeout), - )?; + let _ = self + .poll_handle + .poll(&mut self.events.write().unwrap(), Some(poll_timeout))?; for event in self.events.read().unwrap().iter() { self.dispatch_event(event.clone())?; diff --git a/mill-io/src/timer_wheel.rs b/mill-io/src/timer_wheel.rs index 7fcf204..691525c 100644 --- a/mill-io/src/timer_wheel.rs +++ b/mill-io/src/timer_wheel.rs @@ -97,8 +97,8 @@ impl WheelInner { /// Find the lowest wheel level whose span covers `delta` ticks. fn level_for_delta(&self, delta: u64) -> usize { let mut cumulative_bits = 0u32; - for level in 0..NUM_LEVELS { - cumulative_bits += LEVEL_BITS[level]; + for (level, &bits) in LEVEL_BITS.iter().enumerate() { + cumulative_bits += bits; if delta < (1u64 << cumulative_bits) { return level; } @@ -160,13 +160,13 @@ impl WheelInner { // Cascade from higher levels when lower levels wrap. let mut shift_acc = LEVEL_BITS[0]; - for level in 1..NUM_LEVELS { + for (level, &bits) in LEVEL_BITS.iter().enumerate().skip(1) { if (self.current_tick & ((1u64 << shift_acc) - 1)) == 0 { self.cascade(level); } else { break; } - shift_acc += LEVEL_BITS[level]; + shift_acc += bits; } } @@ -179,8 +179,8 @@ impl WheelInner { /// Drain every timer from every slot and overflow (used for large time jumps). fn drain_all_into(&mut self, expired: &mut Vec) { - for level in 0..NUM_LEVELS { - for slot in self.levels[level].iter_mut() { + for level in &mut self.levels { + for slot in level.iter_mut() { for id in slot.drain(..) { if let Some(entry) = self.timers.remove(&id) { self.timer_count -= 1; From 472260bebe9861daa85df3063023f4ded6d5a085 Mon Sep 17 00:00:00 2001 From: hulxv Date: Wed, 18 Mar 2026 22:54:02 +0200 Subject: [PATCH 3/4] fix: macos --- mill-io/src/timer_wheel.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/mill-io/src/timer_wheel.rs b/mill-io/src/timer_wheel.rs index 691525c..903039b 100644 --- a/mill-io/src/timer_wheel.rs +++ b/mill-io/src/timer_wheel.rs @@ -479,7 +479,8 @@ mod tests { let fired = Arc::new(AtomicUsize::new(0)); let fired_clone = fired.clone(); - wheel.schedule_once(Duration::from_millis(100), move || { + // Use a large deadline so no amount of CI load can cause an overshoot. + wheel.schedule_once(Duration::from_secs(10), move || { fired_clone.fetch_add(1, Ordering::SeqCst); }); @@ -489,6 +490,7 @@ mod tests { std::thread::sleep(Duration::from_millis(50)); wheel.tick(); assert_eq!(fired.load(Ordering::SeqCst), 0); + assert_eq!(wheel.pending_count(), 1); } #[test] From 2774064c94121aa3d722ade6e7b498f9482d3f4b Mon Sep 17 00:00:00 2001 From: hulxv Date: Thu, 19 Mar 2026 00:18:46 +0200 Subject: [PATCH 4/4] comments --- mill-io/src/timer_wheel.rs | 82 ++++++++++++++++++++++++++++++-------- 1 file changed, 66 insertions(+), 16 deletions(-) diff --git a/mill-io/src/timer_wheel.rs b/mill-io/src/timer_wheel.rs index 903039b..62cff0e 100644 --- a/mill-io/src/timer_wheel.rs +++ b/mill-io/src/timer_wheel.rs @@ -53,6 +53,10 @@ struct WheelInner { /// Timers whose deadline exceeds the wheel's addressable range. overflow: Vec, timer_count: usize, + /// Cached earliest deadline tick. None means dirty and needs a full + /// recompute on the next time_until_next call. Kept up to date on + /// inserts (O(1) min-update) and lazily invalidated on cancels/expirations. + earliest_deadline: Option, } impl WheelInner { @@ -71,6 +75,7 @@ impl WheelInner { timers: HashMap::new(), overflow: Vec::new(), timer_count: 0, + earliest_deadline: None, } } @@ -174,6 +179,12 @@ impl WheelInner { self.reinsert_overflow(); } + // Timers were removed; invalidate the cached earliest so the next + // time_until_next call recomputes from the remaining set. + if !expired.is_empty() { + self.earliest_deadline = None; + } + expired } @@ -196,24 +207,46 @@ impl WheelInner { expired.push(entry); } } + self.earliest_deadline = None; + } + + /// O(1) update when a timer is inserted. If the cache is valid, + /// take the min; if dirty (None), leave it dirty. + fn note_inserted(&mut self, deadline_tick: u64) { + self.earliest_deadline = Some(match self.earliest_deadline { + Some(current) => current.min(deadline_tick), + None => deadline_tick, + }); + } + + /// Invalidate the cache when the earliest timer might have been removed. + fn note_removed(&mut self, removed_deadline: u64) { + if self.earliest_deadline == Some(removed_deadline) { + self.earliest_deadline = None; + } + } + + /// Full O(n) recompute of the earliest deadline and cache it. + fn recompute_earliest(&mut self) -> Option { + let earliest = self.timers.values().map(|e| e.deadline_tick).min(); + self.earliest_deadline = earliest; + earliest } /// Duration until the nearest pending timer, or None if empty. - /// Uses wall-clock time so the result is accurate even if tick() - /// hasn't been called recently. - fn time_until_next(&self) -> Option { + /// O(1) when the cached earliest is valid; O(n) recompute only + /// when the cache was invalidated by a cancel or expiration. + fn time_until_next(&mut self) -> Option { if self.timer_count == 0 { return None; } - let now_tick = self.instant_to_tick(Instant::now()); + let earliest = match self.earliest_deadline { + Some(e) => e, + None => self.recompute_earliest()?, + }; - let earliest = self - .timers - .values() - .map(|e| e.deadline_tick) - .min() - .unwrap_or(u64::MAX); + let now_tick = self.instant_to_tick(Instant::now()); if earliest <= now_tick { Some(Duration::ZERO) @@ -263,6 +296,7 @@ impl TimerWheel { inner.timers.insert(id, entry); inner.timer_count += 1; inner.insert_into_wheel(id, deadline_tick); + inner.note_inserted(deadline_tick); timer_id } @@ -288,6 +322,7 @@ impl TimerWheel { inner.timers.insert(id, entry); inner.timer_count += 1; inner.insert_into_wheel(id, deadline_tick); + inner.note_inserted(deadline_tick); timer_id } @@ -295,8 +330,10 @@ impl TimerWheel { /// Cancel a pending timer. Returns true if the timer existed and was removed. pub fn cancel(&self, timer_id: TimerId) -> bool { let mut inner = self.inner.lock(); - if inner.timers.remove(&timer_id.0).is_some() { + if let Some(entry) = inner.timers.remove(&timer_id.0) { + let deadline = entry.deadline_tick; inner.timer_count -= 1; + inner.note_removed(deadline); true } else { false @@ -351,6 +388,7 @@ impl TimerWheel { inner.timers.insert(id, new_entry); inner.timer_count += 1; inner.insert_into_wheel(id, next_deadline); + inner.note_inserted(next_deadline); } } } @@ -540,21 +578,33 @@ mod tests { } #[test] - fn test_higher_level_timers() { + fn test_higher_level_timer_does_not_fire_early() { let wheel = TimerWheel::new(); let fired = Arc::new(AtomicUsize::new(0)); let fired_clone = fired.clone(); - // 300ms falls into level 1 (>= 256ms) - wheel.schedule_once(Duration::from_millis(300), move || { + // 10s is well into level 1 (>= 256ms) and impossible to overshoot. + wheel.schedule_once(Duration::from_secs(10), move || { fired_clone.fetch_add(1, Ordering::SeqCst); }); - std::thread::sleep(Duration::from_millis(200)); wheel.tick(); assert_eq!(fired.load(Ordering::SeqCst), 0); + assert_eq!(wheel.pending_count(), 1); + } + + #[test] + fn test_higher_level_timer_fires_after_deadline() { + let wheel = TimerWheel::new(); + let fired = Arc::new(AtomicUsize::new(0)); + let fired_clone = fired.clone(); + + // 300ms falls into level 1 (>= 256ms). Sleep well past the deadline. + wheel.schedule_once(Duration::from_millis(300), move || { + fired_clone.fetch_add(1, Ordering::SeqCst); + }); - std::thread::sleep(Duration::from_millis(150)); + std::thread::sleep(Duration::from_millis(400)); wheel.tick(); assert_eq!(fired.load(Ordering::SeqCst), 1); }