From d43b4b3b4fcde188269931557d5b13d00265fe0f Mon Sep 17 00:00:00 2001 From: Oleksii Date: Wed, 10 Jun 2026 01:11:26 -0300 Subject: [PATCH] =?UTF-8?q?feat(api):=20execution=20timeline=20+=20fork-fr?= =?UTF-8?q?om=20=E2=80=94=20time-travel=20operations=20on=20snapshot=20sta?= =?UTF-8?q?te?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit GET /instances/{id}/timeline: flat chronological complement of the execution tree — one entry per block_outputs row in execution order with sentinels flagged (not hidden), plus instance-level summary (current state/context) and recorded state transitions from the audit log. Paginated (default 200, max 1000); ?include_outputs=false returns metadata only. POST /instances/{id}/fork: clone an instance into a sandbox that resumes from an arbitrary top-level block. Copies inline pre-fork block outputs onto a new Scheduled instance (optional context patch, dry-run by default, provenance in metadata); artifact-backed/externalized outputs are instance-scoped, so their blocks go to the re-run set instead of being copied. Source instance is never touched, so forking is allowed from any source state. Storage: OutputStore::get_outputs_page + OutputStore::copy_block_outputs on both backends and the encrypting wrapper (pass-through — outputs are not field-encrypted). 
Co-Authored-By: Claude Fable 5 --- orch8-api/src/instances.rs | 9 + orch8-api/src/instances/fork.rs | 266 ++++++++++++ orch8-api/src/instances/lifecycle.rs | 7 +- orch8-api/src/instances/timeline.rs | 211 ++++++++++ orch8-api/src/instances/types.rs | 18 + orch8-api/src/openapi.rs | 9 + orch8-api/tests/timeline_fork.rs | 576 ++++++++++++++++++++++++++ orch8-engine/tests/fork_e2e.rs | 183 ++++++++ orch8-storage/src/encrypting.rs | 21 + orch8-storage/src/lib.rs | 36 ++ orch8-storage/src/postgres/mod.rs | 18 + orch8-storage/src/postgres/outputs.rs | 52 +++ orch8-storage/src/sqlite/mod.rs | 202 +++++++++ orch8-storage/src/sqlite/outputs.rs | 63 +++ 14 files changed, 1668 insertions(+), 3 deletions(-) create mode 100644 orch8-api/src/instances/fork.rs create mode 100644 orch8-api/src/instances/timeline.rs create mode 100644 orch8-api/tests/timeline_fork.rs create mode 100644 orch8-engine/tests/fork_e2e.rs diff --git a/orch8-api/src/instances.rs b/orch8-api/src/instances.rs index 8f7207f..511cf08 100644 --- a/orch8-api/src/instances.rs +++ b/orch8-api/src/instances.rs @@ -10,10 +10,12 @@ mod artifacts; mod audit; mod bulk; mod checkpoints; +mod fork; mod inject; mod lifecycle; mod outputs; mod signals; +mod timeline; mod types; // Re-exports for `crate::instances::*` compatibility (openapi.rs references @@ -35,6 +37,8 @@ pub(crate) use checkpoints::{ save_checkpoint, }; pub use checkpoints::{PruneCheckpointsRequest, SaveCheckpointRequest}; +pub use fork::ForkResponse; +pub(crate) use fork::{__path_fork_instance, fork_instance}; pub use inject::InjectBlocksRequest; pub(crate) use inject::{__path_inject_blocks, inject_blocks}; pub(crate) use lifecycle::{ @@ -47,6 +51,9 @@ pub(crate) use outputs::{ __path_get_execution_tree, __path_get_outputs, get_execution_tree, get_outputs, }; pub(crate) use signals::{__path_send_signal, send_signal}; +pub(crate) use timeline::{__path_get_timeline, get_timeline}; +pub use timeline::{TimelineEntry, TimelineInstance, TimelineResponse, 
TimelineStateTransition}; +pub use types::ForkRequest; // Request/query types the MCP server reuses so its tools/call dispatch goes // through the exact same wire shapes as the REST endpoints. pub(crate) use types::{CreateInstanceRequest, ListQuery, SendSignalRequest}; @@ -63,11 +70,13 @@ pub fn routes() -> Router { .route("/instances/{id}/artifacts", get(list_instance_artifacts)) .route("/artifacts/{*key}", get(get_artifact_bytes)) .route("/instances/{id}/tree", get(get_execution_tree)) + .route("/instances/{id}/timeline", get(get_timeline)) .route("/instances/{id}/retry", post(retry_instance)) .route( "/instances/{id}/resume-from/{block_id}", post(resume_from_block), ) + .route("/instances/{id}/fork", post(fork_instance)) .route( "/instances/{id}/checkpoints", get(list_checkpoints).post(save_checkpoint), diff --git a/orch8-api/src/instances/fork.rs b/orch8-api/src/instances/fork.rs new file mode 100644 index 0000000..89db1b9 --- /dev/null +++ b/orch8-api/src/instances/fork.rs @@ -0,0 +1,266 @@ +//! `POST /instances/{id}/fork` — clone an instance into a sandbox that +//! resumes from an arbitrary block. +//! +//! The snapshot model makes time travel a read + clone: a fork is a brand-new +//! instance of the same sequence whose `block_outputs` are seeded with copies +//! of the source's outputs for every top-level block *before* the fork point, +//! so the engine's completed-blocks memoization skips them and execution +//! resumes exactly at `from_block_id`. The source instance is never touched, +//! so forking is allowed from ANY source state (unlike resume-from, which +//! mutates the instance in place and requires quiescence). +//! +//! ## Artifact-backed (externalized) outputs +//! +//! Externalized output payloads are keyed by the *source* instance ID and +//! ownership-checked on read, so their references cannot be shared across +//! instances safely. We therefore copy **inline outputs only**: any pre-fork +//! 
top-level block whose snapshot is not fully inline (externalized payload, +//! or a sentinel as the latest row) is placed in the re-run set instead — it +//! executes again on the fork. Granularity is the top-level block: if any +//! block nested inside a composite is non-copyable the whole composite +//! re-runs, since a partially-seeded composite (some iterations copied, some +//! missing) would resume in an inconsistent state. +//! +//! Forks default to dry-run so re-running those blocks (and the post-fork +//! tail) does not re-fire production side effects. + +use axum::extract::{Path, State}; +use axum::http::StatusCode; +use axum::response::IntoResponse; +use axum::Json; +use chrono::Utc; +use serde::Serialize; +use utoipa::ToSchema; +use uuid::Uuid; + +use orch8_types::context::RuntimeContext; +use orch8_types::ids::{BlockId, InstanceId}; +use orch8_types::instance::{InstanceState, TaskInstance}; + +use super::lifecycle::{collect_block_ids, top_level_block_id}; +use super::types::ForkRequest; +use crate::error::ApiError; +use crate::AppState; + +#[derive(Serialize, ToSchema)] +pub struct ForkResponse { + /// ID of the newly created fork. + pub id: InstanceId, + /// ID of the source instance. + pub forked_from: InstanceId, + /// Always `scheduled` — the fork fires immediately. + pub state: String, + /// Number of pre-fork top-level blocks whose outputs were copied (the + /// fork will NOT re-execute these). + pub copied_blocks: usize, + /// Pre-fork top-level blocks whose outputs could not be copied (never + /// executed, artifact-backed, or mid-flight sentinel) — these WILL + /// (re-)execute on the fork, in addition to everything from + /// `from_block_id` onward. + pub rerun_blocks: Vec, + /// Whether the fork runs in dry-run mode. 
+ pub dry_run: bool, +} + +#[utoipa::path(post, path = "/instances/{id}/fork", tag = "instances", + params(("id" = Uuid, Path, description = "Source instance ID")), + request_body = ForkRequest, + responses( + (status = 201, description = "Fork created and scheduled", body = ForkResponse), + (status = 400, description = "Unknown block or invalid context patch"), + (status = 404, description = "Instance or sequence not found"), + ) +)] +#[allow(clippy::too_many_lines)] // sequential clone steps — splitting hurts readability +pub async fn fork_instance( + State(state): State, + tenant_ctx: crate::auth::OptionalTenant, + Path(id): Path, + Json(req): Json, +) -> Result { + let source_id = InstanceId::from_uuid(id); + + let source = state + .storage + .get_instance(source_id) + .await + .map_err(|e| ApiError::from_storage(e, "instance"))? + .ok_or_else(|| ApiError::NotFound(format!("instance {id}")))?; + + crate::auth::enforce_tenant_access(&tenant_ctx, &source.tenant_id, &format!("instance {id}"))?; + + // Validate the context patch up-front (same rule as resume-from). + let patch = match req.context { + None => None, + Some(serde_json::Value::Object(map)) => Some(map), + Some(_) => { + return Err(ApiError::InvalidArgument( + "context patch must be a JSON object".into(), + )); + } + }; + + let sequence = state + .storage + .get_sequence(source.sequence_id) + .await + .map_err(|e| ApiError::from_storage(e, "sequence"))? + .ok_or_else(|| { + ApiError::NotFound(format!("sequence {}", source.sequence_id.into_uuid())) + })?; + + let target_idx = sequence + .blocks + .iter() + .position(|b| top_level_block_id(b).as_str() == req.from_block_id) + .ok_or_else(|| { + ApiError::InvalidArgument(format!( + "block '{}' is not a top-level block of sequence {}", + req.from_block_id, + source.sequence_id.into_uuid() + )) + })?; + + // Partition the pre-fork top-level blocks into the copy set and the + // re-run set. 
A block group (top-level block + everything nested in it) + // is copyable iff it executed and every member's LATEST row is inline: + // an externalized payload or a trailing sentinel makes the snapshot + // unusable on another instance, so the whole group re-runs instead. + // Older `__retry__` markers behind a real inline output do not block the + // copy — the storage copy skips non-inline rows, which merely resets the + // block's attempt counter on the fork. + let source_outputs = state + .storage + .get_all_outputs(source_id) + .await + .map_err(|e| ApiError::from_storage(e, "outputs"))?; + + let mut copy_ids: Vec = Vec::new(); + let mut copied_blocks = 0usize; + let mut rerun_blocks: Vec = Vec::new(); + for block in &sequence.blocks[..target_idx] { + let mut group_ids: Vec = Vec::new(); + collect_block_ids(block, &mut group_ids); + + let mut executed = false; + let mut copyable = true; + for block_id in &group_ids { + // Rows are in created_at ASC order, so the last match is the + // latest snapshot for this block. + let latest = source_outputs.iter().rfind(|o| &o.block_id == block_id); + if let Some(latest) = latest { + executed = true; + if latest.output_ref.is_some() { + // Trailing sentinel (mid-flight / error) or externalized + // payload — either way the snapshot cannot be cloned. + copyable = false; + break; + } + } + } + + if executed && copyable { + copy_ids.extend(group_ids); + copied_blocks += 1; + } else { + rerun_blocks.push(top_level_block_id(block).as_str().to_owned()); + } + } + + // Assemble the fork: same sequence/tenant/namespace, fresh identity and + // runtime state, source context with the optional patch applied. 
+ let mut context = source.context.clone(); + if let Some(patch) = patch { + if !context.data.is_object() { + context.data = serde_json::json!({}); + } + if let Some(data) = context.data.as_object_mut() { + for (key, value) in patch { + data.insert(key, value); + } + } + } + // The fork starts from a clean engine slate (no current step, attempt 0); + // only the requested execution mode carries over. + context.runtime = RuntimeContext { + dry_run: req.dry_run, + ..RuntimeContext::default() + }; + context.check_size(state.max_context_bytes)?; + + // Stamp the provenance onto the fork's metadata, preserving the source's + // own metadata keys. + let mut metadata = match source.metadata.clone() { + serde_json::Value::Object(map) => map, + _ => serde_json::Map::new(), + }; + metadata.insert( + "forked_from".into(), + serde_json::json!(source_id.into_uuid()), + ); + metadata.insert( + "forked_at_block".into(), + serde_json::json!(req.from_block_id), + ); + + let now = Utc::now(); + let fork = TaskInstance { + id: InstanceId::new(), + sequence_id: source.sequence_id, + tenant_id: source.tenant_id.clone(), + namespace: source.namespace.clone(), + state: InstanceState::Scheduled, + next_fire_at: Some(now), + priority: source.priority, + timezone: source.timezone.clone(), + metadata: serde_json::Value::Object(metadata), + context, + // A sandbox must not contend for (or dedupe against) the source's + // production concurrency / idempotency slots. + concurrency_key: None, + max_concurrency: None, + idempotency_key: None, + session_id: None, + parent_instance_id: None, + budget: source.budget.clone(), + created_at: now, + updated_at: now, + }; + + state + .storage + .create_instance(&fork) + .await + .map_err(|e| ApiError::from_storage(e, "instance"))?; + + // Seed the fork with the source's pre-fork snapshots. Inline rows only — + // see the module docs for why externalized references are not shared. 
+ let copied_rows = state + .storage + .copy_block_outputs(source_id, fork.id, &copy_ids) + .await + .map_err(|e| ApiError::from_storage(e, "block_outputs"))?; + + tracing::info!( + source_id = %source_id, + fork_id = %fork.id, + forked_at_block = %req.from_block_id, + copied_blocks = copied_blocks, + copied_rows = copied_rows, + rerun_blocks = rerun_blocks.len(), + dry_run = req.dry_run, + "instance forked" + ); + + Ok(( + StatusCode::CREATED, + Json(ForkResponse { + id: fork.id, + forked_from: source_id, + state: "scheduled".into(), + copied_blocks, + rerun_blocks, + dry_run: req.dry_run, + }), + )) +} diff --git a/orch8-api/src/instances/lifecycle.rs b/orch8-api/src/instances/lifecycle.rs index 5503b9f..c5600ed 100644 --- a/orch8-api/src/instances/lifecycle.rs +++ b/orch8-api/src/instances/lifecycle.rs @@ -531,8 +531,9 @@ pub async fn retry_instance( /// Used by resume-from-block to wipe descendant outputs together with the /// composite that owns them — leaving a nested loop's iteration-counter /// marker or a nested step's output in place would make the re-run skip or -/// short-circuit those blocks. -fn collect_block_ids(block: &BlockDefinition, out: &mut Vec) { +/// short-circuit those blocks. Fork-from reuses it to gather the copy set +/// for blocks *before* the fork point. +pub(super) fn collect_block_ids(block: &BlockDefinition, out: &mut Vec) { fn collect_list(blocks: &[BlockDefinition], out: &mut Vec) { for b in blocks { collect_block_ids(b, out); @@ -592,7 +593,7 @@ fn collect_block_ids(block: &BlockDefinition, out: &mut Vec) { } /// The ID of a top-level block, regardless of variant. 
-fn top_level_block_id(block: &BlockDefinition) -> &BlockId { +pub(super) fn top_level_block_id(block: &BlockDefinition) -> &BlockId { match block { BlockDefinition::Step(s) => &s.id, BlockDefinition::Parallel(p) => &p.id, diff --git a/orch8-api/src/instances/timeline.rs b/orch8-api/src/instances/timeline.rs new file mode 100644 index 0000000..6309535 --- /dev/null +++ b/orch8-api/src/instances/timeline.rs @@ -0,0 +1,211 @@ +//! `GET /instances/{id}/timeline` — state-at-every-step view. +//! +//! The engine is snapshot-based: the state after every executed block lives +//! in `block_outputs` as data, not as an event log. The timeline endpoint is +//! the flat chronological complement of `GET /instances/{id}/tree`: one entry +//! per `block_outputs` row in execution order (`created_at ASC`), plus +//! instance-level context (creation time, recorded state transitions from the +//! audit log, current state / context). +//! +//! Internal bookkeeping rows (`__in_progress__` / `__retry__` / `__error__`) +//! are *flagged* via `is_sentinel`, not hidden — time travel needs to see +//! retries and crash markers, unlike `GET /instances/{id}/outputs` which +//! strips them. + +use axum::extract::{Path, Query, State}; +use axum::response::IntoResponse; +use axum::Json; +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; +use utoipa::ToSchema; +use uuid::Uuid; + +use orch8_types::context::ExecutionContext; +use orch8_types::ids::{InstanceId, SequenceId}; +use orch8_types::instance::InstanceState; + +use crate::error::ApiError; +use crate::AppState; + +/// Default page size for timeline entries. Outputs can be large, so the +/// endpoint never returns the unbounded history in one response. +const DEFAULT_TIMELINE_LIMIT: u32 = 200; +/// Hard ceiling on the page size (same cap as `Pagination::capped`). 
+const MAX_TIMELINE_LIMIT: u32 = 1000; + +#[derive(Deserialize)] +pub struct TimelineQuery { + #[serde(default)] + pub(crate) offset: u64, + #[serde(default = "default_timeline_limit")] + pub(crate) limit: u32, + /// When `false`, entries carry metadata only (no `output` payloads) and + /// the instance summary omits the current context. Default `true`. + #[serde(default = "super::types::default_true")] + pub(crate) include_outputs: bool, +} + +const fn default_timeline_limit() -> u32 { + DEFAULT_TIMELINE_LIMIT +} + +/// One executed-block entry: the persisted snapshot of a single block +/// execution (first attempt, retry marker, loop iteration, ...). +#[derive(Serialize, ToSchema)] +pub struct TimelineEntry { + pub block_id: String, + pub attempt: u16, + /// When this row was persisted — i.e. when the block's execution (or the + /// sentinel write) completed. + pub completed_at: DateTime, + /// Inline output payload. Omitted when `include_outputs=false`. For + /// artifact-backed (externalized) rows the stored reference marker is + /// returned as-is — resolve via `output_ref`. + #[serde(skip_serializing_if = "Option::is_none")] + pub output: Option, + /// Externalized-payload reference, or a sentinel tag + /// (`__in_progress__` / `__retry__` / `__error__`). + #[serde(skip_serializing_if = "Option::is_none")] + pub output_ref: Option, + /// `true` for internal bookkeeping rows (crash-recovery sentinels, retry + /// markers, error markers) — flagged rather than hidden. + pub is_sentinel: bool, +} + +/// Instance-level summary heading the timeline. +#[derive(Serialize, ToSchema)] +pub struct TimelineInstance { + pub id: InstanceId, + pub sequence_id: SequenceId, + pub state: InstanceState, + pub created_at: DateTime, + pub updated_at: DateTime, + /// Current execution context. Omitted when `include_outputs=false` + /// (contexts can be as large as outputs). 
+ #[serde(skip_serializing_if = "Option::is_none")] + pub context: Option, +} + +/// A recorded instance state transition (from the audit log). +#[derive(Serialize, ToSchema)] +pub struct TimelineStateTransition { + #[serde(skip_serializing_if = "Option::is_none")] + pub from_state: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub to_state: Option, + pub at: DateTime, +} + +#[derive(Serialize, ToSchema)] +pub struct TimelineResponse { + pub instance: TimelineInstance, + /// State transitions recorded in the audit log (chronological, capped at + /// the most recent 200). Empty when audit logging is not enabled. + pub state_transitions: Vec, + /// Executed-block entries in execution order (`created_at ASC`), + /// paginated by `offset` / `limit`. + pub entries: Vec, + pub offset: u64, + pub limit: u32, + pub has_more: bool, +} + +/// `true` for internal bookkeeping markers that are not real step outputs. +pub(super) fn is_sentinel_ref(output_ref: Option<&str>) -> bool { + matches!( + output_ref, + Some("__in_progress__" | "__retry__" | "__error__") + ) +} + +#[utoipa::path(get, path = "/instances/{id}/timeline", tag = "instances", + params( + ("id" = Uuid, Path, description = "Instance ID"), + ("offset" = u64, Query, description = "Pagination offset over executed-block entries"), + ("limit" = u32, Query, description = "Page size (default 200, max 1000)"), + ("include_outputs" = bool, Query, description = "Include output payloads and current context (default true)"), + ), + responses( + (status = 200, description = "Execution timeline", body = TimelineResponse), + (status = 404, description = "Instance not found"), + ) +)] +pub async fn get_timeline( + State(state): State, + tenant_ctx: crate::auth::OptionalTenant, + Path(id): Path, + Query(q): Query, +) -> Result { + let instance_id = InstanceId::from_uuid(id); + + let instance = state + .storage + .get_instance(instance_id) + .await + .map_err(|e| ApiError::from_storage(e, "instance"))? 
+ .ok_or_else(|| ApiError::NotFound(format!("instance {id}")))?; + + crate::auth::enforce_tenant_access( + &tenant_ctx, + &instance.tenant_id, + &format!("instance {id}"), + )?; + + let limit = q.limit.clamp(1, MAX_TIMELINE_LIMIT); + + let outputs = state + .storage + .get_outputs_page(instance_id, limit, q.offset) + .await + .map_err(|e| ApiError::from_storage(e, "outputs"))?; + + // Same has-more heuristic as `PaginatedResponse::from_vec`: a full page + // means there may be more rows. + let has_more = u32::try_from(outputs.len()).unwrap_or(u32::MAX) >= limit; + + let entries: Vec = outputs + .into_iter() + .map(|o| TimelineEntry { + is_sentinel: is_sentinel_ref(o.output_ref.as_deref()), + block_id: o.block_id.as_str().to_owned(), + attempt: o.attempt, + completed_at: o.created_at, + output: q.include_outputs.then_some(o.output), + output_ref: o.output_ref, + }) + .collect(); + + // State transitions, if the deployment records them (audit log is + // best-effort). The storage returns newest-first; the timeline is + // chronological, so reverse. + let mut state_transitions: Vec = state + .storage + .list_audit_log(instance_id, 200) + .await + .map_err(|e| ApiError::from_storage(e, "audit_log"))? 
+ .into_iter() + .filter(|e| e.event_type == "state_transition") + .map(|e| TimelineStateTransition { + from_state: e.from_state, + to_state: e.to_state, + at: e.created_at, + }) + .collect(); + state_transitions.reverse(); + + Ok(Json(TimelineResponse { + instance: TimelineInstance { + id: instance.id, + sequence_id: instance.sequence_id, + state: instance.state, + created_at: instance.created_at, + updated_at: instance.updated_at, + context: q.include_outputs.then_some(instance.context), + }, + state_transitions, + entries, + offset: q.offset, + limit, + has_more, + })) +} diff --git a/orch8-api/src/instances/types.rs b/orch8-api/src/instances/types.rs index ad205b9..54370b9 100644 --- a/orch8-api/src/instances/types.rs +++ b/orch8-api/src/instances/types.rs @@ -77,6 +77,24 @@ pub struct ResumeFromRequest { pub(crate) context: Option, } +/// Body for `POST /instances/{id}/fork`. `from_block_id` must be a top-level +/// block of the source's sequence; `context` (optional) is shallow-merged +/// into the fork's `context.data` with the same per-key semantics as +/// resume-from; `dry_run` defaults to **true** so a forked production +/// workflow does not re-fire side effects unless explicitly asked to. 
+#[derive(Deserialize, ToSchema)] +pub struct ForkRequest { + pub(crate) from_block_id: String, + #[serde(default)] + pub(crate) context: Option, + #[serde(default = "default_true")] + pub(crate) dry_run: bool, +} + +pub const fn default_true() -> bool { + true +} + #[derive(Deserialize, ToSchema)] pub struct SendSignalRequest { pub(crate) signal_type: SignalType, diff --git a/orch8-api/src/openapi.rs b/orch8-api/src/openapi.rs index a8a1b90..d2848a4 100644 --- a/orch8-api/src/openapi.rs +++ b/orch8-api/src/openapi.rs @@ -39,8 +39,10 @@ use utoipa::OpenApi; crate::instances::list_instance_artifacts, crate::instances::get_artifact_bytes, crate::instances::get_execution_tree, + crate::instances::get_timeline, crate::instances::retry_instance, crate::instances::resume_from_block, + crate::instances::fork_instance, crate::instances::bulk_update_state, crate::instances::bulk_reschedule, crate::instances::list_dlq, @@ -173,6 +175,13 @@ use utoipa::OpenApi; orch8_types::cluster::ClusterNode, orch8_types::cluster::NodeStatus, orch8_types::checkpoint::Checkpoint, + // Timeline / fork (time-travel operations) + crate::instances::TimelineResponse, + crate::instances::TimelineInstance, + crate::instances::TimelineEntry, + crate::instances::TimelineStateTransition, + crate::instances::ForkRequest, + crate::instances::ForkResponse, crate::instances::SaveCheckpointRequest, crate::instances::PruneCheckpointsRequest, crate::instances::InjectBlocksRequest, diff --git a/orch8-api/tests/timeline_fork.rs b/orch8-api/tests/timeline_fork.rs new file mode 100644 index 0000000..c9903c0 --- /dev/null +++ b/orch8-api/tests/timeline_fork.rs @@ -0,0 +1,576 @@ +//! E2E tests for the time-travel operations: +//! `GET /instances/{id}/timeline` and `POST /instances/{id}/fork`. +//! +//! The API test harness has no engine loop, so executed steps are simulated +//! by writing `block_outputs` rows and driving instance state directly +//! 
through the storage handle exposed by `spawn_test_server()` (same pattern +//! as `resume_from.rs`). + +use chrono::{Duration, Utc}; +use orch8_api::test_harness::spawn_test_server; +use orch8_storage::{InstanceStore, OutputStore}; +use orch8_types::ids::{BlockId, InstanceId}; +use orch8_types::instance::InstanceState; +use orch8_types::output::BlockOutput; +use reqwest::StatusCode; +use serde_json::json; +use uuid::Uuid; + +fn step(id: &str) -> serde_json::Value { + json!({ + "type": "step", + "id": id, + "handler": "noop", + "params": {}, + "cancellable": true + }) +} + +fn mk_sequence_body(id: Uuid) -> serde_json::Value { + json!({ + "id": id, + "tenant_id": "t1", + "namespace": "ns1", + "name": "timeline-fork-seq", + "version": 1, + "deprecated": false, + "blocks": [step("s1"), step("s2"), step("s3")], + "interceptors": null, + "created_at": Utc::now().to_rfc3339() + }) +} + +async fn create_sequence(client: &reqwest::Client, base_url: &str) -> Uuid { + let seq_id = Uuid::now_v7(); + let resp = client + .post(format!("{base_url}/sequences")) + .header("X-Tenant-Id", "t1") + .json(&mk_sequence_body(seq_id)) + .send() + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::CREATED); + seq_id +} + +async fn create_instance(client: &reqwest::Client, base_url: &str, seq_id: Uuid) -> Uuid { + let body = json!({ + "sequence_id": seq_id, + "tenant_id": "t1", + "namespace": "ns1", + "metadata": { "owner": "alice" }, + "context": { "data": { "seed": 1 }, "config": {}, "audit": [] } + }); + let resp = client + .post(format!("{base_url}/instances")) + .header("X-Tenant-Id", "t1") + .json(&body) + .send() + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::CREATED); + let created: serde_json::Value = resp.json().await.unwrap(); + created["id"].as_str().unwrap().parse().unwrap() +} + +/// An output row with a deterministic timestamp offset so ordering +/// assertions are stable. 
+fn mk_output_at(inst: Uuid, block: &str, offset_secs: i64) -> BlockOutput { + BlockOutput { + id: Uuid::now_v7(), + instance_id: InstanceId::from_uuid(inst), + block_id: BlockId::new(block), + output: json!({"result": format!("{block}-ok")}), + output_ref: None, + output_size: 0, + attempt: 0, + created_at: Utc::now() - Duration::seconds(100) + Duration::seconds(offset_secs), + } +} + +// ==================================================================== +// Timeline +// ==================================================================== + +#[tokio::test] +async fn timeline_returns_entries_in_execution_order_with_sentinels_flagged() { + let srv = spawn_test_server().await; + let client = reqwest::Client::new(); + let seq_id = create_sequence(&client, &srv.base_url).await; + let inst = create_instance(&client, &srv.base_url, seq_id).await; + + // s1 ok, then a retry marker for s2, then s2's real output, then s3. + srv.storage + .save_block_output(&mk_output_at(inst, "s1", 0)) + .await + .unwrap(); + let mut retry_marker = mk_output_at(inst, "s2", 1); + retry_marker.output_ref = Some("__retry__".into()); + retry_marker.attempt = 1; + srv.storage.save_block_output(&retry_marker).await.unwrap(); + srv.storage + .save_block_output(&mk_output_at(inst, "s2", 2)) + .await + .unwrap(); + srv.storage + .save_block_output(&mk_output_at(inst, "s3", 3)) + .await + .unwrap(); + + let resp = client + .get(format!("{}/instances/{inst}/timeline", srv.base_url)) + .header("X-Tenant-Id", "t1") + .send() + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::OK); + let body: serde_json::Value = resp.json().await.unwrap(); + + // Instance-level summary: id, current state, context. 
+ assert_eq!(body["instance"]["id"], inst.to_string()); + assert_eq!(body["instance"]["state"], "scheduled"); + assert_eq!(body["instance"]["context"]["data"]["seed"], 1); + assert!(body["instance"]["created_at"].is_string()); + + // Entries in chronological execution order; sentinel flagged, not hidden. + let entries = body["entries"].as_array().unwrap(); + let ids: Vec<&str> = entries + .iter() + .map(|e| e["block_id"].as_str().unwrap()) + .collect(); + assert_eq!(ids, ["s1", "s2", "s2", "s3"]); + assert_eq!(entries[0]["is_sentinel"], false); + assert_eq!(entries[1]["is_sentinel"], true); + assert_eq!(entries[1]["output_ref"], "__retry__"); + assert_eq!(entries[1]["attempt"], 1); + assert_eq!(entries[2]["is_sentinel"], false); + assert_eq!(entries[2]["output"]["result"], "s2-ok"); + assert!(entries[3]["completed_at"].is_string()); + assert_eq!(body["has_more"], false); +} + +#[tokio::test] +async fn timeline_pagination_pages_through_entries() { + let srv = spawn_test_server().await; + let client = reqwest::Client::new(); + let seq_id = create_sequence(&client, &srv.base_url).await; + let inst = create_instance(&client, &srv.base_url, seq_id).await; + + for (i, block) in ["s1", "s2", "s3"].iter().enumerate() { + srv.storage + .save_block_output(&mk_output_at(inst, block, i64::try_from(i).unwrap())) + .await + .unwrap(); + } + + let resp = client + .get(format!( + "{}/instances/{inst}/timeline?limit=2&offset=0", + srv.base_url + )) + .header("X-Tenant-Id", "t1") + .send() + .await + .unwrap(); + let page1: serde_json::Value = resp.json().await.unwrap(); + let entries = page1["entries"].as_array().unwrap(); + assert_eq!(entries.len(), 2); + assert_eq!(entries[0]["block_id"], "s1"); + assert_eq!(entries[1]["block_id"], "s2"); + assert_eq!(page1["has_more"], true); + assert_eq!(page1["limit"], 2); + assert_eq!(page1["offset"], 0); + + let resp = client + .get(format!( + "{}/instances/{inst}/timeline?limit=2&offset=2", + srv.base_url + )) + .header("X-Tenant-Id", "t1") 
+ .send() + .await + .unwrap(); + let page2: serde_json::Value = resp.json().await.unwrap(); + let entries = page2["entries"].as_array().unwrap(); + assert_eq!(entries.len(), 1); + assert_eq!(entries[0]["block_id"], "s3"); + assert_eq!(page2["has_more"], false); +} + +#[tokio::test] +async fn timeline_include_outputs_false_returns_metadata_only() { + let srv = spawn_test_server().await; + let client = reqwest::Client::new(); + let seq_id = create_sequence(&client, &srv.base_url).await; + let inst = create_instance(&client, &srv.base_url, seq_id).await; + + srv.storage + .save_block_output(&mk_output_at(inst, "s1", 0)) + .await + .unwrap(); + + let resp = client + .get(format!( + "{}/instances/{inst}/timeline?include_outputs=false", + srv.base_url + )) + .header("X-Tenant-Id", "t1") + .send() + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::OK); + let body: serde_json::Value = resp.json().await.unwrap(); + + let entry = &body["entries"][0]; + assert_eq!(entry["block_id"], "s1"); + assert!(entry.get("output").is_none(), "output payload omitted"); + assert!(entry["completed_at"].is_string()); + assert!( + body["instance"].get("context").is_none(), + "context omitted in metadata-only mode" + ); +} + +#[tokio::test] +async fn timeline_cross_tenant_returns_404() { + let srv = spawn_test_server().await; + let client = reqwest::Client::new(); + let seq_id = create_sequence(&client, &srv.base_url).await; + let inst = create_instance(&client, &srv.base_url, seq_id).await; + + let resp = client + .get(format!("{}/instances/{inst}/timeline", srv.base_url)) + .header("X-Tenant-Id", "t2") + .send() + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::NOT_FOUND); +} + +// ==================================================================== +// Fork +// ==================================================================== + +#[tokio::test] +async fn fork_from_middle_block_copies_pre_fork_outputs_and_leaves_source_untouched() { + let srv = 
spawn_test_server().await; + let client = reqwest::Client::new(); + let seq_id = create_sequence(&client, &srv.base_url).await; + let inst = create_instance(&client, &srv.base_url, seq_id).await; + + for (i, block) in ["s1", "s2", "s3"].iter().enumerate() { + srv.storage + .save_block_output(&mk_output_at(inst, block, i64::try_from(i).unwrap())) + .await + .unwrap(); + } + srv.storage + .update_instance_state(InstanceId::from_uuid(inst), InstanceState::Completed, None) + .await + .unwrap(); + + let resp = client + .post(format!("{}/instances/{inst}/fork", srv.base_url)) + .header("X-Tenant-Id", "t1") + .json(&json!({ "from_block_id": "s3" })) + .send() + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::CREATED); + let body: serde_json::Value = resp.json().await.unwrap(); + assert_eq!(body["forked_from"], inst.to_string()); + assert_eq!(body["state"], "scheduled"); + assert_eq!(body["copied_blocks"], 2); + assert_eq!(body["rerun_blocks"].as_array().unwrap().len(), 0); + assert_eq!(body["dry_run"], true, "dry-run is the default"); + + let fork_id: Uuid = body["id"].as_str().unwrap().parse().unwrap(); + assert_ne!(fork_id, inst, "fork is a new instance"); + + // The fork carries copies of s1 and s2's outputs — s3 will (re-)run. + let fork_outputs = srv + .storage + .get_all_outputs(InstanceId::from_uuid(fork_id)) + .await + .unwrap(); + let mut blocks: Vec<&str> = fork_outputs.iter().map(|o| o.block_id.as_str()).collect(); + blocks.sort_unstable(); + assert_eq!(blocks, ["s1", "s2"]); + assert!( + fork_outputs + .iter() + .all(|o| o.instance_id.into_uuid() == fork_id), + "copied rows belong to the fork" + ); + + // Fork instance: scheduled now, dry-run stamped, provenance metadata, + // source metadata preserved. 
+ let fork_inst = srv + .storage + .get_instance(InstanceId::from_uuid(fork_id)) + .await + .unwrap() + .unwrap(); + assert_eq!(fork_inst.state, InstanceState::Scheduled); + assert!(fork_inst.next_fire_at.is_some()); + assert!(fork_inst.context.runtime.dry_run); + assert_eq!(fork_inst.metadata["forked_from"], inst.to_string()); + assert_eq!(fork_inst.metadata["forked_at_block"], "s3"); + assert_eq!(fork_inst.metadata["owner"], "alice"); + assert!(fork_inst.idempotency_key.is_none()); + + // Source untouched: still completed, all three outputs intact. + let source = srv + .storage + .get_instance(InstanceId::from_uuid(inst)) + .await + .unwrap() + .unwrap(); + assert_eq!(source.state, InstanceState::Completed); + assert!(!source.context.runtime.dry_run); + let source_outputs = srv + .storage + .get_all_outputs(InstanceId::from_uuid(inst)) + .await + .unwrap(); + assert_eq!(source_outputs.len(), 3); +} + +#[tokio::test] +async fn fork_applies_context_patch_without_touching_source_context() { + let srv = spawn_test_server().await; + let client = reqwest::Client::new(); + let seq_id = create_sequence(&client, &srv.base_url).await; + let inst = create_instance(&client, &srv.base_url, seq_id).await; + + srv.storage + .save_block_output(&mk_output_at(inst, "s1", 0)) + .await + .unwrap(); + + let resp = client + .post(format!("{}/instances/{inst}/fork", srv.base_url)) + .header("X-Tenant-Id", "t1") + .json(&json!({ + "from_block_id": "s2", + "context": { "api_key": "sandbox", "retries": 3 }, + "dry_run": false + })) + .send() + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::CREATED); + let body: serde_json::Value = resp.json().await.unwrap(); + assert_eq!(body["dry_run"], false); + let fork_id: Uuid = body["id"].as_str().unwrap().parse().unwrap(); + + let fork_inst = srv + .storage + .get_instance(InstanceId::from_uuid(fork_id)) + .await + .unwrap() + .unwrap(); + // Patched keys merged in; pre-existing keys preserved; dry-run off. 
+ assert_eq!(fork_inst.context.data["api_key"], "sandbox"); + assert_eq!(fork_inst.context.data["retries"], 3); + assert_eq!(fork_inst.context.data["seed"], 1); + assert!(!fork_inst.context.runtime.dry_run); + + // Source context unchanged. + let source = srv + .storage + .get_instance(InstanceId::from_uuid(inst)) + .await + .unwrap() + .unwrap(); + assert!(source.context.data.get("api_key").is_none()); +} + +#[tokio::test] +async fn fork_from_first_block_copies_nothing() { + let srv = spawn_test_server().await; + let client = reqwest::Client::new(); + let seq_id = create_sequence(&client, &srv.base_url).await; + let inst = create_instance(&client, &srv.base_url, seq_id).await; + + for (i, block) in ["s1", "s2", "s3"].iter().enumerate() { + srv.storage + .save_block_output(&mk_output_at(inst, block, i64::try_from(i).unwrap())) + .await + .unwrap(); + } + + let resp = client + .post(format!("{}/instances/{inst}/fork", srv.base_url)) + .header("X-Tenant-Id", "t1") + .json(&json!({ "from_block_id": "s1" })) + .send() + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::CREATED); + let body: serde_json::Value = resp.json().await.unwrap(); + assert_eq!(body["copied_blocks"], 0); + assert_eq!(body["rerun_blocks"].as_array().unwrap().len(), 0); + + let fork_id: Uuid = body["id"].as_str().unwrap().parse().unwrap(); + let fork_outputs = srv + .storage + .get_all_outputs(InstanceId::from_uuid(fork_id)) + .await + .unwrap(); + assert!(fork_outputs.is_empty(), "full re-run: nothing copied"); +} + +#[tokio::test] +async fn fork_artifact_backed_pre_fork_block_goes_to_rerun_set() { + let srv = spawn_test_server().await; + let client = reqwest::Client::new(); + let seq_id = create_sequence(&client, &srv.base_url).await; + let inst = create_instance(&client, &srv.base_url, seq_id).await; + + // s1's output was externalized (artifact-backed): its payload reference + // is keyed by the SOURCE instance and cannot be shared with the fork. 
+ let mut externalized = mk_output_at(inst, "s1", 0); + externalized.output = json!({"_externalized": true, "_ref": format!("{inst}:s1")}); + externalized.output_ref = Some(format!("{inst}:s1")); + srv.storage.save_block_output(&externalized).await.unwrap(); + // s2 completed inline. + srv.storage + .save_block_output(&mk_output_at(inst, "s2", 1)) + .await + .unwrap(); + + let resp = client + .post(format!("{}/instances/{inst}/fork", srv.base_url)) + .header("X-Tenant-Id", "t1") + .json(&json!({ "from_block_id": "s3" })) + .send() + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::CREATED); + let body: serde_json::Value = resp.json().await.unwrap(); + // s1 re-runs (artifact-backed), s2 was copied. + assert_eq!(body["copied_blocks"], 1); + assert_eq!(body["rerun_blocks"], json!(["s1"])); + + let fork_id: Uuid = body["id"].as_str().unwrap().parse().unwrap(); + let fork_outputs = srv + .storage + .get_all_outputs(InstanceId::from_uuid(fork_id)) + .await + .unwrap(); + assert_eq!(fork_outputs.len(), 1); + assert_eq!(fork_outputs[0].block_id.as_str(), "s2"); + assert!( + fork_outputs[0].output_ref.is_none(), + "only inline rows are ever copied" + ); +} + +#[tokio::test] +async fn fork_is_allowed_from_a_running_source() { + let srv = spawn_test_server().await; + let client = reqwest::Client::new(); + let seq_id = create_sequence(&client, &srv.base_url).await; + let inst = create_instance(&client, &srv.base_url, seq_id).await; + + srv.storage + .save_block_output(&mk_output_at(inst, "s1", 0)) + .await + .unwrap(); + srv.storage + .update_instance_state(InstanceId::from_uuid(inst), InstanceState::Running, None) + .await + .unwrap(); + + // Fork is a read + clone: unlike resume-from, the source does not need + // to be quiescent. 
+ let resp = client + .post(format!("{}/instances/{inst}/fork", srv.base_url)) + .header("X-Tenant-Id", "t1") + .json(&json!({ "from_block_id": "s2" })) + .send() + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::CREATED); + + let source = srv + .storage + .get_instance(InstanceId::from_uuid(inst)) + .await + .unwrap() + .unwrap(); + assert_eq!(source.state, InstanceState::Running, "source untouched"); +} + +#[tokio::test] +async fn fork_unknown_block_returns_400() { + let srv = spawn_test_server().await; + let client = reqwest::Client::new(); + let seq_id = create_sequence(&client, &srv.base_url).await; + let inst = create_instance(&client, &srv.base_url, seq_id).await; + + let resp = client + .post(format!("{}/instances/{inst}/fork", srv.base_url)) + .header("X-Tenant-Id", "t1") + .json(&json!({ "from_block_id": "no-such-block" })) + .send() + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::BAD_REQUEST); + let body: serde_json::Value = resp.json().await.unwrap(); + assert!(body["error"].as_str().unwrap().contains("no-such-block")); +} + +#[tokio::test] +async fn fork_rejects_non_object_context_patch() { + let srv = spawn_test_server().await; + let client = reqwest::Client::new(); + let seq_id = create_sequence(&client, &srv.base_url).await; + let inst = create_instance(&client, &srv.base_url, seq_id).await; + + let resp = client + .post(format!("{}/instances/{inst}/fork", srv.base_url)) + .header("X-Tenant-Id", "t1") + .json(&json!({ "from_block_id": "s2", "context": [1, 2, 3] })) + .send() + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::BAD_REQUEST); +} + +#[tokio::test] +async fn fork_cross_tenant_returns_404() { + let srv = spawn_test_server().await; + let client = reqwest::Client::new(); + let seq_id = create_sequence(&client, &srv.base_url).await; + let inst = create_instance(&client, &srv.base_url, seq_id).await; + + let resp = client + .post(format!("{}/instances/{inst}/fork", srv.base_url)) + .header("X-Tenant-Id", 
"t2") + .json(&json!({ "from_block_id": "s2" })) + .send() + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::NOT_FOUND); +} + +#[tokio::test] +async fn fork_unknown_instance_returns_404_via_v1_prefix() { + let srv = spawn_test_server().await; + let client = reqwest::Client::new(); + + let resp = client + .post(format!( + "{}/instances/{}/fork", + srv.v1_url(), + Uuid::now_v7() + )) + .header("X-Tenant-Id", "t1") + .json(&json!({ "from_block_id": "s1" })) + .send() + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::NOT_FOUND); +} diff --git a/orch8-engine/tests/fork_e2e.rs b/orch8-engine/tests/fork_e2e.rs new file mode 100644 index 0000000..2bfb26a --- /dev/null +++ b/orch8-engine/tests/fork_e2e.rs @@ -0,0 +1,183 @@ +//! Engine-level e2e for fork-from (time travel). +//! +//! Runs a flat 3-step sequence to completion through the real scheduler +//! (`tick_once`), then performs the same storage clone the +//! `POST /instances/{id}/fork` handler performs (new instance with patched +//! context + `copy_block_outputs` for the pre-fork blocks) and ticks the +//! fork to completion. Verifies the fork resumes at the fork point: the +//! pre-fork block is NOT re-executed (its copied output short-circuits the +//! completed-blocks check) while the fork-point block and the tail run again +//! with the patched context. + +use std::sync::atomic::{AtomicU32, Ordering}; +use std::sync::Arc; + +use serde_json::json; +use tokio_util::sync::CancellationToken; + +use orch8_engine::handlers::HandlerRegistry; +use orch8_engine::scheduler::tick_once; +use orch8_storage::StorageBackend; +use orch8_types::ids::{BlockId, InstanceId}; +use orch8_types::instance::InstanceState; + +mod common; +use common::*; + +/// Tick the scheduler until the instance reaches `target` (or panic). 
+async fn tick_until(
+    storage: &Arc<dyn StorageBackend>,
+    handlers: &Arc<HandlerRegistry>,
+    instance_id: InstanceId,
+    target: InstanceState,
+) {
+    let sem = semaphore(128);
+    let config = default_config();
+    let seq_cache = cache();
+    let cancel = CancellationToken::new();
+    for _ in 0..50 {
+        tick_once(storage, handlers, &sem, &config, &seq_cache, &cancel)
+            .await
+            .unwrap();
+        let inst = storage.get_instance(instance_id).await.unwrap().unwrap();
+        if inst.state == target {
+            return;
+        }
+    }
+    let inst = storage.get_instance(instance_id).await.unwrap().unwrap();
+    panic!("instance never reached {target}, stuck in {}", inst.state);
+}
+
+#[tokio::test]
+#[allow(clippy::too_many_lines)] // one linear scenario: run, fork, re-run, assert
+async fn fork_from_middle_block_reruns_only_the_tail_with_patched_context() {
+    // Per-step execution counters so we can prove what (re-)ran on the fork.
+    let s1_runs = Arc::new(AtomicU32::new(0));
+    let s2_runs = Arc::new(AtomicU32::new(0));
+    let s3_runs = Arc::new(AtomicU32::new(0));
+
+    let mut reg = registry();
+    {
+        let s1_runs = Arc::clone(&s1_runs);
+        reg.register("count_s1", move |_ctx| {
+            let s1_runs = Arc::clone(&s1_runs);
+            Box::pin(async move {
+                s1_runs.fetch_add(1, Ordering::SeqCst);
+                Ok(json!({"step": "s1"}))
+            })
+        });
+    }
+    {
+        // s2 echoes the context flavour so we can assert the fork ran with
+        // the patched context, not the source's.
+        let s2_runs = Arc::clone(&s2_runs);
+        reg.register("count_s2", move |ctx| {
+            let s2_runs = Arc::clone(&s2_runs);
+            Box::pin(async move {
+                s2_runs.fetch_add(1, Ordering::SeqCst);
+                let flavour = ctx
+                    .context
+                    .data
+                    .get("flavour")
+                    .and_then(serde_json::Value::as_str)
+                    .unwrap_or("none")
+                    .to_owned();
+                Ok(json!({"step": "s2", "flavour": flavour}))
+            })
+        });
+    }
+    {
+        let s3_runs = Arc::clone(&s3_runs);
+        reg.register("count_s3", move |_ctx| {
+            let s3_runs = Arc::clone(&s3_runs);
+            Box::pin(async move {
+                s3_runs.fetch_add(1, Ordering::SeqCst);
+                Ok(json!({"step": "s3"}))
+            })
+        });
+    }
+    let handlers = Arc::new(reg);
+
+    let storage: Arc<dyn StorageBackend> = Arc::new(
+        orch8_storage::sqlite::SqliteStorage::in_memory()
+            .await
+            .unwrap(),
+    );
+    let seq = mk_sequence(vec![
+        mk_step("s1", "count_s1"),
+        mk_step("s2", "count_s2"),
+        mk_step("s3", "count_s3"),
+    ]);
+    storage.create_sequence(&seq).await.unwrap();
+    let source = mk_instance_scheduled(seq.id, json!({"flavour": "production"}));
+    storage.create_instance(&source).await.unwrap();
+
+    // Source runs to completion: every step executed exactly once.
+    tick_until(&storage, &handlers, source.id, InstanceState::Completed).await;
+    assert_eq!(s1_runs.load(Ordering::SeqCst), 1);
+    assert_eq!(s2_runs.load(Ordering::SeqCst), 1);
+    assert_eq!(s3_runs.load(Ordering::SeqCst), 1);
+    let source_s2 = storage
+        .get_block_output(source.id, &BlockId::new("s2"))
+        .await
+        .unwrap()
+        .unwrap();
+    assert_eq!(source_s2.output["flavour"], "production");
+
+    // --- Fork surgery (mirrors the API handler) ---
+    // New instance of the same sequence with a patched context, seeded with
+    // a copy of s1's output (the only block before the fork point s2).
+ let mut fork = mk_instance_scheduled(seq.id, json!({"flavour": "sandbox"})); + fork.metadata = json!({ + "forked_from": source.id.into_uuid(), + "forked_at_block": "s2", + }); + storage.create_instance(&fork).await.unwrap(); + let copied = storage + .copy_block_outputs(source.id, fork.id, &[BlockId::new("s1")]) + .await + .unwrap(); + assert_eq!(copied, 1, "s1's output copied onto the fork"); + + // Tick the fork to completion. + tick_until(&storage, &handlers, fork.id, InstanceState::Completed).await; + + // Pre-fork block did NOT re-execute: the copied output short-circuits + // the completed-blocks check. + assert_eq!( + s1_runs.load(Ordering::SeqCst), + 1, + "s1 must not re-execute on the fork" + ); + // Fork-point block and the tail DID execute (once on the source, once + // on the fork). + assert_eq!(s2_runs.load(Ordering::SeqCst), 2, "s2 re-ran on the fork"); + assert_eq!(s3_runs.load(Ordering::SeqCst), 2, "s3 re-ran on the fork"); + + // The fork's s2 ran with the patched context. + let fork_s2 = storage + .get_block_output(fork.id, &BlockId::new("s2")) + .await + .unwrap() + .unwrap(); + assert_eq!(fork_s2.output["flavour"], "sandbox"); + + // Source is untouched: still completed, its outputs unchanged. + let source_after = storage.get_instance(source.id).await.unwrap().unwrap(); + assert_eq!(source_after.state, InstanceState::Completed); + let source_s2_after = storage + .get_block_output(source.id, &BlockId::new("s2")) + .await + .unwrap() + .unwrap(); + assert_eq!(source_s2_after.output["flavour"], "production"); + + // And the fork's copied s1 row really belongs to the fork. 
+    let fork_s1 = storage
+        .get_block_output(fork.id, &BlockId::new("s1"))
+        .await
+        .unwrap()
+        .unwrap();
+    assert_eq!(fork_s1.instance_id, fork.id);
+    assert_eq!(fork_s1.output["step"], "s1");
+}
diff --git a/orch8-storage/src/encrypting.rs b/orch8-storage/src/encrypting.rs
index bd69fd8..aa3b15f 100644
--- a/orch8-storage/src/encrypting.rs
+++ b/orch8-storage/src/encrypting.rs
@@ -817,6 +817,27 @@ impl crate::OutputStore for EncryptingStorage {
     async fn delete_block_output_by_id(&self, id: Uuid) -> Result<(), StorageError> {
         self.inner.delete_block_output_by_id(id).await
     }
+    async fn get_outputs_page(
+        &self,
+        instance_id: InstanceId,
+        limit: u32,
+        offset: u64,
+    ) -> Result<Vec<orch8_types::output::BlockOutput>, StorageError> {
+        self.inner
+            .get_outputs_page(instance_id, limit, offset)
+            .await
+    }
+    // Pass-through like the other output methods: block outputs are not
+    // field-encrypted, so a row copied verbatim stays consistent with what
+    // `save_block_output` would have written.
+    async fn copy_block_outputs(
+        &self,
+        src: InstanceId,
+        dst: InstanceId,
+        block_ids: &[orch8_types::ids::BlockId],
+    ) -> Result<u64, StorageError> {
+        self.inner.copy_block_outputs(src, dst, block_ids).await
+    }
 }
 
 // ============================================================================
