Skip to content

Commit 7849b58

Browse files
authored
fix: 5 correctness bugs from external review (#142)
fix: 5 correctness bugs from external review (stale reads, cache epochs, silo bypass)
2 parents bb13ff8 + ffe1f28 commit 7849b58

4 files changed

Lines changed: 136 additions & 45 deletions

File tree

src/engine/concurrent_engine.rs

Lines changed: 71 additions & 17 deletions
Original file line number | Diff line number | Diff line change
@@ -548,6 +548,8 @@ impl ConcurrentEngine {
548548
| MutationOp::FilterRemove { .. }
549549
| MutationOp::SortSet { .. }
550550
| MutationOp::SortClear { .. }
551+
| MutationOp::AliveInsert { .. }
552+
| MutationOp::AliveRemove { .. }
551553
));
552554
if !has_field_ops {
553555
return;
@@ -562,6 +564,9 @@ impl ConcurrentEngine {
562564
| MutationOp::SortClear { field, .. } => {
563565
guard.insert(field.to_string(), new_epoch);
564566
}
567+
MutationOp::AliveInsert { .. } | MutationOp::AliveRemove { .. } => {
568+
guard.insert("__alive__".to_string(), new_epoch);
569+
}
565570
_ => {}
566571
}
567572
}
@@ -726,7 +731,15 @@ impl ConcurrentEngine {
726731
self.send_mutation_ops(ops)
727732
}
728733
/// Get the number of alive documents.
734+
///
735+
/// When a BitmapSilo is present, reads from the silo (includes ops-log replay)
736+
/// rather than from the stale in-memory SlotAllocator.
729737
pub fn alive_count(&self) -> u64 {
738+
if let Some(ref silo_arc) = self.bitmap_silo {
739+
if let Some(alive) = silo_arc.read().get_alive_with_ops() {
740+
return alive.len();
741+
}
742+
}
730743
self.slots.read().alive_count()
731744
}
732745
/// Flush loop stats: (apply_count, cumulative_duration_nanos, last_duration_nanos).
@@ -754,8 +767,21 @@ impl ConcurrentEngine {
754767
self.slots.read().slot_counter()
755768
}
756769
/// Reconstruct the sort value for a given slot in the named sort field.
757-
/// Returns None if the field is not found in the in-memory sort index.
770+
///
771+
/// When a BitmapSilo is present, reads from the silo (correct when in-memory
772+
/// SortIndex is not updated). Falls back to in-memory SortIndex otherwise.
773+
/// Returns None if the field is not found in either source.
758774
pub fn reconstruct_sort_value(&self, field: &str, slot: u32) -> Option<u32> {
775+
if let Some(ref silo_arc) = self.bitmap_silo {
776+
// Look up num_bits from config for this sort field.
777+
if let Some(sc) = self.config.sort_fields.iter().find(|s| s.name == field) {
778+
let num_bits = sc.bits as usize;
779+
let silo = silo_arc.read();
780+
return Some(crate::engine::frozen_sort::frozen_reconstruct_value(
781+
&silo, field, num_bits, slot,
782+
));
783+
}
784+
}
759785
self.sorts.read().get_field(field).map(|f| f.reconstruct_value(slot))
760786
}
761787
// ---- Named cursors ----
@@ -1112,34 +1138,62 @@ impl ConcurrentEngine {
11121138
/// Used by the NDJSON loader to apply accumulated bitmaps from a parsed chunk
11131139
/// without the staging InnerEngine pattern. Takes write locks briefly to OR-merge
11141140
/// filter/sort bitmaps and alive bits into the existing live state.
1141+
///
1142+
/// When a BitmapSilo is present, writes are directed to the silo via
1143+
/// `write_dump_maps()` (batch frozen write) and the in-memory indexes are skipped,
1144+
/// since all reads go through the silo when it is active.
11151145
pub fn merge_bitmap_maps(
11161146
&self,
11171147
filter_maps: HashMap<String, HashMap<u64, RoaringBitmap>>,
11181148
sort_maps: HashMap<String, HashMap<usize, RoaringBitmap>>,
11191149
alive: RoaringBitmap,
11201150
) {
1121-
{
1122-
let mut filters_w = self.filters.write();
1123-
for (field_name, value_map) in filter_maps {
1124-
if let Some(field) = filters_w.get_field_mut(&field_name) {
1125-
for (value, bitmap) in value_map {
1126-
field.or_bitmap(value, &bitmap);
1151+
if let Some(ref silo_arc) = self.bitmap_silo {
1152+
// Silo present: route writes to the silo only (reads bypass in-memory indexes).
1153+
// Convert sort_maps: HashMap<usize, RoaringBitmap> → Vec<RoaringBitmap> (indexed by bit layer).
1154+
let silo_sort_maps: HashMap<String, Vec<RoaringBitmap>> = sort_maps.into_iter()
1155+
.map(|(field_name, bit_map)| {
1156+
let max_bit = bit_map.keys().copied().max().map(|b| b + 1).unwrap_or(0);
1157+
let mut layers = vec![RoaringBitmap::new(); max_bit];
1158+
for (bit, bm) in bit_map {
1159+
if bit < max_bit {
1160+
layers[bit] = bm;
1161+
}
1162+
}
1163+
(field_name, layers)
1164+
})
1165+
.collect();
1166+
let slot_counter = self.slots.read().slot_counter();
1167+
let cursors = self.cursors.lock().clone();
1168+
let mut silo = silo_arc.write();
1169+
if let Err(e) = silo.write_dump_maps(filter_maps, silo_sort_maps, &alive, slot_counter, &cursors) {
1170+
tracing::warn!("merge_bitmap_maps: silo write_dump_maps failed: {e}");
1171+
}
1172+
} else {
1173+
// No silo: apply to in-memory indexes only (legacy/test path).
1174+
{
1175+
let mut filters_w = self.filters.write();
1176+
for (field_name, value_map) in filter_maps {
1177+
if let Some(field) = filters_w.get_field_mut(&field_name) {
1178+
for (value, bitmap) in value_map {
1179+
field.or_bitmap(value, &bitmap);
1180+
}
11271181
}
11281182
}
11291183
}
1130-
}
1131-
{
1132-
let mut sorts_w = self.sorts.write();
1133-
for (field_name, bit_map) in sort_maps {
1134-
if let Some(field) = sorts_w.get_field_mut(&field_name) {
1135-
for (bit, bitmap) in bit_map {
1136-
field.or_layer(bit, &bitmap);
1184+
{
1185+
let mut sorts_w = self.sorts.write();
1186+
for (field_name, bit_map) in sort_maps {
1187+
if let Some(field) = sorts_w.get_field_mut(&field_name) {
1188+
for (bit, bitmap) in bit_map {
1189+
field.or_layer(bit, &bitmap);
1190+
}
11371191
}
11381192
}
11391193
}
1140-
}
1141-
{
1142-
self.slots.write().alive_or_bitmap(&alive);
1194+
{
1195+
self.slots.write().alive_or_bitmap(&alive);
1196+
}
11431197
}
11441198
self.dirty_flag.store(true, Ordering::Release);
11451199
self.invalidate_all_caches();

src/engine/executor.rs

Lines changed: 4 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -154,13 +154,11 @@ impl<'a> QueryExecutor<'a> {
154154
}
155155

156156
/// Alive count consistent with `alive_bitmap()`.
157+
///
158+
/// Derives from the cached `alive_bitmap()` so both methods always agree
159+
/// within a single query execution (avoids double-computing the silo alive set).
157160
fn alive_count(&self) -> u64 {
158-
if let Some(silo) = self.bitmap_silo {
159-
if let Some(alive) = silo.get_alive_with_ops() {
160-
return alive.len();
161-
}
162-
}
163-
self.slots.alive_count()
161+
self.alive_bitmap().len()
164162
}
165163

166164
/// Attach a time bucket manager for in-executor bucket snapping (C3).

src/engine/flush.rs

Lines changed: 57 additions & 21 deletions
Original file line number | Diff line number | Diff line change
@@ -129,28 +129,64 @@ pub fn run_flush_thread(args: FlushArgs) {
129129
let sort_field_name = tb.sort_field_name().to_string();
130130
let field_name = tb.field_name().to_string();
131131
let bucket_names: Vec<String> = tb.bucket_names();
132-
let sorts_r = flush_sorts.read();
133-
if let Some(sort_field) = sorts_r.get_field(&sort_field_name) {
134-
for &slot in &batch.alive_inserts {
135-
let ts = sort_field.reconstruct_value(slot) as u64;
136-
// Determine which buckets this slot qualifies for (same logic as insert_slot)
137-
let qualifying: Vec<String> = bucket_names.iter()
138-
.filter(|name| {
139-
if let Some(bucket) = tb.get_bucket(name) {
140-
let cutoff = now_secs.saturating_sub(bucket.duration_secs);
141-
ts >= cutoff && ts <= now_secs
142-
} else {
143-
false
144-
}
145-
})
146-
.cloned()
147-
.collect();
148-
tb.insert_slot(slot, ts, now_secs);
149-
// Mirror to silo
150-
if let Some(ref silo_arc) = flush_bitmap_silo {
132+
// Reconstruct sort values via silo when silo is active
133+
// (in-memory SortIndex is not updated when has_silo is true).
134+
let mut reconstruct_via_silo = false;
135+
if has_silo {
136+
if let Some(ref silo_arc) = flush_bitmap_silo {
137+
// Look up num_bits for this sort field from config.
138+
if let Some(sc) = flush_config.sort_fields.iter().find(|s| s.name == sort_field_name) {
139+
let num_bits = sc.bits as usize;
151140
let silo = silo_arc.read();
152-
for bucket_name in &qualifying {
153-
let _ = silo.bucket_set(&field_name, bucket_name, slot);
141+
for &slot in &batch.alive_inserts {
142+
let ts = crate::engine::frozen_sort::frozen_reconstruct_value(
143+
&silo, &sort_field_name, num_bits, slot,
144+
) as u64;
145+
let qualifying: Vec<String> = bucket_names.iter()
146+
.filter(|name| {
147+
if let Some(bucket) = tb.get_bucket(name) {
148+
let cutoff = now_secs.saturating_sub(bucket.duration_secs);
149+
ts >= cutoff && ts <= now_secs
150+
} else {
151+
false
152+
}
153+
})
154+
.cloned()
155+
.collect();
156+
tb.insert_slot(slot, ts, now_secs);
157+
for bucket_name in &qualifying {
158+
let _ = silo.bucket_set(&field_name, bucket_name, slot);
159+
}
160+
}
161+
reconstruct_via_silo = true;
162+
}
163+
}
164+
}
165+
if !reconstruct_via_silo {
166+
// In-memory path (no silo, or silo sort field not found in config).
167+
let sorts_r = flush_sorts.read();
168+
if let Some(sort_field) = sorts_r.get_field(&sort_field_name) {
169+
for &slot in &batch.alive_inserts {
170+
let ts = sort_field.reconstruct_value(slot) as u64;
171+
// Determine which buckets this slot qualifies for (same logic as insert_slot)
172+
let qualifying: Vec<String> = bucket_names.iter()
173+
.filter(|name| {
174+
if let Some(bucket) = tb.get_bucket(name) {
175+
let cutoff = now_secs.saturating_sub(bucket.duration_secs);
176+
ts >= cutoff && ts <= now_secs
177+
} else {
178+
false
179+
}
180+
})
181+
.cloned()
182+
.collect();
183+
tb.insert_slot(slot, ts, now_secs);
184+
// Mirror to silo
185+
if let Some(ref silo_arc) = flush_bitmap_silo {
186+
let silo = silo_arc.read();
187+
for bucket_name in &qualifying {
188+
let _ = silo.bucket_set(&field_name, bucket_name, slot);
189+
}
154190
}
155191
}
156192
}

src/engine/query.rs

Lines changed: 4 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -257,10 +257,13 @@ impl ConcurrentEngine {
257257
let mut bm = roaring::RoaringBitmap::new();
258258
for &slot in &sorted_slots { bm.insert(slot); }
259259
// Tag the entry with the current epoch so staleness can be detected.
260+
// Include __alive__ so inserts/deletes invalidate cached results that
261+
// implicitly depend on the alive set (e.g. negation queries, count queries).
260262
let current_epoch = self.mutation_epoch();
261-
let entry_field_epochs: Vec<(String, u64)> = ukey.filter_clauses.iter()
263+
let mut entry_field_epochs: Vec<(String, u64)> = ukey.filter_clauses.iter()
262264
.map(|c| (c.field.clone(), self.field_epoch(&c.field)))
263265
.collect();
266+
entry_field_epochs.push(("__alive__".to_string(), self.field_epoch("__alive__")));
264267
let entry_data = crate::silos::cache_silo::CacheEntryData {
265268
key: ukey.clone(),
266269
bitmap: bm,

0 commit comments

Comments
 (0)