@@ -834,3 +834,84 @@ fn t_not_multiple_of_data_ingestion_interval_returns_planner_error() {
834834 . generate ( ) ;
835835 assert ! ( matches!( result, Err ( ControllerError :: PlannerError ( _) ) ) ) ;
836836}
837+
838+ // ── SQL top-k (CountMinSketchWithHeap) ────────────────────────────────────────
839+
840+ /// COUNT … ORDER BY <alias> DESC LIMIT k → single heap-only sketch.
841+ #[ test]
842+ fn spatial_count_topk_heap ( ) {
843+ let q = "SELECT srcip, COUNT(pkt_len) AS transfer_events FROM netflow_table \
844+ WHERE time BETWEEN DATEADD(s, -11, NOW()) AND DATEADD(s, -10, NOW()) \
845+ GROUP BY srcip ORDER BY transfer_events DESC LIMIT 10";
846+ let out = SQLController :: from_yaml ( & netflow_one_query_config ( q, 1 ) , sql_opts_1s_ingest ( ) )
847+ . unwrap ( )
848+ . generate ( )
849+ . unwrap ( ) ;
850+
851+ assert_eq ! ( out. streaming_aggregation_count( ) , 1 ) ;
852+ assert_eq ! ( out. inference_query_count( ) , 1 ) ;
853+ assert ! ( out. has_aggregation_type( "CountMinSketchWithHeap" ) ) ;
854+ assert ! ( out. has_aggregation_type_and_sub_type( "CountMinSketchWithHeap" , "topk" ) ) ;
855+ assert ! ( !out. has_aggregation_type( "DeltaSetAggregator" ) ) ;
856+ assert ! ( !out. has_aggregation_type( "CountMinSketch" ) ) ;
857+ assert ! ( out. all_tumbling_window_sizes_eq( 1 ) ) ;
858+ assert_eq ! (
859+ out. aggregation_labels( "CountMinSketchWithHeap" , "grouping" ) ,
860+ Vec :: <String >:: new( )
861+ ) ;
862+ assert_eq ! (
863+ out. aggregation_labels( "CountMinSketchWithHeap" , "aggregated" ) ,
864+ vec![ "srcip" . to_string( ) ]
865+ ) ;
866+ let mut rollup = out. aggregation_labels ( "CountMinSketchWithHeap" , "rollup" ) ;
867+ rollup. sort ( ) ;
868+ assert_eq ! ( rollup, vec![ "dstip" . to_string( ) , "proto" . to_string( ) ] ) ;
869+ assert_eq ! (
870+ out. aggregation_parameter( "CountMinSketchWithHeap" , "heapsize" )
871+ . and_then( |v| v. as_u64( ) ) ,
872+ Some ( 40 )
873+ ) ;
874+ assert_eq ! (
875+ out. aggregation_parameter( "CountMinSketchWithHeap" , "count_events" )
876+ . and_then( |v| v. as_bool( ) ) ,
877+ Some ( true )
878+ ) ;
879+ }
880+
881+ /// SUM … ORDER BY <alias> DESC LIMIT k → value-weighted heap sketch.
882+ #[ test]
883+ fn spatial_sum_topk_heap ( ) {
884+ let q = "SELECT srcip, SUM(pkt_len) AS total_bytes FROM netflow_table \
885+ WHERE time BETWEEN DATEADD(s, -11, NOW()) AND DATEADD(s, -10, NOW()) \
886+ GROUP BY srcip ORDER BY total_bytes DESC LIMIT 10";
887+ let out = SQLController :: from_yaml ( & netflow_one_query_config ( q, 1 ) , sql_opts_1s_ingest ( ) )
888+ . unwrap ( )
889+ . generate ( )
890+ . unwrap ( ) ;
891+
892+ assert_eq ! ( out. streaming_aggregation_count( ) , 1 ) ;
893+ assert ! ( out. has_aggregation_type( "CountMinSketchWithHeap" ) ) ;
894+ assert ! ( !out. has_aggregation_type( "DeltaSetAggregator" ) ) ;
895+ assert_eq ! (
896+ out. aggregation_parameter( "CountMinSketchWithHeap" , "count_events" )
897+ . and_then( |v| v. as_bool( ) ) ,
898+ Some ( false )
899+ ) ;
900+ }
901+
902+ /// Plain COUNT without ORDER BY / LIMIT stays on the CMS + DeltaSet path.
903+ #[ test]
904+ fn spatial_count_without_order_by_is_not_topk ( ) {
905+ let q = "SELECT srcip, COUNT(pkt_len) AS transfer_events FROM netflow_table \
906+ WHERE time BETWEEN DATEADD(s, -11, NOW()) AND DATEADD(s, -10, NOW()) \
907+ GROUP BY srcip";
908+ let out = SQLController :: from_yaml ( & netflow_one_query_config ( q, 1 ) , sql_opts_1s_ingest ( ) )
909+ . unwrap ( )
910+ . generate ( )
911+ . unwrap ( ) ;
912+
913+ assert_eq ! ( out. streaming_aggregation_count( ) , 2 ) ;
914+ assert ! ( out. has_aggregation_type( "CountMinSketch" ) ) ;
915+ assert ! ( out. has_aggregation_type( "DeltaSetAggregator" ) ) ;
916+ assert ! ( !out. has_aggregation_type( "CountMinSketchWithHeap" ) ) ;
917+ }
0 commit comments