diff --git a/benches/task_scanner_profile.rs b/benches/task_scanner_profile.rs index b44efe13..6b9a5a77 100644 --- a/benches/task_scanner_profile.rs +++ b/benches/task_scanner_profile.rs @@ -1,8 +1,7 @@ -//! Profile the task scanner hot path. +//! Profile the task scanner hot path using real shard dequeue operations. //! -//! Creates a shard with a large number of pending tasks and repeatedly scans -//! the task key range, measuring per-scan cost. This reproduces the production -//! hot path where `TaskBroker::scan_tasks` iterates through SlateDB SSTs. +//! Creates a shard with a large number of pending tasks and measures sustained +//! dequeue throughput, which exercises the full TaskBroker scanner pipeline. //! //! Run with: //! cargo bench --bench task_scanner_profile @@ -14,17 +13,15 @@ mod bench_helpers; use bench_helpers::*; -use silo::codec::decode_task_validated; -use silo::keys::{end_bound, parse_task_key, task_group_prefix, tasks_prefix}; use std::sync::Arc; use std::time::{Duration, Instant}; -/// Number of additional tasks to enqueue beyond what the golden shard has. -const EXTRA_TASKS: usize = 100_000; -/// How many scan iterations to run for timing. -const SCAN_ITERATIONS: usize = 200; -/// Batch size per scan (matches TaskBroker::scan_batch). -const SCAN_BATCH: usize = 1024; +/// Number of tasks to enqueue for sustained dequeue benchmarks. +const TASK_COUNT: usize = 50_000; +/// Task group used for benchmark tasks. +const TASK_GROUP: &str = "scanner-bench"; +/// Tenant used for benchmark tasks. +const TENANT: &str = "scanner-bench-tenant"; /// Enqueue a large number of ready tasks into a shard for benchmarking. 
async fn enqueue_many_tasks(shard: &Arc, count: usize) { @@ -47,13 +44,13 @@ async fn enqueue_many_tasks(shard: &Arc, c payload: rmp_serde::to_vec(&serde_json::json!({"bench": "scanner"})).unwrap(), limits: vec![], metadata: None, - task_group: "scanner-bench".to_string(), + task_group: TASK_GROUP.to_string(), attempts: vec![], // no attempts = Waiting status, creates a task entry }); } shard - .import_jobs("scanner-bench-tenant", batch) + .import_jobs(TENANT, batch) .await .expect("import scan bench jobs"); @@ -83,253 +80,52 @@ async fn enqueue_many_tasks(shard: &Arc, c ); } -/// Count all task keys in the DB under the tasks prefix. -async fn count_all_tasks(db: &slatedb::Db) -> usize { - let start = tasks_prefix(); - let end = end_bound(&start); - let Ok(mut iter) = db.scan::, _>(start..end).await else { - return 0; - }; - let mut count = 0; - while let Ok(Some(_)) = iter.next().await { - count += 1; - } - count -} - -/// Simulate one scan_tasks pass: open iterator, read up to SCAN_BATCH entries, -/// parse keys and decode values. Returns (entries_read, duration). -async fn single_scan_pass(db: &slatedb::Db, task_group: &str, now: i64) -> (usize, Duration) { - let start_key = task_group_prefix(task_group); - let end_key = end_bound(&start_key); - - let scan_start = Instant::now(); - - let Ok(mut iter) = db.scan::, _>(start_key..end_key).await else { - return (0, scan_start.elapsed()); - }; - - let mut read = 0; - while read < SCAN_BATCH { - let Ok(Some(kv)) = iter.next().await else { - break; - }; - - // Parse key (same as TaskBroker::scan_tasks) - let Some(parsed_key) = parse_task_key(&kv.key) else { - continue; - }; - - // Time filter check - if parsed_key.start_time_ms > now as u64 { - continue; - } - - // Decode task value (same as TaskBroker::scan_tasks) - let _decoded = decode_task_validated(kv.value.clone()); - - read += 1; - } - - (read, scan_start.elapsed()) -} - -/// Detailed breakdown of scan costs: iterator creation, reads, parsing, decoding. 
-struct ScanBreakdown { - iter_creation: Duration, - total_read: Duration, - total_key_parse: Duration, - total_value_decode: Duration, - total_wall: Duration, - entries: usize, -} - -async fn single_scan_pass_breakdown(db: &slatedb::Db, task_group: &str, now: i64) -> ScanBreakdown { - let start_key = task_group_prefix(task_group); - let end_key = end_bound(&start_key); - - let wall_start = Instant::now(); - - let t = Instant::now(); - let Ok(mut iter) = db.scan::, _>(start_key..end_key).await else { - return ScanBreakdown { - iter_creation: t.elapsed(), - total_read: Duration::ZERO, - total_key_parse: Duration::ZERO, - total_value_decode: Duration::ZERO, - total_wall: wall_start.elapsed(), - entries: 0, - }; - }; - let iter_creation = t.elapsed(); - - let mut total_read = Duration::ZERO; - let mut total_key_parse = Duration::ZERO; - let mut total_value_decode = Duration::ZERO; - let mut read = 0; - - while read < SCAN_BATCH { - let t = Instant::now(); - let Ok(Some(kv)) = iter.next().await else { - total_read += t.elapsed(); - break; - }; - total_read += t.elapsed(); - - let t = Instant::now(); - let Some(parsed_key) = parse_task_key(&kv.key) else { - total_key_parse += t.elapsed(); - continue; - }; - total_key_parse += t.elapsed(); - - if parsed_key.start_time_ms > now as u64 { - continue; - } - - let t = Instant::now(); - let _decoded = decode_task_validated(kv.value.clone()); - total_value_decode += t.elapsed(); - - read += 1; - } - - ScanBreakdown { - iter_creation, - total_read, - total_key_parse, - total_value_decode, - total_wall: wall_start.elapsed(), - entries: read, - } -} - -/// Benchmark just the iterator creation cost (no reads). -async fn bench_iterator_creation( - db: &slatedb::Db, - task_group: &str, - iterations: usize, -) -> Vec { - let start_key = task_group_prefix(task_group); - let end_key = end_bound(&start_key); +/// Sustained dequeue: drain tasks as fast as possible, measuring throughput. 
+/// Returns (total_dequeued, elapsed, per_batch_durations). +async fn sustained_dequeue( + shard: &Arc, + batch_size: usize, + max_tasks: usize, +) -> (usize, Duration, Vec) { + let start = Instant::now(); + let mut total = 0; + let mut durations = Vec::new(); - let mut durations = Vec::with_capacity(iterations); - for _ in 0..iterations { + while total < max_tasks { let t = Instant::now(); - let _iter = db - .scan::, _>(start_key.clone()..end_key.clone()) - .await; - durations.push(t.elapsed()); - } - durations.sort(); - durations -} - -/// Benchmark full scan passes (iterator creation + reading SCAN_BATCH entries). -async fn bench_full_scan( - db: &slatedb::Db, - task_group: &str, - iterations: usize, -) -> (Vec, usize) { - let now = now_ms(); - let mut durations = Vec::with_capacity(iterations); - let mut total_entries = 0; - - for _ in 0..iterations { - let (entries, elapsed) = single_scan_pass(db, task_group, now).await; - durations.push(elapsed); - total_entries += entries; - } - durations.sort(); - (durations, total_entries) -} - -/// Benchmark scanning the ENTIRE task group (all entries, not just SCAN_BATCH). 
-async fn bench_full_group_scan( - db: &slatedb::Db, - task_group: &str, - iterations: usize, -) -> (Vec, usize) { - let now = now_ms(); - let start_key = task_group_prefix(task_group); - let end_key = end_bound(&start_key); - - let mut durations = Vec::with_capacity(iterations); - let mut last_count = 0; - - for _ in 0..iterations { - let scan_start = Instant::now(); - - let Ok(mut iter) = db - .scan::, _>(start_key.clone()..end_key.clone()) + let result = shard + .dequeue("bench-worker", TASK_GROUP, batch_size) .await - else { - durations.push(scan_start.elapsed()); - continue; - }; - - let mut read = 0; - loop { - let Ok(Some(kv)) = iter.next().await else { + .expect("dequeue"); + let elapsed = t.elapsed(); + + if result.tasks.is_empty() { + // Give scanner a moment to refill, then try once more + tokio::time::sleep(Duration::from_millis(100)).await; + let result = shard + .dequeue("bench-worker", TASK_GROUP, batch_size) + .await + .expect("dequeue retry"); + if result.tasks.is_empty() { break; - }; - let Some(parsed_key) = parse_task_key(&kv.key) else { - continue; - }; - if parsed_key.start_time_ms > now as u64 { - continue; } - let _decoded = decode_task_validated(kv.value.clone()); - read += 1; + total += result.tasks.len(); + durations.push(elapsed + Duration::from_millis(100)); + } else { + total += result.tasks.len(); + durations.push(elapsed); } - - durations.push(scan_start.elapsed()); - last_count = read; } - durations.sort(); - (durations, last_count) -} - -/// Benchmark just iterator reads with NO key parsing or value decoding. 
-async fn bench_raw_reads( - db: &slatedb::Db, - task_group: &str, - iterations: usize, -) -> (Vec, usize) { - let start_key = task_group_prefix(task_group); - let end_key = end_bound(&start_key); - - let mut durations = Vec::with_capacity(iterations); - let mut total = 0; - - for _ in 0..iterations { - let Ok(mut iter) = db - .scan::, _>(start_key.clone()..end_key.clone()) - .await - else { - continue; - }; - let t = Instant::now(); - let mut read = 0; - while read < SCAN_BATCH { - let Ok(Some(_kv)) = iter.next().await else { - break; - }; - read += 1; - } - durations.push(t.elapsed()); - total += read; - } - durations.sort(); - (durations, total) + (total, start.elapsed(), durations) } -fn print_stats(label: &str, durations: &[Duration]) { +fn print_stats(label: &str, durations: &mut [Duration]) { if durations.is_empty() { println!(" {:<40} (no data)", label); return; } + durations.sort(); let p50 = durations[durations.len() / 2]; let p99 = durations[(durations.len() as f64 * 0.99).ceil() as usize - 1]; let mean: Duration = durations.iter().sum::() / durations.len() as u32; @@ -354,396 +150,49 @@ async fn main() { let metadata = ensure_golden_shard().await; - println!( - "Cloning golden shard and adding {} extra tasks...", - EXTRA_TASKS - ); + println!("Cloning golden shard and adding {} tasks...", TASK_COUNT); let (_guard, shard) = clone_golden_shard("scanner-profile", &metadata).await; + enqueue_many_tasks(&shard, TASK_COUNT).await; - // Count existing tasks - let existing_tasks = count_all_tasks(shard.db()).await; - println!(" Existing task keys in golden shard: {}", existing_tasks); - - // Add more tasks to the "scanner-bench" task group - enqueue_many_tasks(&shard, EXTRA_TASKS).await; - - let total_tasks = count_all_tasks(shard.db()).await; - println!(" Total task keys after additions: {}", total_tasks); - - // Count tasks in each group - { - let start_key = task_group_prefix("scanner-bench"); - let end_key = end_bound(&start_key); - let Ok(mut iter) = 
shard.db().scan::, _>(start_key..end_key).await else { - panic!("failed to scan scanner-bench group"); - }; - let mut count = 0; - while let Ok(Some(_)) = iter.next().await { - count += 1; - } - println!(" Tasks in 'scanner-bench' group: {}", count); - - let start_key = task_group_prefix(""); - let end_key = end_bound(&start_key); - let Ok(mut iter) = shard.db().scan::, _>(start_key..end_key).await else { - panic!("failed to scan default group"); - }; - let mut count = 0; - while let Ok(Some(_)) = iter.next().await { - count += 1; - } - println!(" Tasks in default '' group: {}", count); - } - - println!( - "\n--- Iterator creation cost ({} iterations) ---", - SCAN_ITERATIONS - ); - - let durations = bench_iterator_creation(shard.db(), "scanner-bench", SCAN_ITERATIONS).await; - print_stats("iter_create(scanner-bench, 100K)", &durations); + // Let the scanner populate the buffer initially + println!("\n Waiting for scanner to populate buffer..."); + tokio::time::sleep(Duration::from_millis(500)).await; - let durations = bench_iterator_creation(shard.db(), "", SCAN_ITERATIONS).await; - print_stats("iter_create(default group)", &durations); - - { - let start = tasks_prefix(); - let end = end_bound(&start); - let mut durations = Vec::with_capacity(SCAN_ITERATIONS); - for _ in 0..SCAN_ITERATIONS { - let t = Instant::now(); - let _iter = shard - .db() - .scan::, _>(start.clone()..end.clone()) - .await; - durations.push(t.elapsed()); - } - durations.sort(); - print_stats("iter_create(all tasks)", &durations); - } - - println!( - "\n--- Raw reads (no parse/decode) {} entries ({} iters) ---", - SCAN_BATCH, SCAN_ITERATIONS - ); - - let (durations, _) = bench_raw_reads(shard.db(), "scanner-bench", SCAN_ITERATIONS).await; - print_stats( - &format!("raw_read(scanner-bench, {})", SCAN_BATCH), - &durations, - ); - - let (durations, _) = bench_raw_reads(shard.db(), "", SCAN_ITERATIONS).await; - print_stats(&format!("raw_read(default, {})", SCAN_BATCH), &durations); - - println!( - 
"\n--- Full scan {} entries per pass ({} iterations) ---", - SCAN_BATCH, SCAN_ITERATIONS - ); - - let (durations, total) = bench_full_scan(shard.db(), "scanner-bench", SCAN_ITERATIONS).await; - print_stats( - &format!("scan_batch(scanner-bench, {})", SCAN_BATCH), - &durations, - ); - println!(" total entries read across all iterations: {}", total); - - let (durations, total) = bench_full_scan(shard.db(), "", SCAN_ITERATIONS).await; - print_stats(&format!("scan_batch(default, {})", SCAN_BATCH), &durations); - println!(" total entries read across all iterations: {}", total); - - println!("\n--- CPU time breakdown (100 iterations, scanner-bench group) ---"); + // --- Sustained dequeue throughput --- + println!("\n--- Sustained dequeue throughput (batch_size=32) ---"); { - let now = now_ms(); - let iters: u32 = 100; - let mut iter_creation = Duration::ZERO; - let mut total_read = Duration::ZERO; - let mut total_key_parse = Duration::ZERO; - let mut total_value_decode = Duration::ZERO; - let mut total_wall = Duration::ZERO; - let mut total_entries: usize = 0; - - for _ in 0..iters { - let bd = single_scan_pass_breakdown(shard.db(), "scanner-bench", now).await; - iter_creation += bd.iter_creation; - total_read += bd.total_read; - total_key_parse += bd.total_key_parse; - total_value_decode += bd.total_value_decode; - total_wall += bd.total_wall; - total_entries += bd.entries; - } - - let avg_wall = total_wall / iters; - let avg_iter = iter_creation / iters; - let avg_read = total_read / iters; - let avg_parse = total_key_parse / iters; - let avg_decode = total_value_decode / iters; - - println!( - " Average per-scan breakdown ({} entries/scan):", - total_entries / iters as usize - ); - println!(" wall time: {}", format_duration(avg_wall)); - println!( - " iter creation: {} ({:.1}%)", - format_duration(avg_iter), - avg_iter.as_secs_f64() / avg_wall.as_secs_f64() * 100.0 - ); - println!( - " iter.next() reads: {} ({:.1}%)", - format_duration(avg_read), - 
avg_read.as_secs_f64() / avg_wall.as_secs_f64() * 100.0 - ); - println!( - " key parsing: {} ({:.1}%)", - format_duration(avg_parse), - avg_parse.as_secs_f64() / avg_wall.as_secs_f64() * 100.0 - ); - println!( - " value decoding: {} ({:.1}%)", - format_duration(avg_decode), - avg_decode.as_secs_f64() / avg_wall.as_secs_f64() * 100.0 - ); - let accounted = avg_iter + avg_read + avg_parse + avg_decode; - let overhead = avg_wall.saturating_sub(accounted); + let (total, elapsed, mut durations) = sustained_dequeue(&shard, 32, TASK_COUNT).await; + let rate = total as f64 / elapsed.as_secs_f64(); println!( - " other overhead: {} ({:.1}%)", - format_duration(overhead), - overhead.as_secs_f64() / avg_wall.as_secs_f64() * 100.0 + " Dequeued {} tasks in {:.1}s = {:.0} tasks/sec", + total, + elapsed.as_secs_f64(), + rate, ); + print_stats("dequeue_batch(32)", &mut durations); } - // Simulate production pattern: scan with a pre-populated buffer. - // The scanner must read through all buffered entries before finding new ones. 
- println!("\n--- Production pattern: scan with pre-populated buffer ---"); - { - use std::collections::HashSet; - let now = now_ms(); - - // Pre-populate a "buffer" by scanning first N entries - for buffer_size in [0usize, 2048, 4096, 8192, 16384, 32768] { - let start_key = task_group_prefix("scanner-bench"); - let end_key = end_bound(&start_key); - - // Collect the first buffer_size keys to simulate what's already in the buffer - let mut buffered_keys: HashSet> = HashSet::new(); - if buffer_size > 0 { - let Ok(mut iter) = shard - .db() - .scan::, _>(start_key.clone()..end_key.clone()) - .await - else { - continue; - }; - let mut collected = 0; - while collected < buffer_size { - let Ok(Some(kv)) = iter.next().await else { - break; - }; - buffered_keys.insert(kv.key.to_vec()); - collected += 1; - } - } - - // Now simulate scan_tasks: scan from beginning, skip buffered, insert 1024 new - let iters = 50; - let mut durations = Vec::with_capacity(iters); - let mut total_iter_next_calls = 0usize; - - for _ in 0..iters { - let Ok(mut iter) = shard - .db() - .scan::, _>(start_key.clone()..end_key.clone()) - .await - else { - continue; - }; - - let scan_start = Instant::now(); - let mut inserted = 0; - let mut reads = 0; - while inserted < SCAN_BATCH { - let Ok(Some(kv)) = iter.next().await else { - break; - }; - reads += 1; - - let Some(parsed_key) = parse_task_key(&kv.key) else { - continue; - }; - if parsed_key.start_time_ms > now as u64 { - continue; - } - let _decoded = decode_task_validated(kv.value.clone()); - - // Skip if already in "buffer" - if buffered_keys.contains(&kv.key.to_vec()) { - continue; - } - inserted += 1; - } - durations.push(scan_start.elapsed()); - total_iter_next_calls += reads; - } + shard.close().await.expect("close"); - durations.sort(); - let avg_reads = total_iter_next_calls / iters; - print_stats( - &format!("buffer={}, +{} new", buffer_size, SCAN_BATCH), - &durations, - ); - println!( - " avg iter.next() calls: {} ({}x scan_batch)", - 
avg_reads, - avg_reads as f64 / SCAN_BATCH as f64 - ); - } - } + // Run again with a fresh clone for a second batch size + let (_guard2, shard2) = clone_golden_shard("scanner-profile-2", &metadata).await; + enqueue_many_tasks(&shard2, TASK_COUNT).await; + tokio::time::sleep(Duration::from_millis(500)).await; - // Simulate a full refill cycle: buffer drains from target_buffer to low_watermark, - // then the scanner refills it. Measure total CPU cost of the refill with different - // scan_batch sizes. - println!("\n--- Refill cycle cost: buffer 4096->8192 with varying scan_batch ---"); + println!("\n--- Sustained dequeue throughput (batch_size=1) ---"); { - use std::collections::HashSet; - let now = now_ms(); - let target_buffer = 8192usize; - let low_watermark = target_buffer / 2; // 4096 - - // Pre-populate buffer with 4096 entries (simulates post-drain state) - let start_key = task_group_prefix("scanner-bench"); - let end_key = end_bound(&start_key); - let mut buffered_keys: HashSet> = HashSet::new(); - { - let Ok(mut iter) = shard - .db() - .scan::, _>(start_key.clone()..end_key.clone()) - .await - else { - panic!("scan failed"); - }; - let mut collected = 0; - while collected < low_watermark { - let Ok(Some(kv)) = iter.next().await else { - break; - }; - buffered_keys.insert(kv.key.to_vec()); - collected += 1; - } - } - - for scan_batch in [1024usize, 2048, 4096, 8192] { - let iters = 20; - let mut durations = Vec::with_capacity(iters); - let mut total_scans = 0usize; - let mut total_reads = 0usize; - - for _ in 0..iters { - let cycle_start = Instant::now(); - let mut buf_len = low_watermark; - let mut current_buffered = buffered_keys.clone(); - let mut cycle_scans = 0; - let mut cycle_reads = 0; - - // Simulate multiple scan passes to fill buffer from low_watermark to target_buffer - while buf_len < target_buffer { - let Ok(mut iter) = shard - .db() - .scan::, _>(start_key.clone()..end_key.clone()) - .await - else { - break; - }; - - let mut inserted = 0; - 
while inserted < scan_batch && buf_len < target_buffer { - let Ok(Some(kv)) = iter.next().await else { - break; - }; - cycle_reads += 1; - - let Some(parsed_key) = parse_task_key(&kv.key) else { - continue; - }; - if parsed_key.start_time_ms > now as u64 { - continue; - } - let _decoded = decode_task_validated(kv.value.clone()); - - let key_bytes = kv.key.to_vec(); - if !current_buffered.contains(&key_bytes) { - current_buffered.insert(key_bytes); - buf_len += 1; - inserted += 1; - } - } - cycle_scans += 1; - } - - durations.push(cycle_start.elapsed()); - total_scans += cycle_scans; - total_reads += cycle_reads; - } - - durations.sort(); - let avg_scans = total_scans as f64 / iters as f64; - let avg_reads = total_reads as f64 / iters as f64; - print_stats(&format!("scan_batch={}", scan_batch), &durations); - println!( - " avg scans/refill: {:.1}, avg reads/refill: {:.0}, refills_at_50scan/s: {:.1}/s, est CPU: {:.2} vCPU", - avg_scans, - avg_reads, - 50.0 / avg_scans, - durations[durations.len() / 2].as_secs_f64() * 50.0 / avg_scans, - ); - } - } - - println!("\n--- Full group scan ({} iterations) ---", 10); - - let (durations, count) = bench_full_group_scan(shard.db(), "scanner-bench", 10).await; - print_stats( - &format!("full_scan(scanner-bench, {} tasks)", count), - &durations, - ); - - let (durations, count) = bench_full_group_scan(shard.db(), "", 10).await; - print_stats(&format!("full_scan(default, {} tasks)", count), &durations); - - // Throughput estimate - println!("\n--- Throughput estimate ---"); - let now = now_ms(); - let sustained_iters = 500; - let sustained_start = Instant::now(); - let mut total_entries = 0; - for _ in 0..sustained_iters { - let (entries, _) = single_scan_pass(shard.db(), "scanner-bench", now).await; - total_entries += entries; + // Only drain 2000 with batch_size=1 to keep it reasonable + let (total, elapsed, mut durations) = sustained_dequeue(&shard2, 1, 2000).await; + let rate = total as f64 / elapsed.as_secs_f64(); + println!( 
+ " Dequeued {} tasks in {:.1}s = {:.0} tasks/sec", + total, + elapsed.as_secs_f64(), + rate, + ); + print_stats("dequeue_batch(1)", &mut durations); } - let sustained_elapsed = sustained_start.elapsed(); - let scans_per_sec = sustained_iters as f64 / sustained_elapsed.as_secs_f64(); - let entries_per_sec = total_entries as f64 / sustained_elapsed.as_secs_f64(); - println!( - " {} scans in {:.1}s = {:.1} scans/sec ({:.0} entries/sec)", - sustained_iters, - sustained_elapsed.as_secs_f64(), - scans_per_sec, - entries_per_sec, - ); - println!( - " Average {:.1}ms per scan ({} entries/scan)", - sustained_elapsed.as_secs_f64() * 1000.0 / sustained_iters as f64, - total_entries / sustained_iters - ); - // Extrapolate to production: if scanning at 50 scans/sec - let ms_per_scan = sustained_elapsed.as_secs_f64() * 1000.0 / sustained_iters as f64; - let cpu_at_50_scans = ms_per_scan * 50.0 / 1000.0; - println!( - "\n At 50 scans/sec: estimated {:.2} vCPU ({:.1}ms/scan * 50)", - cpu_at_50_scans, ms_per_scan - ); - - shard.close().await.expect("close"); + shard2.close().await.expect("close"); println!("\nDone."); } diff --git a/src/task_broker.rs b/src/task_broker.rs index 1cdf7aa0..398d41e1 100644 --- a/src/task_broker.rs +++ b/src/task_broker.rs @@ -8,7 +8,7 @@ use std::time::Duration; use crossbeam_skiplist::SkipMap; use dashmap::DashMap; -use slatedb::{Db, WriteBatch}; +use slatedb::{Db, DbIterator, KeyValue, WriteBatch}; use tokio::sync::Notify; use crate::codec::{DecodedTask, decode_task_validated}; @@ -28,10 +28,29 @@ pub struct BrokerTask { pub decoded: DecodedTask, } +/// Result of a single scan pass from the cached iterator. +enum ScanOutcome { + /// Inserted entries, stopped at batch limit or buffer full. + /// The iterator has more entries to read. + ReadBatch { inserted: usize }, + /// Hit a task with a future timestamp. 
The consumed entry is returned + /// so the caller can re-evaluate it on the next pass with a fresh + /// `now_ms` without losing it from the iterator. + HitFuture { + inserted: usize, + pending_kv: KeyValue, + }, + /// The iterator reached the end of the key range. + Exhausted { inserted: usize }, +} + /// A single per-task-group broker that scans only its own key range. /// /// - Maintains a sorted buffer of ready tasks using a skiplist keyed by the task key bytes. -/// - Populates from SlateDB in the background with exponential backoff when no work is found. +/// - Populates from SlateDB in the background using a cached iterator that marches +/// forward through the key range, avoiding redundant re-reads of buffered entries. +/// - When wakeup() fires (new tasks written), the cached iterator is dropped and +/// recreated from the beginning to pick up tasks behind the old cursor. /// - Ensures tasks claimed but not yet durably leased are tracked as in-flight and not reinserted. pub struct TaskBroker { // The task group this broker is responsible for @@ -52,11 +71,12 @@ pub struct TaskBroker { running: Arc, // A notify object to wake up the background scanner when a task is claimed notify: Arc, - // Whether the background scanner should be woken up + // Whether the background scanner should be woken up and the cached + // iterator should be dropped (new tasks may exist behind the cursor). scan_requested: Arc, // The target buffer size target_buffer: usize, - // The batch size for the background scanner to read out of the DB + // Maximum entries to read per scan pass before yielding control scan_batch: usize, // The shard name for metrics labeling shard_name: String, @@ -71,6 +91,11 @@ impl TaskBroker { // refresh the tombstone generation whenever a stale key is observed again. const ACK_TOMBSTONE_RETAIN_GENERATIONS: u64 = 64; + // Maximum age of a cached iterator before it's recreated. 
This bounds + // how long we hold a snapshot-isolated view that can't see new writes + // when no wakeups arrive (e.g. scheduled tasks becoming ready). + const MAX_ITER_AGE: Duration = Duration::from_secs(5); + fn new( task_group: String, db: Arc, @@ -90,11 +115,6 @@ impl TaskBroker { notify: Arc::new(Notify::new()), scan_requested: Arc::new(AtomicBool::new(false)), target_buffer: 8192, - // Must be >= target_buffer / 2 (the low watermark) so that a - // single scan pass can refill from low-watermark to target. - // With a smaller batch the scanner needs multiple passes and - // each pass re-reads every already-buffered entry from the - // start of the key range, wasting O(buffer_size) reads/pass. scan_batch: 4096, shard_name, metrics, @@ -124,23 +144,48 @@ impl TaskBroker { }); } - /// Scan tasks from DB and insert into buffer, skipping future tasks and inflight ones. - async fn scan_tasks(&self, now_ms: i64, generation: u64) -> usize { + /// Create a fresh iterator over the full task group key range. + async fn new_scan_iter(&self) -> Option { // [SILO-SCAN-1] Scan only this task group's key range let start = task_group_prefix(&self.task_group); let end = end_bound(&start); + self.db.scan::, _>(start..end).await.ok() + } - let Ok(mut iter) = self.db.scan::, _>(start..end).await else { - return 0; - }; - - // Collect keys to delete for defunct tasks (outside shard range) + /// Read entries from an existing iterator, inserting ready tasks into the + /// buffer. Stops when the batch limit is hit, the buffer is full, a + /// future-timestamped task is encountered, or the iterator is exhausted. + /// + /// `pending_kv` is a previously-consumed entry that was returned from a + /// prior `HitFuture` outcome. If provided, it is processed first before + /// reading from the iterator. 
+ async fn scan_from_iter( + &self, + iter: &mut DbIterator, + pending_kv: Option, + now_ms: i64, + generation: u64, + ) -> ScanOutcome { let mut defunct_keys: Vec> = Vec::new(); - let mut inserted = 0; - while inserted < self.scan_batch && self.buffer.len() < self.target_buffer { - let Ok(Some(kv)) = iter.next().await else { - break; + let mut next_kv = pending_kv; + + loop { + if inserted >= self.scan_batch || self.buffer.len() >= self.target_buffer { + self.delete_defunct_keys(defunct_keys).await; + return ScanOutcome::ReadBatch { inserted }; + } + + // Use the held-back entry first, then read from the iterator. + let kv = match next_kv.take() { + Some(kv) => kv, + None => { + let Ok(Some(kv)) = iter.next().await else { + self.delete_defunct_keys(defunct_keys).await; + return ScanOutcome::Exhausted { inserted }; + }; + kv + } }; // Parse the task key to extract timestamp @@ -148,9 +193,14 @@ impl TaskBroker { continue; }; - // Filter out future tasks + // Stop when we reach future tasks. Return the consumed entry so + // the caller can re-evaluate it on the next pass with a fresh now_ms. 
if parsed_key.start_time_ms > now_ms as u64 { - continue; + self.delete_defunct_keys(defunct_keys).await; + return ScanOutcome::HitFuture { + inserted, + pending_kv: kv, + }; } let key_bytes = kv.key.to_vec(); @@ -177,14 +227,13 @@ impl TaskBroker { let decoded = match decode_task_validated(kv.value.clone()) { Ok(t) => t, - Err(_) => continue, // Skip malformed tasks + Err(_) => continue, }; // Check if task's tenant is within shard range let task_tenant = decoded.tenant(); if !self.range.contains_tenant(task_tenant) { - // Task is for a tenant outside our range - mark for deletion defunct_keys.push(kv.key.to_vec()); debug!( task_group = %parsed_key.task_group, @@ -212,27 +261,36 @@ impl TaskBroker { } } } + } - // Delete defunct tasks from the database - if !defunct_keys.is_empty() { - let mut batch = WriteBatch::new(); - for key in &defunct_keys { - batch.delete(key); - } - if let Err(e) = self.db.write(batch).await { - debug!(error = %e, count = defunct_keys.len(), "failed to delete defunct tasks"); - } else { - debug!( - count = defunct_keys.len(), - "deleted defunct tasks outside shard range" - ); - } + /// Delete defunct tasks (outside shard range) from the database. + async fn delete_defunct_keys(&self, defunct_keys: Vec>) { + if defunct_keys.is_empty() { + return; + } + let mut batch = WriteBatch::new(); + for key in &defunct_keys { + batch.delete(key); + } + if let Err(e) = self.db.write(batch).await { + debug!(error = %e, count = defunct_keys.len(), "failed to delete defunct tasks"); + } else { + debug!( + count = defunct_keys.len(), + "deleted defunct tasks outside shard range" + ); } - - inserted } /// Start the background scanning loop. + /// + /// The scan loop holds a cached `DbIterator` that marches forward through + /// the task group's key range. This avoids re-reading already-buffered + /// entries on each pass. 
The iterator is dropped and recreated when: + /// - `wakeup()` fires (new tasks may have been written behind the cursor) + /// - The iterator is exhausted (reached end of range) + /// - The iterator is older than `MAX_ITER_AGE` (to pick up time-based + /// changes like scheduled tasks becoming ready) fn start(self: &Arc) { if self.running.swap(true, Ordering::SeqCst) { return; @@ -246,6 +304,14 @@ impl TaskBroker { let scan_low_watermark = broker.target_buffer / 2; let mut scanning = false; + // Cached iterator state — lives across loop iterations + let mut cached_iter: Option = None; + let mut iter_created_at = std::time::Instant::now(); + // Entry consumed from the iterator that turned out to be a future + // task. Held here so the next scan pass can re-evaluate it with a + // fresh now_ms without losing it. + let mut pending_future_kv: Option = None; + loop { if !broker.running.load(Ordering::SeqCst) { break; @@ -265,12 +331,46 @@ impl TaskBroker { continue; } + // Check if the cached iterator needs to be dropped and recreated. + // This happens when: + // 1. wakeup() fired (scan_requested) — new tasks may be behind cursor + // 2. Iterator is too old — scheduled tasks may have become ready + // 3. No iterator exists yet + let was_scan_requested = broker.scan_requested.swap(false, Ordering::SeqCst); + let need_fresh_iter = was_scan_requested + || cached_iter.is_none() + || iter_created_at.elapsed() > Self::MAX_ITER_AGE; + + if need_fresh_iter { + cached_iter = broker.new_scan_iter().await; + iter_created_at = std::time::Instant::now(); + // Drop any held-back future entry — the fresh iterator + // will re-read it from the new snapshot. 
+ pending_future_kv = None; + if cached_iter.is_none() { + sleep_ms = (sleep_ms * 2).min(max_sleep_ms); + tokio::time::sleep(Duration::from_millis(sleep_ms)).await; + continue; + } + } + + let iter = cached_iter.as_mut().unwrap(); + // Scan for ready tasks let now_ms = crate::job_store_shard::now_epoch_ms(); let scan_start = std::time::Instant::now(); let generation = broker.begin_scan_generation(); - let inserted = broker.scan_tasks(now_ms, generation).await; + let outcome = broker + .scan_from_iter(iter, pending_future_kv.take(), now_ms, generation) + .await; broker.complete_scan_generation(generation); + + let inserted = match &outcome { + ScanOutcome::ReadBatch { inserted } + | ScanOutcome::HitFuture { inserted, .. } + | ScanOutcome::Exhausted { inserted } => *inserted, + }; + if let Some(ref m) = broker.metrics { m.record_broker_scan_duration( &broker.shard_name, @@ -288,18 +388,38 @@ impl TaskBroker { ); } - // Adjust backoff: stay aggressive when buffer needs filling - // AND the scan actually found tasks to insert. If the scan - // found nothing, back off even if the buffer is low — the DB - // is empty and there's no point hammering it at 200 scans/s. - if inserted == 0 { - sleep_ms = (sleep_ms * 2).min(max_sleep_ms); - } else { - sleep_ms = min_sleep_ms; + match outcome { + ScanOutcome::ReadBatch { .. } => { + // More entries available — stay aggressive + sleep_ms = min_sleep_ms; + } + ScanOutcome::HitFuture { pending_kv, .. } => { + // Hold the consumed future entry for the next pass. + pending_future_kv = Some(pending_kv); + if inserted == 0 { + sleep_ms = (sleep_ms * 2).min(max_sleep_ms); + } else { + sleep_ms = min_sleep_ms; + } + } + ScanOutcome::Exhausted { .. } => { + // Nothing left in range. Drop iterator so next pass + // creates a fresh one from the beginning. 
+ cached_iter = None; + if inserted == 0 { + sleep_ms = (sleep_ms * 2).min(max_sleep_ms); + } else { + sleep_ms = min_sleep_ms; + } + } } - // Handle explicit scan requests with minimal sleep - if broker.scan_requested.swap(false, Ordering::SeqCst) { + // Fast-path: if this scan was triggered by a wakeup, or a + // new wakeup arrived during the scan, loop immediately with + // minimal delay. This ensures the scanner stays responsive + // to enqueue→wakeup→dequeue sequences without requiring + // the full backoff sleep to elapse. + if was_scan_requested || broker.scan_requested.load(Ordering::SeqCst) { tokio::time::sleep(Duration::from_millis(1)).await; continue; } @@ -314,6 +434,9 @@ impl TaskBroker { // resetting to min — keeps scan rate reasonable // while still responding to demand. sleep_ms = (sleep_ms / 2).max(min_sleep_ms); + // Mark scan_requested so the loop drops the cached + // iterator on the next pass. + broker.scan_requested.store(true, Ordering::SeqCst); debug!(task_group = %broker.task_group, "broker woken by notification"); } _ = &mut delay => {}, @@ -418,7 +541,10 @@ impl TaskBroker { } } - /// Wake the scanner to refill promptly. + /// Wake the scanner to refill promptly. Signals the scan loop to drop its + /// cached iterator and create a fresh one from the beginning of the key + /// range, ensuring newly-created tasks that sort before the old cursor + /// position are picked up. fn wakeup(&self) { self.scan_requested.store(true, Ordering::SeqCst); self.notify.notify_one(); diff --git a/tests/task_scanner_ordering_tests.rs b/tests/task_scanner_ordering_tests.rs new file mode 100644 index 00000000..a89de208 --- /dev/null +++ b/tests/task_scanner_ordering_tests.rs @@ -0,0 +1,812 @@ +//! Tests for task scanner pickup of tasks created "behind the cursor". +//! +//! These tests verify that all code paths that create task keys result in +//! those tasks eventually being picked up by the TaskBroker scanner and +//! delivered via dequeue. 
This is important because: +//! +//! - A fresh scan iterator starts from the beginning of the task group key range. +//! - The scanner keeps a cached iterator (a cursor) between passes, so tasks +//! inserted behind the cursor are only seen once the iterator is dropped and +//! recreated (on wakeup, exhaustion, or age); these tests guard that path. +//! +//! Each test exercises a different code path that can create a task key that +//! sorts BEFORE tasks already in the broker buffer. + +mod test_helpers; + +use silo::job::ConcurrencyLimit; +use silo::job::Limit; +use silo::job_attempt::AttemptOutcome; + +use test_helpers::*; + +const TIMEOUT_MS: u64 = 30000; + +// --------------------------------------------------------------------------- +// 1. Enqueue with start_at_ms=0 sorts before all other tasks +// --------------------------------------------------------------------------- + +/// Tasks enqueued with start_at_ms=0 use time=0 in their key, placing them +/// before any task with a real timestamp. After the buffer is already populated +/// with tasks at now_ms, a new start_at_ms=0 task must still be picked up +/// eventually (the scanner rescans from the beginning of the key range). 
+#[silo::test] +async fn enqueue_start_at_zero_picked_up_after_buffer_populated() { + with_timeout!(TIMEOUT_MS, { + let (_tmp, shard) = open_temp_shard().await; + let payload = msgpack_payload(&serde_json::json!({"test": true})); + let now = now_ms(); + + // Enqueue several tasks with start_at_ms=now to populate the buffer + for i in 0..5 { + shard + .enqueue( + "t1", + Some(format!("job-now-{}", i)), + 50, + now, + None, + payload.clone(), + vec![], + None, + "default", + ) + .await + .expect("enqueue now"); + } + + // Dequeue one to prove the buffer is working + let first = shard + .dequeue("w", "default", 1) + .await + .expect("first dequeue"); + assert_eq!(first.tasks.len(), 1, "should dequeue from initial batch"); + + // Now enqueue a task with start_at_ms=0 — this sorts BEFORE the buffered tasks + shard + .enqueue( + "t1", + Some("job-zero".to_string()), + 50, + 0, + None, + payload.clone(), + vec![], + None, + "default", + ) + .await + .expect("enqueue zero"); + + // Drain all remaining tasks — the zero-time task must appear eventually + let mut all_ids: Vec = Vec::new(); + loop { + let batch = shard + .dequeue("w", "default", 10) + .await + .expect("drain dequeue"); + if batch.tasks.is_empty() { + break; + } + for task in &batch.tasks { + all_ids.push(task.job().id().to_string()); + } + } + + assert!( + all_ids.contains(&"job-zero".to_string()), + "start_at_ms=0 task must be picked up; got jobs: {:?}", + all_ids + ); + // Should have gotten all 5 remaining tasks (5 enqueued - 1 dequeued + 1 zero = 5) + assert_eq!( + all_ids.len(), + 5, + "all tasks must be drained; got: {:?}", + all_ids + ); + }); +} + +// --------------------------------------------------------------------------- +// 2. Expedite moves a future task to now +// --------------------------------------------------------------------------- + +/// Expediting a far-future scheduled job creates a new task key at now_ms, +/// which sorts before the old future-scheduled key. 
The scanner must pick up +/// the new key even if it already has buffered entries with later timestamps. +#[silo::test] +async fn expedited_task_picked_up_by_scanner() { + with_timeout!(TIMEOUT_MS, { + let (_tmp, shard) = open_temp_shard().await; + let payload = msgpack_payload(&serde_json::json!({"test": true})); + let now = now_ms(); + let far_future = now + 86_400_000; // 1 day ahead + + // Enqueue a job scheduled for far future + shard + .enqueue( + "t1", + Some("future-job".to_string()), + 50, + far_future, + None, + payload.clone(), + vec![], + None, + "default", + ) + .await + .expect("enqueue future"); + + // Enqueue some immediate tasks to populate the buffer + for i in 0..3 { + shard + .enqueue( + "t1", + Some(format!("immediate-{}", i)), + 50, + now, + None, + payload.clone(), + vec![], + None, + "default", + ) + .await + .expect("enqueue immediate"); + } + + // Dequeue immediate tasks — future job should NOT appear (it's in the future) + let batch1 = shard + .dequeue("w", "default", 10) + .await + .expect("dequeue batch 1"); + let batch1_ids: Vec<&str> = batch1.tasks.iter().map(|t| t.job().id()).collect(); + assert!( + !batch1_ids.contains(&"future-job"), + "future job should not be dequeued before expedite" + ); + + // Complete those tasks + for task in &batch1.tasks { + shard + .report_attempt_outcome( + task.attempt().task_id(), + AttemptOutcome::Success { result: vec![] }, + ) + .await + .expect("report outcome"); + } + + // Expedite the future job — moves it to now + shard + .expedite_job("t1", "future-job") + .await + .expect("expedite"); + + // The expedited task should now be dequeueable + let mut all_ids: Vec = Vec::new(); + loop { + let batch = shard + .dequeue("w", "default", 10) + .await + .expect("dequeue after expedite"); + if batch.tasks.is_empty() { + break; + } + for task in &batch.tasks { + all_ids.push(task.job().id().to_string()); + } + } + assert!( + all_ids.contains(&"future-job".to_string()), + "expedited task must be picked up; 
got: {:?}", + all_ids + ); + }); +} + +// --------------------------------------------------------------------------- +// 3. Restart creates a task at now_ms +// --------------------------------------------------------------------------- + +/// Restarting a failed job creates a new task. The scanner must pick it up +/// even if the buffer already contains tasks from an earlier scan. +#[silo::test] +async fn restarted_job_task_picked_up_by_scanner() { + with_timeout!(TIMEOUT_MS, { + let (_tmp, shard) = open_temp_shard().await; + let payload = msgpack_payload(&serde_json::json!({"test": true})); + let now = now_ms(); + + // Enqueue and process a job to Failed state + shard + .enqueue( + "t1", + Some("restart-me".to_string()), + 50, + now, + None, + payload.clone(), + vec![], + None, + "default", + ) + .await + .expect("enqueue"); + + let tasks = shard + .dequeue("w", "default", 1) + .await + .expect("dequeue") + .tasks; + assert_eq!(tasks.len(), 1); + + shard + .report_attempt_outcome( + tasks[0].attempt().task_id(), + AttemptOutcome::Error { + error_code: "ERR".to_string(), + error: vec![], + }, + ) + .await + .expect("report error"); + + // Enqueue more tasks so the buffer has entries + for i in 0..3 { + shard + .enqueue( + "t1", + Some(format!("filler-{}", i)), + 50, + now, + None, + payload.clone(), + vec![], + None, + "default", + ) + .await + .expect("enqueue filler"); + } + + // Restart the failed job — creates a new task + shard + .restart_job("t1", "restart-me") + .await + .expect("restart"); + + // Dequeue all — the restarted job should appear + let result = shard + .dequeue("w", "default", 10) + .await + .expect("dequeue after restart"); + + let job_ids: Vec<&str> = result.tasks.iter().map(|t| t.job().id()).collect(); + assert!( + job_ids.contains(&"restart-me"), + "restarted job must be picked up; got jobs: {:?}", + job_ids + ); + }); +} + +// --------------------------------------------------------------------------- +// 4. 
Concurrency grant creates a task with the original start_time_ms +// --------------------------------------------------------------------------- + +/// When a concurrency slot opens and a queued request is granted, a RunAttempt +/// task is created with the original start_time_ms from when the job was first +/// enqueued. This time could be well in the past. The scanner must pick it up. +#[silo::test] +async fn concurrency_grant_task_picked_up_by_scanner() { + with_timeout!(TIMEOUT_MS, { + let (_tmp, shard) = open_temp_shard_with_reconcile_interval_ms(50).await; + let payload = msgpack_payload(&serde_json::json!({"test": true})); + let now = now_ms(); + + let limit = Limit::Concurrency(ConcurrencyLimit { + key: "test-queue".to_string(), + max_concurrency: 1, + }); + + // Enqueue job A with concurrency limit — it gets the slot + shard + .enqueue( + "t1", + Some("holder".to_string()), + 50, + now, + None, + payload.clone(), + vec![limit.clone()], + None, + "default", + ) + .await + .expect("enqueue holder"); + + // Dequeue job A — it becomes the holder + let holder_tasks = shard + .dequeue("w", "default", 1) + .await + .expect("dequeue holder"); + assert_eq!(holder_tasks.tasks.len(), 1); + let holder_task_id = holder_tasks.tasks[0].attempt().task_id().to_string(); + + // Enqueue job B with same concurrency limit — it gets queued (RequestTicket) + shard + .enqueue( + "t1", + Some("waiter".to_string()), + 50, + now, + None, + payload.clone(), + vec![limit.clone()], + None, + "default", + ) + .await + .expect("enqueue waiter"); + + // Enqueue some tasks in a different group to populate other buffers + for i in 0..3 { + shard + .enqueue( + "t1", + Some(format!("other-{}", i)), + 50, + now, + None, + payload.clone(), + vec![], + None, + "other-group", + ) + .await + .expect("enqueue other"); + } + + // Complete holder — this frees the concurrency slot + shard + .report_attempt_outcome(&holder_task_id, AttemptOutcome::Success { result: vec![] }) + .await + 
.expect("complete holder"); + + // Wait for the concurrency grant scanner to process the pending request + // and create a RunAttempt task for the waiter + let result = poll_until( + || async { + shard + .dequeue("w", "default", 1) + .await + .expect("dequeue waiter") + }, + |r| !r.tasks.is_empty(), + 5000, + ) + .await; + + assert_eq!( + result.tasks.len(), + 1, + "waiter should be dequeued after concurrency grant" + ); + assert_eq!(result.tasks[0].job().id(), "waiter"); + }); +} + +// --------------------------------------------------------------------------- +// 5. Interleaved enqueue and dequeue across task groups +// --------------------------------------------------------------------------- + +/// Tasks enqueued into a task group that already has buffered entries should be +/// picked up even when they sort before existing buffer entries. +#[silo::test] +async fn interleaved_enqueue_dequeue_picks_up_all_tasks() { + with_timeout!(TIMEOUT_MS, { + let (_tmp, shard) = open_temp_shard().await; + let payload = msgpack_payload(&serde_json::json!({"test": true})); + let now = now_ms(); + + // Phase 1: enqueue batch at now + for i in 0..5 { + shard + .enqueue( + "t1", + Some(format!("batch1-{}", i)), + 50, + now, + None, + payload.clone(), + vec![], + None, + "default", + ) + .await + .expect("enqueue batch 1"); + } + + // Dequeue 2 to partially drain the buffer + let first = shard + .dequeue("w", "default", 2) + .await + .expect("dequeue first"); + assert_eq!(first.tasks.len(), 2); + + // Phase 2: enqueue more at now+1 (sorts after remaining batch1 entries) + for i in 0..3 { + shard + .enqueue( + "t1", + Some(format!("batch2-{}", i)), + 50, + now + 1, + None, + payload.clone(), + vec![], + None, + "default", + ) + .await + .expect("enqueue batch 2"); + } + + // Phase 3: enqueue at start_at_ms=0 — sorts BEFORE everything + shard + .enqueue( + "t1", + Some("zero-time".to_string()), + 50, + 0, + None, + payload.clone(), + vec![], + None, + "default", + ) + .await + 
.expect("enqueue zero"); + + // Dequeue all remaining — should get all 7 remaining tasks + // (5 original - 2 dequeued + 3 batch2 + 1 zero = 7) + let mut all_ids: Vec = Vec::new(); + for task in &first.tasks { + all_ids.push(task.job().id().to_string()); + } + + // Drain everything + loop { + let batch = shard + .dequeue("w", "default", 10) + .await + .expect("drain dequeue"); + if batch.tasks.is_empty() { + break; + } + for task in &batch.tasks { + all_ids.push(task.job().id().to_string()); + } + } + + // We should have all 9 jobs + assert_eq!( + all_ids.len(), + 9, + "expected 9 total tasks, got {}: {:?}", + all_ids.len(), + all_ids + ); + assert!( + all_ids.contains(&"zero-time".to_string()), + "zero-time task must be dequeued; got: {:?}", + all_ids + ); + }); +} + +// --------------------------------------------------------------------------- +// 6. Higher priority task enqueued after lower priority tasks +// --------------------------------------------------------------------------- + +/// A task with higher priority (lower number) enqueued after the buffer already +/// contains lower-priority tasks. Within the same start_time_ms, the +/// higher-priority task key sorts first and must be picked up eventually. 
+#[silo::test] +async fn higher_priority_enqueued_later_picked_up() { + with_timeout!(TIMEOUT_MS, { + let (_tmp, shard) = open_temp_shard().await; + let payload = msgpack_payload(&serde_json::json!({"test": true})); + let now = now_ms(); + + // Enqueue low-priority tasks (priority=100) first + for i in 0..5 { + shard + .enqueue( + "t1", + Some(format!("low-pri-{}", i)), + 100, + now, + None, + payload.clone(), + vec![], + None, + "default", + ) + .await + .expect("enqueue low priority"); + } + + // Dequeue one to prove scanner is active + let first = shard + .dequeue("w", "default", 1) + .await + .expect("first dequeue"); + assert_eq!(first.tasks.len(), 1); + + // Now enqueue a HIGH priority task (priority=1) — sorts before the low-pri tasks + shard + .enqueue( + "t1", + Some("high-pri".to_string()), + 1, + now, + None, + payload.clone(), + vec![], + None, + "default", + ) + .await + .expect("enqueue high priority"); + + // Drain all remaining — the high-pri task must appear + let mut all_ids: Vec = Vec::new(); + loop { + let batch = shard + .dequeue("w", "default", 10) + .await + .expect("drain dequeue"); + if batch.tasks.is_empty() { + break; + } + for task in &batch.tasks { + all_ids.push(task.job().id().to_string()); + } + } + + assert!( + all_ids.contains(&"high-pri".to_string()), + "high priority task must be picked up; got: {:?}", + all_ids + ); + // 5 original - 1 dequeued + 1 high-pri = 5 + assert_eq!( + all_ids.len(), + 5, + "all tasks must be drained; got: {:?}", + all_ids + ); + }); +} + +// --------------------------------------------------------------------------- +// 7. Retry after failure creates a task at a future time +// --------------------------------------------------------------------------- + +/// When a job fails and has a retry policy, a new task is created at the retry +/// backoff time. That task should be picked up once its time arrives. 
+#[silo::test] +async fn retry_task_picked_up_after_backoff() { + with_timeout!(TIMEOUT_MS, { + let (_tmp, shard) = open_temp_shard().await; + let payload = msgpack_payload(&serde_json::json!({"test": true})); + let now = now_ms(); + + // Enqueue with a retry policy (1 retry, very short initial backoff) + let retry = silo::retry::RetryPolicy { + retry_count: 1, + initial_interval_ms: 1, // 1ms backoff so the retry is immediately ready + max_interval_ms: 10, + randomize_interval: false, + backoff_factor: 1.0, + }; + + shard + .enqueue( + "t1", + Some("retry-job".to_string()), + 50, + now, + Some(retry), + payload.clone(), + vec![], + None, + "default", + ) + .await + .expect("enqueue retry job"); + + // Dequeue and fail the first attempt + let tasks = shard + .dequeue("w", "default", 1) + .await + .expect("dequeue attempt 1"); + assert_eq!(tasks.tasks.len(), 1); + assert_eq!(tasks.tasks[0].attempt().attempt_number(), 1); + + shard + .report_attempt_outcome( + tasks.tasks[0].attempt().task_id(), + AttemptOutcome::Error { + error_code: "FAIL".to_string(), + error: vec![], + }, + ) + .await + .expect("fail attempt 1"); + + // Small sleep to let the retry backoff elapse (1ms) + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + // The retry task should now be dequeueable + let retry_result = shard + .dequeue("w", "default", 1) + .await + .expect("dequeue retry"); + assert_eq!(retry_result.tasks.len(), 1, "retry task should be dequeued"); + assert_eq!(retry_result.tasks[0].job().id(), "retry-job"); + assert_eq!( + retry_result.tasks[0].attempt().attempt_number(), + 2, + "should be attempt 2" + ); + }); +} + +// --------------------------------------------------------------------------- +// 8. Multiple task groups: task in group A does not block group B +// --------------------------------------------------------------------------- + +/// Each task group has its own scanner. 
Creating a task in group A should not +/// affect task delivery in group B, and vice versa. Tasks created in either +/// group after the other group's buffer is populated must still be delivered. +#[silo::test] +async fn tasks_across_groups_are_independently_scanned() { + with_timeout!(TIMEOUT_MS, { + let (_tmp, shard) = open_temp_shard().await; + let payload = msgpack_payload(&serde_json::json!({"test": true})); + let now = now_ms(); + + // Populate group A + for i in 0..3 { + shard + .enqueue( + "t1", + Some(format!("group-a-{}", i)), + 50, + now, + None, + payload.clone(), + vec![], + None, + "group-a", + ) + .await + .expect("enqueue group-a"); + } + + // Populate group B + for i in 0..3 { + shard + .enqueue( + "t1", + Some(format!("group-b-{}", i)), + 50, + now, + None, + payload.clone(), + vec![], + None, + "group-b", + ) + .await + .expect("enqueue group-b"); + } + + // Drain group A first + let a_result = shard + .dequeue("w", "group-a", 10) + .await + .expect("dequeue group-a"); + assert_eq!(a_result.tasks.len(), 3); + + // Group B should still have all its tasks + let b_result = shard + .dequeue("w", "group-b", 10) + .await + .expect("dequeue group-b"); + assert_eq!(b_result.tasks.len(), 3); + + // Now add new tasks to group A — they should be picked up + shard + .enqueue( + "t1", + Some("group-a-late".to_string()), + 50, + now, + None, + payload.clone(), + vec![], + None, + "group-a", + ) + .await + .expect("enqueue group-a late"); + + let a_late = shard + .dequeue("w", "group-a", 1) + .await + .expect("dequeue group-a late"); + assert_eq!(a_late.tasks.len(), 1); + assert_eq!(a_late.tasks[0].job().id(), "group-a-late"); + }); +} + +// --------------------------------------------------------------------------- +// 9. Bulk enqueue then drain verifies no tasks are lost +// --------------------------------------------------------------------------- + +/// Enqueue many tasks in a burst, then drain them all. 
No tasks should be +/// lost even if the scanner needs multiple passes to buffer them all. +#[silo::test] +async fn bulk_enqueue_drain_no_tasks_lost() { + with_timeout!(TIMEOUT_MS, { + let (_tmp, shard) = open_temp_shard().await; + let payload = msgpack_payload(&serde_json::json!({"test": true})); + let now = now_ms(); + let total = 50; + + for i in 0..total { + shard + .enqueue( + "t1", + Some(format!("bulk-{:04}", i)), + 50, + now + (i as i64), // stagger slightly for unique keys + None, + payload.clone(), + vec![], + None, + "default", + ) + .await + .expect("enqueue bulk"); + } + + let mut dequeued_ids: Vec = Vec::new(); + loop { + let batch = shard + .dequeue("w", "default", 20) + .await + .expect("drain dequeue"); + if batch.tasks.is_empty() { + break; + } + for task in &batch.tasks { + dequeued_ids.push(task.job().id().to_string()); + } + } + + assert_eq!( + dequeued_ids.len(), + total, + "expected {} tasks, got {}: missing tasks were lost by the scanner", + total, + dequeued_ids.len() + ); + }); +}