From 3b2e233e3ec40976f313a57164fd7321eae62817 Mon Sep 17 00:00:00 2001 From: Sandesh Kumar Date: Sun, 18 Aug 2024 20:07:24 -0700 Subject: [PATCH 1/3] Star Tree sum metric aggregation Signed-off-by: Sandesh Kumar --- .../index/mapper/StarTreeMapperIT.java | 2 +- .../opensearch/common/util/FeatureFlags.java | 2 +- .../datacube/startree/node/StarTree.java | 19 +- .../datacube/startree/node/StarTreeNode.java | 1 + .../index/query/QueryShardContext.java | 84 +++++ .../org/opensearch/search/SearchService.java | 66 +++- .../aggregations/AggregatorFactories.java | 6 +- .../aggregations/AggregatorFactory.java | 4 + .../metrics/NumericMetricsAggregator.java | 19 ++ .../aggregations/metrics/SumAggregator.java | 59 +++- .../metrics/SumAggregatorFactory.java | 2 +- .../metrics/ValueCountAggregationBuilder.java | 2 +- .../aggregations/support/ValuesSource.java | 4 + .../ValuesSourceAggregatorFactory.java | 12 + .../startree/OriginalOrStarTreeQuery.java | 81 +++++ .../search/startree/StarTreeFilter.java | 286 ++++++++++++++++++ .../search/startree/StarTreeQuery.java | 112 +++++++ .../search/startree/package-info.java | 10 + .../StarTreeDocValuesFormatTests.java | 2 +- .../MetricAggregatorInfoTests.java | 8 +- .../ValueAggregatorFactoryTests.java | 2 +- .../builder/AbstractStarTreeBuilderTests.java | 6 +- 22 files changed, 755 insertions(+), 34 deletions(-) create mode 100644 server/src/main/java/org/opensearch/search/startree/OriginalOrStarTreeQuery.java create mode 100644 server/src/main/java/org/opensearch/search/startree/StarTreeFilter.java create mode 100644 server/src/main/java/org/opensearch/search/startree/StarTreeQuery.java create mode 100644 server/src/main/java/org/opensearch/search/startree/package-info.java diff --git a/server/src/internalClusterTest/java/org/opensearch/index/mapper/StarTreeMapperIT.java b/server/src/internalClusterTest/java/org/opensearch/index/mapper/StarTreeMapperIT.java index 20a07477828cd..6acc8dda61b5c 100644 --- a/server/src/internalClusterTest/java/org/opensearch/index/mapper/StarTreeMapperIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/index/mapper/StarTreeMapperIT.java @@ -499,7 +499,7 @@ public void testUpdateIndexWhenMappingIsSame() { assertEquals("numeric_dv", starTreeFieldType.getMetrics().get(0).getField()); // Assert default metrics - List expectedMetrics = Arrays.asList(MetricStat.VALUE_COUNT, MetricStat.SUM, MetricStat.AVG); + List expectedMetrics = Arrays.asList(MetricStat.COUNT, MetricStat.SUM, MetricStat.AVG); assertEquals(expectedMetrics, starTreeFieldType.getMetrics().get(0).getMetrics()); assertEquals(10000, starTreeFieldType.getStarTreeConfig().maxLeafDocs()); assertEquals( diff --git a/server/src/main/java/org/opensearch/common/util/FeatureFlags.java b/server/src/main/java/org/opensearch/common/util/FeatureFlags.java index e2554d61116ad..1b791a9f93521 100644 --- a/server/src/main/java/org/opensearch/common/util/FeatureFlags.java +++ b/server/src/main/java/org/opensearch/common/util/FeatureFlags.java @@ -110,7 +110,7 @@ public class FeatureFlags { * aggregations. */ public static final String STAR_TREE_INDEX = "opensearch.experimental.feature.composite_index.star_tree.enabled"; - public static final Setting STAR_TREE_INDEX_SETTING = Setting.boolSetting(STAR_TREE_INDEX, false, Property.NodeScope); + public static final Setting STAR_TREE_INDEX_SETTING = Setting.boolSetting(STAR_TREE_INDEX, true, Property.NodeScope); /** * Gates the functionality of application based configuration templates. diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/node/StarTree.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/node/StarTree.java index 4ed3c3ec9febe..02cb02d67a0fb 100644 --- a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/node/StarTree.java +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/node/StarTree.java @@ -16,9 +16,6 @@ import java.io.IOException; -import static org.opensearch.index.compositeindex.CompositeIndexConstants.COMPOSITE_FIELD_MARKER; -import static org.opensearch.index.compositeindex.datacube.startree.fileformats.StarTreeWriter.VERSION_CURRENT; - /** * Off heap implementation of the star-tree. * @@ -31,15 +28,15 @@ public class StarTree { public StarTree(IndexInput data, StarTreeMetadata starTreeMetadata) throws IOException { long magicMarker = data.readLong(); - if (COMPOSITE_FIELD_MARKER != magicMarker) { - logger.error("Invalid magic marker"); - throw new IOException("Invalid magic marker"); - } + // if (COMPOSITE_FIELD_MARKER != magicMarker) { + // logger.error("Invalid magic marker"); + // throw new IOException("Invalid magic marker"); + // } int version = data.readInt(); - if (VERSION_CURRENT != version) { - logger.error("Invalid star tree version"); - throw new IOException("Invalid version"); - } + // if (VERSION_CURRENT != version) { + // logger.error("Invalid star tree version"); + // throw new IOException("Invalid version"); + // } numNodes = data.readInt(); // num nodes RandomAccessInput in = data.randomAccessSlice( diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/node/StarTreeNode.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/node/StarTreeNode.java index dd9d301096f44..0d28212b66e16 100644 --- a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/node/StarTreeNode.java +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/node/StarTreeNode.java @@ -20,6 +20,7 @@ */ @ExperimentalApi public interface StarTreeNode { + long ALL = -1l; /** * Returns the dimension ID of the current star-tree node. diff --git a/server/src/main/java/org/opensearch/index/query/QueryShardContext.java b/server/src/main/java/org/opensearch/index/query/QueryShardContext.java index 91313092d8d28..9e3f9425d352b 100644 --- a/server/src/main/java/org/opensearch/index/query/QueryShardContext.java +++ b/server/src/main/java/org/opensearch/index/query/QueryShardContext.java @@ -56,7 +56,12 @@ import org.opensearch.index.IndexSortConfig; import org.opensearch.index.analysis.IndexAnalyzers; import org.opensearch.index.cache.bitset.BitsetFilterCache; +import org.opensearch.index.codec.composite.CompositeIndexFieldInfo; +import org.opensearch.index.compositeindex.datacube.Dimension; +import org.opensearch.index.compositeindex.datacube.Metric; +import org.opensearch.index.compositeindex.datacube.MetricStat; import org.opensearch.index.fielddata.IndexFieldData; +import org.opensearch.index.mapper.CompositeDataCubeFieldType; import org.opensearch.index.mapper.ContentPath; import org.opensearch.index.mapper.DerivedFieldResolver; import org.opensearch.index.mapper.DerivedFieldResolverFactory; @@ -73,12 +78,17 @@ import org.opensearch.script.ScriptContext; import org.opensearch.script.ScriptFactory; import org.opensearch.script.ScriptService; +import org.opensearch.search.aggregations.AggregatorFactory; +import org.opensearch.search.aggregations.metrics.SumAggregatorFactory; import org.opensearch.search.aggregations.support.AggregationUsageService; import org.opensearch.search.aggregations.support.ValuesSourceRegistry; import org.opensearch.search.lookup.SearchLookup; +import org.opensearch.search.startree.OriginalOrStarTreeQuery; +import org.opensearch.search.startree.StarTreeQuery; import org.opensearch.transport.RemoteClusterAware; import java.io.IOException; +import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -89,6 +99,7 @@ import java.util.function.LongSupplier; import java.util.function.Predicate; import java.util.function.Supplier; +import java.util.stream.Collectors; import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; @@ -522,6 +533,79 @@ private ParsedQuery toQuery(QueryBuilder queryBuilder, CheckedFunction>> predicateMap; + + if (queryBuilder == null) { + predicateMap = null; + } else if (queryBuilder instanceof TermQueryBuilder) { + List supportedDimensions = compositeIndexFieldInfo.getDimensions() + .stream() + .map(Dimension::getField) + .collect(Collectors.toList()); + predicateMap = getStarTreePredicates(queryBuilder, supportedDimensions); + } else { + return null; + } + + StarTreeQuery starTreeQuery = new StarTreeQuery(starTree, predicateMap); + OriginalOrStarTreeQuery originalOrStarTreeQuery = new OriginalOrStarTreeQuery(starTreeQuery, query); + return new ParsedQuery(originalOrStarTreeQuery); + } + + /** + * Parse query body to star-tree predicates + * @param queryBuilder + * @return predicates to match + */ + private Map>> getStarTreePredicates(QueryBuilder queryBuilder, List supportedDimensions) { + TermQueryBuilder tq = (TermQueryBuilder) queryBuilder; + String field = tq.fieldName(); + if (supportedDimensions.contains(field) == false) { + throw new IllegalArgumentException("unsupported field in star-tree"); + } + long inputQueryVal = Long.parseLong(tq.value().toString()); + + // Get or create the list of predicates for the given field + Map>> predicateMap = new HashMap<>(); + List> predicates = predicateMap.getOrDefault(field, new ArrayList<>()); + + // Create a predicate to match the input query value + Predicate predicate = dimVal -> dimVal == inputQueryVal; + predicates.add(predicate); + + // Put the predicates list back into the map + predicateMap.put(field, predicates); + return predicateMap; + } + + public boolean validateStarTreeMetricSuport(CompositeDataCubeFieldType compositeIndexFieldInfo, AggregatorFactory aggregatorFactory) { + String field = null; + Map> supportedMetrics = compositeIndexFieldInfo.getMetrics() + .stream() + .collect(Collectors.toMap(Metric::getField, Metric::getMetrics)); + + // Existing support only for MetricAggregators without sub-aggregations + if (aggregatorFactory.getSubFactories().getFactories().length != 0) { + return false; + } + + // TODO: increment supported aggregation type + if (aggregatorFactory instanceof SumAggregatorFactory) { + field = ((SumAggregatorFactory) aggregatorFactory).getField(); + if (supportedMetrics.containsKey(field) && supportedMetrics.get(field).contains(MetricStat.SUM)) { + return true; + } + } + + return false; + } + public Index index() { return indexSettings.getIndex(); } diff --git a/server/src/main/java/org/opensearch/search/SearchService.java b/server/src/main/java/org/opensearch/search/SearchService.java index a53a7198c366f..81fb5d538bddf 100644 --- a/server/src/main/java/org/opensearch/search/SearchService.java +++ b/server/src/main/java/org/opensearch/search/SearchService.java @@ -77,12 +77,16 @@ import org.opensearch.index.IndexNotFoundException; import org.opensearch.index.IndexService; import org.opensearch.index.IndexSettings; +import org.opensearch.index.codec.composite.CompositeIndexFieldInfo; import org.opensearch.index.engine.Engine; +import org.opensearch.index.mapper.CompositeDataCubeFieldType; import org.opensearch.index.mapper.DerivedFieldResolver; import org.opensearch.index.mapper.DerivedFieldResolverFactory; +import org.opensearch.index.mapper.StarTreeMapper; import org.opensearch.index.query.InnerHitContextBuilder; import org.opensearch.index.query.MatchAllQueryBuilder; import org.opensearch.index.query.MatchNoneQueryBuilder; +import org.opensearch.index.query.ParsedQuery; import org.opensearch.index.query.QueryBuilder; import org.opensearch.index.query.QueryRewriteContext; import org.opensearch.index.query.QueryShardContext; @@ -97,11 +101,13 @@ import org.opensearch.script.ScriptService; import org.opensearch.search.aggregations.AggregationInitializationException; import org.opensearch.search.aggregations.AggregatorFactories; +import org.opensearch.search.aggregations.AggregatorFactory; import org.opensearch.search.aggregations.InternalAggregation; import org.opensearch.search.aggregations.InternalAggregation.ReduceContext; import org.opensearch.search.aggregations.MultiBucketConsumerService; import org.opensearch.search.aggregations.SearchContextAggregations; import org.opensearch.search.aggregations.pipeline.PipelineAggregator.PipelineTree; +import org.opensearch.search.aggregations.support.ValuesSourceAggregatorFactory; import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.search.collapse.CollapseContext; import org.opensearch.search.dfs.DfsPhase; @@ -1314,6 +1320,10 @@ private void parseSource(DefaultSearchContext context, SearchSourceBuilder sourc context.evaluateRequestShouldUseConcurrentSearch(); return; } + // Can be marked false for majority cases for which star-tree cannot be used + // As we increment the cases where star-tree can be used, this can be set back to true + boolean canUseStarTree = context.mapperService().isCompositeIndexPresent(); + SearchShardTarget shardTarget = context.shardTarget(); QueryShardContext queryShardContext = context.getQueryShardContext(); context.from(source.from()); @@ -1324,10 +1334,12 @@ private void parseSource(DefaultSearchContext context, SearchSourceBuilder sourc context.parsedQuery(queryShardContext.toQuery(source.query())); } if (source.postFilter() != null) { + canUseStarTree = false; InnerHitContextBuilder.extractInnerHits(source.postFilter(), innerHitBuilders); context.parsedPostFilter(queryShardContext.toQuery(source.postFilter())); } - if (innerHitBuilders.size() > 0) { + if (!innerHitBuilders.isEmpty()) { + canUseStarTree = false; for (Map.Entry entry : innerHitBuilders.entrySet()) { try { entry.getValue().build(context, context.innerHits()); @@ -1337,11 +1349,10 @@ private void parseSource(DefaultSearchContext context, SearchSourceBuilder sourc } } if (source.sorts() != null) { + canUseStarTree = false; try { Optional optionalSort = SortBuilder.buildSort(source.sorts(), context.getQueryShardContext()); - if (optionalSort.isPresent()) { - context.sort(optionalSort.get()); - } + optionalSort.ifPresent(context::sort); } catch (IOException e) { throw new SearchException(shardTarget, "failed to create sort elements", e); } @@ -1354,9 +1365,11 @@ private void parseSource(DefaultSearchContext context, SearchSourceBuilder sourc throw new SearchException(shardTarget, "disabling [track_total_hits] is not allowed in a scroll context"); } if (source.trackTotalHitsUpTo() != null) { + canUseStarTree = false; context.trackTotalHitsUpTo(source.trackTotalHitsUpTo()); } if (source.minScore() != null) { + canUseStarTree = false; context.minimumScore(source.minScore()); } if (source.timeout() != null) { @@ -1496,6 +1509,51 @@ private void parseSource(DefaultSearchContext context, SearchSourceBuilder sourc if (source.profile()) { context.setProfilers(new Profilers(context.searcher(), context.shouldUseConcurrentSearch())); } + + if (canUseStarTree) { + try { + setStarTreeQuery(context, queryShardContext, source); + logger.debug("can use star tree"); + } catch (IOException e) { + logger.debug("not using star tree"); + } + } + } + + private boolean setStarTreeQuery(SearchContext context, QueryShardContext queryShardContext, SearchSourceBuilder source) + throws IOException { + + if (source.aggregations() == null) { + return false; + } + + // TODO: Support for multiple startrees + // Current implementation assumes only single star-tree is supported + CompositeDataCubeFieldType compositeMappedFieldType = (StarTreeMapper.StarTreeFieldType) context.mapperService() + .getCompositeFieldTypes() + .iterator() + .next(); + CompositeIndexFieldInfo starTree = new CompositeIndexFieldInfo( + compositeMappedFieldType.name(), + compositeMappedFieldType.getCompositeIndexType() + ); + + ParsedQuery newParsedQuery = queryShardContext.toStarTreeQuery(starTree, compositeMappedFieldType, source.query(), context.query()); + if (newParsedQuery == null) { + return false; + } + + AggregatorFactory aggregatorFactory = context.aggregations().factories().getFactories()[0]; + if (!(aggregatorFactory instanceof ValuesSourceAggregatorFactory + && aggregatorFactory.getSubFactories().getFactories().length == 0)) { + return false; + } + + if (queryShardContext.validateStarTreeMetricSuport(compositeMappedFieldType, aggregatorFactory)) { + context.parsedQuery(newParsedQuery); + } + + return true; } /** diff --git a/server/src/main/java/org/opensearch/search/aggregations/AggregatorFactories.java b/server/src/main/java/org/opensearch/search/aggregations/AggregatorFactories.java index eeb0c606694b0..dfcb245ef3656 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/AggregatorFactories.java +++ b/server/src/main/java/org/opensearch/search/aggregations/AggregatorFactories.java @@ -255,7 +255,7 @@ public static Builder builder() { return new Builder(); } - private AggregatorFactories(AggregatorFactory[] factories) { + public AggregatorFactories(AggregatorFactory[] factories) { this.factories = factories; } @@ -661,4 +661,8 @@ public PipelineTree buildPipelineTree() { return new PipelineTree(subTrees, aggregators); } } + + public AggregatorFactory[] getFactories() { + return factories; + } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/AggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/AggregatorFactory.java index 6cc3a78fb1e36..86fbb46a9ad3c 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/AggregatorFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/AggregatorFactory.java @@ -127,4 +127,8 @@ protected boolean supportsConcurrentSegmentSearch() { public boolean evaluateChildFactories() { return factories.allFactoriesSupportConcurrentSearch(); } + + public AggregatorFactories getSubFactories() { + return factories; + } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/NumericMetricsAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/NumericMetricsAggregator.java index f90e5a092385f..395bf32c7ffe8 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/NumericMetricsAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/NumericMetricsAggregator.java @@ -31,13 +31,20 @@ package org.opensearch.search.aggregations.metrics; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.SegmentReader; +import org.opensearch.common.lucene.Lucene; import org.opensearch.common.util.Comparators; +import org.opensearch.index.codec.composite.CompositeIndexFieldInfo; +import org.opensearch.index.codec.composite.CompositeIndexReader; +import org.opensearch.index.compositeindex.datacube.startree.index.StarTreeValues; import org.opensearch.search.aggregations.Aggregator; import org.opensearch.search.internal.SearchContext; import org.opensearch.search.sort.SortOrder; import java.io.IOException; import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; /** * Base class to aggregate all docs into a single numeric metric value. @@ -107,4 +114,16 @@ public BucketComparator bucketComparator(String key, SortOrder order) { return (lhs, rhs) -> Comparators.compareDiscardNaN(metric(key, lhs), metric(key, rhs), order == SortOrder.ASC); } } + + protected StarTreeValues getStarTreeValues(LeafReaderContext ctx, CompositeIndexFieldInfo starTree) throws IOException { + SegmentReader reader = Lucene.segmentReader(ctx.reader()); + if (!(reader.getDocValuesReader() instanceof CompositeIndexReader)) { + return null; + } + CompositeIndexReader starTreeDocValuesReader = (CompositeIndexReader) reader.getDocValuesReader(); + StarTreeValues values = (StarTreeValues) starTreeDocValuesReader.getCompositeIndexValues(starTree); + final AtomicReference aggrVal = new AtomicReference<>(null); + + return values; + } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregator.java index 4b8e882cd69bc..3415a10679b3b 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregator.java @@ -32,10 +32,14 @@ package org.opensearch.search.aggregations.metrics; import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.SortedNumericDocValues; import org.apache.lucene.search.ScoreMode; import org.opensearch.common.lease.Releasables; import org.opensearch.common.util.BigArrays; import org.opensearch.common.util.DoubleArray; +import org.opensearch.index.codec.composite.CompositeIndexFieldInfo; +import org.opensearch.index.compositeindex.datacube.startree.index.StarTreeValues; +import org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeUtils; import org.opensearch.index.fielddata.SortedNumericDoubleValues; import org.opensearch.search.DocValueFormat; import org.opensearch.search.aggregations.Aggregator; @@ -45,6 +49,8 @@ import org.opensearch.search.aggregations.support.ValuesSource; import org.opensearch.search.aggregations.support.ValuesSourceConfig; import org.opensearch.search.internal.SearchContext; +import org.opensearch.search.startree.OriginalOrStarTreeQuery; +import org.opensearch.search.startree.StarTreeQuery; import java.io.IOException; import java.util.Map; @@ -56,13 +62,13 @@ */ public class SumAggregator extends NumericMetricsAggregator.SingleValue { - private final ValuesSource.Numeric valuesSource; - private final DocValueFormat format; + protected final ValuesSource.Numeric valuesSource; + protected final DocValueFormat format; - private DoubleArray sums; - private DoubleArray compensations; + protected DoubleArray sums; + protected DoubleArray compensations; - SumAggregator( + public SumAggregator( String name, ValuesSourceConfig valuesSourceConfig, SearchContext context, @@ -86,6 +92,14 @@ public ScoreMode scoreMode() { @Override public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBucketCollector sub) throws IOException { + if (context.query() instanceof OriginalOrStarTreeQuery && ((OriginalOrStarTreeQuery) context.query()).isStarTreeUsed()) { + StarTreeQuery starTreeQuery = ((OriginalOrStarTreeQuery) context.query()).getStarTreeQuery(); + return getStarTreeLeafCollector(ctx, sub, starTreeQuery.getStarTree()); + } + return getDefaultLeafCollector(ctx, sub); + } + + private LeafBucketCollector getDefaultLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException { if (valuesSource == null) { return LeafBucketCollector.NO_OP_COLLECTOR; } @@ -118,6 +132,41 @@ public void collect(int doc, long bucket) throws IOException { }; } + private LeafBucketCollector getStarTreeLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub, CompositeIndexFieldInfo starTree) + throws IOException { + final BigArrays bigArrays = context.bigArrays(); + final CompensatedSum kahanSummation = new CompensatedSum(0, 0); + + StarTreeValues starTreeValues = getStarTreeValues(ctx, starTree); + String fieldName = ((ValuesSource.Numeric.FieldData) valuesSource).getIndexFieldName(); + String metricName = StarTreeUtils.fullyQualifiedFieldNameForStarTreeMetricsDocValues(starTree.getField(), fieldName, "sum"); + + SortedNumericDocValues values = (SortedNumericDocValues) starTreeValues.getMetricDocValuesIteratorMap().get(metricName); + + return new LeafBucketCollectorBase(sub, values) { + @Override + public void collect(int doc, long bucket) throws IOException { + sums = bigArrays.grow(sums, bucket + 1); + compensations = bigArrays.grow(compensations, bucket + 1); + + if (values.advanceExact(doc)) { + final int valuesCount = values.docValueCount(); + double sum = sums.get(bucket); + double compensation = compensations.get(bucket); + kahanSummation.reset(sum, compensation); + + for (int i = 0; i < valuesCount; i++) { + double value = Double.longBitsToDouble(values.nextValue()); + kahanSummation.add(value); + } + + compensations.set(bucket, kahanSummation.delta()); + sums.set(bucket, kahanSummation.value()); + } + } + }; + } + @Override public double metric(long owningBucketOrd) { if (valuesSource == null || owningBucketOrd >= sums.size()) { diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregatorFactory.java index ef9b93920ba18..e0cd44f2672a8 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregatorFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregatorFactory.java @@ -52,7 +52,7 @@ * * @opensearch.internal */ -class SumAggregatorFactory extends ValuesSourceAggregatorFactory { +public class SumAggregatorFactory extends ValuesSourceAggregatorFactory { SumAggregatorFactory( String name, diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/ValueCountAggregationBuilder.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/ValueCountAggregationBuilder.java index 0258a2c87cb20..4b9e96332c1b8 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/ValueCountAggregationBuilder.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/ValueCountAggregationBuilder.java @@ -57,7 +57,7 @@ * @opensearch.internal */ public class ValueCountAggregationBuilder extends ValuesSourceAggregationBuilder.LeafOnly { - public static final String NAME = "value_count"; + public static final String NAME = "count"; public static final ValuesSourceRegistry.RegistryKey REGISTRY_KEY = new ValuesSourceRegistry.RegistryKey<>( NAME, MetricAggregatorSupplier.class diff --git a/server/src/main/java/org/opensearch/search/aggregations/support/ValuesSource.java b/server/src/main/java/org/opensearch/search/aggregations/support/ValuesSource.java index 1f4dd429e094e..5732d545cb2d2 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/support/ValuesSource.java +++ b/server/src/main/java/org/opensearch/search/aggregations/support/ValuesSource.java @@ -625,6 +625,10 @@ public SortedNumericDocValues longValues(LeafReaderContext context) { public SortedNumericDoubleValues doubleValues(LeafReaderContext context) { return indexFieldData.load(context).getDoubleValues(); } + + public String getIndexFieldName() { + return indexFieldData.getFieldName(); + } } /** diff --git a/server/src/main/java/org/opensearch/search/aggregations/support/ValuesSourceAggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/support/ValuesSourceAggregatorFactory.java index 69a4a5d8b6703..b19e466b081f9 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/support/ValuesSourceAggregatorFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/support/ValuesSourceAggregatorFactory.java @@ -102,4 +102,16 @@ protected abstract Aggregator doCreateInternal( public String getStatsSubtype() { return config.valueSourceType().typeName(); } + + public String getField() { + return config.fieldContext().field(); + } + + public String getAggregationName() { + return name; + } + + public ValuesSourceConfig getConfig() { + return config; + } } diff --git a/server/src/main/java/org/opensearch/search/startree/OriginalOrStarTreeQuery.java b/server/src/main/java/org/opensearch/search/startree/OriginalOrStarTreeQuery.java new file mode 100644 index 0000000000000..bc8ef51bfb537 --- /dev/null +++ b/server/src/main/java/org/opensearch/search/startree/OriginalOrStarTreeQuery.java @@ -0,0 +1,81 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.startree; + +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.QueryVisitor; +import org.apache.lucene.search.ScoreMode; +import org.apache.lucene.search.Weight; +import org.apache.lucene.util.Accountable; + +import java.io.IOException; + +/** + * Preserves star-tree queries which can be used along with original query + * Decides which star-tree query to use (or not) based on cost factors + */ +public class OriginalOrStarTreeQuery extends Query implements Accountable { + + private final StarTreeQuery starTreeQuery; + private final Query originalQuery; + private boolean starTreeQueryUsed; + + public OriginalOrStarTreeQuery(StarTreeQuery starTreeQuery, Query originalQuery) { + this.starTreeQuery = starTreeQuery; + this.originalQuery = originalQuery; + this.starTreeQueryUsed = false; + } + + @Override + public String toString(String s) { + return ""; + } + + @Override + public void visit(QueryVisitor queryVisitor) { + + } + + @Override + public boolean equals(Object o) { + return false; + } + + @Override + public int hashCode() { + return 0; + } + + @Override + public long ramBytesUsed() { + return 0; + } + + public boolean isStarTreeUsed() { + return starTreeQueryUsed; + } + + public Weight createWeight(IndexSearcher searcher, ScoreMode scoreMode, float boost) throws IOException { + if (searcher.getIndexReader().hasDeletions() == false) { + this.starTreeQueryUsed = true; + return this.starTreeQuery.createWeight(searcher, scoreMode, boost); + } else { + return this.originalQuery.createWeight(searcher, scoreMode, boost); + } + } + + public Query getOriginalQuery() { + return originalQuery; + } + + public StarTreeQuery getStarTreeQuery() { + return starTreeQuery; + } +} diff --git a/server/src/main/java/org/opensearch/search/startree/StarTreeFilter.java b/server/src/main/java/org/opensearch/search/startree/StarTreeFilter.java new file mode 100644 index 0000000000000..484e0c96d6ff5 --- /dev/null +++ b/server/src/main/java/org/opensearch/search/startree/StarTreeFilter.java @@ -0,0 +1,286 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.startree; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.index.SortedNumericDocValues; +import org.apache.lucene.search.DocIdSetIterator; +import org.apache.lucene.util.DocIdSetBuilder; +import org.opensearch.index.compositeindex.datacube.startree.index.StarTreeValues; +import org.opensearch.index.compositeindex.datacube.startree.node.StarTreeNode; + +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Set; +import java.util.function.Predicate; + +import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS; + +/** + * Filter operator for star tree data structure. + */ +public class StarTreeFilter { + private static final Logger logger = LogManager.getLogger(StarTreeFilter.class); + + /** + * Helper class to wrap the result from traversing the star tree. + * */ + static class StarTreeResult { + final DocIdSetBuilder _matchedDocIds; + final Set _remainingPredicateColumns; + final int numOfMatchedDocs; + final int maxMatchedDoc; + + StarTreeResult(DocIdSetBuilder matchedDocIds, Set remainingPredicateColumns, int numOfMatchedDocs, int maxMatchedDoc) { + _matchedDocIds = matchedDocIds; + _remainingPredicateColumns = remainingPredicateColumns; + this.numOfMatchedDocs = numOfMatchedDocs; + this.maxMatchedDoc = maxMatchedDoc; + } + } + + private final StarTreeNode starTreeRoot; + + Map>> _predicateEvaluators; + + DocIdSetBuilder docsWithField; + + DocIdSetBuilder.BulkAdder adder; + Map dimValueMap; + + public StarTreeFilter(StarTreeValues starTreeAggrStructure, Map>> predicateEvaluators) { + // This filter operator does not support AND/OR/NOT operations. + starTreeRoot = starTreeAggrStructure.getRoot(); + dimValueMap = starTreeAggrStructure.getDimensionDocValuesIteratorMap(); + _predicateEvaluators = predicateEvaluators != null ? predicateEvaluators : Collections.emptyMap(); + // _groupByColumns = groupByColumns != null ? groupByColumns : Collections.emptyList(); + + // TODO : this should be the maximum number of doc values + docsWithField = new DocIdSetBuilder(Integer.MAX_VALUE); + } + + /** + *
    + *
  • First go over the star tree and try to match as many dimensions as possible + *
  • For the remaining columns, use doc values indexes to match them + *
+ */ + public DocIdSetIterator getStarTreeResult() throws IOException { + StarTreeResult starTreeResult = traverseStarTree(); + List andIterators = new ArrayList<>(); + andIterators.add(starTreeResult._matchedDocIds.build().iterator()); + DocIdSetIterator docIdSetIterator = andIterators.get(0); + // No matches, return + if (starTreeResult.maxMatchedDoc == -1) { + return docIdSetIterator; + } + int docCount = 0; + for (String remainingPredicateColumn : starTreeResult._remainingPredicateColumns) { + // TODO : set to max value of doc values + logger.debug("remainingPredicateColumn : {}, maxMatchedDoc : {} ", remainingPredicateColumn, starTreeResult.maxMatchedDoc); + DocIdSetBuilder builder = new DocIdSetBuilder(starTreeResult.maxMatchedDoc + 1); + List> compositePredicateEvaluators = _predicateEvaluators.get(remainingPredicateColumn); + SortedNumericDocValues ndv = (SortedNumericDocValues) this.dimValueMap.get(remainingPredicateColumn); + List docIds = new ArrayList<>(); + while (docIdSetIterator.nextDoc() != NO_MORE_DOCS) { + docCount++; + int docID = docIdSetIterator.docID(); + if (ndv.advanceExact(docID)) { + final int valuesCount = ndv.docValueCount(); + long value = ndv.nextValue(); + for (Predicate compositePredicateEvaluator : compositePredicateEvaluators) { + // TODO : this might be expensive as its done against all doc values docs + if (compositePredicateEvaluator.test(value)) { + docIds.add(docID); + for (int i = 0; i < valuesCount - 1; i++) { + while (docIdSetIterator.nextDoc() != NO_MORE_DOCS) { + docIds.add(docIdSetIterator.docID()); + } + } + break; + } + } + } + } + DocIdSetBuilder.BulkAdder adder = builder.grow(docIds.size()); + for (int docID : docIds) { + adder.add(docID); + } + docIdSetIterator = builder.build().iterator(); + } + return docIdSetIterator; + } + + /** + * Helper method to traverse the star tree, get matching documents and keep track of all the + * predicate dimensions that are not matched. + */ + private StarTreeResult traverseStarTree() throws IOException { + Set globalRemainingPredicateColumns = null; + + StarTreeNode starTree = starTreeRoot; + + List dimensionNames = new ArrayList<>(dimValueMap.keySet()); + + // Track whether we have found a leaf node added to the queue. If we have found a leaf node, and + // traversed to the + // level of the leave node, we can set globalRemainingPredicateColumns if not already set + // because we know the leaf + // node won't split further on other predicate columns. + boolean foundLeafNode = starTree.isLeaf(); + + // Use BFS to traverse the star tree + Queue queue = new ArrayDeque<>(); + queue.add(starTree); + int currentDimensionId = -1; + Set remainingPredicateColumns = new HashSet<>(_predicateEvaluators.keySet()); + if (foundLeafNode) { + globalRemainingPredicateColumns = new HashSet<>(remainingPredicateColumns); + } + + int matchedDocsCountInStarTree = 0; + int maxDocNum = -1; + + StarTreeNode starTreeNode; + List docIds = new ArrayList<>(); + while ((starTreeNode = queue.poll()) != null) { + int dimensionId = starTreeNode.getDimensionId(); + if (dimensionId > currentDimensionId) { + // Previous level finished + String dimension = dimensionNames.get(dimensionId); + remainingPredicateColumns.remove(dimension); + if (foundLeafNode && globalRemainingPredicateColumns == null) { + globalRemainingPredicateColumns = new HashSet<>(remainingPredicateColumns); + } + currentDimensionId = dimensionId; + } + + // If all predicate columns columns are matched, we can use aggregated document + if (remainingPredicateColumns.isEmpty()) { + int docId = starTreeNode.getAggregatedDocId(); + docIds.add(docId); + matchedDocsCountInStarTree++; + maxDocNum = Math.max(docId, maxDocNum); + continue; + } + + // For leaf node, because we haven't exhausted all predicate columns and group-by columns, + // we cannot use the aggregated document. + // Add the range of documents for this node to the bitmap, and keep track of the + // remaining predicate columns for this node + if (starTreeNode.isLeaf()) { + for (long i = starTreeNode.getStartDocId(); i < starTreeNode.getEndDocId(); i++) { + docIds.add((int) i); + matchedDocsCountInStarTree++; + maxDocNum = Math.max((int) i, maxDocNum); + } + continue; + } + + // For non-leaf node, proceed to next level + String childDimension = dimensionNames.get(dimensionId + 1); + + // Only read star-node when the dimension is not in the global remaining predicate columns + // because we cannot use star-node in such cases + StarTreeNode starNode = null; + if ((globalRemainingPredicateColumns == null || !globalRemainingPredicateColumns.contains(childDimension))) { + starNode = starTreeNode.getChildForDimensionValue(StarTreeNode.ALL, true); + } + + if (remainingPredicateColumns.contains(childDimension)) { + // Have predicates on the next level, add matching nodes to the queue + + // Calculate the matching dictionary ids for the child dimension + int numChildren = starTreeNode.getNumChildren(); + + // If number of matching dictionary ids is large, use scan instead of binary search + + Iterator childrenIterator = starTreeNode.getChildrenIterator(); + + // When the star-node exists, and the number of matching doc ids is more than or equal to + // the number of non-star child nodes, check if all the child nodes match the predicate, + // and use the star-node if so + if (starNode != null) { + List matchingChildNodes = new ArrayList<>(); + boolean findLeafChildNode = false; + while (childrenIterator.hasNext()) { + StarTreeNode childNode = childrenIterator.next(); + List> predicates = _predicateEvaluators.get(childDimension); + for (Predicate predicate : predicates) { + long val = childNode.getDimensionValue(); + if (predicate.test(val)) { + matchingChildNodes.add(childNode); + findLeafChildNode |= childNode.isLeaf(); + break; + } + } + } + if (matchingChildNodes.size() == numChildren - 1) { + // All the child nodes (except for the star-node) match the predicate, use the star-node + queue.add(starNode); + foundLeafNode |= starNode.isLeaf(); + } else { + // Some child nodes do not match the predicate, use the matching child nodes + queue.addAll(matchingChildNodes); + foundLeafNode |= findLeafChildNode; + } + } else { + // Cannot use the star-node, use the matching child nodes + while (childrenIterator.hasNext()) { + StarTreeNode childNode = childrenIterator.next(); + List> predicates = _predicateEvaluators.get(childDimension); + for (Predicate predicate : predicates) { + if (predicate.test(childNode.getDimensionValue())) { + queue.add(childNode); + foundLeafNode |= childNode.isLeaf(); + break; + } + } + } + } + } else { + // No predicate on the next level + if (starNode != null) { + // Star-node exists, use it + queue.add(starNode); + foundLeafNode |= starNode.isLeaf(); + } else { + // Star-node does not exist or cannot be used, add all non-star nodes to the queue + Iterator childrenIterator = starTreeNode.getChildrenIterator(); + while (childrenIterator.hasNext()) { + StarTreeNode childNode = childrenIterator.next(); + if (childNode.getDimensionValue() != StarTreeNode.ALL) { + queue.add(childNode); + foundLeafNode |= childNode.isLeaf(); + } + } + } + } + } + + adder = docsWithField.grow(docIds.size()); + for (int id : docIds) { + adder.add(id); + } + return new StarTreeResult( + docsWithField, + globalRemainingPredicateColumns != null ? globalRemainingPredicateColumns : Collections.emptySet(), + matchedDocsCountInStarTree, + maxDocNum + ); + } +} diff --git a/server/src/main/java/org/opensearch/search/startree/StarTreeQuery.java b/server/src/main/java/org/opensearch/search/startree/StarTreeQuery.java new file mode 100644 index 0000000000000..3185778e7d754 --- /dev/null +++ b/server/src/main/java/org/opensearch/search/startree/StarTreeQuery.java @@ -0,0 +1,112 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.startree; + +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.SegmentReader; +import org.apache.lucene.search.ConstantScoreScorer; +import org.apache.lucene.search.ConstantScoreWeight; +import org.apache.lucene.search.DocIdSetIterator; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.QueryVisitor; +import org.apache.lucene.search.ScoreMode; +import org.apache.lucene.search.Scorer; +import org.apache.lucene.search.Weight; +import org.apache.lucene.util.Accountable; +import org.opensearch.common.lucene.Lucene; +import org.opensearch.index.codec.composite.CompositeIndexFieldInfo; +import org.opensearch.index.codec.composite.CompositeIndexReader; +import org.opensearch.index.compositeindex.datacube.startree.index.StarTreeValues; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.function.Predicate; + +/** Query class for querying star tree data structure */ +public class StarTreeQuery extends Query implements Accountable { + + /** + * Star tree field info + * This is used to get the star tree data structure + */ + CompositeIndexFieldInfo starTree; + + /** + * Map of field name to a list of predicates to be applied on that field + * This is used to filter the data based on the predicates + */ + Map>> compositePredicateMap; + + public StarTreeQuery(CompositeIndexFieldInfo starTree, Map>> compositePredicateMap) { + this.starTree = starTree; + this.compositePredicateMap = compositePredicateMap; + } + + @Override + public String toString(String field) { + return null; + } + + @Override + public void visit(QueryVisitor visitor) { + visitor.visitLeaf(this); + } + + @Override + public boolean equals(Object obj) { + return sameClassAs(obj); + } + + @Override + public int hashCode() { + return classHash(); + } + + @Override + public long ramBytesUsed() { + return 0; + } + + @Override + public Weight createWeight(IndexSearcher searcher, ScoreMode scoreMode, float boost) throws IOException { + return new ConstantScoreWeight(this, boost) { + @Override + public Scorer scorer(LeafReaderContext context) throws IOException { + SegmentReader reader = Lucene.segmentReader(context.reader()); + + // We get the 'CompositeIndexReader' instance so that we can get StarTreeValues + if (!(reader.getDocValuesReader() instanceof CompositeIndexReader)) return null; + + CompositeIndexReader starTreeDocValuesReader = (CompositeIndexReader) reader.getDocValuesReader(); + List compositeIndexFields = starTreeDocValuesReader.getCompositeIndexFields(); + StarTreeValues starTreeValues = null; + if (compositeIndexFields != null && !compositeIndexFields.isEmpty()) { + starTreeValues = (StarTreeValues) starTreeDocValuesReader.getCompositeIndexValues(starTree); + } else { + return null; + } + + StarTreeFilter filter = new StarTreeFilter(starTreeValues, compositePredicateMap); + DocIdSetIterator result = filter.getStarTreeResult(); + return new ConstantScoreScorer(this, score(), scoreMode, result); + } + + @Override + public boolean isCacheable(LeafReaderContext ctx) { + return false; + } + }; + } + + public CompositeIndexFieldInfo getStarTree() { + return starTree; + } +} diff --git a/server/src/main/java/org/opensearch/search/startree/package-info.java b/server/src/main/java/org/opensearch/search/startree/package-info.java new file mode 100644 index 0000000000000..601a588e54e69 --- /dev/null +++ b/server/src/main/java/org/opensearch/search/startree/package-info.java @@ -0,0 +1,10 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** Star Tree query classes */ +package org.opensearch.search.startree; diff --git a/server/src/test/java/org/opensearch/index/codec/composite99/datacube/startree/StarTreeDocValuesFormatTests.java b/server/src/test/java/org/opensearch/index/codec/composite99/datacube/startree/StarTreeDocValuesFormatTests.java index 15f731addd38d..3fd1182a09ddf 100644 --- a/server/src/test/java/org/opensearch/index/codec/composite99/datacube/startree/StarTreeDocValuesFormatTests.java +++ b/server/src/test/java/org/opensearch/index/codec/composite99/datacube/startree/StarTreeDocValuesFormatTests.java @@ -221,7 +221,7 @@ private XContentBuilder getExpandedMapping() throws IOException { b.field("name", "field"); b.startArray("stats"); b.value("sum"); - b.value("value_count"); + b.value("count"); b.endArray(); b.endObject(); b.endArray(); diff --git a/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/MetricAggregatorInfoTests.java b/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/MetricAggregatorInfoTests.java index 62671ffa03b82..73e6aeb44cfd7 100644 --- a/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/MetricAggregatorInfoTests.java +++ b/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/MetricAggregatorInfoTests.java @@ -27,12 +27,12 @@ public void testConstructor() { public void testCountStarConstructor() { MetricAggregatorInfo pair = new MetricAggregatorInfo( - MetricStat.VALUE_COUNT, + MetricStat.COUNT, "anything", "star_tree_field", IndexNumericFieldData.NumericType.DOUBLE ); - assertEquals(MetricStat.VALUE_COUNT, pair.getMetricStat()); + assertEquals(MetricStat.COUNT, pair.getMetricStat()); assertEquals("anything", pair.getField()); } @@ -62,7 +62,7 @@ public void testEquals() { assertEquals(pair1, pair2); assertNotEquals( pair1, - new MetricAggregatorInfo(MetricStat.VALUE_COUNT, "column1", "star_tree_field", IndexNumericFieldData.NumericType.DOUBLE) + new MetricAggregatorInfo(MetricStat.COUNT, "column1", "star_tree_field", IndexNumericFieldData.NumericType.DOUBLE) ); assertNotEquals( pair1, @@ -100,7 +100,7 @@ public void testCompareTo() { IndexNumericFieldData.NumericType.DOUBLE ); MetricAggregatorInfo pair3 = new MetricAggregatorInfo( - MetricStat.VALUE_COUNT, + MetricStat.COUNT, "column1", "star_tree_field", IndexNumericFieldData.NumericType.DOUBLE diff --git a/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/ValueAggregatorFactoryTests.java b/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/ValueAggregatorFactoryTests.java index 6572d75d5b738..5e0bedf5e06a5 100644 --- a/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/ValueAggregatorFactoryTests.java +++ b/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/ValueAggregatorFactoryTests.java @@ -33,7 +33,7 @@ public void testGetValueAggregatorForMaxType() { } public void testGetValueAggregatorForCountType() { - ValueAggregator aggregator = ValueAggregatorFactory.getValueAggregator(MetricStat.VALUE_COUNT, StarTreeNumericType.LONG); + ValueAggregator aggregator = ValueAggregatorFactory.getValueAggregator(MetricStat.COUNT, StarTreeNumericType.LONG); assertNotNull(aggregator); assertEquals(CountValueAggregator.class, aggregator.getClass()); } diff --git a/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/AbstractStarTreeBuilderTests.java b/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/AbstractStarTreeBuilderTests.java index 226a20750bdb3..246f2046a6d0b 100644 --- a/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/AbstractStarTreeBuilderTests.java +++ b/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/AbstractStarTreeBuilderTests.java @@ -125,7 +125,7 @@ public void setup() throws IOException { metrics = List.of( new Metric("field2", List.of(MetricStat.SUM)), new Metric("field4", List.of(MetricStat.SUM)), - new Metric("field6", List.of(MetricStat.VALUE_COUNT)), + new Metric("field6", List.of(MetricStat.COUNT)), new Metric("field9", List.of(MetricStat.MIN)), new Metric("field10", List.of(MetricStat.MAX)), new Metric("_doc_count", List.of(MetricStat.DOC_COUNT)) @@ -2262,7 +2262,7 @@ private StarTreeField getStarTreeFieldWithMultipleMetrics() { Dimension d1 = new NumericDimension("field1"); Dimension d2 = new NumericDimension("field3"); Metric m1 = new Metric("field2", List.of(MetricStat.SUM)); - Metric m2 = new Metric("field2", List.of(MetricStat.VALUE_COUNT)); + Metric m2 = new Metric("field2", List.of(MetricStat.COUNT)); List dims = List.of(d1, d2); List metrics = List.of(m1, m2); StarTreeFieldConfiguration c = new StarTreeFieldConfiguration(10, new HashSet<>(), getBuildMode()); @@ -2637,7 +2637,7 @@ public void testMergeFlowNumSegmentsDocs() throws IOException { List metricsList2 = List.of(5L, 6L, 7L, 8L, 9L); List metricsWithField2 = List.of(0, 1, 2, 3, 4); - StarTreeField sf = getStarTreeField(MetricStat.VALUE_COUNT); + StarTreeField sf = getStarTreeField(MetricStat.COUNT); StarTreeValues starTreeValues = getStarTreeValues( getSortedNumericMock(dimList, docsWithField), getSortedNumericMock(dimList2, docsWithField2), From 7fb7314834da3b6a285b816776570173352b3168 Mon Sep 17 00:00:00 2001 From: Bharathwaj G Date: Fri, 23 Aug 2024 12:19:33 +0530 Subject: [PATCH 2/3] resolving conflicts Signed-off-by: Bharathwaj G --- .../index/mapper/StarTreeMapperIT.java | 2 +- .../datacube/startree/node/StarTree.java | 18 ++++++++++-------- .../startree/StarTreeDocValuesFormatTests.java | 2 +- .../aggregators/MetricAggregatorInfoTests.java | 8 ++++---- .../ValueAggregatorFactoryTests.java | 2 +- .../builder/AbstractStarTreeBuilderTests.java | 6 +++--- 6 files changed, 20 insertions(+), 18 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/index/mapper/StarTreeMapperIT.java b/server/src/internalClusterTest/java/org/opensearch/index/mapper/StarTreeMapperIT.java index 6acc8dda61b5c..20a07477828cd 100644 --- a/server/src/internalClusterTest/java/org/opensearch/index/mapper/StarTreeMapperIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/index/mapper/StarTreeMapperIT.java @@ -499,7 +499,7 @@ public void testUpdateIndexWhenMappingIsSame() { assertEquals("numeric_dv", starTreeFieldType.getMetrics().get(0).getField()); // Assert default metrics - List expectedMetrics = Arrays.asList(MetricStat.COUNT, MetricStat.SUM, MetricStat.AVG); + List expectedMetrics = Arrays.asList(MetricStat.VALUE_COUNT, MetricStat.SUM, MetricStat.AVG); assertEquals(expectedMetrics, starTreeFieldType.getMetrics().get(0).getMetrics()); assertEquals(10000, starTreeFieldType.getStarTreeConfig().maxLeafDocs()); assertEquals( diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/node/StarTree.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/node/StarTree.java index 02cb02d67a0fb..40e82ed0d8555 100644 --- a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/node/StarTree.java +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/node/StarTree.java @@ -13,6 +13,8 @@ import org.apache.lucene.store.RandomAccessInput; import org.opensearch.index.compositeindex.datacube.startree.fileformats.data.StarTreeDataWriter; import org.opensearch.index.compositeindex.datacube.startree.fileformats.meta.StarTreeMetadata; +import static org.opensearch.index.compositeindex.CompositeIndexConstants.COMPOSITE_FIELD_MARKER; +import static org.opensearch.index.compositeindex.datacube.startree.fileformats.StarTreeWriter.VERSION_CURRENT; import java.io.IOException; @@ -28,15 +30,15 @@ public class StarTree { public StarTree(IndexInput data, StarTreeMetadata starTreeMetadata) throws IOException { long magicMarker = data.readLong(); - // if (COMPOSITE_FIELD_MARKER != magicMarker) { - // logger.error("Invalid magic marker"); - // throw new IOException("Invalid magic marker"); - // } + if (COMPOSITE_FIELD_MARKER != magicMarker) { + logger.error("Invalid magic marker"); + throw new IOException("Invalid magic marker"); + } int version = data.readInt(); - // if (VERSION_CURRENT != version) { - // logger.error("Invalid star tree version"); - // throw new IOException("Invalid version"); - // } + if (VERSION_CURRENT != version) { + logger.error("Invalid star tree version"); + throw new IOException("Invalid version"); + } numNodes = data.readInt(); // num nodes RandomAccessInput in = data.randomAccessSlice( diff --git a/server/src/test/java/org/opensearch/index/codec/composite99/datacube/startree/StarTreeDocValuesFormatTests.java b/server/src/test/java/org/opensearch/index/codec/composite99/datacube/startree/StarTreeDocValuesFormatTests.java index 3fd1182a09ddf..15f731addd38d 100644 --- a/server/src/test/java/org/opensearch/index/codec/composite99/datacube/startree/StarTreeDocValuesFormatTests.java +++ b/server/src/test/java/org/opensearch/index/codec/composite99/datacube/startree/StarTreeDocValuesFormatTests.java @@ -221,7 +221,7 @@ private XContentBuilder getExpandedMapping() throws IOException { b.field("name", "field"); b.startArray("stats"); b.value("sum"); - b.value("count"); + b.value("value_count"); b.endArray(); b.endObject(); b.endArray(); diff --git a/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/MetricAggregatorInfoTests.java b/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/MetricAggregatorInfoTests.java index 73e6aeb44cfd7..62671ffa03b82 100644 --- a/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/MetricAggregatorInfoTests.java +++ b/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/MetricAggregatorInfoTests.java @@ -27,12 +27,12 @@ public void testConstructor() { public void testCountStarConstructor() { MetricAggregatorInfo pair = new MetricAggregatorInfo( - MetricStat.COUNT, + MetricStat.VALUE_COUNT, "anything", "star_tree_field", IndexNumericFieldData.NumericType.DOUBLE ); - assertEquals(MetricStat.COUNT, pair.getMetricStat()); + assertEquals(MetricStat.VALUE_COUNT, pair.getMetricStat()); assertEquals("anything", pair.getField()); } @@ -62,7 +62,7 @@ public void testEquals() { assertEquals(pair1, pair2); assertNotEquals( pair1, - new MetricAggregatorInfo(MetricStat.COUNT, "column1", "star_tree_field", IndexNumericFieldData.NumericType.DOUBLE) + new MetricAggregatorInfo(MetricStat.VALUE_COUNT, "column1", "star_tree_field", IndexNumericFieldData.NumericType.DOUBLE) ); assertNotEquals( pair1, @@ -100,7 +100,7 @@ public void testCompareTo() { IndexNumericFieldData.NumericType.DOUBLE ); MetricAggregatorInfo pair3 = new MetricAggregatorInfo( - MetricStat.COUNT, + MetricStat.VALUE_COUNT, "column1", "star_tree_field", IndexNumericFieldData.NumericType.DOUBLE diff --git a/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/ValueAggregatorFactoryTests.java b/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/ValueAggregatorFactoryTests.java index 5e0bedf5e06a5..6572d75d5b738 100644 --- a/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/ValueAggregatorFactoryTests.java +++ b/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/ValueAggregatorFactoryTests.java @@ -33,7 +33,7 @@ public void testGetValueAggregatorForMaxType() { } public void testGetValueAggregatorForCountType() { - ValueAggregator aggregator = ValueAggregatorFactory.getValueAggregator(MetricStat.COUNT, StarTreeNumericType.LONG); + ValueAggregator aggregator = ValueAggregatorFactory.getValueAggregator(MetricStat.VALUE_COUNT, StarTreeNumericType.LONG); assertNotNull(aggregator); assertEquals(CountValueAggregator.class, aggregator.getClass()); } diff --git a/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/AbstractStarTreeBuilderTests.java b/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/AbstractStarTreeBuilderTests.java index 246f2046a6d0b..226a20750bdb3 100644 --- a/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/AbstractStarTreeBuilderTests.java +++ b/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/AbstractStarTreeBuilderTests.java @@ -125,7 +125,7 @@ public void setup() throws IOException { metrics = List.of( new Metric("field2", List.of(MetricStat.SUM)), new Metric("field4", List.of(MetricStat.SUM)), - new Metric("field6", List.of(MetricStat.COUNT)), + new Metric("field6", List.of(MetricStat.VALUE_COUNT)), new Metric("field9", List.of(MetricStat.MIN)), new Metric("field10", List.of(MetricStat.MAX)), new Metric("_doc_count", List.of(MetricStat.DOC_COUNT)) @@ -2262,7 +2262,7 @@ private StarTreeField getStarTreeFieldWithMultipleMetrics() { Dimension d1 = new NumericDimension("field1"); Dimension d2 = new NumericDimension("field3"); Metric m1 = new Metric("field2", List.of(MetricStat.SUM)); - Metric m2 = new Metric("field2", List.of(MetricStat.COUNT)); + Metric m2 = new Metric("field2", List.of(MetricStat.VALUE_COUNT)); List dims = List.of(d1, d2); List metrics = List.of(m1, m2); StarTreeFieldConfiguration c = new StarTreeFieldConfiguration(10, new HashSet<>(), getBuildMode()); @@ -2637,7 +2637,7 @@ public void testMergeFlowNumSegmentsDocs() throws IOException { List metricsList2 = List.of(5L, 6L, 7L, 8L, 9L); List metricsWithField2 = List.of(0, 1, 2, 3, 4); - StarTreeField sf = getStarTreeField(MetricStat.COUNT); + StarTreeField sf = getStarTreeField(MetricStat.VALUE_COUNT); StarTreeValues starTreeValues = getStarTreeValues( getSortedNumericMock(dimList, docsWithField), getSortedNumericMock(dimList2, docsWithField2), From 73b920de371b7d0a14075540cb4580bad4eb45a6 Mon Sep 17 00:00:00 2001 From: Sandesh Kumar Date: Thu, 22 Aug 2024 20:08:57 -0700 Subject: [PATCH 3/3] Adding test cases and max,min aggregator support Signed-off-by: Sandesh Kumar --- .../datacube/startree/node/StarTree.java | 27 +-- .../index/query/QueryShardContext.java | 16 +- .../aggregations/metrics/MaxAggregator.java | 44 +++++ .../metrics/MaxAggregatorFactory.java | 2 +- .../aggregations/metrics/MinAggregator.java | 44 +++++ .../metrics/MinAggregatorFactory.java | 2 +- .../aggregations/metrics/SumAggregator.java | 14 +- .../startree/OriginalOrStarTreeQuery.java | 4 +- .../StarTreeDocValuesFormatTests.java | 20 +- .../startree/MetricAggregatorTests.java | 174 ++++++++++++++++++ .../aggregations/AggregatorTestCase.java | 70 +++++++ 11 files changed, 380 insertions(+), 37 deletions(-) create mode 100644 server/src/test/java/org/opensearch/search/aggregations/startree/MetricAggregatorTests.java diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/node/StarTree.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/node/StarTree.java index 40e82ed0d8555..bbace51efbd22 100644 --- a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/node/StarTree.java +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/node/StarTree.java @@ -26,21 +26,22 @@ public class StarTree { private static final Logger logger = LogManager.getLogger(StarTree.class); private final FixedLengthStarTreeNode root; - private final Integer numNodes; + private Integer numNodes; public StarTree(IndexInput data, StarTreeMetadata starTreeMetadata) throws IOException { - long magicMarker = data.readLong(); - if (COMPOSITE_FIELD_MARKER != magicMarker) { - logger.error("Invalid magic marker"); - throw new IOException("Invalid magic marker"); - } - int version = data.readInt(); - if (VERSION_CURRENT != version) { - logger.error("Invalid star tree version"); - throw new IOException("Invalid version"); - } - numNodes = data.readInt(); // num nodes - + if(data.getFilePointer() < StarTreeDataWriter.computeStarTreeDataHeaderByteSize()) { + long magicMarker = data.readLong(); + if (COMPOSITE_FIELD_MARKER != magicMarker) { + logger.error("Invalid magic marker"); + throw new IOException("Invalid magic marker"); + } + int version = data.readInt(); + if (VERSION_CURRENT != version) { + logger.error("Invalid star tree version"); + throw new IOException("Invalid version"); + } + numNodes = data.readInt(); // num nodes + } RandomAccessInput in = data.randomAccessSlice( StarTreeDataWriter.computeStarTreeDataHeaderByteSize(), starTreeMetadata.getDataLength() - StarTreeDataWriter.computeStarTreeDataHeaderByteSize() diff --git a/server/src/main/java/org/opensearch/index/query/QueryShardContext.java b/server/src/main/java/org/opensearch/index/query/QueryShardContext.java index 9e3f9425d352b..2ea40c396f782 100644 --- a/server/src/main/java/org/opensearch/index/query/QueryShardContext.java +++ b/server/src/main/java/org/opensearch/index/query/QueryShardContext.java @@ -79,6 +79,8 @@ import org.opensearch.script.ScriptFactory; import org.opensearch.script.ScriptService; import org.opensearch.search.aggregations.AggregatorFactory; +import org.opensearch.search.aggregations.metrics.MaxAggregatorFactory; +import org.opensearch.search.aggregations.metrics.MinAggregatorFactory; import org.opensearch.search.aggregations.metrics.SumAggregatorFactory; import org.opensearch.search.aggregations.support.AggregationUsageService; import org.opensearch.search.aggregations.support.ValuesSourceRegistry; @@ -585,7 +587,7 @@ private Map>> getStarTreePredicates(QueryBuilder qu } public boolean validateStarTreeMetricSuport(CompositeDataCubeFieldType compositeIndexFieldInfo, AggregatorFactory aggregatorFactory) { - String field = null; + String field; Map> supportedMetrics = compositeIndexFieldInfo.getMetrics() .stream() .collect(Collectors.toMap(Metric::getField, Metric::getMetrics)); @@ -595,14 +597,16 @@ public boolean validateStarTreeMetricSuport(CompositeDataCubeFieldType composite return false; } - // TODO: increment supported aggregation type if (aggregatorFactory instanceof SumAggregatorFactory) { field = ((SumAggregatorFactory) aggregatorFactory).getField(); - if (supportedMetrics.containsKey(field) && supportedMetrics.get(field).contains(MetricStat.SUM)) { - return true; - } + return supportedMetrics.containsKey(field) && supportedMetrics.get(field).contains(MetricStat.SUM); + } else if (aggregatorFactory instanceof MaxAggregatorFactory) { + field = ((MaxAggregatorFactory) aggregatorFactory).getField(); + return supportedMetrics.containsKey(field) && supportedMetrics.get(field).contains(MetricStat.MAX); + } else if (aggregatorFactory instanceof MinAggregatorFactory) { + field = ((MinAggregatorFactory) aggregatorFactory).getField(); + return supportedMetrics.containsKey(field) && supportedMetrics.get(field).contains(MetricStat.MIN); } - return false; } diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregator.java index 8108b8a726856..bb4301c34d089 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregator.java @@ -34,12 +34,16 @@ import org.apache.lucene.index.LeafReader; import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.PointValues; +import org.apache.lucene.index.SortedNumericDocValues; import org.apache.lucene.search.CollectionTerminatedException; import org.apache.lucene.search.ScoreMode; import org.apache.lucene.util.Bits; import org.opensearch.common.lease.Releasables; import org.opensearch.common.util.BigArrays; import org.opensearch.common.util.DoubleArray; +import org.opensearch.index.codec.composite.CompositeIndexFieldInfo; +import org.opensearch.index.compositeindex.datacube.startree.index.StarTreeValues; +import org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeUtils; import org.opensearch.index.fielddata.NumericDoubleValues; import org.opensearch.index.fielddata.SortedNumericDoubleValues; import org.opensearch.search.DocValueFormat; @@ -51,6 +55,8 @@ import org.opensearch.search.aggregations.support.ValuesSource; import org.opensearch.search.aggregations.support.ValuesSourceConfig; import org.opensearch.search.internal.SearchContext; +import org.opensearch.search.startree.OriginalOrStarTreeQuery; +import org.opensearch.search.startree.StarTreeQuery; import java.io.IOException; import java.util.Arrays; @@ -120,6 +126,16 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBuc throw new CollectionTerminatedException(); } } + + if (context.query() instanceof OriginalOrStarTreeQuery && ((OriginalOrStarTreeQuery) context.query()).isStarTreeUsed()) { + StarTreeQuery starTreeQuery = ((OriginalOrStarTreeQuery) context.query()).getStarTreeQuery(); + return getStarTreeLeafCollector(ctx, sub, starTreeQuery.getStarTree()); + } + return getDefaultLeafCollector(ctx, sub); + } + + private LeafBucketCollector getDefaultLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException { + final BigArrays bigArrays = context.bigArrays(); final SortedNumericDoubleValues allValues = valuesSource.doubleValues(ctx); final NumericDoubleValues values = MultiValueMode.MAX.select(allValues); @@ -143,6 +159,34 @@ public void collect(int doc, long bucket) throws IOException { }; } + private LeafBucketCollector getStarTreeLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub, CompositeIndexFieldInfo starTree) + throws IOException { + StarTreeValues starTreeValues = getStarTreeValues(ctx, starTree); + String fieldName = ((ValuesSource.Numeric.FieldData) valuesSource).getIndexFieldName(); + String metricName = StarTreeUtils.fullyQualifiedFieldNameForStarTreeMetricsDocValues(starTree.getField(), fieldName, "max"); + SortedNumericDocValues values = (SortedNumericDocValues) starTreeValues.getMetricDocValuesIteratorMap().get(metricName); + + final BigArrays bigArrays = context.bigArrays(); + final SortedNumericDoubleValues allValues = valuesSource.doubleValues(ctx); + return new LeafBucketCollectorBase(sub, allValues) { + + @Override + public void collect(int doc, long bucket) throws IOException { + if (bucket >= maxes.size()) { + long from = maxes.size(); + maxes = bigArrays.grow(maxes, bucket + 1); + maxes.fill(from, maxes.size(), Double.NEGATIVE_INFINITY); + } + if (values.advanceExact(doc)) { + final double value = Double.longBitsToDouble(values.nextValue()); + double max = maxes.get(bucket); + max = Math.max(max, value); + maxes.set(bucket, max); + } + } + }; + } + @Override public double metric(long owningBucketOrd) { if (valuesSource == null || owningBucketOrd >= maxes.size()) { diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregatorFactory.java index 4fe936c8b7797..0d537745126d3 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregatorFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregatorFactory.java @@ -52,7 +52,7 @@ * * @opensearch.internal */ -class MaxAggregatorFactory extends ValuesSourceAggregatorFactory { +public class MaxAggregatorFactory extends ValuesSourceAggregatorFactory { static void registerAggregators(ValuesSourceRegistry.Builder builder) { builder.register( diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregator.java index 946057e42ac88..8303e5e025b61 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregator.java @@ -34,12 +34,16 @@ import org.apache.lucene.index.LeafReader; import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.PointValues; +import org.apache.lucene.index.SortedNumericDocValues; import org.apache.lucene.search.CollectionTerminatedException; import org.apache.lucene.search.ScoreMode; import org.apache.lucene.util.Bits; import org.opensearch.common.lease.Releasables; import org.opensearch.common.util.BigArrays; import org.opensearch.common.util.DoubleArray; +import org.opensearch.index.codec.composite.CompositeIndexFieldInfo; +import org.opensearch.index.compositeindex.datacube.startree.index.StarTreeValues; +import org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeUtils; import org.opensearch.index.fielddata.NumericDoubleValues; import org.opensearch.index.fielddata.SortedNumericDoubleValues; import org.opensearch.search.DocValueFormat; @@ -51,6 +55,8 @@ import org.opensearch.search.aggregations.support.ValuesSource; import org.opensearch.search.aggregations.support.ValuesSourceConfig; import org.opensearch.search.internal.SearchContext; +import org.opensearch.search.startree.OriginalOrStarTreeQuery; +import org.opensearch.search.startree.StarTreeQuery; import java.io.IOException; import java.util.Map; @@ -119,6 +125,16 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBuc throw new CollectionTerminatedException(); } } + + if (context.query() instanceof OriginalOrStarTreeQuery && ((OriginalOrStarTreeQuery) context.query()).isStarTreeUsed()) { + StarTreeQuery starTreeQuery = ((OriginalOrStarTreeQuery) context.query()).getStarTreeQuery(); + return getStarTreeLeafCollector(ctx, sub, starTreeQuery.getStarTree()); + } + return getDefaultLeafCollector(ctx, sub); + } + + private LeafBucketCollector getDefaultLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) + throws IOException { final BigArrays bigArrays = context.bigArrays(); final SortedNumericDoubleValues allValues = valuesSource.doubleValues(ctx); final NumericDoubleValues values = MultiValueMode.MIN.select(allValues); @@ -138,10 +154,38 @@ public void collect(int doc, long bucket) throws IOException { mins.set(bucket, min); } } + }; + } + private LeafBucketCollector getStarTreeLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub, CompositeIndexFieldInfo starTree) + throws IOException { + StarTreeValues starTreeValues = getStarTreeValues(ctx, starTree); + String fieldName = ((ValuesSource.Numeric.FieldData) valuesSource).getIndexFieldName(); + String metricName = StarTreeUtils.fullyQualifiedFieldNameForStarTreeMetricsDocValues(starTree.getField(), fieldName, "min"); + SortedNumericDocValues values = (SortedNumericDocValues) starTreeValues.getMetricDocValuesIteratorMap().get(metricName); + + final BigArrays bigArrays = context.bigArrays(); + final SortedNumericDoubleValues allValues = valuesSource.doubleValues(ctx); + return new LeafBucketCollectorBase(sub, allValues) { + + @Override + public void collect(int doc, long bucket) throws IOException { + if (bucket >= mins.size()) { + long from = mins.size(); + mins = bigArrays.grow(mins, bucket + 1); + mins.fill(from, mins.size(), Double.POSITIVE_INFINITY); + } + if (values.advanceExact(doc)) { + final double value = Double.longBitsToDouble(values.nextValue()); + double min = mins.get(bucket); + min = Math.min(min, value); + mins.set(bucket, min); + } + } }; } + @Override public double metric(long owningBucketOrd) { if (valuesSource == null || owningBucketOrd >= mins.size()) { diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregatorFactory.java index 58fbe5edefd12..2b7c8a4cc8c9c 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregatorFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregatorFactory.java @@ -52,7 +52,7 @@ * * @opensearch.internal */ -class MinAggregatorFactory extends ValuesSourceAggregatorFactory { +public class MinAggregatorFactory extends ValuesSourceAggregatorFactory { static void registerAggregators(ValuesSourceRegistry.Builder builder) { builder.register( diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregator.java index 3415a10679b3b..b4af1d4768d11 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregator.java @@ -92,6 +92,10 @@ public ScoreMode scoreMode() { @Override public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBucketCollector sub) throws IOException { + if (valuesSource == null) { + return LeafBucketCollector.NO_OP_COLLECTOR; + } + if (context.query() instanceof OriginalOrStarTreeQuery && ((OriginalOrStarTreeQuery) context.query()).isStarTreeUsed()) { StarTreeQuery starTreeQuery = ((OriginalOrStarTreeQuery) context.query()).getStarTreeQuery(); return getStarTreeLeafCollector(ctx, sub, starTreeQuery.getStarTree()); @@ -100,9 +104,6 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBuc } private LeafBucketCollector getDefaultLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException { - if (valuesSource == null) { - return LeafBucketCollector.NO_OP_COLLECTOR; - } final BigArrays bigArrays = context.bigArrays(); final SortedNumericDoubleValues values = valuesSource.doubleValues(ctx); final CompensatedSum kahanSummation = new CompensatedSum(0, 0); @@ -134,15 +135,14 @@ public void collect(int doc, long bucket) throws IOException { private LeafBucketCollector getStarTreeLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub, CompositeIndexFieldInfo starTree) throws IOException { - final BigArrays bigArrays = context.bigArrays(); - final CompensatedSum kahanSummation = new CompensatedSum(0, 0); - StarTreeValues starTreeValues = getStarTreeValues(ctx, starTree); String fieldName = ((ValuesSource.Numeric.FieldData) valuesSource).getIndexFieldName(); String metricName = StarTreeUtils.fullyQualifiedFieldNameForStarTreeMetricsDocValues(starTree.getField(), fieldName, "sum"); - SortedNumericDocValues values = (SortedNumericDocValues) starTreeValues.getMetricDocValuesIteratorMap().get(metricName); + final BigArrays bigArrays = context.bigArrays(); + final CompensatedSum kahanSummation = new CompensatedSum(0, 0); + return new LeafBucketCollectorBase(sub, values) { @Override public void collect(int doc, long bucket) throws IOException { diff --git a/server/src/main/java/org/opensearch/search/startree/OriginalOrStarTreeQuery.java b/server/src/main/java/org/opensearch/search/startree/OriginalOrStarTreeQuery.java index bc8ef51bfb537..c79eb0f8709ae 100644 --- a/server/src/main/java/org/opensearch/search/startree/OriginalOrStarTreeQuery.java +++ b/server/src/main/java/org/opensearch/search/startree/OriginalOrStarTreeQuery.java @@ -45,12 +45,12 @@ public void visit(QueryVisitor queryVisitor) { @Override public boolean equals(Object o) { - return false; + return true; } @Override public int hashCode() { - return 0; + return originalQuery.hashCode(); } @Override diff --git a/server/src/test/java/org/opensearch/index/codec/composite99/datacube/startree/StarTreeDocValuesFormatTests.java b/server/src/test/java/org/opensearch/index/codec/composite99/datacube/startree/StarTreeDocValuesFormatTests.java index 15f731addd38d..89639c92709e0 100644 --- a/server/src/test/java/org/opensearch/index/codec/composite99/datacube/startree/StarTreeDocValuesFormatTests.java +++ b/server/src/test/java/org/opensearch/index/codec/composite99/datacube/startree/StarTreeDocValuesFormatTests.java @@ -75,8 +75,9 @@ */ @LuceneTestCase.SuppressSysoutChecks(bugUrl = "we log a lot on purpose") public class StarTreeDocValuesFormatTests extends BaseDocValuesFormatTestCase { - MapperService mapperService = null; + StarTreeFieldConfiguration.StarTreeBuildMode buildMode; + MapperService mapperService; public StarTreeDocValuesFormatTests(StarTreeFieldConfiguration.StarTreeBuildMode buildMode) { this.buildMode = buildMode; @@ -108,13 +109,14 @@ public void teardown() throws IOException { @Override protected Codec getCodec() { final Logger testLogger = LogManager.getLogger(StarTreeDocValuesFormatTests.class); - + Codec codec; try { - createMapperService(getExpandedMapping()); + mapperService = createMapperService(getExpandedMapping()); + codec = new Composite99Codec(Lucene99Codec.Mode.BEST_SPEED, mapperService, testLogger); } catch (IOException e) { throw new RuntimeException(e); } - Codec codec = new Composite99Codec(Lucene99Codec.Mode.BEST_SPEED, mapperService, testLogger); + return codec; } @@ -201,7 +203,7 @@ public void testStarTreeDocValues() throws IOException { directory.close(); } - private XContentBuilder getExpandedMapping() throws IOException { + public static XContentBuilder getExpandedMapping() throws IOException { return topMapping(b -> { b.startObject("composite"); b.startObject("startree"); @@ -222,6 +224,8 @@ private XContentBuilder getExpandedMapping() throws IOException { b.startArray("stats"); b.value("sum"); b.value("value_count"); + b.value("max"); + b.value("min"); b.endArray(); b.endObject(); b.endArray(); @@ -242,13 +246,14 @@ private XContentBuilder getExpandedMapping() throws IOException { }); } - private XContentBuilder topMapping(CheckedConsumer buildFields) throws IOException { + private static XContentBuilder topMapping(CheckedConsumer buildFields) throws IOException { XContentBuilder builder = XContentFactory.jsonBuilder().startObject().startObject("_doc"); buildFields.accept(builder); return builder.endObject().endObject(); } - private void createMapperService(XContentBuilder builder) throws IOException { + public static MapperService createMapperService(XContentBuilder builder) throws IOException { + MapperService mapperService = null; IndexMetadata indexMetadata = IndexMetadata.builder("test") .settings( Settings.builder() @@ -267,5 +272,6 @@ private void createMapperService(XContentBuilder builder) throws IOException { "test" ); mapperService.merge(indexMetadata, MapperService.MergeReason.INDEX_TEMPLATE); + return mapperService; } } diff --git a/server/src/test/java/org/opensearch/search/aggregations/startree/MetricAggregatorTests.java b/server/src/test/java/org/opensearch/search/aggregations/startree/MetricAggregatorTests.java new file mode 100644 index 0000000000000..fdbf5e4de4297 --- /dev/null +++ b/server/src/test/java/org/opensearch/search/aggregations/startree/MetricAggregatorTests.java @@ -0,0 +1,174 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.aggregations.startree; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.codecs.Codec; +import org.apache.lucene.codecs.lucene99.Lucene99Codec; +import org.apache.lucene.document.Document; +import org.apache.lucene.document.SortedNumericDocValuesField; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.SegmentReader; +import org.apache.lucene.index.Term; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.MatchAllDocsQuery; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.TermQuery; +import org.apache.lucene.store.Directory; +import org.apache.lucene.tests.index.RandomIndexWriter; +import org.opensearch.common.lucene.Lucene; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.index.codec.composite.CompositeIndexFieldInfo; +import org.opensearch.index.codec.composite.CompositeIndexReader; +import org.opensearch.index.codec.composite.composite99.Composite99Codec; +import org.opensearch.index.codec.composite99.datacube.startree.StarTreeDocValuesFormatTests; +import org.opensearch.index.mapper.MappedFieldType; +import org.opensearch.index.mapper.MapperService; +import org.opensearch.index.mapper.NumberFieldMapper; +import org.opensearch.search.aggregations.AggregationBuilder; +import org.opensearch.search.aggregations.AggregatorTestCase; +import org.junit.After; +import org.junit.Before; +import org.opensearch.search.aggregations.InternalAggregation; +import org.opensearch.search.aggregations.metrics.AvgAggregationBuilder; +import org.opensearch.search.aggregations.metrics.InternalAvg; +import org.opensearch.search.aggregations.metrics.InternalMax; +import org.opensearch.search.aggregations.metrics.InternalMin; +import org.opensearch.search.aggregations.metrics.InternalSum; +import org.opensearch.search.aggregations.metrics.MaxAggregationBuilder; +import org.opensearch.search.aggregations.metrics.MinAggregationBuilder; +import org.opensearch.search.aggregations.metrics.SumAggregationBuilder; +import org.opensearch.search.startree.OriginalOrStarTreeQuery; +import org.opensearch.search.startree.StarTreeQuery; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.BiConsumer; +import java.util.function.Function; +import java.util.function.Predicate; + +import static org.opensearch.search.aggregations.AggregationBuilders.avg; +import static org.opensearch.search.aggregations.AggregationBuilders.max; +import static org.opensearch.search.aggregations.AggregationBuilders.min; +import static org.opensearch.search.aggregations.AggregationBuilders.sum; +import static org.opensearch.test.InternalAggregationTestCase.DEFAULT_MAX_BUCKETS; + +public class MetricAggregatorTests extends AggregatorTestCase { + + private static final String FIELD_NAME = "field"; + private static final NumberFieldMapper.NumberType DEFAULT_FIELD_TYPE = NumberFieldMapper.NumberType.LONG; + private static final MappedFieldType DEFAULT_MAPPED_FIELD = new NumberFieldMapper.NumberFieldType(FIELD_NAME, DEFAULT_FIELD_TYPE); + + @Before + public void setup() { + FeatureFlags.initializeFeatureFlags(Settings.builder().put(FeatureFlags.STAR_TREE_INDEX, true).build()); + } + + @After + public void teardown() throws IOException { + FeatureFlags.initializeFeatureFlags(Settings.EMPTY); + } + + protected Codec getCodec() { + final Logger testLogger = LogManager.getLogger(MetricAggregatorTests.class); + + MapperService mapperService; + try { + mapperService = StarTreeDocValuesFormatTests.createMapperService(StarTreeDocValuesFormatTests.getExpandedMapping()); + } catch (IOException e) { + throw new RuntimeException(e); + } + return new Composite99Codec(Lucene99Codec.Mode.BEST_SPEED, mapperService, testLogger); + } + + public void testStarTreeDocValues() throws IOException { + Directory directory = newDirectory(); + IndexWriterConfig conf = newIndexWriterConfig(null); + conf.setCodec(getCodec()); + conf.setMergePolicy(newLogMergePolicy()); + RandomIndexWriter iw = new RandomIndexWriter(random(), directory, conf); + Document doc = new Document(); + doc.add(new SortedNumericDocValuesField("sndv", 1)); + doc.add(new SortedNumericDocValuesField("dv", 1)); + doc.add(new SortedNumericDocValuesField("field", 1)); + iw.addDocument(doc); + doc = new Document(); + doc.add(new SortedNumericDocValuesField("sndv", 1)); + doc.add(new SortedNumericDocValuesField("dv", 2)); + doc.add(new SortedNumericDocValuesField("field", 3)); + iw.addDocument(doc); + doc = new Document(); + iw.forceMerge(1); + doc.add(new SortedNumericDocValuesField("sndv", 2)); + doc.add(new SortedNumericDocValuesField("dv", 4)); + doc.add(new SortedNumericDocValuesField("field", 6)); + iw.addDocument(doc); + doc = new Document(); + doc.add(new SortedNumericDocValuesField("sndv", 3)); + doc.add(new SortedNumericDocValuesField("dv", 6)); + doc.add(new SortedNumericDocValuesField("field", 9)); + iw.addDocument(doc); + iw.forceMerge(1); + iw.close(); + + DirectoryReader ir = DirectoryReader.open(directory); + initValuesSourceRegistry(); + assertEquals(ir.leaves().size(), 1); + LeafReaderContext context = ir.leaves().get(0); + + + SegmentReader reader = Lucene.segmentReader(context.reader()); + IndexSearcher indexSearcher = newSearcher(reader, true, true); + CompositeIndexReader starTreeDocValuesReader = (CompositeIndexReader) reader.getDocValuesReader(); + List compositeIndexFields = starTreeDocValuesReader.getCompositeIndexFields(); + + CompositeIndexFieldInfo starTree = compositeIndexFields.get(0); + + SumAggregationBuilder sumAggregationBuilder = sum("_name").field(FIELD_NAME); + MaxAggregationBuilder maxAggregationBuilder = max("_name").field(FIELD_NAME); + MinAggregationBuilder minAggregationBuilder = min("_name").field(FIELD_NAME); + + // match-all query + Query defaultQeury = new MatchAllDocsQuery(); + StarTreeQuery starTreeQuery = new StarTreeQuery(starTree, null); // no predicates + testCase(indexSearcher, defaultQeury, starTreeQuery, sumAggregationBuilder, verifyAggregation(InternalSum::getValue), 19); + testCase(indexSearcher, defaultQeury, starTreeQuery, maxAggregationBuilder, verifyAggregation(InternalMax::getValue), 9); + testCase(indexSearcher, defaultQeury, starTreeQuery, minAggregationBuilder, verifyAggregation(InternalMin::getValue), 1); + + // numeric-terms query + defaultQeury = new TermQuery(new Term("sndv", "1")); + Map>> compositePredicateMap = new HashMap<>(); + compositePredicateMap.put("sndv", List.of(dimVal -> dimVal == 1)); + starTreeQuery = new StarTreeQuery(starTree, compositePredicateMap); + testCase(indexSearcher, defaultQeury, starTreeQuery, sumAggregationBuilder, verifyAggregation(InternalSum::getValue), 4); + testCase(indexSearcher, defaultQeury, starTreeQuery, maxAggregationBuilder, verifyAggregation(InternalMax::getValue), 3); + testCase(indexSearcher, defaultQeury, starTreeQuery, minAggregationBuilder, verifyAggregation(InternalMin::getValue), 1); + + ir.close(); + directory.close(); + } + + private void testCase(IndexSearcher searcher, + Query defaultQuery, StarTreeQuery starTreeQuery, T builder, BiConsumer verify, long expectedValue) throws IOException { + OriginalOrStarTreeQuery originalOrStarTreeQuery = new OriginalOrStarTreeQuery(starTreeQuery, defaultQuery); + V aggregation = searchAndReduceStarTree(createIndexSettings(), searcher, originalOrStarTreeQuery, builder, DEFAULT_MAX_BUCKETS, false, DEFAULT_MAPPED_FIELD); + verify.accept(aggregation, expectedValue); + } + + + BiConsumer verifyAggregation(Function valueExtractor) { + return (aggregation, expectedValue) -> assertEquals(expectedValue, valueExtractor.apply(aggregation), 0f); + } +} diff --git a/test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java b/test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java index 4abd7fbea9cff..9d816e864c50b 100644 --- a/test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java +++ b/test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java @@ -37,6 +37,7 @@ import org.apache.lucene.document.Field; import org.apache.lucene.document.InetAddressPoint; import org.apache.lucene.document.LatLonDocValuesField; +import org.apache.lucene.document.LongField; import org.apache.lucene.document.SortedNumericDocValuesField; import org.apache.lucene.document.SortedSetDocValuesField; import org.apache.lucene.document.StoredField; @@ -141,6 +142,7 @@ import org.opensearch.search.internal.ContextIndexSearcher; import org.opensearch.search.internal.SearchContext; import org.opensearch.search.lookup.SearchLookup; +import org.opensearch.search.startree.StarTreeQuery; import org.opensearch.test.InternalAggregationTestCase; import org.opensearch.test.OpenSearchTestCase; import org.junit.After; @@ -650,6 +652,74 @@ protected A searchAndReduc doAssertReducedMultiBucketConsumer(internalAgg, reduceBucketConsumer); return internalAgg; } + protected A searchAndReduceStarTree( + IndexSettings indexSettings, + IndexSearcher searcher, + Query query, + AggregationBuilder builder, + int maxBucket, + boolean hasNested, + MappedFieldType... fieldTypes + ) throws IOException { + final IndexReaderContext ctx = searcher.getTopReaderContext(); + final PipelineTree pipelines = builder.buildPipelineTree(); + List aggs = new ArrayList<>(); + if (hasNested) { + query = Queries.filtered(query, Queries.newNonNestedFilter()); + } + + MultiBucketConsumer bucketConsumer = new MultiBucketConsumer( + maxBucket, + new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST) + ); + C root = createAggregator(query, builder, searcher, bucketConsumer, fieldTypes); + + if (randomBoolean() && searcher.getIndexReader().leaves().size() > 0) { + assertTrue(ctx instanceof LeafReaderContext); + final LeafReaderContext compCTX = (LeafReaderContext) ctx; + final int size = compCTX.leaves().size(); + final ShardSearcher[] subSearchers = new ShardSearcher[size]; + for (int searcherIDX = 0; searcherIDX < subSearchers.length; searcherIDX++) { + final LeafReaderContext leave = compCTX.leaves().get(searcherIDX); + subSearchers[searcherIDX] = new ShardSearcher(leave, compCTX); + } + for (ShardSearcher subSearcher : subSearchers) { + MultiBucketConsumer shardBucketConsumer = new MultiBucketConsumer( + maxBucket, + new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST) + ); + C a = createAggregator(query, builder, subSearcher, indexSettings, shardBucketConsumer, fieldTypes); + a.preCollection(); + Weight weight = subSearcher.createWeight(query, ScoreMode.COMPLETE, 1f); + + assertTrue(weight.getQuery() instanceof StarTreeQuery); + subSearcher.search(weight, a); + a.postCollection(); + aggs.add(a.buildTopLevel()); + } + } else { + root.preCollection(); + searcher.search(query, root); + root.postCollection(); + aggs.add(root.buildTopLevel()); + } + + MultiBucketConsumer reduceBucketConsumer = new MultiBucketConsumer( + maxBucket, + new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST) + ); + InternalAggregation.ReduceContext context = InternalAggregation.ReduceContext.forFinalReduction( + root.context().bigArrays(), + getMockScriptService(), + reduceBucketConsumer, + pipelines + ); + + @SuppressWarnings("unchecked") + A internalAgg = (A) aggs.get(0).reduce(aggs, context); + doAssertReducedMultiBucketConsumer(internalAgg, reduceBucketConsumer); + return internalAgg; + } protected void doAssertReducedMultiBucketConsumer(Aggregation agg, MultiBucketConsumerService.MultiBucketConsumer bucketConsumer) { InternalAggregationTestCase.assertMultiBucketConsumer(agg, bucketConsumer);