From 8cf0146072b60481409b47f499a5d62b7ce927ee Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Fri, 20 Feb 2026 20:54:39 +0000 Subject: [PATCH] Add streaming dimension (#551) Signed-off-by: Harsha Vamsi Kalluri (cherry picked from commit 2dd3ababd8f4383a021f17851746a7715d36ddc1) Signed-off-by: github-actions[bot] --- .../core/listener/QueryInsightsListener.java | 1 + .../SearchQueryAggregationCategorizer.java | 23 ++++++++++++++----- .../categorizer/SearchQueryCategorizer.java | 14 ++++++++--- .../categorizer/SearchQueryCounters.java | 5 +++- .../rules/model/SearchQueryRecord.java | 9 ++++++++ .../listener/QueryInsightsListenerTests.java | 2 ++ .../SearchQueryCategorizerTests.java | 8 +++++-- 7 files changed, 50 insertions(+), 12 deletions(-) diff --git a/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java b/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java index 7b98b228..834e8d3c 100644 --- a/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java +++ b/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java @@ -386,6 +386,7 @@ private void constructSearchQueryRecord(final SearchPhaseContext context, final userPrincipalContext, null ); + record.setStreaming(searchRequestContext.isStreamingRequest()); queryInsightsService.addRecord(record); } catch (Exception e) { OperationalMetricsCounter.getInstance().incrementCounter(OperationalMetric.DATA_INGEST_EXCEPTIONS); diff --git a/src/main/java/org/opensearch/plugin/insights/core/service/categorizer/SearchQueryAggregationCategorizer.java b/src/main/java/org/opensearch/plugin/insights/core/service/categorizer/SearchQueryAggregationCategorizer.java index 534d0675..ea418fdf 100644 --- a/src/main/java/org/opensearch/plugin/insights/core/service/categorizer/SearchQueryAggregationCategorizer.java +++ b/src/main/java/org/opensearch/plugin/insights/core/service/categorizer/SearchQueryAggregationCategorizer.java @@ -37,26 +37,32 @@ public SearchQueryAggregationCategorizer(SearchQueryCounters searchQueryCounters * * @param aggregatorFactories input aggregations * @param measurements latency, cpu, memory measurements + * @param isStreaming whether the query is a streaming request */ public void incrementSearchQueryAggregationCounters( Collection aggregatorFactories, - Map measurements + Map measurements, + boolean isStreaming ) { for (AggregationBuilder aggregationBuilder : aggregatorFactories) { - incrementCountersRecursively(aggregationBuilder, measurements); + incrementCountersRecursively(aggregationBuilder, measurements, isStreaming); } } - private void incrementCountersRecursively(AggregationBuilder aggregationBuilder, Map measurements) { + private void incrementCountersRecursively( + AggregationBuilder aggregationBuilder, + Map measurements, + boolean isStreaming + ) { // Increment counters for the current aggregation String aggregationType = aggregationBuilder.getType(); - searchQueryCounters.incrementAggCounter(1, Tags.create().addTag(AGGREGATION_TYPE_TAG, aggregationType), measurements); + searchQueryCounters.incrementAggCounter(1, Tags.create().addTag(AGGREGATION_TYPE_TAG, aggregationType), measurements, isStreaming); // Recursively process sub-aggregations if any Collection subAggregations = aggregationBuilder.getSubAggregations(); if (subAggregations != null && !subAggregations.isEmpty()) { for (AggregationBuilder subAggregation : subAggregations) { - incrementCountersRecursively(subAggregation, measurements); + incrementCountersRecursively(subAggregation, measurements, isStreaming); } } @@ -64,7 +70,12 @@ private void incrementCountersRecursively(AggregationBuilder aggregationBuilder, Collection pipelineAggregations = aggregationBuilder.getPipelineAggregations(); for (PipelineAggregationBuilder pipelineAggregation : pipelineAggregations) { String pipelineAggregationType = pipelineAggregation.getType(); - searchQueryCounters.incrementAggCounter(1, Tags.create().addTag(AGGREGATION_TYPE_TAG, pipelineAggregationType), measurements); + searchQueryCounters.incrementAggCounter( + 1, + Tags.create().addTag(AGGREGATION_TYPE_TAG, pipelineAggregationType), + measurements, + isStreaming + ); } } } diff --git a/src/main/java/org/opensearch/plugin/insights/core/service/categorizer/SearchQueryCategorizer.java b/src/main/java/org/opensearch/plugin/insights/core/service/categorizer/SearchQueryCategorizer.java index 6c91ac5b..52c26d18 100644 --- a/src/main/java/org/opensearch/plugin/insights/core/service/categorizer/SearchQueryCategorizer.java +++ b/src/main/java/org/opensearch/plugin/insights/core/service/categorizer/SearchQueryCategorizer.java @@ -84,7 +84,7 @@ public void categorize(SearchQueryRecord record) { Map measurements = record.getMeasurements(); incrementQueryTypeCounters(source.query(), measurements); - incrementQueryAggregationCounters(source.aggregations(), measurements); + incrementQueryAggregationCounters(source.aggregations(), measurements, record.isStreaming()); incrementQuerySortCounters(source.sorts(), measurements); } @@ -97,12 +97,20 @@ private void incrementQuerySortCounters(List> sorts, Map measurements) { + private void incrementQueryAggregationCounters( + AggregatorFactories.Builder aggregations, + Map measurements, + boolean isStreaming + ) { if (aggregations == null) { return; } - searchQueryAggregationCategorizer.incrementSearchQueryAggregationCounters(aggregations.getAggregatorFactories(), measurements); + searchQueryAggregationCategorizer.incrementSearchQueryAggregationCounters( + aggregations.getAggregatorFactories(), + measurements, + isStreaming + ); } private void incrementQueryTypeCounters(QueryBuilder topLevelQueryBuilder, Map measurements) { diff --git a/src/main/java/org/opensearch/plugin/insights/core/service/categorizer/SearchQueryCounters.java b/src/main/java/org/opensearch/plugin/insights/core/service/categorizer/SearchQueryCounters.java index b4c6fe6a..ea3c707b 100644 --- a/src/main/java/org/opensearch/plugin/insights/core/service/categorizer/SearchQueryCounters.java +++ b/src/main/java/org/opensearch/plugin/insights/core/service/categorizer/SearchQueryCounters.java @@ -25,6 +25,7 @@ public final class SearchQueryCounters { private static final String LEVEL_TAG = "level"; private static final String QUERY_TYPE_TAG = "type"; + static final String IS_STREAMING_TAG = "is_streaming"; private static final String UNIT = "1"; private static final String UNIT_MILLIS = "ms"; private static final String UNIT_CPU_CYCLES = "ns"; @@ -122,8 +123,10 @@ public void incrementCounter(QueryBuilder queryBuilder, int level, Map measurements) { + public void incrementAggCounter(double value, Tags tags, Map measurements, boolean isStreaming) { + tags.addTag(IS_STREAMING_TAG, String.valueOf(isStreaming)); aggCounter.add(value, tags); incrementAllHistograms(tags, measurements); } diff --git a/src/main/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecord.java b/src/main/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecord.java index 5adc21ec..57f7d102 100644 --- a/src/main/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecord.java +++ b/src/main/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecord.java @@ -50,6 +50,7 @@ public class SearchQueryRecord implements ToXContentObject, Writeable { private final String id; private final SearchSourceBuilder searchSourceBuilder; private final UserPrincipalContext userPrincipalContext; // Private field for user extraction + private boolean streaming; /** * Timestamp @@ -697,6 +698,14 @@ public String getGroupingId() { return this.groupingId; } + public boolean isStreaming() { + return streaming; + } + + public void setStreaming(boolean streaming) { + this.streaming = streaming; + } + public boolean isCancelled() { return (Boolean) attributes.getOrDefault(Attribute.IS_CANCELLED, false); } diff --git a/src/test/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListenerTests.java b/src/test/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListenerTests.java index d4dbe5aa..85e2a92e 100644 --- a/src/test/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListenerTests.java +++ b/src/test/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListenerTests.java @@ -129,6 +129,7 @@ public void testOnRequestEnd() throws InterruptedException { when(searchRequest.source()).thenReturn(searchSourceBuilder); when(searchRequest.indices()).thenReturn(indices); when(searchRequestContext.phaseTookMap()).thenReturn(phaseLatencyMap); + when(searchRequestContext.isStreamingRequest()).thenReturn(true); when(searchPhaseContext.getRequest()).thenReturn(searchRequest); when(searchPhaseContext.getNumShards()).thenReturn(numberOfShards); when(searchPhaseContext.getTask()).thenReturn(task); @@ -141,6 +142,7 @@ public void testOnRequestEnd() throws InterruptedException { assertEquals(timestamp.longValue(), generatedRecord.getTimestamp()); assertEquals(numberOfShards, generatedRecord.getAttributes().get(Attribute.TOTAL_SHARDS)); assertEquals(searchType.toString().toLowerCase(Locale.ROOT), generatedRecord.getAttributes().get(Attribute.SEARCH_TYPE)); + assertTrue(generatedRecord.isStreaming()); // SOURCE attribute should be null initially (set asynchronously in drainRecords) assertNull(generatedRecord.getAttributes().get(Attribute.SOURCE)); // But SearchSourceBuilder should be available for async processing diff --git a/src/test/java/org/opensearch/plugin/insights/core/service/categorizer/SearchQueryCategorizerTests.java b/src/test/java/org/opensearch/plugin/insights/core/service/categorizer/SearchQueryCategorizerTests.java index d7a362fb..00761910 100644 --- a/src/test/java/org/opensearch/plugin/insights/core/service/categorizer/SearchQueryCategorizerTests.java +++ b/src/test/java/org/opensearch/plugin/insights/core/service/categorizer/SearchQueryCategorizerTests.java @@ -16,6 +16,7 @@ import static org.mockito.Mockito.when; import static org.opensearch.plugin.insights.QueryInsightsTestUtils.generateQueryInsightRecords; import static org.opensearch.plugin.insights.core.service.categorizer.SearchQueryAggregationCategorizer.AGGREGATION_TYPE_TAG; +import static org.opensearch.plugin.insights.core.service.categorizer.SearchQueryCounters.IS_STREAMING_TAG; import java.util.Arrays; import java.util.HashMap; @@ -102,6 +103,7 @@ public void testAggregationsQuery() { sourceBuilder.size(0); SearchQueryRecord record = generateQueryInsightRecords(1, sourceBuilder).get(0); + record.setStreaming(true); searchQueryCategorizer.categorize(record); verifyMeasurementHistogramsIncremented(record, 1); @@ -114,10 +116,12 @@ public void testAggregationsQuery() { verify(searchQueryCategorizer.getSearchQueryCounters().getAggCounter()).add(valueCaptor.capture(), tagsCaptor.capture()); double actualValue = valueCaptor.getValue(); - String actualTag = (String) tagsCaptor.getValue().getTagsMap().get(AGGREGATION_TYPE_TAG); + String actualAggTag = (String) tagsCaptor.getValue().getTagsMap().get(AGGREGATION_TYPE_TAG); + String actualStreamingTag = (String) tagsCaptor.getValue().getTagsMap().get(IS_STREAMING_TAG); assertEquals(1.0d, actualValue, 0.0001); - assertEquals(MULTI_TERMS_AGGREGATION, actualTag); + assertEquals(MULTI_TERMS_AGGREGATION, actualAggTag); + assertEquals("true", actualStreamingTag); } public void testBoolQuery() {