@@ -144,12 +144,7 @@ PipelineFragmentContext::~PipelineFragmentContext() {
144
144
auto st = _query_ctx->exec_status ();
145
145
for (size_t i = 0 ; i < _tasks.size (); i++) {
146
146
if (!_tasks[i].empty ()) {
147
- _call_back (_tasks[i].front ()->runtime_state (), &st);
148
- }
149
- }
150
- for (auto & runtime_states : _task_runtime_states) {
151
- for (auto & runtime_state : runtime_states) {
152
- runtime_state.reset ();
147
+ _call_back (_tasks[i].front ().first ->runtime_state (), &st);
153
148
}
154
149
}
155
150
_tasks.clear ();
@@ -234,7 +229,7 @@ void PipelineFragmentContext::cancel(const Status reason) {
234
229
235
230
for (auto & tasks : _tasks) {
236
231
for (auto & task : tasks) {
237
- task->terminate ();
232
+ task. first ->terminate ();
238
233
}
239
234
}
240
235
}
@@ -379,9 +374,7 @@ Status PipelineFragmentContext::_build_pipeline_tasks(ThreadPool* thread_pool) {
379
374
const auto target_size = _params.local_params .size ();
380
375
_tasks.resize (target_size);
381
376
_runtime_filter_mgr_map.resize (target_size);
382
- _task_runtime_states.resize (_pipelines.size ());
383
377
for (size_t pip_idx = 0 ; pip_idx < _pipelines.size (); pip_idx++) {
384
- _task_runtime_states[pip_idx].resize (_pipelines[pip_idx]->num_tasks ());
385
378
_pip_id_to_pipeline[_pipelines[pip_idx]->id ()] = _pipelines[pip_idx].get ();
386
379
}
387
380
auto pipeline_id_to_profile = _runtime_state->build_pipeline_profile (_pipelines.size ());
@@ -416,14 +409,10 @@ Status PipelineFragmentContext::_build_pipeline_tasks(ThreadPool* thread_pool) {
416
409
for (size_t pip_idx = 0 ; pip_idx < _pipelines.size (); pip_idx++) {
417
410
auto & pipeline = _pipelines[pip_idx];
418
411
if (pipeline->num_tasks () > 1 || i == 0 ) {
419
- DCHECK (_task_runtime_states[pip_idx][i] == nullptr )
420
- << print_id (_task_runtime_states[pip_idx][i]->fragment_instance_id ()) << " "
421
- << pipeline->debug_string ();
422
- _task_runtime_states[pip_idx][i] = RuntimeState::create_unique (
412
+ auto task_runtime_state = RuntimeState::create_unique (
423
413
local_params.fragment_instance_id , _params.query_id , _params.fragment_id ,
424
414
_params.query_options , _query_ctx->query_globals , _exec_env,
425
415
_query_ctx.get ());
426
- auto & task_runtime_state = _task_runtime_states[pip_idx][i];
427
416
{
428
417
// Initialize runtime state for this task
429
418
task_runtime_state->set_query_mem_tracker (_query_ctx->query_mem_tracker ());
@@ -470,7 +459,9 @@ Status PipelineFragmentContext::_build_pipeline_tasks(ThreadPool* thread_pool) {
470
459
pipeline_id_to_profile[pip_idx].get (), get_shared_state (pipeline), i);
471
460
pipeline->incr_created_tasks (i, task.get ());
472
461
pipeline_id_to_task.insert ({pipeline->id (), task.get ()});
473
- _tasks[i].emplace_back (std::move (task));
462
+ _tasks[i].emplace_back (
463
+ std::pair<std::shared_ptr<PipelineTask>, std::unique_ptr<RuntimeState>> {
464
+ std::move (task), std::move (task_runtime_state)});
474
465
}
475
466
}
476
467
@@ -1703,7 +1694,7 @@ Status PipelineFragmentContext::submit() {
1703
1694
auto * scheduler = _query_ctx->get_pipe_exec_scheduler ();
1704
1695
for (auto & task : _tasks) {
1705
1696
for (auto & t : task) {
1706
- st = scheduler->submit (t);
1697
+ st = scheduler->submit (t. first );
1707
1698
DBUG_EXECUTE_IF (" PipelineFragmentContext.submit.failed" ,
1708
1699
{ st = Status::Aborted (" PipelineFragmentContext.submit.failed" ); });
1709
1700
if (!st) {
@@ -1810,12 +1801,9 @@ std::string PipelineFragmentContext::get_load_error_url() {
1810
1801
if (const auto & str = _runtime_state->get_error_log_file_path (); !str.empty ()) {
1811
1802
return to_load_error_http_path (str);
1812
1803
}
1813
- for (auto & task_states : _task_runtime_states) {
1814
- for (auto & task_state : task_states) {
1815
- if (!task_state) {
1816
- continue ;
1817
- }
1818
- if (const auto & str = task_state->get_error_log_file_path (); !str.empty ()) {
1804
+ for (auto & tasks : _tasks) {
1805
+ for (auto & task : tasks) {
1806
+ if (const auto & str = task.second ->get_error_log_file_path (); !str.empty ()) {
1819
1807
return to_load_error_http_path (str);
1820
1808
}
1821
1809
}
@@ -1827,12 +1815,9 @@ std::string PipelineFragmentContext::get_first_error_msg() {
1827
1815
if (const auto & str = _runtime_state->get_first_error_msg (); !str.empty ()) {
1828
1816
return str;
1829
1817
}
1830
- for (auto & task_states : _task_runtime_states) {
1831
- for (auto & task_state : task_states) {
1832
- if (!task_state) {
1833
- continue ;
1834
- }
1835
- if (const auto & str = task_state->get_first_error_msg (); !str.empty ()) {
1818
+ for (auto & tasks : _tasks) {
1819
+ for (auto & task : tasks) {
1820
+ if (const auto & str = task.second ->get_first_error_msg (); !str.empty ()) {
1836
1821
return str;
1837
1822
}
1838
1823
}
@@ -1862,11 +1847,9 @@ Status PipelineFragmentContext::send_report(bool done) {
1862
1847
1863
1848
std::vector<RuntimeState*> runtime_states;
1864
1849
1865
- for (auto & task_states : _task_runtime_states) {
1866
- for (auto & task_state : task_states) {
1867
- if (task_state) {
1868
- runtime_states.push_back (task_state.get ());
1869
- }
1850
+ for (auto & tasks : _tasks) {
1851
+ for (auto & task : tasks) {
1852
+ runtime_states.push_back (task.second .get ());
1870
1853
}
1871
1854
}
1872
1855
@@ -1900,15 +1883,15 @@ size_t PipelineFragmentContext::get_revocable_size(bool* has_running_task) const
1900
1883
// here to traverse the vector.
1901
1884
for (const auto & task_instances : _tasks) {
1902
1885
for (const auto & task : task_instances) {
1903
- if (task->is_running ()) {
1886
+ if (task. first ->is_running ()) {
1904
1887
LOG_EVERY_N (INFO, 50 ) << " Query: " << print_id (_query_id)
1905
- << " is running, task: " << (void *)task.get ()
1906
- << " , is_running: " << task->is_running ();
1888
+ << " is running, task: " << (void *)task.first . get ()
1889
+ << " , is_running: " << task. first ->is_running ();
1907
1890
*has_running_task = true ;
1908
1891
return 0 ;
1909
1892
}
1910
1893
1911
- size_t revocable_size = task->get_revocable_size ();
1894
+ size_t revocable_size = task. first ->get_revocable_size ();
1912
1895
if (revocable_size >= vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) {
1913
1896
res += revocable_size;
1914
1897
}
@@ -1921,9 +1904,9 @@ std::vector<PipelineTask*> PipelineFragmentContext::get_revocable_tasks() const
1921
1904
std::vector<PipelineTask*> revocable_tasks;
1922
1905
for (const auto & task_instances : _tasks) {
1923
1906
for (const auto & task : task_instances) {
1924
- size_t revocable_size_ = task->get_revocable_size ();
1907
+ size_t revocable_size_ = task. first ->get_revocable_size ();
1925
1908
if (revocable_size_ >= vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) {
1926
- revocable_tasks.emplace_back (task.get ());
1909
+ revocable_tasks.emplace_back (task.first . get ());
1927
1910
}
1928
1911
}
1929
1912
}
@@ -1936,9 +1919,8 @@ std::string PipelineFragmentContext::debug_string() {
1936
1919
for (size_t j = 0 ; j < _tasks.size (); j++) {
1937
1920
fmt::format_to (debug_string_buffer, " Tasks in instance {}:\n " , j);
1938
1921
for (size_t i = 0 ; i < _tasks[j].size (); i++) {
1939
- fmt::format_to (debug_string_buffer, " Task {}: {}\n {}\n " , i,
1940
- _tasks[j][i]->debug_string (),
1941
- _task_runtime_states[i][j]->local_runtime_filter_mgr ()->debug_string ());
1922
+ fmt::format_to (debug_string_buffer, " Task {}: {}\n " , i,
1923
+ _tasks[j][i].first ->debug_string ());
1942
1924
}
1943
1925
}
1944
1926
@@ -1988,16 +1970,16 @@ PipelineFragmentContext::collect_realtime_load_channel_profile() const {
1988
1970
return nullptr ;
1989
1971
}
1990
1972
1991
- for (const auto & runtime_states : _task_runtime_states ) {
1992
- for (const auto & runtime_state : runtime_states ) {
1993
- if (runtime_state == nullptr || runtime_state ->runtime_profile () == nullptr ) {
1973
+ for (const auto & tasks : _tasks ) {
1974
+ for (const auto & task : tasks ) {
1975
+ if (task. second ->runtime_profile () == nullptr ) {
1994
1976
continue ;
1995
1977
}
1996
1978
1997
1979
auto tmp_load_channel_profile = std::make_shared<TRuntimeProfileTree>();
1998
1980
1999
- runtime_state ->runtime_profile ()->to_thrift (tmp_load_channel_profile.get (),
2000
- _runtime_state->profile_level ());
1981
+ task. second ->runtime_profile ()->to_thrift (tmp_load_channel_profile.get (),
1982
+ _runtime_state->profile_level ());
2001
1983
_runtime_state->load_channel_profile ()->update (*tmp_load_channel_profile);
2002
1984
}
2003
1985
}
0 commit comments