Skip to content

Commit 4bc9bf7

Browse files
feat(asap-planner): Support to generate SQL top-k CountMinSketchWithHeap configs (#395)
* Support to generate SQL top-k CountMinSketchWithHeap configs
* Harden SQL top-k detection (LIMIT 0, alias case) and expand planner/engine tests
* Formatting
1 parent: 84d03cc · commit: 4bc9bf7

8 files changed

Lines changed: 327 additions & 91 deletions

File tree

asap-common/dependencies/rs/sql_utilities/src/ast_matching/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -3,6 +3,6 @@ pub mod sqlparser_test;
33
pub mod sqlpattern_matcher;
44
pub mod sqlpattern_parser;
55

6-
pub use sqlhelper::{SQLSchema, Table};
6+
pub use sqlhelper::{detect_sql_topk, SQLSchema, SqlTopk, Table, TopkWeighting};
77
pub use sqlpattern_matcher::*;
88
pub use sqlpattern_parser::*;

asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlhelper.rs

Lines changed: 89 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -222,3 +222,92 @@ impl SQLQueryData {
222222
}
223223
}
224224
}
225+
226+
/// How a top-k query weights each observation fed into the heavy-hitter sketch.
227+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
228+
pub enum TopkWeighting {
229+
/// `COUNT(col)`: every matching row contributes weight 1, so the heap ranks
230+
/// keys by event frequency (`count_events: true`).
231+
Count,
232+
/// `SUM(col)`: every matching row contributes weight = `col`, so the heap
233+
/// ranks keys by summed value (`count_events: false`).
234+
///
235+
/// Assumes **non-negative** summands: `CountMinSketch` is a frequency sketch
236+
/// and cannot represent negative weights, so a `SUM` over a column that can
237+
/// go negative would produce meaningless estimates.
238+
Sum,
239+
}
240+
241+
/// A detected SQL top-k query: the `LIMIT k` plus how the sketch is weighted.
242+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
243+
pub struct SqlTopk {
244+
pub k: u64,
245+
pub weighting: TopkWeighting,
246+
}
247+
248+
impl SqlTopk {
249+
/// `count_events` flag the backing `CountMinSketchWithHeap` must use:
250+
/// `true` for COUNT (unit weight), `false` for SUM (value weight).
251+
pub fn count_events(&self) -> bool {
252+
matches!(self.weighting, TopkWeighting::Count)
253+
}
254+
}
255+
256+
/// Detect a SQL top-k query and return its `k` plus sketch weighting.
257+
///
258+
/// Recognises the heavy-hitter shape that `CountMinSketchWithHeap` serves:
259+
///
260+
/// ```sql
261+
/// SELECT <key>, COUNT(<col>) AS <alias> -- or SUM(<col>)
262+
/// FROM <table> WHERE <1s window>
263+
/// GROUP BY <key>
264+
/// ORDER BY <alias> DESC
265+
/// LIMIT k
266+
/// ```
267+
///
268+
/// The grouping key (`<key>`) becomes the *aggregated* dimension inside the
269+
/// sketch's heap — not a precompute partition key — so a single sketch per
270+
/// window tracks the top keys by event count (COUNT) or summed value (SUM).
271+
///
272+
/// The SQL parser only accepts identifier ORDER BY targets, so the descending
273+
/// order must reference the aggregate's alias (e.g. `transfer_events`), not the
274+
/// `COUNT(col)` / `SUM(col)` expression itself.
275+
///
276+
/// This detection inspects a single SELECT layer only. For nested queries the
277+
/// ORDER BY / LIMIT sit on the outer SELECT, which on its own matches this
278+
/// shape; callers that must exclude nested patterns (e.g. spatial-over-temporal)
279+
/// are responsible for gating before calling this (the query engine gates on
280+
/// query pattern type; the planner rejects nested queries up front).
281+
pub fn detect_sql_topk(query_data: &SQLQueryData) -> Option<SqlTopk> {
282+
let k = query_data.limit?;
283+
// LIMIT 0 is an empty-result query, not a top-k heavy-hitter request.
284+
if k == 0 {
285+
return None;
286+
}
287+
// Need a GROUP BY key to rank and an ORDER BY to define "top".
288+
if query_data.labels.is_empty() || query_data.order_by.is_empty() {
289+
return None;
290+
}
291+
// CountMinSketchWithHeap tracks heavy hitters by COUNT (unit weight) or
292+
// SUM (value weight). Any other aggregate (MIN/MAX/quantile/...) cannot be
293+
// served by the additive frequency sketch.
294+
let name = query_data.aggregation_info.get_name();
295+
let weighting = if name.eq_ignore_ascii_case("count") {
296+
TopkWeighting::Count
297+
} else if name.eq_ignore_ascii_case("sum") {
298+
TopkWeighting::Sum
299+
} else {
300+
return None;
301+
};
302+
// Primary ordering must be the aggregate alias, descending (largest first).
303+
let primary = &query_data.order_by[0];
304+
if primary.ascending {
305+
return None;
306+
}
307+
// ORDER BY may differ only by identifier case for unquoted aliases.
308+
let alias = query_data.aggregation_alias.as_deref()?;
309+
if !alias.eq_ignore_ascii_case(primary.column.as_str()) {
310+
return None;
311+
}
312+
Some(SqlTopk { k, weighting })
313+
}

