Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .jules/bolt.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,6 @@
## 2024-05-14 - Remove hot-path BlockId cloning in scheduler tick
**Learning:** In the fast path of the scheduler (`process_instance`), `BlockId` was being cloned and looked up in a HashMap for every incomplete step on every tick, regardless of whether a deadline was configured. The fix avoids this allocation using zero-allocation references and an early return.
**Action:** Always verify if nested loops are lazily evaluating or bypassing unnecessary object clones/lookups on hot execution paths. Utilize Rust's HashMap with custom Borrow implementations or locally constructed reference maps to query without allocating strings.
## 2026-06-05 - [Zero-Allocation Composite Trait Keys]
**Learning:** When defining trait method signatures for batch database operations that accept composite keys (e.g., `get_block_outputs_batch` which accepted `&[(InstanceId, BlockId)]`), failing to use references for heap-allocated inner types forces the caller to clone strings just to build the lookup vector. In the scheduler's deadline check hot path, this created thousands of unnecessary `BlockId` string clones per minute.
**Action:** Always use references for heap-allocated inner types within tuples for batch lookup signatures (e.g., `&[(InstanceId, &BlockId)]`). This enables the caller to map `(&instance.id, &step_def.id)` directly from existing structs without any allocation penalty.
2 changes: 1 addition & 1 deletion orch8-api/src/instances.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ pub(crate) use bulk::{
__path_bulk_reschedule, __path_bulk_update_state, __path_list_dlq, bulk_reschedule,
bulk_update_state, list_dlq,
};
pub use checkpoints::{PruneCheckpointsRequest, SaveCheckpointRequest};
pub(crate) use checkpoints::{
__path_get_latest_checkpoint, __path_list_checkpoints, __path_prune_checkpoints,
__path_save_checkpoint, get_latest_checkpoint, list_checkpoints, prune_checkpoints,
save_checkpoint,
};
pub use checkpoints::{PruneCheckpointsRequest, SaveCheckpointRequest};
pub use inject::InjectBlocksRequest;
pub(crate) use inject::{__path_inject_blocks, inject_blocks};
pub(crate) use lifecycle::{
Expand Down
9 changes: 6 additions & 3 deletions orch8-engine/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -681,12 +681,13 @@ async fn process_waiting_deadlines(
}

// Collect all (instance_id, block_id) pairs that have a deadline.
// ⚡ Bolt: Avoid allocating Strings for BlockId by using references inside the deadline_keys Vector.
let mut deadline_keys = Vec::new();
for (instance, seq) in &instance_sequences {
for block in &seq.blocks {
if let orch8_types::sequence::BlockDefinition::Step(step_def) = block {
if step_def.deadline.is_some() {
deadline_keys.push((instance.id, step_def.id.clone()));
deadline_keys.push((instance.id, &step_def.id));
}
}
}
Expand Down Expand Up @@ -1025,11 +1026,13 @@ async fn process_instance(

// Fast path SLA deadline check for all steps BEFORE concurrency checks.
// Batch-fetch any previous block outputs so the loop is N queries -> 1 query.
let deadline_keys: Vec<(InstanceId, BlockId)> = blocks
// ⚡ Bolt: Avoid allocating Strings for BlockId by passing references into get_block_outputs_batch.
// In heavily concurrent workloads, skipping BlockId clones here saves thousands of small heap allocations per minute.
let deadline_keys: Vec<(InstanceId, &BlockId)> = blocks
.iter()
.filter_map(|b| match b {
orch8_types::sequence::BlockDefinition::Step(s) if s.deadline.is_some() => {
Some((instance.id, s.id.clone()))
Some((instance.id, &s.id))
}
_ => None,
})
Expand Down
21 changes: 16 additions & 5 deletions orch8-engine/tests/feature_gaps.rs
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,7 @@ async fn sqlite_get_batch_chunking_does_not_drop_keys() {
seed_instance(&storage, instance).await;

// Seed 450 outputs to exercise the 400-key chunk boundary.
let mut keys = Vec::with_capacity(450);
let mut key_data = Vec::with_capacity(450);
for i in 0..450 {
let block_id = BlockId::new(format!("block_{i:03}"));
let out = BlockOutput {
Expand All @@ -495,16 +495,27 @@ async fn sqlite_get_batch_chunking_does_not_drop_keys() {
created_at: Utc::now(),
};
storage.save_block_output(&out).await.unwrap();
keys.push((instance, block_id));
key_data.push((instance, block_id));
}

let keys: Vec<(InstanceId, &BlockId)> = key_data.iter().map(|(i, b)| (*i, b)).collect();

let batch = storage.get_block_outputs_batch(&keys).await.unwrap();
assert_eq!(batch.len(), 450, "batch must return all 450 outputs");

// Spot-check a few keys.
assert_eq!(batch[&keys[0]].output, json!({"idx": 0}));
assert_eq!(batch[&keys[399]].output, json!({"idx": 399}));
assert_eq!(batch[&keys[449]].output, json!({"idx": 449}));
assert_eq!(
batch[&(key_data[0].0, key_data[0].1.clone())].output,
json!({"idx": 0})
);
assert_eq!(
batch[&(key_data[399].0, key_data[399].1.clone())].output,
json!({"idx": 399})
);
assert_eq!(
batch[&(key_data[449].0, key_data[449].1.clone())].output,
json!({"idx": 449})
);
}

// --------------------------------------------------------------------------
Expand Down
2 changes: 1 addition & 1 deletion orch8-storage/src/encrypting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -673,7 +673,7 @@ impl crate::OutputStore for EncryptingStorage {
}
async fn get_block_outputs_batch(
&self,
keys: &[(InstanceId, orch8_types::ids::BlockId)],
keys: &[(InstanceId, &orch8_types::ids::BlockId)],
) -> Result<
std::collections::HashMap<
(InstanceId, orch8_types::ids::BlockId),
Expand Down
2 changes: 1 addition & 1 deletion orch8-storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -666,7 +666,7 @@ pub trait OutputStore: Send + Sync + 'static {
/// (ordered by `created_at DESC`). Missing pairs are omitted.
async fn get_block_outputs_batch(
&self,
keys: &[(InstanceId, BlockId)],
keys: &[(InstanceId, &BlockId)],
) -> Result<HashMap<(InstanceId, BlockId), BlockOutput>, StorageError>;

async fn get_all_outputs(
Expand Down
2 changes: 1 addition & 1 deletion orch8-storage/src/postgres/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -545,7 +545,7 @@ impl crate::OutputStore for PostgresStorage {

async fn get_block_outputs_batch(
&self,
keys: &[(InstanceId, BlockId)],
keys: &[(InstanceId, &BlockId)],
) -> Result<std::collections::HashMap<(InstanceId, BlockId), BlockOutput>, StorageError> {
outputs::get_batch(self, keys).await
}
Expand Down
2 changes: 1 addition & 1 deletion orch8-storage/src/postgres/outputs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ pub(super) async fn get(

pub(super) async fn get_batch(
store: &PostgresStorage,
keys: &[(InstanceId, BlockId)],
keys: &[(InstanceId, &BlockId)],
) -> Result<std::collections::HashMap<(InstanceId, BlockId), BlockOutput>, StorageError> {
if keys.is_empty() {
return Ok(std::collections::HashMap::new());
Expand Down
2 changes: 1 addition & 1 deletion orch8-storage/src/sqlite/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -641,7 +641,7 @@ impl crate::OutputStore for SqliteStorage {

async fn get_block_outputs_batch(
&self,
keys: &[(InstanceId, BlockId)],
keys: &[(InstanceId, &BlockId)],
) -> Result<std::collections::HashMap<(InstanceId, BlockId), BlockOutput>, StorageError> {
outputs::get_batch(self, keys).await
}
Expand Down
2 changes: 1 addition & 1 deletion orch8-storage/src/sqlite/outputs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ const GET_BATCH_CHUNK_SIZE: usize = 400;

pub(super) async fn get_batch(
storage: &SqliteStorage,
keys: &[(InstanceId, BlockId)],
keys: &[(InstanceId, &BlockId)],
) -> Result<std::collections::HashMap<(InstanceId, BlockId), BlockOutput>, StorageError> {
if keys.is_empty() {
return Ok(std::collections::HashMap::new());
Expand Down
Loading