From e22a9ecc3b116546e4aafeeaeebfa46e15adfb4e Mon Sep 17 00:00:00 2001
From: yiguolei <676222867@qq.com>
Date: Wed, 1 Mar 2023 08:35:27 +0800
Subject: [PATCH] [enhancement](execute model) use a thread pool to execute
 report or join tasks instead of starting too many threads (#17212)

* [enhancement](execute model) use a thread pool to execute report or join tasks instead of starting too many threads

Doris starts a report thread and a join thread during fragment execution.
Creating and destroying threads very frequently causes many problems:
jemalloc may not behave well and may even crash (jemalloc/jemalloc#1405).
It is better to use a thread pool for these tasks.

---------

Co-authored-by: yiguolei
---
 be/src/pipeline/pipeline_fragment_context.cpp | 10 ++++++++--
 be/src/pipeline/pipeline_fragment_context.h   |  5 ++++-
 be/src/runtime/exec_env.h                     |  7 +++++++
 be/src/runtime/exec_env_init.cpp              | 15 +++++++++++++++
 be/src/runtime/plan_fragment_executor.cpp     | 14 +++++++++++---
 be/src/runtime/plan_fragment_executor.h       |  4 +++-
 be/src/vec/exec/join/vjoin_node_base.cpp      | 11 ++++++-----
 7 files changed, 54 insertions(+), 12 deletions(-)

diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index f37dd4db7b..df51da373a 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -103,6 +103,7 @@ PipelineFragmentContext::PipelineFragmentContext(
           _report_thread_active(false),
           _report_status_cb(report_status_cb),
           _is_report_on_cancel(true) {
+    _report_thread_future = _report_thread_promise.get_future();
     _fragment_watcher.start();
 }
 
@@ -276,7 +277,10 @@ Status PipelineFragmentContext::prepare(const doris::TExecPlanFragmentParams& re
 
     if (_is_report_success && config::status_report_interval > 0) {
         std::unique_lock<std::mutex> l(_report_thread_lock);
-        _report_thread = std::thread(&PipelineFragmentContext::report_profile, this);
+        _exec_env->send_report_thread_pool()->submit_func([this] {
+            Defer defer {[&]() { this->_report_thread_promise.set_value(true); }};
+            this->report_profile();
+        });
         // make sure the thread started up, otherwise report_profile() might get into a race
         // with stop_report_thread()
         _report_thread_started_cv.wait(l);
@@ -475,7 +479,9 @@ void PipelineFragmentContext::_stop_report_thread() {
 
     _report_thread_active = false;
     _stop_report_thread_cv.notify_one();
-    _report_thread.join();
+    // Wait indefinitely to ensure that the report task has finished and that
+    // this object is no longer used by the report task.
+    _report_thread_future.wait();
 }
 
 void PipelineFragmentContext::report_profile() {
diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h
index 38b07d503e..0a756ed37f 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -17,6 +17,8 @@
 
 #pragma once
 
+#include <future>
+
 #include "io/fs/stream_load_pipe.h"
 #include "pipeline/pipeline.h"
 #include "pipeline/pipeline_task.h"
@@ -179,7 +181,8 @@ private:
     bool _report_thread_active;
     // profile reporting-related
     report_status_callback _report_status_cb;
-    std::thread _report_thread;
+    std::promise<bool> _report_thread_promise;
+    std::future<bool> _report_thread_future;
     std::mutex _report_thread_lock;
 
     // Indicates that profile reporting thread should stop.
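The handshake above is the core of the patch: instead of owning a std::thread and calling join(), the fragment context submits the report task to a shared pool and blocks on a future that the task fulfills when it is done touching the object. Below is a minimal self-contained sketch of that pattern; it assumes nothing from Doris, so submit_func() is a stand-in for ThreadPool::submit_func() and Reporter stands in for the fragment context.

#include <chrono>
#include <condition_variable>
#include <functional>
#include <future>
#include <iostream>
#include <mutex>
#include <thread>

// Stand-in for a shared pool; a real pool would reuse threads instead.
void submit_func(std::function<void()> task) {
    std::thread(std::move(task)).detach();
}

class Reporter {
public:
    Reporter() { _done_future = _done_promise.get_future(); }

    void start() {
        submit_func([this] {
            report_profile();
            // Fulfill the promise last; after this point the task must not
            // touch `this`, so the owner may safely destroy the Reporter.
            _done_promise.set_value(true);
        });
    }

    void stop() {
        {
            std::lock_guard<std::mutex> l(_lock);
            _active = false;
        }
        _cv.notify_one();
        // Replacement for std::thread::join(): block until the pooled task
        // has signalled that it is done with this object.
        _done_future.wait();
    }

private:
    void report_profile() {
        std::unique_lock<std::mutex> l(_lock);
        while (_active) {
            std::cout << "report tick\n";
            // Wakes early when stop() is called, otherwise reports periodically.
            _cv.wait_for(l, std::chrono::milliseconds(100));
        }
    }

    std::mutex _lock;
    std::condition_variable _cv;
    bool _active = true;
    std::promise<bool> _done_promise;
    std::future<bool> _done_future;
};

int main() {
    Reporter r;
    r.start();
    r.stop(); // returns only after the report task has finished
}

Note that the patch sets the promise inside a Defer, so the future is fulfilled even if report_profile() returns early; the sketch fulfills it unconditionally after the call for the same effect.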
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index 4d2163cf39..ae402453be 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -124,6 +124,9 @@ public:
     MemTrackerLimiter* experimental_mem_tracker() { return _experimental_mem_tracker.get(); }
     ThreadPool* send_batch_thread_pool() { return _send_batch_thread_pool.get(); }
     ThreadPool* download_cache_thread_pool() { return _download_cache_thread_pool.get(); }
+    ThreadPool* send_report_thread_pool() { return _send_report_thread_pool.get(); }
+    ThreadPool* join_node_thread_pool() { return _join_node_thread_pool.get(); }
+
     void set_serial_download_cache_thread_token() {
         _serial_download_cache_thread_token =
                 download_cache_thread_pool()->new_token(ThreadPool::ExecutionMode::SERIAL, 1);
@@ -215,6 +218,10 @@ private:
     std::unique_ptr<ThreadPool> _download_cache_thread_pool;
     // A token used to submit download cache task serially
     std::unique_ptr<ThreadPoolToken> _serial_download_cache_thread_token;
+    // Pool used by fragment manager to send profile or status to FE coordinator
+    std::unique_ptr<ThreadPool> _send_report_thread_pool;
+    // Pool used by join node to build hash table
+    std::unique_ptr<ThreadPool> _join_node_thread_pool;
     // ThreadPoolToken -> buffer
     std::unordered_map<ThreadPoolToken*, std::unique_ptr<char[]>> _download_cache_buf_map;
     CgroupsMgr* _cgroups_mgr = nullptr;
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index a2770ce6f5..64c0c7cb20 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -102,6 +102,21 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths) {
 
     init_download_cache_required_components();
 
+    // min thread num equals the fragment pool's min thread num
+    // max thread num is effectively unbounded: the pool may start as many threads as the old one-thread-per-task model did
+    // queue size barely matters because the max thread num is so large
+    ThreadPoolBuilder("SendReportThreadPool")
+            .set_min_threads(config::fragment_pool_thread_num_min)
+            .set_max_threads(std::numeric_limits<int>::max())
+            .set_max_queue_size(config::fragment_pool_queue_size)
+            .build(&_send_report_thread_pool);
+
+    ThreadPoolBuilder("JoinNodeThreadPool")
+            .set_min_threads(config::fragment_pool_thread_num_min)
+            .set_max_threads(std::numeric_limits<int>::max())
+            .set_max_queue_size(config::fragment_pool_queue_size)
+            .build(&_join_node_thread_pool);
+
     RETURN_IF_ERROR(init_pipeline_task_scheduler());
 
     _scanner_scheduler = new doris::vectorized::ScannerScheduler();
diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp
index f62bd4bae1..94c200a5f2 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -64,7 +64,9 @@ PlanFragmentExecutor::PlanFragmentExecutor(ExecEnv* exec_env,
           _is_report_success(false),
           _is_report_on_cancel(true),
           _collect_query_statistics_with_every_batch(false),
-          _cancel_reason(PPlanFragmentCancelReason::INTERNAL_ERROR) {}
+          _cancel_reason(PPlanFragmentCancelReason::INTERNAL_ERROR) {
+    _report_thread_future = _report_thread_promise.get_future();
+}
 
 PlanFragmentExecutor::~PlanFragmentExecutor() {
     close();
@@ -228,7 +230,10 @@ Status PlanFragmentExecutor::open() {
     // at end, otherwise the coordinator hangs in case we finish w/ an error
     if (_is_report_success && config::status_report_interval > 0) {
         std::unique_lock<std::mutex> l(_report_thread_lock);
-        _report_thread = std::thread(&PlanFragmentExecutor::report_profile, this);
+        _exec_env->send_report_thread_pool()->submit_func([this] {
+            Defer defer {[&]() { this->_report_thread_promise.set_value(true); }};
+            this->report_profile();
+        });
         // make sure the thread started up, otherwise report_profile() might get into a race
         // with stop_report_thread()
         _report_thread_started_cv.wait(l);
@@ -456,7 +461,10 @@ void PlanFragmentExecutor::stop_report_thread() {
 
     _report_thread_active = false;
     _stop_report_thread_cv.notify_one();
-    _report_thread.join();
+    // Wait indefinitely until the report task has finished and the future is set.
+    // The report task depends on this PlanFragmentExecutor object; without an
+    // indefinite wait here it could crash by accessing a destroyed executor.
+    _report_thread_future.wait();
 }
 
 void PlanFragmentExecutor::cancel(const PPlanFragmentCancelReason& reason, const std::string& msg) {
diff --git a/be/src/runtime/plan_fragment_executor.h b/be/src/runtime/plan_fragment_executor.h
index f4bcdcaa13..352edfccbe 100644
--- a/be/src/runtime/plan_fragment_executor.h
+++ b/be/src/runtime/plan_fragment_executor.h
@@ -22,6 +22,7 @@
 
 #include <condition_variable>
 #include <functional>
+#include <future>
 #include <memory>
 
 #include "common/object_pool.h"
@@ -135,7 +136,8 @@ private:
 
     // profile reporting-related
     report_status_callback _report_status_cb;
-    std::thread _report_thread;
+    std::promise<bool> _report_thread_promise;
+    std::future<bool> _report_thread_future;
     std::mutex _report_thread_lock;
 
     // Indicates that profile reporting thread should stop.
diff --git a/be/src/vec/exec/join/vjoin_node_base.cpp b/be/src/vec/exec/join/vjoin_node_base.cpp
index e186f15d8c..7daf1fe237 100644
--- a/be/src/vec/exec/join/vjoin_node_base.cpp
+++ b/be/src/vec/exec/join/vjoin_node_base.cpp
@@ -180,11 +180,12 @@ Status VJoinNodeBase::open(RuntimeState* state) {
 
     std::promise<Status> thread_status;
     try {
-        std::thread([this, state, thread_status_p = &thread_status,
-                     parent_span = opentelemetry::trace::Tracer::GetCurrentSpan()] {
-            OpentelemetryScope scope {parent_span};
-            this->_probe_side_open_thread(state, thread_status_p);
-        }).detach();
+        state->exec_env()->join_node_thread_pool()->submit_func(
+                [this, state, thread_status_p = &thread_status,
+                 parent_span = opentelemetry::trace::Tracer::GetCurrentSpan()] {
+                    OpentelemetryScope scope {parent_span};
+                    this->_probe_side_open_thread(state, thread_status_p);
+                });
     } catch (const std::system_error& e) {
        LOG(WARNING) << "In VJoinNodeBase::open create thread fail, " << e.what();
         return Status::InternalError(e.what());
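The join-node change uses a slightly different handoff: the pooled task reports its result through a pointer to a stack-owned std::promise<Status>, so the opening code must wait on the matching future before the stack frame unwinds. Below is a minimal sketch of that handoff; it assumes nothing from Doris, so Status and submit_func() are simplified stand-ins for the real types, and probe_side_open() stands in for _probe_side_open_thread().

#include <functional>
#include <future>
#include <iostream>
#include <string>
#include <thread>

struct Status {
    bool ok = true;
    std::string msg;
    static Status OK() { return {}; }
};

// Stand-in for join_node_thread_pool()->submit_func().
void submit_func(std::function<void()> task) {
    std::thread(std::move(task)).detach();
}

Status probe_side_open() {
    // ... open the probe side; may fail ...
    return Status::OK();
}

int main() {
    std::promise<Status> thread_status;
    // The pooled task writes its result through a pointer to a stack-owned
    // promise, so this frame must not unwind before the task completes.
    submit_func([p = &thread_status] { p->set_value(probe_side_open()); });

    // ... the build side would proceed here in parallel ...

    Status s = thread_status.get_future().get(); // the new "join" point
    std::cout << (s.ok ? "probe open ok" : s.msg) << std::endl;
    return s.ok ? 0 : 1;
}

Because both the report task and the probe-side open can block for a long time, the two pools are configured in exec_env_init.cpp with an effectively unlimited max thread count, matching the old one-thread-per-task behavior while still reusing threads and avoiding the frequent create/destroy cycles that triggered the jemalloc issue.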