From 8785a4eb77b5fb92b26033b7c5498f49cfca570c Mon Sep 17 00:00:00 2001 From: before-Sunrise Date: Thu, 28 Aug 2025 15:17:21 +0800 Subject: [PATCH 1/3] use crc hash for exchange Signed-off-by: before-Sunrise --- .../exchange/exchange_sink_operator.cpp | 14 +++++++++++--- be/src/runtime/runtime_state.h | 4 ++++ .../com/starrocks/qe/SessionVariable.java | 19 +++++++++++++++++++ gensrc/thrift/InternalService.thrift | 3 +++ 4 files changed, 37 insertions(+), 3 deletions(-) diff --git a/be/src/exec/pipeline/exchange/exchange_sink_operator.cpp b/be/src/exec/pipeline/exchange/exchange_sink_operator.cpp index 023a08cfbaa993..8149f5842ed9a4 100644 --- a/be/src/exec/pipeline/exchange/exchange_sink_operator.cpp +++ b/be/src/exec/pipeline/exchange/exchange_sink_operator.cpp @@ -581,9 +581,17 @@ Status ExchangeSinkOperator::push_chunk(RuntimeState* state, const ChunkPtr& chu // Compute hash for each partition column if (_part_type == TPartitionType::HASH_PARTITIONED) { - _hash_values.assign(num_rows, HashUtil::FNV_SEED); - for (const ColumnPtr& column : _partitions_columns) { - column->fnv_hash(&_hash_values[0], 0, num_rows); + // Check if we should use crc32_hash instead of fnv_hash based on session variable + if (runtime_state()->use_crc32_hash_for_exchange()) { + _hash_values.assign(num_rows, 0); + for (const ColumnPtr& column : _partitions_columns) { + column->crc32_hash(&_hash_values[0], 0, num_rows); + } + } else { + _hash_values.assign(num_rows, HashUtil::FNV_SEED); + for (const ColumnPtr& column : _partitions_columns) { + column->fnv_hash(&_hash_values[0], 0, num_rows); + } } } else if (_bucket_properties.empty()) { // The data distribution was calculated using CRC32_HASH, diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index 816a9efebb13eb..420bbfe241d694 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -572,6 +572,10 @@ class RuntimeState { return _query_options.__isset.lower_upper_support_utf8 && _query_options.lower_upper_support_utf8; } + bool use_crc32_hash_for_exchange() const { + return _query_options.__isset.use_crc32_hash_for_exchange && _query_options.use_crc32_hash_for_exchange; + } + DebugActionMgr& debug_action_mgr() { return _debug_action_mgr; } private: diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java b/fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java index da2ce5a1e55215..0b87d3f7b5e0dd 100644 --- a/fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java @@ -5335,6 +5335,20 @@ public void setEnableFullSortUseGermanString(boolean value) { public boolean isEnableFullSortUseGermanString() { return this.enableFullSortUseGermanString; } + + // Control the hash function used in exchange sink operator for hash partitioning + // When true, use crc32_hash instead of fnv_hash + @VariableMgr.VarAttr(name = "use_crc32_hash_for_exchange") + private boolean useCrc32HashForExchange = true; + + public boolean isUseCrc32HashForExchange() { + return useCrc32HashForExchange; + } + + public void setUseCrc32HashForExchange(boolean useCrc32HashForExchange) { + this.useCrc32HashForExchange = useCrc32HashForExchange; + } + // Serialize to thrift object // used for rest api public TQueryOptions toThrift() { @@ -5419,6 +5433,11 @@ public TQueryOptions toThrift() { tResult.setGroup_concat_max_len(groupConcatMaxLen); tResult.setRpc_http_min_size(rpcHttpMinSize); tResult.setInterleaving_group_size(interleavingGroupSize); +<<<<<<< HEAD +======= + tResult.setEnable_predicate_col_late_materialize(enablePredicateColLateMaterialize); + tResult.setUse_crc32_hash_for_exchange(useCrc32HashForExchange); +>>>>>>> 998d31bf54 (use crc hash for exchange) TCompressionType loadCompressionType = CompressionUtils.findTCompressionByName(loadTransmissionCompressionType); diff --git a/gensrc/thrift/InternalService.thrift b/gensrc/thrift/InternalService.thrift index 0b72605d31fe51..542c88fcaa1d8a 100644 --- a/gensrc/thrift/InternalService.thrift +++ b/gensrc/thrift/InternalService.thrift @@ -349,6 +349,9 @@ struct TQueryOptions { 191: optional i64 column_view_concat_bytes_limit; 200: optional bool enable_full_sort_use_german_string; + + // whether use crc32_hash instead of fnv_hash for exchange sink operator hash partitioning + 201: optional bool use_crc32_hash_for_exchange; } // A scan range plus the parameters needed to execute that scan. From be44eead53b5039d19fa39d164301a872ddf5e57 Mon Sep 17 00:00:00 2001 From: before-Sunrise Date: Tue, 30 Sep 2025 13:44:23 +0800 Subject: [PATCH 2/3] fix Signed-off-by: before-Sunrise --- .../src/main/java/com/starrocks/qe/SessionVariable.java | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java b/fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java index 0b87d3f7b5e0dd..1f39c5644bcc67 100644 --- a/fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java @@ -5335,7 +5335,7 @@ public void setEnableFullSortUseGermanString(boolean value) { public boolean isEnableFullSortUseGermanString() { return this.enableFullSortUseGermanString; } - + // Control the hash function used in exchange sink operator for hash partitioning // When true, use crc32_hash instead of fnv_hash @VariableMgr.VarAttr(name = "use_crc32_hash_for_exchange") @@ -5433,11 +5433,7 @@ public TQueryOptions toThrift() { tResult.setGroup_concat_max_len(groupConcatMaxLen); tResult.setRpc_http_min_size(rpcHttpMinSize); tResult.setInterleaving_group_size(interleavingGroupSize); -<<<<<<< HEAD -======= - tResult.setEnable_predicate_col_late_materialize(enablePredicateColLateMaterialize); tResult.setUse_crc32_hash_for_exchange(useCrc32HashForExchange); ->>>>>>> 998d31bf54 (use crc hash for exchange) TCompressionType loadCompressionType = CompressionUtils.findTCompressionByName(loadTransmissionCompressionType); From 0a0415ce9a6ebe7e697f081b6bb2a5f4ea1a3f8f Mon Sep 17 00:00:00 2001 From: before-Sunrise Date: Tue, 30 Sep 2025 13:49:55 +0800 Subject: [PATCH 3/3] fix Signed-off-by: before-Sunrise --- .../main/java/com/starrocks/qe/SessionVariable.java | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java b/fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java index 1f39c5644bcc67..30dcbbe68d41a4 100644 --- a/fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java @@ -959,6 +959,8 @@ public static MaterializedViewRewriteMode parse(String str) { public static final String ENABLE_FULL_SORT_USE_GERMAN_STRING = "enable_full_sort_use_german_string"; + public static final String USE_CRC32_HASH_FOR_EXCHANGE = "use_crc32_hash_for_exchange"; + public static final List DEPRECATED_VARIABLES = ImmutableList.builder() .add(CODEGEN_LEVEL) .add(MAX_EXECUTION_TIME) @@ -1965,6 +1967,11 @@ public void setEnableDesensitizeExplain(boolean enableDesensitizeExplain) { @VarAttr(name = ENABLE_FULL_SORT_USE_GERMAN_STRING) private boolean enableFullSortUseGermanString = true; + // Control the hash function used in exchange sink operator for hash partitioning + // When true, use crc32_hash instead of fnv_hash + @VarAttr(name = USE_CRC32_HASH_FOR_EXCHANGE) + private boolean useCrc32HashForExchange = true; + public int getCboPruneJsonSubfieldDepth() { return cboPruneJsonSubfieldDepth; } @@ -5336,11 +5343,6 @@ public boolean isEnableFullSortUseGermanString() { return this.enableFullSortUseGermanString; } - // Control the hash function used in exchange sink operator for hash partitioning - // When true, use crc32_hash instead of fnv_hash - @VariableMgr.VarAttr(name = "use_crc32_hash_for_exchange") - private boolean useCrc32HashForExchange = true; - public boolean isUseCrc32HashForExchange() { return useCrc32HashForExchange; }