Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .jules/bolt.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,9 @@
## 2024-05-14 - Remove hot-path BlockId cloning in scheduler tick
**Learning:** In the fast path of the scheduler (`process_instance`), `BlockId` was being cloned and looked up in a HashMap for every incomplete step on every tick, regardless of whether a deadline was configured. The fix avoids this allocation using zero-allocation references and an early return.
**Action:** Always verify if nested loops are lazily evaluating or bypassing unnecessary object clones/lookups on hot execution paths. Utilize Rust's HashMap with custom Borrow implementations or locally constructed reference maps to query without allocating strings.
## 2025-02-28 - Avoid Cloning Identifiers in Batch Iteration
**Learning:** Using on heap-allocated identifier types (like which wraps a String) when constructing parameter arrays for batch fetch operations creates significant overhead in hot loops. The trait implementations in can support borrowed data natively.
**Action:** When defining or calling batch interface methods, utilize Rust's lifetimes to borrow the inner values (e.g., `&[(InstanceId, &BlockId)]`) instead of taking ownership to prevent needless allocations.
## 2025-02-28 - Avoid Cloning Identifiers in Batch Iteration
**Learning:** Using `clone()` on heap-allocated identifier types (like `BlockId` which wraps a String) when constructing parameter arrays for batch fetch operations creates significant overhead in hot loops. The `Storage` trait implementations in `orch8-storage` can support borrowed data natively.
**Action:** When defining or calling batch interface methods, utilize Rust's lifetimes to borrow the inner values (e.g., `&[(InstanceId, &BlockId)]`) instead of taking ownership to prevent needless allocations.
2 changes: 1 addition & 1 deletion orch8-api/src/instances.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ pub(crate) use bulk::{
__path_bulk_reschedule, __path_bulk_update_state, __path_list_dlq, bulk_reschedule,
bulk_update_state, list_dlq,
};
pub use checkpoints::{PruneCheckpointsRequest, SaveCheckpointRequest};
pub(crate) use checkpoints::{
__path_get_latest_checkpoint, __path_list_checkpoints, __path_prune_checkpoints,
__path_save_checkpoint, get_latest_checkpoint, list_checkpoints, prune_checkpoints,
save_checkpoint,
};
pub use checkpoints::{PruneCheckpointsRequest, SaveCheckpointRequest};
pub use inject::InjectBlocksRequest;
pub(crate) use inject::{__path_inject_blocks, inject_blocks};
pub(crate) use lifecycle::{
Expand Down
6 changes: 3 additions & 3 deletions orch8-engine/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -686,7 +686,7 @@ async fn process_waiting_deadlines(
for block in &seq.blocks {
if let orch8_types::sequence::BlockDefinition::Step(step_def) = block {
if step_def.deadline.is_some() {
deadline_keys.push((instance.id, step_def.id.clone()));
deadline_keys.push((instance.id, &step_def.id));
}
}
}
Expand Down Expand Up @@ -1025,11 +1025,11 @@ async fn process_instance(

// Fast path SLA deadline check for all steps BEFORE concurrency checks.
// Batch-fetch any previous block outputs so the loop is N queries -> 1 query.
let deadline_keys: Vec<(InstanceId, BlockId)> = blocks
let deadline_keys: Vec<(InstanceId, &BlockId)> = blocks
.iter()
.filter_map(|b| match b {
orch8_types::sequence::BlockDefinition::Step(s) if s.deadline.is_some() => {
Some((instance.id, s.id.clone()))
Some((instance.id, &s.id))
}
_ => None,
})
Expand Down
7 changes: 6 additions & 1 deletion orch8-engine/tests/feature_gaps.rs
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,12 @@ async fn sqlite_get_batch_chunking_does_not_drop_keys() {
keys.push((instance, block_id));
}

let batch = storage.get_block_outputs_batch(&keys).await.unwrap();
let borrowed_keys: Vec<(InstanceId, &BlockId)> = keys.iter().map(|(i, b)| (*i, b)).collect();

let batch = storage
.get_block_outputs_batch(&borrowed_keys)
.await
.unwrap();
assert_eq!(batch.len(), 450, "batch must return all 450 outputs");

// Spot-check a few keys.
Expand Down
2 changes: 1 addition & 1 deletion orch8-storage/src/encrypting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -673,7 +673,7 @@ impl crate::OutputStore for EncryptingStorage {
}
async fn get_block_outputs_batch(
&self,
keys: &[(InstanceId, orch8_types::ids::BlockId)],
keys: &[(InstanceId, &orch8_types::ids::BlockId)],
) -> Result<
std::collections::HashMap<
(InstanceId, orch8_types::ids::BlockId),
Expand Down
2 changes: 1 addition & 1 deletion orch8-storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -666,7 +666,7 @@ pub trait OutputStore: Send + Sync + 'static {
/// (ordered by `created_at DESC`). Missing pairs are omitted.
async fn get_block_outputs_batch(
&self,
keys: &[(InstanceId, BlockId)],
keys: &[(InstanceId, &BlockId)],
) -> Result<HashMap<(InstanceId, BlockId), BlockOutput>, StorageError>;

async fn get_all_outputs(
Expand Down
2 changes: 1 addition & 1 deletion orch8-storage/src/postgres/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -545,7 +545,7 @@ impl crate::OutputStore for PostgresStorage {

async fn get_block_outputs_batch(
&self,
keys: &[(InstanceId, BlockId)],
keys: &[(InstanceId, &BlockId)],
) -> Result<std::collections::HashMap<(InstanceId, BlockId), BlockOutput>, StorageError> {
outputs::get_batch(self, keys).await
}
Expand Down
2 changes: 1 addition & 1 deletion orch8-storage/src/postgres/outputs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ pub(super) async fn get(

pub(super) async fn get_batch(
store: &PostgresStorage,
keys: &[(InstanceId, BlockId)],
keys: &[(InstanceId, &BlockId)],
) -> Result<std::collections::HashMap<(InstanceId, BlockId), BlockOutput>, StorageError> {
if keys.is_empty() {
return Ok(std::collections::HashMap::new());
Expand Down
2 changes: 1 addition & 1 deletion orch8-storage/src/sqlite/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -641,7 +641,7 @@ impl crate::OutputStore for SqliteStorage {

async fn get_block_outputs_batch(
&self,
keys: &[(InstanceId, BlockId)],
keys: &[(InstanceId, &BlockId)],
) -> Result<std::collections::HashMap<(InstanceId, BlockId), BlockOutput>, StorageError> {
outputs::get_batch(self, keys).await
}
Expand Down
2 changes: 1 addition & 1 deletion orch8-storage/src/sqlite/outputs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ const GET_BATCH_CHUNK_SIZE: usize = 400;

pub(super) async fn get_batch(
storage: &SqliteStorage,
keys: &[(InstanceId, BlockId)],
keys: &[(InstanceId, &BlockId)],
) -> Result<std::collections::HashMap<(InstanceId, BlockId), BlockOutput>, StorageError> {
if keys.is_empty() {
return Ok(std::collections::HashMap::new());
Expand Down
Loading