Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
[workspace]
members = [
"orch8",
"orch8-types",
"orch8-storage",
"orch8-engine",
Expand Down
4 changes: 4 additions & 0 deletions deny.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ allow = [
]
confidence-threshold = 0.8

[[licenses.exceptions]]
allow = ["BUSL-1.1"]
crate = "orch8"

[[licenses.exceptions]]
allow = ["BUSL-1.1"]
crate = "orch8-mobile"
Expand Down
29 changes: 29 additions & 0 deletions orch8/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
[package]
name = "orch8"
version.workspace = true
license.workspace = true
edition.workspace = true
rust-version.workspace = true
description = "Durable workflow execution as an embeddable Rust library"
# Pre-1.0 embeddable facade — not published yet.
publish = false

[dependencies]
orch8-engine = { path = "../orch8-engine" }
orch8-storage = { path = "../orch8-storage" }
orch8-types = { path = "../orch8-types" }
tokio = { workspace = true }
tokio-util = { workspace = true }
serde_json = { workspace = true }
chrono = { workspace = true }
uuid = { workspace = true }
thiserror = { workspace = true }
tracing = { workspace = true }

[dev-dependencies]
axum = { workspace = true }
serde = { workspace = true }
tempfile = "3"

[lints]
workspace = true
103 changes: 103 additions & 0 deletions orch8/examples/embedded_axum.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
//! Embedding the engine in an axum web service: HTTP requests create durable
//! workflow instances on a shared [`orch8::Engine`].
//!
//! ```sh
//! cargo run -p orch8 --example embedded_axum
//! curl -X POST localhost:3000/orders # -> {"instance_id": "..."}
//! curl localhost:3000/orders/<instance_id> # -> {"state": "completed", ...}
//! ```

use axum::extract::{Path, State};
use axum::http::StatusCode;
use axum::routing::{get, post};
use axum::{Json, Router};

use orch8::{Engine, InstanceId, SequenceId, Storage};

fn sequence_json() -> serde_json::Value {
serde_json::json!({
"id": uuid::Uuid::now_v7(),
"tenant_id": "default",
"namespace": "default",
"name": "fulfil-order",
"version": 1,
"blocks": [
{ "type": "step", "id": "reserve", "handler": "reserve_stock", "params": {} },
{ "type": "step", "id": "notify", "handler": "log", "params": { "message": "order fulfilled" } }
],
"created_at": chrono::Utc::now().to_rfc3339()
})
}

/// Shared application state: the engine (cheap to clone) plus the sequence
/// new orders should run.
#[derive(Clone)]
struct AppState {
engine: Engine,
sequence_id: SequenceId,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let engine = Engine::builder()
.storage(Storage::sqlite_in_memory())
.handler("reserve_stock", |_ctx: orch8::StepContext| async move {
Ok(serde_json::json!({ "reserved": true }))
})
.build()
.await?;

let sequence_id = engine
.upsert_sequence(serde_json::from_value(sequence_json())?)
.await?;

engine.start();

let app = Router::new()
.route("/orders", post(create_order))
.route("/orders/{id}", get(get_order))
.with_state(AppState {
engine: engine.clone(),
sequence_id,
});

let listener = tokio::net::TcpListener::bind("127.0.0.1:3000").await?;
println!("listening on http://127.0.0.1:3000 (POST /orders, GET /orders/{{id}})");

axum::serve(listener, app)
.with_graceful_shutdown(async {
let _ = tokio::signal::ctrl_c().await;
})
.await?;

engine.shutdown().await;
Ok(())
}

async fn create_order(
State(state): State<AppState>,
) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
let id = state
.engine
.create_instance(state.sequence_id, orch8::CreateInstanceOptions::default())
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
Ok(Json(serde_json::json!({ "instance_id": id })))
}

async fn get_order(
State(state): State<AppState>,
Path(id): Path<uuid::Uuid>,
) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
let instance = state
.engine
.get_instance(InstanceId::from_uuid(id))
.await
.map_err(|e| (StatusCode::NOT_FOUND, e.to_string()))?;
Ok(Json(serde_json::json!({
"instance_id": instance.id,
"state": instance.state,
"context": instance.context.data,
"updated_at": instance.updated_at,
})))
}
102 changes: 102 additions & 0 deletions orch8/examples/kill_resistant.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
//! Kill-resistance demo: workflows survive process exit because all state
//! lives in `SQLite`.
//!
//! Run this binary **twice**:
//!
//! ```sh
//! cargo run -p orch8 --example kill_resistant # creates a delayed instance, then exits
//! cargo run -p orch8 --example kill_resistant # the instance resumes and completes
//! ```
//!
//! The first run registers a sequence and creates an instance scheduled a few
//! seconds in the future, then exits before it fires — simulating a crash
//! mid-run. The second run reopens the same database, finds the live
//! instance, and the scheduler picks it up exactly where it left off.

use std::time::Duration;

use orch8::{CreateInstanceOptions, Engine, InstanceFilter, InstanceState, Storage};

const DELAY: Duration = Duration::from_secs(5);

