Skip to content

Commit 4b7c1d8

Browse files
Harden SQL top-k detection (LIMIT 0, alias case) and expand planner/engine tests
1 parent 2c969a5 commit 4b7c1d8

4 files changed

Lines changed: 101 additions & 2 deletions

File tree

asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlhelper.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -280,6 +280,10 @@ impl SqlTopk {
280280
/// query pattern type; the planner rejects nested queries up front).
281281
pub fn detect_sql_topk(query_data: &SQLQueryData) -> Option<SqlTopk> {
282282
let k = query_data.limit?;
283+
// LIMIT 0 is an empty-result query, not a top-k heavy-hitter request.
284+
if k == 0 {
285+
return None;
286+
}
283287
// Need a GROUP BY key to rank and an ORDER BY to define "top".
284288
if query_data.labels.is_empty() || query_data.order_by.is_empty() {
285289
return None;
@@ -300,8 +304,11 @@ pub fn detect_sql_topk(query_data: &SQLQueryData) -> Option<SqlTopk> {
300304
if primary.ascending {
301305
return None;
302306
}
303-
if query_data.aggregation_alias.as_deref() != Some(primary.column.as_str()) {
307+
// ORDER BY may differ only by identifier case for unquoted aliases.
308+
let alias = query_data.aggregation_alias.as_deref()?;
309+
if !alias.eq_ignore_ascii_case(primary.column.as_str()) {
304310
return None;
305311
}
306312
Some(SqlTopk { k, weighting })
307313
}
314+

asap-planner-rs/src/planner/sql.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,8 @@ impl SQLSingleQueryProcessor {
105105

106106
// Label routing
107107
let spatial_output = KeyByLabelNames::new(labels.iter().cloned().collect::<Vec<_>>());
108+
// Top-k needs ORDER BY / LIMIT from the parser; SQLPatternMatcher drops them
109+
// when building `sql_query.query_data[0]`, so use `qdata` not query_data[0].
108110
let sql_topk = detect_sql_topk(&qdata);
109111
let treatment_type = get_sql_treatment_type(agg_info.get_name());
110112
let statistics = if sql_topk.is_some() {

asap-planner-rs/tests/sql_integration.rs

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -890,8 +890,28 @@ fn spatial_sum_topk_heap() {
890890
.unwrap();
891891

892892
assert_eq!(out.streaming_aggregation_count(), 1);
893+
assert_eq!(out.inference_query_count(), 1);
893894
assert!(out.has_aggregation_type("CountMinSketchWithHeap"));
895+
assert!(out.has_aggregation_type_and_sub_type("CountMinSketchWithHeap", "topk"));
894896
assert!(!out.has_aggregation_type("DeltaSetAggregator"));
897+
assert!(!out.has_aggregation_type("CountMinSketch"));
898+
assert!(out.all_tumbling_window_sizes_eq(1));
899+
assert_eq!(
900+
out.aggregation_labels("CountMinSketchWithHeap", "grouping"),
901+
Vec::<String>::new()
902+
);
903+
assert_eq!(
904+
out.aggregation_labels("CountMinSketchWithHeap", "aggregated"),
905+
vec!["srcip".to_string()]
906+
);
907+
let mut rollup = out.aggregation_labels("CountMinSketchWithHeap", "rollup");
908+
rollup.sort();
909+
assert_eq!(rollup, vec!["dstip".to_string(), "proto".to_string()]);
910+
assert_eq!(
911+
out.aggregation_parameter("CountMinSketchWithHeap", "heapsize")
912+
.and_then(|v| v.as_u64()),
913+
Some(40)
914+
);
895915
assert_eq!(
896916
out.aggregation_parameter("CountMinSketchWithHeap", "count_events")
897917
.and_then(|v| v.as_bool()),
@@ -915,3 +935,37 @@ fn spatial_count_without_order_by_is_not_topk() {
915935
assert!(out.has_aggregation_type("DeltaSetAggregator"));
916936
assert!(!out.has_aggregation_type("CountMinSketchWithHeap"));
917937
}
938+
939+
/// ORDER BY aggregate alias ASC (bottom-k) stays on the CMS + DeltaSet path.
940+
#[test]
941+
fn spatial_count_order_by_asc_limit_is_not_topk() {
942+
let q = "SELECT srcip, COUNT(pkt_len) AS transfer_events FROM netflow_table \
943+
WHERE time BETWEEN DATEADD(s, -11, NOW()) AND DATEADD(s, -10, NOW()) \
944+
GROUP BY srcip ORDER BY transfer_events ASC LIMIT 10";
945+
let out = SQLController::from_yaml(&netflow_one_query_config(q, 1), sql_opts_1s_ingest())
946+
.unwrap()
947+
.generate()
948+
.unwrap();
949+
950+
assert_eq!(out.streaming_aggregation_count(), 2);
951+
assert!(out.has_aggregation_type("CountMinSketch"));
952+
assert!(out.has_aggregation_type("DeltaSetAggregator"));
953+
assert!(!out.has_aggregation_type("CountMinSketchWithHeap"));
954+
}
955+
956+
/// LIMIT 0 is treated as non-top-k and uses the normal CMS + DeltaSet path.
957+
#[test]
958+
fn spatial_count_limit_zero_is_not_topk() {
959+
let q = "SELECT srcip, COUNT(pkt_len) AS transfer_events FROM netflow_table \
960+
WHERE time BETWEEN DATEADD(s, -11, NOW()) AND DATEADD(s, -10, NOW()) \
961+
GROUP BY srcip ORDER BY transfer_events DESC LIMIT 0";
962+
let out = SQLController::from_yaml(&netflow_one_query_config(q, 1), sql_opts_1s_ingest())
963+
.unwrap()
964+
.generate()
965+
.unwrap();
966+
967+
assert_eq!(out.streaming_aggregation_count(), 2);
968+
assert!(out.has_aggregation_type("CountMinSketch"));
969+
assert!(out.has_aggregation_type("DeltaSetAggregator"));
970+
assert!(!out.has_aggregation_type("CountMinSketchWithHeap"));
971+
}

asap-query-engine/src/engines/simple_engine/sql.rs

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -772,7 +772,7 @@ impl SimpleEngine {
772772
#[cfg(test)]
773773
mod detect_topk_tests {
774774
use sql_utilities::ast_matching::{detect_sql_topk, SQLPatternParser, SqlTopk, TopkWeighting};
775-
use sql_utilities::sqlhelper::{SQLSchema, Table};
775+
use sql_utilities::sqlhelper::{AggregationInfo, OrderByItem, SQLQueryData, SQLSchema, Table, TimeInfo};
776776
use sqlparser::dialect::GenericDialect;
777777
use sqlparser::parser::Parser;
778778
use std::collections::HashSet;
@@ -836,6 +836,42 @@ mod detect_topk_tests {
836836
);
837837
}
838838

839+
#[test]
840+
fn alias_case_mismatch_still_detects_topk() {
841+
// The parser path can normalize/canonicalize identifiers; verify directly on
842+
// SQLQueryData that alias matching in detect_sql_topk is case-insensitive.
843+
let qd = SQLQueryData {
844+
aggregation_info: AggregationInfo::new("COUNT".to_string(), "pkt_len".to_string(), vec![]),
845+
aggregation_alias: Some("transfer_events".to_string()),
846+
metric: "netflow_table".to_string(),
847+
labels: HashSet::from(["srcip".to_string()]),
848+
time_info: TimeInfo::new("time".to_string(), 0.0, 1.0),
849+
subquery: None,
850+
order_by: vec![OrderByItem {
851+
column: "TRANSFER_EVENTS".to_string(),
852+
ascending: false,
853+
}],
854+
limit: Some(10),
855+
};
856+
assert_eq!(
857+
detect_sql_topk(&qd),
858+
Some(SqlTopk {
859+
k: 10,
860+
weighting: TopkWeighting::Count,
861+
}),
862+
);
863+
}
864+
865+
#[test]
866+
fn zero_limit_is_not_topk() {
867+
let sql = format!(
868+
"SELECT srcip, COUNT(pkt_len) AS transfer_events FROM netflow_table {WINDOW} \
869+
GROUP BY srcip ORDER BY transfer_events DESC LIMIT 0"
870+
);
871+
let qd = parse(&sql).expect("query should parse");
872+
assert_eq!(detect_sql_topk(&qd), None, "LIMIT 0 is not top-k");
873+
}
874+
839875
#[test]
840876
fn missing_limit_is_not_topk() {
841877
let sql = format!(

0 commit comments

Comments
 (0)