diff --git a/orch8-storage/src/lib.rs b/orch8-storage/src/lib.rs
index 33218ed..cad5375 100644
--- a/orch8-storage/src/lib.rs
+++ b/orch8-storage/src/lib.rs
@@ -843,6 +843,42 @@ pub trait OutputStore: Send + Sync + 'static {
     /// the real output is persisted the sentinel must be removed so output
     /// counts stay correct.
     async fn delete_block_output_by_id(&self, id: Uuid) -> Result<(), StorageError>;
+
+    /// Return one page of `block_outputs` rows for an instance in execution
+    /// order (`created_at ASC`, `id ASC` tiebreak).
+    ///
+    /// Backs `GET /instances/{id}/timeline`: unlike [`Self::get_all_outputs`]
+    /// the result is bounded, so a long-running instance with thousands of
+    /// loop-iteration rows cannot blow up a single response.
+    async fn get_outputs_page(
+        &self,
+        instance_id: InstanceId,
+        limit: u32,
+        offset: u64,
+    ) -> Result<Vec<orch8_types::output::BlockOutput>, StorageError>;
+
+    /// Copy `block_outputs` rows for the given block IDs from `src` to `dst`,
+    /// inserting new rows (fresh primary keys, `instance_id = dst`) that
+    /// preserve `block_id`, `output`, `output_size`, `attempt` and
+    /// `created_at`. Returns the number of rows copied.
+    ///
+    /// Only **inline** rows (`output_ref IS NULL`) are copied. Rows with a
+    /// non-null `output_ref` — externalized payload references (which are
+    /// keyed by the *source* instance ID and ownership-checked on read) and
+    /// internal sentinels (`__in_progress__` / `__retry__` / `__error__`) —
+    /// are deliberately skipped: a copied reference would dangle or leak
+    /// across instances. Callers (fork-from) must put blocks whose outputs
+    /// were not copied into the re-run set instead.
+    ///
+    /// Every backend MUST implement this — no default impl so a missing
+    /// implementation fails at compile time rather than silently no-oping
+    /// (same convention as [`Self::delete_block_outputs`]).
+    async fn copy_block_outputs(
+        &self,
+        src: InstanceId,
+        dst: InstanceId,
+        block_ids: &[BlockId],
+    ) -> Result<u64, StorageError>;
 }
 
 // ============================================================================
