Skip to content

Commit 47acd15

Browse files
feat(asap-planner): support SQL COUNT(DISTINCT) -> HLL streaming/inference config generation (#385)
* Add support for HLL-Cardinality inference/streaming config generation * Formatting changes * Fixed clippy warnings * Preserved error condition * Preserving error condition * Addressing PR comments * Removing unreachable code
1 parent e1a8d3f commit 47acd15

5 files changed

Lines changed: 110 additions & 3 deletions

File tree

asap-common/dependencies/rs/promql_utilities/src/query_logics/logics.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ pub fn map_statistic_to_precompute_operator(
5050
Ok((AggregationType::MultipleIncrease, "".to_string()))
5151
}
5252
Statistic::Topk => Ok((AggregationType::CountMinSketchWithHeap, "topk".to_string())),
53-
_ => Err(format!("Statistic {statistic:?} not supported")),
53+
Statistic::Cardinality => Ok((AggregationType::HLL, "".to_string())),
5454
}
5555
}
5656

@@ -82,6 +82,8 @@ pub fn does_precompute_operator_support_subpopulations(
8282
// CountMinSketchWithHeap is only supported for Topk — does not support subpopulations
8383
AggregationType::CountMinSketchWithHeap if matches!(statistic, Statistic::Topk) => false,
8484

85+
AggregationType::HLL => false,
86+
8587
// Default: not supported
8688
_ => panic!("Unexpected precompute operator: {}", precompute_operator),
8789
}
@@ -169,6 +171,16 @@ mod tests {
169171
));
170172
}
171173

