From 46d40b195258d8abc34d56dc177d209d9873b07f Mon Sep 17 00:00:00 2001 From: wangbo Date: Fri, 27 Oct 2023 14:24:41 +0800 Subject: [PATCH] [refactor](executor)Remove empty group logic #26005 --- be/src/common/config.cpp | 1 - be/src/common/config.h | 1 - be/src/pipeline/pipeline_task.cpp | 23 ++--------- be/src/pipeline/pipeline_task.h | 20 --------- be/src/pipeline/task_queue.cpp | 46 +-------------------- be/src/pipeline/task_queue.h | 9 ----- be/src/pipeline/task_scheduler.cpp | 4 -- be/src/runtime/task_group/task_group.cpp | 17 +------- be/src/runtime/task_group/task_group.h | 10 ----- be/src/vec/exec/scan/scan_task_queue.cpp | 47 +--------------------- be/src/vec/exec/scan/scan_task_queue.h | 9 ----- be/src/vec/exec/scan/scanner_scheduler.cpp | 20 +++------ be/src/vec/exec/scan/scanner_scheduler.h | 4 -- 13 files changed, 13 insertions(+), 198 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 5812176872..5b6e393fee 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1110,7 +1110,6 @@ DEFINE_Bool(enable_flush_file_cache_async, "true"); // cgroup DEFINE_String(doris_cgroup_cpu_path, ""); -DEFINE_Bool(enable_cpu_hard_limit, "false"); DEFINE_Bool(ignore_always_true_predicate_for_segment, "true"); diff --git a/be/src/common/config.h b/be/src/common/config.h index f8350d8731..71c0a1e12c 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1177,7 +1177,6 @@ DECLARE_mBool(exit_on_exception); // cgroup DECLARE_String(doris_cgroup_cpu_path); -DECLARE_Bool(enable_cpu_hard_limit); // This config controls whether the s3 file writer would flush cache asynchronously DECLARE_Bool(enable_flush_file_cache_async); diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index 7e907b318d..111259062c 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -215,16 +215,6 @@ void PipelineTask::set_task_queue(TaskQueue* task_queue) { _task_queue = task_queue; } -void PipelineTask::yield() { - int64_t time_spent = 0; - Defer defer {[&]() { - time_spent = time_spent * _core_num / _total_query_thread_num; - _task_queue->update_statistics(this, time_spent); - }}; - SCOPED_RAW_TIMER(&time_spent); - usleep(THREAD_TIME_SLICE_US); -} - Status PipelineTask::execute(bool* eos) { SCOPED_TIMER(_task_profile->total_time_counter()); SCOPED_CPU_TIMER(_task_cpu_timer); @@ -232,12 +222,8 @@ Status PipelineTask::execute(bool* eos) { SCOPED_ATTACH_TASK(_state); int64_t time_spent = 0; - // todo(wb) use a more lightweight timer - RuntimeProfile::Counter tmp_timer(TUnit::TIME_NS); - Defer defer {[&]() { if (_task_queue) { - time_spent = tmp_timer.value(); _task_queue->update_statistics(this, time_spent); } }}; @@ -245,7 +231,7 @@ Status PipelineTask::execute(bool* eos) { *eos = false; if (!_opened) { { - SCOPED_CPU_TIMER(&tmp_timer); + SCOPED_RAW_TIMER(&time_spent); auto st = _open(); if (st.is()) { set_state(PipelineTaskState::BLOCKED_FOR_RF); @@ -280,12 +266,12 @@ Status PipelineTask::execute(bool* eos) { set_state(PipelineTaskState::BLOCKED_FOR_SINK); break; } - if (tmp_timer.value() > THREAD_TIME_SLICE) { + if (time_spent > THREAD_TIME_SLICE) { COUNTER_UPDATE(_yield_counts, 1); break; } // TODO llj: Pipeline entity should_yield - SCOPED_CPU_TIMER(&tmp_timer); + SCOPED_RAW_TIMER(&time_spent); _block->clear_column_data(_root->row_desc().num_materialized_slots()); auto* block = _block.get(); @@ -477,9 +463,6 @@ std::string PipelineTask::debug_string() { } taskgroup::TaskGroupPipelineTaskEntity* 
PipelineTask::get_task_group_entity() const { - if (_is_empty_task) { - return _empty_group_entity; - } return _fragment_context->get_task_group_entity(); } diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h index 99e41421bb..ef923868a5 100644 --- a/be/src/pipeline/pipeline_task.h +++ b/be/src/pipeline/pipeline_task.h @@ -117,8 +117,6 @@ public: PipelineFragmentContext* fragment_context, RuntimeProfile* parent_profile); virtual ~PipelineTask() = default; - PipelineTask() = default; - virtual Status prepare(RuntimeState* state); virtual Status execute(bool* eos); @@ -198,7 +196,6 @@ public: TaskQueue* get_task_queue() { return _task_queue; } static constexpr auto THREAD_TIME_SLICE = 100'000'000ULL; - static constexpr auto THREAD_TIME_SLICE_US = 100000L; // 100ms // 1 used for update priority queue // note(wb) an ugly implementation, need refactor later @@ -252,17 +249,6 @@ public: TUniqueId instance_id() const { return _state->fragment_instance_id(); } - void set_empty_task(bool is_empty_task) { _is_empty_task = is_empty_task; } - - bool is_empty_task() const { return _is_empty_task; } - - void yield(); - - void set_task_group_entity( - taskgroup::TaskGroupEntity>* empty_group_entity) { - _empty_group_entity = empty_group_entity; - } - protected: void _finish_p_dependency() { for (const auto& p : _pipeline->_parents) { @@ -302,12 +288,6 @@ protected: bool _try_close_flag = false; - bool _is_empty_task = false; - taskgroup::TaskGroupEntity>* _empty_group_entity; - int _core_num = CpuInfo::num_cores(); - int _total_query_thread_num = - config::doris_scanner_thread_pool_thread_num + config::pipeline_executor_size; - RuntimeProfile* _parent_profile; std::unique_ptr _task_profile; RuntimeProfile::Counter* _task_cpu_timer; diff --git a/be/src/pipeline/task_queue.cpp b/be/src/pipeline/task_queue.cpp index 366e299f7b..a68a2ba4a7 100644 --- a/be/src/pipeline/task_queue.cpp +++ b/be/src/pipeline/task_queue.cpp @@ -222,17 +222,9 @@ bool TaskGroupTaskQueue::TaskGroupSchedEntityComparator::operator()( } TaskGroupTaskQueue::TaskGroupTaskQueue(size_t core_size) - : TaskQueue(core_size), _min_tg_entity(nullptr) { - _empty_pip_task->set_empty_task(true); - _empty_pip_task->set_task_queue(this); - _empty_pip_task->set_task_group_entity(_empty_group_entity); - _empty_group_entity->set_empty_group_entity(true); -} + : TaskQueue(core_size), _min_tg_entity(nullptr) {} -TaskGroupTaskQueue::~TaskGroupTaskQueue() { - delete _empty_group_entity; - delete _empty_pip_task; -} +TaskGroupTaskQueue::~TaskGroupTaskQueue() = default; void TaskGroupTaskQueue::close() { std::unique_lock lock(_rs_mutex); @@ -256,9 +248,6 @@ Status TaskGroupTaskQueue::_push_back(PipelineTask* task) { entity->task_queue()->emplace(task); if (_group_entities.find(entity) == _group_entities.end()) { _enqueue_task_group(entity); - if (_enable_cpu_hard_limit) { - reset_empty_group_entity(); - } } _wait_task.notify_one(); return Status::OK(); @@ -281,15 +270,9 @@ PipelineTask* TaskGroupTaskQueue::take(size_t core_id) { } } } - if (entity->is_empty_group_entity()) { - return _empty_pip_task; - } DCHECK(entity->task_size() > 0); if (entity->task_size() == 1) { _dequeue_task_group(entity); - if (_enable_cpu_hard_limit) { - reset_empty_group_entity(); - } } auto task = entity->task_queue()->front(); if (task) { @@ -391,30 +374,5 @@ void TaskGroupTaskQueue::update_tg_cpu_share(const taskgroup::TaskGroupInfo& tas } } -void TaskGroupTaskQueue::reset_empty_group_entity() { - int user_g_cpu_hard_limit = 0; - bool 
contains_empty_group = false; - for (auto* entity : _group_entities) { - if (!entity->is_empty_group_entity()) { - user_g_cpu_hard_limit += entity->cpu_share(); - } else { - contains_empty_group = true; - } - } - - // 0 <= user_g_cpu_hard_limit <= 100, bound by FE - // user_g_cpu_hard_limit = 0 means no group exists - int empty_group_cpu_share = 100 - user_g_cpu_hard_limit; - if (empty_group_cpu_share > 0 && empty_group_cpu_share < 100 && !contains_empty_group) { - _empty_group_entity->update_empty_cpu_share(empty_group_cpu_share); - _enqueue_task_group(_empty_group_entity); - } else if ((empty_group_cpu_share == 0 || empty_group_cpu_share == 100) && - contains_empty_group) { - // no need to update empty group here - // only update empty group's cpu share when exec enqueue - _dequeue_task_group(_empty_group_entity); - } -} - } // namespace pipeline } // namespace doris \ No newline at end of file diff --git a/be/src/pipeline/task_queue.h b/be/src/pipeline/task_queue.h index e6ed54417f..d693cbe216 100644 --- a/be/src/pipeline/task_queue.h +++ b/be/src/pipeline/task_queue.h @@ -187,8 +187,6 @@ public: void update_tg_cpu_share(const taskgroup::TaskGroupInfo& task_group_info, taskgroup::TGPTEntityPtr entity) override; - void reset_empty_group_entity(); - private: template Status _push_back(PipelineTask* task); @@ -211,13 +209,6 @@ private: int _total_cpu_share = 0; std::atomic _min_tg_entity = nullptr; uint64_t _min_tg_v_runtime_ns = 0; - - // empty group - taskgroup::TaskGroupEntity>* _empty_group_entity = - new taskgroup::TaskGroupEntity>(); - PipelineTask* _empty_pip_task = new PipelineTask(); - // todo(wb) support auto-switch cpu mode between soft limit and hard limit - bool _enable_cpu_hard_limit = config::enable_cpu_hard_limit; }; } // namespace pipeline diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp index cdd934d5c7..d197878255 100644 --- a/be/src/pipeline/task_scheduler.cpp +++ b/be/src/pipeline/task_scheduler.cpp @@ -222,10 +222,6 @@ void TaskScheduler::_do_work(size_t index) { if (!task) { continue; } - if (task->is_empty_task()) { - task->yield(); - continue; - } task->set_task_queue(_task_queue.get()); auto* fragment_ctx = task->fragment_context(); signal::query_id_hi = fragment_ctx->get_query_id().hi; diff --git a/be/src/runtime/task_group/task_group.cpp b/be/src/runtime/task_group/task_group.cpp index 7abc08d3c2..37e4b9ae59 100644 --- a/be/src/runtime/task_group/task_group.cpp +++ b/be/src/runtime/task_group/task_group.cpp @@ -84,7 +84,7 @@ uint64_t TaskGroupEntity::cpu_share() const { template uint64_t TaskGroupEntity::task_group_id() const { - return _is_empty_group_entity ? 
-1 : _tg->id(); + return _tg->id(); } template @@ -101,21 +101,6 @@ std::string TaskGroupEntity::debug_string() const { _tg->id(), _tg->name(), _type, cpu_share(), task_size(), _vruntime_ns); } -template -void TaskGroupEntity::set_empty_group_entity(bool is_empty_group_entity) { - _is_empty_group_entity = is_empty_group_entity; -} - -template -bool TaskGroupEntity::is_empty_group_entity() { - return _is_empty_group_entity; -} - -template -void TaskGroupEntity::update_empty_cpu_share(uint64_t empty_cpu_share) { - _cpu_share = empty_cpu_share; -} - template class TaskGroupEntity>; template class TaskGroupEntity; diff --git a/be/src/runtime/task_group/task_group.h b/be/src/runtime/task_group/task_group.h index 1dc634469e..a948bf53ec 100644 --- a/be/src/runtime/task_group/task_group.h +++ b/be/src/runtime/task_group/task_group.h @@ -50,8 +50,6 @@ public: explicit TaskGroupEntity(taskgroup::TaskGroup* tg, std::string type); ~TaskGroupEntity(); - TaskGroupEntity() = default; // used for empty group entity - uint64_t vruntime_ns() const { return _vruntime_ns; } QueueType* task_queue(); @@ -70,12 +68,6 @@ public: void check_and_update_cpu_share(const TaskGroupInfo& tg_info); - void set_empty_group_entity(bool is_empty_group_entity); - - bool is_empty_group_entity(); - - void update_empty_cpu_share(uint64_t empty_cpu_share); - private: QueueType* _task_queue; @@ -89,8 +81,6 @@ private: // independent updates. int64_t _version; uint64_t _cpu_share; - - bool _is_empty_group_entity = false; }; // TODO llj tg use PriorityTaskQueue to replace std::queue diff --git a/be/src/vec/exec/scan/scan_task_queue.cpp b/be/src/vec/exec/scan/scan_task_queue.cpp index 90171bc87c..89235b6b7a 100644 --- a/be/src/vec/exec/scan/scan_task_queue.cpp +++ b/be/src/vec/exec/scan/scan_task_queue.cpp @@ -53,16 +53,8 @@ bool ScanTaskQueue::try_get(ScanTask* scan_task, uint32_t timeout_ms) { return r; } -ScanTaskTaskGroupQueue::ScanTaskTaskGroupQueue(size_t core_size) : _core_size(core_size) { - _empty_scan_task->scan_entity = _empty_group_entity; - _empty_scan_task->is_empty_task = true; - _empty_group_entity->set_empty_group_entity(true); -} - -ScanTaskTaskGroupQueue::~ScanTaskTaskGroupQueue() { - delete _empty_group_entity; - delete _empty_scan_task; -} +ScanTaskTaskGroupQueue::ScanTaskTaskGroupQueue(size_t core_size) : _core_size(core_size) {} +ScanTaskTaskGroupQueue::~ScanTaskTaskGroupQueue() = default; void ScanTaskTaskGroupQueue::close() { std::unique_lock lock(_rs_mutex); @@ -86,16 +78,9 @@ bool ScanTaskTaskGroupQueue::take(ScanTask* scan_task) { } } } - if (entity->is_empty_group_entity()) { - *scan_task = *_empty_scan_task; - return true; - } DCHECK(entity->task_size() > 0); if (entity->task_size() == 1) { _dequeue_task_group(entity); - if (_enable_cpu_hard_limit) { - reset_empty_group_entity(); - } } return entity->task_queue()->try_get(scan_task, WAIT_CORE_TASK_TIMEOUT_MS /* timeout_ms */); } @@ -110,9 +95,6 @@ bool ScanTaskTaskGroupQueue::push_back(ScanTask scan_task) { } if (_group_entities.find(entity) == _group_entities.end()) { _enqueue_task_group(entity); - if (_enable_cpu_hard_limit) { - reset_empty_group_entity(); - } } _wait_task.notify_one(); return true; @@ -150,31 +132,6 @@ void ScanTaskTaskGroupQueue::update_tg_cpu_share(const taskgroup::TaskGroupInfo& } } -void ScanTaskTaskGroupQueue::reset_empty_group_entity() { - int user_g_cpu_hard_limit = 0; - bool contains_empty_group = false; - for (auto* entity : _group_entities) { - if (!entity->is_empty_group_entity()) { - user_g_cpu_hard_limit += 
entity->cpu_share(); - } else { - contains_empty_group = true; - } - } - - // 0 <= user_g_cpu_hard_limit <= 100, bound by FE - // user_g_cpu_hard_limit = 0 means no group exists - int empty_group_cpu_share = 100 - user_g_cpu_hard_limit; - if (empty_group_cpu_share > 0 && empty_group_cpu_share < 100 && !contains_empty_group) { - _empty_group_entity->update_empty_cpu_share(empty_group_cpu_share); - _enqueue_task_group(_empty_group_entity); - } else if ((empty_group_cpu_share == 0 || empty_group_cpu_share == 100) && - contains_empty_group) { - // no need to update empty group here - // only update empty group's cpu share when exec enqueue - _dequeue_task_group(_empty_group_entity); - } -} - void ScanTaskTaskGroupQueue::_enqueue_task_group(TGSTEntityPtr tg_entity) { _total_cpu_share += tg_entity->cpu_share(); // TODO llj tg If submitted back to this queue from the scanner thread, `adjust_vruntime_ns` diff --git a/be/src/vec/exec/scan/scan_task_queue.h b/be/src/vec/exec/scan/scan_task_queue.h index 4afd685c79..c694859e3c 100644 --- a/be/src/vec/exec/scan/scan_task_queue.h +++ b/be/src/vec/exec/scan/scan_task_queue.h @@ -29,7 +29,6 @@ namespace taskgroup { using WorkFunction = std::function; static constexpr auto WAIT_CORE_TASK_TIMEOUT_MS = 100; -static constexpr auto SCAN_THREAD_TIME_SLICE_US = 100000L; // 100ms // Like PriorityThreadPool::Task struct ScanTask { @@ -46,7 +45,6 @@ struct ScanTask { vectorized::ScannerContext* scanner_context; TGSTEntityPtr scan_entity; int priority; - bool is_empty_task = false; }; // Like pipeline::PriorityTaskQueue use BlockingPriorityQueue directly? @@ -75,8 +73,6 @@ public: void update_tg_cpu_share(const taskgroup::TaskGroupInfo&, taskgroup::TGSTEntityPtr); - void reset_empty_group_entity(); - private: TGSTEntityPtr _task_entity(ScanTask& scan_task); void _enqueue_task_group(TGSTEntityPtr); @@ -98,11 +94,6 @@ private: std::atomic _min_tg_entity = nullptr; uint64_t _min_tg_v_runtime_ns = 0; size_t _core_size; - - TaskGroupEntity* _empty_group_entity = new TaskGroupEntity(); - taskgroup::ScanTask* _empty_scan_task = new taskgroup::ScanTask(); - // todo(wb) support auto-switch cpu mode between soft limit and hard limit - bool _enable_cpu_hard_limit = config::enable_cpu_hard_limit; }; } // namespace taskgroup diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp index 7c0064a1f4..8ebb6405bd 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.cpp +++ b/be/src/vec/exec/scan/scanner_scheduler.cpp @@ -379,8 +379,8 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, ScannerContext bool should_stop = false; // Has to wait at least one full block, or it will cause a lot of schedule task in priority // queue, it will affect query latency and query concurrency for example ssb 3.3. - while (!eos && raw_bytes_read < raw_bytes_threshold && - (raw_rows_read < raw_rows_threshold || num_rows_in_block < state->batch_size())) { + while (!eos && raw_bytes_read < raw_bytes_threshold && raw_rows_read < raw_rows_threshold && + num_rows_in_block < state->batch_size()) { // TODO llj task group should should_yield? if (UNLIKELY(ctx->done())) { // No need to set status on error here. 
@@ -455,19 +455,9 @@ void ScannerScheduler::_task_group_scanner_scan(ScannerScheduler* scheduler, auto success = scan_queue->take(&scan_task); if (success) { int64_t time_spent = 0; - if (!scan_task.is_empty_task) { - RuntimeProfile::Counter tmp_timer(TUnit::TIME_NS); - { - SCOPED_CPU_TIMER(&tmp_timer); - scan_task.scan_func(); - } - time_spent = tmp_timer.value(); - } else { - { - SCOPED_RAW_TIMER(&time_spent); - usleep(taskgroup::SCAN_THREAD_TIME_SLICE_US); - } - time_spent = time_spent * _core_num / _total_query_thread_num; + { + SCOPED_RAW_TIMER(&time_spent); + scan_task.scan_func(); } scan_queue->update_statistics(scan_task, time_spent); } diff --git a/be/src/vec/exec/scan/scanner_scheduler.h b/be/src/vec/exec/scan/scanner_scheduler.h index 32048458ed..25f79e89aa 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.h +++ b/be/src/vec/exec/scan/scanner_scheduler.h @@ -115,10 +115,6 @@ private: // true is the scheduler is closed. std::atomic_bool _is_closed = {false}; bool _is_init = false; - - int _core_num = CpuInfo::num_cores(); - int _total_query_thread_num = - config::doris_scanner_thread_pool_thread_num + config::pipeline_executor_size; }; struct SimplifiedScanTask {
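
Note (not part of the patch): the net effect on task scheduling is easiest to see outside the diff. After the empty-group removal, both PipelineTask::execute() and the task-group scanner thread account execution time with a plain SCOPED_RAW_TIMER accumulator, compare it against THREAD_TIME_SLICE to decide when to yield, and report the raw (unscaled) time to the task-group queue via update_statistics(). The sketch below mirrors that pattern in a self-contained form; ScopedRawTimer, run_one_step and report_runtime are placeholder names for illustration only, not Doris APIs.

// Illustrative sketch of the post-patch time-slice accounting; placeholder
// names, not part of the Doris codebase.
#include <chrono>
#include <cstdint>
#include <cstdio>

// Minimal stand-in for SCOPED_RAW_TIMER: accumulates elapsed wall time (ns)
// into the caller-provided counter when the scope ends.
class ScopedRawTimer {
public:
    explicit ScopedRawTimer(int64_t* out)
            : _out(out), _start(std::chrono::steady_clock::now()) {}
    ~ScopedRawTimer() {
        *_out += std::chrono::duration_cast<std::chrono::nanoseconds>(
                         std::chrono::steady_clock::now() - _start)
                         .count();
    }

private:
    int64_t* _out;
    std::chrono::steady_clock::time_point _start;
};

constexpr int64_t kThreadTimeSliceNs = 100'000'000; // 100ms, matches THREAD_TIME_SLICE

// Fake unit of work; returns true when the task reaches eos.
bool run_one_step(int step) { return step >= 8; }

// Stands in for _task_queue->update_statistics(this, time_spent).
void report_runtime(int64_t ns) { std::printf("spent %lld ns\n", (long long)ns); }

int main() {
    int64_t time_spent = 0;
    bool eos = false;
    for (int step = 0; !eos; ++step) {
        if (time_spent > kThreadTimeSliceNs) {
            break; // yield the time slice; the real task would be re-queued
        }
        ScopedRawTimer timer(&time_spent); // accumulate wall time for this step
        eos = run_one_step(step);
    }
    // With the empty-group path removed, the raw time is reported unscaled
    // (no more "* core_num / total_query_thread_num" adjustment).
    report_runtime(time_spent);
    return 0;
}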