[enhancement](execute model) use thread pool to execute report or join tasks instead of starting too many threads (#17212)
* [enhancement](execute model) use thread pool to execute report or join tasks instead of starting too many threads

Doris starts a report thread and a join thread during fragment execution. Creating and destroying threads this frequently causes many problems: jemalloc in particular may not behave well and can even crash (jemalloc/jemalloc#1405). It is better to use a thread pool for these tasks.

---------

Co-authored-by: yiguolei <yiguolei@gmail.com>
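The core pattern of the change: instead of constructing a std::thread per fragment and joining it on shutdown, the task is submitted to a shared pool, and a std::promise/std::future pair guards the lifetime of the owning object. A minimal self-contained sketch of that pattern (the submit_func name mirrors the pool API used in the diff below; everything else is illustrative, not Doris code):

    #include <future>
    #include <iostream>
    #include <thread>
    #include <utility>

    // Stand-in for ThreadPool::submit_func: here it just detaches a thread, but
    // the point is that the owning object no longer creates or joins threads.
    template <typename F>
    void submit_func(F&& f) {
        std::thread(std::forward<F>(f)).detach();
    }

    struct FragmentContext {
        std::promise<bool> _report_promise;
        std::future<bool> _report_future = _report_promise.get_future();

        void start_report_task() {
            submit_func([this] {
                report_profile();                 // may touch `this`
                _report_promise.set_value(true);  // signal completion last
            });
        }

        void stop_report_task() {
            // Replaces _report_thread.join(): block until the task has
            // signalled, so `this` is unused by the task when destroyed.
            _report_future.wait();
        }

        void report_profile() { std::cout << "reporting...\n"; }
    };

    int main() {
        FragmentContext ctx;
        ctx.start_report_task();
        ctx.stop_report_task();
        return 0;
    }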
@@ -103,6 +103,7 @@ PipelineFragmentContext::PipelineFragmentContext(
           _report_thread_active(false),
           _report_status_cb(report_status_cb),
           _is_report_on_cancel(true) {
+    _report_thread_future = _report_thread_promise.get_future();
     _fragment_watcher.start();
 }
 
@@ -276,7 +277,10 @@ Status PipelineFragmentContext::prepare(const doris::TExecPlanFragmentParams& re
 
     if (_is_report_success && config::status_report_interval > 0) {
         std::unique_lock<std::mutex> l(_report_thread_lock);
-        _report_thread = std::thread(&PipelineFragmentContext::report_profile, this);
+        _exec_env->send_report_thread_pool()->submit_func([this] {
+            Defer defer {[&]() { this->_report_thread_promise.set_value(true); }};
+            this->report_profile();
+        });
         // make sure the thread started up, otherwise report_profile() might get into a race
         // with stop_report_thread()
         _report_thread_started_cv.wait(l);
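The Defer in the submitted lambda is what makes the future reliable: the promise is fulfilled on every exit path from the task, not only on clean completion. A minimal stand-in, assuming Doris's Defer is a plain scope-exit helper (its actual definition lives in the Doris source tree):

    #include <functional>
    #include <future>

    // Assumed shape of Defer: run a callback when the enclosing scope ends.
    struct Defer {
        std::function<void()> fn;
        ~Defer() { fn(); }
    };

    int main() {
        std::promise<bool> done;
        std::future<bool> fut = done.get_future();
        {
            Defer defer {[&]() { done.set_value(true); }};
            // ... report work; an early return or exception here would still
            // fulfil the promise via ~Defer().
        }
        fut.wait(); // returns immediately: the promise was set at scope exit
        return 0;
    }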
@@ -475,7 +479,9 @@ void PipelineFragmentContext::_stop_report_thread() {
     _report_thread_active = false;
 
     _stop_report_thread_cv.notify_one();
-    _report_thread.join();
+    // Wait infinitely to ensure that the report task is finished and that `this`
+    // is no longer used by the report task.
+    _report_thread_future.wait();
 }
 
 void PipelineFragmentContext::report_profile() {
@@ -17,6 +17,8 @@
 
 #pragma once
 
+#include <future>
+
 #include "io/fs/stream_load_pipe.h"
 #include "pipeline/pipeline.h"
 #include "pipeline/pipeline_task.h"
@@ -179,7 +181,8 @@ private:
     bool _report_thread_active;
     // profile reporting-related
     report_status_callback _report_status_cb;
-    std::thread _report_thread;
+    std::promise<bool> _report_thread_promise;
+    std::future<bool> _report_thread_future;
     std::mutex _report_thread_lock;
 
     // Indicates that profile reporting thread should stop.
@@ -124,6 +124,9 @@ public:
     MemTrackerLimiter* experimental_mem_tracker() { return _experimental_mem_tracker.get(); }
     ThreadPool* send_batch_thread_pool() { return _send_batch_thread_pool.get(); }
     ThreadPool* download_cache_thread_pool() { return _download_cache_thread_pool.get(); }
+    ThreadPool* send_report_thread_pool() { return _send_report_thread_pool.get(); }
+    ThreadPool* join_node_thread_pool() { return _join_node_thread_pool.get(); }
+
     void set_serial_download_cache_thread_token() {
         _serial_download_cache_thread_token =
                 download_cache_thread_pool()->new_token(ThreadPool::ExecutionMode::SERIAL, 1);
@@ -215,6 +218,10 @@ private:
     std::unique_ptr<ThreadPool> _download_cache_thread_pool;
     // A token used to submit download cache task serially
     std::unique_ptr<ThreadPoolToken> _serial_download_cache_thread_token;
+    // Pool used by fragment manager to send profile or status to FE coordinator
+    std::unique_ptr<ThreadPool> _send_report_thread_pool;
+    // Pool used by join node to build hash table
+    std::unique_ptr<ThreadPool> _join_node_thread_pool;
     // ThreadPoolToken -> buffer
     std::unordered_map<ThreadPoolToken*, std::unique_ptr<char[]>> _download_cache_buf_map;
     CgroupsMgr* _cgroups_mgr = nullptr;
@@ -102,6 +102,21 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths) {
 
     init_download_cache_required_components();
 
+    // The min thread num matches the fragment pool's min num. The max thread num
+    // is effectively unbounded because, as before, a thread is started for every
+    // request; the queue size does not matter since the max thread num is so large.
+    ThreadPoolBuilder("SendReportThreadPool")
+            .set_min_threads(config::fragment_pool_thread_num_min)
+            .set_max_threads(std::numeric_limits<int>::max())
+            .set_max_queue_size(config::fragment_pool_queue_size)
+            .build(&_send_report_thread_pool);
+
+    ThreadPoolBuilder("JoinNodeThreadPool")
+            .set_min_threads(config::fragment_pool_thread_num_min)
+            .set_max_threads(std::numeric_limits<int>::max())
+            .set_max_queue_size(config::fragment_pool_queue_size)
+            .build(&_join_node_thread_pool);
+
     RETURN_IF_ERROR(init_pipeline_task_scheduler());
     _scanner_scheduler = new doris::vectorized::ScannerScheduler();
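ThreadPoolBuilder above is Doris's builder for its general-purpose pool. The sketch below is a deliberately minimal fixed-size pool, not the real implementation, shown only to make the submit_func contract concrete; the real pool also grows between min and max threads and bounds its queue:

    #include <condition_variable>
    #include <functional>
    #include <iostream>
    #include <mutex>
    #include <queue>
    #include <thread>
    #include <vector>

    class SimpleThreadPool {
    public:
        explicit SimpleThreadPool(int num_threads) {
            for (int i = 0; i < num_threads; ++i) {
                _workers.emplace_back([this] { _worker_loop(); });
            }
        }
        ~SimpleThreadPool() {
            {
                std::lock_guard<std::mutex> l(_mu);
                _stop = true;
            }
            _cv.notify_all();
            for (auto& t : _workers) t.join();
        }
        // Same calling convention as the submit_func seen in the hunks above.
        void submit_func(std::function<void()> task) {
            {
                std::lock_guard<std::mutex> l(_mu);
                _queue.push(std::move(task));
            }
            _cv.notify_one();
        }

    private:
        void _worker_loop() {
            for (;;) {
                std::function<void()> task;
                {
                    std::unique_lock<std::mutex> l(_mu);
                    _cv.wait(l, [this] { return _stop || !_queue.empty(); });
                    if (_stop && _queue.empty()) return;
                    task = std::move(_queue.front());
                    _queue.pop();
                }
                task(); // run outside the lock
            }
        }
        std::mutex _mu;
        std::condition_variable _cv;
        std::queue<std::function<void()>> _queue;
        bool _stop = false;
        std::vector<std::thread> _workers;
    };

    int main() {
        SimpleThreadPool pool(2);
        pool.submit_func([] { std::cout << "report task\n"; });
        // ~SimpleThreadPool drains queued tasks, then joins the workers.
        return 0;
    }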
@@ -64,7 +64,9 @@ PlanFragmentExecutor::PlanFragmentExecutor(ExecEnv* exec_env,
           _is_report_success(false),
           _is_report_on_cancel(true),
           _collect_query_statistics_with_every_batch(false),
-          _cancel_reason(PPlanFragmentCancelReason::INTERNAL_ERROR) {}
+          _cancel_reason(PPlanFragmentCancelReason::INTERNAL_ERROR) {
+    _report_thread_future = _report_thread_promise.get_future();
+}
 
 PlanFragmentExecutor::~PlanFragmentExecutor() {
     close();
@@ -228,7 +230,10 @@ Status PlanFragmentExecutor::open() {
     // at end, otherwise the coordinator hangs in case we finish w/ an error
     if (_is_report_success && config::status_report_interval > 0) {
         std::unique_lock<std::mutex> l(_report_thread_lock);
-        _report_thread = std::thread(&PlanFragmentExecutor::report_profile, this);
+        _exec_env->send_report_thread_pool()->submit_func([this] {
+            Defer defer {[&]() { this->_report_thread_promise.set_value(true); }};
+            this->report_profile();
+        });
         // make sure the thread started up, otherwise report_profile() might get into a race
         // with stop_report_thread()
         _report_thread_started_cv.wait(l);
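Both prepare() and open() keep the started-up handshake from the old thread code: the submitter holds _report_thread_lock, submits the task, then blocks on _report_thread_started_cv until the task confirms it is running. A self-contained sketch of that handshake (a bare std::thread stands in for the pooled task, and the predicate form of wait is used here for robustness):

    #include <condition_variable>
    #include <mutex>
    #include <thread>

    int main() {
        std::mutex mu;
        std::condition_variable started_cv;
        bool started = false;

        std::unique_lock<std::mutex> l(mu);
        std::thread task([&] {
            {
                std::lock_guard<std::mutex> lg(mu);
                started = true;
            }
            started_cv.notify_one();
            // ... the periodic reporting loop would run here ...
        });
        // The submitter proceeds only once the task has really started, so a
        // fast shutdown cannot race with a task that never began.
        started_cv.wait(l, [&] { return started; });
        l.unlock();
        task.join();
        return 0;
    }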
@@ -456,7 +461,10 @@ void PlanFragmentExecutor::stop_report_thread() {
     _report_thread_active = false;
 
     _stop_report_thread_cv.notify_one();
-    _report_thread.join();
+    // Wait infinitely until the report task is finished and the future is set.
+    // The report task depends on this PlanFragmentExecutor object; without waiting
+    // here, the task could crash because the PlanFragmentExecutor was destroyed.
+    _report_thread_future.wait();
 }
 
 void PlanFragmentExecutor::cancel(const PPlanFragmentCancelReason& reason, const std::string& msg) {
@@ -22,6 +22,7 @@
 
 #include <condition_variable>
 #include <functional>
+#include <future>
 #include <vector>
 
 #include "common/object_pool.h"
@@ -135,7 +136,8 @@ private:
 
     // profile reporting-related
     report_status_callback _report_status_cb;
-    std::thread _report_thread;
+    std::promise<bool> _report_thread_promise;
+    std::future<bool> _report_thread_future;
     std::mutex _report_thread_lock;
 
     // Indicates that profile reporting thread should stop.
@@ -180,11 +180,12 @@ Status VJoinNodeBase::open(RuntimeState* state) {
 
     std::promise<Status> thread_status;
     try {
-        std::thread([this, state, thread_status_p = &thread_status,
-                     parent_span = opentelemetry::trace::Tracer::GetCurrentSpan()] {
-            OpentelemetryScope scope {parent_span};
-            this->_probe_side_open_thread(state, thread_status_p);
-        }).detach();
+        state->exec_env()->join_node_thread_pool()->submit_func(
+                [this, state, thread_status_p = &thread_status,
+                 parent_span = opentelemetry::trace::Tracer::GetCurrentSpan()] {
+                    OpentelemetryScope scope {parent_span};
+                    this->_probe_side_open_thread(state, thread_status_p);
+                });
     } catch (const std::system_error& e) {
         LOG(WARNING) << "In VJoinNodeBase::open create thread fail, " << e.what();
         return Status::InternalError(e.what());
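The join-node change keeps the existing handshake: the caller owns a std::promise<Status> on its stack, the pooled task fulfils it through a raw pointer, and the caller must block on the future before the promise goes out of scope. A minimal sketch, with a plain string standing in for doris::Status and a detached thread standing in for the pool:

    #include <future>
    #include <string>
    #include <thread>

    using Status = std::string; // stand-in for doris::Status

    void probe_side_open(std::promise<Status>* thread_status_p) {
        // ... open the probe side ...
        thread_status_p->set_value("OK");
    }

    int main() {
        std::promise<Status> thread_status;
        // In the patch this lambda goes to join_node_thread_pool()->submit_func();
        // a detached thread plays that role here.
        std::thread([p = &thread_status] { probe_side_open(p); }).detach();
        // Blocking on the future keeps `thread_status` alive until the task is done.
        Status st = thread_status.get_future().get();
        return st == "OK" ? 0 : 1;
    }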