Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 89 additions & 39 deletions modules/schedulers.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,22 @@ struct scheduled_context
std::reference_wrapper<context> ctx;
time_point wake_time = time_point::max();

/**
* @brief Construct a new scheduled context object.
*
* Initializes a scheduler entry by registering the unblock listener on the
* context and computing the initial wake deadline based on the context's
* current blocking state. If the context is time-blocked, the wake deadline
* is set to the current time plus the context's sleep duration. Otherwise,
* the wake deadline is set to time_point::max() (never).
*
* @param p_ctx The execution context to manage in the scheduler.
* @param p_listener Unblock listener to register on the context for
* notification when the context becomes ready to resume. May be nullptr if
* event-driven wakeup is not needed.
* @param p_clock Clock instance used to compute wake deadlines when
* the context is time-blocked.
*/
scheduled_context(async::context& p_ctx,
async::unblock_listener* p_listener,
Clock const& p_clock)
Expand All @@ -113,20 +129,19 @@ struct scheduled_context
refresh(p_clock);
}

// Bind this entry to p_ctx, register p_listener (may be nullptr), and
// compute the initial wake_time. Must be called exactly once before any
// other method.
void assign(async::context& p_ctx,
async::unblock_listener* p_listener,
Clock const& p_clock)
{
ctx = p_ctx;
p_ctx.on_unblock(p_listener);
refresh(p_clock);
}

// Recompute wake_time from the context's current sleep_time. Called after
// every resume because the blocking state may have changed.
/**
* @brief Recompute wake_time from the context's current blocking state.
*
* Updates the wake deadline based on the context's current blocking state.
* If the context is time-blocked, computes an absolute deadline by adding
* the context's sleep duration to the current time. Otherwise, sets the
* deadline to time_point::max() (never wake).
*
* Called after every context resume because the blocking state may have
* changed.
*
* @param p_clock Clock instance used to compute absolute wake times.
*/
void refresh(Clock const& p_clock)
{
if (ctx.get().state() == blocked_by::time) {
Expand All @@ -137,42 +152,75 @@ struct scheduled_context
}

/**
* @brief Resume the context if its deadline has elapsed or it is otherwise
* ready, then refresh wake_time to reflect the new state.
* @brief Recompute wake_time and update the soonest wake deadline.
*
* @param p_clock - clock to get the current time from
* @return true - context was resumed
* @return false - context is done
* Refreshes the wake deadline from the context's current blocking state,
* then updates the global soonest wake deadline if this entry's deadline
* is earlier.
*
* Called after every context resume because the blocking state may have
* changed.
*
* @param p_clock Clock instance used to compute absolute wake times.
* @param p_soonest_time Reference to the global soonest wake deadline,
* updated if this entry's deadline is earlier.
*/
void resume(Clock const& p_clock)
void refresh(Clock const& p_clock, time_point& p_soonest_time)
{
if (ctx.get().state() == blocked_by::time && wake_time <= p_clock.now()) {
refresh(p_clock);

if (wake_time < p_soonest_time) {
p_soonest_time = wake_time;
}
}

/**
* @brief Resume the context if its deadline has elapsed or it is ready,
* then refresh wake_time to reflect the new state.
*
* If the context is time-blocked and its deadline has elapsed, unblocks and
* resumes the context. Otherwise, if the context is ready (not blocked by
* time or I/O), resumes it immediately. After resuming, refreshes the wake
* deadline and updates the global soonest wake deadline.
*
* @param p_clock Clock instance used to get the current time and compute
* wake deadlines.
* @param p_soonest_time Reference to the global soonest wake deadline,
* updated if this entry's deadline is earlier.
*/
void resume(Clock const& p_clock, time_point& p_soonest_time)
{
if (ctx.get().state() == blocked_by::time and wake_time <= p_clock.now()) {
// Deadline elapsed — unblock without triggering scheduler notification,
// since we are the scheduler, then resume.
// We skip calling the unblock listener because this scheduler
ctx.get().unblock_without_notification();
ctx.get().resume();
// Recompute after resume — state may have changed.
refresh(p_clock);
refresh(p_clock, p_soonest_time);
} else if (is_ready()) {
ctx.get().resume();
// Recompute after resume — state may have changed.
refresh(p_clock);
refresh(p_clock, p_soonest_time);
}
}

// Returns true if the context is not blocked by time or I/O and can be
// resumed immediately.
/**
* @brief Check if the context is ready to resume immediately.
*
* Returns true if the context is not blocked by time or I/O, indicating
* it can be resumed without waiting for a deadline to elapse or for an
* external I/O event to complete.
*
* @return true if the context is not blocked by time or I/O.
* @return false if the context is blocked by time or I/O.
*/
[[nodiscard]] bool is_ready() const noexcept
{
return ctx.get().state() != blocked_by::io &&
ctx.get().state() != blocked_by::time;
}

[[nodiscard]] bool operator<(time_point const& p_other) const noexcept
{
return wake_time < p_other;
}

// Clear the unblock listener so the context does not hold a pointer into
// this stack frame after run_until_done_impl returns.
~scheduled_context()
Expand Down Expand Up @@ -224,30 +272,32 @@ void run_until_done_impl(
bool can_sleep = true;

for (auto& task : tasks) {
// Skip performing any of the following steps if the context is done
if (task.ctx.get().done()) {
continue;
}

all_done = false;

task.resume(p_clock);

// After resuming, re-evaluate whether we can sleep and what the
// soonest deadline is
if (task.wake_time < soonest_wake_time) {
soonest_wake_time = task.wake_time;
}
// This resumes the context if it is elidible to be resumed. It is
// elidible if it's wake time has expired or its in a pollable blocked by
// state. This function also updates the `soonest_wake_time` variable if
// this task could be awaken sooner than the previous.
task.resume(p_clock, soonest_wake_time);

// Before we move on to the next task, check if the task was blocked by
// something.
if (task.is_ready()) {
can_sleep = false;
}
}

// No more work needs to be done, this is how the scheduler exits.
if (all_done) {
break;
return;
}

if (can_sleep && soonest_wake_time < max_wake) {
if (can_sleep and soonest_wake_time < max_wake) {
p_sleep_until(soonest_wake_time);
}
}
Expand Down
Loading