Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add SMILE/CBOR/YAML document format support to Bulk GRPC endpoint ([#19744](https://github.com/opensearch-project/OpenSearch/pull/19744))
- Implement GRPC Search params `Highlight`and `Sort` ([#19868](https://github.com/opensearch-project/OpenSearch/pull/19868))
- Implement GRPC ConstantScoreQuery, FuzzyQuery, MatchBoolPrefixQuery, MatchPhrasePrefix, PrefixQuery, MatchQuery ([#19854](https://github.com/opensearch-project/OpenSearch/pull/19854))
- Add Hybrid Cardinality collector to prioritize Ordinals Collector ([#19524](https://github.com/opensearch-project/OpenSearch/pull/19524))
- Add async periodic flush task support for pull-based ingestion ([#19878](https://github.com/opensearch-project/OpenSearch/pull/19878))

### Changed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,8 +268,8 @@ setup:
---
"profiler string":
- skip:
version: " - 7.99.99"
reason: new info added in 8.0.0 to be backported to 7.10.0
version: " - 3.3.99"
reason: hybrid collector added in 3.4.0
- do:
search:
body:
Expand All @@ -287,6 +287,7 @@ setup:
- gt: { profile.shards.0.aggregations.0.breakdown.post_collection: 0 }
- match: { profile.shards.0.aggregations.0.debug.empty_collectors_used: 0 }
- match: { profile.shards.0.aggregations.0.debug.numeric_collectors_used: 0 }
- gt: { profile.shards.0.aggregations.0.debug.ordinals_collectors_used: 0 }
- match: { profile.shards.0.aggregations.0.debug.ordinals_collectors_used: 0 }
- match: { profile.shards.0.aggregations.0.debug.ordinals_collectors_overhead_too_high: 0 }
- match: { profile.shards.0.aggregations.0.debug.string_hashing_collectors_used: 0 }
- gt: { profile.shards.0.aggregations.0.debug.hybrid_collectors_used: 0 }
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,9 @@ setup:

---
"profiler string":
- skip:
version: " - 3.3.99"
reason: hybrid collector added in 3.4.0
- do:
search:
body:
Expand All @@ -282,6 +285,7 @@ setup:
- gt: { profile.shards.0.aggregations.0.breakdown.post_collection: 0 }
- match: { profile.shards.0.aggregations.0.debug.empty_collectors_used: 0 }
- match: { profile.shards.0.aggregations.0.debug.numeric_collectors_used: 0 }
- gt: { profile.shards.0.aggregations.0.debug.ordinals_collectors_used: 0 }
- match: { profile.shards.0.aggregations.0.debug.ordinals_collectors_used: 0 }
- match: { profile.shards.0.aggregations.0.debug.ordinals_collectors_overhead_too_high: 0 }
- match: { profile.shards.0.aggregations.0.debug.string_hashing_collectors_used: 0 }
- gt: { profile.shards.0.aggregations.0.debug.hybrid_collectors_used: 0 }
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@
import org.opensearch.script.ScriptService;
import org.opensearch.search.SearchService;
import org.opensearch.search.aggregations.MultiBucketConsumerService;
import org.opensearch.search.aggregations.metrics.CardinalityAggregator;
import org.opensearch.search.backpressure.settings.NodeDuressSettings;
import org.opensearch.search.backpressure.settings.SearchBackpressureSettings;
import org.opensearch.search.backpressure.settings.SearchShardTaskSettings;
Expand Down Expand Up @@ -588,6 +589,8 @@ public void apply(Settings value, Settings current, Settings previous) {
SearchService.INDICES_MAX_CLAUSE_COUNT_SETTING,
SearchService.SEARCH_MAX_QUERY_STRING_LENGTH,
SearchService.CARDINALITY_AGGREGATION_PRUNING_THRESHOLD,
CardinalityAggregator.CARDINALITY_AGGREGATION_HYBRID_COLLECTOR_ENABLED,
CardinalityAggregator.CARDINALITY_AGGREGATION_HYBRID_COLLECTOR_MEMORY_THRESHOLD,
SearchService.KEYWORD_INDEX_OR_DOC_VALUES_ENABLED,
CreatePitController.PIT_INIT_KEEP_ALIVE,
Node.WRITE_PORTS_FILE_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.BigArrays;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.index.IndexService;
import org.opensearch.index.IndexSettings;
Expand All @@ -75,6 +76,8 @@
import org.opensearch.search.aggregations.BucketCollectorProcessor;
import org.opensearch.search.aggregations.InternalAggregation;
import org.opensearch.search.aggregations.SearchContextAggregations;
import org.opensearch.search.aggregations.metrics.CardinalityAggregationContext;
import org.opensearch.search.aggregations.metrics.CardinalityAggregator;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.search.collapse.CollapseContext;
import org.opensearch.search.deciders.ConcurrentSearchDecision;
Expand Down Expand Up @@ -220,6 +223,7 @@ final class DefaultSearchContext extends SearchContext {
private final int maxAggRewriteFilters;
private final int filterRewriteSegmentThreshold;
private final int cardinalityAggregationPruningThreshold;
private final CardinalityAggregationContext cardinalityAggregationContext;
private final int bucketSelectionStrategyFactor;
private final boolean keywordIndexOrDocValuesEnabled;

Expand Down Expand Up @@ -287,6 +291,7 @@ final class DefaultSearchContext extends SearchContext {
this.maxAggRewriteFilters = evaluateFilterRewriteSetting();
this.filterRewriteSegmentThreshold = evaluateAggRewriteFilterSegThreshold();
this.cardinalityAggregationPruningThreshold = evaluateCardinalityAggregationPruningThreshold();
this.cardinalityAggregationContext = evaluateCardinalityAggregationContext();
this.bucketSelectionStrategyFactor = evaluateBucketSelectionStrategyFactor();
this.concurrentSearchDeciderFactories = concurrentSearchDeciderFactories;
this.keywordIndexOrDocValuesEnabled = evaluateKeywordIndexOrDocValuesEnabled();
Expand Down Expand Up @@ -1238,6 +1243,11 @@ public int cardinalityAggregationPruningThreshold() {
return cardinalityAggregationPruningThreshold;
}

@Override
public CardinalityAggregationContext cardinalityAggregationContext() {
return cardinalityAggregationContext;
}

@Override
public int bucketSelectionStrategyFactor() {
return bucketSelectionStrategyFactor;
Expand All @@ -1255,6 +1265,17 @@ private int evaluateCardinalityAggregationPruningThreshold() {
return 0;
}

private CardinalityAggregationContext evaluateCardinalityAggregationContext() {
if (clusterService != null) {
boolean hybridCollectorEnabled = clusterService.getClusterSettings()
.get(CardinalityAggregator.CARDINALITY_AGGREGATION_HYBRID_COLLECTOR_ENABLED);
ByteSizeValue memoryThreshold = clusterService.getClusterSettings()
.get(CardinalityAggregator.CARDINALITY_AGGREGATION_HYBRID_COLLECTOR_MEMORY_THRESHOLD);
return CardinalityAggregationContext.from(hybridCollectorEnabled, memoryThreshold);
}
return new CardinalityAggregationContext(false, Runtime.getRuntime().maxMemory() / 100);
}

private int evaluateBucketSelectionStrategyFactor() {
if (clusterService != null) {
return clusterService.getClusterSettings().get(BUCKET_SELECTION_STRATEGY_FACTOR_SETTING);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.aggregations.metrics;

import org.opensearch.common.annotation.PublicApi;
import org.opensearch.core.common.unit.ByteSizeValue;

/**
* Context object that encapsulates all cardinality aggregation related settings and configurations.
* This helps keep cardinality-specific settings scoped properly and reduces SearchContext bloat.
*/
@PublicApi(since = "3.4.0")
public class CardinalityAggregationContext {
private final boolean hybridCollectorEnabled;
private final long memoryThreshold;

public CardinalityAggregationContext(boolean hybridCollectorEnabled, long memoryThreshold) {
this.hybridCollectorEnabled = hybridCollectorEnabled;
this.memoryThreshold = memoryThreshold;
}

public boolean isHybridCollectorEnabled() {
return hybridCollectorEnabled;
}

public long getMemoryThreshold() {
return memoryThreshold;
}

/**
* Creates a CardinalityAggregationContext from cluster settings
*/
public static CardinalityAggregationContext from(boolean hybridCollectorEnabled, ByteSizeValue memoryThreshold) {
return new CardinalityAggregationContext(hybridCollectorEnabled, memoryThreshold.getBytes());
}
}
Loading
Loading