@@ -1189,15 +1189,21 @@ mod sort_and_truncate_tests {
11891189 }
11901190}
11911191
1192- /// End-to-end tests for SQL top-k queries served by `CountMinSketchWithHeap`.
1192+ /// End-to-end SQL top-k pipeline tests for `CountMinSketchWithHeap`.
11931193///
1194- /// Exercises the full path for `SELECT srcip, COUNT(pkt_len) AS k FROM
1195- /// netflow_table WHERE <1s window> GROUP BY srcip ORDER BY k DESC LIMIT n`:
1196- /// * SQL detection promotes it to `Statistic::Topk`.
1197- /// * The single `CountMinSketchWithHeap` aggregation resolves self-keyed
1198- /// (key id == value id), so the sketch heap enumerates candidate `srcip`s.
1199- /// * The pipeline sorts by count descending and truncates to `n`, without
1200- /// PromQL-style metric-name prefixing (rows stay bare `(srcip, count)`).
1194+ /// Covers both resolution paths:
1195+ /// * **query_config** — self-keyed single-aggregation reference
1196+ /// * **capability matching** — heap + paired `DeltaSetAggregator`, no query_config
1197+ ///
1198+ /// Example query shape:
1199+ /// ```sql
1200+ /// SELECT srcip, COUNT(pkt_len) AS transfer_events
1201+ /// FROM netflow_table WHERE <1s window> GROUP BY srcip ORDER BY transfer_events DESC LIMIT n
1202+ /// ```
1203+ /// SQL detection promotes it to `Statistic::Topk`. On the query_config path the
1204+ /// heap is self-keyed; on the capability path a separate key aggregation is paired.
1205+ /// The pipeline sorts by value descending and truncates to `n`, without PromQL-style
1206+ /// metric-name prefixing (rows stay bare `(srcip, count)`).
12011207///
12021208/// Lives here alongside `detect_topk_tests` / `sort_and_truncate_tests` so all
12031209/// SQL top-k coverage is co-located in the SQL handler. Unlike those pure-fn
@@ -1387,6 +1393,104 @@ mod topk_pipeline_tests {
13871393 )
13881394 }
13891395
1396+ const HEAP_COUNT_ID : u64 = 111 ;
1397+ const HEAP_SUM_ID : u64 = 112 ;
1398+ const HEAP_DEFAULT_ID : u64 = 113 ;
1399+ const KEY_AGG_ID : u64 = 211 ;
1400+
1401+ fn netflow_sql_schema ( ) -> SQLSchema {
1402+ let value_cols: HashSet < String > = [ "pkt_len" ] . iter ( ) . map ( |s| s. to_string ( ) ) . collect ( ) ;
1403+ let labels: HashSet < String > = [ "srcip" , "dstip" , "proto" ]
1404+ . iter ( )
1405+ . map ( |s| s. to_string ( ) )
1406+ . collect ( ) ;
1407+ let table = Table :: new ( METRIC . to_string ( ) , "time" . to_string ( ) , value_cols, labels) ;
1408+ SQLSchema :: new ( vec ! [ table] )
1409+ }
1410+
1411+ /// `CountMinSketchWithHeap` for capability-matching tests. When `count_events`
1412+ /// is `None`, the parameter is omitted so the config relies on the default
1413+ /// (`count_events: true`).
1414+ fn make_heap_agg ( id : u64 , count_events : Option < bool > ) -> AggregationConfig {
1415+ let mut parameters = HashMap :: new ( ) ;
1416+ if let Some ( count_events) = count_events {
1417+ parameters. insert ( "count_events" . to_string ( ) , serde_json:: json!( count_events) ) ;
1418+ }
1419+ AggregationConfig {
1420+ aggregation_id : id,
1421+ aggregation_type : AggregationType :: CountMinSketchWithHeap ,
1422+ aggregation_sub_type : String :: new ( ) ,
1423+ parameters,
1424+ grouping_labels : KeyByLabelNames :: empty ( ) ,
1425+ aggregated_labels : KeyByLabelNames :: new ( vec ! [ "srcip" . to_string( ) ] ) ,
1426+ rollup_labels : KeyByLabelNames :: empty ( ) ,
1427+ original_yaml : String :: new ( ) ,
1428+ window_size : 1 ,
1429+ slide_interval : 1 ,
1430+ window_type : WindowType :: Tumbling ,
1431+ spatial_filter : String :: new ( ) ,
1432+ spatial_filter_normalized : String :: new ( ) ,
1433+ metric : METRIC . to_string ( ) ,
1434+ num_aggregates_to_retain : None ,
1435+ read_count_threshold : None ,
1436+ table_name : None ,
1437+ value_column : None ,
1438+ }
1439+ }
1440+
1441+ fn make_delta_set_key_agg ( id : u64 ) -> AggregationConfig {
1442+ AggregationConfig {
1443+ aggregation_id : id,
1444+ aggregation_type : AggregationType :: DeltaSetAggregator ,
1445+ aggregation_sub_type : String :: new ( ) ,
1446+ parameters : HashMap :: new ( ) ,
1447+ grouping_labels : KeyByLabelNames :: empty ( ) ,
1448+ aggregated_labels : KeyByLabelNames :: empty ( ) ,
1449+ rollup_labels : KeyByLabelNames :: empty ( ) ,
1450+ original_yaml : String :: new ( ) ,
1451+ window_size : 1 ,
1452+ slide_interval : 1 ,
1453+ window_type : WindowType :: Tumbling ,
1454+ spatial_filter : String :: new ( ) ,
1455+ spatial_filter_normalized : String :: new ( ) ,
1456+ metric : METRIC . to_string ( ) ,
1457+ num_aggregates_to_retain : None ,
1458+ read_count_threshold : None ,
1459+ table_name : None ,
1460+ value_column : None ,
1461+ }
1462+ }
1463+
1464+ /// Engine with **no** query_configs so top-k resolves via capability matching.
1465+ /// Always provisions a paired `DeltaSetAggregator` key aggregation.
1466+ fn build_capability_fallback_engine ( heap_configs : Vec < AggregationConfig > ) -> SimpleEngine {
1467+ let mut agg_configs = HashMap :: new ( ) ;
1468+ for heap in & heap_configs {
1469+ agg_configs. insert ( heap. aggregation_id , heap. clone ( ) ) ;
1470+ }
1471+ agg_configs. insert ( KEY_AGG_ID , make_delta_set_key_agg ( KEY_AGG_ID ) ) ;
1472+
1473+ let streaming_config = Arc :: new ( StreamingConfig {
1474+ aggregation_configs : agg_configs,
1475+ } ) ;
1476+ let store = Arc :: new ( SimpleMapStore :: new (
1477+ streaming_config. clone ( ) ,
1478+ CleanupPolicy :: NoCleanup ,
1479+ ) ) ;
1480+ let inference_config = InferenceConfig {
1481+ schema : SchemaConfig :: SQL ( netflow_sql_schema ( ) ) ,
1482+ query_configs : vec ! [ ] ,
1483+ cleanup_policy : CleanupPolicy :: NoCleanup ,
1484+ } ;
1485+ SimpleEngine :: new (
1486+ store,
1487+ inference_config,
1488+ streaming_config,
1489+ 1 ,
1490+ QueryLanguage :: sql,
1491+ )
1492+ }
1493+
13901494 #[ test]
13911495 fn sum_topk_resolves_self_keyed_heap ( ) {
13921496 // SUM(col) ORDER BY DESC LIMIT k is a top-k query and, like COUNT,
@@ -1502,4 +1606,104 @@ mod topk_pipeline_tests {
15021606 let expected: HashSet < String > = ( 6 ..=15u64 ) . map ( |i| format ! ( "10.0.0.{i}" ) ) . collect ( ) ;
15031607 assert_eq ! ( returned, expected) ;
15041608 }
1609+
1610+ #[ test]
1611+ fn count_topk_capability_fallback_pairs_heap_with_key_agg ( ) {
1612+ let engine = build_capability_fallback_engine ( vec ! [ make_heap_agg(
1613+ HEAP_COUNT_ID ,
1614+ Some ( true ) ,
1615+ ) ] ) ;
1616+ let context = engine
1617+ . build_query_execution_context_sql ( topk_query ( 10 ) , QUERY_TIME )
1618+ . expect ( "COUNT top-k should resolve via capability matching" ) ;
1619+
1620+ assert_eq ! ( context. metadata. statistic_to_compute, Statistic :: Topk ) ;
1621+ assert_eq ! (
1622+ context. agg_info. aggregation_id_for_value,
1623+ HEAP_COUNT_ID ,
1624+ "count-weighted heap must be the value aggregation" ,
1625+ ) ;
1626+ assert_eq ! (
1627+ context. agg_info. aggregation_id_for_key, KEY_AGG_ID ,
1628+ "multi-population top-k must pair heap with DeltaSetAggregator" ,
1629+ ) ;
1630+ assert_ne ! (
1631+ context. agg_info. aggregation_id_for_key,
1632+ context. agg_info. aggregation_id_for_value,
1633+ ) ;
1634+ assert ! (
1635+ context. store_plan. keys_query. is_some( ) ,
1636+ "capability fallback plans a separate keys query" ,
1637+ ) ;
1638+ }
1639+
1640+ #[ test]
1641+ fn count_topk_capability_fallback_picks_count_weighted_when_both_heaps_exist ( ) {
1642+ let engine = build_capability_fallback_engine ( vec ! [
1643+ make_heap_agg( HEAP_COUNT_ID , Some ( true ) ) ,
1644+ make_heap_agg( HEAP_SUM_ID , Some ( false ) ) ,
1645+ ] ) ;
1646+ let context = engine
1647+ . build_query_execution_context_sql ( topk_query ( 10 ) , QUERY_TIME )
1648+ . expect ( "COUNT top-k should pick the count_events: true sketch" ) ;
1649+
1650+ assert_eq ! (
1651+ context. agg_info. aggregation_id_for_value,
1652+ HEAP_COUNT_ID ,
1653+ "COUNT top-k must not pick the sum-weighted sketch when both exist" ,
1654+ ) ;
1655+ }
1656+
1657+ #[ test]
1658+ fn count_topk_capability_fallback_defaults_count_events_true ( ) {
1659+ // Heap omits `count_events`; matcher treats that as count semantics.
1660+ let engine = build_capability_fallback_engine ( vec ! [ make_heap_agg(
1661+ HEAP_DEFAULT_ID ,
1662+ None ,
1663+ ) ] ) ;
1664+ let context = engine
1665+ . build_query_execution_context_sql ( topk_query ( 10 ) , QUERY_TIME )
1666+ . expect ( "COUNT top-k should match a sketch with default count_events" ) ;
1667+
1668+ assert_eq ! (
1669+ context. agg_info. aggregation_id_for_value,
1670+ HEAP_DEFAULT_ID ,
1671+ "default (no flag) heap must serve COUNT top-k" ,
1672+ ) ;
1673+ }
1674+
1675+ #[ test]
1676+ fn sum_topk_capability_fallback_picks_value_weighted_heap ( ) {
1677+ let engine = build_capability_fallback_engine ( vec ! [
1678+ make_heap_agg( HEAP_COUNT_ID , Some ( true ) ) ,
1679+ make_heap_agg( HEAP_SUM_ID , Some ( false ) ) ,
1680+ ] ) ;
1681+ let context = engine
1682+ . build_query_execution_context_sql ( sum_topk_query ( 5 ) , QUERY_TIME )
1683+ . expect ( "SUM top-k should resolve via capability matching" ) ;
1684+
1685+ assert_eq ! ( context. metadata. statistic_to_compute, Statistic :: Topk ) ;
1686+ assert_eq ! (
1687+ context. agg_info. aggregation_id_for_value,
1688+ HEAP_SUM_ID ,
1689+ "SUM top-k must pick the count_events: false sketch" ,
1690+ ) ;
1691+ assert_eq ! ( context. agg_info. aggregation_id_for_key, KEY_AGG_ID ) ;
1692+ assert ! ( context. store_plan. keys_query. is_some( ) ) ;
1693+ }
1694+
1695+ #[ test]
1696+ fn sum_topk_capability_fallback_rejects_count_only_default_heap ( ) {
1697+ // Only a default (count-weighted) sketch exists; SUM top-k cannot be served.
1698+ let engine = build_capability_fallback_engine ( vec ! [ make_heap_agg(
1699+ HEAP_DEFAULT_ID ,
1700+ None ,
1701+ ) ] ) ;
1702+ assert ! (
1703+ engine
1704+ . build_query_execution_context_sql( sum_topk_query( 5 ) , QUERY_TIME )
1705+ . is_none( ) ,
1706+ "SUM top-k must not fall back to a count_events-default sketch" ,
1707+ ) ;
1708+ }
15051709}
0 commit comments