Skip to content

Commit ad99858

Browse files
refactor(query-engine): split enable_topk into limiting vs formatting flags (#391)
* split enable_topk into limiting vs formatting flags * add PromQL topk pipeline and Prometheus wire format tests * formatting
1 parent 47acd15 commit ad99858

5 files changed

Lines changed: 273 additions & 59 deletions

File tree

asap-query-engine/src/engines/simple_engine/elastic.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ impl SimpleEngine {
2525
"Built execution context for ElasticSearch query {:?}",
2626
context
2727
);
28-
self.execute_context(context, false)
28+
self.execute_context(context, false, false)
2929
}
3030

3131
pub fn build_query_execution_context_elastic(

asap-query-engine/src/engines/simple_engine/mod.rs

Lines changed: 60 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -626,27 +626,32 @@ impl SimpleEngine {
626626
merged_keys: Option<&HashMap<Option<KeyByLabelValues>, Box<dyn AggregateCore>>>,
627627
statistic: &Statistic,
628628
query_kwargs: &HashMap<String, String>,
629-
enable_topk_limiting: bool,
630629
) -> Result<HashMap<Option<KeyByLabelValues>, f64>, String> {
631630
if let Some(keys_map) = merged_keys {
632631
// Separate keys and values
633632
self.collect_results_separate_keys(merged_values, keys_map, statistic, query_kwargs)
634633
} else {
635634
// Same aggregation for keys and values
636-
self.collect_results_same_aggregation(
637-
merged_values,
638-
statistic,
639-
query_kwargs,
640-
enable_topk_limiting,
641-
)
635+
self.collect_results_same_aggregation(merged_values, statistic, query_kwargs)
642636
}
643637
}
644638

645-
/// Executes the complete query pipeline: plan, execute, collect, and format
639+
/// Executes the complete query pipeline: plan, execute, collect, and format.
640+
///
641+
/// The two top-k flags are deliberately separate because the two engines
642+
/// need different halves of the behaviour:
643+
/// * `enable_topk_limiting` — enumerate candidate keys from the sketch
644+
/// heap and truncate to `k` during collection. Used by both PromQL and
645+
/// SQL top-k (it's what makes the heap actually drive the result set).
646+
/// * `enable_topk_formatting` — sort by value descending AND prepend the
647+
/// metric name to each key's labels. This is PromQL `topk(...)` output
648+
/// shape only; SQL returns bare `(group-by columns, value)` rows and
649+
/// applies its own ORDER BY / LIMIT, so SQL leaves this `false`.
646650
pub fn execute_query_pipeline(
647651
&self,
648652
context: &QueryExecutionContext,
649-
enable_topk: bool,
653+
enable_topk_limiting: bool,
654+
enable_topk_formatting: bool,
650655
) -> Result<Vec<InstantVectorElement>, String> {
651656
// Step 1: Execute the query plan (already created in context.store_plan)
652657
let (merged_values, merged_keys) = self.execute_and_merge_store_queries(
@@ -662,7 +667,6 @@ impl SimpleEngine {
662667
merged_keys.as_ref(),
663668
&context.metadata.statistic_to_compute,
664669
&context.metadata.query_kwargs,
665-
enable_topk, // SQL=false, PromQL=true
666670
)?;
667671
debug!(
668672
"[LATENCY] Unformatted results collection: {:.2}ms",
@@ -671,12 +675,24 @@ impl SimpleEngine {
671675

672676
// Step 3: Format results
673677
let results_start_time = Instant::now();
674-
let results = self.format_final_results(
678+
let mut results = self.format_final_results(
675679
unformatted_results,
676680
&context.metadata.statistic_to_compute,
677681
&context.metric,
678-
enable_topk, // SQL=false, PromQL=true
682+
enable_topk_formatting,
679683
);
684+
// Truncate to k when limiting is active (heap may carry heap_size > k
685+
// candidates; the query only asked for the top k).
686+
if enable_topk_limiting {
687+
if let Some(k) = context
688+
.metadata
689+
.query_kwargs
690+
.get("k")
691+
.and_then(|s| s.parse::<usize>().ok())
692+
{
693+
results.truncate(k);
694+
}
695+
}
680696
debug!(
681697
"[LATENCY] Results collection: {}ms",
682698
results_start_time.elapsed().as_millis()
@@ -841,22 +857,29 @@ impl SimpleEngine {
841857
Ok(self.format_final_results(all_results, statistic, metric, false))
842858
}
843859

844-
/// Formats unformatted results into final InstantVectorElement format
845-
/// For topk queries (when enabled), sorts by value and prepends metric name to keys
860+
/// Formats unformatted results into final InstantVectorElement format.
861+
///
862+
/// For top-k queries the rows are always sorted by value descending (that's
863+
/// the semantics of top-k, and lets the caller truncate to `k` correctly
864+
/// regardless of HashMap iteration order). `enable_topk_formatting`
865+
/// additionally prepends the metric name to each key's labels — this is the
866+
/// PromQL `topk(...)` output shape only; SQL leaves it `false` so rows stay
867+
/// as bare `(group-by columns, value)`.
846868
fn format_final_results(
847869
&self,
848870
unformatted_results: HashMap<Option<KeyByLabelValues>, f64>,
849871
statistic: &Statistic,
850872
metric: &str,
851873
enable_topk_formatting: bool,
852874
) -> Vec<InstantVectorElement> {
853-
let sorted_results: Vec<(Option<KeyByLabelValues>, f64)> =
854-
if *statistic == Statistic::Topk && enable_topk_formatting {
855-
// Sort by value descending for topk
856-
let mut sorted: Vec<_> = unformatted_results.into_iter().collect();
857-
sorted.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
875+
let sorted_results: Vec<(Option<KeyByLabelValues>, f64)> = if *statistic == Statistic::Topk
876+
{
877+
// Sort by value descending for topk (independent of output formatting).
878+
let mut sorted: Vec<_> = unformatted_results.into_iter().collect();
879+
sorted.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
858880

859-
// Prepend metric name to each key's label values
881+
if enable_topk_formatting {
882+
// Prepend metric name to each key's label values (PromQL shape).
860883
sorted
861884
.into_iter()
862885
.map(|(key_opt, value)| {
@@ -870,8 +893,11 @@ impl SimpleEngine {
870893
})
871894
.collect()
872895
} else {
873-
unformatted_results.into_iter().collect()
874-
};
896+
sorted
897+
}
898+
} else {
899+
unformatted_results.into_iter().collect()
900+
};
875901

876902
sorted_results
877903
.into_iter()
@@ -990,10 +1016,11 @@ impl SimpleEngine {
9901016
fn execute_context(
9911017
&self,
9921018
context: QueryExecutionContext,
993-
enable_topk: bool,
1019+
enable_topk_limiting: bool,
1020+
enable_topk_formatting: bool,
9941021
) -> Option<(KeyByLabelNames, QueryResult)> {
9951022
let results = self
996-
.execute_query_pipeline(&context, enable_topk)
1023+
.execute_query_pipeline(&context, enable_topk_limiting, enable_topk_formatting)
9971024
.map_err(|e| {
9981025
warn!("Query execution failed: {}", e);
9991026
e
@@ -1191,25 +1218,25 @@ impl SimpleEngine {
11911218
Ok(unformatted_results)
11921219
}
11931220

1194-
/// Collects results when key and value use same aggregation
1221+
/// Collects results when key and value use same aggregation.
1222+
///
1223+
/// For keyed accumulators (incl. `CountMinSketchWithHeap`) this enumerates
1224+
/// every candidate key the accumulator exposes. Top-k ordering/truncation is
1225+
/// applied later (sort in `format_final_results`, truncate-to-k in
1226+
/// `execute_query_pipeline`) so we must NOT pre-truncate here — the sketch
1227+
/// heap can hold more than `k` candidates and is not value-sorted, so
1228+
/// dropping keys now could discard a true top-k member.
11951229
fn collect_results_same_aggregation(
11961230
&self,
11971231
merged_outputs: &HashMap<Option<KeyByLabelValues>, Box<dyn AggregateCore>>,
11981232
statistic: &Statistic,
11991233
query_kwargs: &HashMap<String, String>,
1200-
enable_topk_limiting: bool,
12011234
) -> Result<HashMap<Option<KeyByLabelValues>, f64>, String> {
12021235
let mut unformatted_results = HashMap::new();
12031236

12041237
for (key, precompute) in merged_outputs {
12051238
if let Some(unwrapped_keys) = precompute.get_keys() {
1206-
let keys_to_process = if enable_topk_limiting {
1207-
self.limit_keys_for_topk(unwrapped_keys, statistic, query_kwargs)?
1208-
} else {
1209-
unwrapped_keys
1210-
};
1211-
1212-
for key_for_this_precompute in keys_to_process {
1239+
for key_for_this_precompute in unwrapped_keys {
12131240
let value = self
12141241
.query_precompute_for_statistic(
12151242
precompute.as_ref(),
@@ -1238,28 +1265,6 @@ impl SimpleEngine {
12381265
Ok(unformatted_results)
12391266
}
12401267

1241-
/// Limits keys for topk queries
1242-
fn limit_keys_for_topk(
1243-
&self,
1244-
keys: Vec<KeyByLabelValues>,
1245-
statistic: &Statistic,
1246-
query_kwargs: &HashMap<String, String>,
1247-
) -> Result<Vec<KeyByLabelValues>, String> {
1248-
if *statistic != Statistic::Topk {
1249-
return Ok(keys);
1250-
}
1251-
1252-
let k_str = query_kwargs
1253-
.get("k")
1254-
.ok_or_else(|| "Missing k parameter for topk".to_string())?;
1255-
1256-
let k = k_str
1257-
.parse::<usize>()
1258-
.map_err(|_| format!("Failed to parse k: '{}'", k_str))?;
1259-
1260-
Ok(keys.into_iter().take(k).collect())
1261-
}
1262-
12631268
fn query_precompute_for_statistic(
12641269
&self,
12651270
precompute: &dyn AggregateCore,

0 commit comments

Comments
 (0)