Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 20 additions & 40 deletions be/src/pipeline/pipeline_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,12 +144,7 @@ PipelineFragmentContext::~PipelineFragmentContext() {
auto st = _query_ctx->exec_status();
for (size_t i = 0; i < _tasks.size(); i++) {
if (!_tasks[i].empty()) {
_call_back(_tasks[i].front()->runtime_state(), &st);
}
}
for (auto& runtime_states : _task_runtime_states) {
for (auto& runtime_state : runtime_states) {
runtime_state.reset();
_call_back(_tasks[i].front().first->runtime_state(), &st);
}
}
_tasks.clear();
Expand Down Expand Up @@ -234,7 +229,7 @@ void PipelineFragmentContext::cancel(const Status reason) {

for (auto& tasks : _tasks) {
for (auto& task : tasks) {
task->terminate();
task.first->terminate();
}
}
}
Expand Down Expand Up @@ -379,9 +374,7 @@ Status PipelineFragmentContext::_build_pipeline_tasks(ThreadPool* thread_pool) {
const auto target_size = _params.local_params.size();
_tasks.resize(target_size);
_runtime_filter_mgr_map.resize(target_size);
_task_runtime_states.resize(_pipelines.size());
for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
_task_runtime_states[pip_idx].resize(_pipelines[pip_idx]->num_tasks());
_pip_id_to_pipeline[_pipelines[pip_idx]->id()] = _pipelines[pip_idx].get();
}
auto pipeline_id_to_profile = _runtime_state->build_pipeline_profile(_pipelines.size());
Expand Down Expand Up @@ -416,14 +409,10 @@ Status PipelineFragmentContext::_build_pipeline_tasks(ThreadPool* thread_pool) {
for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
auto& pipeline = _pipelines[pip_idx];
if (pipeline->num_tasks() > 1 || i == 0) {
DCHECK(_task_runtime_states[pip_idx][i] == nullptr)
<< print_id(_task_runtime_states[pip_idx][i]->fragment_instance_id()) << " "
<< pipeline->debug_string();
_task_runtime_states[pip_idx][i] = RuntimeState::create_unique(
auto task_runtime_state = RuntimeState::create_unique(
local_params.fragment_instance_id, _params.query_id, _params.fragment_id,
_params.query_options, _query_ctx->query_globals, _exec_env,
_query_ctx.get());
auto& task_runtime_state = _task_runtime_states[pip_idx][i];
{
// Initialize runtime state for this task
task_runtime_state->set_query_mem_tracker(_query_ctx->query_mem_tracker());
Expand Down Expand Up @@ -470,7 +459,7 @@ Status PipelineFragmentContext::_build_pipeline_tasks(ThreadPool* thread_pool) {
pipeline_id_to_profile[pip_idx].get(), get_shared_state(pipeline), i);
pipeline->incr_created_tasks(i, task.get());
pipeline_id_to_task.insert({pipeline->id(), task.get()});
_tasks[i].emplace_back(std::move(task));
_tasks[i].emplace_back({std::move(task), std::move(task_runtime_state)});
}
}

Expand Down Expand Up @@ -1810,12 +1799,9 @@ std::string PipelineFragmentContext::get_load_error_url() {
if (const auto& str = _runtime_state->get_error_log_file_path(); !str.empty()) {
return to_load_error_http_path(str);
}
for (auto& task_states : _task_runtime_states) {
for (auto& task_state : task_states) {
if (!task_state) {
continue;
}
if (const auto& str = task_state->get_error_log_file_path(); !str.empty()) {
for (auto& tasks : _tasks) {
for (auto& task : tasks) {
if (const auto& str = task.second->get_error_log_file_path(); !str.empty()) {
return to_load_error_http_path(str);
}
}
Expand All @@ -1827,12 +1813,9 @@ std::string PipelineFragmentContext::get_first_error_msg() {
if (const auto& str = _runtime_state->get_first_error_msg(); !str.empty()) {
return str;
}
for (auto& task_states : _task_runtime_states) {
for (auto& task_state : task_states) {
if (!task_state) {
continue;
}
if (const auto& str = task_state->get_first_error_msg(); !str.empty()) {
for (auto& tasks : _tasks) {
for (auto& task : tasks) {
if (const auto& str = task.second->get_first_error_msg(); !str.empty()) {
return str;
}
}
Expand Down Expand Up @@ -1862,11 +1845,9 @@ Status PipelineFragmentContext::send_report(bool done) {

std::vector<RuntimeState*> runtime_states;

for (auto& task_states : _task_runtime_states) {
for (auto& task_state : task_states) {
if (task_state) {
runtime_states.push_back(task_state.get());
}
for (auto& tasks : _tasks) {
for (auto& task : tasks) {
runtime_states.push_back(task.second.get());
}
}

Expand Down Expand Up @@ -1936,9 +1917,8 @@ std::string PipelineFragmentContext::debug_string() {
for (size_t j = 0; j < _tasks.size(); j++) {
fmt::format_to(debug_string_buffer, "Tasks in instance {}:\n", j);
for (size_t i = 0; i < _tasks[j].size(); i++) {
fmt::format_to(debug_string_buffer, "Task {}: {}\n{}\n", i,
_tasks[j][i]->debug_string(),
_task_runtime_states[i][j]->local_runtime_filter_mgr()->debug_string());
fmt::format_to(debug_string_buffer, "Task {}: {}\n", i,
_tasks[j][i].first->debug_string());
}
}

Expand Down Expand Up @@ -1988,16 +1968,16 @@ PipelineFragmentContext::collect_realtime_load_channel_profile() const {
return nullptr;
}

for (const auto& runtime_states : _task_runtime_states) {
for (const auto& runtime_state : runtime_states) {
if (runtime_state == nullptr || runtime_state->runtime_profile() == nullptr) {
for (const auto& tasks : _tasks) {
for (const auto& task : tasks) {
if (task.second->runtime_profile() == nullptr) {
continue;
}

auto tmp_load_channel_profile = std::make_shared<TRuntimeProfileTree>();

runtime_state->runtime_profile()->to_thrift(tmp_load_channel_profile.get(),
_runtime_state->profile_level());
task.second->runtime_profile()->to_thrift(tmp_load_channel_profile.get(),
_runtime_state->profile_level());
_runtime_state->load_channel_profile()->update(*tmp_load_channel_profile);
}
}
Expand Down
38 changes: 20 additions & 18 deletions be/src/pipeline/pipeline_fragment_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ class PipelineFragmentContext : public TaskExecutionContext {
void clear_finished_tasks() {
for (size_t j = 0; j < _tasks.size(); j++) {
for (size_t i = 0; i < _tasks[j].size(); i++) {
_tasks[j][i]->stop_if_finished();
_tasks[j][i].first->stop_if_finished();
}
}
}
Expand Down Expand Up @@ -228,8 +228,25 @@ class PipelineFragmentContext : public TaskExecutionContext {
bool _use_serial_source = false;

OperatorPtr _root_op = nullptr;
// this is a [n * m] matrix. n is parallelism of pipeline engine and m is the number of pipelines.
std::vector<std::vector<std::shared_ptr<PipelineTask>>> _tasks;
//
/**
* Matrix stores tasks with local runtime states.
* This is a [n * m] matrix. n is parallelism of pipeline engine and m is the number of pipelines.
*
* 2-D matrix:
* +-------------------------+------------+-------+
* | | Pipeline 0 | Pipeline 1 | ... |
* +------------+------------+------------+-------+
* | Instance 0 | task 0-0 | task 0-1 | ... |
* +------------+------------+------------+-------+
* | Instance 1 | task 1-0 | task 1-1 | ... |
* +------------+------------+------------+-------+
* | ... |
* +--------------------------------------+-------+
*/
std::vector<
std::vector<std::pair<std::shared_ptr<PipelineTask>, std::unique_ptr<RuntimeState>>>>
_tasks;

// TODO: remove the _sink and _multi_cast_stream_sink_senders to set both
// of it in pipeline task not the fragment_context
Expand Down Expand Up @@ -299,21 +316,6 @@ class PipelineFragmentContext : public TaskExecutionContext {
// - _task_runtime_states is at the task level, unique to each task.

std::vector<TUniqueId> _fragment_instance_ids;
/**
* Local runtime states for each task.
*
* 2-D matrix:
* +-------------------------+------------+-------+
* | | Instance 0 | Instance 1 | ... |
* +------------+------------+------------+-------+
* | Pipeline 0 | task 0-0 | task 0-1 | ... |
* +------------+------------+------------+-------+
* | Pipeline 1 | task 1-0 | task 1-1 | ... |
* +------------+------------+------------+-------+
* | ... |
* +--------------------------------------+-------+
*/
std::vector<std::vector<std::unique_ptr<RuntimeState>>> _task_runtime_states;

// Total instance num running on all BEs
int _total_instances = -1;
Expand Down
7 changes: 5 additions & 2 deletions be/src/pipeline/pipeline_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -736,8 +736,11 @@ std::string PipelineTask::debug_string() {
}
auto elapsed = fragment->elapsed_time() / NANOS_PER_SEC;
fmt::format_to(debug_string_buffer,
" elapse time = {}s, block dependency = [{}]\noperators: ", elapsed,
cur_blocked_dep && !is_finalized() ? cur_blocked_dep->debug_string() : "NULL");
" elapse time = {}s, block dependency = [{}]\nlocal_runtime_filter_mgr: "
"[{}]\noperators: ",
elapsed,
cur_blocked_dep && !is_finalized() ? cur_blocked_dep->debug_string() : "NULL",
_state->local_runtime_filter_mgr()->debug_string());

for (size_t i = 0; i < _operators.size(); i++) {
fmt::format_to(debug_string_buffer, "\n{}",
Expand Down
Loading