Skip to content

Commit 388041f

Browse files
refactor(query-engine): make CountMinSketchWithHeap config params required in accumulator factory
1 parent b088ae5 commit 388041f

2 files changed

Lines changed: 89 additions & 59 deletions

File tree

asap-query-engine/src/precompute_engine/accumulator_factory.rs

Lines changed: 82 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -734,19 +734,26 @@ fn hydra_kll_params(config: &AggregationConfig) -> (usize, usize, u16) {
734734
/// Extract `(row_num, col_num, heap_size)` for CountMinSketchWithHeap configs.
735735
///
736736
/// Accepts the planner/Arroyo-canonical `depth`/`width`/`heapsize` names first,
737-
/// then falls back to the `row_num`/`col_num`/`heap_size` aliases. Defaults
738-
/// mirror the planner sketch defaults (depth 3, width 1024) with a heap of 32.
739-
fn cms_heap_params(config: &AggregationConfig) -> (usize, usize, usize) {
740-
let read = |names: &[&str], default: u64| -> usize {
737+
/// then falls back to the `row_num`/`col_num`/`heap_size` aliases. All three
738+
/// parameters are required — the planner always emits them and their absence
739+
/// indicates a malformed config.
740+
fn cms_heap_params(config: &AggregationConfig) -> Result<(usize, usize, usize), String> {
741+
let read = |names: &[&str]| -> Result<usize, String> {
741742
names
742743
.iter()
743744
.find_map(|n| config.parameters.get(*n).and_then(|v| v.as_u64()))
744-
.unwrap_or(default) as usize
745+
.map(|v| v as usize)
746+
.ok_or_else(|| {
747+
format!(
748+
"CountMinSketchWithHeap config missing required parameter (tried: {})",
749+
names.join(", ")
750+
)
751+
})
745752
};
746-
let row_num = read(&["depth", "row_num"], 3);
747-
let col_num = read(&["width", "col_num"], 1024);
748-
let heap_size = read(&["heapsize", "heap_size"], 32);
749-
(row_num, col_num, heap_size)
753+
let row_num = read(&["depth", "row_num"])?;
754+
let col_num = read(&["width", "col_num"])?;
755+
let heap_size = read(&["heapsize", "heap_size"])?;
756+
Ok((row_num, col_num, heap_size))
750757
}
751758

752759
/// Whether a CountMinSketchWithHeap config should count events (weight 1 per
@@ -787,84 +794,97 @@ fn hll_precision_param(config: &AggregationConfig) -> u32 {
787794
// ---------------------------------------------------------------------------
788795

789796
/// Create an appropriate `AccumulatorUpdater` from an `AggregationConfig`.
790-
pub fn create_accumulator_updater(config: &AggregationConfig) -> Box<dyn AccumulatorUpdater> {
797+
///
798+
/// Returns `Err` if the config is of a type that requires specific parameters
799+
/// (e.g. `CountMinSketchWithHeap`) but those parameters are absent or invalid.
800+
pub fn create_accumulator_updater(
801+
config: &AggregationConfig,
802+
) -> Result<Box<dyn AccumulatorUpdater>, String> {
791803
let sub_type = config.aggregation_sub_type.as_str();
792804

793805
match config.aggregation_type {
794806
AggregationType::SingleSubpopulation => match sub_type {
795-
"Sum" | "sum" => Box::new(SumAccumulatorUpdater::new()),
796-
"Min" | "min" => Box::new(MinMaxAccumulatorUpdater::new(false)),
797-
"Max" | "max" => Box::new(MinMaxAccumulatorUpdater::new(true)),
798-
"Increase" | "increase" => Box::new(IncreaseAccumulatorUpdater::new()),
807+
"Sum" | "sum" => Ok(Box::new(SumAccumulatorUpdater::new())),
808+
"Min" | "min" => Ok(Box::new(MinMaxAccumulatorUpdater::new(false))),
809+
"Max" | "max" => Ok(Box::new(MinMaxAccumulatorUpdater::new(true))),
810+
"Increase" | "increase" => Ok(Box::new(IncreaseAccumulatorUpdater::new())),
799811
"DatasketchesKLL" | "datasketches_kll" | "KLL" | "kll" => {
800-
Box::new(KllAccumulatorUpdater::new(kll_k_param(config)))
812+
Ok(Box::new(KllAccumulatorUpdater::new(kll_k_param(config))))
801813
}
802814
other => {
803815
tracing::warn!(
804816
"Unknown SingleSubpopulation sub_type '{}', defaulting to Sum",
805817
other
806818
);
807-
Box::new(SumAccumulatorUpdater::new())
819+
Ok(Box::new(SumAccumulatorUpdater::new()))
808820
}
809821
},
810822
AggregationType::MultipleSubpopulation => match sub_type {
811-
"Sum" | "sum" => Box::new(MultipleSumAccumulatorUpdater::new()),
812-
"Min" | "min" => Box::new(MultipleMinMaxAccumulatorUpdater::new(false)),
813-
"Max" | "max" => Box::new(MultipleMinMaxAccumulatorUpdater::new(true)),
814-
"Increase" | "increase" => Box::new(MultipleIncreaseAccumulatorUpdater::new()),
823+
"Sum" | "sum" => Ok(Box::new(MultipleSumAccumulatorUpdater::new())),
824+
"Min" | "min" => Ok(Box::new(MultipleMinMaxAccumulatorUpdater::new(false))),
825+
"Max" | "max" => Ok(Box::new(MultipleMinMaxAccumulatorUpdater::new(true))),
826+
"Increase" | "increase" => Ok(Box::new(MultipleIncreaseAccumulatorUpdater::new())),
815827
"CountMinSketch" | "count_min_sketch" | "CMS" | "cms" => {
816828
let (row_num, col_num) = cms_params(config);
817-
Box::new(CmsAccumulatorUpdater::new(row_num, col_num))
829+
Ok(Box::new(CmsAccumulatorUpdater::new(row_num, col_num)))
818830
}
819831
"HydraKLL" | "hydra_kll" => {
820832
let (row_num, col_num, k) = hydra_kll_params(config);
821-
Box::new(HydraKllAccumulatorUpdater::new(row_num, col_num, k))
833+
Ok(Box::new(HydraKllAccumulatorUpdater::new(
834+
row_num, col_num, k,
835+
)))
822836
}
823837
other => {
824838
tracing::warn!(
825839
"Unknown MultipleSubpopulation sub_type '{}', defaulting to Sum",
826840
other
827841
);
828-
Box::new(MultipleSumAccumulatorUpdater::new())
842+
Ok(Box::new(MultipleSumAccumulatorUpdater::new()))
829843
}
830844
},
831845
AggregationType::DatasketchesKLL => {
832-
Box::new(KllAccumulatorUpdater::new(kll_k_param(config)))
846+
Ok(Box::new(KllAccumulatorUpdater::new(kll_k_param(config))))
847+
}
848+
AggregationType::MultipleSum => Ok(Box::new(MultipleSumAccumulatorUpdater::new())),
849+
AggregationType::MultipleIncrease => {
850+
Ok(Box::new(MultipleIncreaseAccumulatorUpdater::new()))
833851
}
834-
AggregationType::MultipleSum => Box::new(MultipleSumAccumulatorUpdater::new()),
835-
AggregationType::MultipleIncrease => Box::new(MultipleIncreaseAccumulatorUpdater::new()),
836-
AggregationType::MultipleMinMax => Box::new(MultipleMinMaxAccumulatorUpdater::new(
852+
AggregationType::MultipleMinMax => Ok(Box::new(MultipleMinMaxAccumulatorUpdater::new(
837853
sub_type.eq_ignore_ascii_case("max"),
838-
)),
839-
AggregationType::Sum => Box::new(SumAccumulatorUpdater::new()),
840-
AggregationType::MinMax => Box::new(MinMaxAccumulatorUpdater::new(
854+
))),
855+
AggregationType::Sum => Ok(Box::new(SumAccumulatorUpdater::new())),
856+
AggregationType::MinMax => Ok(Box::new(MinMaxAccumulatorUpdater::new(
841857
sub_type.eq_ignore_ascii_case("max"),
842-
)),
843-
AggregationType::Increase => Box::new(IncreaseAccumulatorUpdater::new()),
858+
))),
859+
AggregationType::Increase => Ok(Box::new(IncreaseAccumulatorUpdater::new())),
844860
AggregationType::CountMinSketch => {
845861
let (row_num, col_num) = cms_params(config);
846-
Box::new(CmsAccumulatorUpdater::new(row_num, col_num))
862+
Ok(Box::new(CmsAccumulatorUpdater::new(row_num, col_num)))
847863
}
848864
AggregationType::CountMinSketchWithHeap => {
849-
let (row_num, col_num, heap_size) = cms_heap_params(config);
850-
Box::new(CmsWithHeapAccumulatorUpdater::new(
865+
let (row_num, col_num, heap_size) = cms_heap_params(config)?;
866+
Ok(Box::new(CmsWithHeapAccumulatorUpdater::new(
851867
row_num,
852868
col_num,
853869
heap_size,
854870
cms_count_events(config),
855-
))
871+
)))
856872
}
857873
AggregationType::HydraKLL => {
858874
let (row_num, col_num, k) = hydra_kll_params(config);
859-
Box::new(HydraKllAccumulatorUpdater::new(row_num, col_num, k))
875+
Ok(Box::new(HydraKllAccumulatorUpdater::new(
876+
row_num, col_num, k,
877+
)))
860878
}
861-
AggregationType::HLL => Box::new(HllAccumulatorUpdater::new(hll_precision_param(config))),
879+
AggregationType::HLL => Ok(Box::new(HllAccumulatorUpdater::new(hll_precision_param(
880+
config,
881+
)))),
862882
other => {
863883
tracing::warn!(
864884
"Unknown aggregation_type '{:?}', defaulting to SingleSubpopulation Sum",
865885
other
866886
);
867-
Box::new(SumAccumulatorUpdater::new())
887+
Ok(Box::new(SumAccumulatorUpdater::new()))
868888
}
869889
}
870890
}
@@ -1017,7 +1037,7 @@ mod tests {
10171037
(AggregationType::CountMinSketch, ""),
10181038
] {
10191039
let config = make_config(*agg_type, sub_type);
1020-
let updater = create_accumulator_updater(&config);
1040+
let updater = create_accumulator_updater(&config).unwrap();
10211041
assert_eq!(
10221042
config_is_keyed(&config),
10231043
updater.is_keyed(),
@@ -1052,7 +1072,7 @@ mod tests {
10521072
None,
10531073
None,
10541074
);
1055-
let mut updater = create_accumulator_updater(&config);
1075+
let mut updater = create_accumulator_updater(&config).unwrap();
10561076
assert!(
10571077
!updater.is_keyed(),
10581078
"HLL is single-population per grouping key (like KLL), not keyed",
@@ -1107,7 +1127,7 @@ mod tests {
11071127
None,
11081128
None,
11091129
);
1110-
let updater = create_accumulator_updater(&config);
1130+
let updater = create_accumulator_updater(&config).unwrap();
11111131
let acc = updater.snapshot_accumulator();
11121132
let hll = acc
11131133
.as_any()
@@ -1140,7 +1160,7 @@ mod tests {
11401160
None,
11411161
None,
11421162
);
1143-
let updater = create_accumulator_updater(&config);
1163+
let updater = create_accumulator_updater(&config).unwrap();
11441164
let acc = updater.snapshot_accumulator();
11451165
let hll = acc
11461166
.as_any()
@@ -1174,7 +1194,7 @@ mod tests {
11741194
None,
11751195
None,
11761196
);
1177-
let mut updater = create_accumulator_updater(&config);
1197+
let mut updater = create_accumulator_updater(&config).unwrap();
11781198
for i in 0..50 {
11791199
updater.update_single(i as f64, 0);
11801200
}
@@ -1212,7 +1232,7 @@ mod tests {
12121232
None,
12131233
None,
12141234
);
1215-
let updater = create_accumulator_updater(&config);
1235+
let updater = create_accumulator_updater(&config).unwrap();
12161236
let acc = updater.snapshot_accumulator();
12171237
let kll = acc
12181238
.as_any()
@@ -1221,6 +1241,14 @@ mod tests {
12211241
assert_eq!(kll.inner.k, 50, "k should be 50 from capital-K param");
12221242
}
12231243

1244+
fn cms_heap_params_required() -> std::collections::HashMap<String, serde_json::Value> {
1245+
let mut p = std::collections::HashMap::new();
1246+
p.insert("depth".to_string(), serde_json::json!(3_u64));
1247+
p.insert("width".to_string(), serde_json::json!(1024_u64));
1248+
p.insert("heapsize".to_string(), serde_json::json!(32_u64));
1249+
p
1250+
}
1251+
12241252
fn cms_heap_config(
12251253
parameters: std::collections::HashMap<String, serde_json::Value>,
12261254
) -> AggregationConfig {
@@ -1251,8 +1279,8 @@ mod tests {
12511279
fn test_cms_with_heap_factory_routes_to_heap_accumulator_and_is_keyed() {
12521280
// CountMinSketchWithHeap must build a CmsWithHeapAccumulatorUpdater whose
12531281
// accumulator exposes the heap (get_keys), NOT a plain CMS (no heap).
1254-
let config = cms_heap_config(std::collections::HashMap::new());
1255-
let updater = create_accumulator_updater(&config);
1282+
let config = cms_heap_config(cms_heap_params_required());
1283+
let updater = create_accumulator_updater(&config).unwrap();
12561284
assert!(updater.is_keyed(), "CMS-with-heap top-k is keyed by srcip");
12571285

12581286
let acc = updater.snapshot_accumulator();
@@ -1271,8 +1299,8 @@ mod tests {
12711299
fn test_cms_with_heap_count_events_uses_unit_weight() {
12721300
// count_events (the default) → each observation contributes weight 1, so
12731301
// the per-key estimate is the EVENT COUNT, not the sum of sample values.
1274-
let config = cms_heap_config(std::collections::HashMap::new());
1275-
let mut updater = create_accumulator_updater(&config);
1302+
let config = cms_heap_config(cms_heap_params_required());
1303+
let mut updater = create_accumulator_updater(&config).unwrap();
12761304

12771305
let key = KeyByLabelValues::new_with_labels(vec!["10.0.0.1".to_string()]);
12781306
// Feed 5 events with large values; count semantics must yield ~5, not ~Σvalue.
@@ -1294,10 +1322,10 @@ mod tests {
12941322
#[test]
12951323
fn test_cms_with_heap_count_events_false_sums_values() {
12961324
// count_events=false → weight is the sample value, giving SUM semantics.
1297-
let mut params = std::collections::HashMap::new();
1325+
let mut params = cms_heap_params_required();
12981326
params.insert("count_events".to_string(), serde_json::json!(false));
12991327
let config = cms_heap_config(params);
1300-
let mut updater = create_accumulator_updater(&config);
1328+
let mut updater = create_accumulator_updater(&config).unwrap();
13011329

13021330
let key = KeyByLabelValues::new_with_labels(vec!["10.0.0.1".to_string()]);
13031331
for _ in 0..5 {
@@ -1318,14 +1346,14 @@ mod tests {
13181346
params.insert("width".to_string(), serde_json::json!(2048));
13191347
params.insert("heapsize".to_string(), serde_json::json!(40));
13201348
let config = cms_heap_config(params);
1321-
assert_eq!(cms_heap_params(&config), (4, 2048, 40));
1349+
assert_eq!(cms_heap_params(&config).unwrap(), (4, 2048, 40));
13221350
assert!(cms_count_events(&config), "count_events defaults to true");
13231351
}
13241352

13251353
#[test]
13261354
fn test_cms_with_heap_reset_clears_state() {
1327-
let config = cms_heap_config(std::collections::HashMap::new());
1328-
let mut updater = create_accumulator_updater(&config);
1355+
let config = cms_heap_config(cms_heap_params_required());
1356+
let mut updater = create_accumulator_updater(&config).unwrap();
13291357
let key = KeyByLabelValues::new_with_labels(vec!["k".to_string()]);
13301358
for _ in 0..10 {
13311359
updater.update_keyed(&key, 1.0, 0);

asap-query-engine/src/precompute_engine/worker.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -370,7 +370,7 @@ impl Worker {
370370
continue;
371371
}
372372
LateDataPolicy::ForwardToStore => {
373-
let mut updater = create_accumulator_updater(&state.config);
373+
let mut updater = create_accumulator_updater(&state.config)?;
374374
apply_sample(&mut *updater, series_key, *val, *ts, &state.config);
375375
let key = build_group_key_label_values(group_key);
376376
let output = PrecomputedOutput::new(
@@ -390,10 +390,12 @@ impl Worker {
390390
}
391391

392392
// Normal path: route sample to its single pane accumulator
393-
let updater = state
394-
.active_panes
395-
.entry(pane_start)
396-
.or_insert_with(|| create_accumulator_updater(&state.config));
393+
let updater = match state.active_panes.entry(pane_start) {
394+
std::collections::btree_map::Entry::Occupied(e) => e.into_mut(),
395+
std::collections::btree_map::Entry::Vacant(e) => {
396+
e.insert(create_accumulator_updater(&state.config)?)
397+
}
398+
};
397399

398400
apply_sample(&mut **updater, series_key, *val, *ts, &state.config);
399401
}

0 commit comments

Comments
 (0)