From d67fc4f06c8f5ab101ecf161ba0513047aeea054 Mon Sep 17 00:00:00 2001 From: Anand Pravinbhai Patel Date: Fri, 3 Oct 2025 18:06:32 -0700 Subject: [PATCH 1/4] Add Hybrid Cardinality collector to prioritize Ordinals Collector The current cardinality aggregator logic selects DirectCollector over OrdinalsCollector when the relative memory overhead of OrdinalsCollector (compared to DirectCollector) is too high. Because of this relative memory consumption logic, DirectCollector is selected for high-cardinality aggregation queries. DirectCollector is slower than OrdinalsCollector. This default selection leads to higher search latency even when the OpenSearch process has memory available to use the ordinals collector for faster query performance. There is no way to determine the memory requirement of a nested aggregation up front, because buckets are created dynamically as we traverse all the matching document ids. To overcome this limitation, this change creates a hybrid collector which first uses OrdinalsCollector and switches to DirectCollector if the memory usage of OrdinalsCollector increases beyond a certain threshold. When the hybrid collector switches from OrdinalsCollector to DirectCollector, it reuses the aggregation data already computed by OrdinalsCollector, so the aggregation result does not have to be rebuilt using DirectCollector. 
Signed-off-by: Anand Pravinbhai Patel --- CHANGELOG.md | 1 + .../170_cardinality_metric.yml | 7 +- .../170_cardinality_metric_unsigned.yml | 6 +- .../common/settings/ClusterSettings.java | 3 + .../search/DefaultSearchContext.java | 32 +++++ .../metrics/CardinalityAggregator.java | 123 +++++++++++++++++- .../search/internal/SearchContext.java | 8 ++ .../metrics/CardinalityAggregatorTests.java | 109 ++++++++++++++++ 8 files changed, 278 insertions(+), 11 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 425902ccfd3af..2ba3ebc47e3b0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -53,6 +53,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Query planning to determine flush mode for streaming aggregations ([#19488](https://github.com/opensearch-project/OpenSearch/pull/19488)) - Harden the circuit breaker and failure handle logic in query result consumer ([#19396](https://github.com/opensearch-project/OpenSearch/pull/19396)) - Add streaming cardinality aggregator ([#19484](https://github.com/opensearch-project/OpenSearch/pull/19484)) +- Add Hybrid Cardinality collector to prioritize Ordinals Collector ([#19524](https://github.com/opensearch-project/OpenSearch/pull/19524)) ### Changed - Refactor `if-else` chains to use `Java 17 pattern matching switch expressions`(([#18965](https://github.com/opensearch-project/OpenSearch/pull/18965)) diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/170_cardinality_metric.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/170_cardinality_metric.yml index d0bd46a809015..fc541def0cf92 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/170_cardinality_metric.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/170_cardinality_metric.yml @@ -268,8 +268,8 @@ setup: --- "profiler string": - skip: - version: " - 7.99.99" - reason: new info added in 8.0.0 to be backported to 
7.10.0 + version: " - 3.2.99" + reason: hybrid collector added in 3.3.0 - do: search: body: @@ -287,6 +287,7 @@ setup: - gt: { profile.shards.0.aggregations.0.breakdown.post_collection: 0 } - match: { profile.shards.0.aggregations.0.debug.empty_collectors_used: 0 } - match: { profile.shards.0.aggregations.0.debug.numeric_collectors_used: 0 } - - gt: { profile.shards.0.aggregations.0.debug.ordinals_collectors_used: 0 } + - match: { profile.shards.0.aggregations.0.debug.ordinals_collectors_used: 0 } - match: { profile.shards.0.aggregations.0.debug.ordinals_collectors_overhead_too_high: 0 } - match: { profile.shards.0.aggregations.0.debug.string_hashing_collectors_used: 0 } + - gt: { profile.shards.0.aggregations.0.debug.hybrid_collectors_used: 0 } diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/170_cardinality_metric_unsigned.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/170_cardinality_metric_unsigned.yml index 27245d1aa435c..7872772837c49 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/170_cardinality_metric_unsigned.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/170_cardinality_metric_unsigned.yml @@ -265,6 +265,9 @@ setup: --- "profiler string": + - skip: + version: " - 3.2.99" + reason: hybrid collector added in 3.3.0 - do: search: body: @@ -282,6 +285,7 @@ setup: - gt: { profile.shards.0.aggregations.0.breakdown.post_collection: 0 } - match: { profile.shards.0.aggregations.0.debug.empty_collectors_used: 0 } - match: { profile.shards.0.aggregations.0.debug.numeric_collectors_used: 0 } - - gt: { profile.shards.0.aggregations.0.debug.ordinals_collectors_used: 0 } + - match: { profile.shards.0.aggregations.0.debug.ordinals_collectors_used: 0 } - match: { profile.shards.0.aggregations.0.debug.ordinals_collectors_overhead_too_high: 0 } - match: { profile.shards.0.aggregations.0.debug.string_hashing_collectors_used: 0 } + - gt: { 
profile.shards.0.aggregations.0.debug.hybrid_collectors_used: 0 } diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 7fe3cde1e23f2..2adb0fcd24eb1 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -159,6 +159,7 @@ import org.opensearch.script.ScriptService; import org.opensearch.search.SearchService; import org.opensearch.search.aggregations.MultiBucketConsumerService; +import org.opensearch.search.aggregations.metrics.CardinalityAggregator; import org.opensearch.search.backpressure.settings.NodeDuressSettings; import org.opensearch.search.backpressure.settings.SearchBackpressureSettings; import org.opensearch.search.backpressure.settings.SearchShardTaskSettings; @@ -574,6 +575,8 @@ public void apply(Settings value, Settings current, Settings previous) { SearchService.INDICES_MAX_CLAUSE_COUNT_SETTING, SearchService.SEARCH_MAX_QUERY_STRING_LENGTH, SearchService.CARDINALITY_AGGREGATION_PRUNING_THRESHOLD, + CardinalityAggregator.CARDINALITY_AGGREGATION_HYBRID_COLLECTOR_ENABLED, + CardinalityAggregator.CARDINALITY_AGGREGATION_HYBRID_COLLECTOR_MEMORY_THRESHOLD, SearchService.KEYWORD_INDEX_OR_DOC_VALUES_ENABLED, CreatePitController.PIT_INIT_KEEP_ALIVE, Node.WRITE_PORTS_FILE_SETTING, diff --git a/server/src/main/java/org/opensearch/search/DefaultSearchContext.java b/server/src/main/java/org/opensearch/search/DefaultSearchContext.java index 14f7b4b321638..1ff55a994fc51 100644 --- a/server/src/main/java/org/opensearch/search/DefaultSearchContext.java +++ b/server/src/main/java/org/opensearch/search/DefaultSearchContext.java @@ -55,6 +55,7 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.BigArrays; +import org.opensearch.core.common.unit.ByteSizeValue; import 
org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; import org.opensearch.index.IndexService; import org.opensearch.index.IndexSettings; @@ -75,6 +76,7 @@ import org.opensearch.search.aggregations.BucketCollectorProcessor; import org.opensearch.search.aggregations.InternalAggregation; import org.opensearch.search.aggregations.SearchContextAggregations; +import org.opensearch.search.aggregations.metrics.CardinalityAggregator; import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.search.collapse.CollapseContext; import org.opensearch.search.deciders.ConcurrentSearchDecision; @@ -220,6 +222,8 @@ final class DefaultSearchContext extends SearchContext { private final int maxAggRewriteFilters; private final int filterRewriteSegmentThreshold; private final int cardinalityAggregationPruningThreshold; + private final boolean cardinalityAggregationHybridCollectorEnabled; + private final long cardinalityAggregationHybridCollectorMemoryThreshold; private final int bucketSelectionStrategyFactor; private final boolean keywordIndexOrDocValuesEnabled; @@ -287,6 +291,8 @@ final class DefaultSearchContext extends SearchContext { this.maxAggRewriteFilters = evaluateFilterRewriteSetting(); this.filterRewriteSegmentThreshold = evaluateAggRewriteFilterSegThreshold(); this.cardinalityAggregationPruningThreshold = evaluateCardinalityAggregationPruningThreshold(); + this.cardinalityAggregationHybridCollectorEnabled = evaluateCardinalityAggregationHybridCollectorEnabled(); + this.cardinalityAggregationHybridCollectorMemoryThreshold = evaluateCardinalityAggregationHybridCollectorMemoryThreshold(); this.bucketSelectionStrategyFactor = evaluateBucketSelectionStrategyFactor(); this.concurrentSearchDeciderFactories = concurrentSearchDeciderFactories; this.keywordIndexOrDocValuesEnabled = evaluateKeywordIndexOrDocValuesEnabled(); @@ -1238,6 +1244,16 @@ public int cardinalityAggregationPruningThreshold() { return cardinalityAggregationPruningThreshold; } 
+ @Override + public boolean cardinalityAggregationHybridCollectorEnabled() { + return cardinalityAggregationHybridCollectorEnabled; + } + + @Override + public long cardinalityAggregationHybridCollectorMemoryThreshold() { + return cardinalityAggregationHybridCollectorMemoryThreshold; + } + @Override public int bucketSelectionStrategyFactor() { return bucketSelectionStrategyFactor; @@ -1255,6 +1271,22 @@ private int evaluateCardinalityAggregationPruningThreshold() { return 0; } + private boolean evaluateCardinalityAggregationHybridCollectorEnabled() { + if (clusterService != null) { + return clusterService.getClusterSettings().get(CardinalityAggregator.CARDINALITY_AGGREGATION_HYBRID_COLLECTOR_ENABLED); + } + return false; + } + + private long evaluateCardinalityAggregationHybridCollectorMemoryThreshold() { + if (clusterService != null) { + ByteSizeValue threshold = clusterService.getClusterSettings() + .get(CardinalityAggregator.CARDINALITY_AGGREGATION_HYBRID_COLLECTOR_MEMORY_THRESHOLD); + return threshold.getBytes(); + } + return Runtime.getRuntime().maxMemory() / 100; // 1% default + } + private int evaluateBucketSelectionStrategyFactor() { if (clusterService != null) { return clusterService.getClusterSettings().get(BUCKET_SELECTION_STRATEGY_FACTOR_SETTING); diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/CardinalityAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/CardinalityAggregator.java index 0cb319b853bce..533e783f4fa1a 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/CardinalityAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/CardinalityAggregator.java @@ -57,11 +57,13 @@ import org.opensearch.common.hash.MurmurHash3; import org.opensearch.common.lease.Releasable; import org.opensearch.common.lease.Releasables; +import org.opensearch.common.settings.Setting; import org.opensearch.common.util.BigArrays; import 
org.opensearch.common.util.BitArray; import org.opensearch.common.util.BitMixer; import org.opensearch.common.util.LongArray; import org.opensearch.common.util.ObjectArray; +import org.opensearch.core.common.unit.ByteSizeValue; import org.opensearch.core.rest.RestStatus; import org.opensearch.index.fielddata.SortedBinaryDocValues; import org.opensearch.index.fielddata.SortedNumericDoubleValues; @@ -88,6 +90,27 @@ public class CardinalityAggregator extends NumericMetricsAggregator.SingleValue private static final Logger logger = LogManager.getLogger(CardinalityAggregator.class); + /** + * Setting to enable/disable hybrid collector for cardinality aggregation. + */ + public static final Setting CARDINALITY_AGGREGATION_HYBRID_COLLECTOR_ENABLED = Setting.boolSetting( + "search.aggregations.cardinality.hybrid_collector.enabled", + true, + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + + /** + * Setting for hybrid collector memory threshold. Supports both percentage (e.g., "1%") + * and absolute values (e.g., "10mb", "1gb"). 
+ */ + public static final Setting CARDINALITY_AGGREGATION_HYBRID_COLLECTOR_MEMORY_THRESHOLD = Setting.memorySizeSetting( + "search.aggregations.cardinality.hybrid_collector.memory_threshold", + "1%", + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + final CardinalityAggregatorFactory.ExecutionMode executionMode; final int precision; final ValuesSource valuesSource; @@ -103,6 +126,7 @@ public class CardinalityAggregator extends NumericMetricsAggregator.SingleValue int emptyCollectorsUsed; int numericCollectorsUsed; int ordinalsCollectorsUsed; + int hybridCollectorsUsed; int ordinalsCollectorsOverheadTooHigh; int stringHashingCollectorsUsed; int dynamicPrunedSegments; @@ -157,14 +181,24 @@ private Collector pickCollector(LeafReaderContext ctx) throws IOException { collector = new OrdinalsCollector(counts, ordinalValues, context.bigArrays()); } else if (executionMode == null) { // no hint provided, fall back to heuristics - final long ordinalsMemoryUsage = OrdinalsCollector.memoryOverhead(maxOrd); - final long countsMemoryUsage = HyperLogLogPlusPlus.memoryUsage(precision); - // only use ordinals if they don't increase memory usage by more than 25% - if (ordinalsMemoryUsage < countsMemoryUsage / 4) { - ordinalsCollectorsUsed++; - collector = new OrdinalsCollector(counts, ordinalValues, context.bigArrays()); + // Check if hybrid collector is enabled + boolean hybridCollectorEnabled = context.cardinalityAggregationHybridCollectorEnabled(); + + if (hybridCollectorEnabled) { + // Use HybridCollector with configurable memory threshold + long memoryThreshold = context.cardinalityAggregationHybridCollectorMemoryThreshold(); + MurmurHash3Values hashValues = MurmurHash3Values.hash(source.bytesValues(ctx)); + hybridCollectorsUsed++; + collector = new HybridCollector(counts, ordinalValues, hashValues, context.bigArrays(), memoryThreshold); } else { - ordinalsCollectorsOverheadTooHigh++; + final long ordinalsMemoryUsage = OrdinalsCollector.memoryOverhead(maxOrd); + 
final long countsMemoryUsage = HyperLogLogPlusPlus.memoryUsage(precision); + if (ordinalsMemoryUsage < countsMemoryUsage / 4) { + ordinalsCollectorsUsed++; + collector = new OrdinalsCollector(counts, ordinalValues, context.bigArrays()); + } else { + ordinalsCollectorsOverheadTooHigh++; + } } } } @@ -339,6 +373,7 @@ public void collectDebugInfo(BiConsumer add) { add.accept("empty_collectors_used", emptyCollectorsUsed); add.accept("numeric_collectors_used", numericCollectorsUsed); add.accept("ordinals_collectors_used", ordinalsCollectorsUsed); + add.accept("hybrid_collectors_used", hybridCollectorsUsed); add.accept("ordinals_collectors_overhead_too_high", ordinalsCollectorsOverheadTooHigh); add.accept("string_hashing_collectors_used", stringHashingCollectorsUsed); add.accept("dynamic_pruned_segments", dynamicPrunedSegments); @@ -564,6 +599,7 @@ public static long memoryOverhead(long maxOrd) { private final int maxOrd; private final HyperLogLogPlusPlus counts; private ObjectArray visitedOrds; + private long currentMemoryUsage = 0; OrdinalsCollector(HyperLogLogPlusPlus counts, SortedSetDocValues values, BigArrays bigArrays) { if (values.getValueCount() > Integer.MAX_VALUE) { @@ -583,6 +619,8 @@ public void collect(int doc, long bucketOrd) throws IOException { if (bits == null) { bits = new BitArray(maxOrd, bigArrays); visitedOrds.set(bucketOrd, bits); + // Update memory usage when new BitArray is created + currentMemoryUsage += memoryOverhead(maxOrd); } if (values.advanceExact(doc)) { int count = values.docValueCount(); @@ -627,6 +665,10 @@ public void postCollect() throws IOException { } } + public long getCurrentMemoryUsage() { + return currentMemoryUsage; + } + @Override public void close() { for (int i = 0; i < visitedOrds.size(); i++) { @@ -761,4 +803,71 @@ public long nextValue() throws IOException { } } } + + /** + * Hybrid Collector that starts with OrdinalsCollector and switches to DirectCollector + * when memory consumption exceeds a threshold. 
+ */ + static class HybridCollector extends Collector { + private final HyperLogLogPlusPlus counts; + private final MurmurHash3Values hashValues; + private final long memoryThreshold; + + private Collector activeCollector; + private final OrdinalsCollector ordinalsCollector; + private boolean switchedToDirectCollector = false; + + HybridCollector( + HyperLogLogPlusPlus counts, + SortedSetDocValues ordinalValues, + MurmurHash3Values hashValues, + BigArrays bigArrays, + long memoryThreshold + ) { + this.counts = counts; + this.hashValues = hashValues; + this.memoryThreshold = memoryThreshold; + + // Start with OrdinalsCollector + this.ordinalsCollector = new OrdinalsCollector(counts, ordinalValues, bigArrays); + this.activeCollector = ordinalsCollector; + } + + @Override + public void collect(int doc, long bucketOrd) throws IOException { + activeCollector.collect(doc, bucketOrd); + + // Check memory usage after collection + if (!switchedToDirectCollector && ordinalsCollector.getCurrentMemoryUsage() > memoryThreshold) { + switchToDirectCollector(); + } + } + + private void switchToDirectCollector() throws IOException { + // Switching to Direct Collector because memory consumption from Ordinals Collector is high. + // Post collect all the already computed data. + ordinalsCollector.postCollect(); + ordinalsCollector.close(); + logger.debug("switching to direct collector"); + + // Pass computed data to Direct Collector. 
+ activeCollector = new DirectCollector(counts, hashValues); + switchedToDirectCollector = true; + } + + @Override + public void postCollect() throws IOException { + activeCollector.postCollect(); + } + + @Override + public void close() { + Releasables.close(activeCollector); + } + + // Visible for testing + Collector getActiveCollector() { + return activeCollector; + } + } } diff --git a/server/src/main/java/org/opensearch/search/internal/SearchContext.java b/server/src/main/java/org/opensearch/search/internal/SearchContext.java index ac38b364fd36b..4a0b968e23ac0 100644 --- a/server/src/main/java/org/opensearch/search/internal/SearchContext.java +++ b/server/src/main/java/org/opensearch/search/internal/SearchContext.java @@ -555,6 +555,14 @@ public int cardinalityAggregationPruningThreshold() { return 0; } + public boolean cardinalityAggregationHybridCollectorEnabled() { + return false; + } + + public long cardinalityAggregationHybridCollectorMemoryThreshold() { + return Runtime.getRuntime().maxMemory() / 100; // 1% of available memory default + } + public int bucketSelectionStrategyFactor() { return SearchService.DEFAULT_BUCKET_SELECTION_STRATEGY_FACTOR; } diff --git a/server/src/test/java/org/opensearch/search/aggregations/metrics/CardinalityAggregatorTests.java b/server/src/test/java/org/opensearch/search/aggregations/metrics/CardinalityAggregatorTests.java index ca65c888f3363..8ff05ecbe0ac5 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/metrics/CardinalityAggregatorTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/metrics/CardinalityAggregatorTests.java @@ -69,6 +69,8 @@ import org.opensearch.search.aggregations.InternalAggregation; import org.opensearch.search.aggregations.LeafBucketCollector; import org.opensearch.search.aggregations.MultiBucketConsumerService; +import org.opensearch.search.aggregations.metrics.CardinalityAggregator.HybridCollector; +import 
org.opensearch.search.aggregations.metrics.CardinalityAggregator.OrdinalsCollector; import org.opensearch.search.aggregations.pipeline.PipelineAggregator; import org.opensearch.search.aggregations.support.AggregationInspectionHelper; @@ -634,4 +636,111 @@ public void testOrdinalsExecutionHintWithByteValues() throws IOException { assertTrue(AggregationInspectionHelper.hasValue(card)); }, collector -> { assertTrue(collector instanceof CardinalityAggregator.OrdinalsCollector); }, fieldType); } + + private void testAggregationHybridCollector( + AggregationBuilder aggregationBuilder, + Query query, + CheckedConsumer buildIndex, + Consumer verify, + Consumer verifyCollector, + MappedFieldType fieldType, + boolean hybridCollectorEnabled, + long memoryThreshold + ) throws IOException { + try (Directory directory = newDirectory()) { + RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory); + buildIndex.accept(indexWriter); + indexWriter.close(); + + try (IndexReader indexReader = DirectoryReader.open(directory)) { + IndexSearcher indexSearcher = newSearcher(indexReader, true, true); + + CountingAggregator aggregator = new CountingAggregator( + new AtomicInteger(), + createAggregatorWithCustomizableSearchContext( + query, + aggregationBuilder, + indexSearcher, + createIndexSettings(), + new MultiBucketConsumerService.MultiBucketConsumer( + Integer.MAX_VALUE, + new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST) + ), + (searchContext) -> { + when(searchContext.cardinalityAggregationHybridCollectorEnabled()).thenReturn(hybridCollectorEnabled); + when(searchContext.cardinalityAggregationHybridCollectorMemoryThreshold()).thenReturn(memoryThreshold); + }, + fieldType + ) + ); + aggregator.preCollection(); + indexSearcher.search(query, aggregator); + aggregator.postCollection(); + + MultiBucketConsumerService.MultiBucketConsumer reduceBucketConsumer = new MultiBucketConsumerService.MultiBucketConsumer( + Integer.MAX_VALUE, + new 
NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST) + ); + InternalAggregation.ReduceContext context = InternalAggregation.ReduceContext.forFinalReduction( + aggregator.context().bigArrays(), + getMockScriptService(), + reduceBucketConsumer, + PipelineAggregator.PipelineTree.EMPTY + ); + InternalCardinality topLevel = (InternalCardinality) aggregator.buildTopLevel(); + InternalCardinality card = (InternalCardinality) topLevel.reduce(Collections.singletonList(topLevel), context); + doAssertReducedMultiBucketConsumer(card, reduceBucketConsumer); + + verify.accept(card); + verifyCollector.accept(aggregator.getSelectedCollector()); + } + } + } + + public void testHybridCollectorEnabledWithKeywordField() throws IOException { + MappedFieldType fieldType = new KeywordFieldMapper.KeywordFieldType("field"); + final CardinalityAggregationBuilder aggregationBuilder = new CardinalityAggregationBuilder("_name").field("field"); + + testAggregationHybridCollector(aggregationBuilder, new MatchAllDocsQuery(), iw -> { + iw.addDocument(singleton(new SortedSetDocValuesField("field", new BytesRef("a")))); + iw.addDocument(singleton(new SortedSetDocValuesField("field", new BytesRef("b")))); + }, card -> { + assertEquals(2.0, card.getValue(), 0.1); + assertTrue(AggregationInspectionHelper.hasValue(card)); + }, collector -> { + assertTrue(collector instanceof HybridCollector); + assertTrue(((HybridCollector) collector).getActiveCollector() instanceof OrdinalsCollector); + }, fieldType, true, 1024L * 1024L); + } + + public void testHybridCollectorDisabledWithKeywordField() throws IOException { + MappedFieldType fieldType = new KeywordFieldMapper.KeywordFieldType("field"); + final CardinalityAggregationBuilder aggregationBuilder = new CardinalityAggregationBuilder("_name").field("field"); + + testAggregationHybridCollector(aggregationBuilder, new MatchAllDocsQuery(), iw -> { + iw.addDocument(singleton(new SortedSetDocValuesField("field", new BytesRef("a")))); + 
iw.addDocument(singleton(new SortedSetDocValuesField("field", new BytesRef("b")))); + }, card -> { + assertEquals(2.0, card.getValue(), 0.1); + assertTrue(AggregationInspectionHelper.hasValue(card)); + }, collector -> { assertTrue(collector instanceof OrdinalsCollector); }, fieldType, false, 1024L * 1024L); + } + + public void testHybridCollectorMemoryThresholdExceeded() throws IOException { + MappedFieldType fieldType = new KeywordFieldMapper.KeywordFieldType("field"); + final CardinalityAggregationBuilder aggregationBuilder = new CardinalityAggregationBuilder("_name").field("field"); + + testAggregationHybridCollector(aggregationBuilder, new MatchAllDocsQuery(), iw -> { + // Add many documents to potentially exceed memory threshold + for (int i = 0; i < 1000; i++) { + iw.addDocument(singleton(new SortedSetDocValuesField("field", new BytesRef("value" + i)))); + } + }, card -> { + assertEquals(1000.0, card.getValue(), 10.0); + assertTrue(AggregationInspectionHelper.hasValue(card)); + }, collector -> { + assertTrue(collector instanceof HybridCollector); + assertTrue(((HybridCollector) collector).getActiveCollector() instanceof CardinalityAggregator.DirectCollector); + }, fieldType, true, 1); // Very low threshold to trigger switching + } } From 625f7015e7e512fbad71d29eefe03063fc030d99 Mon Sep 17 00:00:00 2001 From: Anand Pravinbhai Patel Date: Wed, 15 Oct 2025 18:11:15 -0700 Subject: [PATCH 2/4] Address PR comments Signed-off-by: Anand Pravinbhai Patel --- .../metrics/CardinalityAggregator.java | 41 ++++++++++++++++--- 1 file changed, 35 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/CardinalityAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/CardinalityAggregator.java index 533e783f4fa1a..60feb5d096f07 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/CardinalityAggregator.java +++ 
b/server/src/main/java/org/opensearch/search/aggregations/metrics/CardinalityAggregator.java @@ -77,6 +77,7 @@ import java.io.IOException; import java.util.HashMap; import java.util.Map; +import java.util.Optional; import java.util.function.BiConsumer; import static org.opensearch.search.SearchService.CARDINALITY_AGGREGATION_PRUNING_THRESHOLD; @@ -601,7 +602,20 @@ public static long memoryOverhead(long maxOrd) { private ObjectArray visitedOrds; private long currentMemoryUsage = 0; + private final long memoryThreshold; + private final Optional> onMemoryLimitReached; + OrdinalsCollector(HyperLogLogPlusPlus counts, SortedSetDocValues values, BigArrays bigArrays) { + this(counts, values, bigArrays, Long.MAX_VALUE, Optional.empty()); + } + + OrdinalsCollector( + HyperLogLogPlusPlus counts, + SortedSetDocValues values, + BigArrays bigArrays, + long memoryThreshold, + Optional> onMemoryLimitReached + ) { if (values.getValueCount() > Integer.MAX_VALUE) { throw new IllegalArgumentException(); } @@ -609,6 +623,8 @@ public static long memoryOverhead(long maxOrd) { this.bigArrays = bigArrays; this.counts = counts; this.values = values; + this.memoryThreshold = memoryThreshold; + this.onMemoryLimitReached = onMemoryLimitReached; visitedOrds = bigArrays.newObjectArray(1); } @@ -619,8 +635,12 @@ public void collect(int doc, long bucketOrd) throws IOException { if (bits == null) { bits = new BitArray(maxOrd, bigArrays); visitedOrds.set(bucketOrd, bits); - // Update memory usage when new BitArray is created currentMemoryUsage += memoryOverhead(maxOrd); + + if (currentMemoryUsage > memoryThreshold && onMemoryLimitReached.isPresent()) { + onMemoryLimitReached.get().accept(doc, bucketOrd); + return; + } } if (values.advanceExact(doc)) { int count = values.docValueCount(); @@ -815,7 +835,6 @@ static class HybridCollector extends Collector { private Collector activeCollector; private final OrdinalsCollector ordinalsCollector; - private boolean switchedToDirectCollector = false; 
HybridCollector( HyperLogLogPlusPlus counts, @@ -829,17 +848,28 @@ static class HybridCollector extends Collector { this.memoryThreshold = memoryThreshold; // Start with OrdinalsCollector - this.ordinalsCollector = new OrdinalsCollector(counts, ordinalValues, bigArrays); + this.ordinalsCollector = new OrdinalsCollector( + counts, + ordinalValues, + bigArrays, + memoryThreshold, + Optional.of(this::handleMemoryLimitReached) + ); this.activeCollector = ordinalsCollector; } @Override public void collect(int doc, long bucketOrd) throws IOException { activeCollector.collect(doc, bucketOrd); + } - // Check memory usage after collection - if (!switchedToDirectCollector && ordinalsCollector.getCurrentMemoryUsage() > memoryThreshold) { + private void handleMemoryLimitReached(int doc, long bucketOrd) { + try { switchToDirectCollector(); + // Collect the document that triggered the switch + activeCollector.collect(doc, bucketOrd); + } catch (IOException e) { + throw new RuntimeException("Failed to switch to direct collector", e); } } @@ -852,7 +882,6 @@ private void switchToDirectCollector() throws IOException { // Pass computed data to Direct Collector. 
activeCollector = new DirectCollector(counts, hashValues); - switchedToDirectCollector = true; } @Override From 6fbd9162e4d29403838b2ce376b066f92ade8f18 Mon Sep 17 00:00:00 2001 From: Anand Pravinbhai Patel Date: Mon, 3 Nov 2025 10:28:40 -0800 Subject: [PATCH 3/4] Address PR comments Signed-off-by: Anand Pravinbhai Patel --- .../search/DefaultSearchContext.java | 33 +++++---------- .../CardinalityAggregationContext.java | 42 +++++++++++++++++++ .../metrics/CardinalityAggregator.java | 28 ++++++------- .../search/internal/SearchContext.java | 9 ++-- .../metrics/CardinalityAggregatorTests.java | 7 +++- 5 files changed, 75 insertions(+), 44 deletions(-) create mode 100644 server/src/main/java/org/opensearch/search/aggregations/metrics/CardinalityAggregationContext.java diff --git a/server/src/main/java/org/opensearch/search/DefaultSearchContext.java b/server/src/main/java/org/opensearch/search/DefaultSearchContext.java index 1ff55a994fc51..5e2d93a9af31e 100644 --- a/server/src/main/java/org/opensearch/search/DefaultSearchContext.java +++ b/server/src/main/java/org/opensearch/search/DefaultSearchContext.java @@ -76,6 +76,7 @@ import org.opensearch.search.aggregations.BucketCollectorProcessor; import org.opensearch.search.aggregations.InternalAggregation; import org.opensearch.search.aggregations.SearchContextAggregations; +import org.opensearch.search.aggregations.metrics.CardinalityAggregationContext; import org.opensearch.search.aggregations.metrics.CardinalityAggregator; import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.search.collapse.CollapseContext; @@ -222,8 +223,7 @@ final class DefaultSearchContext extends SearchContext { private final int maxAggRewriteFilters; private final int filterRewriteSegmentThreshold; private final int cardinalityAggregationPruningThreshold; - private final boolean cardinalityAggregationHybridCollectorEnabled; - private final long cardinalityAggregationHybridCollectorMemoryThreshold; + private final 
CardinalityAggregationContext cardinalityAggregationContext; private final int bucketSelectionStrategyFactor; private final boolean keywordIndexOrDocValuesEnabled; @@ -291,8 +291,7 @@ final class DefaultSearchContext extends SearchContext { this.maxAggRewriteFilters = evaluateFilterRewriteSetting(); this.filterRewriteSegmentThreshold = evaluateAggRewriteFilterSegThreshold(); this.cardinalityAggregationPruningThreshold = evaluateCardinalityAggregationPruningThreshold(); - this.cardinalityAggregationHybridCollectorEnabled = evaluateCardinalityAggregationHybridCollectorEnabled(); - this.cardinalityAggregationHybridCollectorMemoryThreshold = evaluateCardinalityAggregationHybridCollectorMemoryThreshold(); + this.cardinalityAggregationContext = evaluateCardinalityAggregationContext(); this.bucketSelectionStrategyFactor = evaluateBucketSelectionStrategyFactor(); this.concurrentSearchDeciderFactories = concurrentSearchDeciderFactories; this.keywordIndexOrDocValuesEnabled = evaluateKeywordIndexOrDocValuesEnabled(); @@ -1245,13 +1244,8 @@ public int cardinalityAggregationPruningThreshold() { } @Override - public boolean cardinalityAggregationHybridCollectorEnabled() { - return cardinalityAggregationHybridCollectorEnabled; - } - - @Override - public long cardinalityAggregationHybridCollectorMemoryThreshold() { - return cardinalityAggregationHybridCollectorMemoryThreshold; + public CardinalityAggregationContext cardinalityAggregationContext() { + return cardinalityAggregationContext; } @Override @@ -1271,20 +1265,15 @@ private int evaluateCardinalityAggregationPruningThreshold() { return 0; } - private boolean evaluateCardinalityAggregationHybridCollectorEnabled() { - if (clusterService != null) { - return clusterService.getClusterSettings().get(CardinalityAggregator.CARDINALITY_AGGREGATION_HYBRID_COLLECTOR_ENABLED); - } - return false; - } - - private long evaluateCardinalityAggregationHybridCollectorMemoryThreshold() { + private CardinalityAggregationContext 
evaluateCardinalityAggregationContext() { if (clusterService != null) { - ByteSizeValue threshold = clusterService.getClusterSettings() + boolean hybridCollectorEnabled = clusterService.getClusterSettings() + .get(CardinalityAggregator.CARDINALITY_AGGREGATION_HYBRID_COLLECTOR_ENABLED); + ByteSizeValue memoryThreshold = clusterService.getClusterSettings() .get(CardinalityAggregator.CARDINALITY_AGGREGATION_HYBRID_COLLECTOR_MEMORY_THRESHOLD); - return threshold.getBytes(); + return CardinalityAggregationContext.from(hybridCollectorEnabled, memoryThreshold); } - return Runtime.getRuntime().maxMemory() / 100; // 1% default + return new CardinalityAggregationContext(false, Runtime.getRuntime().maxMemory() / 100); } private int evaluateBucketSelectionStrategyFactor() { diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/CardinalityAggregationContext.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/CardinalityAggregationContext.java new file mode 100644 index 0000000000000..df6d8de333ebf --- /dev/null +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/CardinalityAggregationContext.java @@ -0,0 +1,42 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.aggregations.metrics; + +import org.opensearch.common.annotation.PublicApi; +import org.opensearch.core.common.unit.ByteSizeValue; + +/** + * Context object that encapsulates all cardinality aggregation related settings and configurations. + * This helps keep cardinality-specific settings scoped properly and reduces SearchContext bloat. 
+ */ +@PublicApi(since = "3.4.0") +public class CardinalityAggregationContext { + private final boolean hybridCollectorEnabled; + private final long memoryThreshold; + + public CardinalityAggregationContext(boolean hybridCollectorEnabled, long memoryThreshold) { + this.hybridCollectorEnabled = hybridCollectorEnabled; + this.memoryThreshold = memoryThreshold; + } + + public boolean isHybridCollectorEnabled() { + return hybridCollectorEnabled; + } + + public long getMemoryThreshold() { + return memoryThreshold; + } + + /** + * Creates a CardinalityAggregationContext from cluster settings + */ + public static CardinalityAggregationContext from(boolean hybridCollectorEnabled, ByteSizeValue memoryThreshold) { + return new CardinalityAggregationContext(hybridCollectorEnabled, memoryThreshold.getBytes()); + } +} diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/CardinalityAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/CardinalityAggregator.java index 60feb5d096f07..77b58899b4059 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/CardinalityAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/CardinalityAggregator.java @@ -182,15 +182,13 @@ private Collector pickCollector(LeafReaderContext ctx) throws IOException { collector = new OrdinalsCollector(counts, ordinalValues, context.bigArrays()); } else if (executionMode == null) { // no hint provided, fall back to heuristics - // Check if hybrid collector is enabled - boolean hybridCollectorEnabled = context.cardinalityAggregationHybridCollectorEnabled(); + CardinalityAggregationContext cardinalityContext = context.cardinalityAggregationContext(); - if (hybridCollectorEnabled) { + if (cardinalityContext.isHybridCollectorEnabled()) { // Use HybridCollector with configurable memory threshold - long memoryThreshold = context.cardinalityAggregationHybridCollectorMemoryThreshold(); MurmurHash3Values hashValues = 
MurmurHash3Values.hash(source.bytesValues(ctx)); hybridCollectorsUsed++; - collector = new HybridCollector(counts, ordinalValues, hashValues, context.bigArrays(), memoryThreshold); + collector = new HybridCollector(counts, ordinalValues, hashValues, context.bigArrays(), cardinalityContext); } else { final long ordinalsMemoryUsage = OrdinalsCollector.memoryOverhead(maxOrd); final long countsMemoryUsage = HyperLogLogPlusPlus.memoryUsage(precision); @@ -635,11 +633,13 @@ public void collect(int doc, long bucketOrd) throws IOException { if (bits == null) { bits = new BitArray(maxOrd, bigArrays); visitedOrds.set(bucketOrd, bits); - currentMemoryUsage += memoryOverhead(maxOrd); - if (currentMemoryUsage > memoryThreshold && onMemoryLimitReached.isPresent()) { - onMemoryLimitReached.get().accept(doc, bucketOrd); - return; + if (onMemoryLimitReached.isPresent()) { + currentMemoryUsage += memoryOverhead(maxOrd); + if (currentMemoryUsage > memoryThreshold) { + onMemoryLimitReached.get().accept(doc, bucketOrd); + return; + } } } if (values.advanceExact(doc)) { @@ -831,7 +831,7 @@ public long nextValue() throws IOException { static class HybridCollector extends Collector { private final HyperLogLogPlusPlus counts; private final MurmurHash3Values hashValues; - private final long memoryThreshold; + private final CardinalityAggregationContext cardinalityContext; private Collector activeCollector; private final OrdinalsCollector ordinalsCollector; @@ -841,18 +841,18 @@ static class HybridCollector extends Collector { SortedSetDocValues ordinalValues, MurmurHash3Values hashValues, BigArrays bigArrays, - long memoryThreshold + CardinalityAggregationContext cardinalityContext ) { this.counts = counts; this.hashValues = hashValues; - this.memoryThreshold = memoryThreshold; + this.cardinalityContext = cardinalityContext; - // Start with OrdinalsCollector + // Start with OrdinalsCollector with memory monitoring this.ordinalsCollector = new OrdinalsCollector( counts, ordinalValues, 
bigArrays, - memoryThreshold, + cardinalityContext.getMemoryThreshold(), Optional.of(this::handleMemoryLimitReached) ); this.activeCollector = ordinalsCollector; diff --git a/server/src/main/java/org/opensearch/search/internal/SearchContext.java b/server/src/main/java/org/opensearch/search/internal/SearchContext.java index 4a0b968e23ac0..e0e18daa2cc29 100644 --- a/server/src/main/java/org/opensearch/search/internal/SearchContext.java +++ b/server/src/main/java/org/opensearch/search/internal/SearchContext.java @@ -64,6 +64,7 @@ import org.opensearch.search.aggregations.SearchContextAggregations; import org.opensearch.search.aggregations.bucket.LocalBucketCountThresholds; import org.opensearch.search.aggregations.bucket.terms.TermsAggregator; +import org.opensearch.search.aggregations.metrics.CardinalityAggregationContext; import org.opensearch.search.collapse.CollapseContext; import org.opensearch.search.dfs.DfsSearchResult; import org.opensearch.search.fetch.FetchPhase; @@ -555,12 +556,8 @@ public int cardinalityAggregationPruningThreshold() { return 0; } - public boolean cardinalityAggregationHybridCollectorEnabled() { - return false; - } - - public long cardinalityAggregationHybridCollectorMemoryThreshold() { - return Runtime.getRuntime().maxMemory() / 100; // 1% of available memory default + public CardinalityAggregationContext cardinalityAggregationContext() { + return new CardinalityAggregationContext(false, Runtime.getRuntime().maxMemory() / 100); } public int bucketSelectionStrategyFactor() { diff --git a/server/src/test/java/org/opensearch/search/aggregations/metrics/CardinalityAggregatorTests.java b/server/src/test/java/org/opensearch/search/aggregations/metrics/CardinalityAggregatorTests.java index 8ff05ecbe0ac5..573dd8a91ff13 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/metrics/CardinalityAggregatorTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/metrics/CardinalityAggregatorTests.java @@ -667,8 +667,11 @@ 
private void testAggregationHybridCollector( new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST) ), (searchContext) -> { - when(searchContext.cardinalityAggregationHybridCollectorEnabled()).thenReturn(hybridCollectorEnabled); - when(searchContext.cardinalityAggregationHybridCollectorMemoryThreshold()).thenReturn(memoryThreshold); + CardinalityAggregationContext cardinalityContext = new CardinalityAggregationContext( + hybridCollectorEnabled, + memoryThreshold + ); + when(searchContext.cardinalityAggregationContext()).thenReturn(cardinalityContext); }, fieldType ) From b077a631df3499ac75f167b4d2f1db1eb7b41574 Mon Sep 17 00:00:00 2001 From: Anand Pravinbhai Patel Date: Mon, 3 Nov 2025 11:54:14 -0800 Subject: [PATCH 4/4] Fix UTs Signed-off-by: Anand Pravinbhai Patel --- .../search/aggregations/metrics/CardinalityAggregator.java | 1 + .../org/opensearch/search/aggregations/AggregatorTestCase.java | 3 +++ 2 files changed, 4 insertions(+) diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/CardinalityAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/CardinalityAggregator.java index 77b58899b4059..d395fa8989e38 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/CardinalityAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/CardinalityAggregator.java @@ -104,6 +104,7 @@ public class CardinalityAggregator extends NumericMetricsAggregator.SingleValue /** * Setting for hybrid collector memory threshold. Supports both percentage (e.g., "1%") * and absolute values (e.g., "10mb", "1gb"). + * Note: This threshold is applied at the Lucene segment level since collectors are created per segment. 
*/ public static final Setting CARDINALITY_AGGREGATION_HYBRID_COLLECTOR_MEMORY_THRESHOLD = Setting.memorySizeSetting( "search.aggregations.cardinality.hybrid_collector.memory_threshold", diff --git a/test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java b/test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java index c7de8388981c3..947f786333adf 100644 --- a/test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java +++ b/test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java @@ -546,6 +546,9 @@ public boolean shouldCache(Query query) { fieldNameToType.putAll(getFieldAliases(fieldTypes)); when(searchContext.maxAggRewriteFilters()).thenReturn(10_000); + when(searchContext.cardinalityAggregationContext()).thenReturn( + new org.opensearch.search.aggregations.metrics.CardinalityAggregationContext(false, Runtime.getRuntime().maxMemory() / 100) + ); registerFieldTypes(searchContext, mapperService, fieldNameToType); doAnswer(invocation -> { /* Store the release-ables so we can release them at the end of the test case. This is important because aggregations don't