[fix](pipelineX) make RuntimeFilterTimerQueue graceful exit (#27653)

make RuntimeFilterTimerQueue graceful exit
This commit is contained in:
Mryange
2023-11-29 18:53:13 +08:00
committed by GitHub
parent 498d27c905
commit f1e9e6dba8
4 changed files with 75 additions and 52 deletions

View File

@ -24,6 +24,7 @@
#include "pipeline/pipeline_fragment_context.h"
#include "pipeline/pipeline_task.h"
#include "pipeline/pipeline_x/pipeline_x_task.h"
#include "runtime/exec_env.h"
#include "runtime/memory/mem_tracker.h"
namespace doris::pipeline {
@ -153,57 +154,6 @@ void RuntimeFilterTimer::call_has_release() {
// so there is no need to take any action.
}
struct RuntimeFilterTimerQueue {
constexpr static int64_t interval = 50;
void start() {
while (true) {
std::unique_lock<std::mutex> lk(cv_m);
cv.wait(lk, [this] { return !_que.empty(); });
{
std::unique_lock<std::mutex> lc(_que_lock);
std::list<std::shared_ptr<RuntimeFilterTimer>> new_que;
for (auto& it : _que) {
if (it.use_count() == 1) {
it->call_has_release();
} else if (it->has_ready()) {
it->call_has_ready();
} else {
int64_t ms_since_registration = MonotonicMillis() - it->registration_time();
if (ms_since_registration > it->wait_time_ms()) {
it->call_timeout();
} else {
new_que.push_back(std::move(it));
}
}
}
new_que.swap(_que);
}
std::this_thread::sleep_for(std::chrono::milliseconds(interval));
}
}
~RuntimeFilterTimerQueue() { _thread.detach(); }
RuntimeFilterTimerQueue() { _thread = std::thread(&RuntimeFilterTimerQueue::start, this); }
static void push_filter_timer(std::shared_ptr<RuntimeFilterTimer> filter) {
static RuntimeFilterTimerQueue timer_que;
timer_que.push(filter);
}
void push(std::shared_ptr<RuntimeFilterTimer> filter) {
std::unique_lock<std::mutex> lc(_que_lock);
_que.push_back(filter);
cv.notify_all();
}
std::thread _thread;
std::condition_variable cv;
std::mutex cv_m;
std::mutex _que_lock;
std::list<std::shared_ptr<RuntimeFilterTimer>> _que;
};
void RuntimeFilterDependency::add_filters(IRuntimeFilter* runtime_filter) {
_filters++;
int64_t registration_time = runtime_filter->registration_time();
@ -212,7 +162,7 @@ void RuntimeFilterDependency::add_filters(IRuntimeFilter* runtime_filter) {
registration_time, wait_time_ms,
std::dynamic_pointer_cast<RuntimeFilterDependency>(shared_from_this()), runtime_filter);
runtime_filter->set_filter_timer(filter_timer);
RuntimeFilterTimerQueue::push_filter_timer(filter_timer);
ExecEnv::GetInstance()->runtime_filter_timer_queue()->push_filter_timer(filter_timer);
}
void RuntimeFilterDependency::sub_filters() {

View File

@ -190,6 +190,68 @@ private:
IRuntimeFilter* _runtime_filter = nullptr;
};
struct RuntimeFilterTimerQueue {
constexpr static int64_t interval = 10;
void run() { _thread.detach(); }
void start() {
while (!_stop) {
std::unique_lock<std::mutex> lk(cv_m);
cv.wait(lk, [this] { return !_que.empty() || _stop; });
if (_stop) {
break;
}
{
std::unique_lock<std::mutex> lc(_que_lock);
std::list<std::shared_ptr<pipeline::RuntimeFilterTimer>> new_que;
for (auto& it : _que) {
if (it.use_count() == 1) {
it->call_has_release();
} else if (it->has_ready()) {
it->call_has_ready();
} else {
int64_t ms_since_registration = MonotonicMillis() - it->registration_time();
if (ms_since_registration > it->wait_time_ms()) {
it->call_timeout();
} else {
new_que.push_back(std::move(it));
}
}
}
new_que.swap(_que);
}
std::this_thread::sleep_for(std::chrono::milliseconds(interval));
}
_shutdown = true;
}
void stop() { _stop = true; }
void wait_for_shutdown() const {
while (!_shutdown) {
std::this_thread::sleep_for(std::chrono::milliseconds(interval));
}
}
~RuntimeFilterTimerQueue() { wait_for_shutdown(); }
RuntimeFilterTimerQueue() { _thread = std::thread(&RuntimeFilterTimerQueue::start, this); }
void push_filter_timer(std::shared_ptr<pipeline::RuntimeFilterTimer> filter) { push(filter); }
void push(std::shared_ptr<pipeline::RuntimeFilterTimer> filter) {
std::unique_lock<std::mutex> lc(_que_lock);
_que.push_back(filter);
cv.notify_all();
}
std::thread _thread;
std::condition_variable cv;
std::mutex cv_m;
std::mutex _que_lock;
std::atomic_bool _stop = false;
std::atomic_bool _shutdown = false;
std::list<std::shared_ptr<pipeline::RuntimeFilterTimer>> _que;
};
class RuntimeFilterDependency final : public Dependency {
public:
RuntimeFilterDependency(int id, int node_id, std::string name, QueryContext* query_ctx)

View File

@ -46,6 +46,7 @@ class DeltaWriterV2Pool;
namespace pipeline {
class TaskScheduler;
class BlockedTaskScheduler;
struct RuntimeFilterTimerQueue;
} // namespace pipeline
namespace taskgroup {
class TaskGroupManager;
@ -271,6 +272,10 @@ public:
return _global_block_scheduler;
}
doris::pipeline::RuntimeFilterTimerQueue* runtime_filter_timer_queue() {
return _runtime_filter_timer_queue;
}
private:
ExecEnv();
@ -382,6 +387,8 @@ private:
std::shared_ptr<doris::pipeline::BlockedTaskScheduler> _without_group_block_scheduler;
// used for query with workload group cpu soft limit
std::shared_ptr<doris::pipeline::BlockedTaskScheduler> _with_group_block_scheduler;
doris::pipeline::RuntimeFilterTimerQueue* _runtime_filter_timer_queue = nullptr;
};
template <>

View File

@ -297,6 +297,8 @@ Status ExecEnv::init_pipeline_task_scheduler() {
_global_block_scheduler = std::make_shared<pipeline::BlockedTaskScheduler>();
RETURN_IF_ERROR(_global_block_scheduler->start("GlobalBlockSche"));
_runtime_filter_timer_queue = new doris::pipeline::RuntimeFilterTimerQueue();
_runtime_filter_timer_queue->run();
return Status::OK();
}
@ -550,6 +552,7 @@ void ExecEnv::destroy() {
SAFE_STOP(_external_scan_context_mgr);
SAFE_STOP(_fragment_mgr);
SAFE_STOP(_runtime_filter_timer_queue);
// NewLoadStreamMgr should be destoried before storage_engine & after fragment_mgr stopped.
_new_load_stream_mgr.reset();
_stream_load_executor.reset();
@ -617,6 +620,7 @@ void ExecEnv::destroy() {
SAFE_DELETE(_with_group_task_scheduler);
SAFE_DELETE(_without_group_task_scheduler);
SAFE_DELETE(_file_cache_factory);
SAFE_DELETE(_runtime_filter_timer_queue);
// TODO(zhiqiang): Maybe we should call shutdown before release thread pool?
_join_node_thread_pool.reset(nullptr);
_send_report_thread_pool.reset(nullptr);