Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions be/src/exec/pipeline/exchange/exchange_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -581,9 +581,17 @@ Status ExchangeSinkOperator::push_chunk(RuntimeState* state, const ChunkPtr& chu

// Compute hash for each partition column
if (_part_type == TPartitionType::HASH_PARTITIONED) {
_hash_values.assign(num_rows, HashUtil::FNV_SEED);
for (const ColumnPtr& column : _partitions_columns) {
column->fnv_hash(&_hash_values[0], 0, num_rows);
// Check if we should use crc32_hash instead of fnv_hash based on session variable
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What hash function does StarRocks use for aggregation?
The hash functions used for aggregation and for shuffle should be different; otherwise, aggregation or hash-join will suffer many hash collisions, producing a much larger hash table than necessary.
So when you change this, you must make sure the two paths use two different hash functions.

if (runtime_state()->use_crc32_hash_for_exchange()) {
_hash_values.assign(num_rows, 0);
for (const ColumnPtr& column : _partitions_columns) {
column->crc32_hash(&_hash_values[0], 0, num_rows);
}
} else {
_hash_values.assign(num_rows, HashUtil::FNV_SEED);
for (const ColumnPtr& column : _partitions_columns) {
column->fnv_hash(&_hash_values[0], 0, num_rows);
}
}
} else if (_bucket_properties.empty()) {
// The data distribution was calculated using CRC32_HASH,
Expand Down
4 changes: 4 additions & 0 deletions be/src/runtime/runtime_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,10 @@ class RuntimeState {
return _query_options.__isset.lower_upper_support_utf8 && _query_options.lower_upper_support_utf8;
}

// Returns true only when the `use_crc32_hash_for_exchange` query option was
// both sent by the FE (the Thrift __isset flag is set) and enabled.
// The exchange sink consults this to choose crc32_hash over fnv_hash for
// HASH_PARTITIONED shuffles; defaults to false when the option is absent.
bool use_crc32_hash_for_exchange() const {
return _query_options.__isset.use_crc32_hash_for_exchange && _query_options.use_crc32_hash_for_exchange;
}

DebugActionMgr& debug_action_mgr() { return _debug_action_mgr; }

private:
Expand Down
17 changes: 17 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java
Original file line number Diff line number Diff line change
Expand Up @@ -959,6 +959,8 @@ public static MaterializedViewRewriteMode parse(String str) {

public static final String ENABLE_FULL_SORT_USE_GERMAN_STRING = "enable_full_sort_use_german_string";

public static final String USE_CRC32_HASH_FOR_EXCHANGE = "use_crc32_hash_for_exchange";

public static final List<String> DEPRECATED_VARIABLES = ImmutableList.<String>builder()
.add(CODEGEN_LEVEL)
.add(MAX_EXECUTION_TIME)
Expand Down Expand Up @@ -1965,6 +1967,11 @@ public void setEnableDesensitizeExplain(boolean enableDesensitizeExplain) {
@VarAttr(name = ENABLE_FULL_SORT_USE_GERMAN_STRING)
private boolean enableFullSortUseGermanString = true;

// Control the hash function used in exchange sink operator for hash partitioning
// When true, use crc32_hash instead of fnv_hash
@VarAttr(name = USE_CRC32_HASH_FOR_EXCHANGE)
private boolean useCrc32HashForExchange = true;

public int getCboPruneJsonSubfieldDepth() {
return cboPruneJsonSubfieldDepth;
}
Expand Down Expand Up @@ -5335,6 +5342,15 @@ public void setEnableFullSortUseGermanString(boolean value) {
public boolean isEnableFullSortUseGermanString() {
return this.enableFullSortUseGermanString;
}

/**
 * Whether the exchange sink operator should hash-partition rows with
 * crc32_hash instead of fnv_hash (session variable
 * {@code use_crc32_hash_for_exchange}).
 */
public boolean isUseCrc32HashForExchange() {
return useCrc32HashForExchange;
}

/**
 * Sets the {@code use_crc32_hash_for_exchange} session variable. The value
 * is forwarded to the BE through {@code TQueryOptions} in {@code toThrift()}.
 *
 * @param useCrc32HashForExchange true to use crc32_hash for exchange hash
 *                                partitioning, false to use fnv_hash
 */
public void setUseCrc32HashForExchange(boolean useCrc32HashForExchange) {
this.useCrc32HashForExchange = useCrc32HashForExchange;
}

// Serialize to thrift object
// used for rest api
public TQueryOptions toThrift() {
Expand Down Expand Up @@ -5419,6 +5435,7 @@ public TQueryOptions toThrift() {
tResult.setGroup_concat_max_len(groupConcatMaxLen);
tResult.setRpc_http_min_size(rpcHttpMinSize);
tResult.setInterleaving_group_size(interleavingGroupSize);
tResult.setUse_crc32_hash_for_exchange(useCrc32HashForExchange);

TCompressionType loadCompressionType =
CompressionUtils.findTCompressionByName(loadTransmissionCompressionType);
Expand Down
3 changes: 3 additions & 0 deletions gensrc/thrift/InternalService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,9 @@ struct TQueryOptions {
191: optional i64 column_view_concat_bytes_limit;

200: optional bool enable_full_sort_use_german_string;

// whether to use crc32_hash instead of fnv_hash for the exchange sink operator's hash partitioning
201: optional bool use_crc32_hash_for_exchange;
}

// A scan range plus the parameters needed to execute that scan.
Expand Down
Loading