asap-planner-rs/src/planner/elastic_dsl.rs

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -114,6 +114,7 @@ impl ElasticSingleQueryProcessor {
114114
agg_type,
115115
agg_sub_type,
116116
None,
117+
None,
117118
self.sketch_parameters.as_ref(),
118119
)
119120
},

asap-planner-rs/src/planner/sketch.rs

Lines changed: 13 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -15,13 +15,16 @@ const DEFAULT_HLL_PRECISION: u64 = 14;
1515

1616
/// Shared sketch parameter builder used by both PromQL and SQL paths.
1717
///
18-
/// `topk_k` is only required for `CountMinSketchWithHeap`: PromQL supplies it
19-
/// from the `topk(k, …)` query argument; SQL passes `None` (SQL never produces
20-
/// this operator today, so the `None` branch is unreachable in practice).
18+
/// `topk_k` is required for `CountMinSketchWithHeap`. PromQL supplies it from
19+
/// the `topk(k, …)` query argument; SQL supplies it from `LIMIT k`.
20+
///
21+
/// `topk_count_events` disambiguates COUNT vs SUM SQL top-k (`true` / `false`).
22+
/// PromQL passes `None` and omits the parameter (defaults to count semantics).
2123
pub fn build_sketch_parameters(
2224
aggregation_type: AggregationType,
2325
aggregation_sub_type: &str,
2426
topk_k: Option<u64>,
27+
topk_count_events: Option<bool>,
2528
sketch_params: Option<&SketchParameterOverrides>,
2629
) -> Result<HashMap<String, serde_json::Value>, String> {
2730
match aggregation_type {
@@ -77,6 +80,12 @@ pub fn build_sketch_parameters(
7780
"heapsize".to_string(),
7881
serde_json::Value::Number((k * heap_mult).into()),
7982
);
83+
if let Some(count_events) = topk_count_events {
84+
m.insert(
85+
"count_events".to_string(),
86+
serde_json::Value::Bool(count_events),
87+
);
88+
}
8089
Ok(m)
8190
}
8291

@@ -158,6 +167,7 @@ pub fn build_sketch_parameters_from_promql(
158167
aggregation_type,
159168
aggregation_sub_type,
160169
topk_k,
170+
None,
161171
sketch_params,
162172
)
163173
}

asap-planner-rs/src/planner/sql.rs

