From 68c412ec82ca99353bd52a36bdb30fef86e704f0 Mon Sep 17 00:00:00 2001 From: Harry Brundage Date: Mon, 23 Mar 2026 15:27:41 -0400 Subject: [PATCH 1/6] Add task scanner ordering tests for behind-cursor task creation Tests that all code paths creating task keys (enqueue, expedite, restart, concurrency grants, retries, priority ordering) result in tasks being picked up by the TaskBroker scanner even when those tasks sort before entries already in the buffer. This coverage is important before optimizing the scanner to use cursor-based scanning. Co-Authored-By: Claude Opus 4.6 (1M context) --- tests/task_scanner_ordering_tests.rs | 797 +++++++++++++++++++++++++++ 1 file changed, 797 insertions(+) create mode 100644 tests/task_scanner_ordering_tests.rs diff --git a/tests/task_scanner_ordering_tests.rs b/tests/task_scanner_ordering_tests.rs new file mode 100644 index 0000000..66352f2 --- /dev/null +++ b/tests/task_scanner_ordering_tests.rs @@ -0,0 +1,797 @@ +//! Tests for task scanner pickup of tasks created "behind the cursor". +//! +//! These tests verify that all code paths that create task keys result in +//! those tasks eventually being picked up by the TaskBroker scanner and +//! delivered via dequeue. This is important because: +//! +//! - The scanner always starts from the beginning of the task group key range. +//! - If we ever optimise the scanner to use a cursor (resume scanning from +//! where it left off), we must ensure tasks inserted behind the cursor are +//! still picked up. +//! +//! Each test exercises a different code path that can create a task key that +//! sorts BEFORE tasks already in the broker buffer. + +mod test_helpers; + +use silo::job::ConcurrencyLimit; +use silo::job::Limit; +use silo::job_attempt::AttemptOutcome; + +use test_helpers::*; + +const TIMEOUT_MS: u64 = 30000; + +// --------------------------------------------------------------------------- +// 1. Enqueue with start_at_ms=0 sorts before all other tasks +// --------------------------------------------------------------------------- + +/// Tasks enqueued with start_at_ms=0 use time=0 in their key, placing them +/// before any task with a real timestamp. After the buffer is already populated +/// with tasks at now_ms, a new start_at_ms=0 task must still be picked up +/// eventually (the scanner rescans from the beginning of the key range). +#[silo::test] +async fn enqueue_start_at_zero_picked_up_after_buffer_populated() { + with_timeout!(TIMEOUT_MS, { + let (_tmp, shard) = open_temp_shard().await; + let payload = msgpack_payload(&serde_json::json!({"test": true})); + let now = now_ms(); + + // Enqueue several tasks with start_at_ms=now to populate the buffer + for i in 0..5 { + shard + .enqueue( + "t1", + Some(format!("job-now-{}", i)), + 50, + now, + None, + payload.clone(), + vec![], + None, + "default", + ) + .await + .expect("enqueue now"); + } + + // Dequeue one to prove the buffer is working + let first = shard + .dequeue("w", "default", 1) + .await + .expect("first dequeue"); + assert_eq!(first.tasks.len(), 1, "should dequeue from initial batch"); + + // Now enqueue a task with start_at_ms=0 — this sorts BEFORE the buffered tasks + shard + .enqueue( + "t1", + Some("job-zero".to_string()), + 50, + 0, + None, + payload.clone(), + vec![], + None, + "default", + ) + .await + .expect("enqueue zero"); + + // Drain all remaining tasks — the zero-time task must appear eventually + let mut all_ids: Vec = Vec::new(); + loop { + let batch = shard + .dequeue("w", "default", 10) + .await + .expect("drain dequeue"); + if batch.tasks.is_empty() { + break; + } + for task in &batch.tasks { + all_ids.push(task.job().id().to_string()); + } + } + + assert!( + all_ids.contains(&"job-zero".to_string()), + "start_at_ms=0 task must be picked up; got jobs: {:?}", + all_ids + ); + // Should have gotten all 5 remaining tasks (5 enqueued - 1 dequeued + 1 zero = 5) + assert_eq!(all_ids.len(), 5, "all tasks must be drained; got: {:?}", all_ids); + }); +} + +// --------------------------------------------------------------------------- +// 2. Expedite moves a future task to now +// --------------------------------------------------------------------------- + +/// Expediting a far-future scheduled job creates a new task key at now_ms, +/// which sorts before the old future-scheduled key. The scanner must pick up +/// the new key even if it already has buffered entries with later timestamps. +#[silo::test] +async fn expedited_task_picked_up_by_scanner() { + with_timeout!(TIMEOUT_MS, { + let (_tmp, shard) = open_temp_shard().await; + let payload = msgpack_payload(&serde_json::json!({"test": true})); + let now = now_ms(); + let far_future = now + 86_400_000; // 1 day ahead + + // Enqueue a job scheduled for far future + shard + .enqueue( + "t1", + Some("future-job".to_string()), + 50, + far_future, + None, + payload.clone(), + vec![], + None, + "default", + ) + .await + .expect("enqueue future"); + + // Enqueue some immediate tasks to populate the buffer + for i in 0..3 { + shard + .enqueue( + "t1", + Some(format!("immediate-{}", i)), + 50, + now, + None, + payload.clone(), + vec![], + None, + "default", + ) + .await + .expect("enqueue immediate"); + } + + // Dequeue immediate tasks — future job should NOT appear (it's in the future) + let batch1 = shard + .dequeue("w", "default", 10) + .await + .expect("dequeue batch 1"); + let batch1_ids: Vec<&str> = batch1.tasks.iter().map(|t| t.job().id()).collect(); + assert!( + !batch1_ids.contains(&"future-job"), + "future job should not be dequeued before expedite" + ); + + // Complete those tasks + for task in &batch1.tasks { + shard + .report_attempt_outcome( + task.attempt().task_id(), + AttemptOutcome::Success { result: vec![] }, + ) + .await + .expect("report outcome"); + } + + // Expedite the future job — moves it to now + shard + .expedite_job("t1", "future-job") + .await + .expect("expedite"); + + // The expedited task should now be dequeueable + let result = shard + .dequeue("w", "default", 1) + .await + .expect("dequeue after expedite"); + assert_eq!(result.tasks.len(), 1, "expedited task should be dequeued"); + assert_eq!(result.tasks[0].job().id(), "future-job"); + }); +} + +// --------------------------------------------------------------------------- +// 3. Restart creates a task at now_ms +// --------------------------------------------------------------------------- + +/// Restarting a failed job creates a new task. The scanner must pick it up +/// even if the buffer already contains tasks from an earlier scan. +#[silo::test] +async fn restarted_job_task_picked_up_by_scanner() { + with_timeout!(TIMEOUT_MS, { + let (_tmp, shard) = open_temp_shard().await; + let payload = msgpack_payload(&serde_json::json!({"test": true})); + let now = now_ms(); + + // Enqueue and process a job to Failed state + shard + .enqueue( + "t1", + Some("restart-me".to_string()), + 50, + now, + None, + payload.clone(), + vec![], + None, + "default", + ) + .await + .expect("enqueue"); + + let tasks = shard + .dequeue("w", "default", 1) + .await + .expect("dequeue") + .tasks; + assert_eq!(tasks.len(), 1); + + shard + .report_attempt_outcome( + tasks[0].attempt().task_id(), + AttemptOutcome::Error { + error_code: "ERR".to_string(), + error: vec![], + }, + ) + .await + .expect("report error"); + + // Enqueue more tasks so the buffer has entries + for i in 0..3 { + shard + .enqueue( + "t1", + Some(format!("filler-{}", i)), + 50, + now, + None, + payload.clone(), + vec![], + None, + "default", + ) + .await + .expect("enqueue filler"); + } + + // Restart the failed job — creates a new task + shard + .restart_job("t1", "restart-me") + .await + .expect("restart"); + + // Dequeue all — the restarted job should appear + let result = shard + .dequeue("w", "default", 10) + .await + .expect("dequeue after restart"); + + let job_ids: Vec<&str> = result.tasks.iter().map(|t| t.job().id()).collect(); + assert!( + job_ids.contains(&"restart-me"), + "restarted job must be picked up; got jobs: {:?}", + job_ids + ); + }); +} + +// --------------------------------------------------------------------------- +// 4. Concurrency grant creates a task with the original start_time_ms +// --------------------------------------------------------------------------- + +/// When a concurrency slot opens and a queued request is granted, a RunAttempt +/// task is created with the original start_time_ms from when the job was first +/// enqueued. This time could be well in the past. The scanner must pick it up. +#[silo::test] +async fn concurrency_grant_task_picked_up_by_scanner() { + with_timeout!(TIMEOUT_MS, { + let (_tmp, shard) = open_temp_shard_with_reconcile_interval_ms(50).await; + let payload = msgpack_payload(&serde_json::json!({"test": true})); + let now = now_ms(); + + let limit = Limit::Concurrency(ConcurrencyLimit { + key: "test-queue".to_string(), + max_concurrency: 1, + }); + + // Enqueue job A with concurrency limit — it gets the slot + shard + .enqueue( + "t1", + Some("holder".to_string()), + 50, + now, + None, + payload.clone(), + vec![limit.clone()], + None, + "default", + ) + .await + .expect("enqueue holder"); + + // Dequeue job A — it becomes the holder + let holder_tasks = shard + .dequeue("w", "default", 1) + .await + .expect("dequeue holder"); + assert_eq!(holder_tasks.tasks.len(), 1); + let holder_task_id = holder_tasks.tasks[0].attempt().task_id().to_string(); + + // Enqueue job B with same concurrency limit — it gets queued (RequestTicket) + shard + .enqueue( + "t1", + Some("waiter".to_string()), + 50, + now, + None, + payload.clone(), + vec![limit.clone()], + None, + "default", + ) + .await + .expect("enqueue waiter"); + + // Enqueue some tasks in a different group to populate other buffers + for i in 0..3 { + shard + .enqueue( + "t1", + Some(format!("other-{}", i)), + 50, + now, + None, + payload.clone(), + vec![], + None, + "other-group", + ) + .await + .expect("enqueue other"); + } + + // Complete holder — this frees the concurrency slot + shard + .report_attempt_outcome( + &holder_task_id, + AttemptOutcome::Success { result: vec![] }, + ) + .await + .expect("complete holder"); + + // Wait for the concurrency grant scanner to process the pending request + // and create a RunAttempt task for the waiter + let result = poll_until( + || async { + shard + .dequeue("w", "default", 1) + .await + .expect("dequeue waiter") + }, + |r| !r.tasks.is_empty(), + 5000, + ) + .await; + + assert_eq!( + result.tasks.len(), + 1, + "waiter should be dequeued after concurrency grant" + ); + assert_eq!(result.tasks[0].job().id(), "waiter"); + }); +} + +// --------------------------------------------------------------------------- +// 5. Interleaved enqueue and dequeue across task groups +// --------------------------------------------------------------------------- + +/// Tasks enqueued into a task group that already has buffered entries should be +/// picked up even when they sort before existing buffer entries. +#[silo::test] +async fn interleaved_enqueue_dequeue_picks_up_all_tasks() { + with_timeout!(TIMEOUT_MS, { + let (_tmp, shard) = open_temp_shard().await; + let payload = msgpack_payload(&serde_json::json!({"test": true})); + let now = now_ms(); + + // Phase 1: enqueue batch at now + for i in 0..5 { + shard + .enqueue( + "t1", + Some(format!("batch1-{}", i)), + 50, + now, + None, + payload.clone(), + vec![], + None, + "default", + ) + .await + .expect("enqueue batch 1"); + } + + // Dequeue 2 to partially drain the buffer + let first = shard + .dequeue("w", "default", 2) + .await + .expect("dequeue first"); + assert_eq!(first.tasks.len(), 2); + + // Phase 2: enqueue more at now+1 (sorts after remaining batch1 entries) + for i in 0..3 { + shard + .enqueue( + "t1", + Some(format!("batch2-{}", i)), + 50, + now + 1, + None, + payload.clone(), + vec![], + None, + "default", + ) + .await + .expect("enqueue batch 2"); + } + + // Phase 3: enqueue at start_at_ms=0 — sorts BEFORE everything + shard + .enqueue( + "t1", + Some("zero-time".to_string()), + 50, + 0, + None, + payload.clone(), + vec![], + None, + "default", + ) + .await + .expect("enqueue zero"); + + // Dequeue all remaining — should get all 7 remaining tasks + // (5 original - 2 dequeued + 3 batch2 + 1 zero = 7) + let mut all_ids: Vec = Vec::new(); + for task in &first.tasks { + all_ids.push(task.job().id().to_string()); + } + + // Drain everything + loop { + let batch = shard + .dequeue("w", "default", 10) + .await + .expect("drain dequeue"); + if batch.tasks.is_empty() { + break; + } + for task in &batch.tasks { + all_ids.push(task.job().id().to_string()); + } + } + + // We should have all 9 jobs + assert_eq!( + all_ids.len(), + 9, + "expected 9 total tasks, got {}: {:?}", + all_ids.len(), + all_ids + ); + assert!( + all_ids.contains(&"zero-time".to_string()), + "zero-time task must be dequeued; got: {:?}", + all_ids + ); + }); +} + +// --------------------------------------------------------------------------- +// 6. Higher priority task enqueued after lower priority tasks +// --------------------------------------------------------------------------- + +/// A task with higher priority (lower number) enqueued after the buffer already +/// contains lower-priority tasks. Within the same start_time_ms, the +/// higher-priority task key sorts first and must be picked up eventually. +#[silo::test] +async fn higher_priority_enqueued_later_picked_up() { + with_timeout!(TIMEOUT_MS, { + let (_tmp, shard) = open_temp_shard().await; + let payload = msgpack_payload(&serde_json::json!({"test": true})); + let now = now_ms(); + + // Enqueue low-priority tasks (priority=100) first + for i in 0..5 { + shard + .enqueue( + "t1", + Some(format!("low-pri-{}", i)), + 100, + now, + None, + payload.clone(), + vec![], + None, + "default", + ) + .await + .expect("enqueue low priority"); + } + + // Dequeue one to prove scanner is active + let first = shard + .dequeue("w", "default", 1) + .await + .expect("first dequeue"); + assert_eq!(first.tasks.len(), 1); + + // Now enqueue a HIGH priority task (priority=1) — sorts before the low-pri tasks + shard + .enqueue( + "t1", + Some("high-pri".to_string()), + 1, + now, + None, + payload.clone(), + vec![], + None, + "default", + ) + .await + .expect("enqueue high priority"); + + // Drain all remaining — the high-pri task must appear + let mut all_ids: Vec = Vec::new(); + loop { + let batch = shard + .dequeue("w", "default", 10) + .await + .expect("drain dequeue"); + if batch.tasks.is_empty() { + break; + } + for task in &batch.tasks { + all_ids.push(task.job().id().to_string()); + } + } + + assert!( + all_ids.contains(&"high-pri".to_string()), + "high priority task must be picked up; got: {:?}", + all_ids + ); + // 5 original - 1 dequeued + 1 high-pri = 5 + assert_eq!(all_ids.len(), 5, "all tasks must be drained; got: {:?}", all_ids); + }); +} + +// --------------------------------------------------------------------------- +// 7. Retry after failure creates a task at a future time +// --------------------------------------------------------------------------- + +/// When a job fails and has a retry policy, a new task is created at the retry +/// backoff time. That task should be picked up once its time arrives. +#[silo::test] +async fn retry_task_picked_up_after_backoff() { + with_timeout!(TIMEOUT_MS, { + let (_tmp, shard) = open_temp_shard().await; + let payload = msgpack_payload(&serde_json::json!({"test": true})); + let now = now_ms(); + + // Enqueue with a retry policy (1 retry, very short initial backoff) + let retry = silo::retry::RetryPolicy { + retry_count: 1, + initial_interval_ms: 1, // 1ms backoff so the retry is immediately ready + max_interval_ms: 10, + randomize_interval: false, + backoff_factor: 1.0, + }; + + shard + .enqueue( + "t1", + Some("retry-job".to_string()), + 50, + now, + Some(retry), + payload.clone(), + vec![], + None, + "default", + ) + .await + .expect("enqueue retry job"); + + // Dequeue and fail the first attempt + let tasks = shard + .dequeue("w", "default", 1) + .await + .expect("dequeue attempt 1"); + assert_eq!(tasks.tasks.len(), 1); + assert_eq!(tasks.tasks[0].attempt().attempt_number(), 1); + + shard + .report_attempt_outcome( + tasks.tasks[0].attempt().task_id(), + AttemptOutcome::Error { + error_code: "FAIL".to_string(), + error: vec![], + }, + ) + .await + .expect("fail attempt 1"); + + // Small sleep to let the retry backoff elapse (1ms) + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + // The retry task should now be dequeueable + let retry_result = shard + .dequeue("w", "default", 1) + .await + .expect("dequeue retry"); + assert_eq!( + retry_result.tasks.len(), + 1, + "retry task should be dequeued" + ); + assert_eq!(retry_result.tasks[0].job().id(), "retry-job"); + assert_eq!( + retry_result.tasks[0].attempt().attempt_number(), + 2, + "should be attempt 2" + ); + }); +} + +// --------------------------------------------------------------------------- +// 8. Multiple task groups: task in group A does not block group B +// --------------------------------------------------------------------------- + +/// Each task group has its own scanner. Creating a task in group A should not +/// affect task delivery in group B, and vice versa. Tasks created in either +/// group after the other group's buffer is populated must still be delivered. +#[silo::test] +async fn tasks_across_groups_are_independently_scanned() { + with_timeout!(TIMEOUT_MS, { + let (_tmp, shard) = open_temp_shard().await; + let payload = msgpack_payload(&serde_json::json!({"test": true})); + let now = now_ms(); + + // Populate group A + for i in 0..3 { + shard + .enqueue( + "t1", + Some(format!("group-a-{}", i)), + 50, + now, + None, + payload.clone(), + vec![], + None, + "group-a", + ) + .await + .expect("enqueue group-a"); + } + + // Populate group B + for i in 0..3 { + shard + .enqueue( + "t1", + Some(format!("group-b-{}", i)), + 50, + now, + None, + payload.clone(), + vec![], + None, + "group-b", + ) + .await + .expect("enqueue group-b"); + } + + // Drain group A first + let a_result = shard + .dequeue("w", "group-a", 10) + .await + .expect("dequeue group-a"); + assert_eq!(a_result.tasks.len(), 3); + + // Group B should still have all its tasks + let b_result = shard + .dequeue("w", "group-b", 10) + .await + .expect("dequeue group-b"); + assert_eq!(b_result.tasks.len(), 3); + + // Now add new tasks to group A — they should be picked up + shard + .enqueue( + "t1", + Some("group-a-late".to_string()), + 50, + now, + None, + payload.clone(), + vec![], + None, + "group-a", + ) + .await + .expect("enqueue group-a late"); + + let a_late = shard + .dequeue("w", "group-a", 1) + .await + .expect("dequeue group-a late"); + assert_eq!(a_late.tasks.len(), 1); + assert_eq!(a_late.tasks[0].job().id(), "group-a-late"); + }); +} + +// --------------------------------------------------------------------------- +// 9. Bulk enqueue then drain verifies no tasks are lost +// --------------------------------------------------------------------------- + +/// Enqueue many tasks in a burst, then drain them all. No tasks should be +/// lost even if the scanner needs multiple passes to buffer them all. +#[silo::test] +async fn bulk_enqueue_drain_no_tasks_lost() { + with_timeout!(TIMEOUT_MS, { + let (_tmp, shard) = open_temp_shard().await; + let payload = msgpack_payload(&serde_json::json!({"test": true})); + let now = now_ms(); + let total = 50; + + for i in 0..total { + shard + .enqueue( + "t1", + Some(format!("bulk-{:04}", i)), + 50, + now + (i as i64), // stagger slightly for unique keys + None, + payload.clone(), + vec![], + None, + "default", + ) + .await + .expect("enqueue bulk"); + } + + let mut dequeued_ids: Vec = Vec::new(); + loop { + let batch = shard + .dequeue("w", "default", 20) + .await + .expect("drain dequeue"); + if batch.tasks.is_empty() { + break; + } + for task in &batch.tasks { + dequeued_ids.push(task.job().id().to_string()); + } + } + + assert_eq!( + dequeued_ids.len(), + total, + "expected {} tasks, got {}: missing tasks were lost by the scanner", + total, + dequeued_ids.len() + ); + }); +} From 01d090f11d2a4b0d1095c426a3e0dabe46acbba0 Mon Sep 17 00:00:00 2001 From: Harry Brundage Date: Mon, 23 Mar 2026 15:35:16 -0400 Subject: [PATCH 2/6] Implement cursor-based scanning to skip already-buffered entries MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The TaskBroker scanner previously always started from the beginning of the task group key range on every scan pass. When the buffer already contained N entries, each scan had to read through all N before finding new entries to insert — O(buffer_size) wasted reads per scan. Now the scanner tracks a cursor (the last key read) and resumes from there on subsequent passes within a fill cycle. This eliminates the redundant re-reads of already-buffered entries. The cursor is invalidated whenever wakeup() fires, which happens on every code path that creates new task keys (enqueue, expedite, restart, concurrency grants, retries). This ensures tasks created behind the cursor are always picked up on the next scan. Co-Authored-By: Claude Opus 4.6 (1M context) --- src/task_broker.rs | 95 +++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 90 insertions(+), 5 deletions(-) diff --git a/src/task_broker.rs b/src/task_broker.rs index 1cdf7aa..bf2e7d6 100644 --- a/src/task_broker.rs +++ b/src/task_broker.rs @@ -64,6 +64,16 @@ pub struct TaskBroker { metrics: Option, /// The shard's tenant range for filtering defunct tasks. range: ShardRange, + /// Scan cursor: the last key read by the scanner. When set, the next scan + /// resumes from just after this key instead of re-reading from the + /// beginning of the task group range. Invalidated by `wakeup()` so that + /// newly-created tasks behind the cursor are not missed. + scan_cursor: Mutex>>, + /// Set to true when a wakeup fires, forcing the next scan to start from + /// the beginning of the key range. This ensures tasks created behind the + /// cursor (by enqueue, expedite, restart, concurrency grants, etc.) are + /// picked up on the next scan pass. + cursor_invalidated: AtomicBool, } impl TaskBroker { @@ -99,6 +109,8 @@ impl TaskBroker { shard_name, metrics, range, + scan_cursor: Mutex::new(None), + cursor_invalidated: AtomicBool::new(false), }) } @@ -124,11 +136,50 @@ impl TaskBroker { }); } + /// Determine the scan start key. If we have a valid cursor (not + /// invalidated), resume scanning from just after the cursor. Otherwise + /// start from the beginning of the task group range. + fn take_scan_start(&self) -> (Vec, bool) { + let group_prefix = task_group_prefix(&self.task_group); + + // If cursor was invalidated by a wakeup, start from the beginning + if self.cursor_invalidated.swap(false, Ordering::SeqCst) { + *self.scan_cursor.lock().unwrap() = None; + return (group_prefix, true); + } + + let cursor = self.scan_cursor.lock().unwrap(); + match cursor.as_ref() { + Some(last_key) => { + // Resume just after the last key we read + let resume_from = end_bound(last_key); + (resume_from, false) + } + None => (group_prefix, true), + } + } + + /// Update the scan cursor to the last key read in the scan. + fn set_scan_cursor(&self, last_key: Option>) { + *self.scan_cursor.lock().unwrap() = last_key; + } + + /// Reset the cursor so the next scan starts from the beginning. + fn reset_scan_cursor(&self) { + *self.scan_cursor.lock().unwrap() = None; + } + /// Scan tasks from DB and insert into buffer, skipping future tasks and inflight ones. async fn scan_tasks(&self, now_ms: i64, generation: u64) -> usize { // [SILO-SCAN-1] Scan only this task group's key range - let start = task_group_prefix(&self.task_group); - let end = end_bound(&start); + let (start, is_full_scan) = self.take_scan_start(); + let end = end_bound(&task_group_prefix(&self.task_group)); + + // If the cursor is past the end of the range, wrap around + if start >= end { + self.reset_scan_cursor(); + return 0; + } let Ok(mut iter) = self.db.scan::, _>(start..end).await else { return 0; @@ -138,18 +189,23 @@ impl TaskBroker { let mut defunct_keys: Vec> = Vec::new(); let mut inserted = 0; + let mut last_key_read: Option> = None; + let mut reached_end = true; while inserted < self.scan_batch && self.buffer.len() < self.target_buffer { let Ok(Some(kv)) = iter.next().await else { break; }; + reached_end = false; // Parse the task key to extract timestamp let Some(parsed_key) = parse_task_key(&kv.key) else { + last_key_read = Some(kv.key.to_vec()); continue; }; // Filter out future tasks if parsed_key.start_time_ms > now_ms as u64 { + last_key_read = Some(kv.key.to_vec()); continue; } @@ -157,6 +213,7 @@ impl TaskBroker { // [SILO-SCAN-3] Skip inflight tasks if self.inflight.lock().unwrap().contains(&key_bytes) { + last_key_read = Some(key_bytes); continue; } @@ -172,12 +229,16 @@ impl TaskBroker { } }; if suppress_due_to_tombstone { + last_key_read = Some(key_bytes); continue; } let decoded = match decode_task_validated(kv.value.clone()) { Ok(t) => t, - Err(_) => continue, // Skip malformed tasks + Err(_) => { + last_key_read = Some(key_bytes); + continue; // Skip malformed tasks + } }; // Check if task's tenant is within shard range @@ -193,6 +254,7 @@ impl TaskBroker { range = %self.range, "skipping defunct task (tenant outside shard range)" ); + last_key_read = Some(key_bytes); continue; } @@ -203,7 +265,7 @@ impl TaskBroker { // [SILO-SCAN-2] Insert into buffer if not already present if self.buffer.get(&key_bytes).is_none() { - self.buffer.insert(key_bytes, entry); + self.buffer.insert(key_bytes.clone(), entry); inserted += 1; // Yield periodically to avoid starving other tasks @@ -211,6 +273,26 @@ impl TaskBroker { tokio::task::yield_now().await; } } + + last_key_read = Some(key_bytes); + } + + // Update cursor position + if let Some(last_key) = last_key_read { + self.set_scan_cursor(Some(last_key)); + } else if reached_end { + // Iterator returned nothing — we've exhausted the range. + // Reset cursor so next scan starts from the beginning. + self.reset_scan_cursor(); + } + // If we stopped because inserted >= scan_batch or buffer is full, + // and we got a cursor update above, we'll resume from there next time. + + // On a full scan that inserted nothing, the cursor has now walked + // through the entire range without finding new work. Reset it so + // the next scan starts fresh rather than repeatedly scanning the tail. + if is_full_scan && inserted == 0 { + self.reset_scan_cursor(); } // Delete defunct tasks from the database @@ -418,8 +500,11 @@ impl TaskBroker { } } - /// Wake the scanner to refill promptly. + /// Wake the scanner to refill promptly. Invalidates the scan cursor so + /// that the next scan starts from the beginning of the key range, ensuring + /// newly-created tasks that sort before the cursor are not missed. fn wakeup(&self) { + self.cursor_invalidated.store(true, Ordering::SeqCst); self.scan_requested.store(true, Ordering::SeqCst); self.notify.notify_one(); } From 517d2696d096fb8ad3bfd345777a9144525b8d30 Mon Sep 17 00:00:00 2001 From: Harry Brundage Date: Mon, 23 Mar 2026 15:46:50 -0400 Subject: [PATCH 3/6] Add task scanner profile benchmark using real shard dequeue Replaces the synthetic scan simulation with a benchmark that exercises the full TaskBroker pipeline end-to-end: enqueue 50K tasks via import, then drain them via sustained dequeue calls measuring throughput and per-batch latency. Co-Authored-By: Claude Opus 4.6 (1M context) --- benches/task_scanner_profile.rs | 700 ++++---------------------------- 1 file changed, 77 insertions(+), 623 deletions(-) diff --git a/benches/task_scanner_profile.rs b/benches/task_scanner_profile.rs index b44efe1..8a15888 100644 --- a/benches/task_scanner_profile.rs +++ b/benches/task_scanner_profile.rs @@ -1,8 +1,7 @@ -//! Profile the task scanner hot path. +//! Profile the task scanner hot path using real shard dequeue operations. //! -//! Creates a shard with a large number of pending tasks and repeatedly scans -//! the task key range, measuring per-scan cost. This reproduces the production -//! hot path where `TaskBroker::scan_tasks` iterates through SlateDB SSTs. +//! Creates a shard with a large number of pending tasks and measures sustained +//! dequeue throughput, which exercises the full TaskBroker scanner pipeline. //! //! Run with: //! cargo bench --bench task_scanner_profile @@ -14,17 +13,15 @@ mod bench_helpers; use bench_helpers::*; -use silo::codec::decode_task_validated; -use silo::keys::{end_bound, parse_task_key, task_group_prefix, tasks_prefix}; use std::sync::Arc; use std::time::{Duration, Instant}; -/// Number of additional tasks to enqueue beyond what the golden shard has. -const EXTRA_TASKS: usize = 100_000; -/// How many scan iterations to run for timing. -const SCAN_ITERATIONS: usize = 200; -/// Batch size per scan (matches TaskBroker::scan_batch). -const SCAN_BATCH: usize = 1024; +/// Number of tasks to enqueue for sustained dequeue benchmarks. +const TASK_COUNT: usize = 50_000; +/// Task group used for benchmark tasks. +const TASK_GROUP: &str = "scanner-bench"; +/// Tenant used for benchmark tasks. +const TENANT: &str = "scanner-bench-tenant"; /// Enqueue a large number of ready tasks into a shard for benchmarking. async fn enqueue_many_tasks(shard: &Arc, count: usize) { @@ -47,13 +44,13 @@ async fn enqueue_many_tasks(shard: &Arc, c payload: rmp_serde::to_vec(&serde_json::json!({"bench": "scanner"})).unwrap(), limits: vec![], metadata: None, - task_group: "scanner-bench".to_string(), + task_group: TASK_GROUP.to_string(), attempts: vec![], // no attempts = Waiting status, creates a task entry }); } shard - .import_jobs("scanner-bench-tenant", batch) + .import_jobs(TENANT, batch) .await .expect("import scan bench jobs"); @@ -83,253 +80,52 @@ async fn enqueue_many_tasks(shard: &Arc, c ); } -/// Count all task keys in the DB under the tasks prefix. -async fn count_all_tasks(db: &slatedb::Db) -> usize { - let start = tasks_prefix(); - let end = end_bound(&start); - let Ok(mut iter) = db.scan::, _>(start..end).await else { - return 0; - }; - let mut count = 0; - while let Ok(Some(_)) = iter.next().await { - count += 1; - } - count -} - -/// Simulate one scan_tasks pass: open iterator, read up to SCAN_BATCH entries, -/// parse keys and decode values. Returns (entries_read, duration). -async fn single_scan_pass(db: &slatedb::Db, task_group: &str, now: i64) -> (usize, Duration) { - let start_key = task_group_prefix(task_group); - let end_key = end_bound(&start_key); - - let scan_start = Instant::now(); - - let Ok(mut iter) = db.scan::, _>(start_key..end_key).await else { - return (0, scan_start.elapsed()); - }; - - let mut read = 0; - while read < SCAN_BATCH { - let Ok(Some(kv)) = iter.next().await else { - break; - }; - - // Parse key (same as TaskBroker::scan_tasks) - let Some(parsed_key) = parse_task_key(&kv.key) else { - continue; - }; - - // Time filter check - if parsed_key.start_time_ms > now as u64 { - continue; - } - - // Decode task value (same as TaskBroker::scan_tasks) - let _decoded = decode_task_validated(kv.value.clone()); - - read += 1; - } - - (read, scan_start.elapsed()) -} - -/// Detailed breakdown of scan costs: iterator creation, reads, parsing, decoding. -struct ScanBreakdown { - iter_creation: Duration, - total_read: Duration, - total_key_parse: Duration, - total_value_decode: Duration, - total_wall: Duration, - entries: usize, -} - -async fn single_scan_pass_breakdown(db: &slatedb::Db, task_group: &str, now: i64) -> ScanBreakdown { - let start_key = task_group_prefix(task_group); - let end_key = end_bound(&start_key); - - let wall_start = Instant::now(); - - let t = Instant::now(); - let Ok(mut iter) = db.scan::, _>(start_key..end_key).await else { - return ScanBreakdown { - iter_creation: t.elapsed(), - total_read: Duration::ZERO, - total_key_parse: Duration::ZERO, - total_value_decode: Duration::ZERO, - total_wall: wall_start.elapsed(), - entries: 0, - }; - }; - let iter_creation = t.elapsed(); - - let mut total_read = Duration::ZERO; - let mut total_key_parse = Duration::ZERO; - let mut total_value_decode = Duration::ZERO; - let mut read = 0; - - while read < SCAN_BATCH { - let t = Instant::now(); - let Ok(Some(kv)) = iter.next().await else { - total_read += t.elapsed(); - break; - }; - total_read += t.elapsed(); - - let t = Instant::now(); - let Some(parsed_key) = parse_task_key(&kv.key) else { - total_key_parse += t.elapsed(); - continue; - }; - total_key_parse += t.elapsed(); - - if parsed_key.start_time_ms > now as u64 { - continue; - } - - let t = Instant::now(); - let _decoded = decode_task_validated(kv.value.clone()); - total_value_decode += t.elapsed(); - - read += 1; - } - - ScanBreakdown { - iter_creation, - total_read, - total_key_parse, - total_value_decode, - total_wall: wall_start.elapsed(), - entries: read, - } -} - -/// Benchmark just the iterator creation cost (no reads). -async fn bench_iterator_creation( - db: &slatedb::Db, - task_group: &str, - iterations: usize, -) -> Vec { - let start_key = task_group_prefix(task_group); - let end_key = end_bound(&start_key); +/// Sustained dequeue: drain tasks as fast as possible, measuring throughput. +/// Returns (total_dequeued, elapsed, per_batch_durations). +async fn sustained_dequeue( + shard: &Arc, + batch_size: usize, + max_tasks: usize, +) -> (usize, Duration, Vec) { + let start = Instant::now(); + let mut total = 0; + let mut durations = Vec::new(); - let mut durations = Vec::with_capacity(iterations); - for _ in 0..iterations { + while total < max_tasks { let t = Instant::now(); - let _iter = db - .scan::, _>(start_key.clone()..end_key.clone()) - .await; - durations.push(t.elapsed()); - } - durations.sort(); - durations -} - -/// Benchmark full scan passes (iterator creation + reading SCAN_BATCH entries). -async fn bench_full_scan( - db: &slatedb::Db, - task_group: &str, - iterations: usize, -) -> (Vec, usize) { - let now = now_ms(); - let mut durations = Vec::with_capacity(iterations); - let mut total_entries = 0; - - for _ in 0..iterations { - let (entries, elapsed) = single_scan_pass(db, task_group, now).await; - durations.push(elapsed); - total_entries += entries; - } - durations.sort(); - (durations, total_entries) -} - -/// Benchmark scanning the ENTIRE task group (all entries, not just SCAN_BATCH). -async fn bench_full_group_scan( - db: &slatedb::Db, - task_group: &str, - iterations: usize, -) -> (Vec, usize) { - let now = now_ms(); - let start_key = task_group_prefix(task_group); - let end_key = end_bound(&start_key); - - let mut durations = Vec::with_capacity(iterations); - let mut last_count = 0; - - for _ in 0..iterations { - let scan_start = Instant::now(); - - let Ok(mut iter) = db - .scan::, _>(start_key.clone()..end_key.clone()) + let result = shard + .dequeue("bench-worker", TASK_GROUP, batch_size) .await - else { - durations.push(scan_start.elapsed()); - continue; - }; - - let mut read = 0; - loop { - let Ok(Some(kv)) = iter.next().await else { + .expect("dequeue"); + let elapsed = t.elapsed(); + + if result.tasks.is_empty() { + // Give scanner a moment to refill, then try once more + tokio::time::sleep(Duration::from_millis(100)).await; + let result = shard + .dequeue("bench-worker", TASK_GROUP, batch_size) + .await + .expect("dequeue retry"); + if result.tasks.is_empty() { break; - }; - let Some(parsed_key) = parse_task_key(&kv.key) else { - continue; - }; - if parsed_key.start_time_ms > now as u64 { - continue; } - let _decoded = decode_task_validated(kv.value.clone()); - read += 1; + total += result.tasks.len(); + durations.push(elapsed + Duration::from_millis(100)); + } else { + total += result.tasks.len(); + durations.push(elapsed); } - - durations.push(scan_start.elapsed()); - last_count = read; } - durations.sort(); - (durations, last_count) + (total, start.elapsed(), durations) } -/// Benchmark just iterator reads with NO key parsing or value decoding. -async fn bench_raw_reads( - db: &slatedb::Db, - task_group: &str, - iterations: usize, -) -> (Vec, usize) { - let start_key = task_group_prefix(task_group); - let end_key = end_bound(&start_key); - - let mut durations = Vec::with_capacity(iterations); - let mut total = 0; - - for _ in 0..iterations { - let Ok(mut iter) = db - .scan::, _>(start_key.clone()..end_key.clone()) - .await - else { - continue; - }; - let t = Instant::now(); - let mut read = 0; - while read < SCAN_BATCH { - let Ok(Some(_kv)) = iter.next().await else { - break; - }; - read += 1; - } - durations.push(t.elapsed()); - total += read; - } - durations.sort(); - (durations, total) -} - -fn print_stats(label: &str, durations: &[Duration]) { +fn print_stats(label: &str, durations: &mut [Duration]) { if durations.is_empty() { println!(" {:<40} (no data)", label); return; } + durations.sort(); let p50 = durations[durations.len() / 2]; let p99 = durations[(durations.len() as f64 * 0.99).ceil() as usize - 1]; let mean: Duration = durations.iter().sum::() / durations.len() as u32; @@ -355,395 +151,53 @@ async fn main() { let metadata = ensure_golden_shard().await; println!( - "Cloning golden shard and adding {} extra tasks...", - EXTRA_TASKS + "Cloning golden shard and adding {} tasks...", + TASK_COUNT ); let (_guard, shard) = clone_golden_shard("scanner-profile", &metadata).await; + enqueue_many_tasks(&shard, TASK_COUNT).await; - // Count existing tasks - let existing_tasks = count_all_tasks(shard.db()).await; - println!(" Existing task keys in golden shard: {}", existing_tasks); - - // Add more tasks to the "scanner-bench" task group - enqueue_many_tasks(&shard, EXTRA_TASKS).await; - - let total_tasks = count_all_tasks(shard.db()).await; - println!(" Total task keys after additions: {}", total_tasks); - - // Count tasks in each group - { - let start_key = task_group_prefix("scanner-bench"); - let end_key = end_bound(&start_key); - let Ok(mut iter) = shard.db().scan::, _>(start_key..end_key).await else { - panic!("failed to scan scanner-bench group"); - }; - let mut count = 0; - while let Ok(Some(_)) = iter.next().await { - count += 1; - } - println!(" Tasks in 'scanner-bench' group: {}", count); - - let start_key = task_group_prefix(""); - let end_key = end_bound(&start_key); - let Ok(mut iter) = shard.db().scan::, _>(start_key..end_key).await else { - panic!("failed to scan default group"); - }; - let mut count = 0; - while let Ok(Some(_)) = iter.next().await { - count += 1; - } - println!(" Tasks in default '' group: {}", count); - } - - println!( - "\n--- Iterator creation cost ({} iterations) ---", - SCAN_ITERATIONS - ); - - let durations = bench_iterator_creation(shard.db(), "scanner-bench", SCAN_ITERATIONS).await; - print_stats("iter_create(scanner-bench, 100K)", &durations); - - let durations = bench_iterator_creation(shard.db(), "", SCAN_ITERATIONS).await; - print_stats("iter_create(default group)", &durations); + // Let the scanner populate the buffer initially + println!("\n Waiting for scanner to populate buffer..."); + tokio::time::sleep(Duration::from_millis(500)).await; + // --- Sustained dequeue throughput --- + println!("\n--- Sustained dequeue throughput (batch_size=32) ---"); { - let start = tasks_prefix(); - let end = end_bound(&start); - let mut durations = Vec::with_capacity(SCAN_ITERATIONS); - for _ in 0..SCAN_ITERATIONS { - let t = Instant::now(); - let _iter = shard - .db() - .scan::, _>(start.clone()..end.clone()) - .await; - durations.push(t.elapsed()); - } - durations.sort(); - print_stats("iter_create(all tasks)", &durations); - } - - println!( - "\n--- Raw reads (no parse/decode) {} entries ({} iters) ---", - SCAN_BATCH, SCAN_ITERATIONS - ); - - let (durations, _) = bench_raw_reads(shard.db(), "scanner-bench", SCAN_ITERATIONS).await; - print_stats( - &format!("raw_read(scanner-bench, {})", SCAN_BATCH), - &durations, - ); - - let (durations, _) = bench_raw_reads(shard.db(), "", SCAN_ITERATIONS).await; - print_stats(&format!("raw_read(default, {})", SCAN_BATCH), &durations); - - println!( - "\n--- Full scan {} entries per pass ({} iterations) ---", - SCAN_BATCH, SCAN_ITERATIONS - ); - - let (durations, total) = bench_full_scan(shard.db(), "scanner-bench", SCAN_ITERATIONS).await; - print_stats( - &format!("scan_batch(scanner-bench, {})", SCAN_BATCH), - &durations, - ); - println!(" total entries read across all iterations: {}", total); - - let (durations, total) = bench_full_scan(shard.db(), "", SCAN_ITERATIONS).await; - print_stats(&format!("scan_batch(default, {})", SCAN_BATCH), &durations); - println!(" total entries read across all iterations: {}", total); - - println!("\n--- CPU time breakdown (100 iterations, scanner-bench group) ---"); - { - let now = now_ms(); - let iters: u32 = 100; - let mut iter_creation = Duration::ZERO; - let mut total_read = Duration::ZERO; - let mut total_key_parse = Duration::ZERO; - let mut total_value_decode = Duration::ZERO; - let mut total_wall = Duration::ZERO; - let mut total_entries: usize = 0; - - for _ in 0..iters { - let bd = single_scan_pass_breakdown(shard.db(), "scanner-bench", now).await; - iter_creation += bd.iter_creation; - total_read += bd.total_read; - total_key_parse += bd.total_key_parse; - total_value_decode += bd.total_value_decode; - total_wall += bd.total_wall; - total_entries += bd.entries; - } - - let avg_wall = total_wall / iters; - let avg_iter = iter_creation / iters; - let avg_read = total_read / iters; - let avg_parse = total_key_parse / iters; - let avg_decode = total_value_decode / iters; - + let (total, elapsed, mut durations) = + sustained_dequeue(&shard, 32, TASK_COUNT).await; + let rate = total as f64 / elapsed.as_secs_f64(); println!( - " Average per-scan breakdown ({} entries/scan):", - total_entries / iters as usize - ); - println!(" wall time: {}", format_duration(avg_wall)); - println!( - " iter creation: {} ({:.1}%)", - format_duration(avg_iter), - avg_iter.as_secs_f64() / avg_wall.as_secs_f64() * 100.0 - ); - println!( - " iter.next() reads: {} ({:.1}%)", - format_duration(avg_read), - avg_read.as_secs_f64() / avg_wall.as_secs_f64() * 100.0 - ); - println!( - " key parsing: {} ({:.1}%)", - format_duration(avg_parse), - avg_parse.as_secs_f64() / avg_wall.as_secs_f64() * 100.0 - ); - println!( - " value decoding: {} ({:.1}%)", - format_duration(avg_decode), - avg_decode.as_secs_f64() / avg_wall.as_secs_f64() * 100.0 - ); - let accounted = avg_iter + avg_read + avg_parse + avg_decode; - let overhead = avg_wall.saturating_sub(accounted); - println!( - " other overhead: {} ({:.1}%)", - format_duration(overhead), - overhead.as_secs_f64() / avg_wall.as_secs_f64() * 100.0 + " Dequeued {} tasks in {:.1}s = {:.0} tasks/sec", + total, + elapsed.as_secs_f64(), + rate, ); + print_stats("dequeue_batch(32)", &mut durations); } - // Simulate production pattern: scan with a pre-populated buffer. - // The scanner must read through all buffered entries before finding new ones. - println!("\n--- Production pattern: scan with pre-populated buffer ---"); - { - use std::collections::HashSet; - let now = now_ms(); - - // Pre-populate a "buffer" by scanning first N entries - for buffer_size in [0usize, 2048, 4096, 8192, 16384, 32768] { - let start_key = task_group_prefix("scanner-bench"); - let end_key = end_bound(&start_key); - - // Collect the first buffer_size keys to simulate what's already in the buffer - let mut buffered_keys: HashSet> = HashSet::new(); - if buffer_size > 0 { - let Ok(mut iter) = shard - .db() - .scan::, _>(start_key.clone()..end_key.clone()) - .await - else { - continue; - }; - let mut collected = 0; - while collected < buffer_size { - let Ok(Some(kv)) = iter.next().await else { - break; - }; - buffered_keys.insert(kv.key.to_vec()); - collected += 1; - } - } - - // Now simulate scan_tasks: scan from beginning, skip buffered, insert 1024 new - let iters = 50; - let mut durations = Vec::with_capacity(iters); - let mut total_iter_next_calls = 0usize; - - for _ in 0..iters { - let Ok(mut iter) = shard - .db() - .scan::, _>(start_key.clone()..end_key.clone()) - .await - else { - continue; - }; - - let scan_start = Instant::now(); - let mut inserted = 0; - let mut reads = 0; - while inserted < SCAN_BATCH { - let Ok(Some(kv)) = iter.next().await else { - break; - }; - reads += 1; - - let Some(parsed_key) = parse_task_key(&kv.key) else { - continue; - }; - if parsed_key.start_time_ms > now as u64 { - continue; - } - let _decoded = decode_task_validated(kv.value.clone()); - - // Skip if already in "buffer" - if buffered_keys.contains(&kv.key.to_vec()) { - continue; - } - inserted += 1; - } - durations.push(scan_start.elapsed()); - total_iter_next_calls += reads; - } + shard.close().await.expect("close"); - durations.sort(); - let avg_reads = total_iter_next_calls / iters; - print_stats( - &format!("buffer={}, +{} new", buffer_size, SCAN_BATCH), - &durations, - ); - println!( - " avg iter.next() calls: {} ({}x scan_batch)", - avg_reads, - avg_reads as f64 / SCAN_BATCH as f64 - ); - } - } + // Run again with a fresh clone for a second batch size + let (_guard2, shard2) = clone_golden_shard("scanner-profile-2", &metadata).await; + enqueue_many_tasks(&shard2, TASK_COUNT).await; + tokio::time::sleep(Duration::from_millis(500)).await; - // Simulate a full refill cycle: buffer drains from target_buffer to low_watermark, - // then the scanner refills it. Measure total CPU cost of the refill with different - // scan_batch sizes. - println!("\n--- Refill cycle cost: buffer 4096->8192 with varying scan_batch ---"); + println!("\n--- Sustained dequeue throughput (batch_size=1) ---"); { - use std::collections::HashSet; - let now = now_ms(); - let target_buffer = 8192usize; - let low_watermark = target_buffer / 2; // 4096 - - // Pre-populate buffer with 4096 entries (simulates post-drain state) - let start_key = task_group_prefix("scanner-bench"); - let end_key = end_bound(&start_key); - let mut buffered_keys: HashSet> = HashSet::new(); - { - let Ok(mut iter) = shard - .db() - .scan::, _>(start_key.clone()..end_key.clone()) - .await - else { - panic!("scan failed"); - }; - let mut collected = 0; - while collected < low_watermark { - let Ok(Some(kv)) = iter.next().await else { - break; - }; - buffered_keys.insert(kv.key.to_vec()); - collected += 1; - } - } - - for scan_batch in [1024usize, 2048, 4096, 8192] { - let iters = 20; - let mut durations = Vec::with_capacity(iters); - let mut total_scans = 0usize; - let mut total_reads = 0usize; - - for _ in 0..iters { - let cycle_start = Instant::now(); - let mut buf_len = low_watermark; - let mut current_buffered = buffered_keys.clone(); - let mut cycle_scans = 0; - let mut cycle_reads = 0; - - // Simulate multiple scan passes to fill buffer from low_watermark to target_buffer - while buf_len < target_buffer { - let Ok(mut iter) = shard - .db() - .scan::, _>(start_key.clone()..end_key.clone()) - .await - else { - break; - }; - - let mut inserted = 0; - while inserted < scan_batch && buf_len < target_buffer { - let Ok(Some(kv)) = iter.next().await else { - break; - }; - cycle_reads += 1; - - let Some(parsed_key) = parse_task_key(&kv.key) else { - continue; - }; - if parsed_key.start_time_ms > now as u64 { - continue; - } - let _decoded = decode_task_validated(kv.value.clone()); - - let key_bytes = kv.key.to_vec(); - if !current_buffered.contains(&key_bytes) { - current_buffered.insert(key_bytes); - buf_len += 1; - inserted += 1; - } - } - cycle_scans += 1; - } - - durations.push(cycle_start.elapsed()); - total_scans += cycle_scans; - total_reads += cycle_reads; - } - - durations.sort(); - let avg_scans = total_scans as f64 / iters as f64; - let avg_reads = total_reads as f64 / iters as f64; - print_stats(&format!("scan_batch={}", scan_batch), &durations); - println!( - " avg scans/refill: {:.1}, avg reads/refill: {:.0}, refills_at_50scan/s: {:.1}/s, est CPU: {:.2} vCPU", - avg_scans, - avg_reads, - 50.0 / avg_scans, - durations[durations.len() / 2].as_secs_f64() * 50.0 / avg_scans, - ); - } - } - - println!("\n--- Full group scan ({} iterations) ---", 10); - - let (durations, count) = bench_full_group_scan(shard.db(), "scanner-bench", 10).await; - print_stats( - &format!("full_scan(scanner-bench, {} tasks)", count), - &durations, - ); - - let (durations, count) = bench_full_group_scan(shard.db(), "", 10).await; - print_stats(&format!("full_scan(default, {} tasks)", count), &durations); - - // Throughput estimate - println!("\n--- Throughput estimate ---"); - let now = now_ms(); - let sustained_iters = 500; - let sustained_start = Instant::now(); - let mut total_entries = 0; - for _ in 0..sustained_iters { - let (entries, _) = single_scan_pass(shard.db(), "scanner-bench", now).await; - total_entries += entries; + // Only drain 2000 with batch_size=1 to keep it reasonable + let (total, elapsed, mut durations) = + sustained_dequeue(&shard2, 1, 2000).await; + let rate = total as f64 / elapsed.as_secs_f64(); + println!( + " Dequeued {} tasks in {:.1}s = {:.0} tasks/sec", + total, + elapsed.as_secs_f64(), + rate, + ); + print_stats("dequeue_batch(1)", &mut durations); } - let sustained_elapsed = sustained_start.elapsed(); - let scans_per_sec = sustained_iters as f64 / sustained_elapsed.as_secs_f64(); - let entries_per_sec = total_entries as f64 / sustained_elapsed.as_secs_f64(); - println!( - " {} scans in {:.1}s = {:.1} scans/sec ({:.0} entries/sec)", - sustained_iters, - sustained_elapsed.as_secs_f64(), - scans_per_sec, - entries_per_sec, - ); - println!( - " Average {:.1}ms per scan ({} entries/scan)", - sustained_elapsed.as_secs_f64() * 1000.0 / sustained_iters as f64, - total_entries / sustained_iters - ); - // Extrapolate to production: if scanning at 50 scans/sec - let ms_per_scan = sustained_elapsed.as_secs_f64() * 1000.0 / sustained_iters as f64; - let cpu_at_50_scans = ms_per_scan * 50.0 / 1000.0; - println!( - "\n At 50 scans/sec: estimated {:.2} vCPU ({:.1}ms/scan * 50)", - cpu_at_50_scans, ms_per_scan - ); - - shard.close().await.expect("close"); + shard2.close().await.expect("close"); println!("\nDone."); } From a450980fcd2048efa549abc5ba89b7d3781d0e32 Mon Sep 17 00:00:00 2001 From: Harry Brundage Date: Mon, 23 Mar 2026 15:48:32 -0400 Subject: [PATCH 4/6] Format benchmark and test files Co-Authored-By: Claude Opus 4.6 (1M context) --- benches/task_scanner_profile.rs | 11 +++-------- tests/task_scanner_ordering_tests.rs | 25 ++++++++++++++----------- 2 files changed, 17 insertions(+), 19 deletions(-) diff --git a/benches/task_scanner_profile.rs b/benches/task_scanner_profile.rs index 8a15888..6b9a5a7 100644 --- a/benches/task_scanner_profile.rs +++ b/benches/task_scanner_profile.rs @@ -150,10 +150,7 @@ async fn main() { let metadata = ensure_golden_shard().await; - println!( - "Cloning golden shard and adding {} tasks...", - TASK_COUNT - ); + println!("Cloning golden shard and adding {} tasks...", TASK_COUNT); let (_guard, shard) = clone_golden_shard("scanner-profile", &metadata).await; enqueue_many_tasks(&shard, TASK_COUNT).await; @@ -164,8 +161,7 @@ async fn main() { // --- Sustained dequeue throughput --- println!("\n--- Sustained dequeue throughput (batch_size=32) ---"); { - let (total, elapsed, mut durations) = - sustained_dequeue(&shard, 32, TASK_COUNT).await; + let (total, elapsed, mut durations) = sustained_dequeue(&shard, 32, TASK_COUNT).await; let rate = total as f64 / elapsed.as_secs_f64(); println!( " Dequeued {} tasks in {:.1}s = {:.0} tasks/sec", @@ -186,8 +182,7 @@ async fn main() { println!("\n--- Sustained dequeue throughput (batch_size=1) ---"); { // Only drain 2000 with batch_size=1 to keep it reasonable - let (total, elapsed, mut durations) = - sustained_dequeue(&shard2, 1, 2000).await; + let (total, elapsed, mut durations) = sustained_dequeue(&shard2, 1, 2000).await; let rate = total as f64 / elapsed.as_secs_f64(); println!( " Dequeued {} tasks in {:.1}s = {:.0} tasks/sec", diff --git a/tests/task_scanner_ordering_tests.rs b/tests/task_scanner_ordering_tests.rs index 66352f2..9193f56 100644 --- a/tests/task_scanner_ordering_tests.rs +++ b/tests/task_scanner_ordering_tests.rs @@ -99,7 +99,12 @@ async fn enqueue_start_at_zero_picked_up_after_buffer_populated() { all_ids ); // Should have gotten all 5 remaining tasks (5 enqueued - 1 dequeued + 1 zero = 5) - assert_eq!(all_ids.len(), 5, "all tasks must be drained; got: {:?}", all_ids); + assert_eq!( + all_ids.len(), + 5, + "all tasks must be drained; got: {:?}", + all_ids + ); }); } @@ -355,10 +360,7 @@ async fn concurrency_grant_task_picked_up_by_scanner() { // Complete holder — this frees the concurrency slot shard - .report_attempt_outcome( - &holder_task_id, - AttemptOutcome::Success { result: vec![] }, - ) + .report_attempt_outcome(&holder_task_id, AttemptOutcome::Success { result: vec![] }) .await .expect("complete holder"); @@ -570,7 +572,12 @@ async fn higher_priority_enqueued_later_picked_up() { all_ids ); // 5 original - 1 dequeued + 1 high-pri = 5 - assert_eq!(all_ids.len(), 5, "all tasks must be drained; got: {:?}", all_ids); + assert_eq!( + all_ids.len(), + 5, + "all tasks must be drained; got: {:?}", + all_ids + ); }); } @@ -638,11 +645,7 @@ async fn retry_task_picked_up_after_backoff() { .dequeue("w", "default", 1) .await .expect("dequeue retry"); - assert_eq!( - retry_result.tasks.len(), - 1, - "retry task should be dequeued" - ); + assert_eq!(retry_result.tasks.len(), 1, "retry task should be dequeued"); assert_eq!(retry_result.tasks[0].job().id(), "retry-job"); assert_eq!( retry_result.tasks[0].attempt().attempt_number(), From 8ccb6d10f2173d9ccdf03f51c5195a6173875c64 Mon Sep 17 00:00:00 2001 From: Harry Brundage Date: Mon, 23 Mar 2026 16:44:36 -0400 Subject: [PATCH 5/6] Cache the scan iterator across passes instead of recreating per-scan Instead of tracking a cursor key and creating a new SlateDB iterator each scan pass, hold the actual DbIterator as a local variable in the scan loop and march it forward. This avoids both the iterator creation cost (~160us) and the cost of re-reading already-buffered entries. The iterator stops when it hits future-timestamped tasks (HitFuture) or exhausts the range (Exhausted). It is dropped and recreated when: - wakeup() fires (new tasks may have been written behind the cursor) - The iterator is older than MAX_ITER_AGE (5s) - The iterator is exhausted When a future task is encountered, the consumed entry is held back and re-evaluated on the next pass with a fresh now_ms, so it is never lost from the iterator. Co-Authored-By: Claude Opus 4.6 (1M context) --- src/task_broker.rs | 317 +++++++++++++++------------ tests/task_scanner_ordering_tests.rs | 24 +- 2 files changed, 197 insertions(+), 144 deletions(-) diff --git a/src/task_broker.rs b/src/task_broker.rs index bf2e7d6..398d41e 100644 --- a/src/task_broker.rs +++ b/src/task_broker.rs @@ -8,7 +8,7 @@ use std::time::Duration; use crossbeam_skiplist::SkipMap; use dashmap::DashMap; -use slatedb::{Db, WriteBatch}; +use slatedb::{Db, DbIterator, KeyValue, WriteBatch}; use tokio::sync::Notify; use crate::codec::{DecodedTask, decode_task_validated}; @@ -28,10 +28,29 @@ pub struct BrokerTask { pub decoded: DecodedTask, } +/// Result of a single scan pass from the cached iterator. +enum ScanOutcome { + /// Inserted entries, stopped at batch limit or buffer full. + /// The iterator has more entries to read. + ReadBatch { inserted: usize }, + /// Hit a task with a future timestamp. The consumed entry is returned + /// so the caller can re-evaluate it on the next pass with a fresh + /// `now_ms` without losing it from the iterator. + HitFuture { + inserted: usize, + pending_kv: KeyValue, + }, + /// The iterator reached the end of the key range. + Exhausted { inserted: usize }, +} + /// A single per-task-group broker that scans only its own key range. /// /// - Maintains a sorted buffer of ready tasks using a skiplist keyed by the task key bytes. -/// - Populates from SlateDB in the background with exponential backoff when no work is found. +/// - Populates from SlateDB in the background using a cached iterator that marches +/// forward through the key range, avoiding redundant re-reads of buffered entries. +/// - When wakeup() fires (new tasks written), the cached iterator is dropped and +/// recreated from the beginning to pick up tasks behind the old cursor. /// - Ensures tasks claimed but not yet durably leased are tracked as in-flight and not reinserted. pub struct TaskBroker { // The task group this broker is responsible for @@ -52,11 +71,12 @@ pub struct TaskBroker { running: Arc, // A notify object to wake up the background scanner when a task is claimed notify: Arc, - // Whether the background scanner should be woken up + // Whether the background scanner should be woken up and the cached + // iterator should be dropped (new tasks may exist behind the cursor). scan_requested: Arc, // The target buffer size target_buffer: usize, - // The batch size for the background scanner to read out of the DB + // Maximum entries to read per scan pass before yielding control scan_batch: usize, // The shard name for metrics labeling shard_name: String, @@ -64,16 +84,6 @@ pub struct TaskBroker { metrics: Option, /// The shard's tenant range for filtering defunct tasks. range: ShardRange, - /// Scan cursor: the last key read by the scanner. When set, the next scan - /// resumes from just after this key instead of re-reading from the - /// beginning of the task group range. Invalidated by `wakeup()` so that - /// newly-created tasks behind the cursor are not missed. - scan_cursor: Mutex>>, - /// Set to true when a wakeup fires, forcing the next scan to start from - /// the beginning of the key range. This ensures tasks created behind the - /// cursor (by enqueue, expedite, restart, concurrency grants, etc.) are - /// picked up on the next scan pass. - cursor_invalidated: AtomicBool, } impl TaskBroker { @@ -81,6 +91,11 @@ impl TaskBroker { // refresh the tombstone generation whenever a stale key is observed again. const ACK_TOMBSTONE_RETAIN_GENERATIONS: u64 = 64; + // Maximum age of a cached iterator before it's recreated. This bounds + // how long we hold a snapshot-isolated view that can't see new writes + // when no wakeups arrive (e.g. scheduled tasks becoming ready). + const MAX_ITER_AGE: Duration = Duration::from_secs(5); + fn new( task_group: String, db: Arc, @@ -100,17 +115,10 @@ impl TaskBroker { notify: Arc::new(Notify::new()), scan_requested: Arc::new(AtomicBool::new(false)), target_buffer: 8192, - // Must be >= target_buffer / 2 (the low watermark) so that a - // single scan pass can refill from low-watermark to target. - // With a smaller batch the scanner needs multiple passes and - // each pass re-reads every already-buffered entry from the - // start of the key range, wasting O(buffer_size) reads/pass. scan_batch: 4096, shard_name, metrics, range, - scan_cursor: Mutex::new(None), - cursor_invalidated: AtomicBool::new(false), }) } @@ -136,84 +144,69 @@ impl TaskBroker { }); } - /// Determine the scan start key. If we have a valid cursor (not - /// invalidated), resume scanning from just after the cursor. Otherwise - /// start from the beginning of the task group range. - fn take_scan_start(&self) -> (Vec, bool) { - let group_prefix = task_group_prefix(&self.task_group); - - // If cursor was invalidated by a wakeup, start from the beginning - if self.cursor_invalidated.swap(false, Ordering::SeqCst) { - *self.scan_cursor.lock().unwrap() = None; - return (group_prefix, true); - } - - let cursor = self.scan_cursor.lock().unwrap(); - match cursor.as_ref() { - Some(last_key) => { - // Resume just after the last key we read - let resume_from = end_bound(last_key); - (resume_from, false) - } - None => (group_prefix, true), - } - } - - /// Update the scan cursor to the last key read in the scan. - fn set_scan_cursor(&self, last_key: Option>) { - *self.scan_cursor.lock().unwrap() = last_key; - } - - /// Reset the cursor so the next scan starts from the beginning. - fn reset_scan_cursor(&self) { - *self.scan_cursor.lock().unwrap() = None; - } - - /// Scan tasks from DB and insert into buffer, skipping future tasks and inflight ones. - async fn scan_tasks(&self, now_ms: i64, generation: u64) -> usize { + /// Create a fresh iterator over the full task group key range. + async fn new_scan_iter(&self) -> Option { // [SILO-SCAN-1] Scan only this task group's key range - let (start, is_full_scan) = self.take_scan_start(); - let end = end_bound(&task_group_prefix(&self.task_group)); - - // If the cursor is past the end of the range, wrap around - if start >= end { - self.reset_scan_cursor(); - return 0; - } - - let Ok(mut iter) = self.db.scan::, _>(start..end).await else { - return 0; - }; + let start = task_group_prefix(&self.task_group); + let end = end_bound(&start); + self.db.scan::, _>(start..end).await.ok() + } - // Collect keys to delete for defunct tasks (outside shard range) + /// Read entries from an existing iterator, inserting ready tasks into the + /// buffer. Stops when the batch limit is hit, the buffer is full, a + /// future-timestamped task is encountered, or the iterator is exhausted. + /// + /// `pending_kv` is a previously-consumed entry that was returned from a + /// prior `HitFuture` outcome. If provided, it is processed first before + /// reading from the iterator. + async fn scan_from_iter( + &self, + iter: &mut DbIterator, + pending_kv: Option, + now_ms: i64, + generation: u64, + ) -> ScanOutcome { let mut defunct_keys: Vec> = Vec::new(); - let mut inserted = 0; - let mut last_key_read: Option> = None; - let mut reached_end = true; - while inserted < self.scan_batch && self.buffer.len() < self.target_buffer { - let Ok(Some(kv)) = iter.next().await else { - break; + let mut next_kv = pending_kv; + + loop { + if inserted >= self.scan_batch || self.buffer.len() >= self.target_buffer { + self.delete_defunct_keys(defunct_keys).await; + return ScanOutcome::ReadBatch { inserted }; + } + + // Use the held-back entry first, then read from the iterator. + let kv = match next_kv.take() { + Some(kv) => kv, + None => { + let Ok(Some(kv)) = iter.next().await else { + self.delete_defunct_keys(defunct_keys).await; + return ScanOutcome::Exhausted { inserted }; + }; + kv + } }; - reached_end = false; // Parse the task key to extract timestamp let Some(parsed_key) = parse_task_key(&kv.key) else { - last_key_read = Some(kv.key.to_vec()); continue; }; - // Filter out future tasks + // Stop when we reach future tasks. Return the consumed entry so + // the caller can re-evaluate it on the next pass with a fresh now_ms. if parsed_key.start_time_ms > now_ms as u64 { - last_key_read = Some(kv.key.to_vec()); - continue; + self.delete_defunct_keys(defunct_keys).await; + return ScanOutcome::HitFuture { + inserted, + pending_kv: kv, + }; } let key_bytes = kv.key.to_vec(); // [SILO-SCAN-3] Skip inflight tasks if self.inflight.lock().unwrap().contains(&key_bytes) { - last_key_read = Some(key_bytes); continue; } @@ -229,23 +222,18 @@ impl TaskBroker { } }; if suppress_due_to_tombstone { - last_key_read = Some(key_bytes); continue; } let decoded = match decode_task_validated(kv.value.clone()) { Ok(t) => t, - Err(_) => { - last_key_read = Some(key_bytes); - continue; // Skip malformed tasks - } + Err(_) => continue, }; // Check if task's tenant is within shard range let task_tenant = decoded.tenant(); if !self.range.contains_tenant(task_tenant) { - // Task is for a tenant outside our range - mark for deletion defunct_keys.push(kv.key.to_vec()); debug!( task_group = %parsed_key.task_group, @@ -254,7 +242,6 @@ impl TaskBroker { range = %self.range, "skipping defunct task (tenant outside shard range)" ); - last_key_read = Some(key_bytes); continue; } @@ -265,7 +252,7 @@ impl TaskBroker { // [SILO-SCAN-2] Insert into buffer if not already present if self.buffer.get(&key_bytes).is_none() { - self.buffer.insert(key_bytes.clone(), entry); + self.buffer.insert(key_bytes, entry); inserted += 1; // Yield periodically to avoid starving other tasks @@ -273,48 +260,37 @@ impl TaskBroker { tokio::task::yield_now().await; } } - - last_key_read = Some(key_bytes); } + } - // Update cursor position - if let Some(last_key) = last_key_read { - self.set_scan_cursor(Some(last_key)); - } else if reached_end { - // Iterator returned nothing — we've exhausted the range. - // Reset cursor so next scan starts from the beginning. - self.reset_scan_cursor(); + /// Delete defunct tasks (outside shard range) from the database. + async fn delete_defunct_keys(&self, defunct_keys: Vec>) { + if defunct_keys.is_empty() { + return; } - // If we stopped because inserted >= scan_batch or buffer is full, - // and we got a cursor update above, we'll resume from there next time. - - // On a full scan that inserted nothing, the cursor has now walked - // through the entire range without finding new work. Reset it so - // the next scan starts fresh rather than repeatedly scanning the tail. - if is_full_scan && inserted == 0 { - self.reset_scan_cursor(); + let mut batch = WriteBatch::new(); + for key in &defunct_keys { + batch.delete(key); } - - // Delete defunct tasks from the database - if !defunct_keys.is_empty() { - let mut batch = WriteBatch::new(); - for key in &defunct_keys { - batch.delete(key); - } - if let Err(e) = self.db.write(batch).await { - debug!(error = %e, count = defunct_keys.len(), "failed to delete defunct tasks"); - } else { - debug!( - count = defunct_keys.len(), - "deleted defunct tasks outside shard range" - ); - } + if let Err(e) = self.db.write(batch).await { + debug!(error = %e, count = defunct_keys.len(), "failed to delete defunct tasks"); + } else { + debug!( + count = defunct_keys.len(), + "deleted defunct tasks outside shard range" + ); } - - inserted } /// Start the background scanning loop. + /// + /// The scan loop holds a cached `DbIterator` that marches forward through + /// the task group's key range. This avoids re-reading already-buffered + /// entries on each pass. The iterator is dropped and recreated when: + /// - `wakeup()` fires (new tasks may have been written behind the cursor) + /// - The iterator is exhausted (reached end of range) + /// - The iterator is older than `MAX_ITER_AGE` (to pick up time-based + /// changes like scheduled tasks becoming ready) fn start(self: &Arc) { if self.running.swap(true, Ordering::SeqCst) { return; @@ -328,6 +304,14 @@ impl TaskBroker { let scan_low_watermark = broker.target_buffer / 2; let mut scanning = false; + // Cached iterator state — lives across loop iterations + let mut cached_iter: Option = None; + let mut iter_created_at = std::time::Instant::now(); + // Entry consumed from the iterator that turned out to be a future + // task. Held here so the next scan pass can re-evaluate it with a + // fresh now_ms without losing it. + let mut pending_future_kv: Option = None; + loop { if !broker.running.load(Ordering::SeqCst) { break; @@ -347,12 +331,46 @@ impl TaskBroker { continue; } + // Check if the cached iterator needs to be dropped and recreated. + // This happens when: + // 1. wakeup() fired (scan_requested) — new tasks may be behind cursor + // 2. Iterator is too old — scheduled tasks may have become ready + // 3. No iterator exists yet + let was_scan_requested = broker.scan_requested.swap(false, Ordering::SeqCst); + let need_fresh_iter = was_scan_requested + || cached_iter.is_none() + || iter_created_at.elapsed() > Self::MAX_ITER_AGE; + + if need_fresh_iter { + cached_iter = broker.new_scan_iter().await; + iter_created_at = std::time::Instant::now(); + // Drop any held-back future entry — the fresh iterator + // will re-read it from the new snapshot. + pending_future_kv = None; + if cached_iter.is_none() { + sleep_ms = (sleep_ms * 2).min(max_sleep_ms); + tokio::time::sleep(Duration::from_millis(sleep_ms)).await; + continue; + } + } + + let iter = cached_iter.as_mut().unwrap(); + // Scan for ready tasks let now_ms = crate::job_store_shard::now_epoch_ms(); let scan_start = std::time::Instant::now(); let generation = broker.begin_scan_generation(); - let inserted = broker.scan_tasks(now_ms, generation).await; + let outcome = broker + .scan_from_iter(iter, pending_future_kv.take(), now_ms, generation) + .await; broker.complete_scan_generation(generation); + + let inserted = match &outcome { + ScanOutcome::ReadBatch { inserted } + | ScanOutcome::HitFuture { inserted, .. } + | ScanOutcome::Exhausted { inserted } => *inserted, + }; + if let Some(ref m) = broker.metrics { m.record_broker_scan_duration( &broker.shard_name, @@ -370,18 +388,38 @@ impl TaskBroker { ); } - // Adjust backoff: stay aggressive when buffer needs filling - // AND the scan actually found tasks to insert. If the scan - // found nothing, back off even if the buffer is low — the DB - // is empty and there's no point hammering it at 200 scans/s. - if inserted == 0 { - sleep_ms = (sleep_ms * 2).min(max_sleep_ms); - } else { - sleep_ms = min_sleep_ms; + match outcome { + ScanOutcome::ReadBatch { .. } => { + // More entries available — stay aggressive + sleep_ms = min_sleep_ms; + } + ScanOutcome::HitFuture { pending_kv, .. } => { + // Hold the consumed future entry for the next pass. + pending_future_kv = Some(pending_kv); + if inserted == 0 { + sleep_ms = (sleep_ms * 2).min(max_sleep_ms); + } else { + sleep_ms = min_sleep_ms; + } + } + ScanOutcome::Exhausted { .. } => { + // Nothing left in range. Drop iterator so next pass + // creates a fresh one from the beginning. + cached_iter = None; + if inserted == 0 { + sleep_ms = (sleep_ms * 2).min(max_sleep_ms); + } else { + sleep_ms = min_sleep_ms; + } + } } - // Handle explicit scan requests with minimal sleep - if broker.scan_requested.swap(false, Ordering::SeqCst) { + // Fast-path: if this scan was triggered by a wakeup, or a + // new wakeup arrived during the scan, loop immediately with + // minimal delay. This ensures the scanner stays responsive + // to enqueue→wakeup→dequeue sequences without requiring + // the full backoff sleep to elapse. + if was_scan_requested || broker.scan_requested.load(Ordering::SeqCst) { tokio::time::sleep(Duration::from_millis(1)).await; continue; } @@ -396,6 +434,9 @@ impl TaskBroker { // resetting to min — keeps scan rate reasonable // while still responding to demand. sleep_ms = (sleep_ms / 2).max(min_sleep_ms); + // Mark scan_requested so the loop drops the cached + // iterator on the next pass. + broker.scan_requested.store(true, Ordering::SeqCst); debug!(task_group = %broker.task_group, "broker woken by notification"); } _ = &mut delay => {}, @@ -500,11 +541,11 @@ impl TaskBroker { } } - /// Wake the scanner to refill promptly. Invalidates the scan cursor so - /// that the next scan starts from the beginning of the key range, ensuring - /// newly-created tasks that sort before the cursor are not missed. + /// Wake the scanner to refill promptly. Signals the scan loop to drop its + /// cached iterator and create a fresh one from the beginning of the key + /// range, ensuring newly-created tasks that sort before the old cursor + /// position are picked up. fn wakeup(&self) { - self.cursor_invalidated.store(true, Ordering::SeqCst); self.scan_requested.store(true, Ordering::SeqCst); self.notify.notify_one(); } diff --git a/tests/task_scanner_ordering_tests.rs b/tests/task_scanner_ordering_tests.rs index 9193f56..a89de20 100644 --- a/tests/task_scanner_ordering_tests.rs +++ b/tests/task_scanner_ordering_tests.rs @@ -186,12 +186,24 @@ async fn expedited_task_picked_up_by_scanner() { .expect("expedite"); // The expedited task should now be dequeueable - let result = shard - .dequeue("w", "default", 1) - .await - .expect("dequeue after expedite"); - assert_eq!(result.tasks.len(), 1, "expedited task should be dequeued"); - assert_eq!(result.tasks[0].job().id(), "future-job"); + let mut all_ids: Vec = Vec::new(); + loop { + let batch = shard + .dequeue("w", "default", 10) + .await + .expect("dequeue after expedite"); + if batch.tasks.is_empty() { + break; + } + for task in &batch.tasks { + all_ids.push(task.job().id().to_string()); + } + } + assert!( + all_ids.contains(&"future-job".to_string()), + "expedited task must be picked up; got: {:?}", + all_ids + ); }); } From cdc050193e97c49709bbc0703deaca1af8da60b1 Mon Sep 17 00:00:00 2001 From: Harry Brundage Date: Mon, 23 Mar 2026 17:47:38 -0400 Subject: [PATCH 6/6] Trigger CI Co-Authored-By: Claude Opus 4.6 (1M context)