Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,7 @@ private void constructSearchQueryRecord(final SearchPhaseContext context, final
userPrincipalContext,
null
);
record.setStreaming(searchRequestContext.isStreamingRequest());
queryInsightsService.addRecord(record);
} catch (Exception e) {
OperationalMetricsCounter.getInstance().incrementCounter(OperationalMetric.DATA_INGEST_EXCEPTIONS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,34 +37,45 @@ public SearchQueryAggregationCategorizer(SearchQueryCounters searchQueryCounters
*
* @param aggregatorFactories input aggregations
* @param measurements latency, cpu, memory measurements
* @param isStreaming whether the query is a streaming request
*/
public void incrementSearchQueryAggregationCounters(
Collection<AggregationBuilder> aggregatorFactories,
Map<MetricType, Measurement> measurements
Map<MetricType, Measurement> measurements,
boolean isStreaming
) {
for (AggregationBuilder aggregationBuilder : aggregatorFactories) {
incrementCountersRecursively(aggregationBuilder, measurements);
incrementCountersRecursively(aggregationBuilder, measurements, isStreaming);
}
}

private void incrementCountersRecursively(AggregationBuilder aggregationBuilder, Map<MetricType, Measurement> measurements) {
private void incrementCountersRecursively(
AggregationBuilder aggregationBuilder,
Map<MetricType, Measurement> measurements,
boolean isStreaming
) {
// Increment counters for the current aggregation
String aggregationType = aggregationBuilder.getType();
searchQueryCounters.incrementAggCounter(1, Tags.create().addTag(AGGREGATION_TYPE_TAG, aggregationType), measurements);
searchQueryCounters.incrementAggCounter(1, Tags.create().addTag(AGGREGATION_TYPE_TAG, aggregationType), measurements, isStreaming);

// Recursively process sub-aggregations if any
Collection<AggregationBuilder> subAggregations = aggregationBuilder.getSubAggregations();
if (subAggregations != null && !subAggregations.isEmpty()) {
for (AggregationBuilder subAggregation : subAggregations) {
incrementCountersRecursively(subAggregation, measurements);
incrementCountersRecursively(subAggregation, measurements, isStreaming);
}
}

// Process pipeline aggregations
Collection<PipelineAggregationBuilder> pipelineAggregations = aggregationBuilder.getPipelineAggregations();
for (PipelineAggregationBuilder pipelineAggregation : pipelineAggregations) {
String pipelineAggregationType = pipelineAggregation.getType();
searchQueryCounters.incrementAggCounter(1, Tags.create().addTag(AGGREGATION_TYPE_TAG, pipelineAggregationType), measurements);
searchQueryCounters.incrementAggCounter(
1,
Tags.create().addTag(AGGREGATION_TYPE_TAG, pipelineAggregationType),
measurements,
isStreaming
);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public void categorize(SearchQueryRecord record) {
Map<MetricType, Measurement> measurements = record.getMeasurements();

incrementQueryTypeCounters(source.query(), measurements);
incrementQueryAggregationCounters(source.aggregations(), measurements);
incrementQueryAggregationCounters(source.aggregations(), measurements, record.isStreaming());
incrementQuerySortCounters(source.sorts(), measurements);
}

Expand All @@ -97,12 +97,20 @@ private void incrementQuerySortCounters(List<SortBuilder<?>> sorts, Map<MetricTy
}
}

private void incrementQueryAggregationCounters(AggregatorFactories.Builder aggregations, Map<MetricType, Measurement> measurements) {
private void incrementQueryAggregationCounters(
AggregatorFactories.Builder aggregations,
Map<MetricType, Measurement> measurements,
boolean isStreaming
) {
if (aggregations == null) {
return;
}

searchQueryAggregationCategorizer.incrementSearchQueryAggregationCounters(aggregations.getAggregatorFactories(), measurements);
searchQueryAggregationCategorizer.incrementSearchQueryAggregationCounters(
aggregations.getAggregatorFactories(),
measurements,
isStreaming
);
}

private void incrementQueryTypeCounters(QueryBuilder topLevelQueryBuilder, Map<MetricType, Measurement> measurements) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
public final class SearchQueryCounters {
private static final String LEVEL_TAG = "level";
private static final String QUERY_TYPE_TAG = "type";
static final String IS_STREAMING_TAG = "is_streaming";
private static final String UNIT = "1";
private static final String UNIT_MILLIS = "ms";
private static final String UNIT_CPU_CYCLES = "ns";
Expand Down Expand Up @@ -122,8 +123,10 @@ public void incrementCounter(QueryBuilder queryBuilder, int level, Map<MetricTyp
* @param value value to increment
* @param tags tags
* @param measurements metrics measurements
* @param isStreaming whether the query is a streaming request
*/
public void incrementAggCounter(double value, Tags tags, Map<MetricType, Measurement> measurements) {
public void incrementAggCounter(double value, Tags tags, Map<MetricType, Measurement> measurements, boolean isStreaming) {
tags.addTag(IS_STREAMING_TAG, String.valueOf(isStreaming));
aggCounter.add(value, tags);
incrementAllHistograms(tags, measurements);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public class SearchQueryRecord implements ToXContentObject, Writeable {
private final String id;
private final SearchSourceBuilder searchSourceBuilder;
private final UserPrincipalContext userPrincipalContext; // Private field for user extraction
private boolean streaming;

/**
* Timestamp
Expand Down Expand Up @@ -697,6 +698,14 @@ public String getGroupingId() {
return this.groupingId;
}

public boolean isStreaming() {
return streaming;
}

public void setStreaming(boolean streaming) {
this.streaming = streaming;
}

public boolean isCancelled() {
return (Boolean) attributes.getOrDefault(Attribute.IS_CANCELLED, false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ public void testOnRequestEnd() throws InterruptedException {
when(searchRequest.source()).thenReturn(searchSourceBuilder);
when(searchRequest.indices()).thenReturn(indices);
when(searchRequestContext.phaseTookMap()).thenReturn(phaseLatencyMap);
when(searchRequestContext.isStreamingRequest()).thenReturn(true);
when(searchPhaseContext.getRequest()).thenReturn(searchRequest);
when(searchPhaseContext.getNumShards()).thenReturn(numberOfShards);
when(searchPhaseContext.getTask()).thenReturn(task);
Expand All @@ -141,6 +142,7 @@ public void testOnRequestEnd() throws InterruptedException {
assertEquals(timestamp.longValue(), generatedRecord.getTimestamp());
assertEquals(numberOfShards, generatedRecord.getAttributes().get(Attribute.TOTAL_SHARDS));
assertEquals(searchType.toString().toLowerCase(Locale.ROOT), generatedRecord.getAttributes().get(Attribute.SEARCH_TYPE));
assertTrue(generatedRecord.isStreaming());
// SOURCE attribute should be null initially (set asynchronously in drainRecords)
assertNull(generatedRecord.getAttributes().get(Attribute.SOURCE));
// But SearchSourceBuilder should be available for async processing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import static org.mockito.Mockito.when;
import static org.opensearch.plugin.insights.QueryInsightsTestUtils.generateQueryInsightRecords;
import static org.opensearch.plugin.insights.core.service.categorizer.SearchQueryAggregationCategorizer.AGGREGATION_TYPE_TAG;
import static org.opensearch.plugin.insights.core.service.categorizer.SearchQueryCounters.IS_STREAMING_TAG;

import java.util.Arrays;
import java.util.HashMap;
Expand Down Expand Up @@ -102,6 +103,7 @@ public void testAggregationsQuery() {
sourceBuilder.size(0);

SearchQueryRecord record = generateQueryInsightRecords(1, sourceBuilder).get(0);
record.setStreaming(true);
searchQueryCategorizer.categorize(record);

verifyMeasurementHistogramsIncremented(record, 1);
Expand All @@ -114,10 +116,12 @@ public void testAggregationsQuery() {
verify(searchQueryCategorizer.getSearchQueryCounters().getAggCounter()).add(valueCaptor.capture(), tagsCaptor.capture());

double actualValue = valueCaptor.getValue();
String actualTag = (String) tagsCaptor.getValue().getTagsMap().get(AGGREGATION_TYPE_TAG);
String actualAggTag = (String) tagsCaptor.getValue().getTagsMap().get(AGGREGATION_TYPE_TAG);
String actualStreamingTag = (String) tagsCaptor.getValue().getTagsMap().get(IS_STREAMING_TAG);

assertEquals(1.0d, actualValue, 0.0001);
assertEquals(MULTI_TERMS_AGGREGATION, actualTag);
assertEquals(MULTI_TERMS_AGGREGATION, actualAggTag);
assertEquals("true", actualStreamingTag);
}

public void testBoolQuery() {
Expand Down
Loading