Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ Chart repo: [orch8-io/helm-charts](https://github.com/orch8-io/helm-charts)
Pre-1.0. This is the public release of an engine that has been running my own production for several months, with 4,347 tests covering core paths. Honest about what it isn't yet:

- **Not battle-tested at Temporal-scale.** Largest internal load test: ~10K concurrent instances. If you're past that or have multiple engineers depending on uptime, run Temporal until 1.0.
- **No replay debugger or time-skipping test tooling.** Temporal's SDKs ship deterministic replay + timer-skip helpers; we don't.
- **No replay debugger.** Temporal's SDKs ship deterministic replay; we don't yet. Time-skipping tests *are* supported: inject a `ManualClock` via `SchedulerConfig::clock` and advance virtual time manually — a workflow with a 3-day delay completes in a millisecond-scale test.
- **Workflow versioning is younger.** Sequence definitions are versioned, but the migration ergonomics for in-flight instances aren't as polished as Temporal's `GetVersion` / patch system.
- **SDK depth varies.** TypeScript SDK has both authoring + worker support; Go and Python SDKs are worker-focused for now.
- **API is stable but evolving.** Pre-1.0 means breaking changes are possible; we'll mark them in releases and keep them minimal.
Expand Down
57 changes: 44 additions & 13 deletions orch8-engine/src/cron.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,42 @@ use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;

use chrono::Utc;
use chrono::{DateTime, Utc};
use cron::Schedule;
use tokio::task::JoinSet;
use tokio::time::interval;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};

use orch8_storage::StorageBackend;
use orch8_types::clock::SharedClock;
use orch8_types::context::ExecutionContext;
use orch8_types::cron::CronSchedule;
use orch8_types::ids::InstanceId;
use orch8_types::instance::{InstanceState, Priority, TaskInstance};

use crate::error::EngineError;

/// Run the cron tick loop. Checks every `tick_interval` for due cron schedules,
/// creates instances for each, and advances their `next_fire_at`.
/// Run the cron tick loop on the system clock. Checks every `tick_interval`
/// for due cron schedules, creates instances for each, and advances their
/// `next_fire_at`. See [`run_cron_loop_with_clock`] for virtual-time use.
pub async fn run_cron_loop(
storage: Arc<dyn StorageBackend>,
tick_interval: Duration,
cancel: CancellationToken,
) -> Result<(), EngineError> {
run_cron_loop_with_clock(storage, tick_interval, SharedClock::default(), cancel).await
}

/// [`run_cron_loop`] reading "now" from the supplied scheduler clock.
///
/// Note: the loop still *ticks* on real (tokio) time; the clock only governs
/// which schedules are considered due and how their next fire is computed.
pub async fn run_cron_loop_with_clock(
storage: Arc<dyn StorageBackend>,
tick_interval: Duration,
clock: SharedClock,
cancel: CancellationToken,
) -> Result<(), EngineError> {
let mut ticker = interval(tick_interval);
ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
Expand All @@ -39,16 +54,19 @@ pub async fn run_cron_loop(
return Ok(());
}
_ = ticker.tick() => {
if let Err(e) = process_cron_tick(&storage).await {
if let Err(e) = process_cron_tick(&storage, &clock).await {
error!(error = %e, "cron tick failed");
}
}
}
}
}

async fn process_cron_tick(storage: &Arc<dyn StorageBackend>) -> Result<(), EngineError> {
let now = Utc::now();
async fn process_cron_tick(
storage: &Arc<dyn StorageBackend>,
clock: &SharedClock,
) -> Result<(), EngineError> {
let now = clock.now();
let schedules = storage.claim_due_cron_schedules(now).await?;

if schedules.is_empty() {
Expand All @@ -63,9 +81,10 @@ async fn process_cron_tick(storage: &Arc<dyn StorageBackend>) -> Result<(), Engi
let mut tasks: JoinSet<()> = JoinSet::new();
for schedule in schedules {
let storage = Arc::clone(storage);
let clock = clock.clone();
tasks.spawn(async move {
let cron_id = schedule.id;
if let Err(e) = trigger_cron_schedule(storage.as_ref(), &schedule).await {
if let Err(e) = trigger_cron_schedule(storage.as_ref(), &schedule, &clock).await {
error!(
cron_id = %cron_id,
error = %e,
Expand All @@ -86,8 +105,9 @@ async fn process_cron_tick(storage: &Arc<dyn StorageBackend>) -> Result<(), Engi
async fn trigger_cron_schedule(
storage: &dyn StorageBackend,
schedule: &CronSchedule,
clock: &SharedClock,
) -> Result<(), EngineError> {
let now = Utc::now();
let now = clock.now();

// Create a new instance for this schedule.
let instance = TaskInstance {
Expand Down Expand Up @@ -135,7 +155,7 @@ async fn trigger_cron_schedule(

// Calculate next fire time — always advance even on failure so we don't
// retry the same tick in a hot error loop.
let next = calculate_next_fire(schedule);
let next = calculate_next_fire_after(schedule, now);
match next {
Some(next_at) => {
storage
Expand Down Expand Up @@ -176,19 +196,30 @@ fn normalize_cron_expr(expr: &str) -> String {
}
}

/// Calculate the next fire time from a cron expression, respecting the
/// schedule's timezone. Falls back to UTC if the timezone is invalid.
/// Calculate the next fire time from a cron expression after the real current
/// time, respecting the schedule's timezone. Falls back to UTC if the
/// timezone is invalid.
pub fn calculate_next_fire(schedule: &CronSchedule) -> Option<chrono::DateTime<Utc>> {
calculate_next_fire_after(schedule, Utc::now())
}

/// [`calculate_next_fire`] evaluated against an explicit "now" — the cron
/// loop passes its scheduler clock's instant so virtual time controls
/// schedule advancement in tests.
pub fn calculate_next_fire_after(
schedule: &CronSchedule,
now: DateTime<Utc>,
) -> Option<chrono::DateTime<Utc>> {
let normalized = normalize_cron_expr(&schedule.cron_expr);
let cron_schedule = Schedule::from_str(&normalized).ok()?;
if let Ok(tz) = schedule.timezone.parse::<chrono_tz::Tz>() {
let now_tz = Utc::now().with_timezone(&tz);
let now_tz = now.with_timezone(&tz);
cron_schedule
.after(&now_tz)
.next()
.map(|dt| dt.with_timezone(&Utc))
} else {
cron_schedule.upcoming(Utc).next()
cron_schedule.after(&now).next()
}
}

Expand Down
11 changes: 10 additions & 1 deletion orch8-engine/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
pub mod circuit_breaker;
pub mod credentials;
pub mod cron;
/// Virtual time for scheduling decisions — re-exported from `orch8-types` so
/// engine users can write `orch8_engine::clock::ManualClock`.
pub mod clock {
pub use orch8_types::clock::{Clock, ManualClock, SharedClock, SystemClock};
}
pub mod error;
pub mod evaluator;
pub mod expression;
Expand Down Expand Up @@ -270,8 +275,12 @@ impl Engine {
let cron_storage = Arc::clone(&self.storage);
let cron_cancel = self.cancel.clone();
let cron_tick = std::time::Duration::from_secs(self.config.cron_tick_secs);
let cron_clock = self.config.clock.clone();
set.spawn(async move {
if let Err(e) = cron::run_cron_loop(cron_storage, cron_tick, cron_cancel).await {
if let Err(e) =
cron::run_cron_loop_with_clock(cron_storage, cron_tick, cron_clock, cron_cancel)
.await
{
tracing::error!(error = %e, "cron loop exited with error");
}
});
Expand Down
Loading
Loading