diff --git a/orch8-storage/src/postgres/mod.rs b/orch8-storage/src/postgres/mod.rs
index 139348e..6f22c7b 100644
--- a/orch8-storage/src/postgres/mod.rs
+++ b/orch8-storage/src/postgres/mod.rs
@@ -687,6 +687,24 @@ impl crate::OutputStore for PostgresStorage {
     async fn delete_block_output_by_id(&self, id: Uuid) -> Result<(), StorageError> {
         outputs::delete_by_id(self, id).await
     }
+
+    async fn get_outputs_page(
+        &self,
+        instance_id: InstanceId,
+        limit: u32,
+        offset: u64,
+    ) -> Result<Vec<orch8_types::output::BlockOutput>, StorageError> {
+        outputs::get_page(self, instance_id, limit, offset).await
+    }
+
+    async fn copy_block_outputs(
+        &self,
+        src: InstanceId,
+        dst: InstanceId,
+        block_ids: &[BlockId],
+    ) -> Result<u64, StorageError> {
+        outputs::copy_for_blocks(self, src, dst, block_ids).await
+    }
 }
 
 // ============================================================================
diff --git a/orch8-storage/src/postgres/outputs.rs b/orch8-storage/src/postgres/outputs.rs
index 115982b..917c687 100644
--- a/orch8-storage/src/postgres/outputs.rs
+++ b/orch8-storage/src/postgres/outputs.rs
@@ -144,6 +144,28 @@ pub(super) async fn get_after_created_at(
     Ok(rows.into_iter().map(BlockOutputRow::into_output).collect())
 }
 