Lines changed: 26 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -3,7 +3,7 @@ use std::collections::HashSet;
33
use asap_types::enums::{CleanupPolicy, WindowType};
44
use promql_utilities::data_model::KeyByLabelNames;
55
use promql_utilities::query_logics::enums::{AggregationType, QueryTreatmentType, Statistic};
6-
use sql_utilities::ast_matching::sqlhelper::Table;
6+
use sql_utilities::ast_matching::sqlhelper::{detect_sql_topk, Table};
77
use sql_utilities::ast_matching::sqlpattern_matcher::{QueryType, SQLPatternMatcher};
88
use sql_utilities::ast_matching::sqlpattern_parser::SQLPatternParser;
99
use sql_utilities::ast_matching::SQLSchema;
@@ -105,16 +105,26 @@ impl SQLSingleQueryProcessor {
105105

106106
// Label routing
107107
let spatial_output = KeyByLabelNames::new(labels.iter().cloned().collect::<Vec<_>>());
108+
// Top-k needs ORDER BY / LIMIT from the parser; SQLPatternMatcher drops them
109+
// when building `sql_query.query_data[0]`, so use `qdata` not query_data[0].
110+
let sql_topk = detect_sql_topk(&qdata);
108111
let treatment_type = get_sql_treatment_type(agg_info.get_name());
109-
let statistics = get_sql_statistics(agg_info.get_name())?;
112+
let statistics = if sql_topk.is_some() {
113+
vec![Statistic::Topk]
114+
} else {
115+
get_sql_statistics(agg_info.get_name())?
116+
};
110117
let rollup = if statistics.contains(&Statistic::Cardinality) {
111118
// Distinct target is value_column, not a rollup label dimension.
112119
KeyByLabelNames::empty()
113120
} else {
114121
all_metadata.difference(&spatial_output)
115122
};
116123

117-
let configs = build_agg_configs_for_statistics(
124+
let topk_k = sql_topk.map(|t| t.k);
125+
let topk_count_events = sql_topk.map(|t| t.count_events());
126+
127+
let mut configs = build_agg_configs_for_statistics(
118128
&statistics,
119129
treatment_type,
120130
&spatial_output,
@@ -128,13 +138,25 @@ impl SQLSingleQueryProcessor {
128138
build_sketch_parameters(
129139
agg_type,
130140
agg_sub_type,
131-
None,
141+
topk_k,
142+
topk_count_events,
132143
self.sketch_parameters.as_ref(),
133144
)
134145
},
135146
)
136147
.map_err(ControllerError::SqlParse)?;
137148

149+
if sql_topk.is_some() {
150+
for cfg in &mut configs {
151+
if cfg.aggregation_type == AggregationType::CountMinSketchWithHeap {
152+
// Heap-only self-keyed layout: the GROUP BY column is tracked
153+
// inside the sketch's aggregated dimension, not as a partition key.
154+
cfg.grouping_labels = KeyByLabelNames::empty();
155+
cfg.aggregated_labels = spatial_output.clone();
156+
}
157+
}
158+
}
159+
138160
let t_lookback = match query_type {
139161
QueryType::Spatial => self.data_ingestion_interval,
140162
_ => sql_query.query_data[0].time_info.get_duration() as u64,

asap-planner-rs/src/planner_output.rs

Lines changed: 15 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -6,8 +6,8 @@ use asap_types::streaming_config::StreamingConfig;
66

77
use crate::generator::{
88
GeneratorOutput, PuntedQuery, KEY_AGGREGATIONS, KEY_AGG_SUB_TYPE, KEY_AGG_TYPE, KEY_LABELS,
9-
KEY_NUM_AGG_TO_RETAIN, KEY_QUERIES, KEY_QUERY, KEY_READ_COUNT_THRESHOLD, KEY_TABLE_NAME,
10-
KEY_VALUE_COLUMN, KEY_WINDOW_SIZE,
9+
KEY_NUM_AGG_TO_RETAIN, KEY_PARAMETERS, KEY_QUERIES, KEY_QUERY, KEY_READ_COUNT_THRESHOLD,
10+
KEY_TABLE_NAME, KEY_VALUE_COLUMN, KEY_WINDOW_SIZE,
1111
};
1212

1313
/// Output of the planning process — contains the two YAML configs
@@ -227,4 +227,17 @@ impl PlannerOutput {
227227
})
228228
.unwrap_or(false)
229229
}
230+
231+
/// Returns a sketch parameter from the first aggregation matching `agg_type`.
232+
pub fn aggregation_parameter(&self, agg_type: &str, key: &str) -> Option<YamlValue> {
233+
self.find_aggregation_by_type(agg_type)
234+
.and_then(|m| m.get(KEY_PARAMETERS))
235+
.and_then(|v| {
236+
if let YamlValue::Mapping(params) = v {
237+
params.get(key).cloned()
238+
} else {
239+
None
240+
}
241+
})
242+
}
230243
}

