Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 80 additions & 7 deletions mill-io/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,16 @@ pub mod object_pool;
pub mod poll;
pub mod reactor;
pub mod thread_pool;
pub mod timer_wheel;

pub use handler::EventHandler;
pub use mio::event::Event;
pub use object_pool::{ObjectPool, PooledObject};
pub use thread_pool::{ComputePoolMetrics, TaskPriority};
pub use timer_wheel::TimerId;

use crate::{error::Result, reactor::ReactorOptions};
use std::time::Duration;

/// A convenient prelude module that re-exports commonly used types and traits.
///
Expand All @@ -127,6 +130,7 @@ pub mod prelude {
pub use crate::object_pool::{ObjectPool, PooledObject};
pub use crate::reactor::{self, Reactor};
pub use crate::thread_pool::{self, ComputePoolMetrics, TaskPriority, ThreadPool};
pub use crate::timer_wheel::{TimerId, TimerWheel};
}

/// The main event loop structure for registering I/O sources and handling events.
Expand Down Expand Up @@ -448,19 +452,88 @@ impl EventLoop {
self.reactor.get_compute_metrics()
}

/// Schedule a one-shot timer that fires after `delay`.
///
/// The callback runs on the reactor thread during timer processing.
/// Keep callbacks fast to avoid blocking the event loop.
///
/// ## Example
///
/// ```rust,no_run
/// use mill_io::EventLoop;
/// use std::time::Duration;
///
/// let event_loop = EventLoop::default();
/// let timer_id = event_loop.schedule_once(Duration::from_secs(5), || {
/// println!("Timer fired!");
/// });
/// ```
pub fn schedule_once<F>(&self, delay: Duration, callback: F) -> TimerId
where
F: FnOnce() + Send + 'static,
{
let id = self.reactor.timer_wheel.schedule_once(delay, callback);
// Wake the reactor so it can recalculate its poll timeout.
let _ = self.reactor.poll_handle.wake();
id
}

/// Schedule a repeating timer that fires every `interval`.
/// The first firing happens after one `interval` elapses.
///
/// ## Example
///
/// ```rust,no_run
/// use mill_io::EventLoop;
/// use std::time::Duration;
///
/// let event_loop = EventLoop::default();
/// let timer_id = event_loop.schedule_repeating(Duration::from_secs(1), || {
/// println!("Tick!");
/// });
/// ```
pub fn schedule_repeating<F>(&self, interval: Duration, callback: F) -> TimerId
where
F: Fn() + Send + Sync + 'static,
{
let id = self
.reactor
.timer_wheel
.schedule_repeating(interval, callback);
let _ = self.reactor.poll_handle.wake();
id
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

/// Cancel a pending timer. Returns true if the timer was found and cancelled.
///
/// ## Example
///
/// ```rust,no_run
/// use mill_io::EventLoop;
/// use std::time::Duration;
///
/// let event_loop = EventLoop::default();
/// let timer_id = event_loop.schedule_once(Duration::from_secs(10), || {});
/// event_loop.cancel_timer(timer_id);
/// ```
pub fn cancel_timer(&self, timer_id: TimerId) -> bool {
self.reactor.timer_wheel.cancel(timer_id)
}

/// Returns the number of currently scheduled timers.
pub fn pending_timers(&self) -> usize {
self.reactor.timer_wheel.pending_count()
}

/// Signals the event loop to stop gracefully.
///
/// This method initiates a graceful shutdown of the event loop. It sends a shutdown
/// signal to the reactor, which will cause the main loop to exit after finishing
/// the current polling cycle.
/// Sends a shutdown signal to the reactor, which will cause the main loop
/// to exit after finishing the current polling cycle.
///
/// This method is non-blocking and returns immediately. The actual shutdown happens
/// asynchronously, and [`run()`](Self::run) will return once the shutdown is complete.
///
/// # Thread Safety
///
/// This method is thread-safe and can be called from any thread, making it suitable
/// for use in signal handlers or from other threads.
/// Thread-safe: can be called from any thread.
///
/// ## Example
///
Expand Down
24 changes: 20 additions & 4 deletions mill-io/src/reactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::{
error::Result,
poll::PollHandle,
thread_pool::{ComputePoolMetrics, ComputeThreadPool, TaskPriority, ThreadPool},
timer_wheel::TimerWheel,
};
use mio::{event::Event, Events};
use std::{
Expand All @@ -27,6 +28,7 @@ pub struct Reactor {
running: AtomicBool,
poll_timeout_ms: u64,
options: ReactorOptions,
pub(crate) timer_wheel: Arc<TimerWheel>,
}

impl Default for Reactor {
Expand All @@ -45,6 +47,7 @@ impl Default for Reactor {
options: ReactorOptions {
direct_dispatch: false,
},
timer_wheel: Arc::new(TimerWheel::new()),
}
}
}
Expand All @@ -61,6 +64,7 @@ impl Reactor {
options: ReactorOptions {
direct_dispatch: false,
},
timer_wheel: Arc::new(TimerWheel::new()),
})
}

Expand All @@ -78,21 +82,33 @@ impl Reactor {
running: AtomicBool::new(false),
poll_timeout_ms,
options,
timer_wheel: Arc::new(TimerWheel::new()),
})
}

pub fn run(&self) -> Result<()> {
self.running.store(true, Ordering::SeqCst);

while self.running.load(Ordering::SeqCst) {
let _ = self.poll_handle.poll(
&mut self.events.write().unwrap(),
Some(Duration::from_millis(self.poll_timeout_ms)),
)?;
// Shorten the poll timeout if a timer is due soon.
let poll_timeout = match self.timer_wheel.time_until_next() {
Some(until_next) => {
let configured = Duration::from_millis(self.poll_timeout_ms);
until_next.min(configured)
}
None => Duration::from_millis(self.poll_timeout_ms),
};

let _ = self
.poll_handle
.poll(&mut self.events.write().unwrap(), Some(poll_timeout))?;

for event in self.events.read().unwrap().iter() {
self.dispatch_event(event.clone())?;
}

// Advance the timer wheel and fire expired timers.
self.timer_wheel.tick();
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}
Ok(())
}
Expand Down
Loading
Loading