Skip to content

Commit 7043753

Browse files
Support for SUM Top-k queries using CountMinSketchWithHeap
1 parent 4668d8f commit 7043753

4 files changed

Lines changed: 369 additions & 28 deletions

File tree

asap-common/dependencies/rs/asap_types/src/capability_matching.rs

Lines changed: 156 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,39 @@ pub fn spatial_filter_compatible(config_filter: &str, req_filter: &str) -> bool
101101
config_norm == req_norm
102102
}
103103

104+
/// Reads the `count_events` parameter from a `CountMinSketchWithHeap` config.
105+
/// Defaults to `true` (COUNT semantics) so existing count top-k configs that
106+
/// omit the flag keep matching COUNT top-k queries.
107+
fn config_count_events(config: &AggregationConfig) -> bool {
108+
config
109+
.parameters
110+
.get("count_events")
111+
.and_then(|v| v.as_bool())
112+
.unwrap_or(true)
113+
}
114+
115+
/// Top-k weighting compatibility. Only constrains `Statistic::Topk` candidates;
116+
/// every other statistic passes unconditionally.
117+
///
118+
/// A COUNT top-k query (`Some(true)`) must be served by a `count_events: true`
119+
/// sketch and a SUM top-k query (`Some(false)`) by a `count_events: false`
120+
/// (value-weighted) sketch. This is what tells two `CountMinSketchWithHeap`
121+
/// configs on the same metric apart. `None` (non-top-k, or PromQL top-k which
122+
/// does not pin the weighting) imposes no constraint.
123+
pub fn topk_weighting_compatible(
124+
stat: Statistic,
125+
config: &AggregationConfig,
126+
req_count_events: Option<bool>,
127+
) -> bool {
128+
if stat != Statistic::Topk {
129+
return true;
130+
}
131+
match req_count_events {
132+
Some(want) => config_count_events(config) == want,
133+
None => true,
134+
}
135+
}
136+
104137
/// Aggregation priority comparator: prefer larger `window_size` (descending).
105138
/// This is a separate function so callers can swap the policy without touching matching logic.
106139
pub fn aggregation_priority(a: &AggregationConfig, b: &AggregationConfig) -> Ordering {
@@ -157,7 +190,8 @@ pub fn find_compatible_aggregation(
157190
&& spatial_filter_compatible(
158191
&c.spatial_filter_normalized,
159192
&requirements.spatial_filter_normalized,
160-
);
193+
)
194+
&& topk_weighting_compatible(stat, c, requirements.topk_count_events);
161195
if !ok {
162196
debug!(
163197
agg_id = c.aggregation_id,
@@ -216,6 +250,10 @@ pub fn find_compatible_aggregation(
216250
}
217251

218252
// If value type is multi-population, find the paired key aggregation.
253+
// Top-k (CountMinSketchWithHeap) follows the same path as plain COUNT: the
254+
// self-keyed case is expressed via the query_config path (a single
255+
// aggregation reference), while the capability-matching fallback resolves a
256+
// separate key aggregation just like any other multi-population value type.
219257
let key_agg: &AggregationConfig = if is_multi_population_value_type(value_agg.aggregation_type)
220258
{
221259
let ka = configs
@@ -310,6 +348,7 @@ mod tests {
310348
data_range_ms,
311349
grouping_labels: KeyByLabelNames::new(grouping.iter().map(|s| s.to_string()).collect()),
312350
spatial_filter_normalized: normalize_spatial_filter(spatial_filter),
351+
topk_count_events: None,
313352
}
314353
}
315354

@@ -839,4 +878,120 @@ mod tests {
839878
);
840879
assert!(result.is_none());
841880
}
881+
882+
// --- top-k count vs sum weighting ---
883+
//
884+
// Top-k follows the same capability-matching path as plain COUNT: a
885+
// CountMinSketchWithHeap is a multi-population value type, so the fallback
886+
// pairs it with a key aggregation. These tests therefore always provision a
887+
// DeltaSetAggregator and focus on which *value* heap (count- vs sum-weighted)
888+
// is selected via the count_events discriminator.
889+
890+
/// Paired key aggregation required by the multi-population fallback.
891+
fn make_key_agg(id: u64, metric: &str) -> AggregationConfig {
892+
make_config(id, metric, "DeltaSetAggregator", "", 1, "tumbling", &[], "")
893+
}
894+
895+
/// `CountMinSketchWithHeap` config with an explicit `count_events` parameter.
896+
fn make_topk_config(id: u64, metric: &str, count_events: bool) -> AggregationConfig {
897+
let mut c = make_config(
898+
id,
899+
metric,
900+
"CountMinSketchWithHeap",
901+
"",
902+
1,
903+
"tumbling",
904+
&[],
905+
"",
906+
);
907+
c.parameters.insert(
908+
"count_events".to_string(),
909+
serde_json::Value::Bool(count_events),
910+
);
911+
c
912+
}
913+
914+
fn topk_req(metric: &str, count_events: Option<bool>) -> QueryRequirements {
915+
let mut r = req(metric, &[Statistic::Topk], Some(1_000), &[], "");
916+
r.topk_count_events = count_events;
917+
r
918+
}
919+
920+
#[test]
921+
fn topk_count_query_picks_count_events_sketch() {
922+
// Two heap sketches on the same metric: one count-weighted, one
923+
// sum-weighted. A COUNT top-k query must resolve to the count one.
924+
let mut configs = HashMap::new();
925+
configs.insert(1, make_topk_config(1, "netflow_table", true));
926+
configs.insert(2, make_topk_config(2, "netflow_table", false));
927+
configs.insert(9, make_key_agg(9, "netflow_table"));
928+
929+
let result = find_compatible_aggregation(&configs, &topk_req("netflow_table", Some(true)))
930+
.expect("COUNT top-k should match the count_events sketch");
931+
assert_eq!(result.aggregation_id_for_value, 1);
932+
assert_eq!(result.aggregation_id_for_key, 9);
933+
}
934+
935+
#[test]
936+
fn topk_sum_query_picks_value_weighted_sketch() {
937+
let mut configs = HashMap::new();
938+
configs.insert(1, make_topk_config(1, "netflow_table", true));
939+
configs.insert(2, make_topk_config(2, "netflow_table", false));
940+
configs.insert(9, make_key_agg(9, "netflow_table"));
941+
942+
let result = find_compatible_aggregation(&configs, &topk_req("netflow_table", Some(false)))
943+
.expect("SUM top-k should match the count_events: false sketch");
944+
assert_eq!(result.aggregation_id_for_value, 2);
945+
assert_eq!(result.aggregation_id_for_key, 9);
946+
}
947+
948+
#[test]
949+
fn topk_sum_query_rejects_count_only_sketch() {
950+
// Only a count-weighted sketch exists; a SUM top-k query cannot be served
951+
// even with a key agg available.
952+
let mut configs = HashMap::new();
953+
configs.insert(1, make_topk_config(1, "netflow_table", true));
954+
configs.insert(9, make_key_agg(9, "netflow_table"));
955+
let result = find_compatible_aggregation(&configs, &topk_req("netflow_table", Some(false)));
956+
assert!(
957+
result.is_none(),
958+
"SUM top-k must not fall back to a count_events: true sketch",
959+
);
960+
}
961+
962+
#[test]
963+
fn topk_count_query_matches_sketch_without_explicit_flag() {
964+
// Configs that omit `count_events` default to count semantics, so a
965+
// COUNT top-k query still matches (backwards compatibility).
966+
let mut configs = HashMap::new();
967+
configs.insert(
968+
7,
969+
make_config(
970+
7,
971+
"netflow_table",
972+
"CountMinSketchWithHeap",
973+
"",
974+
1,
975+
"tumbling",
976+
&[],
977+
"",
978+
),
979+
);
980+
configs.insert(9, make_key_agg(9, "netflow_table"));
981+
let result = find_compatible_aggregation(&configs, &topk_req("netflow_table", Some(true)))
982+
.expect("default (no flag) sketch should serve COUNT top-k");
983+
assert_eq!(result.aggregation_id_for_value, 7);
984+
}
985+
986+
#[test]
987+
fn topk_unconstrained_weighting_matches_any_sketch() {
988+
// `topk_count_events: None` (e.g. PromQL top-k) imposes no weighting
989+
// constraint, so any heap sketch on the metric matches.
990+
let mut configs = HashMap::new();
991+
configs.insert(3, make_topk_config(3, "netflow_table", false));
992+
configs.insert(9, make_key_agg(9, "netflow_table"));
993+
let result = find_compatible_aggregation(&configs, &topk_req("netflow_table", None))
994+
.expect("unconstrained top-k should match regardless of count_events");
995+
assert_eq!(result.aggregation_id_for_value, 3);
996+
}
842997
}

asap-common/dependencies/rs/asap_types/src/query_requirements.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,4 +18,13 @@ pub struct QueryRequirements {
1818
pub grouping_labels: KeyByLabelNames,
1919
/// Normalized label filter (produced by normalize_spatial_filter).
2020
pub spatial_filter_normalized: String,
21+
/// For `Statistic::Topk` requirements, the heavy-hitter weighting the query
22+
/// needs, used to disambiguate two `CountMinSketchWithHeap` configs on the
23+
/// same metric:
24+
/// * `Some(true)` → COUNT semantics (`count_events: true`, weight 1/event),
25+
/// * `Some(false)` → SUM semantics (`count_events: false`, weight = value).
26+
///
27+
/// `None` for non-top-k requirements (and for PromQL top-k, which does not
28+
/// constrain the sketch weighting); matching ignores it when `None`.
29+
pub topk_count_events: Option<bool>,
2130
}

asap-query-engine/src/engines/simple_engine/promql.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -648,6 +648,9 @@ impl SimpleEngine {
648648
data_range_ms,
649649
grouping_labels,
650650
spatial_filter_normalized: normalize_spatial_filter(&spatial_filter),
651+
// PromQL top-k does not constrain the sketch weighting; leave the
652+
// count/sum discriminator unset so matching does not over-filter.
653+
topk_count_events: None,
651654
}
652655
}
653656

0 commit comments

Comments
 (0)