diff --git a/.jules/bolt.md b/.jules/bolt.md index 32f1ff6..5447981 100644 --- a/.jules/bolt.md +++ b/.jules/bolt.md @@ -36,3 +36,6 @@ ## 2024-05-14 - Remove hot-path BlockId cloning in scheduler tick **Learning:** In the fast path of the scheduler (`process_instance`), `BlockId` was being cloned and looked up in a HashMap for every incomplete step on every tick, regardless of whether a deadline was configured. The fix avoids this allocation using zero-allocation references and an early return. **Action:** Always verify if nested loops are lazily evaluating or bypassing unnecessary object clones/lookups on hot execution paths. Utilize Rust's HashMap with custom Borrow implementations or locally constructed reference maps to query without allocating strings. +## 2026-06-05 - [Zero-Allocation Composite Trait Keys] +**Learning:** When defining trait method signatures for batch database operations that accept composite keys (e.g., `get_block_outputs_batch` which accepted `&[(InstanceId, BlockId)]`), failing to use references for heap-allocated inner types forces the caller to clone strings just to build the lookup vector. In the scheduler's deadline check hot path, this created thousands of unnecessary `BlockId` string clones per minute. +**Action:** Always use references for heap-allocated inner types within tuples for batch lookup signatures (e.g., `&[(InstanceId, &BlockId)]`). This enables the caller to map `(&instance.id, &step_def.id)` directly from existing structs without any allocation penalty. diff --git a/orch8-api/src/instances.rs b/orch8-api/src/instances.rs index 5deb4e0..1b492fd 100644 --- a/orch8-api/src/instances.rs +++ b/orch8-api/src/instances.rs @@ -29,12 +29,12 @@ pub(crate) use bulk::{ __path_bulk_reschedule, __path_bulk_update_state, __path_list_dlq, bulk_reschedule, bulk_update_state, list_dlq, }; +pub use checkpoints::{PruneCheckpointsRequest, SaveCheckpointRequest}; pub(crate) use checkpoints::{ __path_get_latest_checkpoint, __path_list_checkpoints, __path_prune_checkpoints, __path_save_checkpoint, get_latest_checkpoint, list_checkpoints, prune_checkpoints, save_checkpoint, }; -pub use checkpoints::{PruneCheckpointsRequest, SaveCheckpointRequest}; pub use inject::InjectBlocksRequest; pub(crate) use inject::{__path_inject_blocks, inject_blocks}; pub(crate) use lifecycle::{ diff --git a/orch8-engine/src/scheduler.rs b/orch8-engine/src/scheduler.rs index e20976e..47231c8 100644 --- a/orch8-engine/src/scheduler.rs +++ b/orch8-engine/src/scheduler.rs @@ -681,12 +681,13 @@ async fn process_waiting_deadlines( } // Collect all (instance_id, block_id) pairs that have a deadline. + // ⚡ Bolt: Avoid allocating Strings for BlockId by using references inside the deadline_keys Vector. let mut deadline_keys = Vec::new(); for (instance, seq) in &instance_sequences { for block in &seq.blocks { if let orch8_types::sequence::BlockDefinition::Step(step_def) = block { if step_def.deadline.is_some() { - deadline_keys.push((instance.id, step_def.id.clone())); + deadline_keys.push((instance.id, &step_def.id)); } } } @@ -1025,11 +1026,13 @@ async fn process_instance( // Fast path SLA deadline check for all steps BEFORE concurrency checks. // Batch-fetch any previous block outputs so the loop is N queries -> 1 query. - let deadline_keys: Vec<(InstanceId, BlockId)> = blocks + // ⚡ Bolt: Avoid allocating Strings for BlockId by passing references into get_block_outputs_batch. + // In heavily concurrent workloads, skipping BlockId clones here saves thousands of small heap allocations per minute. + let deadline_keys: Vec<(InstanceId, &BlockId)> = blocks .iter() .filter_map(|b| match b { orch8_types::sequence::BlockDefinition::Step(s) if s.deadline.is_some() => { - Some((instance.id, s.id.clone())) + Some((instance.id, &s.id)) } _ => None, }) diff --git a/orch8-engine/tests/feature_gaps.rs b/orch8-engine/tests/feature_gaps.rs index 8316bd5..7c6407d 100644 --- a/orch8-engine/tests/feature_gaps.rs +++ b/orch8-engine/tests/feature_gaps.rs @@ -481,7 +481,7 @@ async fn sqlite_get_batch_chunking_does_not_drop_keys() { seed_instance(&storage, instance).await; // Seed 450 outputs to exercise the 400-key chunk boundary. - let mut keys = Vec::with_capacity(450); + let mut key_data = Vec::with_capacity(450); for i in 0..450 { let block_id = BlockId::new(format!("block_{i:03}")); let out = BlockOutput { @@ -495,16 +495,27 @@ async fn sqlite_get_batch_chunking_does_not_drop_keys() { created_at: Utc::now(), }; storage.save_block_output(&out).await.unwrap(); - keys.push((instance, block_id)); + key_data.push((instance, block_id)); } + let keys: Vec<(InstanceId, &BlockId)> = key_data.iter().map(|(i, b)| (*i, b)).collect(); + let batch = storage.get_block_outputs_batch(&keys).await.unwrap(); assert_eq!(batch.len(), 450, "batch must return all 450 outputs"); // Spot-check a few keys. - assert_eq!(batch[&keys[0]].output, json!({"idx": 0})); - assert_eq!(batch[&keys[399]].output, json!({"idx": 399})); - assert_eq!(batch[&keys[449]].output, json!({"idx": 449})); + assert_eq!( + batch[&(key_data[0].0, key_data[0].1.clone())].output, + json!({"idx": 0}) + ); + assert_eq!( + batch[&(key_data[399].0, key_data[399].1.clone())].output, + json!({"idx": 399}) + ); + assert_eq!( + batch[&(key_data[449].0, key_data[449].1.clone())].output, + json!({"idx": 449}) + ); } // -------------------------------------------------------------------------- diff --git a/orch8-storage/src/encrypting.rs b/orch8-storage/src/encrypting.rs index 5c0ae20..e396fbc 100644 --- a/orch8-storage/src/encrypting.rs +++ b/orch8-storage/src/encrypting.rs @@ -673,7 +673,7 @@ impl crate::OutputStore for EncryptingStorage { } async fn get_block_outputs_batch( &self, - keys: &[(InstanceId, orch8_types::ids::BlockId)], + keys: &[(InstanceId, &orch8_types::ids::BlockId)], ) -> Result< std::collections::HashMap< (InstanceId, orch8_types::ids::BlockId), diff --git a/orch8-storage/src/lib.rs b/orch8-storage/src/lib.rs index 856d87a..8023f42 100644 --- a/orch8-storage/src/lib.rs +++ b/orch8-storage/src/lib.rs @@ -666,7 +666,7 @@ pub trait OutputStore: Send + Sync + 'static { /// (ordered by `created_at DESC`). Missing pairs are omitted. async fn get_block_outputs_batch( &self, - keys: &[(InstanceId, BlockId)], + keys: &[(InstanceId, &BlockId)], ) -> Result, StorageError>; async fn get_all_outputs( diff --git a/orch8-storage/src/postgres/mod.rs b/orch8-storage/src/postgres/mod.rs index c2a0334..941dbf9 100644 --- a/orch8-storage/src/postgres/mod.rs +++ b/orch8-storage/src/postgres/mod.rs @@ -545,7 +545,7 @@ impl crate::OutputStore for PostgresStorage { async fn get_block_outputs_batch( &self, - keys: &[(InstanceId, BlockId)], + keys: &[(InstanceId, &BlockId)], ) -> Result, StorageError> { outputs::get_batch(self, keys).await } diff --git a/orch8-storage/src/postgres/outputs.rs b/orch8-storage/src/postgres/outputs.rs index 115982b..ce0f33b 100644 --- a/orch8-storage/src/postgres/outputs.rs +++ b/orch8-storage/src/postgres/outputs.rs @@ -68,7 +68,7 @@ pub(super) async fn get( pub(super) async fn get_batch( store: &PostgresStorage, - keys: &[(InstanceId, BlockId)], + keys: &[(InstanceId, &BlockId)], ) -> Result, StorageError> { if keys.is_empty() { return Ok(std::collections::HashMap::new()); diff --git a/orch8-storage/src/sqlite/mod.rs b/orch8-storage/src/sqlite/mod.rs index d09fc9c..dbcff4a 100644 --- a/orch8-storage/src/sqlite/mod.rs +++ b/orch8-storage/src/sqlite/mod.rs @@ -641,7 +641,7 @@ impl crate::OutputStore for SqliteStorage { async fn get_block_outputs_batch( &self, - keys: &[(InstanceId, BlockId)], + keys: &[(InstanceId, &BlockId)], ) -> Result, StorageError> { outputs::get_batch(self, keys).await } diff --git a/orch8-storage/src/sqlite/outputs.rs b/orch8-storage/src/sqlite/outputs.rs index db936ca..3f5dd98 100644 --- a/orch8-storage/src/sqlite/outputs.rs +++ b/orch8-storage/src/sqlite/outputs.rs @@ -45,7 +45,7 @@ const GET_BATCH_CHUNK_SIZE: usize = 400; pub(super) async fn get_batch( storage: &SqliteStorage, - keys: &[(InstanceId, BlockId)], + keys: &[(InstanceId, &BlockId)], ) -> Result, StorageError> { if keys.is_empty() { return Ok(std::collections::HashMap::new());