From 01d2522d040dda7d872c44dd8a4c6bcc30ac8213 Mon Sep 17 00:00:00 2001 From: BiteTheDDDDt Date: Wed, 24 Sep 2025 18:32:51 +0800 Subject: [PATCH 1/4] add rf merger info to PipelineFragmentContext::debug_string() --- be/src/pipeline/pipeline_fragment_context.cpp | 4 +++- be/src/runtime_filter/runtime_filter_mgr.cpp | 10 ++++++++++ be/src/runtime_filter/runtime_filter_mgr.h | 2 ++ 3 files changed, 15 insertions(+), 1 deletion(-) diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 6bd7853047e94a..7539ac7c542155 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -1936,7 +1936,9 @@ std::string PipelineFragmentContext::debug_string() { for (size_t j = 0; j < _tasks.size(); j++) { fmt::format_to(debug_string_buffer, "Tasks in instance {}:\n", j); for (size_t i = 0; i < _tasks[j].size(); i++) { - fmt::format_to(debug_string_buffer, "Task {}: {}\n", i, _tasks[j][i]->debug_string()); + fmt::format_to(debug_string_buffer, "Task {}: {}\n{}\n", i, + _tasks[j][i]->debug_string(), + _task_runtime_states[j][i]->local_runtime_filter_mgr()->debug_string()); } } diff --git a/be/src/runtime_filter/runtime_filter_mgr.cpp b/be/src/runtime_filter/runtime_filter_mgr.cpp index a2072b6771bed2..c93d6adbb95618 100644 --- a/be/src/runtime_filter/runtime_filter_mgr.cpp +++ b/be/src/runtime_filter/runtime_filter_mgr.cpp @@ -283,6 +283,15 @@ Status RuntimeFilterMgr::sync_filter_size(const PSyncFilterSizeRequest* request) return Status::OK(); } +std::string RuntimeFilterMgr::debug_string() const { + std::string result = "Merger Info:\n"; + std::lock_guard l(_lock); + for (const auto& [filter_id, merger] : _local_merge_map) { + result += fmt::format("merger: {}\n", filter_id, merger.merger->debug_string()); + } + return result; +} + // merge data Status RuntimeFilterMergeControllerEntity::merge(std::shared_ptr query_ctx, const PMergeFilterRequest* request, @@ -398,4 +407,5 @@ Status RuntimeFilterMergeControllerEntity::merge(std::shared_ptr q } return st; } + } // namespace doris diff --git a/be/src/runtime_filter/runtime_filter_mgr.h b/be/src/runtime_filter/runtime_filter_mgr.h index 00a91d61a2b0b2..28421168377138 100644 --- a/be/src/runtime_filter/runtime_filter_mgr.h +++ b/be/src/runtime_filter/runtime_filter_mgr.h @@ -100,6 +100,8 @@ class RuntimeFilterMgr { Status get_merge_addr(TNetworkAddress* addr); Status sync_filter_size(const PSyncFilterSizeRequest* request); + std::string debug_string() const; + private: /** * `_is_global = true` means this runtime filter manager menages query-level runtime filters. From 05fef1c8c6d6972ca631d06e5d65a17a1616ea40 Mon Sep 17 00:00:00 2001 From: BiteTheDDDDt Date: Wed, 24 Sep 2025 19:07:41 +0800 Subject: [PATCH 2/4] fix --- be/src/runtime_filter/runtime_filter_mgr.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/be/src/runtime_filter/runtime_filter_mgr.h b/be/src/runtime_filter/runtime_filter_mgr.h index 28421168377138..622d1d1742f42d 100644 --- a/be/src/runtime_filter/runtime_filter_mgr.h +++ b/be/src/runtime_filter/runtime_filter_mgr.h @@ -100,7 +100,7 @@ class RuntimeFilterMgr { Status get_merge_addr(TNetworkAddress* addr); Status sync_filter_size(const PSyncFilterSizeRequest* request); - std::string debug_string() const; + std::string debug_string(); private: /** From 0b1176ec34b45d90d8455ae2aad777f4a07d8171 Mon Sep 17 00:00:00 2001 From: BiteTheDDDDt Date: Wed, 24 Sep 2025 19:37:39 +0800 Subject: [PATCH 3/4] update --- be/src/runtime_filter/runtime_filter_mgr.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/be/src/runtime_filter/runtime_filter_mgr.cpp b/be/src/runtime_filter/runtime_filter_mgr.cpp index c93d6adbb95618..96c99ad36747b9 100644 --- a/be/src/runtime_filter/runtime_filter_mgr.cpp +++ b/be/src/runtime_filter/runtime_filter_mgr.cpp @@ -283,7 +283,7 @@ Status RuntimeFilterMgr::sync_filter_size(const PSyncFilterSizeRequest* request) return Status::OK(); } -std::string RuntimeFilterMgr::debug_string() const { +std::string RuntimeFilterMgr::debug_string() { std::string result = "Merger Info:\n"; std::lock_guard l(_lock); for (const auto& [filter_id, merger] : _local_merge_map) { From 8b4d77125475d3ebb7f93d963b284141610b837b Mon Sep 17 00:00:00 2001 From: BiteTheDDDDt Date: Wed, 24 Sep 2025 22:02:55 +0800 Subject: [PATCH 4/4] fix --- be/src/pipeline/pipeline_fragment_context.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 7539ac7c542155..05f698ffeba6ff 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -1938,7 +1938,7 @@ std::string PipelineFragmentContext::debug_string() { for (size_t i = 0; i < _tasks[j].size(); i++) { fmt::format_to(debug_string_buffer, "Task {}: {}\n{}\n", i, _tasks[j][i]->debug_string(), - _task_runtime_states[j][i]->local_runtime_filter_mgr()->debug_string()); + _task_runtime_states[i][j]->local_runtime_filter_mgr()->debug_string()); } }