+/// One page of an instance's outputs in execution order. Backs the timeline
+/// endpoint — see `OutputStore::get_outputs_page`.
+pub(super) async fn get_page(
+    store: &PostgresStorage,
+    instance_id: InstanceId,
+    limit: u32,
+    offset: u64,
+) -> Result<Vec<orch8_types::output::BlockOutput>, StorageError> {
+    let rows = sqlx::query_as::<_, BlockOutputRow>(
+        r"SELECT id, instance_id, block_id, output, output_ref, output_size, attempt, created_at
+          FROM block_outputs WHERE instance_id = $1
+          ORDER BY created_at, id
+          LIMIT $2 OFFSET $3",
+    )
+    .bind(instance_id.into_uuid())
+    .bind(i64::from(limit))
+    .bind(i64::try_from(offset).unwrap_or(i64::MAX))
+    .fetch_all(&store.pool)
+    .await?;
+    Ok(rows.into_iter().map(BlockOutputRow::into_output).collect())
+}
+
 /// Distinct `block_id`s that have produced at least one output for this
 /// instance. `DISTINCT` is required because under the write-append model a
 /// single block can have multiple rows (loop iterations, retries).
@@ -231,6 +253,36 @@ pub(super) async fn delete_for_blocks(
     Ok(result.rows_affected())
 }
 
