Skip to content

Commit 09dd2b4

Browse files
committed
update
1 parent db4dcde commit 09dd2b4

File tree

1 file changed

+10
-8
lines changed

1 file changed

+10
-8
lines changed

be/src/pipeline/pipeline_fragment_context.cpp

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -459,7 +459,9 @@ Status PipelineFragmentContext::_build_pipeline_tasks(ThreadPool* thread_pool) {
459459
pipeline_id_to_profile[pip_idx].get(), get_shared_state(pipeline), i);
460460
pipeline->incr_created_tasks(i, task.get());
461461
pipeline_id_to_task.insert({pipeline->id(), task.get()});
462-
_tasks[i].emplace_back({std::move(task), std::move(task_runtime_state)});
462+
_tasks[i].emplace_back(
463+
std::pair<std::shared_ptr<PipelineTask>, std::unique_ptr<RuntimeState>> {
464+
std::move(task), std::move(task_runtime_state)});
463465
}
464466
}
465467

@@ -1692,7 +1694,7 @@ Status PipelineFragmentContext::submit() {
16921694
auto* scheduler = _query_ctx->get_pipe_exec_scheduler();
16931695
for (auto& task : _tasks) {
16941696
for (auto& t : task) {
1695-
st = scheduler->submit(t);
1697+
st = scheduler->submit(t.first);
16961698
DBUG_EXECUTE_IF("PipelineFragmentContext.submit.failed",
16971699
{ st = Status::Aborted("PipelineFragmentContext.submit.failed"); });
16981700
if (!st) {
@@ -1881,15 +1883,15 @@ size_t PipelineFragmentContext::get_revocable_size(bool* has_running_task) const
18811883
// here to traverse the vector.
18821884
for (const auto& task_instances : _tasks) {
18831885
for (const auto& task : task_instances) {
1884-
if (task->is_running()) {
1886+
if (task.first->is_running()) {
18851887
LOG_EVERY_N(INFO, 50) << "Query: " << print_id(_query_id)
1886-
<< " is running, task: " << (void*)task.get()
1887-
<< ", is_running: " << task->is_running();
1888+
<< " is running, task: " << (void*)task.first.get()
1889+
<< ", is_running: " << task.first->is_running();
18881890
*has_running_task = true;
18891891
return 0;
18901892
}
18911893

1892-
size_t revocable_size = task->get_revocable_size();
1894+
size_t revocable_size = task.first->get_revocable_size();
18931895
if (revocable_size >= vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) {
18941896
res += revocable_size;
18951897
}
@@ -1902,9 +1904,9 @@ std::vector<PipelineTask*> PipelineFragmentContext::get_revocable_tasks() const
19021904
std::vector<PipelineTask*> revocable_tasks;
19031905
for (const auto& task_instances : _tasks) {
19041906
for (const auto& task : task_instances) {
1905-
size_t revocable_size_ = task->get_revocable_size();
1907+
size_t revocable_size_ = task.first->get_revocable_size();
19061908
if (revocable_size_ >= vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) {
1907-
revocable_tasks.emplace_back(task.get());
1909+
revocable_tasks.emplace_back(task.first.get());
19081910
}
19091911
}
19101912
}

0 commit comments

Comments
 (0)