diff --git a/orch8-api/src/instances.rs b/orch8-api/src/instances.rs index 5deb4e0..1b492fd 100644 --- a/orch8-api/src/instances.rs +++ b/orch8-api/src/instances.rs @@ -29,12 +29,12 @@ pub(crate) use bulk::{ __path_bulk_reschedule, __path_bulk_update_state, __path_list_dlq, bulk_reschedule, bulk_update_state, list_dlq, }; +pub use checkpoints::{PruneCheckpointsRequest, SaveCheckpointRequest}; pub(crate) use checkpoints::{ __path_get_latest_checkpoint, __path_list_checkpoints, __path_prune_checkpoints, __path_save_checkpoint, get_latest_checkpoint, list_checkpoints, prune_checkpoints, save_checkpoint, }; -pub use checkpoints::{PruneCheckpointsRequest, SaveCheckpointRequest}; pub use inject::InjectBlocksRequest; pub(crate) use inject::{__path_inject_blocks, inject_blocks}; pub(crate) use lifecycle::{ diff --git a/orch8-api/src/instances.rs.orig b/orch8-api/src/instances.rs.orig new file mode 100644 index 0000000..1b492fd --- /dev/null +++ b/orch8-api/src/instances.rs.orig @@ -0,0 +1,82 @@ +//! Instance HTTP routes: lifecycle, signals, outputs, bulk ops, checkpoints, +//! audit, and dynamic step injection. + +use axum::routing::{get, patch, post}; +use axum::Router; + +use crate::AppState; + +mod artifacts; +mod audit; +mod bulk; +mod checkpoints; +mod inject; +mod lifecycle; +mod outputs; +mod signals; +mod types; + +// Re-exports for `crate::instances::*` compatibility (openapi.rs references +// handlers and public request types by this path). utoipa's derive macro +// also looks for `__path_` structs next to each handler, so those must +// be re-exported alongside the function. +pub(crate) use artifacts::{ + __path_get_artifact_bytes, __path_list_instance_artifacts, get_artifact_bytes, + list_instance_artifacts, +}; +pub(crate) use audit::{__path_list_audit_log, list_audit_log}; +pub(crate) use bulk::{ + __path_bulk_reschedule, __path_bulk_update_state, __path_list_dlq, bulk_reschedule, + bulk_update_state, list_dlq, +}; +pub use checkpoints::{PruneCheckpointsRequest, SaveCheckpointRequest}; +pub(crate) use checkpoints::{ + __path_get_latest_checkpoint, __path_list_checkpoints, __path_prune_checkpoints, + __path_save_checkpoint, get_latest_checkpoint, list_checkpoints, prune_checkpoints, + save_checkpoint, +}; +pub use inject::InjectBlocksRequest; +pub(crate) use inject::{__path_inject_blocks, inject_blocks}; +pub(crate) use lifecycle::{ + __path_create_instance, __path_create_instances_batch, __path_get_instance, + __path_list_instances, __path_retry_instance, __path_update_context, __path_update_state, + create_instance, create_instances_batch, get_instance, list_instances, retry_instance, + update_context, update_state, +}; +pub(crate) use outputs::{ + __path_get_execution_tree, __path_get_outputs, get_execution_tree, get_outputs, +}; +pub(crate) use signals::{__path_send_signal, send_signal}; + +pub fn routes() -> Router { + Router::new() + .route("/instances", post(create_instance).get(list_instances)) + .route("/instances/batch", post(create_instances_batch)) + .route("/instances/{id}", get(get_instance)) + .route("/instances/{id}/state", patch(update_state)) + .route("/instances/{id}/context", patch(update_context)) + .route("/instances/{id}/signals", post(send_signal)) + .route("/instances/{id}/outputs", get(get_outputs)) + .route("/instances/{id}/artifacts", get(list_instance_artifacts)) + .route("/artifacts/{*key}", get(get_artifact_bytes)) + .route("/instances/{id}/tree", get(get_execution_tree)) + .route("/instances/{id}/retry", post(retry_instance)) + .route( + "/instances/{id}/checkpoints", + get(list_checkpoints).post(save_checkpoint), + ) + .route( + "/instances/{id}/checkpoints/latest", + get(get_latest_checkpoint), + ) + .route("/instances/{id}/checkpoints/prune", post(prune_checkpoints)) + .route("/instances/{id}/audit", get(list_audit_log)) + .route("/instances/{id}/inject-blocks", post(inject_blocks)) + .route( + "/instances/{id}/stream", + get(crate::streaming::stream_instance), + ) + .route("/instances/bulk/state", patch(bulk_update_state)) + .route("/instances/bulk/reschedule", patch(bulk_reschedule)) + .route("/instances/dlq", get(list_dlq)) +} diff --git a/orch8-engine/src/scheduler.rs b/orch8-engine/src/scheduler.rs index e20976e..850a1f9 100644 --- a/orch8-engine/src/scheduler.rs +++ b/orch8-engine/src/scheduler.rs @@ -686,7 +686,8 @@ async fn process_waiting_deadlines( for block in &seq.blocks { if let orch8_types::sequence::BlockDefinition::Step(step_def) = block { if step_def.deadline.is_some() { - deadline_keys.push((instance.id, step_def.id.clone())); + // Pushing zero-allocation references avoids string clones in the fast path. + deadline_keys.push((instance.id, &step_def.id)); } } } @@ -1025,11 +1026,11 @@ async fn process_instance( // Fast path SLA deadline check for all steps BEFORE concurrency checks. // Batch-fetch any previous block outputs so the loop is N queries -> 1 query. - let deadline_keys: Vec<(InstanceId, BlockId)> = blocks + let deadline_keys: Vec<(InstanceId, &BlockId)> = blocks .iter() .filter_map(|b| match b { orch8_types::sequence::BlockDefinition::Step(s) if s.deadline.is_some() => { - Some((instance.id, s.id.clone())) + Some((instance.id, &s.id)) } _ => None, }) diff --git a/orch8-engine/tests/feature_gaps.rs b/orch8-engine/tests/feature_gaps.rs index 8316bd5..85f7304 100644 --- a/orch8-engine/tests/feature_gaps.rs +++ b/orch8-engine/tests/feature_gaps.rs @@ -498,7 +498,8 @@ async fn sqlite_get_batch_chunking_does_not_drop_keys() { keys.push((instance, block_id)); } - let batch = storage.get_block_outputs_batch(&keys).await.unwrap(); + let ref_keys: Vec<(InstanceId, &BlockId)> = keys.iter().map(|(i, b)| (*i, b)).collect(); + let batch = storage.get_block_outputs_batch(&ref_keys).await.unwrap(); assert_eq!(batch.len(), 450, "batch must return all 450 outputs"); // Spot-check a few keys. diff --git a/orch8-storage/src/encrypting.rs b/orch8-storage/src/encrypting.rs index 5c0ae20..e396fbc 100644 --- a/orch8-storage/src/encrypting.rs +++ b/orch8-storage/src/encrypting.rs @@ -673,7 +673,7 @@ impl crate::OutputStore for EncryptingStorage { } async fn get_block_outputs_batch( &self, - keys: &[(InstanceId, orch8_types::ids::BlockId)], + keys: &[(InstanceId, &orch8_types::ids::BlockId)], ) -> Result< std::collections::HashMap< (InstanceId, orch8_types::ids::BlockId), diff --git a/orch8-storage/src/lib.rs b/orch8-storage/src/lib.rs index 856d87a..2a52b56 100644 --- a/orch8-storage/src/lib.rs +++ b/orch8-storage/src/lib.rs @@ -664,9 +664,10 @@ pub trait OutputStore: Send + Sync + 'static { /// /// Returns a map from `(InstanceId, BlockId)` to the latest output /// (ordered by `created_at DESC`). Missing pairs are omitted. + /// Note: `&BlockId` is used over `BlockId` to prevent string allocations in hot paths like the scheduler. async fn get_block_outputs_batch( &self, - keys: &[(InstanceId, BlockId)], + keys: &[(InstanceId, &BlockId)], ) -> Result, StorageError>; async fn get_all_outputs( diff --git a/orch8-storage/src/postgres/mod.rs b/orch8-storage/src/postgres/mod.rs index c2a0334..941dbf9 100644 --- a/orch8-storage/src/postgres/mod.rs +++ b/orch8-storage/src/postgres/mod.rs @@ -545,7 +545,7 @@ impl crate::OutputStore for PostgresStorage { async fn get_block_outputs_batch( &self, - keys: &[(InstanceId, BlockId)], + keys: &[(InstanceId, &BlockId)], ) -> Result, StorageError> { outputs::get_batch(self, keys).await } diff --git a/orch8-storage/src/postgres/outputs.rs b/orch8-storage/src/postgres/outputs.rs index 115982b..ce0f33b 100644 --- a/orch8-storage/src/postgres/outputs.rs +++ b/orch8-storage/src/postgres/outputs.rs @@ -68,7 +68,7 @@ pub(super) async fn get( pub(super) async fn get_batch( store: &PostgresStorage, - keys: &[(InstanceId, BlockId)], + keys: &[(InstanceId, &BlockId)], ) -> Result, StorageError> { if keys.is_empty() { return Ok(std::collections::HashMap::new()); diff --git a/orch8-storage/src/sqlite/mod.rs b/orch8-storage/src/sqlite/mod.rs index d09fc9c..dbcff4a 100644 --- a/orch8-storage/src/sqlite/mod.rs +++ b/orch8-storage/src/sqlite/mod.rs @@ -641,7 +641,7 @@ impl crate::OutputStore for SqliteStorage { async fn get_block_outputs_batch( &self, - keys: &[(InstanceId, BlockId)], + keys: &[(InstanceId, &BlockId)], ) -> Result, StorageError> { outputs::get_batch(self, keys).await } diff --git a/orch8-storage/src/sqlite/outputs.rs b/orch8-storage/src/sqlite/outputs.rs index db936ca..3f5dd98 100644 --- a/orch8-storage/src/sqlite/outputs.rs +++ b/orch8-storage/src/sqlite/outputs.rs @@ -45,7 +45,7 @@ const GET_BATCH_CHUNK_SIZE: usize = 400; pub(super) async fn get_batch( storage: &SqliteStorage, - keys: &[(InstanceId, BlockId)], + keys: &[(InstanceId, &BlockId)], ) -> Result, StorageError> { if keys.is_empty() { return Ok(std::collections::HashMap::new());