174+
/// Checks that `Statistic::Cardinality` under `QueryTreatmentType::Approximate`
/// maps to the `AggregationType::HLL` precompute operator, paired with an
/// empty string as the second tuple element.
#[test]
175+
fn test_cardinality_maps_to_hll() {
176+
let result = map_statistic_to_precompute_operator(
177+
Statistic::Cardinality,
178+
QueryTreatmentType::Approximate,
179+
)
180+
// The mapping is expected to succeed; an `Err` here fails the test via panic.
.unwrap();
181+
assert_eq!(result, (AggregationType::HLL, "".to_string()));
182+
}
183+
172184
#[test]
173185
fn test_topk_maps_to_count_min_sketch_with_heap() {
174186
let result =

asap-planner-rs/src/config/input.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,8 @@ pub struct SketchParameterOverrides {
8686
pub datasketches_kll: Option<KllParams>,
8787
#[serde(rename = "HydraKLL")]
8888
pub hydra_kll: Option<HydraParams>,
89+
#[serde(rename = "HLL")]
90+
pub hll: Option<HllParams>,
8991
}
9092

9193
#[derive(Debug, Clone, Deserialize)]
@@ -114,6 +116,11 @@ pub struct HydraParams {
114116
pub k: u64,
115117
}
116118

119+
/// User-supplied parameter overrides for the HLL sketch.
///
/// Deserialized from the `HLL` entry of `SketchParameterOverrides`.
#[derive(Debug, Clone, Deserialize)]
120+
pub struct HllParams {
121+
/// HLL precision value, emitted by the planner as the `precision` sketch
/// parameter. NOTE(review): no range validation is visible here — confirm
/// whether the consumer rejects out-of-range values.
pub precision: u64,
122+
}
123+
117124
#[derive(Debug, Clone, Deserialize)]
118125
pub struct SQLControllerConfig {
119126
pub query_groups: Vec<SQLQueryGroup>,

asap-planner-rs/src/planner/sketch.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ const DEFAULT_KLL_K: u64 = 500;
1111
const DEFAULT_HYDRA_ROW: u64 = 3;
1212
const DEFAULT_HYDRA_COL: u64 = 1024;
1313
const DEFAULT_HYDRA_K: u64 = 20;
14+
const DEFAULT_HLL_PRECISION: u64 = 14;
1415

1516
/// Shared sketch parameter builder used by both PromQL and SQL paths.
1617
///
@@ -89,6 +90,19 @@ pub fn build_sketch_parameters(
8990
Ok(m)
9091
}
9192

93+
AggregationType::HLL => {
94+
let precision = sketch_params
95+
.and_then(|p| p.hll.as_ref())
96+
.map(|p| p.precision)
97+
.unwrap_or(DEFAULT_HLL_PRECISION);
98+
let mut m = HashMap::new();
99+
m.insert(
100+
"precision".to_string(),
101+
serde_json::Value::Number(precision.into()),
102+
);
103+
Ok(m)
104+
}
105+
92106
AggregationType::HydraKLL => {
93107
let row_num = sketch_params
94108
.and_then(|p| p.hydra_kll.as_ref())

asap-planner-rs/src/planner/sql.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -105,10 +105,14 @@ impl SQLSingleQueryProcessor {
105105

106106
// Label routing
107107
let spatial_output = KeyByLabelNames::new(labels.iter().cloned().collect::<Vec<_>>());
108-
let rollup = all_metadata.difference(&spatial_output);
109-
110108
let treatment_type = get_sql_treatment_type(agg_info.get_name());
111109
let statistics = get_sql_statistics(agg_info.get_name())?;
110+
let rollup = if statistics.contains(&Statistic::Cardinality) {
111+
// Distinct target is value_column, not a rollup label dimension.
112+
KeyByLabelNames::empty()
113+
} else {
114+
all_metadata.difference(&spatial_output)
115+
};
112116

113117
let configs = build_agg_configs_for_statistics(
114118
&statistics,
@@ -179,6 +183,7 @@ fn get_sql_statistics(name: &str) -> Result<Vec<Statistic>, ControllerError> {
179183
"AVG" => Ok(vec![Statistic::Sum, Statistic::Count]),
180184
"MIN" => Ok(vec![Statistic::Min]),
181185
"MAX" => Ok(vec![Statistic::Max]),
186+
"CARDINALITY" => Ok(vec![Statistic::Cardinality]),
182187
other => Err(ControllerError::SqlParse(format!(
183188
"Unsupported aggregation: {}",
184189
other

asap-planner-rs/tests/sql_integration.rs

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -407,6 +407,75 @@ fn temporal_quantile_cast_datetime_bounds() {
407407
assert_eq!(out.inference_cleanup_param(q), Some(1));
408408
}
409409

410+
// ── COUNT(DISTINCT) / HLL (spatial 1 s) ───────────────────────────────────────
411+
412+
/// Builds a YAML controller configuration containing a single table
/// (`netflow_table`) and a single query group (id 1) that holds exactly one
/// query.
///
/// * `query` — SQL text substituted into the `queries` list as a folded
///   block scalar (`>-`).
/// * `t_repeat` — value substituted for the group's `repetition_delay`.
///
/// Note that `dstip` is listed under both `value_columns` and
/// `metadata_columns` in the generated table definition.
fn netflow_one_query_config(query: &str, t_repeat: u64) -> String {
413+
format!(
414+
r#"
415+
tables:
416+
- name: netflow_table
417+
time_column: time
418+
value_columns: [pkt_len, dstip]
419+
metadata_columns: [srcip, dstip, proto]
420+
query_groups:
421+
- id: 1
422+
repetition_delay: {t_repeat}
423+
controller_options:
424+
accuracy_sla: 0.95
425+
latency_sla: 100.0
426+
queries:
427+
- >-
428+
{query}
429+
aggregate_cleanup:
430+
policy: no_cleanup
431+
"#
432+
)
433+
}
434+
435+
/// Returns the `SQLRuntimeOptions` used by the COUNT(DISTINCT) / HLL test:
/// the Arroyo streaming engine, a fixed query evaluation time, and a data
/// ingestion interval of 1.
fn sql_opts_1s_ingest() -> SQLRuntimeOptions {
436+
SQLRuntimeOptions {
437+
streaming_engine: StreamingEngine::Arroyo,
438+
// Fixed evaluation time so the generated plan does not depend on the wall clock.
// NOTE(review): units are not visible here — presumably epoch seconds; confirm.
query_evaluation_time: Some(1_000_000.0),
439+
// NOTE(review): presumably seconds, per the "1s" in the function name — confirm.
data_ingestion_interval: 1,
440+
}
441+
}
442+
443+
/// COUNT(DISTINCT dstip) GROUP BY srcip, 1 s window → HLL, grouping [srcip], empty rollup.
///
/// Generates a plan for one COUNT(DISTINCT) query against `netflow_table` and
/// asserts that it yields exactly one HLL streaming aggregation and one
/// inference query, with `dstip` as the value column and `srcip` as the only
/// grouping label.
444+
#[test]
445+
fn spatial_count_distinct_hll() {
446+
let q = "SELECT srcip, COUNT(DISTINCT dstip) AS unique_peers FROM netflow_table WHERE time BETWEEN DATEADD(s, -11, NOW()) AND DATEADD(s, -10, NOW()) GROUP BY srcip";
447+
// Repetition delay of 1 is paired with the 1-unit ingestion interval from `sql_opts_1s_ingest`.
let out = SQLController::from_yaml(&netflow_one_query_config(q, 1), sql_opts_1s_ingest())
448+
.unwrap()
449+
.generate()
450+
.unwrap();
451+
452+
// Exactly one aggregation and one inference query are expected for the single input query.
assert_eq!(out.streaming_aggregation_count(), 1);
453+
assert_eq!(out.inference_query_count(), 1);
454+
// The aggregation must be HLL, and no DeltaSetAggregator may be emitted.
assert!(out.has_aggregation_type("HLL"));
455+
assert!(!out.has_aggregation_type("DeltaSetAggregator"));
456+
assert!(out.all_tumbling_window_sizes_eq(1));
457+
assert_eq!(
458+
out.aggregation_table_name("HLL"),
459+
Some("netflow_table".to_string())
460+
);
461+
// The DISTINCT target column is carried as the aggregation's value column.
assert_eq!(
462+
out.aggregation_value_column("HLL"),
463+
Some("dstip".to_string())
464+
);
465+
// Only the GROUP BY column appears as a grouping label.
assert_eq!(
466+
out.aggregation_labels("HLL", "grouping"),
467+
vec!["srcip".to_string()]
468+
);
469+
// Rollup and aggregated label sets must both be empty for the HLL aggregation.
assert_eq!(
470+
out.aggregation_labels("HLL", "rollup"),
471+
Vec::<String>::new()
472+
);
473+
assert_eq!(
474+
out.aggregation_labels("HLL", "aggregated"),
475+
Vec::<String>::new()
476+
);
477+
}
478+
410479
// ── T-value variants for SUM (range = 300 s fixed) ───────────────────────────
411480
//
412481
// These three tests use the same query and differ only in repetition_delay (T).

0 commit comments

Comments
 (0)