diff --git a/be/src/pipeline/pipeline_x/dependency.cpp b/be/src/pipeline/pipeline_x/dependency.cpp index 073bd27d3d..2fcf0906c1 100644 --- a/be/src/pipeline/pipeline_x/dependency.cpp +++ b/be/src/pipeline/pipeline_x/dependency.cpp @@ -24,6 +24,7 @@ #include "pipeline/pipeline_fragment_context.h" #include "pipeline/pipeline_task.h" #include "pipeline/pipeline_x/pipeline_x_task.h" +#include "runtime/exec_env.h" #include "runtime/memory/mem_tracker.h" namespace doris::pipeline { @@ -153,57 +154,6 @@ void RuntimeFilterTimer::call_has_release() { // so there is no need to take any action. } -struct RuntimeFilterTimerQueue { - constexpr static int64_t interval = 50; - void start() { - while (true) { - std::unique_lock lk(cv_m); - - cv.wait(lk, [this] { return !_que.empty(); }); - { - std::unique_lock lc(_que_lock); - std::list> new_que; - for (auto& it : _que) { - if (it.use_count() == 1) { - it->call_has_release(); - } else if (it->has_ready()) { - it->call_has_ready(); - } else { - int64_t ms_since_registration = MonotonicMillis() - it->registration_time(); - if (ms_since_registration > it->wait_time_ms()) { - it->call_timeout(); - } else { - new_que.push_back(std::move(it)); - } - } - } - new_que.swap(_que); - } - std::this_thread::sleep_for(std::chrono::milliseconds(interval)); - } - } - ~RuntimeFilterTimerQueue() { _thread.detach(); } - RuntimeFilterTimerQueue() { _thread = std::thread(&RuntimeFilterTimerQueue::start, this); } - static void push_filter_timer(std::shared_ptr filter) { - static RuntimeFilterTimerQueue timer_que; - - timer_que.push(filter); - } - - void push(std::shared_ptr filter) { - std::unique_lock lc(_que_lock); - _que.push_back(filter); - cv.notify_all(); - } - - std::thread _thread; - std::condition_variable cv; - std::mutex cv_m; - std::mutex _que_lock; - - std::list> _que; -}; - void RuntimeFilterDependency::add_filters(IRuntimeFilter* runtime_filter) { _filters++; int64_t registration_time = runtime_filter->registration_time(); @@ -212,7 +162,7 @@ void RuntimeFilterDependency::add_filters(IRuntimeFilter* runtime_filter) { registration_time, wait_time_ms, std::dynamic_pointer_cast(shared_from_this()), runtime_filter); runtime_filter->set_filter_timer(filter_timer); - RuntimeFilterTimerQueue::push_filter_timer(filter_timer); + ExecEnv::GetInstance()->runtime_filter_timer_queue()->push_filter_timer(filter_timer); } void RuntimeFilterDependency::sub_filters() { diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h index 32aa1ff4cf..8a55efcdb7 100644 --- a/be/src/pipeline/pipeline_x/dependency.h +++ b/be/src/pipeline/pipeline_x/dependency.h @@ -190,6 +190,68 @@ private: IRuntimeFilter* _runtime_filter = nullptr; }; +struct RuntimeFilterTimerQueue { + constexpr static int64_t interval = 10; + void run() { _thread.detach(); } + void start() { + while (!_stop) { + std::unique_lock lk(cv_m); + + cv.wait(lk, [this] { return !_que.empty() || _stop; }); + if (_stop) { + break; + } + { + std::unique_lock lc(_que_lock); + std::list> new_que; + for (auto& it : _que) { + if (it.use_count() == 1) { + it->call_has_release(); + } else if (it->has_ready()) { + it->call_has_ready(); + } else { + int64_t ms_since_registration = MonotonicMillis() - it->registration_time(); + if (ms_since_registration > it->wait_time_ms()) { + it->call_timeout(); + } else { + new_que.push_back(std::move(it)); + } + } + } + new_que.swap(_que); + } + std::this_thread::sleep_for(std::chrono::milliseconds(interval)); + } + _shutdown = true; + } + + void stop() { _stop = true; } + + void wait_for_shutdown() const { + while (!_shutdown) { + std::this_thread::sleep_for(std::chrono::milliseconds(interval)); + } + } + + ~RuntimeFilterTimerQueue() { wait_for_shutdown(); } + RuntimeFilterTimerQueue() { _thread = std::thread(&RuntimeFilterTimerQueue::start, this); } + void push_filter_timer(std::shared_ptr filter) { push(filter); } + + void push(std::shared_ptr filter) { + std::unique_lock lc(_que_lock); + _que.push_back(filter); + cv.notify_all(); + } + + std::thread _thread; + std::condition_variable cv; + std::mutex cv_m; + std::mutex _que_lock; + std::atomic_bool _stop = false; + std::atomic_bool _shutdown = false; + std::list> _que; +}; + class RuntimeFilterDependency final : public Dependency { public: RuntimeFilterDependency(int id, int node_id, std::string name, QueryContext* query_ctx) diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index 2bcf9b1b91..c405953d6c 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -46,6 +46,7 @@ class DeltaWriterV2Pool; namespace pipeline { class TaskScheduler; class BlockedTaskScheduler; +struct RuntimeFilterTimerQueue; } // namespace pipeline namespace taskgroup { class TaskGroupManager; @@ -271,6 +272,10 @@ public: return _global_block_scheduler; } + doris::pipeline::RuntimeFilterTimerQueue* runtime_filter_timer_queue() { + return _runtime_filter_timer_queue; + } + private: ExecEnv(); @@ -382,6 +387,8 @@ private: std::shared_ptr _without_group_block_scheduler; // used for query with workload group cpu soft limit std::shared_ptr _with_group_block_scheduler; + + doris::pipeline::RuntimeFilterTimerQueue* _runtime_filter_timer_queue = nullptr; }; template <> diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index 1deead9170..9703dffb9b 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -297,6 +297,8 @@ Status ExecEnv::init_pipeline_task_scheduler() { _global_block_scheduler = std::make_shared(); RETURN_IF_ERROR(_global_block_scheduler->start("GlobalBlockSche")); + _runtime_filter_timer_queue = new doris::pipeline::RuntimeFilterTimerQueue(); + _runtime_filter_timer_queue->run(); return Status::OK(); } @@ -550,6 +552,7 @@ void ExecEnv::destroy() { SAFE_STOP(_external_scan_context_mgr); SAFE_STOP(_fragment_mgr); + SAFE_STOP(_runtime_filter_timer_queue); // NewLoadStreamMgr should be destoried before storage_engine & after fragment_mgr stopped. _new_load_stream_mgr.reset(); _stream_load_executor.reset(); @@ -617,6 +620,7 @@ void ExecEnv::destroy() { SAFE_DELETE(_with_group_task_scheduler); SAFE_DELETE(_without_group_task_scheduler); SAFE_DELETE(_file_cache_factory); + SAFE_DELETE(_runtime_filter_timer_queue); // TODO(zhiqiang): Maybe we should call shutdown before release thread pool? _join_node_thread_pool.reset(nullptr); _send_report_thread_pool.reset(nullptr);