fn sequence_json() -> serde_json::Value {
serde_json::json!({
"id": uuid::Uuid::now_v7(),
"tenant_id": "default",
"namespace": "default",
"name": "kill-resistant",
"version": 1,
"blocks": [
{ "type": "step", "id": "survive", "handler": "survive", "params": {} },
{ "type": "step", "id": "finish", "handler": "log", "params": { "message": "all steps done" } }
],
"created_at": chrono::Utc::now().to_rfc3339()
})
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let db_path = std::env::temp_dir().join("orch8-kill-resistant.db");
println!("database: {}", db_path.display());

let engine = Engine::builder()
.storage(Storage::sqlite(&db_path))
.handler("survive", |_ctx: orch8::StepContext| async move {
println!("[survive] step executed — the workflow outlived the first process!");
Ok(serde_json::json!({ "survived": true }))
})
.build()
.await?;

// Any instance still in a non-terminal state from a previous run?
let live_filter = InstanceFilter {
states: Some(vec![
InstanceState::Scheduled,
InstanceState::Running,
InstanceState::Waiting,
InstanceState::Paused,
]),
..InstanceFilter::default()
};
let live = engine.list_instances(&live_filter).await?;

if let Some(pending) = live.first() {
// --- Second run: resume the crashed workflow. ---
println!("found surviving instance {} — resuming", pending.id);
engine.start();
loop {
let snapshot = engine.get_instance(pending.id).await?;
if matches!(
snapshot.state,
InstanceState::Completed | InstanceState::Failed | InstanceState::Cancelled
) {
println!("instance finished with state {:?}", snapshot.state);
break;
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
engine.shutdown().await;
} else {
// --- First run: create a delayed instance and "crash". ---
let seq_id = engine
.upsert_sequence(serde_json::from_value(sequence_json())?)
.await?;
let inst = engine
.create_instance(
seq_id,
CreateInstanceOptions {
next_fire_at: Some(chrono::Utc::now() + DELAY),
..Default::default()
},
)
.await?;
engine.start();
println!("created instance {inst}, scheduled to fire in {DELAY:?}");
tokio::time::sleep(Duration::from_secs(1)).await;
println!("exiting before it runs (simulated crash) — run me again to watch it resume");
// Intentionally no graceful shutdown: the process just goes away.
// Everything the workflow needs is already on disk.
}

Ok(())
}
99 changes: 99 additions & 0 deletions orch8/examples/quickstart.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
//! Quickstart: file-backed `SQLite`, one custom handler, a two-step sequence
//! run to completion.
//!
//! ```sh
//! cargo run -p orch8 --example quickstart
//! ```

use std::time::Duration;

use orch8::{Engine, InstanceState, Storage};

/// A two-step workflow: a custom `charge_card` step, then a `send_receipt`
/// step whose params are templated from the first step's output.
fn sequence_json() -> serde_json::Value {
serde_json::json!({
"id": uuid::Uuid::now_v7(),
"tenant_id": "default",
"namespace": "default",
"name": "payment",
"version": 1,
"blocks": [
{
"type": "step",
"id": "charge",
"handler": "charge_card",
"params": { "amount_cents": 4_200 }
},
{
"type": "step",
"id": "receipt",
"handler": "send_receipt",
"params": { "summary": "charged {{outputs.charge.amount_cents}} cents (ref {{outputs.charge.reference}})" }
}
],
"created_at": chrono::Utc::now().to_rfc3339()
})
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let db_path = std::env::temp_dir().join("orch8-quickstart.db");

let engine = Engine::builder()
.storage(Storage::sqlite(&db_path))
.handler("charge_card", |ctx: orch8::StepContext| async move {
let amount = ctx.params.get("amount_cents").cloned().unwrap_or_default();
println!("[charge_card] charging {amount} cents");
Ok(serde_json::json!({
"charged": true,
"amount_cents": amount,
"reference": "ch_12345",
}))
})
.handler("send_receipt", |ctx: orch8::StepContext| async move {
let summary = ctx
.params
.get("summary")
.and_then(|v| v.as_str())
.unwrap_or("(no summary)")
.to_string();
println!("[send_receipt] {summary}");
Ok(serde_json::json!({ "receipt_sent": true, "summary": summary }))
})
.build()
.await?;

// Background tick loop on this process's tokio runtime.
engine.start();

let seq_id = engine
.upsert_sequence(serde_json::from_value(sequence_json())?)
.await?;
let inst = engine
.create_instance(seq_id, orch8::CreateInstanceOptions::default())
.await?;
println!("created instance {inst}");

// Poll until the instance reaches a terminal state.
loop {
let snapshot = engine.get_instance(inst).await?;
match snapshot.state {
InstanceState::Completed => {
println!(
"instance completed; final context.data = {}",
snapshot.context.data
);
break;
}
InstanceState::Failed | InstanceState::Cancelled => {
println!("instance ended in {:?}", snapshot.state);
break;
}
_ => tokio::time::sleep(Duration::from_millis(50)).await,
}
}

engine.shutdown().await;
Ok(())
}
Loading
Loading