+/// Copy inline (`output_ref IS NULL`) output rows for `block_ids` from `src`
+/// to `dst` with fresh primary keys (single INSERT ... SELECT round-trip).
+/// See `OutputStore::copy_block_outputs` for why externalized / sentinel
+/// rows are skipped.
+pub(super) async fn copy_for_blocks(
+    store: &PostgresStorage,
+    src: InstanceId,
+    dst: InstanceId,
+    block_ids: &[BlockId],
+) -> Result<u64, StorageError> {
+    if block_ids.is_empty() {
+        return Ok(0);
+    }
+    let ids: Vec<&str> = block_ids.iter().map(orch8_types::BlockId::as_str).collect();
+    let result = sqlx::query(
+        r"
+        INSERT INTO block_outputs (id, instance_id, block_id, output, output_ref, output_size, attempt, created_at)
+        SELECT gen_random_uuid(), $2, block_id, output, output_ref, output_size, attempt, created_at
+        FROM block_outputs
+        WHERE instance_id = $1 AND block_id = ANY($3) AND output_ref IS NULL
+        ",
+    )
+    .bind(src.into_uuid())
+    .bind(dst.into_uuid())
+    .bind(&ids)
+    .execute(&store.pool)
+    .await?;
+    Ok(result.rows_affected())
+}
+
 /// Delete ALL `block_outputs` rows for an instance (DLQ retry clean slate).