asap-planner-rs/tests/sql_integration.rs

Lines changed: 135 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -834,3 +834,138 @@ fn t_not_multiple_of_data_ingestion_interval_returns_planner_error() {
834834
.generate();
835835
assert!(matches!(result, Err(ControllerError::PlannerError(_))));
836836
}
837+
838+
// ── SQL top-k (CountMinSketchWithHeap) ────────────────────────────────────────
839+
840+
/// COUNT … ORDER BY <alias> DESC LIMIT k → single heap-only sketch.
841+
#[test]
842+
fn spatial_count_topk_heap() {
843+
let q = "SELECT srcip, COUNT(pkt_len) AS transfer_events FROM netflow_table \
844+
WHERE time BETWEEN DATEADD(s, -11, NOW()) AND DATEADD(s, -10, NOW()) \
845+
GROUP BY srcip ORDER BY transfer_events DESC LIMIT 10";
846+
let out = SQLController::from_yaml(&netflow_one_query_config(q, 1), sql_opts_1s_ingest())
847+
.unwrap()
848+
.generate()
849+
.unwrap();
850+
851+
assert_eq!(out.streaming_aggregation_count(), 1);
852+
assert_eq!(out.inference_query_count(), 1);
853+
assert!(out.has_aggregation_type("CountMinSketchWithHeap"));
854+
assert!(out.has_aggregation_type_and_sub_type("CountMinSketchWithHeap", "topk"));
855+
assert!(!out.has_aggregation_type("DeltaSetAggregator"));
856+
assert!(!out.has_aggregation_type("CountMinSketch"));
857+
assert!(out.all_tumbling_window_sizes_eq(1));
858+
assert_eq!(
859+
out.aggregation_labels("CountMinSketchWithHeap", "grouping"),
860+
Vec::<String>::new()
861+
);
862+
assert_eq!(
863+
out.aggregation_labels("CountMinSketchWithHeap", "aggregated"),
864+
vec!["srcip".to_string()]
865+
);
866+
let mut rollup = out.aggregation_labels("CountMinSketchWithHeap", "rollup");
867+
rollup.sort();
868+
assert_eq!(rollup, vec!["dstip".to_string(), "proto".to_string()]);
869+
assert_eq!(
870+
out.aggregation_parameter("CountMinSketchWithHeap", "heapsize")
871+
.and_then(|v| v.as_u64()),
872+
Some(40)
873+
);
874+
assert_eq!(
875+
out.aggregation_parameter("CountMinSketchWithHeap", "count_events")
876+
.and_then(|v| v.as_bool()),
877+
Some(true)
878+
);
879+
}
880+
881+
/// SUM … ORDER BY <alias> DESC LIMIT k → value-weighted heap sketch.
882+
#[test]
883+
fn spatial_sum_topk_heap() {
884+
let q = "SELECT srcip, SUM(pkt_len) AS total_bytes FROM netflow_table \
885+
WHERE time BETWEEN DATEADD(s, -11, NOW()) AND DATEADD(s, -10, NOW()) \
886+
GROUP BY srcip ORDER BY total_bytes DESC LIMIT 10";
887+
let out = SQLController::from_yaml(&netflow_one_query_config(q, 1), sql_opts_1s_ingest())
888+
.unwrap()
889+
.generate()
890+
.unwrap();
891+
892+
assert_eq!(out.streaming_aggregation_count(), 1);
893+
assert_eq!(out.inference_query_count(), 1);
894+
assert!(out.has_aggregation_type("CountMinSketchWithHeap"));
895+
assert!(out.has_aggregation_type_and_sub_type("CountMinSketchWithHeap", "topk"));
896+
assert!(!out.has_aggregation_type("DeltaSetAggregator"));
897+
assert!(!out.has_aggregation_type("CountMinSketch"));
898+
assert!(out.all_tumbling_window_sizes_eq(1));
899+
assert_eq!(
900+
out.aggregation_labels("CountMinSketchWithHeap", "grouping"),
901+
Vec::<String>::new()
902+
);
903+
assert_eq!(
904+
out.aggregation_labels("CountMinSketchWithHeap", "aggregated"),
905+
vec!["srcip".to_string()]
906+
);
907+
let mut rollup = out.aggregation_labels("CountMinSketchWithHeap", "rollup");
908+
rollup.sort();
909+
assert_eq!(rollup, vec!["dstip".to_string(), "proto".to_string()]);
910+
assert_eq!(
911+
out.aggregation_parameter("CountMinSketchWithHeap", "heapsize")
912+
.and_then(|v| v.as_u64()),
913+
Some(40)
914+
);
915+
assert_eq!(
916+
out.aggregation_parameter("CountMinSketchWithHeap", "count_events")
917+
.and_then(|v| v.as_bool()),
918+
Some(false)
919+
);
920+
}
921+
922+
/// Plain COUNT without ORDER BY / LIMIT stays on the CMS + DeltaSet path.
923+
#[test]
924+
fn spatial_count_without_order_by_is_not_topk() {
925+
let q = "SELECT srcip, COUNT(pkt_len) AS transfer_events FROM netflow_table \
926+
WHERE time BETWEEN DATEADD(s, -11, NOW()) AND DATEADD(s, -10, NOW()) \
927+
GROUP BY srcip";
928+
let out = SQLController::from_yaml(&netflow_one_query_config(q, 1), sql_opts_1s_ingest())
929+
.unwrap()
930+
.generate()
931+
.unwrap();
932+
933+
assert_eq!(out.streaming_aggregation_count(), 2);
934+
assert!(out.has_aggregation_type("CountMinSketch"));
935+
assert!(out.has_aggregation_type("DeltaSetAggregator"));
936+
assert!(!out.has_aggregation_type("CountMinSketchWithHeap"));
937+
}
938+
939+
/// ORDER BY aggregate alias ASC (bottom-k) stays on the CMS + DeltaSet path.
940+
#[test]
941+
fn spatial_count_order_by_asc_limit_is_not_topk() {
942+
let q = "SELECT srcip, COUNT(pkt_len) AS transfer_events FROM netflow_table \
943+
WHERE time BETWEEN DATEADD(s, -11, NOW()) AND DATEADD(s, -10, NOW()) \
944+
GROUP BY srcip ORDER BY transfer_events ASC LIMIT 10";
945+
let out = SQLController::from_yaml(&netflow_one_query_config(q, 1), sql_opts_1s_ingest())
946+
.unwrap()
947+
.generate()
948+
.unwrap();
949+
950+
assert_eq!(out.streaming_aggregation_count(), 2);
951+
assert!(out.has_aggregation_type("CountMinSketch"));
952+
assert!(out.has_aggregation_type("DeltaSetAggregator"));
953+
assert!(!out.has_aggregation_type("CountMinSketchWithHeap"));
954+
}
955+
956+
/// LIMIT 0 is treated as non-top-k and uses the normal CMS + DeltaSet path.
957+
#[test]
958+
fn spatial_count_limit_zero_is_not_topk() {
959+
let q = "SELECT srcip, COUNT(pkt_len) AS transfer_events FROM netflow_table \
960+
WHERE time BETWEEN DATEADD(s, -11, NOW()) AND DATEADD(s, -10, NOW()) \
961+
GROUP BY srcip ORDER BY transfer_events DESC LIMIT 0";
962+
let out = SQLController::from_yaml(&netflow_one_query_config(q, 1), sql_opts_1s_ingest())
963+
.unwrap()
964+
.generate()
965+
.unwrap();
966+
967+
assert_eq!(out.streaming_aggregation_count(), 2);
968+
assert!(out.has_aggregation_type("CountMinSketch"));
969+
assert!(out.has_aggregation_type("DeltaSetAggregator"));
970+
assert!(!out.has_aggregation_type("CountMinSketchWithHeap"));
971+
}

0 commit comments

Comments
 (0)