diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index f37dd4db7b..df51da373a 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -103,6 +103,7 @@ PipelineFragmentContext::PipelineFragmentContext(
           _report_thread_active(false),
           _report_status_cb(report_status_cb),
           _is_report_on_cancel(true) {
+    _report_thread_future = _report_thread_promise.get_future();
     _fragment_watcher.start();
 }
 
@@ -276,7 +277,10 @@ Status PipelineFragmentContext::prepare(const doris::TExecPlanFragmentParams& re
 
     if (_is_report_success && config::status_report_interval > 0) {
         std::unique_lock<std::mutex> l(_report_thread_lock);
-        _report_thread = std::thread(&PipelineFragmentContext::report_profile, this);
+        _exec_env->send_report_thread_pool()->submit_func([this] {
+            Defer defer {[&]() { this->_report_thread_promise.set_value(true); }};
+            this->report_profile();
+        });
         // make sure the thread started up, otherwise report_profile() might get into a race
         // with stop_report_thread()
         _report_thread_started_cv.wait(l);
@@ -475,7 +479,9 @@ void PipelineFragmentContext::_stop_report_thread() {
 
     _report_thread_active = false;
     _stop_report_thread_cv.notify_one();
-    _report_thread.join();
+    // Wait indefinitely so the report task is guaranteed to have finished
+    // and can no longer touch `this`.
+    _report_thread_future.wait();
 }
 
 void PipelineFragmentContext::report_profile() {
diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h
index 38b07d503e..0a756ed37f 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -17,6 +17,8 @@
 
 #pragma once
 
+#include <future>
+
 #include "io/fs/stream_load_pipe.h"
 #include "pipeline/pipeline.h"
 #include "pipeline/pipeline_task.h"
@@ -179,7 +181,8 @@ private:
     bool _report_thread_active;
     // profile reporting-related
     report_status_callback _report_status_cb;
-    std::thread _report_thread;
+    std::promise<bool> _report_thread_promise;
+    std::future<bool> _report_thread_future;
     std::mutex _report_thread_lock;
 
     // Indicates that profile reporting thread should stop.
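Reviewer note: the patch replaces an owned `std::thread` (joined in `_stop_report_thread()`) with a pool task, and a pool task cannot be joined, so completion is signalled through a promise/future pair instead. Below is a minimal standalone sketch of that handshake. `Reporter`, the local `Guard` (playing the role of Doris' `Defer`), and the detached-thread `submit` stand-in for `ThreadPool::submit_func` are all illustrative, not Doris APIs:

```cpp
#include <atomic>
#include <chrono>
#include <functional>
#include <future>
#include <iostream>
#include <thread>

class Reporter {
public:
    Reporter() : _done_future(_done_promise.get_future()) {}

    ~Reporter() {
        // Equivalent of _stop_report_thread(): block until the task signals
        // completion, so it can never touch a destroyed `this`. Assumes
        // start() was called, otherwise the future never becomes ready.
        _stop = true;
        _done_future.wait();
    }

    // Stand-in for send_report_thread_pool()->submit_func(...).
    void start(std::function<void(std::function<void()>)> submit) {
        submit([this] {
            // Defer-style guard: the promise is set even if report_loop throws.
            struct Guard {
                explicit Guard(std::promise<bool>* p) : _p(p) {}
                ~Guard() { _p->set_value(true); }
                std::promise<bool>* _p;
            } guard(&_done_promise);
            report_loop();
        });
    }

private:
    void report_loop() {
        while (!_stop) {
            std::this_thread::sleep_for(std::chrono::milliseconds(5));
        }
        std::cout << "report task exiting cleanly\n";
    }

    std::atomic<bool> _stop {false};
    std::promise<bool> _done_promise;
    std::future<bool> _done_future;
};

int main() {
    Reporter reporter;
    // A detached thread emulates a pool worker that outlives the submit call.
    reporter.start([](std::function<void()> fn) { std::thread(std::move(fn)).detach(); });
    std::this_thread::sleep_for(std::chrono::milliseconds(20));
    // ~Reporter waits on the future before the members are destroyed.
}
```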
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index 4d2163cf39..ae402453be 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -124,6 +124,9 @@ public:
     MemTrackerLimiter* experimental_mem_tracker() { return _experimental_mem_tracker.get(); }
     ThreadPool* send_batch_thread_pool() { return _send_batch_thread_pool.get(); }
     ThreadPool* download_cache_thread_pool() { return _download_cache_thread_pool.get(); }
+    ThreadPool* send_report_thread_pool() { return _send_report_thread_pool.get(); }
+    ThreadPool* join_node_thread_pool() { return _join_node_thread_pool.get(); }
+
     void set_serial_download_cache_thread_token() {
         _serial_download_cache_thread_token =
                 download_cache_thread_pool()->new_token(ThreadPool::ExecutionMode::SERIAL, 1);
@@ -215,6 +218,10 @@ private:
     std::unique_ptr<ThreadPool> _download_cache_thread_pool;
     // A token used to submit download cache task serially
    std::unique_ptr<ThreadPoolToken> _serial_download_cache_thread_token;
+    // Pool used by fragment manager to send profile or status to FE coordinator
+    std::unique_ptr<ThreadPool> _send_report_thread_pool;
+    // Pool used by join node to build hash table
+    std::unique_ptr<ThreadPool> _join_node_thread_pool;
     // ThreadPoolToken -> buffer
     std::unordered_map<ThreadPoolToken*, std::unique_ptr<char[]>> _download_cache_buf_map;
     CgroupsMgr* _cgroups_mgr = nullptr;
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index a2770ce6f5..64c0c7cb20 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -102,6 +102,21 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths) {
 
     init_download_cache_required_components();
 
+    // The min thread num matches the fragment pool's min num. The max thread num is
+    // effectively unbounded, matching the old behavior of starting a thread per request.
+    // The queue size is moot because the max thread num is very large.
+    ThreadPoolBuilder("SendReportThreadPool")
+            .set_min_threads(config::fragment_pool_thread_num_min)
+            .set_max_threads(std::numeric_limits<int>::max())
+            .set_max_queue_size(config::fragment_pool_queue_size)
+            .build(&_send_report_thread_pool);
+
+    ThreadPoolBuilder("JoinNodeThreadPool")
+            .set_min_threads(config::fragment_pool_thread_num_min)
+            .set_max_threads(std::numeric_limits<int>::max())
+            .set_max_queue_size(config::fragment_pool_queue_size)
+            .build(&_join_node_thread_pool);
+
     RETURN_IF_ERROR(init_pipeline_task_scheduler());
 
     _scanner_scheduler = new doris::vectorized::ScannerScheduler();
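Reviewer note: the builder comments above describe three knobs: a floor of `config::fragment_pool_thread_num_min` workers, an effectively unbounded ceiling, and a queue bound that rarely binds. The toy bounded-queue pool below shows what those knobs mean operationally; it is not Doris' `ThreadPool` (the real pool also grows lazily between min and max threads):

```cpp
#include <condition_variable>
#include <functional>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

class ToyPool {
public:
    ToyPool(int min_threads, size_t max_queue) : _max_queue(max_queue) {
        // A real elastic pool would start min_threads and grow toward max_threads.
        for (int i = 0; i < min_threads; ++i) {
            _workers.emplace_back([this] { work_loop(); });
        }
    }
    ~ToyPool() {
        {
            std::lock_guard<std::mutex> l(_mu);
            _shutdown = true;
        }
        _cv.notify_all();
        for (auto& t : _workers) t.join();
    }
    // Mirrors submit_func; `false` plays the role of a "queue full" Status.
    bool submit_func(std::function<void()> fn) {
        std::lock_guard<std::mutex> l(_mu);
        if (_shutdown || _queue.size() >= _max_queue) return false;
        _queue.push(std::move(fn));
        _cv.notify_one();
        return true;
    }

private:
    void work_loop() {
        while (true) {
            std::function<void()> fn;
            {
                std::unique_lock<std::mutex> l(_mu);
                _cv.wait(l, [this] { return _shutdown || !_queue.empty(); });
                if (_queue.empty()) return; // shutdown requested and queue drained
                fn = std::move(_queue.front());
                _queue.pop();
            }
            fn();
        }
    }
    std::mutex _mu;
    std::condition_variable _cv;
    std::queue<std::function<void()>> _queue;
    std::vector<std::thread> _workers;
    size_t _max_queue;
    bool _shutdown = false;
};

int main() {
    ToyPool pool(/*min_threads=*/2, /*max_queue=*/64);
    for (int i = 0; i < 4; ++i) {
        pool.submit_func([i] { std::cout << "task " << i << " ran\n"; });
    }
} // ~ToyPool drains queued tasks, then joins the workers
```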
diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp
index f62bd4bae1..94c200a5f2 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -64,7 +64,9 @@ PlanFragmentExecutor::PlanFragmentExecutor(ExecEnv* exec_env,
           _is_report_success(false),
           _is_report_on_cancel(true),
           _collect_query_statistics_with_every_batch(false),
-          _cancel_reason(PPlanFragmentCancelReason::INTERNAL_ERROR) {}
+          _cancel_reason(PPlanFragmentCancelReason::INTERNAL_ERROR) {
+    _report_thread_future = _report_thread_promise.get_future();
+}
 
 PlanFragmentExecutor::~PlanFragmentExecutor() {
     close();
@@ -228,7 +230,10 @@ Status PlanFragmentExecutor::open() {
     // at end, otherwise the coordinator hangs in case we finish w/ an error
     if (_is_report_success && config::status_report_interval > 0) {
         std::unique_lock<std::mutex> l(_report_thread_lock);
-        _report_thread = std::thread(&PlanFragmentExecutor::report_profile, this);
+        _exec_env->send_report_thread_pool()->submit_func([this] {
+            Defer defer {[&]() { this->_report_thread_promise.set_value(true); }};
+            this->report_profile();
+        });
         // make sure the thread started up, otherwise report_profile() might get into a race
         // with stop_report_thread()
         _report_thread_started_cv.wait(l);
@@ -456,7 +461,10 @@ void PlanFragmentExecutor::stop_report_thread() {
 
     _report_thread_active = false;
     _stop_report_thread_cv.notify_one();
-    _report_thread.join();
+    // Wait indefinitely until the report task finishes and the future is set.
+    // The report task uses this PlanFragmentExecutor object; without this wait
+    // it could run after the object is destroyed and crash.
+    _report_thread_future.wait();
 }
 
 void PlanFragmentExecutor::cancel(const PPlanFragmentCancelReason& reason, const std::string& msg) {
diff --git a/be/src/runtime/plan_fragment_executor.h b/be/src/runtime/plan_fragment_executor.h
index f4bcdcaa13..352edfccbe 100644
--- a/be/src/runtime/plan_fragment_executor.h
+++ b/be/src/runtime/plan_fragment_executor.h
@@ -22,6 +22,7 @@
 
 #include <condition_variable>
 #include <functional>
+#include <future>
 #include <memory>
 
 #include "common/object_pool.h"
@@ -135,7 +136,8 @@ private:
 
     // profile reporting-related
     report_status_callback _report_status_cb;
-    std::thread _report_thread;
+    std::promise<bool> _report_thread_promise;
+    std::future<bool> _report_thread_future;
     std::mutex _report_thread_lock;
 
     // Indicates that profile reporting thread should stop.
diff --git a/be/src/vec/exec/join/vjoin_node_base.cpp b/be/src/vec/exec/join/vjoin_node_base.cpp
index e186f15d8c..7daf1fe237 100644
--- a/be/src/vec/exec/join/vjoin_node_base.cpp
+++ b/be/src/vec/exec/join/vjoin_node_base.cpp
@@ -180,11 +180,12 @@ Status VJoinNodeBase::open(RuntimeState* state) {
 
     std::promise<Status> thread_status;
     try {
-        std::thread([this, state, thread_status_p = &thread_status,
-                     parent_span = opentelemetry::trace::Tracer::GetCurrentSpan()] {
-            OpentelemetryScope scope {parent_span};
-            this->_probe_side_open_thread(state, thread_status_p);
-        }).detach();
+        state->exec_env()->join_node_thread_pool()->submit_func(
+                [this, state, thread_status_p = &thread_status,
+                 parent_span = opentelemetry::trace::Tracer::GetCurrentSpan()] {
+                    OpentelemetryScope scope {parent_span};
+                    this->_probe_side_open_thread(state, thread_status_p);
+                });
    } catch (const std::system_error& e) {
        LOG(WARNING) << "In VJoinNodeBase::open create thread fail, " << e.what();
        return Status::InternalError(e.what());
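Reviewer note: the vjoin_node_base.cpp hunk keeps the existing `std::promise<Status>` handoff; only the execution vehicle changes from a detached `std::thread` to a pool task. The submitted task publishes its status through the promise, and `open()` later blocks on the corresponding future. A self-contained sketch of that handoff, with a toy `Status` standing in for `doris::Status` and `std::async` standing in for the pool, looks like this:

```cpp
#include <future>
#include <iostream>
#include <string>

// Toy stand-in for doris::Status, just enough to carry ok/error.
struct Status {
    bool ok = true;
    std::string msg;
    static Status OK() { return {}; }
    static Status InternalError(std::string m) { return {false, std::move(m)}; }
};

// Plays the role of _probe_side_open_thread: do the work, then publish
// the result through the promise the caller handed over.
void probe_side_open(std::promise<Status>* thread_status) {
    // ... open the probe side ...
    thread_status->set_value(Status::OK());
}

int main() {
    std::promise<Status> thread_status;
    // A pool's submit_func would run this lambda on a worker; async stands in.
    auto worker = std::async(std::launch::async,
                             [p = &thread_status] { probe_side_open(p); });
    // open() continues with the build side, then collects the probe status.
    Status s = thread_status.get_future().get();
    std::cout << (s.ok ? "probe open ok" : s.msg) << '\n';
}
```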