Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion orch8-api/src/instances.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ pub(crate) use bulk::{
__path_bulk_reschedule, __path_bulk_update_state, __path_list_dlq, bulk_reschedule,
bulk_update_state, list_dlq,
};
pub use checkpoints::{PruneCheckpointsRequest, SaveCheckpointRequest};
pub(crate) use checkpoints::{
__path_get_latest_checkpoint, __path_list_checkpoints, __path_prune_checkpoints,
__path_save_checkpoint, get_latest_checkpoint, list_checkpoints, prune_checkpoints,
save_checkpoint,
};
pub use checkpoints::{PruneCheckpointsRequest, SaveCheckpointRequest};
pub use inject::InjectBlocksRequest;
pub(crate) use inject::{__path_inject_blocks, inject_blocks};
pub(crate) use lifecycle::{
Expand Down
82 changes: 82 additions & 0 deletions orch8-api/src/instances.rs.orig
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
//! Instance HTTP routes: lifecycle, signals, outputs, bulk ops, checkpoints,
//! audit, and dynamic step injection.

use axum::routing::{get, patch, post};
use axum::Router;

use crate::AppState;

mod artifacts;
mod audit;
mod bulk;
mod checkpoints;
mod inject;
mod lifecycle;
mod outputs;
mod signals;
mod types;

// Re-exports for `crate::instances::*` compatibility (openapi.rs references
// handlers and public request types by this path). utoipa's derive macro
// also looks for `__path_<fn>` structs next to each handler, so those must
// be re-exported alongside the function.
pub(crate) use artifacts::{
__path_get_artifact_bytes, __path_list_instance_artifacts, get_artifact_bytes,
list_instance_artifacts,
};
pub(crate) use audit::{__path_list_audit_log, list_audit_log};
pub(crate) use bulk::{
__path_bulk_reschedule, __path_bulk_update_state, __path_list_dlq, bulk_reschedule,
bulk_update_state, list_dlq,
};
pub use checkpoints::{PruneCheckpointsRequest, SaveCheckpointRequest};
pub(crate) use checkpoints::{
__path_get_latest_checkpoint, __path_list_checkpoints, __path_prune_checkpoints,
__path_save_checkpoint, get_latest_checkpoint, list_checkpoints, prune_checkpoints,
save_checkpoint,
};
pub use inject::InjectBlocksRequest;
pub(crate) use inject::{__path_inject_blocks, inject_blocks};
pub(crate) use lifecycle::{
__path_create_instance, __path_create_instances_batch, __path_get_instance,
__path_list_instances, __path_retry_instance, __path_update_context, __path_update_state,
create_instance, create_instances_batch, get_instance, list_instances, retry_instance,
update_context, update_state,
};
pub(crate) use outputs::{
__path_get_execution_tree, __path_get_outputs, get_execution_tree, get_outputs,
};
pub(crate) use signals::{__path_send_signal, send_signal};

pub fn routes() -> Router<AppState> {
Router::new()
.route("/instances", post(create_instance).get(list_instances))
.route("/instances/batch", post(create_instances_batch))
.route("/instances/{id}", get(get_instance))
.route("/instances/{id}/state", patch(update_state))
.route("/instances/{id}/context", patch(update_context))
.route("/instances/{id}/signals", post(send_signal))
.route("/instances/{id}/outputs", get(get_outputs))
.route("/instances/{id}/artifacts", get(list_instance_artifacts))
.route("/artifacts/{*key}", get(get_artifact_bytes))
.route("/instances/{id}/tree", get(get_execution_tree))
.route("/instances/{id}/retry", post(retry_instance))
.route(
"/instances/{id}/checkpoints",
get(list_checkpoints).post(save_checkpoint),
)
.route(
"/instances/{id}/checkpoints/latest",
get(get_latest_checkpoint),
)
.route("/instances/{id}/checkpoints/prune", post(prune_checkpoints))
.route("/instances/{id}/audit", get(list_audit_log))
.route("/instances/{id}/inject-blocks", post(inject_blocks))
.route(
"/instances/{id}/stream",
get(crate::streaming::stream_instance),
)
.route("/instances/bulk/state", patch(bulk_update_state))
.route("/instances/bulk/reschedule", patch(bulk_reschedule))
.route("/instances/dlq", get(list_dlq))
}
7 changes: 4 additions & 3 deletions orch8-engine/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -686,7 +686,8 @@ async fn process_waiting_deadlines(
for block in &seq.blocks {
if let orch8_types::sequence::BlockDefinition::Step(step_def) = block {
if step_def.deadline.is_some() {
deadline_keys.push((instance.id, step_def.id.clone()));
// Pushing zero-allocation references avoids string clones in the fast path.
deadline_keys.push((instance.id, &step_def.id));
}
}
}
Expand Down Expand Up @@ -1025,11 +1026,11 @@ async fn process_instance(

// Fast path SLA deadline check for all steps BEFORE concurrency checks.
// Batch-fetch any previous block outputs so the loop is N queries -> 1 query.
let deadline_keys: Vec<(InstanceId, BlockId)> = blocks
let deadline_keys: Vec<(InstanceId, &BlockId)> = blocks
.iter()
.filter_map(|b| match b {
orch8_types::sequence::BlockDefinition::Step(s) if s.deadline.is_some() => {
Some((instance.id, s.id.clone()))
Some((instance.id, &s.id))
}
_ => None,
})
Expand Down
3 changes: 2 additions & 1 deletion orch8-engine/tests/feature_gaps.rs
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,8 @@ async fn sqlite_get_batch_chunking_does_not_drop_keys() {
keys.push((instance, block_id));
}

let batch = storage.get_block_outputs_batch(&keys).await.unwrap();
let ref_keys: Vec<(InstanceId, &BlockId)> = keys.iter().map(|(i, b)| (*i, b)).collect();
let batch = storage.get_block_outputs_batch(&ref_keys).await.unwrap();
assert_eq!(batch.len(), 450, "batch must return all 450 outputs");

// Spot-check a few keys.
Expand Down
2 changes: 1 addition & 1 deletion orch8-storage/src/encrypting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -673,7 +673,7 @@ impl crate::OutputStore for EncryptingStorage {
}
async fn get_block_outputs_batch(
&self,
keys: &[(InstanceId, orch8_types::ids::BlockId)],
keys: &[(InstanceId, &orch8_types::ids::BlockId)],
) -> Result<
std::collections::HashMap<
(InstanceId, orch8_types::ids::BlockId),
Expand Down
3 changes: 2 additions & 1 deletion orch8-storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -664,9 +664,10 @@ pub trait OutputStore: Send + Sync + 'static {
///
/// Returns a map from `(InstanceId, BlockId)` to the latest output
/// (ordered by `created_at DESC`). Missing pairs are omitted.
/// Note: `&BlockId` is used over `BlockId` to prevent string allocations in hot paths like the scheduler.
async fn get_block_outputs_batch(
&self,
keys: &[(InstanceId, BlockId)],
keys: &[(InstanceId, &BlockId)],
) -> Result<HashMap<(InstanceId, BlockId), BlockOutput>, StorageError>;

async fn get_all_outputs(
Expand Down
2 changes: 1 addition & 1 deletion orch8-storage/src/postgres/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -545,7 +545,7 @@ impl crate::OutputStore for PostgresStorage {

async fn get_block_outputs_batch(
&self,
keys: &[(InstanceId, BlockId)],
keys: &[(InstanceId, &BlockId)],
) -> Result<std::collections::HashMap<(InstanceId, BlockId), BlockOutput>, StorageError> {
outputs::get_batch(self, keys).await
}
Expand Down
2 changes: 1 addition & 1 deletion orch8-storage/src/postgres/outputs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ pub(super) async fn get(

pub(super) async fn get_batch(
store: &PostgresStorage,
keys: &[(InstanceId, BlockId)],
keys: &[(InstanceId, &BlockId)],
) -> Result<std::collections::HashMap<(InstanceId, BlockId), BlockOutput>, StorageError> {
if keys.is_empty() {
return Ok(std::collections::HashMap::new());
Expand Down
2 changes: 1 addition & 1 deletion orch8-storage/src/sqlite/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -641,7 +641,7 @@ impl crate::OutputStore for SqliteStorage {

async fn get_block_outputs_batch(
&self,
keys: &[(InstanceId, BlockId)],
keys: &[(InstanceId, &BlockId)],
) -> Result<std::collections::HashMap<(InstanceId, BlockId), BlockOutput>, StorageError> {
outputs::get_batch(self, keys).await
}
Expand Down
2 changes: 1 addition & 1 deletion orch8-storage/src/sqlite/outputs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ const GET_BATCH_CHUNK_SIZE: usize = 400;

pub(super) async fn get_batch(
storage: &SqliteStorage,
keys: &[(InstanceId, BlockId)],
keys: &[(InstanceId, &BlockId)],
) -> Result<std::collections::HashMap<(InstanceId, BlockId), BlockOutput>, StorageError> {
if keys.is_empty() {
return Ok(std::collections::HashMap::new());
Expand Down
Loading