diff --git a/CHANGELOG.md b/CHANGELOG.md
index 8599977452207..a4656a9c2806d 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -22,6 +22,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 - Add SMILE/CBOR/YAML document format support to Bulk GRPC endpoint ([#19744](https://github.com/opensearch-project/OpenSearch/pull/19744))
 - Implement GRPC Search params `Highlight`and `Sort` ([#19868](https://github.com/opensearch-project/OpenSearch/pull/19868))
 - Implement GRPC ConstantScoreQuery, FuzzyQuery, MatchBoolPrefixQuery, MatchPhrasePrefix, PrefixQuery, MatchQuery ([#19854](https://github.com/opensearch-project/OpenSearch/pull/19854))
+- Add Hybrid Cardinality collector to prioritize Ordinals Collector ([#19524](https://github.com/opensearch-project/OpenSearch/pull/19524))
 - Add async periodic flush task support for pull-based ingestion ([#19878](https://github.com/opensearch-project/OpenSearch/pull/19878))
 
 ### Changed
diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/170_cardinality_metric.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/170_cardinality_metric.yml
index d0bd46a809015..4d9527d078f4d 100644
--- a/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/170_cardinality_metric.yml
+++ b/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/170_cardinality_metric.yml
@@ -268,8 +268,8 @@ setup:
 ---
 "profiler string":
   - skip:
-      version: " - 7.99.99"
-      reason: new info added in 8.0.0 to be backported to 7.10.0
+      version: " - 3.3.99"
+      reason: hybrid collector added in 3.4.0
   - do:
       search:
         body:
@@ -287,6 +287,7 @@ setup:
   - gt: { profile.shards.0.aggregations.0.breakdown.post_collection: 0 }
   - match: { profile.shards.0.aggregations.0.debug.empty_collectors_used: 0 }
   - match: { profile.shards.0.aggregations.0.debug.numeric_collectors_used: 0 }
-  - gt: { profile.shards.0.aggregations.0.debug.ordinals_collectors_used: 0 }
+  - match: { profile.shards.0.aggregations.0.debug.ordinals_collectors_used: 0 }
   - match: { profile.shards.0.aggregations.0.debug.ordinals_collectors_overhead_too_high: 0 }
   - match: { profile.shards.0.aggregations.0.debug.string_hashing_collectors_used: 0 }
+  - gt: { profile.shards.0.aggregations.0.debug.hybrid_collectors_used: 0 }
diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/170_cardinality_metric_unsigned.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/170_cardinality_metric_unsigned.yml
index 27245d1aa435c..f6f0ec74ceb9d 100644
--- a/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/170_cardinality_metric_unsigned.yml
+++ b/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/170_cardinality_metric_unsigned.yml
@@ -265,6 +265,9 @@ setup:
 ---
 "profiler string":
+  - skip:
+      version: " - 3.3.99"
+      reason: hybrid collector added in 3.4.0
   - do:
       search:
         body:
@@ -282,6 +285,7 @@ setup:
   - gt: { profile.shards.0.aggregations.0.breakdown.post_collection: 0 }
   - match: { profile.shards.0.aggregations.0.debug.empty_collectors_used: 0 }
   - match: { profile.shards.0.aggregations.0.debug.numeric_collectors_used: 0 }
-  - gt: { profile.shards.0.aggregations.0.debug.ordinals_collectors_used: 0 }
+  - match: { profile.shards.0.aggregations.0.debug.ordinals_collectors_used: 0 }
   - match: { profile.shards.0.aggregations.0.debug.ordinals_collectors_overhead_too_high: 0 }
   - match: { profile.shards.0.aggregations.0.debug.string_hashing_collectors_used: 0 }
+  - gt: { profile.shards.0.aggregations.0.debug.hybrid_collectors_used: 0 }
diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java
index ca8c43da99dbb..485f3bf363da8 100644
--- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java
+++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java
@@ -162,6 +162,7 @@
 import org.opensearch.script.ScriptService;
 import org.opensearch.search.SearchService;
 import org.opensearch.search.aggregations.MultiBucketConsumerService;
+import org.opensearch.search.aggregations.metrics.CardinalityAggregator;
 import org.opensearch.search.backpressure.settings.NodeDuressSettings;
 import org.opensearch.search.backpressure.settings.SearchBackpressureSettings;
 import org.opensearch.search.backpressure.settings.SearchShardTaskSettings;
@@ -588,6 +589,8 @@ public void apply(Settings value, Settings current, Settings previous) {
                 SearchService.INDICES_MAX_CLAUSE_COUNT_SETTING,
                 SearchService.SEARCH_MAX_QUERY_STRING_LENGTH,
                 SearchService.CARDINALITY_AGGREGATION_PRUNING_THRESHOLD,
+                CardinalityAggregator.CARDINALITY_AGGREGATION_HYBRID_COLLECTOR_ENABLED,
+                CardinalityAggregator.CARDINALITY_AGGREGATION_HYBRID_COLLECTOR_MEMORY_THRESHOLD,
                 SearchService.KEYWORD_INDEX_OR_DOC_VALUES_ENABLED,
                 CreatePitController.PIT_INIT_KEEP_ALIVE,
                 Node.WRITE_PORTS_FILE_SETTING,
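Both settings registered above carry `Setting.Property.Dynamic`, so they can be changed at runtime through the cluster settings API instead of requiring a node restart. A minimal sketch of such an update from Java client code (the class name, method name, and the "512kb" value are illustrative assumptions; `ClusterUpdateSettingsRequest` and the two setting keys come from this PR and the core settings API):

import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.opensearch.common.settings.Settings;

final class HybridCollectorSettingsExample {
    // Builds a transient update that disables the hybrid collector and lowers the
    // per-segment memory threshold; execute it with any cluster admin client,
    // e.g. client.admin().cluster().updateSettings(request, listener).
    static ClusterUpdateSettingsRequest disableHybridCollector() {
        return new ClusterUpdateSettingsRequest().transientSettings(
            Settings.builder()
                .put("search.aggregations.cardinality.hybrid_collector.enabled", false)
                .put("search.aggregations.cardinality.hybrid_collector.memory_threshold", "512kb")
        );
    }
}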
diff --git a/server/src/main/java/org/opensearch/search/DefaultSearchContext.java b/server/src/main/java/org/opensearch/search/DefaultSearchContext.java
index 14f7b4b321638..5e2d93a9af31e 100644
--- a/server/src/main/java/org/opensearch/search/DefaultSearchContext.java
+++ b/server/src/main/java/org/opensearch/search/DefaultSearchContext.java
@@ -55,6 +55,7 @@
 import org.opensearch.common.settings.Settings;
 import org.opensearch.common.unit.TimeValue;
 import org.opensearch.common.util.BigArrays;
+import org.opensearch.core.common.unit.ByteSizeValue;
 import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
 import org.opensearch.index.IndexService;
 import org.opensearch.index.IndexSettings;
@@ -75,6 +76,8 @@
 import org.opensearch.search.aggregations.BucketCollectorProcessor;
 import org.opensearch.search.aggregations.InternalAggregation;
 import org.opensearch.search.aggregations.SearchContextAggregations;
+import org.opensearch.search.aggregations.metrics.CardinalityAggregationContext;
+import org.opensearch.search.aggregations.metrics.CardinalityAggregator;
 import org.opensearch.search.builder.SearchSourceBuilder;
 import org.opensearch.search.collapse.CollapseContext;
 import org.opensearch.search.deciders.ConcurrentSearchDecision;
@@ -220,6 +223,7 @@ final class DefaultSearchContext extends SearchContext {
     private final int maxAggRewriteFilters;
     private final int filterRewriteSegmentThreshold;
     private final int cardinalityAggregationPruningThreshold;
+    private final CardinalityAggregationContext cardinalityAggregationContext;
     private final int bucketSelectionStrategyFactor;
     private final boolean keywordIndexOrDocValuesEnabled;
@@ -287,6 +291,7 @@ final class DefaultSearchContext extends SearchContext {
         this.maxAggRewriteFilters = evaluateFilterRewriteSetting();
         this.filterRewriteSegmentThreshold = evaluateAggRewriteFilterSegThreshold();
         this.cardinalityAggregationPruningThreshold = evaluateCardinalityAggregationPruningThreshold();
+        this.cardinalityAggregationContext = evaluateCardinalityAggregationContext();
         this.bucketSelectionStrategyFactor = evaluateBucketSelectionStrategyFactor();
         this.concurrentSearchDeciderFactories = concurrentSearchDeciderFactories;
         this.keywordIndexOrDocValuesEnabled = evaluateKeywordIndexOrDocValuesEnabled();
@@ -1238,6 +1243,11 @@ public int cardinalityAggregationPruningThreshold() {
         return cardinalityAggregationPruningThreshold;
     }
 
+    @Override
+    public CardinalityAggregationContext cardinalityAggregationContext() {
+        return cardinalityAggregationContext;
+    }
+
     @Override
     public int bucketSelectionStrategyFactor() {
         return bucketSelectionStrategyFactor;
@@ -1255,6 +1265,17 @@ private int evaluateCardinalityAggregationPruningThreshold() {
         return 0;
     }
 
+    private CardinalityAggregationContext evaluateCardinalityAggregationContext() {
+        if (clusterService != null) {
+            boolean hybridCollectorEnabled = clusterService.getClusterSettings()
+                .get(CardinalityAggregator.CARDINALITY_AGGREGATION_HYBRID_COLLECTOR_ENABLED);
+            ByteSizeValue memoryThreshold = clusterService.getClusterSettings()
+                .get(CardinalityAggregator.CARDINALITY_AGGREGATION_HYBRID_COLLECTOR_MEMORY_THRESHOLD);
+            return CardinalityAggregationContext.from(hybridCollectorEnabled, memoryThreshold);
+        }
+        return new CardinalityAggregationContext(false, Runtime.getRuntime().maxMemory() / 100);
+    }
+
     private int evaluateBucketSelectionStrategyFactor() {
         if (clusterService != null) {
             return clusterService.getClusterSettings().get(BUCKET_SELECTION_STRATEGY_FACTOR_SETTING);
diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/CardinalityAggregationContext.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/CardinalityAggregationContext.java
new file mode 100644
index 0000000000000..df6d8de333ebf
--- /dev/null
+++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/CardinalityAggregationContext.java
@@ -0,0 +1,42 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.search.aggregations.metrics;
+
+import org.opensearch.common.annotation.PublicApi;
+import org.opensearch.core.common.unit.ByteSizeValue;
+
+/**
+ * Context object that encapsulates all cardinality aggregation related settings and configurations.
+ * This helps keep cardinality-specific settings scoped properly and reduces SearchContext bloat.
+ */
+@PublicApi(since = "3.4.0")
+public class CardinalityAggregationContext {
+    private final boolean hybridCollectorEnabled;
+    private final long memoryThreshold;
+
+    public CardinalityAggregationContext(boolean hybridCollectorEnabled, long memoryThreshold) {
+        this.hybridCollectorEnabled = hybridCollectorEnabled;
+        this.memoryThreshold = memoryThreshold;
+    }
+
+    public boolean isHybridCollectorEnabled() {
+        return hybridCollectorEnabled;
+    }
+
+    public long getMemoryThreshold() {
+        return memoryThreshold;
+    }
+
+    /**
+     * Creates a CardinalityAggregationContext from cluster settings
+     */
+    public static CardinalityAggregationContext from(boolean hybridCollectorEnabled, ByteSizeValue memoryThreshold) {
+        return new CardinalityAggregationContext(hybridCollectorEnabled, memoryThreshold.getBytes());
+    }
+}
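Since `from` just converts the `ByteSizeValue` produced by the memory-threshold setting into raw bytes, constructing a context for tests or tooling is straightforward. A minimal sketch with an absolute threshold (the example class is hypothetical; `ByteSizeValue` and `ByteSizeUnit` are the core units the setting resolves to):

package org.opensearch.search.aggregations.metrics;

import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.common.unit.ByteSizeValue;

final class CardinalityContextExample {
    static CardinalityAggregationContext tenMegabyteThreshold() {
        // A percentage value such as "1%" is resolved against the JVM heap by
        // Setting.memorySizeSetting before it reaches from(), so this factory
        // only ever sees concrete bytes (here 10 * 1024 * 1024).
        return CardinalityAggregationContext.from(true, new ByteSizeValue(10, ByteSizeUnit.MB));
    }
}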
diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/CardinalityAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/CardinalityAggregator.java
index 0cb319b853bce..d395fa8989e38 100644
--- a/server/src/main/java/org/opensearch/search/aggregations/metrics/CardinalityAggregator.java
+++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/CardinalityAggregator.java
@@ -57,11 +57,13 @@
 import org.opensearch.common.hash.MurmurHash3;
 import org.opensearch.common.lease.Releasable;
 import org.opensearch.common.lease.Releasables;
+import org.opensearch.common.settings.Setting;
 import org.opensearch.common.util.BigArrays;
 import org.opensearch.common.util.BitArray;
 import org.opensearch.common.util.BitMixer;
 import org.opensearch.common.util.LongArray;
 import org.opensearch.common.util.ObjectArray;
+import org.opensearch.core.common.unit.ByteSizeValue;
 import org.opensearch.core.rest.RestStatus;
 import org.opensearch.index.fielddata.SortedBinaryDocValues;
 import org.opensearch.index.fielddata.SortedNumericDoubleValues;
@@ -75,6 +77,7 @@
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 import java.util.function.BiConsumer;
 
 import static org.opensearch.search.SearchService.CARDINALITY_AGGREGATION_PRUNING_THRESHOLD;
@@ -88,6 +91,28 @@ public class CardinalityAggregator extends NumericMetricsAggregator.SingleValue
 
     private static final Logger logger = LogManager.getLogger(CardinalityAggregator.class);
 
+    /**
+     * Setting to enable/disable hybrid collector for cardinality aggregation.
+     */
+    public static final Setting<Boolean> CARDINALITY_AGGREGATION_HYBRID_COLLECTOR_ENABLED = Setting.boolSetting(
+        "search.aggregations.cardinality.hybrid_collector.enabled",
+        true,
+        Setting.Property.NodeScope,
+        Setting.Property.Dynamic
+    );
+
+    /**
+     * Setting for hybrid collector memory threshold. Supports both percentage (e.g., "1%")
+     * and absolute values (e.g., "10mb", "1gb").
+     * Note: This threshold is applied at the Lucene segment level since collectors are created per segment.
+     */
+    public static final Setting<ByteSizeValue> CARDINALITY_AGGREGATION_HYBRID_COLLECTOR_MEMORY_THRESHOLD = Setting.memorySizeSetting(
+        "search.aggregations.cardinality.hybrid_collector.memory_threshold",
+        "1%",
+        Setting.Property.NodeScope,
+        Setting.Property.Dynamic
+    );
+
     final CardinalityAggregatorFactory.ExecutionMode executionMode;
     final int precision;
     final ValuesSource valuesSource;
@@ -103,6 +128,7 @@ public class CardinalityAggregator extends NumericMetricsAggregator.SingleValue
     int emptyCollectorsUsed;
     int numericCollectorsUsed;
     int ordinalsCollectorsUsed;
+    int hybridCollectorsUsed;
    int ordinalsCollectorsOverheadTooHigh;
     int stringHashingCollectorsUsed;
     int dynamicPrunedSegments;
@@ -157,14 +183,22 @@ private Collector pickCollector(LeafReaderContext ctx) throws IOException {
             collector = new OrdinalsCollector(counts, ordinalValues, context.bigArrays());
         } else if (executionMode == null) {
             // no hint provided, fall back to heuristics
-            final long ordinalsMemoryUsage = OrdinalsCollector.memoryOverhead(maxOrd);
-            final long countsMemoryUsage = HyperLogLogPlusPlus.memoryUsage(precision);
-            // only use ordinals if they don't increase memory usage by more than 25%
-            if (ordinalsMemoryUsage < countsMemoryUsage / 4) {
-                ordinalsCollectorsUsed++;
-                collector = new OrdinalsCollector(counts, ordinalValues, context.bigArrays());
+            CardinalityAggregationContext cardinalityContext = context.cardinalityAggregationContext();
+
+            if (cardinalityContext.isHybridCollectorEnabled()) {
+                // Use HybridCollector with configurable memory threshold
+                MurmurHash3Values hashValues = MurmurHash3Values.hash(source.bytesValues(ctx));
+                hybridCollectorsUsed++;
+                collector = new HybridCollector(counts, ordinalValues, hashValues, context.bigArrays(), cardinalityContext);
             } else {
-                ordinalsCollectorsOverheadTooHigh++;
+                final long ordinalsMemoryUsage = OrdinalsCollector.memoryOverhead(maxOrd);
+                final long countsMemoryUsage = HyperLogLogPlusPlus.memoryUsage(precision);
+                if (ordinalsMemoryUsage < countsMemoryUsage / 4) {
+                    ordinalsCollectorsUsed++;
+                    collector = new OrdinalsCollector(counts, ordinalValues, context.bigArrays());
+                } else {
+                    ordinalsCollectorsOverheadTooHigh++;
+                }
             }
         }
     }
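When the hybrid collector is disabled, the pre-existing heuristic still decides between the ordinals and direct paths: ordinals bitsets are only worth it if their overhead stays below a quarter of the HLL++ sketch's own footprint. A self-contained sketch of that comparison (the figures are made up for illustration; the real values come from `OrdinalsCollector.memoryOverhead(maxOrd)` and `HyperLogLogPlusPlus.memoryUsage(precision)`):

final class OrdinalsHeuristicExample {
    public static void main(String[] args) {
        long ordinalsMemoryUsage = 100_000 / 8; // ~12.5 KB of bitset for 100k ordinals
        long countsMemoryUsage = 1 << 14;       // ~16 KB sketch at a typical precision

        // Mirrors the guard in pickCollector: take the ordinals path only when
        // its overhead is below 25% of the sketch cost.
        boolean useOrdinals = ordinalsMemoryUsage < countsMemoryUsage / 4;
        System.out.println("use ordinals collector: " + useOrdinals); // false for these numbers
    }
}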
@@ -339,6 +373,7 @@ public void collectDebugInfo(BiConsumer<String, Object> add) {
         add.accept("empty_collectors_used", emptyCollectorsUsed);
         add.accept("numeric_collectors_used", numericCollectorsUsed);
         add.accept("ordinals_collectors_used", ordinalsCollectorsUsed);
+        add.accept("hybrid_collectors_used", hybridCollectorsUsed);
         add.accept("ordinals_collectors_overhead_too_high", ordinalsCollectorsOverheadTooHigh);
         add.accept("string_hashing_collectors_used", stringHashingCollectorsUsed);
         add.accept("dynamic_pruned_segments", dynamicPrunedSegments);
@@ -564,8 +599,22 @@ public static long memoryOverhead(long maxOrd) {
         private final int maxOrd;
         private final HyperLogLogPlusPlus counts;
         private ObjectArray<BitArray> visitedOrds;
+        private long currentMemoryUsage = 0;
+
+        private final long memoryThreshold;
+        private final Optional<BiConsumer<Integer, Long>> onMemoryLimitReached;
 
         OrdinalsCollector(HyperLogLogPlusPlus counts, SortedSetDocValues values, BigArrays bigArrays) {
+            this(counts, values, bigArrays, Long.MAX_VALUE, Optional.empty());
+        }
+
+        OrdinalsCollector(
+            HyperLogLogPlusPlus counts,
+            SortedSetDocValues values,
+            BigArrays bigArrays,
+            long memoryThreshold,
+            Optional<BiConsumer<Integer, Long>> onMemoryLimitReached
+        ) {
             if (values.getValueCount() > Integer.MAX_VALUE) {
                 throw new IllegalArgumentException();
             }
@@ -573,6 +622,8 @@ public static long memoryOverhead(long maxOrd) {
             this.maxOrd = (int) values.getValueCount();
             this.bigArrays = bigArrays;
             this.counts = counts;
             this.values = values;
+            this.memoryThreshold = memoryThreshold;
+            this.onMemoryLimitReached = onMemoryLimitReached;
             visitedOrds = bigArrays.newObjectArray(1);
         }
@@ -583,6 +634,14 @@ public void collect(int doc, long bucketOrd) throws IOException {
             if (bits == null) {
                 bits = new BitArray(maxOrd, bigArrays);
                 visitedOrds.set(bucketOrd, bits);
+
+                if (onMemoryLimitReached.isPresent()) {
+                    currentMemoryUsage += memoryOverhead(maxOrd);
+                    if (currentMemoryUsage > memoryThreshold) {
+                        onMemoryLimitReached.get().accept(doc, bucketOrd);
+                        return;
+                    }
+                }
             }
             if (values.advanceExact(doc)) {
                 int count = values.docValueCount();
@@ -627,6 +686,10 @@ public void postCollect() throws IOException {
             }
         }
 
+        public long getCurrentMemoryUsage() {
+            return currentMemoryUsage;
+        }
+
         @Override
         public void close() {
             for (int i = 0; i < visitedOrds.size(); i++) {
@@ -761,4 +824,80 @@ public long nextValue() throws IOException {
             }
         }
     }
+
+    /**
+     * Hybrid Collector that starts with OrdinalsCollector and switches to DirectCollector
+     * when memory consumption exceeds a threshold.
+     */
+    static class HybridCollector extends Collector {
+        private final HyperLogLogPlusPlus counts;
+        private final MurmurHash3Values hashValues;
+        private final CardinalityAggregationContext cardinalityContext;
+
+        private Collector activeCollector;
+        private final OrdinalsCollector ordinalsCollector;
+
+        HybridCollector(
+            HyperLogLogPlusPlus counts,
+            SortedSetDocValues ordinalValues,
+            MurmurHash3Values hashValues,
+            BigArrays bigArrays,
+            CardinalityAggregationContext cardinalityContext
+        ) {
+            this.counts = counts;
+            this.hashValues = hashValues;
+            this.cardinalityContext = cardinalityContext;
+
+            // Start with OrdinalsCollector with memory monitoring
+            this.ordinalsCollector = new OrdinalsCollector(
+                counts,
+                ordinalValues,
+                bigArrays,
+                cardinalityContext.getMemoryThreshold(),
+                Optional.of(this::handleMemoryLimitReached)
+            );
+            this.activeCollector = ordinalsCollector;
+        }
+
+        @Override
+        public void collect(int doc, long bucketOrd) throws IOException {
+            activeCollector.collect(doc, bucketOrd);
+        }
+
+        private void handleMemoryLimitReached(int doc, long bucketOrd) {
+            try {
+                switchToDirectCollector();
+                // Collect the document that triggered the switch
+                activeCollector.collect(doc, bucketOrd);
+            } catch (IOException e) {
+                throw new RuntimeException("Failed to switch to direct collector", e);
+            }
+        }
+
+        private void switchToDirectCollector() throws IOException {
+            // Switching to Direct Collector because memory consumption from Ordinals Collector is high.
+            // Post collect all the already computed data.
+            ordinalsCollector.postCollect();
+            ordinalsCollector.close();
+            logger.debug("switching to direct collector");
+
+            // Pass computed data to Direct Collector.
+            activeCollector = new DirectCollector(counts, hashValues);
+        }
+
+        @Override
+        public void postCollect() throws IOException {
+            activeCollector.postCollect();
+        }
+
+        @Override
+        public void close() {
+            Releasables.close(activeCollector);
+        }
+
+        // Visible for testing
+        Collector getActiveCollector() {
+            return activeCollector;
+        }
+    }
 }
diff --git a/server/src/main/java/org/opensearch/search/internal/SearchContext.java b/server/src/main/java/org/opensearch/search/internal/SearchContext.java
index ac38b364fd36b..e0e18daa2cc29 100644
--- a/server/src/main/java/org/opensearch/search/internal/SearchContext.java
+++ b/server/src/main/java/org/opensearch/search/internal/SearchContext.java
@@ -64,6 +64,7 @@
 import org.opensearch.search.aggregations.SearchContextAggregations;
 import org.opensearch.search.aggregations.bucket.LocalBucketCountThresholds;
 import org.opensearch.search.aggregations.bucket.terms.TermsAggregator;
+import org.opensearch.search.aggregations.metrics.CardinalityAggregationContext;
 import org.opensearch.search.collapse.CollapseContext;
 import org.opensearch.search.dfs.DfsSearchResult;
 import org.opensearch.search.fetch.FetchPhase;
@@ -555,6 +556,10 @@ public int cardinalityAggregationPruningThreshold() {
         return 0;
     }
 
+    public CardinalityAggregationContext cardinalityAggregationContext() {
+        return new CardinalityAggregationContext(false, Runtime.getRuntime().maxMemory() / 100);
+    }
+
     public int bucketSelectionStrategyFactor() {
         return SearchService.DEFAULT_BUCKET_SELECTION_STRATEGY_FACTOR;
     }
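Conceptually, `HybridCollector` is a start-cheap, escalate-on-budget state machine: it delegates to the optimistic ordinals path, and the moment the tracked bitset cost crosses the configured threshold it flushes the partial state into the shared HLL++ sketch and swaps in the hash-based direct path, including the document that tripped the limit. A stripped-down, self-contained model of that control flow (all names here are hypothetical stand-ins, not OpenSearch types):

import java.io.IOException;

interface SegmentCollector {
    void collect(int doc) throws IOException;
}

final class BudgetedHybrid implements SegmentCollector {
    private final SegmentCollector fallback; // stands in for DirectCollector
    private final long memoryThreshold;
    private long trackedBytes = 0;
    private boolean switched = false;
    private SegmentCollector active;         // starts as the cheap/ordinals path

    BudgetedHybrid(SegmentCollector cheap, SegmentCollector fallback, long memoryThreshold) {
        this.active = cheap;
        this.fallback = fallback;
        this.memoryThreshold = memoryThreshold;
    }

    @Override
    public void collect(int doc) throws IOException {
        if (!switched) {
            trackedBytes += 8; // placeholder for real per-bucket bitset accounting
            if (trackedBytes > memoryThreshold) {
                // In the real collector this is where postCollect()/close() flush
                // the ordinals state into the shared sketch before the swap.
                active = fallback;
                switched = true;
            }
        }
        active.collect(doc); // the triggering doc goes to the new active collector
    }
}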
diff --git a/server/src/test/java/org/opensearch/search/aggregations/metrics/CardinalityAggregatorTests.java b/server/src/test/java/org/opensearch/search/aggregations/metrics/CardinalityAggregatorTests.java
index ca65c888f3363..573dd8a91ff13 100644
--- a/server/src/test/java/org/opensearch/search/aggregations/metrics/CardinalityAggregatorTests.java
+++ b/server/src/test/java/org/opensearch/search/aggregations/metrics/CardinalityAggregatorTests.java
@@ -69,6 +69,8 @@
 import org.opensearch.search.aggregations.InternalAggregation;
 import org.opensearch.search.aggregations.LeafBucketCollector;
 import org.opensearch.search.aggregations.MultiBucketConsumerService;
+import org.opensearch.search.aggregations.metrics.CardinalityAggregator.HybridCollector;
+import org.opensearch.search.aggregations.metrics.CardinalityAggregator.OrdinalsCollector;
 import org.opensearch.search.aggregations.pipeline.PipelineAggregator;
 import org.opensearch.search.aggregations.support.AggregationInspectionHelper;
@@ -634,4 +636,114 @@ public void testOrdinalsExecutionHintWithByteValues() throws IOException {
             assertTrue(AggregationInspectionHelper.hasValue(card));
         }, collector -> { assertTrue(collector instanceof CardinalityAggregator.OrdinalsCollector); }, fieldType);
     }
+
+    private void testAggregationHybridCollector(
+        AggregationBuilder aggregationBuilder,
+        Query query,
+        CheckedConsumer<RandomIndexWriter, IOException> buildIndex,
+        Consumer<InternalCardinality> verify,
+        Consumer<LeafBucketCollector> verifyCollector,
+        MappedFieldType fieldType,
+        boolean hybridCollectorEnabled,
+        long memoryThreshold
+    ) throws IOException {
+        try (Directory directory = newDirectory()) {
+            RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory);
+            buildIndex.accept(indexWriter);
+            indexWriter.close();
+
+            try (IndexReader indexReader = DirectoryReader.open(directory)) {
+                IndexSearcher indexSearcher = newSearcher(indexReader, true, true);
+
+                CountingAggregator aggregator = new CountingAggregator(
+                    new AtomicInteger(),
+                    createAggregatorWithCustomizableSearchContext(
+                        query,
+                        aggregationBuilder,
+                        indexSearcher,
+                        createIndexSettings(),
+                        new MultiBucketConsumerService.MultiBucketConsumer(
+                            Integer.MAX_VALUE,
+                            new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST)
+                        ),
+                        (searchContext) -> {
+                            CardinalityAggregationContext cardinalityContext = new CardinalityAggregationContext(
+                                hybridCollectorEnabled,
+                                memoryThreshold
+                            );
+                            when(searchContext.cardinalityAggregationContext()).thenReturn(cardinalityContext);
+                        },
+                        fieldType
+                    )
+                );
+                aggregator.preCollection();
+                indexSearcher.search(query, aggregator);
+                aggregator.postCollection();
+
+                MultiBucketConsumerService.MultiBucketConsumer reduceBucketConsumer = new MultiBucketConsumerService.MultiBucketConsumer(
+                    Integer.MAX_VALUE,
+                    new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST)
+                );
+                InternalAggregation.ReduceContext context = InternalAggregation.ReduceContext.forFinalReduction(
+                    aggregator.context().bigArrays(),
+                    getMockScriptService(),
+                    reduceBucketConsumer,
+                    PipelineAggregator.PipelineTree.EMPTY
+                );
+                InternalCardinality topLevel = (InternalCardinality) aggregator.buildTopLevel();
+                InternalCardinality card = (InternalCardinality) topLevel.reduce(Collections.singletonList(topLevel), context);
+                doAssertReducedMultiBucketConsumer(card, reduceBucketConsumer);
+
+                verify.accept(card);
+                verifyCollector.accept(aggregator.getSelectedCollector());
+            }
+        }
+    }
+
+    public void testHybridCollectorEnabledWithKeywordField() throws IOException {
+        MappedFieldType fieldType = new KeywordFieldMapper.KeywordFieldType("field");
+        final CardinalityAggregationBuilder aggregationBuilder = new CardinalityAggregationBuilder("_name").field("field");
+
+        testAggregationHybridCollector(aggregationBuilder, new MatchAllDocsQuery(), iw -> {
+            iw.addDocument(singleton(new SortedSetDocValuesField("field", new BytesRef("a"))));
+            iw.addDocument(singleton(new SortedSetDocValuesField("field", new BytesRef("b"))));
+        }, card -> {
+            assertEquals(2.0, card.getValue(), 0.1);
+            assertTrue(AggregationInspectionHelper.hasValue(card));
+        }, collector -> {
+            assertTrue(collector instanceof HybridCollector);
+            assertTrue(((HybridCollector) collector).getActiveCollector() instanceof OrdinalsCollector);
+        }, fieldType, true, 1024L * 1024L);
+    }
+
+    public void testHybridCollectorDisabledWithKeywordField() throws IOException {
+        MappedFieldType fieldType = new KeywordFieldMapper.KeywordFieldType("field");
+        final CardinalityAggregationBuilder aggregationBuilder = new CardinalityAggregationBuilder("_name").field("field");
+
+        testAggregationHybridCollector(aggregationBuilder, new MatchAllDocsQuery(), iw -> {
+            iw.addDocument(singleton(new SortedSetDocValuesField("field", new BytesRef("a"))));
+            iw.addDocument(singleton(new SortedSetDocValuesField("field", new BytesRef("b"))));
+        }, card -> {
+            assertEquals(2.0, card.getValue(), 0.1);
+            assertTrue(AggregationInspectionHelper.hasValue(card));
+        }, collector -> { assertTrue(collector instanceof OrdinalsCollector); }, fieldType, false, 1024L * 1024L);
+    }
SortedSetDocValuesField("field", new BytesRef("value" + i)))); + } + }, card -> { + assertEquals(1000.0, card.getValue(), 10.0); + assertTrue(AggregationInspectionHelper.hasValue(card)); + }, collector -> { + assertTrue(collector instanceof HybridCollector); + assertTrue(((HybridCollector) collector).getActiveCollector() instanceof CardinalityAggregator.DirectCollector); + }, fieldType, true, 1); // Very low threshold to trigger switching + } } diff --git a/test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java b/test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java index c7de8388981c3..947f786333adf 100644 --- a/test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java +++ b/test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java @@ -546,6 +546,9 @@ public boolean shouldCache(Query query) { fieldNameToType.putAll(getFieldAliases(fieldTypes)); when(searchContext.maxAggRewriteFilters()).thenReturn(10_000); + when(searchContext.cardinalityAggregationContext()).thenReturn( + new org.opensearch.search.aggregations.metrics.CardinalityAggregationContext(false, Runtime.getRuntime().maxMemory() / 100) + ); registerFieldTypes(searchContext, mapperService, fieldNameToType); doAnswer(invocation -> { /* Store the release-ables so we can release them at the end of the test case. This is important because aggregations don't