From 5acb0faac431101587fa433c3eda9a4781c7f987 Mon Sep 17 00:00:00 2001 From: Oleksii Date: Wed, 10 Jun 2026 01:07:23 -0300 Subject: [PATCH] =?UTF-8?q?feat(engine):=20Clock=20trait=20=E2=80=94=20vir?= =?UTF-8?q?tual=20time=20for=20scheduler=20and=20time-based=20checks?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Abstract "now" behind a Clock so tests (and a future orch8 dev --skip-timers mode) can advance time manually. - orch8-types/src/clock.rs: Clock trait, SystemClock (default), ManualClock (advance/set, lock-protected), SharedClock handle. - SchedulerConfig gains a serde-skipped `clock` field defaulting to SystemClock — no public signature changes anywhere; orch8-mobile compiles unchanged. - Scheduler reads the clock for all scheduling decisions: claiming due instances, semaphore/concurrency deferrals, delay / send-window / rate-limit pre-flight checks, retry backoff, SLA deadline and human-input timeout baselines, signalled-instance wakes, and cron evaluation (run_cron_loop_with_clock, calculate_next_fire_after). - Delay checks now record a "_delay_until:" metadata marker so a served delay executes instead of re-deferring forever — this makes duration delays actually complete (previously they re-applied on every claim) and enables the 3-day-delay virtual-time test. - Record timestamps (created_at/updated_at, audit rows, DB-side NOW()) stay on real time; boundary documented in orch8_types::clock. - Tests: ManualClock/SystemClock unit tests; e2e tests where a 3-day delay and a send-window boundary complete in milliseconds via ManualClock; full engine/storage/mobile suites pass (2650 tests). 
Co-Authored-By: Claude Fable 5 --- README.md | 2 +- orch8-engine/src/cron.rs | 57 +++++-- orch8-engine/src/lib.rs | 11 +- orch8-engine/src/scheduler.rs | 68 ++++++-- orch8-engine/src/scheduler/step_exec.rs | 90 ++++++++-- orch8-engine/src/scheduler/tests.rs | 75 ++++++--- orch8-engine/tests/clock_virtual_time.rs | 192 +++++++++++++++++++++ orch8-engine/tests/common/mod.rs | 1 + orch8-types/src/clock.rs | 202 +++++++++++++++++++++++ orch8-types/src/config.rs | 9 + orch8-types/src/lib.rs | 1 + 11 files changed, 642 insertions(+), 66 deletions(-) create mode 100644 orch8-engine/tests/clock_virtual_time.rs create mode 100644 orch8-types/src/clock.rs diff --git a/README.md b/README.md index 17853ded..9beda749 100644 --- a/README.md +++ b/README.md @@ -321,7 +321,7 @@ Chart repo: [orch8-io/helm-charts](https://github.com/orch8-io/helm-charts) Pre-1.0. This is the public release of an engine that has been running my own production for several months, with 4,347 tests covering core paths. Honest about what it isn't yet: - **Not battle-tested at Temporal-scale.** Largest internal load test: ~10K concurrent instances. If you're past that or have multiple engineers depending on uptime, run Temporal until 1.0. -- **No replay debugger or time-skipping test tooling.** Temporal's SDKs ship deterministic replay + timer-skip helpers; we don't. +- **No replay debugger.** Temporal's SDKs ship deterministic replay; we don't yet. Time-skipping tests *are* supported: inject a `ManualClock` via `SchedulerConfig::clock` and advance virtual time manually — a workflow with a 3-day delay completes in a millisecond-scale test. - **Workflow versioning is younger.** Sequence definitions are versioned, but the migration ergonomics for in-flight instances aren't as polished as Temporal's `GetVersion` / patch system. - **SDK depth varies.** TypeScript SDK has both authoring + worker support; Go and Python SDKs are worker-focused for now. 
- **API is stable but evolving.** Pre-1.0 means breaking changes are possible; we'll mark them in releases and keep them minimal. diff --git a/orch8-engine/src/cron.rs b/orch8-engine/src/cron.rs index 2a6240e2..23d16afd 100644 --- a/orch8-engine/src/cron.rs +++ b/orch8-engine/src/cron.rs @@ -2,7 +2,7 @@ use std::str::FromStr; use std::sync::Arc; use std::time::Duration; -use chrono::Utc; +use chrono::{DateTime, Utc}; use cron::Schedule; use tokio::task::JoinSet; use tokio::time::interval; @@ -10,6 +10,7 @@ use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, warn}; use orch8_storage::StorageBackend; +use orch8_types::clock::SharedClock; use orch8_types::context::ExecutionContext; use orch8_types::cron::CronSchedule; use orch8_types::ids::InstanceId; @@ -17,12 +18,26 @@ use orch8_types::instance::{InstanceState, Priority, TaskInstance}; use crate::error::EngineError; -/// Run the cron tick loop. Checks every `tick_interval` for due cron schedules, -/// creates instances for each, and advances their `next_fire_at`. +/// Run the cron tick loop on the system clock. Checks every `tick_interval` +/// for due cron schedules, creates instances for each, and advances their +/// `next_fire_at`. See [`run_cron_loop_with_clock`] for virtual-time use. pub async fn run_cron_loop( storage: Arc, tick_interval: Duration, cancel: CancellationToken, +) -> Result<(), EngineError> { + run_cron_loop_with_clock(storage, tick_interval, SharedClock::default(), cancel).await +} + +/// [`run_cron_loop`] reading "now" from the supplied scheduler clock. +/// +/// Note: the loop still *ticks* on real (tokio) time; the clock only governs +/// which schedules are considered due and how their next fire is computed. 
+pub async fn run_cron_loop_with_clock( + storage: Arc, + tick_interval: Duration, + clock: SharedClock, + cancel: CancellationToken, ) -> Result<(), EngineError> { let mut ticker = interval(tick_interval); ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); @@ -39,7 +54,7 @@ pub async fn run_cron_loop( return Ok(()); } _ = ticker.tick() => { - if let Err(e) = process_cron_tick(&storage).await { + if let Err(e) = process_cron_tick(&storage, &clock).await { error!(error = %e, "cron tick failed"); } } @@ -47,8 +62,11 @@ pub async fn run_cron_loop( } } -async fn process_cron_tick(storage: &Arc) -> Result<(), EngineError> { - let now = Utc::now(); +async fn process_cron_tick( + storage: &Arc, + clock: &SharedClock, +) -> Result<(), EngineError> { + let now = clock.now(); let schedules = storage.claim_due_cron_schedules(now).await?; if schedules.is_empty() { @@ -63,9 +81,10 @@ async fn process_cron_tick(storage: &Arc) -> Result<(), Engi let mut tasks: JoinSet<()> = JoinSet::new(); for schedule in schedules { let storage = Arc::clone(storage); + let clock = clock.clone(); tasks.spawn(async move { let cron_id = schedule.id; - if let Err(e) = trigger_cron_schedule(storage.as_ref(), &schedule).await { + if let Err(e) = trigger_cron_schedule(storage.as_ref(), &schedule, &clock).await { error!( cron_id = %cron_id, error = %e, @@ -86,8 +105,9 @@ async fn process_cron_tick(storage: &Arc) -> Result<(), Engi async fn trigger_cron_schedule( storage: &dyn StorageBackend, schedule: &CronSchedule, + clock: &SharedClock, ) -> Result<(), EngineError> { - let now = Utc::now(); + let now = clock.now(); // Create a new instance for this schedule. let instance = TaskInstance { @@ -135,7 +155,7 @@ async fn trigger_cron_schedule( // Calculate next fire time — always advance even on failure so we don't // retry the same tick in a hot error loop. 
- let next = calculate_next_fire(schedule); + let next = calculate_next_fire_after(schedule, now); match next { Some(next_at) => { storage @@ -176,19 +196,30 @@ fn normalize_cron_expr(expr: &str) -> String { } } -/// Calculate the next fire time from a cron expression, respecting the -/// schedule's timezone. Falls back to UTC if the timezone is invalid. +/// Calculate the next fire time from a cron expression after the real current +/// time, respecting the schedule's timezone. Falls back to UTC if the +/// timezone is invalid. pub fn calculate_next_fire(schedule: &CronSchedule) -> Option> { + calculate_next_fire_after(schedule, Utc::now()) +} + +/// [`calculate_next_fire`] evaluated against an explicit "now" — the cron +/// loop passes its scheduler clock's instant so virtual time controls +/// schedule advancement in tests. +pub fn calculate_next_fire_after( + schedule: &CronSchedule, + now: DateTime, +) -> Option> { let normalized = normalize_cron_expr(&schedule.cron_expr); let cron_schedule = Schedule::from_str(&normalized).ok()?; if let Ok(tz) = schedule.timezone.parse::() { - let now_tz = Utc::now().with_timezone(&tz); + let now_tz = now.with_timezone(&tz); cron_schedule .after(&now_tz) .next() .map(|dt| dt.with_timezone(&Utc)) } else { - cron_schedule.upcoming(Utc).next() + cron_schedule.after(&now).next() } } diff --git a/orch8-engine/src/lib.rs b/orch8-engine/src/lib.rs index ee964447..131f89e4 100644 --- a/orch8-engine/src/lib.rs +++ b/orch8-engine/src/lib.rs @@ -1,6 +1,11 @@ pub mod circuit_breaker; pub mod credentials; pub mod cron; +/// Virtual time for scheduling decisions — re-exported from `orch8-types` so +/// engine users can write `orch8_engine::clock::ManualClock`. 
+pub mod clock { + pub use orch8_types::clock::{Clock, ManualClock, SharedClock, SystemClock}; +} pub mod error; pub mod evaluator; pub mod expression; @@ -270,8 +275,12 @@ impl Engine { let cron_storage = Arc::clone(&self.storage); let cron_cancel = self.cancel.clone(); let cron_tick = std::time::Duration::from_secs(self.config.cron_tick_secs); + let cron_clock = self.config.clock.clone(); set.spawn(async move { - if let Err(e) = cron::run_cron_loop(cron_storage, cron_tick, cron_cancel).await { + if let Err(e) = + cron::run_cron_loop_with_clock(cron_storage, cron_tick, cron_clock, cron_cancel) + .await + { tracing::error!(error = %e, "cron loop exited with error"); } }); diff --git a/orch8-engine/src/scheduler.rs b/orch8-engine/src/scheduler.rs index adfcc5f1..d02f51af 100644 --- a/orch8-engine/src/scheduler.rs +++ b/orch8-engine/src/scheduler.rs @@ -11,6 +11,7 @@ use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, warn}; use orch8_storage::StorageBackend; +use orch8_types::clock::SharedClock; use orch8_types::config::{SchedulerConfig, WebhookConfig}; use orch8_types::filter::InstanceFilter; use orch8_types::ids::{BlockId, InstanceId}; @@ -26,7 +27,7 @@ mod step_exec; #[cfg(test)] mod tests; -pub use step_exec::check_human_input; +pub use step_exec::{check_human_input, check_human_input_at}; /// Result of a single tick execution, suitable for mobile/embedded callers /// that drive the engine manually rather than via the continuous tick loop. @@ -64,6 +65,10 @@ pub(crate) struct TickContext<'a> { pub max_per_tenant: u32, pub externalize_threshold: u32, pub counters: Option>, + /// Time source for all scheduling decisions made during this tick + /// (claiming, deferrals, deadline/timeout checks). Defaults to the + /// system clock via `SchedulerConfig::clock`. 
+ pub clock: &'a SharedClock, } /// Execute a single scheduling pass: claim due instances, process them, handle @@ -96,6 +101,7 @@ pub async fn tick_once( max_per_tenant: config.max_instances_per_tenant, externalize_threshold: config.externalize_output_threshold, counters: Some(Arc::clone(&counters)), + clock: &config.clock, }; let join_handles = process_tick(&ctx).await?; @@ -108,7 +114,7 @@ pub async fn tick_once( let _ = handle.await; } - process_signalled_instances(storage, config.batch_size).await?; + process_signalled_instances(storage, config.batch_size, &config.clock).await?; process_waiting_deadlines( storage, @@ -117,6 +123,7 @@ pub async fn tick_once( &webhook_config, cancel, config.batch_size, + &config.clock, ) .await?; @@ -209,6 +216,7 @@ pub async fn run_tick_loop( externalize_threshold, // The tick loop does not need per-tick counters — pass None. counters: None, + clock: &config.clock, }; match process_tick(&ctx).await { Ok(_join_handles) => { @@ -236,7 +244,7 @@ pub async fn run_tick_loop( // picked up by claim_due_instances (which only claims // scheduled instances). Without this, resume/cancel signals // on paused instances would never be processed. 
- if let Err(e) = process_signalled_instances(&storage, batch_size).await { + if let Err(e) = process_signalled_instances(&storage, batch_size, &config.clock).await { error!(error = %e, "signalled instance processing failed"); } // Check SLA deadlines for waiting instances (external worker @@ -249,6 +257,7 @@ pub async fn run_tick_loop( &webhook_config, &cancel, batch_size, + &config.clock, ).await { error!(error = %e, "waiting deadline processing failed"); } @@ -289,7 +298,7 @@ async fn process_tick(ctx: &TickContext<'_>) -> Result>, Engi } let fetch_limit = std::cmp::min(ctx.batch_size, u32::try_from(available).unwrap_or(u32::MAX)); - let now = Utc::now(); + let now = ctx.clock.now(); let instances = ctx .storage .claim_due_instances(now, fetch_limit, ctx.max_per_tenant) @@ -304,7 +313,7 @@ async fn process_tick(ctx: &TickContext<'_>) -> Result>, Engi // instances share a concurrency_key, more than `max_concurrency` may be // Running simultaneously. We put the excess back to Scheduled here, // synchronously, so the observable Running count never exceeds the limit. 
- let mut instances = enforce_concurrency_limits(ctx.storage, instances).await?; + let mut instances = enforce_concurrency_limits(ctx.storage, instances, ctx.clock).await?; let count = instances.len(); tracing::Span::current().record("claimed", count); @@ -353,7 +362,7 @@ async fn process_tick(ctx: &TickContext<'_>) -> Result>, Engi ); if let Err(e) = ctx .storage - .update_instance_state(instance.id, InstanceState::Scheduled, Some(Utc::now())) + .update_instance_state(instance.id, InstanceState::Scheduled, Some(ctx.clock.now())) .await { // If this write fails the row stays `Running` and is never @@ -376,6 +385,7 @@ async fn process_tick(ctx: &TickContext<'_>) -> Result>, Engi let counters = ctx.counters.clone(); let externalize_threshold = ctx.externalize_threshold; let max_steps_per_instance = ctx.max_steps_per_instance; + let clock = ctx.clock.clone(); crate::metrics::inc(crate::metrics::INSTANCES_CLAIMED); @@ -415,6 +425,7 @@ async fn process_tick(ctx: &TickContext<'_>) -> Result>, Engi externalize_threshold, max_steps_per_instance, &cancel, + &clock, ) .await }) @@ -499,6 +510,7 @@ async fn process_tick(ctx: &TickContext<'_>) -> Result>, Engi async fn enforce_concurrency_limits( storage: &Arc, instances: Vec, + clock: &SharedClock, ) -> Result, EngineError> { // Collect concurrency keys present in the batch. 
let mut key_instances: HashMap<&str, Vec> = HashMap::with_capacity(instances.len() / 2); @@ -546,7 +558,7 @@ async fn enforce_concurrency_limits( return Ok(instances); } - let defer_at = Utc::now() + chrono::Duration::seconds(1); + let defer_at = clock.now() + chrono::Duration::seconds(1); let mut deferred_ids: Vec = Vec::with_capacity(deferred_indices.len()); for &idx in &deferred_indices { let inst = &instances[idx]; @@ -579,6 +591,7 @@ async fn enforce_concurrency_limits( async fn process_signalled_instances( storage: &Arc, batch_size: u32, + clock: &SharedClock, ) -> Result<(), EngineError> { let signalled = storage.get_signalled_instance_ids(batch_size).await?; if signalled.is_empty() { @@ -610,7 +623,7 @@ async fn process_signalled_instances( "waking scheduled instance with pending signal" ); if let Err(e) = storage - .update_instance_state(instance_id, InstanceState::Scheduled, Some(Utc::now())) + .update_instance_state(instance_id, InstanceState::Scheduled, Some(clock.now())) .await { warn!( @@ -653,6 +666,7 @@ async fn process_waiting_deadlines( webhook_config: &WebhookConfig, cancel: &CancellationToken, batch_size: u32, + clock: &SharedClock, ) -> Result<(), EngineError> { use orch8_types::filter::{InstanceFilter, Pagination}; @@ -718,6 +732,7 @@ async fn process_waiting_deadlines( webhook_config, cancel, prev, + clock, ) .await? 
{ @@ -738,7 +753,7 @@ async fn process_waiting_deadlines( .current_step_started_at .or(instance.context.runtime.started_at); if let Some(started) = baseline { - let elapsed = Utc::now() - started; + let elapsed = clock.now() - started; if elapsed > chrono::Duration::from_std(timeout) .unwrap_or(chrono::Duration::days(365)) @@ -754,7 +769,7 @@ async fn process_waiting_deadlines( Some(&instance.tenant_id), InstanceState::Waiting, InstanceState::Scheduled, - Some(Utc::now()), + Some(clock.now()), ) .await?; handled = true; @@ -772,6 +787,7 @@ async fn process_waiting_deadlines( /// SLA deadline check variant for instances in `Waiting` state. /// Delegates to the shared `handle_deadline_breach` with `from_state = Waiting`. +#[allow(clippy::too_many_arguments)] async fn check_step_deadline_waiting( storage: &Arc, handlers: &HandlerRegistry, @@ -780,6 +796,7 @@ async fn check_step_deadline_waiting( _webhook_config: &WebhookConfig, _cancel: &CancellationToken, prev_output: Option<&orch8_types::output::BlockOutput>, + clock: &SharedClock, ) -> Result { handle_deadline_breach( storage, @@ -788,6 +805,7 @@ async fn check_step_deadline_waiting( step_def, prev_output, InstanceState::Waiting, + clock, ) .await } @@ -807,6 +825,7 @@ pub(crate) async fn handle_deadline_breach( step_def: &orch8_types::sequence::StepDef, prev_output: Option<&orch8_types::output::BlockOutput>, from_state: InstanceState, + clock: &SharedClock, ) -> Result { let Some(deadline) = step_def.deadline else { return Ok(false); @@ -817,7 +836,7 @@ pub(crate) async fn handle_deadline_breach( let Some(baseline) = baseline else { return Ok(false); }; - let elapsed = Utc::now() - baseline; + let elapsed = clock.now() - baseline; if elapsed < chrono::Duration::from_std(deadline).unwrap_or(chrono::TimeDelta::MAX) { return Ok(false); } @@ -977,6 +996,7 @@ async fn process_instance( externalize_threshold: u32, max_steps_per_instance: u32, cancel: &CancellationToken, + clock: &SharedClock, ) -> Result<(), 
EngineError> { let instance_id = instance.id; @@ -987,9 +1007,11 @@ async fn process_instance( // claim_due_instances already set state to Running. // Stamp runtime.started_at on the first run so timeout / escalation - // handlers (e.g. human_review) can compute elapsed time. + // handlers (e.g. human_review) can compute elapsed time. Uses the + // scheduler clock (not wall time) because this value is the baseline for + // deadline / human-input timeout *scheduling decisions*. let instance = if instance.context.runtime.started_at.is_none() { - let started_at = Utc::now(); + let started_at = clock.now(); storage .update_instance_started_at(instance_id, started_at) .await?; @@ -1061,8 +1083,10 @@ async fn process_instance( let prev = deadline_outputs_ref .get(&(&instance.id, &step_def.id)) .copied(); - if step_exec::check_step_deadline(storage, handlers, &instance, step_def, prev) - .await? + if step_exec::check_step_deadline( + storage, handlers, &instance, step_def, prev, clock, + ) + .await? { return Ok(()); } @@ -1074,7 +1098,7 @@ async fn process_instance( if let (Some(ref key), Some(max)) = (&instance.concurrency_key, instance.max_concurrency) { let position = storage.concurrency_position(instance_id, key).await?; if position > i64::from(max) { - let defer_at = Utc::now() + chrono::Duration::seconds(2); + let defer_at = clock.now() + chrono::Duration::seconds(2); storage .update_instance_state(instance_id, InstanceState::Scheduled, Some(defer_at)) .await?; @@ -1116,6 +1140,7 @@ async fn process_instance( &sequence, max_steps_per_instance, cancel, + clock, ) .await; } @@ -1152,6 +1177,7 @@ async fn process_instance( &instance, step_def, cancel, + clock, ) .await?; @@ -1262,6 +1288,12 @@ async fn process_instance( /// /// Uses a CAS (conditional update) so a concurrent cancel/fail cannot be /// silently overwritten. +/// +/// Clock note: this intentionally uses real `Utc::now()` rather than the +/// scheduler clock — the semantics are "wake immediately". 
Under a +/// `ManualClock` that starts at or after real time (the documented +/// discipline), real now <= virtual now, so the parent is due on the very +/// next virtual tick. async fn wake_parent_if_child( storage: &dyn StorageBackend, instance: &orch8_types::instance::TaskInstance, @@ -1310,6 +1342,7 @@ async fn wake_parent_if_child( /// The evaluator returns an `EvalOutcome` carrying the tree state so we can /// transition the instance without re-reading the tree from the database. #[allow(clippy::too_many_lines)] +#[allow(clippy::too_many_arguments)] async fn process_instance_tree( storage: &Arc, handlers: &HandlerRegistry, @@ -1318,6 +1351,7 @@ async fn process_instance_tree( sequence: &SequenceDefinition, max_steps_per_instance: u32, cancel: &CancellationToken, + clock: &SharedClock, ) -> Result<(), EngineError> { use crate::evaluator::EvalOutcome; let instance_id = instance.id; @@ -1391,7 +1425,7 @@ async fn process_instance_tree( Some(&instance.tenant_id), current, InstanceState::Scheduled, - Some(Utc::now()), + Some(clock.now()), ) .await { diff --git a/orch8-engine/src/scheduler/step_exec.rs b/orch8-engine/src/scheduler/step_exec.rs index 56362ada..58c1875b 100644 --- a/orch8-engine/src/scheduler/step_exec.rs +++ b/orch8-engine/src/scheduler/step_exec.rs @@ -6,11 +6,12 @@ use std::borrow::Cow; use std::sync::Arc; -use chrono::Utc; +use chrono::{DateTime, Utc}; use tokio_util::sync::CancellationToken; use tracing::{debug, info, warn}; use orch8_storage::StorageBackend; +use orch8_types::clock::SharedClock; use orch8_types::config::WebhookConfig; use orch8_types::instance::InstanceState; @@ -28,6 +29,7 @@ pub(super) async fn check_step_deadline( instance: &orch8_types::instance::TaskInstance, step_def: &orch8_types::sequence::StepDef, prev_output: Option<&orch8_types::output::BlockOutput>, + clock: &SharedClock, ) -> Result { super::handle_deadline_breach( storage, @@ -36,6 +38,7 @@ pub(super) async fn check_step_deadline( step_def, prev_output, 
InstanceState::Running, + clock, ) .await } @@ -112,23 +115,56 @@ pub(super) async fn check_budget( Ok(true) // Paused — skip step execution this tick. } +/// Metadata key prefix recording the `fire_at` a delayed block was deferred +/// to. Once `clock.now() >= fire_at`, the delay has been served and the block +/// executes instead of being re-deferred. +const DELAY_UNTIL_PREFIX: &str = "_delay_until:"; + /// Check if the step has a delay and defer if so. Returns `true` if deferred. +/// +/// The deferral target is recorded in instance metadata +/// (`"_delay_until:<step_id>": "<RFC 3339 fire_at>"`). On a later claim, if the +/// scheduler clock has reached that instant the delay is considered served +/// and the step proceeds to execution; otherwise the deferral is recomputed +/// from the current clock (preserving the historical "delay re-applies while +/// pending" behavior). Without the marker a duration-based delay would +/// re-defer on every claim and the block could never run. pub(super) async fn check_step_delay( storage: &dyn StorageBackend, instance: &orch8_types::instance::TaskInstance, step_def: &orch8_types::sequence::StepDef, + clock: &SharedClock, ) -> Result { let Some(delay) = &step_def.delay else { return Ok(false); }; + let now = clock.now(); + let marker_key = format!("{DELAY_UNTIL_PREFIX}{}", step_def.id.as_str()); + let served = instance + .metadata + .get(&marker_key) + .and_then(|v| v.as_str()) + .and_then(|s| DateTime::parse_from_rfc3339(s).ok()) + .is_some_and(|until| now >= until.with_timezone(&Utc)); + if served { + return Ok(false); + } + let fire_at = crate::scheduling::delay::calculate_next_fire_at( - Utc::now(), + now, delay, &instance.timezone, Some(&instance.context.config), ); + storage + .merge_instance_metadata( + instance.id, + &serde_json::json!({ marker_key: fire_at.to_rfc3339() }), + ) + .await?; + crate::lifecycle::transition_instance( storage, instance.id, @@ -153,13 +189,14 @@ pub(super) async fn check_send_window( storage: &dyn StorageBackend, 
instance: &orch8_types::instance::TaskInstance, step_def: &orch8_types::sequence::StepDef, + clock: &SharedClock, ) -> Result { let Some(window) = &step_def.send_window else { return Ok(false); }; let Some(next_open) = - crate::scheduling::send_window::check_window(Utc::now(), window, &instance.timezone) + crate::scheduling::send_window::check_window(clock.now(), window, &instance.timezone) else { return Ok(false); // Inside window }; @@ -188,6 +225,7 @@ pub(super) async fn check_step_rate_limit( storage: &dyn StorageBackend, instance: &orch8_types::instance::TaskInstance, step_def: &orch8_types::sequence::StepDef, + clock: &SharedClock, ) -> Result { let Some(key) = &step_def.rate_limit_key else { return Ok(false); @@ -195,7 +233,7 @@ pub(super) async fn check_step_rate_limit( let resource_key = orch8_types::ids::ResourceKey::new(key.clone()); let check = storage - .check_rate_limit(&instance.tenant_id, &resource_key, Utc::now()) + .check_rate_limit(&instance.tenant_id, &resource_key, clock.now()) .await?; if let orch8_types::rate_limit::RateLimitCheck::Exceeded { retry_after } = check { @@ -267,12 +305,26 @@ async fn accept_human_input( Ok(()) } -#[allow(clippy::too_many_lines)] +/// [`check_human_input_at`] evaluated at the real current time. Kept as the +/// stable public entry point for integration tests and external callers. pub async fn check_human_input( storage: &dyn StorageBackend, instance: &orch8_types::instance::TaskInstance, step_def: &orch8_types::sequence::StepDef, human_def: &orch8_types::sequence::HumanInputDef, +) -> Result { + check_human_input_at(storage, instance, step_def, human_def, Utc::now()).await +} + +/// [`check_human_input`] with an explicit "now" — the scheduler passes its +/// clock's instant here so input timeouts respect virtual time in tests. 
+#[allow(clippy::too_many_lines)] +pub async fn check_human_input_at( + storage: &dyn StorageBackend, + instance: &orch8_types::instance::TaskInstance, + step_def: &orch8_types::sequence::StepDef, + human_def: &orch8_types::sequence::HumanInputDef, + now: DateTime, ) -> Result { // Dry-run auto-approve: resolve the gate with the default (first) choice so // the simulation flows past human gates instead of pausing. Behaviorally @@ -351,7 +403,7 @@ pub async fn check_human_input( .current_step_started_at .or(instance.context.runtime.started_at); if let Some(started) = baseline { - let elapsed = chrono::Utc::now() - started; + let elapsed = now - started; if elapsed > chrono::Duration::from_std(timeout).unwrap_or(chrono::Duration::days(365)) { // Timeout expired — fail or escalate. @@ -386,7 +438,7 @@ pub async fn check_human_input( Some(&instance.tenant_id), InstanceState::Running, InstanceState::Scheduled, - Some(chrono::Utc::now()), + Some(now), ) .await?; return Ok(true); // Deferred — do NOT fall through to the step handler @@ -415,6 +467,7 @@ pub async fn check_human_input( /// Returns `StepOutcome` wrapped in `Result` — the outcome tells the caller /// whether to continue executing more blocks or stop. #[allow(clippy::too_many_lines)] +#[allow(clippy::too_many_arguments)] pub(super) async fn execute_step_block( storage: &Arc, handlers: &HandlerRegistry, @@ -423,6 +476,7 @@ pub(super) async fn execute_step_block( instance: &orch8_types::instance::TaskInstance, step_def: &orch8_types::sequence::StepDef, cancel: &CancellationToken, + clock: &SharedClock, ) -> Result { let instance_id = instance.id; @@ -449,23 +503,24 @@ pub(super) async fn execute_step_block( } } - if check_step_delay(storage.as_ref(), instance, step_def).await? { + if check_step_delay(storage.as_ref(), instance, step_def, clock).await? { return Ok(StepOutcome::Deferred); } - if check_send_window(storage.as_ref(), instance, step_def).await? 
{ + if check_send_window(storage.as_ref(), instance, step_def, clock).await? { return Ok(StepOutcome::Deferred); } - if check_step_rate_limit(storage.as_ref(), instance, step_def).await? { + if check_step_rate_limit(storage.as_ref(), instance, step_def, clock).await? { return Ok(StepOutcome::Deferred); } // Stamp per-step start time and current step ID so wait_for_input // timeouts are measured correctly and the mobile notifier can identify - // which step the instance is waiting on. + // which step the instance is waiting on. Uses the scheduler clock — this + // timestamp is the baseline for timeout scheduling decisions. { - let step_started = chrono::Utc::now(); + let step_started = clock.now(); if let Some(mut inst) = storage.get_instance(instance_id).await? { let expected_updated_at = inst.updated_at; inst.context.runtime.current_step = Some(step_def.id.clone()); @@ -481,7 +536,9 @@ pub(super) async fn execute_step_block( // API can discover the instance and process_signalled_instances will wake it // when the signal arrives. if let Some(human_def) = &step_def.wait_for_input { - if check_human_input(storage.as_ref(), instance, step_def, human_def).await? { + if check_human_input_at(storage.as_ref(), instance, step_def, human_def, clock.now()) + .await? 
+ { crate::lifecycle::transition_instance( storage.as_ref(), instance_id, @@ -674,7 +731,7 @@ pub(super) async fn execute_step_block( Some(&instance.tenant_id), InstanceState::Running, InstanceState::Scheduled, - Some(chrono::Utc::now()), + Some(clock.now()), ) .await?; Ok(StepOutcome::Deferred) @@ -769,6 +826,7 @@ pub(super) async fn execute_step_block( webhook_config, message, cancel, + clock, ) .await?; Ok(StepOutcome::Failed) @@ -823,6 +881,7 @@ pub(super) async fn execute_step_block( } } +#[allow(clippy::too_many_arguments)] pub(super) async fn handle_retryable_failure( storage: &dyn StorageBackend, instance: &orch8_types::instance::TaskInstance, @@ -831,6 +890,7 @@ pub(super) async fn handle_retryable_failure( webhook_config: &WebhookConfig, message: &str, cancel: &CancellationToken, + clock: &SharedClock, ) -> Result<(), EngineError> { let instance_id = instance.id; if let Some(retry) = &step_def.retry { @@ -882,7 +942,7 @@ pub(super) async fn handle_retryable_failure( retry.max_backoff, retry.backoff_multiplier, ); - let fire_at = Utc::now() + let fire_at = clock.now() + chrono::Duration::from_std(backoff).unwrap_or_else(|_| chrono::Duration::zero()); warn!( diff --git a/orch8-engine/src/scheduler/tests.rs b/orch8-engine/src/scheduler/tests.rs index 04f6f794..71a65136 100644 --- a/orch8-engine/src/scheduler/tests.rs +++ b/orch8-engine/src/scheduler/tests.rs @@ -190,6 +190,7 @@ async fn scheduler_execute_step_block_resolves_templates_for_inprocess_handler() &instance, &step_def, &cancel, + &SharedClock::default(), ) .await .expect("execute_step_block errored"); @@ -241,6 +242,7 @@ async fn scheduler_execute_step_block_template_failure_fails_instance() { &instance, &step_def, &cancel, + &SharedClock::default(), ) .await .expect("execute_step_block errored"); @@ -321,6 +323,7 @@ async fn scheduler_fast_path_retryable_failure_honours_max_attempts() { &inst, &step_def, &cancel, + &SharedClock::default(), ) .await .expect("execute_step_block errored"); @@ -383,6 
+386,7 @@ async fn scheduler_execute_step_block_resolves_params_for_external_worker() { &instance, &step_def, &cancel, + &SharedClock::default(), ) .await .expect("execute_step_block errored"); @@ -578,9 +582,14 @@ async fn check_step_delay_defers_instance_with_correct_fire_at() { timezone: None, }); - let deferred = check_step_delay(storage.as_ref(), &instance, &step_def) - .await - .expect("check_step_delay errored"); + let deferred = check_step_delay( + storage.as_ref(), + &instance, + &step_def, + &SharedClock::default(), + ) + .await + .expect("check_step_delay errored"); assert!(deferred, "delay present → must defer"); let refreshed = storage.get_instance(instance_id).await.unwrap().unwrap(); @@ -602,9 +611,14 @@ async fn check_step_delay_is_noop_when_no_delay() { let instance = storage.get_instance(instance_id).await.unwrap().unwrap(); let step_def = mk_step_def("s", "noop", serde_json::json!({})); - let deferred = check_step_delay(storage.as_ref(), &instance, &step_def) - .await - .unwrap(); + let deferred = check_step_delay( + storage.as_ref(), + &instance, + &step_def, + &SharedClock::default(), + ) + .await + .unwrap(); assert!(!deferred); // State must stay Running since there's nothing to defer. let refreshed = storage.get_instance(instance_id).await.unwrap().unwrap(); @@ -653,9 +667,14 @@ async fn check_step_rate_limit_returns_false_when_no_rate_limit_key() { let step_def = mk_step_def("s", "noop", serde_json::json!({})); // rate_limit_key defaults to None in mk_step_def. - let deferred = check_step_rate_limit(storage.as_ref(), &instance, &step_def) - .await - .unwrap(); + let deferred = check_step_rate_limit( + storage.as_ref(), + &instance, + &step_def, + &SharedClock::default(), + ) + .await + .unwrap(); assert!(!deferred, "no rate_limit_key => must not defer"); // State must remain Running — nothing to defer. 
@@ -674,9 +693,14 @@ async fn check_step_rate_limit_allows_under_threshold() { let mut step_def = mk_step_def("s", "noop", serde_json::json!({})); step_def.rate_limit_key = Some("rk-under".into()); // No upsert → no bucket row → check_rate_limit returns Allowed. - let deferred = check_step_rate_limit(storage.as_ref(), &instance, &step_def) - .await - .unwrap(); + let deferred = check_step_rate_limit( + storage.as_ref(), + &instance, + &step_def, + &SharedClock::default(), + ) + .await + .unwrap(); assert!(!deferred, "empty bucket must not defer"); let refreshed = storage.get_instance(instance_id).await.unwrap().unwrap(); @@ -697,9 +721,14 @@ async fn check_step_rate_limit_defers_at_threshold() { let mut step_def = mk_step_def("s", "noop", serde_json::json!({})); step_def.rate_limit_key = Some("rk-full".into()); - let deferred = check_step_rate_limit(storage.as_ref(), &instance, &step_def) - .await - .unwrap(); + let deferred = check_step_rate_limit( + storage.as_ref(), + &instance, + &step_def, + &SharedClock::default(), + ) + .await + .unwrap(); assert!(deferred, "full bucket must defer"); let refreshed = storage.get_instance(instance_id).await.unwrap().unwrap(); @@ -759,7 +788,9 @@ async fn enforce_concurrency_limits_passthrough_when_no_keys() { storage.get_instance(id2).await.unwrap().unwrap(), ]; - let kept = enforce_concurrency_limits(&storage, insts).await.unwrap(); + let kept = enforce_concurrency_limits(&storage, insts, &SharedClock::default()) + .await + .unwrap(); assert_eq!(kept.len(), 2, "no keys => passthrough"); } @@ -792,7 +823,9 @@ async fn enforce_concurrency_limits_max_one_serialises() { storage.get_instance(id2).await.unwrap().unwrap(), ]; - let kept = enforce_concurrency_limits(&storage, insts).await.unwrap(); + let kept = enforce_concurrency_limits(&storage, insts, &SharedClock::default()) + .await + .unwrap(); assert_eq!(kept.len(), 1, "max=1 must keep exactly one"); // The deferred instance must be back in Scheduled with a fire_at set. 
@@ -831,7 +864,9 @@ async fn enforce_concurrency_limits_max_zero_blocks_all() { storage.get_instance(id2).await.unwrap().unwrap(), ]; - let kept = enforce_concurrency_limits(&storage, insts).await.unwrap(); + let kept = enforce_concurrency_limits(&storage, insts, &SharedClock::default()) + .await + .unwrap(); assert_eq!(kept.len(), 0, "max=0 must defer everyone"); for id in [id1, id2] { @@ -865,7 +900,9 @@ async fn enforce_concurrency_limits_allows_up_to_cap() { storage.get_instance(id3).await.unwrap().unwrap(), ]; - let kept = enforce_concurrency_limits(&storage, insts).await.unwrap(); + let kept = enforce_concurrency_limits(&storage, insts, &SharedClock::default()) + .await + .unwrap(); assert_eq!(kept.len(), 2, "max=2 must keep exactly two of three"); // Exactly one of the three must be back in Scheduled. diff --git a/orch8-engine/tests/clock_virtual_time.rs b/orch8-engine/tests/clock_virtual_time.rs new file mode 100644 index 00000000..ed1679b0 --- /dev/null +++ b/orch8-engine/tests/clock_virtual_time.rs @@ -0,0 +1,192 @@ +//! Virtual-time e2e tests: drive `tick_once` with a `ManualClock` injected +//! via `SchedulerConfig::clock` and advance time manually, so workflows with +//! multi-day delays and send windows complete in a millisecond-scale test run. + +use std::sync::Arc; +use std::time::Duration; + +use chrono::{TimeZone, Utc}; +use tokio_util::sync::CancellationToken; + +use orch8_engine::clock::{Clock, ManualClock, SharedClock}; +use orch8_engine::handlers::HandlerRegistry; +use orch8_engine::scheduler::tick_once; +use orch8_storage::StorageBackend; +use orch8_types::config::SchedulerConfig; +use orch8_types::ids::BlockId; +use orch8_types::instance::InstanceState; +use orch8_types::sequence::{DelaySpec, SendWindow}; + +mod common; +use common::*; + +/// Build a scheduler config whose scheduling decisions read the given clock. 
+fn config_with_clock(manual: &Arc<ManualClock>) -> SchedulerConfig { + SchedulerConfig { + clock: SharedClock::from_arc(Arc::clone(manual) as Arc<dyn Clock>), + ..default_config() + } +} + +/// Run one scheduling pass under the supplied config. +async fn tick( + storage: &Arc<dyn StorageBackend>, + handlers: &Arc<HandlerRegistry>, + config: &SchedulerConfig, +) -> orch8_engine::scheduler::TickOnceResult { + let sem = semaphore(128); + let seq_cache = cache(); + let cancel = CancellationToken::new(); + tick_once(storage, handlers, &sem, config, &seq_cache, &cancel) + .await + .unwrap() +} + +/// The payoff test: a sequence with a 3-day delay completes in a fast test +/// run by advancing a `ManualClock` instead of sleeping. +#[tokio::test] +async fn three_day_delay_completes_with_manual_clock() { + let s = storage().await; + let delay = DelaySpec { + duration: Duration::from_secs(3 * 24 * 60 * 60), // 3 days + business_days_only: false, + jitter: None, + holidays: vec![], + fire_at_local: None, + timezone: None, + }; + let seq = mk_sequence(vec![ + mk_step_with_delay_spec("wait_3d", "noop", delay), + mk_step("after_delay", "noop"), + ]); + s.create_sequence(&seq).await.unwrap(); + let inst = mk_instance_scheduled(seq.id, serde_json::json!({})); + s.create_instance(&inst).await.unwrap(); + + // ManualClock discipline: start at (or after) real time, move forward only. + let start = Utc::now(); + let manual = Arc::new(ManualClock::new(start)); + let config = config_with_clock(&manual); + let h = Arc::new(registry()); + + // Tick 1: the delayed step defers the instance ~3 virtual days out.
+ tick(&s, &h, &config).await; + let refreshed = s.get_instance(inst.id).await.unwrap().unwrap(); + assert_eq!(refreshed.state, InstanceState::Scheduled); + let fire_at = refreshed.next_fire_at.expect("deferred fire_at"); + let diff_secs = (fire_at - start).num_seconds(); + assert!( + (3 * 24 * 3600 - 5..=3 * 24 * 3600 + 5).contains(&diff_secs), + "expected ~3 days deferral, got {diff_secs}s" + ); + + // Tick 2 without advancing time: nothing is due, nothing runs. + let result = tick(&s, &h, &config).await; + assert_eq!(result.steps_executed, 0, "instance must not be claimed yet"); + let refreshed = s.get_instance(inst.id).await.unwrap().unwrap(); + assert_eq!(refreshed.state, InstanceState::Scheduled); + + // Advance 3 days (+1 minute of slack) — the delay has now been served. + manual.advance(chrono::Duration::days(3) + chrono::Duration::minutes(1)); + tick(&s, &h, &config).await; + + let refreshed = s.get_instance(inst.id).await.unwrap().unwrap(); + assert_eq!( + refreshed.state, + InstanceState::Completed, + "after advancing the clock past the delay the instance must complete" + ); + // Both blocks actually executed. + for block in ["wait_3d", "after_delay"] { + let output = s + .get_block_output(inst.id, &BlockId::new(block)) + .await + .unwrap(); + assert!(output.is_some(), "block {block} must have an output"); + } +} + +/// A step outside its send window is deferred to the window's opening, and +/// advancing the clock across the boundary lets it execute. +#[tokio::test] +async fn send_window_boundary_crossed_with_manual_clock() { + let s = storage().await; + let window = SendWindow { + start_hour: 9, + end_hour: 17, + days: vec![], + }; + let seq = mk_sequence(vec![mk_step_with_send_window("notify", "noop", window)]); + s.create_sequence(&seq).await.unwrap(); + let inst = mk_instance_scheduled(seq.id, serde_json::json!({})); + s.create_instance(&inst).await.unwrap(); + + // A fixed future instant outside the window: 2030-01-07 (a Monday) 03:00 + // UTC. 
Being in the future keeps the forward-only ManualClock discipline. + let night = Utc.with_ymd_and_hms(2030, 1, 7, 3, 0, 0).unwrap(); + let manual = Arc::new(ManualClock::new(night)); + let config = config_with_clock(&manual); + let h = Arc::new(registry()); + + // Tick 1: outside the window — deferred to 09:00 the same (virtual) day. + tick(&s, &h, &config).await; + let refreshed = s.get_instance(inst.id).await.unwrap().unwrap(); + assert_eq!(refreshed.state, InstanceState::Scheduled); + let next_open = refreshed.next_fire_at.expect("deferred fire_at"); + assert_eq!( + next_open, + Utc.with_ymd_and_hms(2030, 1, 7, 9, 0, 0).unwrap(), + "must defer to the window opening" + ); + + // Tick 2 while still at night: not due, no execution. + let result = tick(&s, &h, &config).await; + assert_eq!(result.steps_executed, 0); + + // Jump into the window and tick: the step now runs to completion. + manual.set(Utc.with_ymd_and_hms(2030, 1, 7, 9, 30, 0).unwrap()); + tick(&s, &h, &config).await; + + let refreshed = s.get_instance(inst.id).await.unwrap().unwrap(); + assert_eq!(refreshed.state, InstanceState::Completed); + let output = s + .get_block_output(inst.id, &BlockId::new("notify")) + .await + .unwrap(); + assert!( + output.is_some(), + "step must have executed inside the window" + ); +} + +/// With the default `SystemClock` nothing changes: a delayed instance stays +/// deferred within a fast test run (sanity check that virtual time is opt-in). 
+#[tokio::test] +async fn system_clock_default_keeps_delay_pending() { + let s = storage().await; + let delay = DelaySpec { + duration: Duration::from_secs(3600), + business_days_only: false, + jitter: None, + holidays: vec![], + fire_at_local: None, + timezone: None, + }; + let seq = mk_sequence(vec![mk_step_with_delay_spec("wait_1h", "noop", delay)]); + s.create_sequence(&seq).await.unwrap(); + let inst = mk_instance_scheduled(seq.id, serde_json::json!({})); + s.create_instance(&inst).await.unwrap(); + + let config = default_config(); // SystemClock + let h = Arc::new(registry()); + + tick(&s, &h, &config).await; + let refreshed = s.get_instance(inst.id).await.unwrap().unwrap(); + assert_eq!(refreshed.state, InstanceState::Scheduled); + + // Re-tick immediately: still pending — real time has not advanced 1h. + let result = tick(&s, &h, &config).await; + assert_eq!(result.steps_executed, 0); + let refreshed = s.get_instance(inst.id).await.unwrap().unwrap(); + assert_eq!(refreshed.state, InstanceState::Scheduled); +} diff --git a/orch8-engine/tests/common/mod.rs b/orch8-engine/tests/common/mod.rs index f3639e18..59e2130d 100644 --- a/orch8-engine/tests/common/mod.rs +++ b/orch8-engine/tests/common/mod.rs @@ -682,6 +682,7 @@ pub fn default_config() -> SchedulerConfig { node_reaper_stale_secs: 120, cron_tick_secs: 10, max_steps_per_instance: 0, + clock: orch8_types::clock::SharedClock::default(), artifact_retention_secs: 0, } } diff --git a/orch8-types/src/clock.rs b/orch8-types/src/clock.rs new file mode 100644 index 00000000..7d3b4973 --- /dev/null +++ b/orch8-types/src/clock.rs @@ -0,0 +1,202 @@ +//! Virtual time for the scheduler. +//! +//! All *scheduling decisions* in the engine (claiming due instances, delay / +//! send-window / rate-limit deferrals, retry backoff, SLA-deadline and +//! human-input timeout checks, cron evaluation) read "now" through a +//! [`Clock`] instead of calling [`Utc::now()`] directly. Production code uses +//! 
[`SystemClock`] (the default everywhere), which is byte-for-byte the old +//! behavior. Tests inject a [`ManualClock`] and advance it explicitly, so a +//! workflow with a 3-day delay completes in a millisecond-scale test run. +//! +//! # What the clock does NOT govern +//! +//! Record timestamps stay on real time by design: +//! +//! - `created_at` / `updated_at` stamping of rows (instances, outputs, +//! signals, worker tasks) — these are audit data, not scheduling inputs. +//! - Database-side `NOW()` in `UPDATE ... SET updated_at = NOW()` clauses. +//! The claim queries themselves bind an app-side timestamp parameter +//! (`next_fire_at <= $now`), so the virtual clock fully controls claiming; +//! clock skew between app and DB only affects audit columns. +//! - Background sweeps outside the tick loop (stale-instance recovery, GC, +//! trigger sync) and logging. +//! +//! # `ManualClock` discipline +//! +//! Start a [`ManualClock`] **at or after the current real time** and only move +//! it forward. A few "wake immediately" paths (e.g. parent wake-up after a +//! child completes) stamp `next_fire_at` with real time; as long as virtual +//! time >= real time those instances remain due under the virtual clock. + +use std::fmt; +use std::sync::{Arc, RwLock}; + +use chrono::{DateTime, Duration, Utc}; + +/// Source of "now" for scheduling decisions. +/// +/// Implementations must be cheap to call and safe to share across tasks. +pub trait Clock: Send + Sync + 'static { + /// The current instant according to this clock. + fn now(&self) -> DateTime<Utc>; +} + +/// Production clock: returns the real [`Utc::now()`]. +#[derive(Debug, Clone, Copy, Default)] +pub struct SystemClock; + +impl Clock for SystemClock { + fn now(&self) -> DateTime<Utc> { + Utc::now() + } +} + +/// Test clock: starts at a fixed instant and only moves when told to.
+/// +/// Share it via `Arc<ManualClock>` — keep one handle for the scheduler (as a +/// [`SharedClock`]) and one for the test body to call [`ManualClock::advance`] +/// or [`ManualClock::set`]. +#[derive(Debug)] +pub struct ManualClock { + now: RwLock<DateTime<Utc>>, +} + +impl ManualClock { + /// Create a clock frozen at `start`. + #[must_use] + pub fn new(start: DateTime<Utc>) -> Self { + Self { + now: RwLock::new(start), + } + } + + /// Move the clock forward (or backward, for negative durations) by `delta`. + pub fn advance(&self, delta: Duration) { + let mut now = self.now.write().expect("ManualClock lock poisoned"); + *now += delta; + } + + /// Jump the clock to an absolute instant. + pub fn set(&self, to: DateTime<Utc>) { + let mut now = self.now.write().expect("ManualClock lock poisoned"); + *now = to; + } +} + +impl Clock for ManualClock { + fn now(&self) -> DateTime<Utc> { + *self.now.read().expect("ManualClock lock poisoned") + } +} + +/// Cheaply cloneable, type-erased clock handle. +/// +/// Lives inside [`crate::config::SchedulerConfig`] so the scheduler, cron +/// loop, and step pre-flight checks all read the same time source without any +/// public signature changes. Defaults to [`SystemClock`]; the field is skipped +/// during (de)serialization, so config files are unaffected. +#[derive(Clone)] +pub struct SharedClock(Arc<dyn Clock>); + +impl SharedClock { + /// Wrap a concrete clock. + pub fn new<C: Clock>(clock: C) -> Self { + Self(Arc::new(clock)) + } + + /// Wrap an already-shared clock (e.g. an `Arc<ManualClock>` the test also + /// keeps a handle to for advancing time). + #[must_use] + pub fn from_arc(clock: Arc<dyn Clock>) -> Self { + Self(clock) + } + + /// The current instant according to the wrapped clock.
+ #[must_use] + pub fn now(&self) -> DateTime<Utc> { + self.0.now() + } +} + +impl Clock for SharedClock { + fn now(&self) -> DateTime<Utc> { + self.0.now() + } +} + +impl Default for SharedClock { + fn default() -> Self { + Self(Arc::new(SystemClock)) + } +} + +impl fmt::Debug for SharedClock { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("SharedClock(..)") + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn system_clock_tracks_real_time() { + let clock = SystemClock; + let before = Utc::now(); + let observed = clock.now(); + let after = Utc::now(); + assert!(before <= observed && observed <= after); + } + + #[test] + fn manual_clock_is_frozen_until_advanced() { + let start = Utc::now(); + let clock = ManualClock::new(start); + assert_eq!(clock.now(), start); + assert_eq!(clock.now(), start, "repeated reads must not drift"); + } + + #[test] + fn manual_clock_advance_moves_forward() { + let start = Utc::now(); + let clock = ManualClock::new(start); + clock.advance(Duration::days(3)); + assert_eq!(clock.now(), start + Duration::days(3)); + clock.advance(Duration::seconds(1)); + assert_eq!( + clock.now(), + start + Duration::days(3) + Duration::seconds(1) + ); + } + + #[test] + fn manual_clock_set_jumps_to_absolute_instant() { + let start = Utc::now(); + let clock = ManualClock::new(start); + let target = start + Duration::days(30); + clock.set(target); + assert_eq!(clock.now(), target); + // set() can also move backwards.
+ clock.set(start); + assert_eq!(clock.now(), start); + } + + #[test] + fn manual_clock_is_shareable_across_handles() { + let start = Utc::now(); + let manual = Arc::new(ManualClock::new(start)); + let shared = SharedClock::from_arc(Arc::clone(&manual) as Arc<dyn Clock>); + manual.advance(Duration::hours(2)); + assert_eq!(shared.now(), start + Duration::hours(2)); + } + + #[test] + fn shared_clock_defaults_to_system_time() { + let shared = SharedClock::default(); + let before = Utc::now(); + let observed = shared.now(); + let after = Utc::now(); + assert!(before <= observed && observed <= after); + } +} diff --git a/orch8-types/src/config.rs b/orch8-types/src/config.rs index 73b29a18..3bce5db0 100644 --- a/orch8-types/src/config.rs +++ b/orch8-types/src/config.rs @@ -362,6 +362,14 @@ pub struct SchedulerConfig { /// Primarily used by the mobile SDK to prevent runaway workflows on-device. #[serde(default)] pub max_steps_per_instance: u32, + /// Time source for scheduling decisions (claiming due instances, delay / + /// send-window / rate-limit deferrals, retry backoff, deadline and + /// human-input timeout checks, cron evaluation). Defaults to + /// [`crate::clock::SystemClock`] — production behavior is unchanged. + /// Tests inject a [`crate::clock::ManualClock`] here to advance time + /// manually. Not configurable via files/env; skipped by serde. + #[serde(skip)] + pub clock: crate::clock::SharedClock, /// Artifact retention, in seconds. When `> 0`, the background GC sweeper /// deletes the durable artifacts of instances that have been in a terminal /// state for longer than this window.
`0` (default) disables the sweep — @@ -392,6 +400,7 @@ impl Default for SchedulerConfig { node_reaper_stale_secs: default_node_reaper_stale_secs(), cron_tick_secs: default_cron_tick_secs(), max_steps_per_instance: 0, + clock: crate::clock::SharedClock::default(), artifact_retention_secs: 0, } } diff --git a/orch8-types/src/lib.rs b/orch8-types/src/lib.rs index 402c6d72..a455e9c1 100644 --- a/orch8-types/src/lib.rs +++ b/orch8-types/src/lib.rs @@ -4,6 +4,7 @@ pub mod audit; pub mod auth; pub mod checkpoint; pub mod circuit_breaker; +pub mod clock; pub mod cluster; pub mod config; pub mod context;