From 7aad79505f3d610fa6dcd5964e4616ce85c25e05 Mon Sep 17 00:00:00 2001 From: Oleksii Date: Wed, 10 Jun 2026 00:41:00 -0300 Subject: [PATCH 1/2] =?UTF-8?q?feat(orch8):=20embeddable=20crate=20?= =?UTF-8?q?=E2=80=94=20durable=20execution=20as=20a=20Rust=20library?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit New workspace member `orch8`: a curated pre-1.0 facade over orch8-engine/orch8-storage/orch8-types that embeds the durable workflow engine in any tokio application. - Engine::builder(): .storage(sqlite file / sqlite in-memory / postgres), .handler(name, async closure) on top of the server's built-in registry, .tick_interval(Duration), .tenant(..); build() opens storage, applies schema/migrations and runs crash recovery (mirrors server startup). - engine.start()/shutdown(): background tick loop on the HOST's runtime with CancellationToken + graceful drain; engine.tick_once() for manual-tick embedding. - Instance/sequence ops without HTTP: upsert_sequence (idempotent per name+version), create_instance (options struct with Default, including idempotency-key dedupe), get_instance, send_signal (terminal-state rejection + wake), list_instances. - Examples: quickstart (sqlite file, custom handler, templated 2-step run), kill_resistant (run twice — instance survives process exit), embedded_axum (shared engine behind HTTP endpoints). - Integration tests: background-loop completion, manual ticking, human-input signal wake, bounded graceful shutdown, facade CRUD semantics. Rustdoc with inline quickstart; doc warnings denied. Co-Authored-By: Claude Fable 5 --- Cargo.lock | 19 ++ Cargo.toml | 1 + orch8/Cargo.toml | 29 +++ orch8/examples/embedded_axum.rs | 103 ++++++++++ orch8/examples/kill_resistant.rs | 102 ++++++++++ orch8/examples/quickstart.rs | 99 ++++++++++ orch8/src/builder.rs | 105 ++++++++++ orch8/src/engine.rs | 328 +++++++++++++++++++++++++++++++ orch8/src/error.rs | 45 +++++ orch8/src/lib.rs | 119 +++++++++++ orch8/src/storage.rs | 65 ++++++ orch8/tests/engine.rs | 279 ++++++++++++++++++++++++++ 12 files changed, 1294 insertions(+) create mode 100644 orch8/Cargo.toml create mode 100644 orch8/examples/embedded_axum.rs create mode 100644 orch8/examples/kill_resistant.rs create mode 100644 orch8/examples/quickstart.rs create mode 100644 orch8/src/builder.rs create mode 100644 orch8/src/engine.rs create mode 100644 orch8/src/error.rs create mode 100644 orch8/src/lib.rs create mode 100644 orch8/src/storage.rs create mode 100644 orch8/tests/engine.rs diff --git a/Cargo.lock b/Cargo.lock index 69034d6..2886499 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2765,6 +2765,25 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7c87def4c32ab89d880effc9e097653c8da5d6ef28e6b539d313baaacfbafcbe" +[[package]] +name = "orch8" +version = "0.5.0" +dependencies = [ + "axum 0.8.9", + "chrono", + "orch8-engine", + "orch8-storage", + "orch8-types", + "serde", + "serde_json", + "tempfile", + "thiserror 2.0.18", + "tokio", + "tokio-util", + "tracing", + "uuid", +] + [[package]] name = "orch8-api" version = "0.5.0" diff --git a/Cargo.toml b/Cargo.toml index 80db737..e0c07b9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,6 @@ [workspace] members = [ + "orch8", "orch8-types", "orch8-storage", "orch8-engine", diff --git a/orch8/Cargo.toml b/orch8/Cargo.toml new file mode 100644 index 0000000..9a2c1f3 --- /dev/null +++ b/orch8/Cargo.toml @@ -0,0 +1,29 @@ +[package] +name = "orch8" +version.workspace = true +license.workspace = true +edition.workspace = true +rust-version.workspace = true +description = "Durable workflow execution as an embeddable Rust library" +# Pre-1.0 embeddable facade — not published yet. +publish = false + +[dependencies] +orch8-engine = { path = "../orch8-engine" } +orch8-storage = { path = "../orch8-storage" } +orch8-types = { path = "../orch8-types" } +tokio = { workspace = true } +tokio-util = { workspace = true } +serde_json = { workspace = true } +chrono = { workspace = true } +uuid = { workspace = true } +thiserror = { workspace = true } +tracing = { workspace = true } + +[dev-dependencies] +axum = { workspace = true } +serde = { workspace = true } +tempfile = "3" + +[lints] +workspace = true diff --git a/orch8/examples/embedded_axum.rs b/orch8/examples/embedded_axum.rs new file mode 100644 index 0000000..b4e6c95 --- /dev/null +++ b/orch8/examples/embedded_axum.rs @@ -0,0 +1,103 @@ +//! Embedding the engine in an axum web service: HTTP requests create durable +//! workflow instances on a shared [`orch8::Engine`]. +//! +//! ```sh +//! cargo run -p orch8 --example embedded_axum +//! curl -X POST localhost:3000/orders # -> {"instance_id": "..."} +//! curl localhost:3000/orders/ # -> {"state": "completed", ...} +//! ``` + +use axum::extract::{Path, State}; +use axum::http::StatusCode; +use axum::routing::{get, post}; +use axum::{Json, Router}; + +use orch8::{Engine, InstanceId, SequenceId, Storage}; + +fn sequence_json() -> serde_json::Value { + serde_json::json!({ + "id": uuid::Uuid::now_v7(), + "tenant_id": "default", + "namespace": "default", + "name": "fulfil-order", + "version": 1, + "blocks": [ + { "type": "step", "id": "reserve", "handler": "reserve_stock", "params": {} }, + { "type": "step", "id": "notify", "handler": "log", "params": { "message": "order fulfilled" } } + ], + "created_at": chrono::Utc::now().to_rfc3339() + }) +} + +/// Shared application state: the engine (cheap to clone) plus the sequence +/// new orders should run. +#[derive(Clone)] +struct AppState { + engine: Engine, + sequence_id: SequenceId, +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + let engine = Engine::builder() + .storage(Storage::sqlite_in_memory()) + .handler("reserve_stock", |_ctx: orch8::StepContext| async move { + Ok(serde_json::json!({ "reserved": true })) + }) + .build() + .await?; + + let sequence_id = engine + .upsert_sequence(serde_json::from_value(sequence_json())?) + .await?; + + engine.start(); + + let app = Router::new() + .route("/orders", post(create_order)) + .route("/orders/{id}", get(get_order)) + .with_state(AppState { + engine: engine.clone(), + sequence_id, + }); + + let listener = tokio::net::TcpListener::bind("127.0.0.1:3000").await?; + println!("listening on http://127.0.0.1:3000 (POST /orders, GET /orders/{{id}})"); + + axum::serve(listener, app) + .with_graceful_shutdown(async { + let _ = tokio::signal::ctrl_c().await; + }) + .await?; + + engine.shutdown().await; + Ok(()) +} + +async fn create_order( + State(state): State, +) -> Result, (StatusCode, String)> { + let id = state + .engine + .create_instance(state.sequence_id, orch8::CreateInstanceOptions::default()) + .await + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; + Ok(Json(serde_json::json!({ "instance_id": id }))) +} + +async fn get_order( + State(state): State, + Path(id): Path, +) -> Result, (StatusCode, String)> { + let instance = state + .engine + .get_instance(InstanceId::from_uuid(id)) + .await + .map_err(|e| (StatusCode::NOT_FOUND, e.to_string()))?; + Ok(Json(serde_json::json!({ + "instance_id": instance.id, + "state": instance.state, + "context": instance.context.data, + "updated_at": instance.updated_at, + }))) +} diff --git a/orch8/examples/kill_resistant.rs b/orch8/examples/kill_resistant.rs new file mode 100644 index 0000000..bb23054 --- /dev/null +++ b/orch8/examples/kill_resistant.rs @@ -0,0 +1,102 @@ +//! Kill-resistance demo: workflows survive process exit because all state +//! lives in `SQLite`. +//! +//! Run this binary **twice**: +//! +//! ```sh +//! cargo run -p orch8 --example kill_resistant # creates a delayed instance, then exits +//! cargo run -p orch8 --example kill_resistant # the instance resumes and completes +//! ``` +//! +//! The first run registers a sequence and creates an instance scheduled a few +//! seconds in the future, then exits before it fires — simulating a crash +//! mid-run. The second run reopens the same database, finds the live +//! instance, and the scheduler picks it up exactly where it left off. + +use std::time::Duration; + +use orch8::{CreateInstanceOptions, Engine, InstanceFilter, InstanceState, Storage}; + +const DELAY: Duration = Duration::from_secs(5); + +fn sequence_json() -> serde_json::Value { + serde_json::json!({ + "id": uuid::Uuid::now_v7(), + "tenant_id": "default", + "namespace": "default", + "name": "kill-resistant", + "version": 1, + "blocks": [ + { "type": "step", "id": "survive", "handler": "survive", "params": {} }, + { "type": "step", "id": "finish", "handler": "log", "params": { "message": "all steps done" } } + ], + "created_at": chrono::Utc::now().to_rfc3339() + }) +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + let db_path = std::env::temp_dir().join("orch8-kill-resistant.db"); + println!("database: {}", db_path.display()); + + let engine = Engine::builder() + .storage(Storage::sqlite(&db_path)) + .handler("survive", |_ctx: orch8::StepContext| async move { + println!("[survive] step executed — the workflow outlived the first process!"); + Ok(serde_json::json!({ "survived": true })) + }) + .build() + .await?; + + // Any instance still in a non-terminal state from a previous run? + let live_filter = InstanceFilter { + states: Some(vec![ + InstanceState::Scheduled, + InstanceState::Running, + InstanceState::Waiting, + InstanceState::Paused, + ]), + ..InstanceFilter::default() + }; + let live = engine.list_instances(&live_filter).await?; + + if let Some(pending) = live.first() { + // --- Second run: resume the crashed workflow. --- + println!("found surviving instance {} — resuming", pending.id); + engine.start(); + loop { + let snapshot = engine.get_instance(pending.id).await?; + if matches!( + snapshot.state, + InstanceState::Completed | InstanceState::Failed | InstanceState::Cancelled + ) { + println!("instance finished with state {:?}", snapshot.state); + break; + } + tokio::time::sleep(Duration::from_millis(100)).await; + } + engine.shutdown().await; + } else { + // --- First run: create a delayed instance and "crash". --- + let seq_id = engine + .upsert_sequence(serde_json::from_value(sequence_json())?) + .await?; + let inst = engine + .create_instance( + seq_id, + CreateInstanceOptions { + next_fire_at: Some(chrono::Utc::now() + DELAY), + ..Default::default() + }, + ) + .await?; + engine.start(); + println!("created instance {inst}, scheduled to fire in {DELAY:?}"); + tokio::time::sleep(Duration::from_secs(1)).await; + println!("exiting before it runs (simulated crash) — run me again to watch it resume"); + // Intentionally no graceful shutdown: the process just goes away. + // Everything the workflow needs is already on disk. + } + + Ok(()) +} diff --git a/orch8/examples/quickstart.rs b/orch8/examples/quickstart.rs new file mode 100644 index 0000000..9f1006f --- /dev/null +++ b/orch8/examples/quickstart.rs @@ -0,0 +1,99 @@ +//! Quickstart: file-backed `SQLite`, one custom handler, a two-step sequence +//! run to completion. +//! +//! ```sh +//! cargo run -p orch8 --example quickstart +//! ``` + +use std::time::Duration; + +use orch8::{Engine, InstanceState, Storage}; + +/// A two-step workflow: a custom `charge_card` step, then a `send_receipt` +/// step whose params are templated from the first step's output. +fn sequence_json() -> serde_json::Value { + serde_json::json!({ + "id": uuid::Uuid::now_v7(), + "tenant_id": "default", + "namespace": "default", + "name": "payment", + "version": 1, + "blocks": [ + { + "type": "step", + "id": "charge", + "handler": "charge_card", + "params": { "amount_cents": 4_200 } + }, + { + "type": "step", + "id": "receipt", + "handler": "send_receipt", + "params": { "summary": "charged {{outputs.charge.amount_cents}} cents (ref {{outputs.charge.reference}})" } + } + ], + "created_at": chrono::Utc::now().to_rfc3339() + }) +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + let db_path = std::env::temp_dir().join("orch8-quickstart.db"); + + let engine = Engine::builder() + .storage(Storage::sqlite(&db_path)) + .handler("charge_card", |ctx: orch8::StepContext| async move { + let amount = ctx.params.get("amount_cents").cloned().unwrap_or_default(); + println!("[charge_card] charging {amount} cents"); + Ok(serde_json::json!({ + "charged": true, + "amount_cents": amount, + "reference": "ch_12345", + })) + }) + .handler("send_receipt", |ctx: orch8::StepContext| async move { + let summary = ctx + .params + .get("summary") + .and_then(|v| v.as_str()) + .unwrap_or("(no summary)") + .to_string(); + println!("[send_receipt] {summary}"); + Ok(serde_json::json!({ "receipt_sent": true, "summary": summary })) + }) + .build() + .await?; + + // Background tick loop on this process's tokio runtime. + engine.start(); + + let seq_id = engine + .upsert_sequence(serde_json::from_value(sequence_json())?) + .await?; + let inst = engine + .create_instance(seq_id, orch8::CreateInstanceOptions::default()) + .await?; + println!("created instance {inst}"); + + // Poll until the instance reaches a terminal state. + loop { + let snapshot = engine.get_instance(inst).await?; + match snapshot.state { + InstanceState::Completed => { + println!( + "instance completed; final context.data = {}", + snapshot.context.data + ); + break; + } + InstanceState::Failed | InstanceState::Cancelled => { + println!("instance ended in {:?}", snapshot.state); + break; + } + _ => tokio::time::sleep(Duration::from_millis(50)).await, + } + } + + engine.shutdown().await; + Ok(()) +} diff --git a/orch8/src/builder.rs b/orch8/src/builder.rs new file mode 100644 index 0000000..d1c5591 --- /dev/null +++ b/orch8/src/builder.rs @@ -0,0 +1,105 @@ +use std::future::Future; +use std::time::Duration; + +use orch8_engine::handlers::{HandlerRegistry, StepContext}; +use orch8_engine::recovery; +use orch8_types::config::SchedulerConfig; +use orch8_types::error::StepError; +use orch8_types::ids::TenantId; + +use crate::engine::Engine; +use crate::error::Error; +use crate::storage::Storage; + +/// Builder for an embedded [`Engine`]. Obtain via [`Engine::builder`]. +/// +/// At minimum a [`Storage`] must be configured; everything else has +/// sensible defaults (100 ms tick interval, tenant `"default"`, the full +/// built-in handler set). +#[must_use = "call .build().await to construct the engine"] +pub struct EngineBuilder { + storage: Option, + handlers: HandlerRegistry, + tick_interval: Duration, + tenant: String, +} + +impl EngineBuilder { + pub(crate) fn new() -> Self { + let mut handlers = HandlerRegistry::new(); + // Same default registry the server wires up at startup: all built-in + // handlers (noop, log, sleep, http_request, transform, ...). + orch8_engine::handlers::builtin::register_builtins(&mut handlers); + Self { + storage: None, + handlers, + tick_interval: Duration::from_millis(SchedulerConfig::default().tick_interval_ms), + tenant: "default".to_string(), + } + } + + /// Select the storage backend (required). See [`Storage`]. + pub fn storage(mut self, storage: Storage) -> Self { + self.storage = Some(storage); + self + } + + /// Register a custom step handler under `name`. + /// + /// Handlers are plain async functions taking a [`StepContext`] and + /// returning JSON output. Registering a name that collides with a + /// built-in handler replaces the built-in. + pub fn handler(mut self, name: &str, handler: F) -> Self + where + F: Fn(StepContext) -> Fut + Send + Sync + 'static, + Fut: Future> + Send + 'static, + { + self.handlers.register(name, handler); + self + } + + /// Scheduler tick interval for the background loop started by + /// [`Engine::start`]. Default: 100 ms. + pub fn tick_interval(mut self, interval: Duration) -> Self { + self.tick_interval = interval; + self + } + + /// Default tenant used by [`Engine::create_instance`] for instance + /// scoping. Default: `"default"`. + pub fn tenant(mut self, tenant: impl Into) -> Self { + self.tenant = tenant.into(); + self + } + + /// Open the storage backend (applying schema/migrations), recover any + /// instances left `Running` by a previous crash, and return the engine. + /// + /// Must be called from within a tokio runtime. + pub async fn build(self) -> Result { + let storage_cfg = self.storage.ok_or_else(|| { + Error::Config( + "no storage configured — call .storage(Storage::sqlite(..)) on the builder" + .to_string(), + ) + })?; + + let tenant = TenantId::new(self.tenant).map_err(Error::Config)?; + + let storage = storage_cfg.connect().await?; + + let config = SchedulerConfig { + tick_interval_ms: u64::try_from(self.tick_interval.as_millis()) + .unwrap_or(u64::MAX) + .max(1), + ..SchedulerConfig::default() + }; + + // Crash recovery, mirroring server startup: instances stuck in + // `Running` longer than the stale threshold go back to `Scheduled`. + recovery::recover_stale_instances(storage.as_ref(), config.stale_instance_threshold_secs) + .await?; + + Ok(Engine::from_parts(storage, self.handlers, config, tenant)) + } +} diff --git a/orch8/src/engine.rs b/orch8/src/engine.rs new file mode 100644 index 0000000..fa56889 --- /dev/null +++ b/orch8/src/engine.rs @@ -0,0 +1,328 @@ +use std::sync::{Arc, Mutex}; +use std::time::Duration; + +use chrono::{DateTime, Utc}; +use tokio::sync::Semaphore; +use tokio::task::JoinHandle; +use tokio_util::sync::CancellationToken; + +use orch8_engine::handlers::HandlerRegistry; +use orch8_engine::scheduler::{run_tick_loop, tick_once, TickOnceResult}; +use orch8_engine::sequence_cache::SequenceCache; +use orch8_storage::StorageBackend; +use orch8_types::config::SchedulerConfig; +use orch8_types::context::ExecutionContext; +use orch8_types::error::StorageError; +use orch8_types::filter::{InstanceFilter, Pagination}; +use orch8_types::ids::{InstanceId, Namespace, SequenceId, TenantId}; +use orch8_types::instance::{Budget, InstanceState, Priority, TaskInstance}; +use orch8_types::sequence::SequenceDefinition; +use orch8_types::signal::{Signal, SignalType}; + +use crate::builder::EngineBuilder; +use crate::error::Error; + +/// Options for [`Engine::create_instance`]. All fields have defaults, so +/// `Default::default()` starts an instance immediately with an empty context +/// in the engine's default namespace. +#[derive(Debug, Clone)] +pub struct CreateInstanceOptions { + /// Namespace the instance is scoped to. Default: `"default"`. + pub namespace: Namespace, + /// Initial execution context (`context.data` is what step handlers + /// read/write). Default: empty. + pub context: ExecutionContext, + /// Free-form metadata stored on the instance. Default: `{}`. + pub metadata: serde_json::Value, + /// Scheduling priority. Default: [`Priority::Normal`]. + pub priority: Priority, + /// Optional resource budget (token/step caps) enforced by the scheduler. + pub budget: Option, + /// Idempotency key: re-creating with the same key returns the existing + /// instance's id instead of starting a duplicate. + pub idempotency_key: Option, + /// When the instance should first fire. Default: immediately. + pub next_fire_at: Option>, + /// IANA timezone for time-window evaluation. Default: `"UTC"`. + pub timezone: String, +} + +impl Default for CreateInstanceOptions { + fn default() -> Self { + Self { + namespace: Namespace::new("default"), + context: ExecutionContext::default(), + metadata: serde_json::json!({}), + priority: Priority::Normal, + budget: None, + idempotency_key: None, + next_fire_at: None, + timezone: "UTC".to_string(), + } + } +} + +struct Inner { + storage: Arc, + handlers: Arc, + config: SchedulerConfig, + tenant: TenantId, + cancel: CancellationToken, + /// Bounds concurrent step execution for the manual [`Engine::tick_once`] + /// path. The background loop manages its own semaphore internally. + semaphore: Arc, + sequence_cache: Arc, + tick_task: Mutex>>, +} + +/// An embedded Orch8 engine. Cheap to clone (all state is behind an `Arc`), +/// so it can be shared across tasks — e.g. stored in an axum router's state. +/// +/// Construct via [`Engine::builder`]; drive it with [`Engine::start`] / +/// [`Engine::shutdown`] or manually with [`Engine::tick_once`]. +#[derive(Clone)] +pub struct Engine { + inner: Arc, +} + +impl Engine { + /// Start configuring an embedded engine. + pub fn builder() -> EngineBuilder { + EngineBuilder::new() + } + + pub(crate) fn from_parts( + storage: Arc, + handlers: HandlerRegistry, + config: SchedulerConfig, + tenant: TenantId, + ) -> Self { + let semaphore = Arc::new(Semaphore::new(config.max_concurrent_steps as usize)); + Self { + inner: Arc::new(Inner { + storage, + handlers: Arc::new(handlers), + semaphore, + sequence_cache: Arc::new(SequenceCache::new(1_000, Duration::from_secs(300))), + tenant, + cancel: CancellationToken::new(), + config, + tick_task: Mutex::new(None), + }), + } + } + + /// The default tenant instances are created under. + #[must_use] + pub fn tenant(&self) -> &TenantId { + &self.inner.tenant + } + + /// Spawn the scheduler tick loop as a background task on the **host's** + /// tokio runtime. Idempotent — calling it while the loop is already + /// running is a no-op. Must be called from within a tokio runtime. + pub fn start(&self) { + let mut guard = self + .inner + .tick_task + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner); + if guard.is_some() || self.inner.cancel.is_cancelled() { + return; + } + let storage = Arc::clone(&self.inner.storage); + let handlers = Arc::clone(&self.inner.handlers); + let config = self.inner.config.clone(); + let cancel = self.inner.cancel.clone(); + *guard = Some(tokio::spawn(async move { + if let Err(e) = run_tick_loop(storage, handlers, &config, cancel).await { + tracing::error!(error = %e, "orch8 tick loop exited with error"); + } + })); + } + + /// Gracefully shut down: cancel the tick loop and wait for in-flight + /// steps to drain (bounded by the scheduler's grace period). Terminal — + /// the engine cannot be restarted afterwards. + pub async fn shutdown(&self) { + self.inner.cancel.cancel(); + let task = self + .inner + .tick_task + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner) + .take(); + if let Some(task) = task { + let _ = task.await; + } + } + + /// Execute a single scheduling pass and return what it did. For hosts + /// that drive the engine manually (test harnesses, cooperative + /// schedulers) instead of running the background loop. + pub async fn tick_once(&self) -> Result { + let result = tick_once( + &self.inner.storage, + &self.inner.handlers, + &self.inner.semaphore, + &self.inner.config, + &self.inner.sequence_cache, + &self.inner.cancel, + ) + .await?; + Ok(result) + } + + /// Register a sequence definition (idempotently). + /// + /// The definition is validated, then stored. If a sequence with the same + /// `(tenant_id, namespace, name, version)` already exists, the existing + /// definition is left untouched and its id is returned — published + /// versions are immutable so running instances stay pinned to the + /// definition they started with. Publish changes by bumping `version`. + pub async fn upsert_sequence(&self, seq: SequenceDefinition) -> Result { + seq.validate() + .map_err(|e| Error::InvalidSequence(e.to_string()))?; + if let Some(existing) = self + .inner + .storage + .get_sequence_by_name(&seq.tenant_id, &seq.namespace, &seq.name, Some(seq.version)) + .await? + { + return Ok(existing.id); + } + self.inner.storage.create_sequence(&seq).await?; + Ok(seq.id) + } + + /// Create (start) a workflow instance of `sequence_id`. + /// + /// The instance is persisted in `Scheduled` state and picked up by the + /// next tick. If `opts.idempotency_key` matches an existing instance of + /// this tenant, that instance's id is returned instead of creating a + /// duplicate. + pub async fn create_instance( + &self, + sequence_id: SequenceId, + opts: CreateInstanceOptions, + ) -> Result { + let now = Utc::now(); + + // Resolve the sequence up-front so a missing id surfaces as NotFound + // rather than a foreign-key violation from the storage layer. + self.inner + .storage + .get_sequence(sequence_id) + .await? + .ok_or_else(|| Error::NotFound(format!("sequence {}", sequence_id.into_uuid())))?; + + if let Some(ref key) = opts.idempotency_key { + if !key.is_empty() { + if let Some(existing) = self + .inner + .storage + .find_by_idempotency_key(&self.inner.tenant, key) + .await? + { + return Ok(existing.id); + } + } + } + + let instance = TaskInstance { + id: InstanceId::new(), + sequence_id, + tenant_id: self.inner.tenant.clone(), + namespace: opts.namespace, + state: InstanceState::Scheduled, + next_fire_at: Some(opts.next_fire_at.unwrap_or(now)), + priority: opts.priority, + timezone: opts.timezone, + metadata: opts.metadata, + context: opts.context, + concurrency_key: None, + max_concurrency: None, + idempotency_key: opts.idempotency_key, + session_id: None, + parent_instance_id: None, + budget: opts.budget, + created_at: now, + updated_at: now, + }; + + self.inner.storage.create_instance(&instance).await?; + Ok(instance.id) + } + + /// Fetch the current snapshot of an instance (state, context, timestamps). + pub async fn get_instance(&self, id: InstanceId) -> Result { + self.inner + .storage + .get_instance(id) + .await? + .ok_or_else(|| Error::NotFound(format!("instance {id}"))) + } + + /// List instances matching `filter` (most recently updated first, + /// at most 100 rows). Use [`InstanceFilter::default`] to list everything. + pub async fn list_instances( + &self, + filter: &InstanceFilter, + ) -> Result, Error> { + let instances = self + .inner + .storage + .list_instances(filter, &Pagination::default().capped()) + .await?; + Ok(instances) + } + + /// Send a signal to a live instance: `Pause`, `Resume`, `Cancel`, + /// `UpdateContext`, or `Custom` (e.g. to resolve a `wait_for_input` + /// step). Wakes the instance so the signal is processed on the next tick. + /// + /// Fails with [`Error::TerminalInstance`] if the instance has already + /// completed, failed or been cancelled. + pub async fn send_signal( + &self, + id: InstanceId, + signal_type: SignalType, + payload: serde_json::Value, + ) -> Result<(), Error> { + let signal = Signal { + id: uuid::Uuid::now_v7(), + instance_id: id, + signal_type, + payload, + delivered: false, + created_at: Utc::now(), + delivered_at: None, + }; + + // Atomic enqueue: rejects if the instance is terminal (or missing), + // mirroring the HTTP API's behavior. + self.inner + .storage + .enqueue_signal_if_active(&signal) + .await + .map_err(|e| match e { + StorageError::TerminalTarget { .. } => Error::TerminalInstance(id.to_string()), + StorageError::NotFound { .. } => Error::NotFound(format!("instance {id}")), + other => Error::Storage(other), + })?; + + // Wake a Scheduled instance whose next_fire_at is in the future so + // the signal is handled promptly (critical for human-input waits). + if let Ok(Some(fresh)) = self.inner.storage.get_instance(id).await { + if fresh.state == InstanceState::Scheduled { + let _ = self + .inner + .storage + .update_instance_state(id, InstanceState::Scheduled, Some(Utc::now())) + .await; + } + } + + Ok(()) + } +} diff --git a/orch8/src/error.rs b/orch8/src/error.rs new file mode 100644 index 0000000..e2a9eb3 --- /dev/null +++ b/orch8/src/error.rs @@ -0,0 +1,45 @@ +/// Unified error type for the embeddable engine. +/// +/// Wraps the internal storage / engine error types so embedders only deal +/// with a single error enum. Marked `#[non_exhaustive]` — new variants may +/// be added before 1.0. +#[derive(Debug, thiserror::Error)] +#[non_exhaustive] +pub enum Error { + /// Builder/configuration problem (missing storage, invalid tenant, ...). + #[error("configuration error: {0}")] + Config(String), + + /// A sequence definition failed structural validation. + #[error("invalid sequence: {0}")] + InvalidSequence(String), + + /// The requested entity does not exist. + #[error("not found: {0}")] + NotFound(String), + + /// The target instance is in a terminal state (completed / failed / + /// cancelled) and cannot accept the operation (e.g. a signal). + #[error("instance {0} is in a terminal state")] + TerminalInstance(String), + + /// Error from the storage backend. + #[error(transparent)] + Storage(orch8_types::error::StorageError), + + /// Error from the scheduling engine. + #[error(transparent)] + Engine(Box), +} + +impl From for Error { + fn from(e: orch8_types::error::StorageError) -> Self { + Self::Storage(e) + } +} + +impl From for Error { + fn from(e: orch8_engine::error::EngineError) -> Self { + Self::Engine(Box::new(e)) + } +} diff --git a/orch8/src/lib.rs b/orch8/src/lib.rs new file mode 100644 index 0000000..afe517c --- /dev/null +++ b/orch8/src/lib.rs @@ -0,0 +1,119 @@ +//! # orch8 — durable execution as a Rust library +//! +//! Embed the Orch8 workflow engine directly in your application: workflows +//! survive process crashes and restarts because every state transition is +//! persisted to `SQLite` or `PostgreSQL` before it takes effect. No separate +//! server, no sidecar — just a library on your own tokio runtime. +//! +//! ## Quickstart +//! +//! ```no_run +//! use std::time::Duration; +//! +//! const SEQ_JSON: &str = r#"{ +//! "id": "0195fdc0-0000-7000-8000-000000000001", +//! "tenant_id": "default", +//! "namespace": "default", +//! "name": "payment", +//! "version": 1, +//! "blocks": [ +//! { "type": "step", "id": "charge", "handler": "charge_card", "params": {} } +//! ], +//! "created_at": "2026-01-01T00:00:00Z" +//! }"#; +//! +//! # async fn run() -> Result<(), Box> { +//! let engine = orch8::Engine::builder() +//! .storage(orch8::Storage::sqlite("app.db")) // or ::sqlite_in_memory(), ::postgres(url) +//! .handler("charge_card", |_ctx: orch8::StepContext| async move { +//! Ok(serde_json::json!({ "charged": true })) +//! }) +//! .build() +//! .await?; +//! +//! engine.start(); // background tick loop on the host's tokio runtime +//! +//! let seq_id = engine.upsert_sequence(serde_json::from_str(SEQ_JSON)?).await?; +//! let inst = engine.create_instance(seq_id, Default::default()).await?; +//! +//! // ... later: poll progress, send signals, list instances ... +//! let instance = engine.get_instance(inst).await?; +//! println!("state: {:?}", instance.state); +//! +//! engine.shutdown().await; // graceful: stop ticking, drain in-flight steps +//! # Ok(()) +//! # } +//! ``` +//! +//! ## Host runtime requirement +//! +//! The engine runs on **your** tokio runtime. [`EngineBuilder::build`] is an +//! `async fn` and [`Engine::start`] spawns a task with [`tokio::spawn`], so +//! both must be called from within a tokio runtime context (e.g. inside +//! `#[tokio::main]`). The engine never creates its own runtime — if you need +//! that (e.g. for FFI hosts), see the `orch8-mobile` crate instead. +//! +//! ## Storage choices +//! +//! | Constructor | Backing | Use case | +//! |---|---|---| +//! | [`Storage::sqlite`] | file-backed `SQLite` (WAL) | single-process apps, durable across restarts | +//! | [`Storage::sqlite_in_memory`] | in-memory `SQLite` | tests, ephemeral workloads | +//! | [`Storage::postgres`] | `PostgreSQL` | multi-node deployments, larger scale | +//! +//! `SQLite` schema setup is bundled and applied automatically on build; +//! `PostgreSQL` runs the same migrations the `orch8-server` binary applies at +//! startup. +//! +//! ## Driving the engine +//! +//! Two modes: +//! +//! - **Background loop**: [`Engine::start`] spawns the scheduler tick loop; +//! [`Engine::shutdown`] cancels it and drains in-flight steps. +//! - **Manual ticking**: call [`Engine::tick_once`] yourself — useful for +//! test harnesses and hosts that control their own cadence. Don't mix the +//! two concurrently. +//! +//! ## Stability +//! +//! This crate is a **pre-1.0 facade**: it re-exports a curated, intentionally +//! small subset of the internal crates so internals can evolve behind it. +//! Expect breaking changes between minor versions until 1.0. + +mod builder; +mod engine; +mod error; +mod storage; + +pub use builder::EngineBuilder; +pub use engine::{CreateInstanceOptions, Engine}; +pub use error::Error; +pub use storage::Storage; + +// --------------------------------------------------------------------------- +// Curated re-exports. These are the only internal types that are part of the +// public embedding surface; everything else in orch8-engine / orch8-storage / +// orch8-types is free to churn without affecting embedders. +// --------------------------------------------------------------------------- + +/// Context passed to step handlers during execution (params, instance +/// context, attempt counter, storage handle). +pub use orch8_engine::handlers::StepContext; +/// Result of a single manual tick (see [`Engine::tick_once`]). +pub use orch8_engine::scheduler::TickOnceResult as TickResult; +/// Per-instance execution context (`data`, `config`, audit trail). +pub use orch8_types::context::ExecutionContext; +/// Error type returned by step handlers — `Retryable` errors are retried per +/// the step's retry policy, `Permanent` errors fail the step immediately. +pub use orch8_types::error::StepError; +/// Filter for [`Engine::list_instances`]. +pub use orch8_types::filter::InstanceFilter; +pub use orch8_types::ids::{InstanceId, Namespace, SequenceId, TenantId}; +pub use orch8_types::instance::{Budget, InstanceState, Priority, TaskInstance}; +/// Workflow definition types: a sequence is a list of blocks (steps, +/// parallel groups, loops, routers, ...). Usually deserialized from JSON. +pub use orch8_types::sequence::{BlockDefinition, SequenceDefinition}; +/// Signal types for [`Engine::send_signal`] (`Pause`, `Resume`, `Cancel`, +/// `UpdateContext`, or `Custom`). +pub use orch8_types::signal::SignalType; diff --git a/orch8/src/storage.rs b/orch8/src/storage.rs new file mode 100644 index 0000000..52e196e --- /dev/null +++ b/orch8/src/storage.rs @@ -0,0 +1,65 @@ +use std::path::{Path, PathBuf}; +use std::sync::Arc; + +use orch8_storage::postgres::PostgresStorage; +use orch8_storage::sqlite::SqliteStorage; +use orch8_storage::StorageBackend; + +use crate::error::Error; + +/// Storage backend selection for [`crate::EngineBuilder::storage`]. +/// +/// Construct with [`Storage::sqlite`], [`Storage::sqlite_in_memory`] or +/// [`Storage::postgres`]. The connection is opened — and the schema applied — +/// when [`crate::EngineBuilder::build`] runs. +#[derive(Debug, Clone)] +pub struct Storage(pub(crate) StorageKind); + +#[derive(Debug, Clone)] +pub(crate) enum StorageKind { + SqliteFile(PathBuf), + SqliteInMemory, + Postgres(String), +} + +impl Storage { + /// File-backed `SQLite` at `path` (WAL mode). The file and the bundled + /// schema are created on first use — durable across process restarts. + pub fn sqlite(path: impl AsRef) -> Self { + Self(StorageKind::SqliteFile(path.as_ref().to_path_buf())) + } + + /// In-memory `SQLite`. All state is lost when the engine is dropped — + /// intended for tests and ephemeral workloads. + #[must_use] + pub fn sqlite_in_memory() -> Self { + Self(StorageKind::SqliteInMemory) + } + + /// `PostgreSQL` at `url` (e.g. `postgres://user:pass@host/db`). Migrations + /// are applied on build, mirroring `orch8-server` startup. + pub fn postgres(url: impl Into) -> Self { + Self(StorageKind::Postgres(url.into())) + } + + /// Open the backend and make sure its schema is in place. + pub(crate) async fn connect(self) -> Result, Error> { + match self.0 { + StorageKind::SqliteFile(path) => { + let path = path.to_string_lossy().into_owned(); + let storage = SqliteStorage::file(&path).await?; + Ok(Arc::new(storage)) + } + StorageKind::SqliteInMemory => { + let storage = SqliteStorage::in_memory().await?; + Ok(Arc::new(storage)) + } + StorageKind::Postgres(url) => { + // Same pool sizing default as the server's DatabaseConfig. + let storage = PostgresStorage::new(&url, 64, None).await?; + storage.run_migrations().await?; + Ok(Arc::new(storage)) + } + } + } +} diff --git a/orch8/tests/engine.rs b/orch8/tests/engine.rs new file mode 100644 index 0000000..d9dcb68 --- /dev/null +++ b/orch8/tests/engine.rs @@ -0,0 +1,279 @@ +//! Integration tests for the embeddable `orch8` facade: builder, background +//! tick loop, manual ticking, signals, and graceful shutdown — all through +//! the public API only. + +use std::time::Duration; + +use orch8::{ + CreateInstanceOptions, Engine, InstanceId, InstanceState, SignalType, StepContext, Storage, +}; + +/// Bounded-wait helper: poll `get_instance` until the predicate holds. +async fn wait_for_state( + engine: &Engine, + id: InstanceId, + timeout: Duration, + pred: impl Fn(InstanceState) -> bool, +) -> InstanceState { + let deadline = tokio::time::Instant::now() + timeout; + loop { + let state = engine.get_instance(id).await.expect("get_instance").state; + if pred(state) { + return state; + } + assert!( + tokio::time::Instant::now() < deadline, + "timed out waiting for instance {id} (last state: {state:?})" + ); + tokio::time::sleep(Duration::from_millis(25)).await; + } +} + +fn two_step_sequence(name: &str, handler: &str) -> orch8::SequenceDefinition { + serde_json::from_value(serde_json::json!({ + "id": uuid::Uuid::now_v7(), + "tenant_id": "default", + "namespace": "default", + "name": name, + "version": 1, + "blocks": [ + { "type": "step", "id": "s1", "handler": handler, "params": {} }, + { "type": "step", "id": "s2", "handler": "noop", "params": {} } + ], + "created_at": chrono::Utc::now().to_rfc3339() + })) + .expect("valid sequence definition") +} + +async fn build_engine() -> Engine { + Engine::builder() + .storage(Storage::sqlite_in_memory()) + .tick_interval(Duration::from_millis(20)) + .handler("custom_step", |_ctx: StepContext| async move { + Ok(serde_json::json!({ "ok": true })) + }) + .build() + .await + .expect("engine builds") +} + +/// Builder + in-memory sqlite + custom handler: the background loop runs a +/// two-step sequence to completion, observed by polling `get_instance`. +#[tokio::test] +async fn start_runs_sequence_to_completion() { + let engine = build_engine().await; + engine.start(); + + let seq_id = engine + .upsert_sequence(two_step_sequence("bg-seq", "custom_step")) + .await + .expect("upsert"); + let inst = engine + .create_instance(seq_id, CreateInstanceOptions::default()) + .await + .expect("create"); + + let state = wait_for_state(&engine, inst, Duration::from_secs(10), |s| { + matches!( + s, + InstanceState::Completed | InstanceState::Failed | InstanceState::Cancelled + ) + }) + .await; + assert_eq!(state, InstanceState::Completed); + + engine.shutdown().await; +} + +/// Manual-tick mode: without `start()`, repeated `tick_once` calls advance +/// the instance to completion. +#[tokio::test] +async fn manual_tick_once_completes_instance() { + let engine = build_engine().await; + + let seq_id = engine + .upsert_sequence(two_step_sequence("manual-seq", "custom_step")) + .await + .expect("upsert"); + let inst = engine + .create_instance(seq_id, CreateInstanceOptions::default()) + .await + .expect("create"); + + let mut completed = false; + for _ in 0..100 { + let result = engine.tick_once().await.expect("tick"); + let state = engine.get_instance(inst).await.expect("get").state; + if state == InstanceState::Completed { + assert!( + !result.has_pending_work || result.instances_advanced > 0 || completed, + "tick result should be coherent with the observed state" + ); + completed = true; + break; + } + tokio::time::sleep(Duration::from_millis(10)).await; + } + assert!(completed, "instance did not complete via manual ticking"); +} + +/// `send_signal` resolves a `wait_for_input` gate: the instance parks +/// waiting for human input, a custom `human_input:` signal wakes it, +/// and the sequence then runs to completion (mirrors the engine's HITL +/// signal tests through the public facade). +#[tokio::test] +async fn send_signal_wakes_waiting_instance() { + let engine = build_engine().await; + engine.start(); + + let seq: orch8::SequenceDefinition = serde_json::from_value(serde_json::json!({ + "id": uuid::Uuid::now_v7(), + "tenant_id": "default", + "namespace": "default", + "name": "gated-seq", + "version": 1, + "blocks": [ + { + "type": "step", + "id": "gate", + "handler": "noop", + "params": {}, + "wait_for_input": { "prompt": "approve?" } + }, + { "type": "step", "id": "after", "handler": "custom_step", "params": {} } + ], + "created_at": chrono::Utc::now().to_rfc3339() + })) + .expect("valid sequence"); + + let seq_id = engine.upsert_sequence(seq).await.expect("upsert"); + let inst = engine + .create_instance(seq_id, CreateInstanceOptions::default()) + .await + .expect("create"); + + // Give the scheduler time to reach the gate; the instance must NOT + // complete while the human-input gate is unresolved. + tokio::time::sleep(Duration::from_millis(300)).await; + let parked = engine.get_instance(inst).await.expect("get").state; + assert!( + !matches!( + parked, + InstanceState::Completed | InstanceState::Failed | InstanceState::Cancelled + ), + "instance should be parked at the gate, was {parked:?}" + ); + + // Resolve the gate. The signal name is `human_input:` and the + // payload must carry one of the effective choices (default yes/no). + engine + .send_signal( + inst, + SignalType::Custom("human_input:gate".to_string()), + serde_json::json!({ "value": "yes" }), + ) + .await + .expect("send_signal"); + + let state = wait_for_state(&engine, inst, Duration::from_secs(10), |s| { + matches!( + s, + InstanceState::Completed | InstanceState::Failed | InstanceState::Cancelled + ) + }) + .await; + assert_eq!(state, InstanceState::Completed); + + // The chosen value lands in context.data under the block id. + let snapshot = engine.get_instance(inst).await.expect("get"); + assert_eq!(snapshot.context.data["gate"], "yes"); + + engine.shutdown().await; +} + +/// Graceful shutdown: start, create work, shutdown completes without +/// hanging (bounded by an outer timeout) and signals to terminal instances +/// are rejected. +#[tokio::test] +async fn shutdown_is_graceful_and_bounded() { + let engine = build_engine().await; + engine.start(); + + let seq_id = engine + .upsert_sequence(two_step_sequence("shutdown-seq", "custom_step")) + .await + .expect("upsert"); + let inst = engine + .create_instance(seq_id, CreateInstanceOptions::default()) + .await + .expect("create"); + + // Let it finish, then shut down; must not hang. + wait_for_state(&engine, inst, Duration::from_secs(10), |s| { + s == InstanceState::Completed + }) + .await; + + tokio::time::timeout(Duration::from_secs(30), engine.shutdown()) + .await + .expect("shutdown must complete within the grace period"); + + // Signalling a terminal instance is rejected with TerminalInstance. + let err = engine + .send_signal(inst, SignalType::Cancel, serde_json::json!({})) + .await + .expect_err("signal to terminal instance must fail"); + assert!(matches!(err, orch8::Error::TerminalInstance(_)), "{err:?}"); + + // Shutdown is idempotent. + tokio::time::timeout(Duration::from_secs(5), engine.shutdown()) + .await + .expect("second shutdown returns promptly"); +} + +/// Facade conveniences: upsert is idempotent per (name, version), missing +/// instances surface `NotFound`, idempotency keys dedupe instance creation, +/// and `list_instances` sees created work. +#[tokio::test] +async fn facade_crud_semantics() { + let engine = build_engine().await; + + let seq = two_step_sequence("crud-seq", "custom_step"); + let first_id = seq.id; + let seq_id = engine.upsert_sequence(seq).await.expect("upsert"); + assert_eq!(seq_id, first_id); + + // Re-registering the same (name, version) returns the existing id. + let again = engine + .upsert_sequence(two_step_sequence("crud-seq", "custom_step")) + .await + .expect("second upsert"); + assert_eq!(again, first_id); + + // Unknown instance -> NotFound. + let missing = engine.get_instance(InstanceId::new()).await; + assert!(matches!(missing, Err(orch8::Error::NotFound(_)))); + + // Idempotency key dedupes. + let opts = CreateInstanceOptions { + idempotency_key: Some("order-42".to_string()), + ..Default::default() + }; + let a = engine + .create_instance(seq_id, opts.clone()) + .await + .expect("create a"); + let b = engine + .create_instance(seq_id, opts) + .await + .expect("create b"); + assert_eq!(a, b, "same idempotency key must return the same instance"); + + let all = engine + .list_instances(&orch8::InstanceFilter::default()) + .await + .expect("list"); + assert_eq!(all.len(), 1); + assert_eq!(all[0].id, a); + assert_eq!(all[0].tenant_id, *engine.tenant()); +} From f71c838c833ac0001eb063d22a3e6ffab9f71ac1 Mon Sep 17 00:00:00 2001 From: Oleksii Date: Wed, 10 Jun 2026 00:48:52 -0300 Subject: [PATCH 2/2] chore(deny): BUSL-1.1 exception for the new orch8 facade crate Co-Authored-By: Claude Fable 5 --- deny.toml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/deny.toml b/deny.toml index c27ff14..fb252ef 100644 --- a/deny.toml +++ b/deny.toml @@ -26,6 +26,10 @@ allow = [ ] confidence-threshold = 0.8 +[[licenses.exceptions]] +allow = ["BUSL-1.1"] +crate = "orch8" + [[licenses.exceptions]] allow = ["BUSL-1.1"] crate = "orch8-mobile"