pub(super) async fn delete_all_for_instance( store: &PostgresStorage, diff --git a/orch8-storage/src/sqlite/mod.rs b/orch8-storage/src/sqlite/mod.rs index 0e7be70..c4b69ba 100644 --- a/orch8-storage/src/sqlite/mod.rs +++ b/orch8-storage/src/sqlite/mod.rs @@ -783,6 +783,24 @@ impl crate::OutputStore for SqliteStorage { async fn delete_block_output_by_id(&self, id: Uuid) -> Result<(), StorageError> { outputs::delete_by_id(self, id).await } + + async fn get_outputs_page( + &self, + instance_id: InstanceId, + limit: u32, + offset: u64, + ) -> Result, StorageError> { + outputs::get_page(self, instance_id, limit, offset).await + } + + async fn copy_block_outputs( + &self, + src: InstanceId, + dst: InstanceId, + block_ids: &[BlockId], + ) -> Result { + outputs::copy_for_blocks(self, src, dst, block_ids).await + } } // ============================================================================ @@ -4171,4 +4189,188 @@ mod tests { storage.acquire_manifest_lock("tenant-1").await.unwrap(); storage.release_manifest_lock("tenant-1").await.unwrap(); } + + /// Seed a sequence plus `n` instances of it (block_outputs has an FK to + /// task_instances, so copy targets must exist as rows). 
+    async fn seed_instances(storage: &SqliteStorage, n: usize) -> Vec<InstanceId> {
+        let now = Utc::now();
+        let seq = orch8_types::sequence::SequenceDefinition {
+            id: SequenceId::new(),
+            tenant_id: TenantId::unchecked("t"),
+            namespace: Namespace::new("ns"),
+            name: "copy-test".into(),
+            version: 1,
+            deprecated: false,
+            status: SequenceStatus::default(),
+            blocks: vec![],
+            interceptors: None,
+            created_at: now,
+        };
+        storage.create_sequence(&seq).await.unwrap();
+
+        let mut ids = Vec::with_capacity(n);
+        for _ in 0..n {
+            let inst = TaskInstance {
+                id: InstanceId::new(),
+                sequence_id: seq.id,
+                tenant_id: TenantId::unchecked("t"),
+                namespace: Namespace::new("ns"),
+                state: InstanceState::Running,
+                next_fire_at: None,
+                priority: Priority::Normal,
+                timezone: "UTC".into(),
+                metadata: serde_json::json!({}),
+                context: ExecutionContext::default(),
+                concurrency_key: None,
+                max_concurrency: None,
+                idempotency_key: None,
+                session_id: None,
+                parent_instance_id: None,
+                budget: None,
+                created_at: now,
+                updated_at: now,
+            };
+            storage.create_instance(&inst).await.unwrap();
+            ids.push(inst.id);
+        }
+        ids
+    }
+
+    fn mk_block_output(
+        instance_id: InstanceId,
+        block: &str,
+        output_ref: Option<&str>,
+        offset_secs: i64,
+    ) -> orch8_types::output::BlockOutput {
+        orch8_types::output::BlockOutput {
+            id: Uuid::now_v7(),
+            instance_id,
+            block_id: BlockId::new(block),
+            output: serde_json::json!({"v": block}),
+            output_ref: output_ref.map(str::to_owned),
+            output_size: 7,
+            attempt: 2,
+            created_at: Utc::now() - chrono::Duration::seconds(100 - offset_secs),
+        }
+    }
+
+    #[tokio::test]
+    async fn copy_block_outputs_copies_inline_rows_with_fresh_identity() {
+        let storage = SqliteStorage::in_memory().await.unwrap();
+        let ids = seed_instances(&storage, 2).await;
+        let (src, dst) = (ids[0], ids[1]);
+
+        let s1 = mk_block_output(src, "s1", None, 0);
+        let s2 = mk_block_output(src, "s2", None, 1);
+        let s3 = mk_block_output(src, "s3", None, 2);
+        for o in [&s1, &s2, &s3] {
+            storage.save_block_output(o).await.unwrap();
+        }
+
+        let copied = storage
+            .copy_block_outputs(src, dst, &[BlockId::new("s1"), BlockId::new("s2")])
+            .await
+            .unwrap();
+        assert_eq!(copied, 2);
+
+        let dst_outputs = storage.get_all_outputs(dst).await.unwrap();
+        assert_eq!(dst_outputs.len(), 2, "s3 was not in the copy set");
+        let copy_s1 = dst_outputs
+            .iter()
+            .find(|o| o.block_id.as_str() == "s1")
+            .unwrap();
+        // Fresh primary key + dst instance_id, everything else preserved.
+        assert_ne!(copy_s1.id, s1.id);
+        assert_eq!(copy_s1.instance_id, dst);
+        assert_eq!(copy_s1.output, s1.output);
+        assert_eq!(copy_s1.attempt, s1.attempt);
+        assert_eq!(copy_s1.output_size, s1.output_size);
+        assert_eq!(
+            copy_s1.created_at.timestamp(),
+            s1.created_at.timestamp(),
+            "created_at preserved so execution order survives the copy"
+        );
+
+        // Source rows untouched.
+        assert_eq!(storage.get_all_outputs(src).await.unwrap().len(), 3);
+    }
+
+    #[tokio::test]
+    async fn copy_block_outputs_skips_externalized_and_sentinel_rows() {
+        let storage = SqliteStorage::in_memory().await.unwrap();
+        let ids = seed_instances(&storage, 2).await;
+        let (src, dst) = (ids[0], ids[1]);
+
+        // s1: retry marker then a real inline output; s2: externalized only.
+        storage
+            .save_block_output(&mk_block_output(src, "s1", Some("__retry__"), 0))
+            .await
+            .unwrap();
+        storage
+            .save_block_output(&mk_block_output(src, "s1", None, 1))
+            .await
+            .unwrap();
+        storage
+            .save_block_output(&mk_block_output(src, "s2", Some("ext:ref"), 2))
+            .await
+            .unwrap();
+
+        let copied = storage
+            .copy_block_outputs(src, dst, &[BlockId::new("s1"), BlockId::new("s2")])
+            .await
+            .unwrap();
+        assert_eq!(copied, 1, "only s1's inline row is copied");
+
+        let dst_outputs = storage.get_all_outputs(dst).await.unwrap();
+        assert_eq!(dst_outputs.len(), 1);
+        assert_eq!(dst_outputs[0].block_id.as_str(), "s1");
+        assert!(dst_outputs[0].output_ref.is_none());
+    }
+
+    #[tokio::test]
+    async fn copy_block_outputs_empty_block_set_is_noop() {
+        let storage = SqliteStorage::in_memory().await.unwrap();
+        let ids = seed_instances(&storage, 2).await;
+        let (src, dst) = (ids[0], ids[1]);
+
+        storage
+            .save_block_output(&mk_block_output(src, "s1", None, 0))
+            .await
+            .unwrap();
+
+        let copied = storage.copy_block_outputs(src, dst, &[]).await.unwrap();
+        assert_eq!(copied, 0);
+        assert!(storage.get_all_outputs(dst).await.unwrap().is_empty());
+    }
+
+    #[tokio::test]
+    async fn get_outputs_page_orders_and_paginates() {
+        let storage = SqliteStorage::in_memory().await.unwrap();
+        let ids = seed_instances(&storage, 1).await;
+        let src = ids[0];
+
+        for (i, block) in ["s1", "s2", "s3"].iter().enumerate() {
+            storage
+                .save_block_output(&mk_block_output(
+                    src,
+                    block,
+                    None,
+                    i64::try_from(i).unwrap(),
+                ))
+                .await
+                .unwrap();
+        }
+
+        let page1 = storage.get_outputs_page(src, 2, 0).await.unwrap();
+        assert_eq!(page1.len(), 2);
+        assert_eq!(page1[0].block_id.as_str(), "s1");
+        assert_eq!(page1[1].block_id.as_str(), "s2");
+
+        let page2 = storage.get_outputs_page(src, 2, 2).await.unwrap();
+        assert_eq!(page2.len(), 1);
+        assert_eq!(page2[0].block_id.as_str(), "s3");
+
+        let beyond = storage.get_outputs_page(src, 2, 5).await.unwrap();
+        assert!(beyond.is_empty());
+    }
 }
