Skip to content

Commit eba7b46

Browse files
authored
feat: migrate time bucket bitmaps to BitmapSilo storage (#137)
feat: migrate time bucket bitmaps to BitmapSilo storage
2 parents 647c4ac + 60caf20 commit eba7b46

5 files changed

Lines changed: 356 additions & 19 deletions

File tree

src/engine/concurrent_engine.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -436,6 +436,7 @@ impl ConcurrentEngine {
436436
let flush_config = Arc::clone(&config);
437437
let flush_field_registry = field_registry.clone();
438438
let flush_mutation_rx = mutation_rx;
439+
let flush_bitmap_silo = bitmap_silo_arc.clone();
439440
thread::spawn(move || {
440441
super::flush::run_flush_thread(super::flush::FlushArgs {
441442
slots: flush_slots,
@@ -461,6 +462,7 @@ impl ConcurrentEngine {
461462
field_registry: flush_field_registry,
462463
mutation_rx: flush_mutation_rx,
463464
doc_rx,
465+
bitmap_silo: flush_bitmap_silo,
464466
});
465467
})
466468
};

src/engine/flush.rs

Lines changed: 50 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ pub struct FlushArgs {
3939
pub field_registry: FieldRegistry,
4040
pub mutation_rx: Receiver<MutationOp>,
4141
pub doc_rx: Receiver<(u32, StoredDoc)>,
42+
/// BitmapSilo that mirrors time bucket SET/CLEAR ops alongside in-memory updates; when `None`, the flush thread skips silo mirroring and only updates the in-memory buckets.
43+
pub bitmap_silo: Option<Arc<parking_lot::RwLock<crate::silos::bitmap_silo::BitmapSilo>>>,
4244
}
4345

4446
/// Entry point for the flush thread. Runs until `args.shutdown` is set.
@@ -72,6 +74,7 @@ pub fn run_flush_thread(args: FlushArgs) {
7274
field_registry: flush_field_registry,
7375
mutation_rx: flush_mutation_rx,
7476
doc_rx,
77+
bitmap_silo: flush_bitmap_silo,
7578
} = args;
7679

7780
let min_sleep = Duration::from_micros(flush_interval_us);
@@ -137,16 +140,48 @@ pub fn run_flush_thread(args: FlushArgs) {
137140
let mut tb = tb_arc.lock();
138141
if !batch.alive_inserts.is_empty() {
139142
let sort_field_name = tb.sort_field_name().to_string();
143+
let field_name = tb.field_name().to_string();
144+
let bucket_names: Vec<String> = tb.bucket_names();
140145
let sorts_r = flush_sorts.read();
141146
if let Some(sort_field) = sorts_r.get_field(&sort_field_name) {
142147
for &slot in &batch.alive_inserts {
143148
let ts = sort_field.reconstruct_value(slot) as u64;
149+
// Determine which buckets this slot qualifies for. This duplicates insert_slot's window check (cutoff <= ts <= now), so keep the two in sync.
150+
let qualifying: Vec<String> = bucket_names.iter()
151+
.filter(|name| {
152+
if let Some(bucket) = tb.get_bucket(name) {
153+
let cutoff = now_secs.saturating_sub(bucket.duration_secs);
154+
ts >= cutoff && ts <= now_secs
155+
} else {
156+
false
157+
}
158+
})
159+
.cloned()
160+
.collect();
144161
tb.insert_slot(slot, ts, now_secs);
162+
// Mirror to silo (best-effort: the result of `bucket_set` is discarded)
163+
if let Some(ref silo_arc) = flush_bitmap_silo {
164+
let silo = silo_arc.read();
165+
for bucket_name in &qualifying {
166+
let _ = silo.bucket_set(&field_name, bucket_name, slot);
167+
}
168+
}
145169
}
146170
}
147171
}
148-
for &slot in &batch.alive_removes {
149-
tb.remove_slot(slot);
172+
if !batch.alive_removes.is_empty() {
173+
let field_name = tb.field_name().to_string();
174+
let bucket_names: Vec<String> = tb.bucket_names();
175+
for &slot in &batch.alive_removes {
176+
tb.remove_slot(slot);
177+
// Mirror to silo — unconditionally clear from all buckets
178+
if let Some(ref silo_arc) = flush_bitmap_silo {
179+
let silo = silo_arc.read();
180+
for bucket_name in &bucket_names {
181+
let _ = silo.bucket_clear(&field_name, bucket_name, slot);
182+
}
183+
}
184+
}
150185
}
151186
}
152187
}
@@ -298,6 +333,19 @@ pub fn run_flush_thread(args: FlushArgs) {
298333
bucket.subtract_expired(&expired, new_cutoff);
299334
}
300335
}
336+
// Mirror expired CLEARs to silo
337+
if !expired.is_empty() {
338+
let field_name = {
339+
let tb = tb_arc.lock();
340+
tb.field_name().to_string()
341+
};
342+
if let Some(ref silo_arc) = flush_bitmap_silo {
343+
let silo = silo_arc.read();
344+
for slot in expired.iter() {
345+
let _ = silo.bucket_clear(&field_name, bucket_name, slot);
346+
}
347+
}
348+
}
301349
// Store diff for lazy cache application (no cache Mutex!)
302350
let diff = crate::bucket_diff_log::BucketDiff {
303351
cutoff_before: *old_cutoff,

src/engine/query.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ impl ConcurrentEngine {
4747
tb_guard.as_deref().map(|tb| (tb, now_unix)),
4848
);
4949
let (filter_arc, use_simple_sort) =
50-
self.resolve_filters(&executor, filters, tb_guard.as_deref(), now_unix)?;
50+
self.resolve_filters(&executor, filters, tb_guard.as_deref(), now_unix, silo_guard.as_deref())?;
5151
let result =
5252
executor.execute_from_bitmap(&filter_arc, sort, limit, None, use_simple_sort)?;
5353
Ok(result)
@@ -98,6 +98,7 @@ impl ConcurrentEngine {
9898
now_secs: now_unix,
9999
tolerance_pct: 0.10,
100100
always_snap: true,
101+
bitmap_silo: silo_guard.as_deref(),
101102
};
102103
snapped_filters = crate::query::snap_range_clauses(&query.filters, &ctx);
103104
&snapped_filters[..]
@@ -213,9 +214,9 @@ impl ConcurrentEngine {
213214
let filter_start = Instant::now();
214215
let (filter_arc, use_simple_sort) = if let Some(ref c) = collector {
215216
let _ = c;
216-
self.resolve_filters(&executor, effective_filters, tb_guard.as_deref(), now_unix)?
217+
self.resolve_filters(&executor, effective_filters, tb_guard.as_deref(), now_unix, silo_guard.as_deref())?
217218
} else {
218-
self.resolve_filters(&executor, effective_filters, tb_guard.as_deref(), now_unix)?
219+
self.resolve_filters(&executor, effective_filters, tb_guard.as_deref(), now_unix, silo_guard.as_deref())?
219220
};
220221
let filter_elapsed = filter_start.elapsed();
221222
let full_total_matched = filter_arc.len();
@@ -393,6 +394,7 @@ impl ConcurrentEngine {
393394
filters: &[FilterClause],
394395
time_buckets: Option<&TimeBucketManager>,
395396
now_unix: u64,
397+
silo: Option<&crate::silos::bitmap_silo::BitmapSilo>,
396398
) -> Result<(Arc<roaring::RoaringBitmap>, bool)> {
397399
// Snap range filters to pre-computed time bucket bitmaps (C3).
398400
// This must happen BEFORE canonicalization so cache keys use stable
@@ -406,6 +408,7 @@ impl ConcurrentEngine {
406408
now_secs: now_unix,
407409
tolerance_pct: 0.10,
408410
always_snap: true,
411+
bitmap_silo: silo,
409412
};
410413
snapped = crate::query::snap_range_clauses(filters, &ctx);
411414
&snapped[..]

src/query/mod.rs

Lines changed: 116 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,10 @@ pub struct BucketSnapContext<'a> {
205205
/// If true, queries outside tolerance snap to the nearest bucket instead of returning empty.
206206
/// Default: true (always snap for safety).
207207
pub always_snap: bool,
208+
/// Optional BitmapSilo reference. When present, bucket bitmaps are read from the silo
209+
/// via ops-on-read (`get_bucket_with_ops`); if that lookup returns `None`, the in-memory TimeBucketManager bitmap is used as a fallback.
210+
/// The manager is still used for config (snap_duration, snap_nearest, bucket names/durations).
211+
pub bitmap_silo: Option<&'a crate::silos::bitmap_silo::BitmapSilo>,
208212
}
209213

210214
/// Pre-process filter clauses: replace range filters on bucketed timestamp fields with
@@ -239,18 +243,12 @@ fn snap_clause(clause: &FilterClause, ctx: &BucketSnapContext<'_>) -> FilterClau
239243
// bucket that covers the requested duration, or the largest bucket.
240244
let duration_secs = ctx.now_secs.saturating_sub(*ts as u64);
241245
let bucket_name = manager.snap_nearest(duration_secs);
242-
if let Some(bucket) = manager.get_bucket(bucket_name) {
243-
FilterClause::BucketBitmap {
244-
field: field.clone(),
245-
bucket_name: bucket_name.to_string(),
246-
bitmap: Arc::clone(bucket.bitmap()),
247-
}
248-
} else {
249-
FilterClause::BucketBitmap {
250-
field: field.clone(),
251-
bucket_name: "_none".to_string(),
252-
bitmap: Arc::new(RoaringBitmap::new()),
253-
}
246+
let bitmap = resolve_bucket_bitmap(ctx.bitmap_silo, manager, field, bucket_name)
247+
.unwrap_or_else(|| Arc::new(RoaringBitmap::new()));
248+
FilterClause::BucketBitmap {
249+
field: field.clone(),
250+
bucket_name: bucket_name.to_string(),
251+
bitmap,
254252
}
255253
} else {
256254
// Unsnapped queries allowed — return empty bitmap for out-of-range.
@@ -289,14 +287,30 @@ fn try_snap_to_bucket(
289287
// duration = now - ts (the window the filter requests)
290288
let duration_secs = ctx.now_secs.saturating_sub(ts as u64);
291289
let bucket_name = manager.snap_duration(duration_secs, ctx.tolerance_pct)?;
292-
let bucket = manager.get_bucket(bucket_name)?;
290+
let bitmap = resolve_bucket_bitmap(ctx.bitmap_silo, manager, field, bucket_name)?;
293291
Some(FilterClause::BucketBitmap {
294292
field: field.to_string(),
295293
bucket_name: bucket_name.to_string(),
296-
bitmap: Arc::clone(bucket.bitmap()),
294+
bitmap,
297295
})
298296
}
299297

298+
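/// Resolve a bucket bitmap: try the silo first (ops-on-read) when one is supplied, and fall back to the in-memory manager if there is no silo or its lookup returns `None`. Returns `None` only when the manager has no bucket with that name either.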
299+
fn resolve_bucket_bitmap(
300+
silo: Option<&crate::silos::bitmap_silo::BitmapSilo>,
301+
manager: &crate::time_buckets::TimeBucketManager,
302+
field: &str,
303+
bucket_name: &str,
304+
) -> Option<Arc<RoaringBitmap>> {
305+
if let Some(silo) = silo {
306+
if let Some(bm) = silo.get_bucket_with_ops(field, bucket_name) {
307+
return Some(Arc::new(bm));
308+
}
309+
}
310+
// Fall back to in-memory bucket
311+
manager.get_bucket(bucket_name).map(|b| Arc::clone(b.bitmap()))
312+
}
313+
300314
#[cfg(test)]
301315
mod tests {
302316
use super::*;
@@ -314,6 +328,7 @@ mod tests {
314328
now_secs,
315329
tolerance_pct: 0.10,
316330
always_snap: true,
331+
bitmap_silo: None,
317332
}
318333
}
319334

@@ -411,6 +426,7 @@ mod tests {
411426
now_secs: now,
412427
tolerance_pct: 0.10,
413428
always_snap: false,
429+
bitmap_silo: None,
414430
};
415431

416432
// Duration = 200000s, outside tolerance, always_snap=false → empty bitmap
@@ -489,6 +505,92 @@ mod tests {
489505
assert!(matches!(&snapped[0], FilterClause::Gt(_, _)));
490506
}
491507

508+
/// When a BitmapSilo is available, snap_clause should read from it instead of
509+
/// the in-memory TimeBucketManager bitmap.
510+
#[test]
511+
fn test_snap_reads_from_silo_when_available() {
512+
let now: u64 = 1_700_000_000;
513+
let dir = tempfile::tempdir().unwrap();
514+
515+
// Build a silo with a specific bitmap for "sortAt"/"24h"
516+
let silo = crate::silos::bitmap_silo::BitmapSilo::open(dir.path()).unwrap();
517+
let mut silo_bm = roaring::RoaringBitmap::new();
518+
silo_bm.extend([100u32, 200, 300]); // distinct from in-memory
519+
silo.save_bucket("sortAt", "24h", &silo_bm).unwrap();
520+
521+
// Build a TimeBucketManager with DIFFERENT in-memory bitmap (slots 1, 2, 3)
522+
let mgr = make_manager_with_data(now);
523+
// Verify the in-memory manager has slots 1-3 for "24h", not 100-300
524+
{
525+
let bm = mgr.get_bucket("24h").unwrap().bitmap();
526+
assert!(bm.contains(1));
527+
assert!(!bm.contains(100));
528+
}
529+
530+
let mut managers = HashMap::new();
531+
managers.insert("sortAt".to_string(), &mgr);
532+
533+
// Build context with silo
534+
let ctx = BucketSnapContext {
535+
managers: &managers,
536+
now_secs: now,
537+
tolerance_pct: 0.10,
538+
always_snap: true,
539+
bitmap_silo: Some(&silo),
540+
};
541+
542+
// Snap to "24h" — should use silo bitmap (100, 200, 300), not in-memory (1, 2, 3)
543+
let ts = (now - 86400) as i64; // exactly 24h
544+
let clauses = vec![FilterClause::Gt("sortAt".to_string(), Value::Integer(ts))];
545+
let snapped = snap_range_clauses(&clauses, &ctx);
546+
547+
match &snapped[0] {
548+
FilterClause::BucketBitmap { field, bucket_name, bitmap } => {
549+
assert_eq!(field, "sortAt");
550+
assert_eq!(bucket_name, "24h");
551+
// Should come from silo, not in-memory manager
552+
assert!(bitmap.contains(100), "should have silo slot 100");
553+
assert!(bitmap.contains(200), "should have silo slot 200");
554+
assert!(bitmap.contains(300), "should have silo slot 300");
555+
assert!(!bitmap.contains(1), "should NOT have in-memory slot 1");
556+
assert_eq!(bitmap.len(), 3);
557+
}
558+
other => panic!("expected BucketBitmap, got {:?}", other),
559+
}
560+
}
561+
562+
/// When silo is None, snap_clause falls back to in-memory manager bitmap.
563+
#[test]
564+
fn test_snap_falls_back_to_in_memory_without_silo() {
565+
let now: u64 = 1_700_000_000;
566+
let mgr = make_manager_with_data(now);
567+
let mut managers = HashMap::new();
568+
managers.insert("sortAt".to_string(), &mgr);
569+
570+
let ctx = BucketSnapContext {
571+
managers: &managers,
572+
now_secs: now,
573+
tolerance_pct: 0.10,
574+
always_snap: true,
575+
bitmap_silo: None,
576+
};
577+
578+
let ts = (now - 86400) as i64;
579+
let clauses = vec![FilterClause::Gt("sortAt".to_string(), Value::Integer(ts))];
580+
let snapped = snap_range_clauses(&clauses, &ctx);
581+
582+
match &snapped[0] {
583+
FilterClause::BucketBitmap { bitmap, .. } => {
584+
// In-memory manager has slots 1-3 in "24h"
585+
assert!(bitmap.contains(1));
586+
assert!(bitmap.contains(2));
587+
assert!(bitmap.contains(3));
588+
assert_eq!(bitmap.len(), 3);
589+
}
590+
other => panic!("expected BucketBitmap, got {:?}", other),
591+
}
592+
}
593+
492594
#[test]
493595
fn test_filter_clause_construction() {
494596
let clause = FilterClause::And(vec![

0 commit comments

Comments
 (0)