diff --git a/orch8-storage/src/sqlite/outputs.rs b/orch8-storage/src/sqlite/outputs.rs
index db936ca..ce542d1 100644
--- a/orch8-storage/src/sqlite/outputs.rs
+++ b/orch8-storage/src/sqlite/outputs.rs
@@ -115,6 +115,25 @@ pub(super) async fn get_after_created_at(
     rows.iter().map(row_to_output).collect()
 }
 
+/// One page of an instance's outputs in execution order. Backs the timeline
+/// endpoint — see `OutputStore::get_outputs_page`.
+pub(super) async fn get_page(
+    storage: &SqliteStorage,
+    instance_id: InstanceId,
+    limit: u32,
+    offset: u64,
+) -> Result<Vec<orch8_types::output::BlockOutput>, StorageError> {
+    let rows = sqlx::query(
+        "SELECT * FROM block_outputs WHERE instance_id=?1 ORDER BY created_at, id LIMIT ?2 OFFSET ?3",
+    )
+    .bind(instance_id.into_uuid().to_string())
+    .bind(i64::from(limit))
+    .bind(i64::try_from(offset).unwrap_or(i64::MAX))
+    .fetch_all(&storage.pool)
+    .await?;
+    rows.iter().map(row_to_output).collect()
+}
+
 pub(super) async fn get_completed_ids(
     storage: &SqliteStorage,
     instance_id: InstanceId,
@@ -208,6 +227,50 @@ pub(super) async fn delete_for_blocks(
     Ok(result.rows_affected())
 }
 
+/// Copy inline (`output_ref IS NULL`) output rows for `block_ids` from `src`
+/// to `dst` with fresh primary keys. Mirror of the Postgres impl — see
+/// `OutputStore::copy_block_outputs` for why externalized / sentinel rows
+/// are skipped.
+pub(super) async fn copy_for_blocks(
+    storage: &SqliteStorage,
+    src: InstanceId,
+    dst: InstanceId,
+    block_ids: &[BlockId],
+) -> Result<u64, StorageError> {
+    if block_ids.is_empty() {
+        return Ok(0);
+    }
+    let mut qb = sqlx::QueryBuilder::new("SELECT * FROM block_outputs WHERE instance_id=");
+    qb.push_bind(src.into_uuid().to_string());
+    qb.push(" AND output_ref IS NULL AND block_id IN (");
+    let mut sep = qb.separated(", ");
+    for bid in block_ids {
+        sep.push_bind(bid.as_str());
+    }
+    sep.push_unseparated(") ORDER BY created_at, id");
+    let rows = qb.build().fetch_all(&storage.pool).await?;
+    let outputs: Vec<_> = rows.iter().map(row_to_output).collect::<Result<_, _>>()?;
+
+    let mut tx = storage.pool.begin().await?;
+    for out in &outputs {
+        sqlx::query(
+            "INSERT INTO block_outputs (id,instance_id,block_id,output,output_ref,output_size,attempt,created_at) VALUES (?1,?2,?3,?4,?5,?6,?7,?8)",
+        )
+        .bind(uuid::Uuid::now_v7().to_string())
+        .bind(dst.into_uuid().to_string())
+        .bind(out.block_id.as_str())
+        .bind(serde_json::to_string(&out.output)?)
+        .bind(&out.output_ref)
+        .bind(out.output_size as i64)
+        .bind(out.attempt as i64)
+        .bind(ts(out.created_at))
+        .execute(&mut *tx)
+        .await?;
+    }
+    tx.commit().await?;
+    Ok(u64::try_from(outputs.len()).unwrap_or(u64::MAX))
+}
+
 pub(super) async fn delete_all_for_instance(
     storage: &SqliteStorage,
     instance_id: InstanceId,