diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 2ea50568fc..a9cb431db4 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -941,7 +941,7 @@ DEFINE_Bool(enable_java_support, "true"); DEFINE_Bool(enable_fuzzy_mode, "false"); DEFINE_Int32(pipeline_executor_size, "0"); -DEFINE_mInt16(pipeline_short_query_timeout_s, "20"); +DEFINE_Bool(enable_workload_group_for_scan, "false"); // Temp config. True to use optimization for bitmap_index apply predicate except leaf node of the and node. // Will remove after fully test. diff --git a/be/src/common/config.h b/be/src/common/config.h index 0cd4e86302..81b5308132 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -967,7 +967,7 @@ DECLARE_Bool(enable_java_support); DECLARE_Bool(enable_fuzzy_mode); DECLARE_Int32(pipeline_executor_size); -DECLARE_mInt16(pipeline_short_query_timeout_s); +DECLARE_Bool(enable_workload_group_for_scan); // Temp config. True to use optimization for bitmap_index apply predicate except leaf node of the and node. // Will remove after fully test. 
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 89231b026a..7c24be8ded 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -125,6 +125,9 @@ PipelineFragmentContext::PipelineFragmentContext( _report_thread_active(false), _report_status_cb(report_status_cb), _is_report_on_cancel(true) { + if (_query_ctx->get_task_group()) { + _task_group_entity = _query_ctx->get_task_group()->task_entity(); + } _report_thread_future = _report_thread_promise.get_future(); _fragment_watcher.start(); } @@ -678,7 +681,7 @@ Status PipelineFragmentContext::submit() { int submit_tasks = 0; Status st; auto* scheduler = _exec_env->pipeline_task_scheduler(); - if (get_task_group()) { + if (_task_group_entity) { scheduler = _exec_env->pipeline_task_group_scheduler(); } for (auto& task : _tasks) { diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h index 25c8b8ac51..262794154b 100644 --- a/be/src/pipeline/pipeline_fragment_context.h +++ b/be/src/pipeline/pipeline_fragment_context.h @@ -48,10 +48,6 @@ class RuntimeFilterMergeControllerEntity; class TDataSink; class TPipelineFragmentParams; -namespace taskgroup { -class TaskGroup; -} // namespace taskgroup - namespace pipeline { class PipelineFragmentContext : public std::enable_shared_from_this { @@ -120,7 +116,9 @@ public: return _exec_status; } - taskgroup::TaskGroup* get_task_group() const { return _query_ctx->get_task_group(); } + taskgroup::TaskGroupPipelineTaskEntity* get_task_group_entity() const { + return _task_group_entity; + } private: Status _create_sink(int sender_id, const TDataSink& t_data_sink, RuntimeState* state); @@ -175,6 +173,8 @@ private: std::shared_ptr _query_ctx; + taskgroup::TaskGroupPipelineTaskEntity* _task_group_entity = nullptr; + std::shared_ptr _merge_controller_handler; MonotonicStopWatch _fragment_watcher; diff --git 
a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index 9c8c23ec7e..a6dfd238c2 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -36,13 +36,29 @@ namespace doris { class RuntimeState; -namespace taskgroup { -class TaskGroup; -} // namespace taskgroup } // namespace doris namespace doris::pipeline { +PipelineTask::PipelineTask(PipelinePtr& pipeline, uint32_t index, RuntimeState* state, + Operators& operators, OperatorPtr& sink, + PipelineFragmentContext* fragment_context, + RuntimeProfile* parent_profile) + : _index(index), + _pipeline(pipeline), + _operators(operators), + _source(_operators.front()), + _root(_operators.back()), + _sink(sink), + _prepared(false), + _opened(false), + _can_steal(pipeline->_can_steal), + _state(state), + _cur_state(PipelineTaskState::NOT_READY), + _data_state(SourceState::DEPEND_ON_SOURCE), + _fragment_context(fragment_context), + _parent_profile(parent_profile) {} + void PipelineTask::_fresh_profile_counter() { COUNTER_SET(_wait_source_timer, (int64_t)_wait_source_watcher.elapsed_time()); COUNTER_SET(_schedule_counts, (int64_t)_schedule_time); @@ -208,6 +224,7 @@ Status PipelineTask::execute(bool* eos) { COUNTER_UPDATE(_yield_counts, 1); break; } + // TODO llj: Pipeline entity should_yield SCOPED_RAW_TIMER(&time_spent); _block->clear_column_data(_root->row_desc().num_materialized_slots()); auto* block = _block.get(); @@ -329,7 +346,8 @@ std::string PipelineTask::debug_string() { _task_profile->pretty_print(&profile_ss, ""); fmt::format_to(debug_string_buffer, "Profile: {}\n", profile_ss.str()); } - fmt::format_to(debug_string_buffer, "PipelineTask[id = {}, state = {}]\noperators: ", _index, + fmt::format_to(debug_string_buffer, + "PipelineTask[this = {}, state = {}]\noperators: ", (void*)this, get_state_name(_cur_state)); for (size_t i = 0; i < _operators.size(); i++) { fmt::format_to(debug_string_buffer, "\n{}{}", std::string(i * 2, ' '), @@ -349,8 +367,8 @@ std::string 
PipelineTask::debug_string() { return fmt::to_string(debug_string_buffer); } -taskgroup::TaskGroup* PipelineTask::get_task_group() const { - return _fragment_context->get_task_group(); +taskgroup::TaskGroupPipelineTaskEntity* PipelineTask::get_task_group_entity() const { + return _fragment_context->get_task_group_entity(); } } // namespace doris::pipeline \ No newline at end of file diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h index 65e9ad83ed..1652ead170 100644 --- a/be/src/pipeline/pipeline_task.h +++ b/be/src/pipeline/pipeline_task.h @@ -26,6 +26,7 @@ #include "common/status.h" #include "exec/operator.h" #include "pipeline.h" +#include "runtime/task_group/task_group.h" #include "util/runtime_profile.h" #include "util/stopwatch.hpp" #include "vec/core/block.h" @@ -36,9 +37,6 @@ class RuntimeState; namespace pipeline { class PipelineFragmentContext; } // namespace pipeline -namespace taskgroup { -class TaskGroup; -} // namespace taskgroup } // namespace doris namespace doris::pipeline { @@ -106,27 +104,14 @@ inline const char* get_state_name(PipelineTaskState idx) { } class TaskQueue; +class PriorityTaskQueue; // The class do the pipeline task. 
Minest schdule union by task scheduler class PipelineTask { public: PipelineTask(PipelinePtr& pipeline, uint32_t index, RuntimeState* state, Operators& operators, OperatorPtr& sink, PipelineFragmentContext* fragment_context, - RuntimeProfile* parent_profile) - : _index(index), - _pipeline(pipeline), - _operators(operators), - _source(_operators.front()), - _root(_operators.back()), - _sink(sink), - _prepared(false), - _opened(false), - _can_steal(pipeline->_can_steal), - _state(state), - _cur_state(PipelineTaskState::NOT_READY), - _data_state(SourceState::DEPEND_ON_SOURCE), - _fragment_context(fragment_context), - _parent_profile(parent_profile) {} + RuntimeProfile* parent_profile); Status prepare(RuntimeState* state); @@ -188,11 +173,11 @@ public: std::string debug_string(); - taskgroup::TaskGroup* get_task_group() const; + taskgroup::TaskGroupPipelineTaskEntity* get_task_group_entity() const; void set_task_queue(TaskQueue* task_queue); - static constexpr auto THREAD_TIME_SLICE = 100'000'000L; + static constexpr auto THREAD_TIME_SLICE = 100'000'000ULL; // 1 used for update priority queue // note(wb) an ugly implementation, need refactor later diff --git a/be/src/pipeline/task_queue.cpp b/be/src/pipeline/task_queue.cpp index 807a344cf9..d3dc4440bb 100644 --- a/be/src/pipeline/task_queue.cpp +++ b/be/src/pipeline/task_queue.cpp @@ -23,7 +23,6 @@ #include "common/logging.h" #include "pipeline/pipeline_task.h" -#include "runtime/task_group/task_group.h" namespace doris { namespace pipeline { @@ -58,7 +57,7 @@ void PriorityTaskQueue::close() { _wait_task.notify_all(); } -PipelineTask* PriorityTaskQueue::try_take_unprotected(bool is_steal) { +PipelineTask* PriorityTaskQueue::_try_take_unprotected(bool is_steal) { if (_total_task_size == 0 || _closed) { return nullptr; } @@ -97,12 +96,12 @@ int PriorityTaskQueue::_compute_level(uint64_t runtime) { PipelineTask* PriorityTaskQueue::try_take(bool is_steal) { // TODO other efficient lock? e.g. 
if get lock fail, return null_ptr std::unique_lock lock(_work_size_mutex); - return try_take_unprotected(is_steal); + return _try_take_unprotected(is_steal); } PipelineTask* PriorityTaskQueue::take(uint32_t timeout_ms) { std::unique_lock lock(_work_size_mutex); - auto task = try_take_unprotected(false); + auto task = _try_take_unprotected(false); if (task) { return task; } else { @@ -111,7 +110,7 @@ PipelineTask* PriorityTaskQueue::take(uint32_t timeout_ms) { } else { _wait_task.wait(lock); } - return try_take_unprotected(false); + return _try_take_unprotected(false); } } @@ -134,6 +133,11 @@ Status PriorityTaskQueue::push(PipelineTask* task) { return Status::OK(); } +int PriorityTaskQueue::task_size() { + std::unique_lock lock(_work_size_mutex); + return _total_task_size; +} + MultiCoreTaskQueue::~MultiCoreTaskQueue() = default; MultiCoreTaskQueue::MultiCoreTaskQueue(size_t core_size) : TaskQueue(core_size), _closed(false) { @@ -204,9 +208,9 @@ Status MultiCoreTaskQueue::push_back(PipelineTask* task, size_t core_id) { } bool TaskGroupTaskQueue::TaskGroupSchedEntityComparator::operator()( - const taskgroup::TGEntityPtr& lhs_ptr, const taskgroup::TGEntityPtr& rhs_ptr) const { - int64_t lhs_val = lhs_ptr->vruntime_ns(); - int64_t rhs_val = rhs_ptr->vruntime_ns(); + const taskgroup::TGPTEntityPtr& lhs_ptr, const taskgroup::TGPTEntityPtr& rhs_ptr) const { + auto lhs_val = lhs_ptr->vruntime_ns(); + auto rhs_val = rhs_ptr->vruntime_ns(); if (lhs_val != rhs_val) { return lhs_val < rhs_val; } else { @@ -220,7 +224,8 @@ bool TaskGroupTaskQueue::TaskGroupSchedEntityComparator::operator()( } } -TaskGroupTaskQueue::TaskGroupTaskQueue(size_t core_size) : TaskQueue(core_size) {} +TaskGroupTaskQueue::TaskGroupTaskQueue(size_t core_size) + : TaskQueue(core_size), _min_tg_entity(nullptr) {} TaskGroupTaskQueue::~TaskGroupTaskQueue() = default; @@ -240,9 +245,10 @@ Status TaskGroupTaskQueue::push_back(PipelineTask* task, size_t core_id) { template Status 
TaskGroupTaskQueue::_push_back(PipelineTask* task) { - auto* entity = task->get_task_group()->task_entity(); + task->put_in_runnable_queue(); + auto* entity = task->get_task_group_entity(); std::unique_lock lock(_rs_mutex); - entity->push_back(task); + entity->task_queue()->emplace(task); if (_group_entities.find(entity) == _group_entities.end()) { _enqueue_task_group(entity); } @@ -253,7 +259,7 @@ Status TaskGroupTaskQueue::_push_back(PipelineTask* task) { // TODO pipeline support steal PipelineTask* TaskGroupTaskQueue::take(size_t core_id) { std::unique_lock lock(_rs_mutex); - taskgroup::TGEntityPtr entity = nullptr; + taskgroup::TGPTEntityPtr entity = nullptr; while (entity == nullptr) { if (_closed) { return nullptr; @@ -271,11 +277,16 @@ PipelineTask* TaskGroupTaskQueue::take(size_t core_id) { if (entity->task_size() == 1) { _dequeue_task_group(entity); } - return entity->take(); + auto task = entity->task_queue()->front(); + if (task) { + entity->task_queue()->pop(); + task->pop_out_runnable_queue(); + } + return task; } template -void TaskGroupTaskQueue::_enqueue_task_group(taskgroup::TGEntityPtr tg_entity) { +void TaskGroupTaskQueue::_enqueue_task_group(taskgroup::TGPTEntityPtr tg_entity) { _total_cpu_share += tg_entity->cpu_share(); if constexpr (!from_worker) { /** @@ -286,7 +297,9 @@ void TaskGroupTaskQueue::_enqueue_task_group(taskgroup::TGEntityPtr tg_entity) { auto old_v_ns = tg_entity->vruntime_ns(); auto* min_entity = _min_tg_entity.load(); if (min_entity) { - int64_t new_vruntime_ns = min_entity->vruntime_ns() - _ideal_runtime_ns(tg_entity) / 2; + auto min_tg_v = min_entity->vruntime_ns(); + auto ideal_r = _ideal_runtime_ns(tg_entity) / 2; + uint64_t new_vruntime_ns = min_tg_v > ideal_r ? 
min_tg_v - ideal_r : min_tg_v; if (new_vruntime_ns > old_v_ns) { tg_entity->adjust_vruntime_ns(new_vruntime_ns); } @@ -300,7 +313,7 @@ void TaskGroupTaskQueue::_enqueue_task_group(taskgroup::TGEntityPtr tg_entity) { _update_min_tg(); } -void TaskGroupTaskQueue::_dequeue_task_group(taskgroup::TGEntityPtr tg_entity) { +void TaskGroupTaskQueue::_dequeue_task_group(taskgroup::TGPTEntityPtr tg_entity) { _total_cpu_share -= tg_entity->cpu_share(); _group_entities.erase(tg_entity); VLOG_DEBUG << "dequeue tg " << tg_entity->debug_string() @@ -320,12 +333,12 @@ void TaskGroupTaskQueue::_update_min_tg() { } // like sched_fair.c calc_delta_fair, THREAD_TIME_SLICE maybe a dynamic value. -int64_t TaskGroupTaskQueue::_ideal_runtime_ns(taskgroup::TGEntityPtr tg_entity) const { +uint64_t TaskGroupTaskQueue::_ideal_runtime_ns(taskgroup::TGPTEntityPtr tg_entity) const { return PipelineTask::THREAD_TIME_SLICE * _core_size * tg_entity->cpu_share() / _total_cpu_share; } -taskgroup::TGEntityPtr TaskGroupTaskQueue::_next_tg_entity() { - taskgroup::TGEntityPtr res = nullptr; +taskgroup::TGPTEntityPtr TaskGroupTaskQueue::_next_tg_entity() { + taskgroup::TGPTEntityPtr res = nullptr; for (auto* entity : _group_entities) { res = entity; break; @@ -335,8 +348,7 @@ taskgroup::TGEntityPtr TaskGroupTaskQueue::_next_tg_entity() { void TaskGroupTaskQueue::update_statistics(PipelineTask* task, int64_t time_spent) { std::unique_lock lock(_rs_mutex); - auto* group = task->get_task_group(); - auto* entity = group->task_entity(); + auto* entity = task->get_task_group_entity(); auto find_entity = _group_entities.find(entity); bool is_in_queue = find_entity != _group_entities.end(); VLOG_DEBUG << "update_statistics " << entity->debug_string() << ", in queue:" << is_in_queue; @@ -351,15 +363,14 @@ void TaskGroupTaskQueue::update_statistics(PipelineTask* task, int64_t time_spen } void TaskGroupTaskQueue::update_tg_cpu_share(const taskgroup::TaskGroupInfo& task_group_info, - taskgroup::TaskGroupPtr 
task_group) { + taskgroup::TGPTEntityPtr entity) { std::unique_lock lock(_rs_mutex); - auto* entity = task_group->task_entity(); bool is_in_queue = _group_entities.find(entity) != _group_entities.end(); if (is_in_queue) { _group_entities.erase(entity); _total_cpu_share -= entity->cpu_share(); } - task_group->update_cpu_share_unlock(task_group_info); + entity->check_and_update_cpu_share(task_group_info); if (is_in_queue) { _group_entities.emplace(entity); _total_cpu_share += entity->cpu_share(); diff --git a/be/src/pipeline/task_queue.h b/be/src/pipeline/task_queue.h index 9956ba3cb9..d693cbe216 100644 --- a/be/src/pipeline/task_queue.h +++ b/be/src/pipeline/task_queue.h @@ -33,7 +33,6 @@ #include "runtime/task_group/task_group.h" namespace doris { - namespace pipeline { class TaskQueue { @@ -54,7 +53,7 @@ public: virtual void update_statistics(PipelineTask* task, int64_t time_spent) {} virtual void update_tg_cpu_share(const taskgroup::TaskGroupInfo& task_group_info, - taskgroup::TaskGroupPtr task_group) = 0; + taskgroup::TGPTEntityPtr entity) = 0; int cores() const { return _core_size; } @@ -95,12 +94,10 @@ private: // A Multilevel Feedback Queue class PriorityTaskQueue { public: - explicit PriorityTaskQueue(); + PriorityTaskQueue(); void close(); - PipelineTask* try_take_unprotected(bool is_steal); - PipelineTask* try_take(bool is_steal); PipelineTask* take(uint32_t timeout_ms = 0); @@ -111,7 +108,10 @@ public: _sub_queues[level].inc_runtime(runtime); } + int task_size(); + private: + PipelineTask* _try_take_unprotected(bool is_steal); static constexpr auto LEVEL_QUEUE_TIME_FACTOR = 2; static constexpr size_t SUB_QUEUE_LEVEL = 6; SubTaskQueue _sub_queues[SUB_QUEUE_LEVEL]; @@ -155,7 +155,7 @@ public: } void update_tg_cpu_share(const taskgroup::TaskGroupInfo& task_group_info, - taskgroup::TaskGroupPtr task_group) override { + taskgroup::TGPTEntityPtr entity) override { LOG(FATAL) << "update_tg_cpu_share not implemented"; } @@ -185,29 +185,29 @@ public: void 
update_statistics(PipelineTask* task, int64_t time_spent) override; void update_tg_cpu_share(const taskgroup::TaskGroupInfo& task_group_info, - taskgroup::TaskGroupPtr task_group) override; + taskgroup::TGPTEntityPtr entity) override; private: template Status _push_back(PipelineTask* task); template - void _enqueue_task_group(taskgroup::TGEntityPtr); - void _dequeue_task_group(taskgroup::TGEntityPtr); - taskgroup::TGEntityPtr _next_tg_entity(); - int64_t _ideal_runtime_ns(taskgroup::TGEntityPtr tg_entity) const; + void _enqueue_task_group(taskgroup::TGPTEntityPtr); + void _dequeue_task_group(taskgroup::TGPTEntityPtr); + taskgroup::TGPTEntityPtr _next_tg_entity(); + uint64_t _ideal_runtime_ns(taskgroup::TGPTEntityPtr tg_entity) const; void _update_min_tg(); // Like cfs rb tree in sched_entity struct TaskGroupSchedEntityComparator { - bool operator()(const taskgroup::TGEntityPtr&, const taskgroup::TGEntityPtr&) const; + bool operator()(const taskgroup::TGPTEntityPtr&, const taskgroup::TGPTEntityPtr&) const; }; - using ResouceGroupSet = std::set; + using ResouceGroupSet = std::set; ResouceGroupSet _group_entities; std::condition_variable _wait_task; std::mutex _rs_mutex; bool _closed = false; int _total_cpu_share = 0; - std::atomic _min_tg_entity = nullptr; + std::atomic _min_tg_entity = nullptr; uint64_t _min_tg_v_runtime_ns = 0; }; diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp index 06baaaecaa..37833c3c27 100644 --- a/be/src/pipeline/task_scheduler.cpp +++ b/be/src/pipeline/task_scheduler.cpp @@ -112,6 +112,7 @@ void BlockedTaskScheduler::_schedule() { if (state == PipelineTaskState::PENDING_FINISH) { // should cancel or should finish if (task->is_pending_finish()) { + VLOG_DEBUG << "Task pending" << task->debug_string(); iter++; } else { _make_task_run(local_blocked_tasks, iter, ready_tasks, @@ -361,9 +362,4 @@ void TaskScheduler::shutdown() { } } -void TaskScheduler::update_tg_cpu_share(const taskgroup::TaskGroupInfo& 
task_group_info, - taskgroup::TaskGroupPtr task_group) { - _task_queue->update_tg_cpu_share(task_group_info, task_group); -} - } // namespace doris::pipeline diff --git a/be/src/pipeline/task_scheduler.h b/be/src/pipeline/task_scheduler.h index 1fcbe2c068..f6744f983c 100644 --- a/be/src/pipeline/task_scheduler.h +++ b/be/src/pipeline/task_scheduler.h @@ -91,8 +91,7 @@ public: void shutdown(); - void update_tg_cpu_share(const taskgroup::TaskGroupInfo& task_group_info, - taskgroup::TaskGroupPtr task_group); + TaskQueue* task_queue() const { return _task_queue.get(); } private: std::unique_ptr _fix_thread_pool; diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index 46e1157820..056f2ca125 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -38,6 +38,9 @@ class ScannerScheduler; namespace pipeline { class TaskScheduler; } +namespace taskgroup { +class TaskGroupManager; +} class BfdParser; class BrokerMgr; template @@ -107,6 +110,7 @@ public: pipeline::TaskScheduler* pipeline_task_group_scheduler() { return _pipeline_task_group_scheduler; } + taskgroup::TaskGroupManager* task_group_manager() { return _task_group_manager; } // using template to simplify client cache management template @@ -233,6 +237,7 @@ private: FragmentMgr* _fragment_mgr = nullptr; pipeline::TaskScheduler* _pipeline_task_scheduler = nullptr; pipeline::TaskScheduler* _pipeline_task_group_scheduler = nullptr; + taskgroup::TaskGroupManager* _task_group_manager = nullptr; ResultCache* _result_cache = nullptr; TMasterInfo* _master_info = nullptr; diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index ecab683831..5d977e1004 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -65,6 +65,7 @@ #include "runtime/small_file_mgr.h" #include "runtime/stream_load/new_load_stream_mgr.h" #include "runtime/stream_load/stream_load_executor.h" +#include "runtime/task_group/task_group_manager.h" #include 
"runtime/thread_context.h" #include "service/point_query_executor.h" #include "util/bfd_parser.h" @@ -147,6 +148,7 @@ Status ExecEnv::_init(const std::vector& store_paths) { .build(&_join_node_thread_pool); RETURN_IF_ERROR(init_pipeline_task_scheduler()); + _task_group_manager = new taskgroup::TaskGroupManager(); _scanner_scheduler = new doris::vectorized::ScannerScheduler(); _fragment_mgr = new FragmentMgr(this); _result_cache = new ResultCache(config::query_cache_max_size_mb, @@ -397,6 +399,7 @@ void ExecEnv::_destroy() { SAFE_DELETE(_load_path_mgr); SAFE_DELETE(_pipeline_task_scheduler); SAFE_DELETE(_pipeline_task_group_scheduler); + SAFE_DELETE(_task_group_manager); SAFE_DELETE(_fragment_mgr); SAFE_DELETE(_broker_client_cache); SAFE_DELETE(_frontend_client_cache); diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 648c9373d1..cea59acdfb 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -722,7 +722,7 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo auto status = taskgroup::TaskGroupInfo::parse_group_info(params.workload_groups[0], &task_group_info); if (status.ok()) { - auto tg = taskgroup::TaskGroupManager::instance()->get_or_create_task_group( + auto tg = _exec_env->task_group_manager()->get_or_create_task_group( task_group_info); tg->add_mem_tracker_limiter(query_ctx->query_mem_tracker); query_ctx->set_task_group(tg); diff --git a/be/src/runtime/task_group/task_group.cpp b/be/src/runtime/task_group/task_group.cpp index 37149ddfd8..7c3d8ff42b 100644 --- a/be/src/runtime/task_group/task_group.cpp +++ b/be/src/runtime/task_group/task_group.cpp @@ -27,11 +27,14 @@ #include #include "common/logging.h" +#include "pipeline/task_queue.h" #include "pipeline/task_scheduler.h" #include "runtime/exec_env.h" #include "runtime/memory/mem_tracker_limiter.h" #include "util/mem_info.h" #include "util/parse_util.h" +#include "vec/exec/scan/scan_task_queue.h" +#include 
"vec/exec/scan/scanner_scheduler.h" namespace doris { namespace taskgroup { @@ -40,50 +43,75 @@ const static std::string CPU_SHARE = "cpu_share"; const static std::string MEMORY_LIMIT = "memory_limit"; const static std::string ENABLE_MEMORY_OVERCOMMIT = "enable_memory_overcommit"; -pipeline::PipelineTask* TaskGroupEntity::take() { - if (_queue.empty()) { - return nullptr; - } - auto task = _queue.front(); - _queue.pop(); - return task; +template +TaskGroupEntity::TaskGroupEntity(taskgroup::TaskGroup* tg, std::string type) + : _tg(tg), _type(type), _version(tg->version()), _cpu_share(tg->cpu_share()) { + _task_queue = new QueueType(); } -void TaskGroupEntity::incr_runtime_ns(uint64_t runtime_ns) { - auto v_time = runtime_ns / _tg->cpu_share(); +template +TaskGroupEntity::~TaskGroupEntity() { + delete _task_queue; +} + +template +QueueType* TaskGroupEntity::task_queue() { + return _task_queue; +} + +template +void TaskGroupEntity::incr_runtime_ns(uint64_t runtime_ns) { + auto v_time = runtime_ns / _cpu_share; _vruntime_ns += v_time; } -void TaskGroupEntity::adjust_vruntime_ns(uint64_t vruntime_ns) { +template +void TaskGroupEntity::adjust_vruntime_ns(uint64_t vruntime_ns) { VLOG_DEBUG << "adjust " << debug_string() << "vtime to " << vruntime_ns; _vruntime_ns = vruntime_ns; } -void TaskGroupEntity::push_back(pipeline::PipelineTask* task) { - _queue.emplace(task); +template +size_t TaskGroupEntity::task_size() const { + return _task_queue->size(); } -uint64_t TaskGroupEntity::cpu_share() const { - return _tg->cpu_share(); +template +uint64_t TaskGroupEntity::cpu_share() const { + return _cpu_share; } -uint64_t TaskGroupEntity::task_group_id() const { +template +uint64_t TaskGroupEntity::task_group_id() const { return _tg->id(); } -std::string TaskGroupEntity::debug_string() const { - return fmt::format("TGE[id = {}, cpu_share = {}, task size: {}, v_time:{}ns]", _tg->id(), - cpu_share(), _queue.size(), _vruntime_ns); +template +void 
TaskGroupEntity::check_and_update_cpu_share(const TaskGroupInfo& tg_info) { + if (tg_info.version > _version) { + _cpu_share = tg_info.cpu_share; + _version = tg_info.version; + } } +template +std::string TaskGroupEntity::debug_string() const { + return fmt::format("TGE[id = {}, name = {}-{}, cpu_share = {}, task size: {}, v_time:{} ns]", + _tg->id(), _tg->name(), _type, cpu_share(), task_size(), _vruntime_ns); +} + +template class TaskGroupEntity>; +template class TaskGroupEntity; + TaskGroup::TaskGroup(const TaskGroupInfo& tg_info) : _id(tg_info.id), _name(tg_info.name), - _cpu_share(tg_info.cpu_share), + _version(tg_info.version), _memory_limit(tg_info.memory_limit), _enable_memory_overcommit(tg_info.enable_memory_overcommit), - _version(tg_info.version), - _task_entity(this), + _cpu_share(tg_info.cpu_share), + _task_entity(this, "pipeline task entity"), + _local_scan_entity(this, "local scan entity"), _mem_tracker_limiter_pool(MEM_TRACKER_GROUP_NUM) {} std::string TaskGroup::debug_string() const { @@ -105,22 +133,22 @@ void TaskGroup::check_and_update(const TaskGroupInfo& tg_info) { return; } } - - std::lock_guard wl {_mutex}; - if (tg_info.version > _version) { - _name = tg_info.name; - _version = tg_info.version; - _memory_limit = tg_info.memory_limit; - _enable_memory_overcommit = tg_info.enable_memory_overcommit; - if (_cpu_share != tg_info.cpu_share) { - ExecEnv::GetInstance()->pipeline_task_group_scheduler()->update_tg_cpu_share( - tg_info, shared_from_this()); + { + std::lock_guard wl {_mutex}; + if (tg_info.version > _version) { + _name = tg_info.name; + _version = tg_info.version; + _memory_limit = tg_info.memory_limit; + _enable_memory_overcommit = tg_info.enable_memory_overcommit; + _cpu_share = tg_info.cpu_share; + } else { + return; } } -} - -void TaskGroup::update_cpu_share_unlock(const TaskGroupInfo& tg_info) { - _cpu_share = tg_info.cpu_share; + ExecEnv::GetInstance()->pipeline_task_group_scheduler()->task_queue()->update_tg_cpu_share( + 
tg_info, &_task_entity); + ExecEnv::GetInstance()->scanner_scheduler()->local_scan_task_queue()->update_tg_cpu_share( + tg_info, &_local_scan_entity); } int64_t TaskGroup::memory_used() { diff --git a/be/src/runtime/task_group/task_group.h b/be/src/runtime/task_group/task_group.h index 876f981cba..3a8672ef59 100644 --- a/be/src/runtime/task_group/task_group.h +++ b/be/src/runtime/task_group/task_group.h @@ -31,31 +31,34 @@ namespace doris { -namespace pipeline { -class PipelineTask; -} - class TPipelineWorkloadGroup; class MemTrackerLimiter; +namespace pipeline { +class PipelineTask; +} // namespace pipeline + namespace taskgroup { class TaskGroup; struct TaskGroupInfo; +class ScanTaskQueue; +template class TaskGroupEntity { public: - explicit TaskGroupEntity(taskgroup::TaskGroup* ts) : _tg(ts) {} - void push_back(pipeline::PipelineTask* task); + explicit TaskGroupEntity(taskgroup::TaskGroup* tg, std::string type); + ~TaskGroupEntity(); + uint64_t vruntime_ns() const { return _vruntime_ns; } - pipeline::PipelineTask* take(); + QueueType* task_queue(); void incr_runtime_ns(uint64_t runtime_ns); void adjust_vruntime_ns(uint64_t vruntime_ns); - size_t task_size() const { return _queue.size(); } + size_t task_size() const; uint64_t cpu_share() const; @@ -63,14 +66,29 @@ public: uint64_t task_group_id() const; + void check_and_update_cpu_share(const TaskGroupInfo& tg_info); + private: - // TODO pipeline use MLFQ - std::queue _queue; - taskgroup::TaskGroup* _tg; + QueueType* _task_queue; + uint64_t _vruntime_ns = 0; + taskgroup::TaskGroup* _tg; + + std::string _type; + + // Because updating cpu share of entity requires locking the task queue(pipeline task queue or + // scan task queue) contains that entity, we kept version and cpu share in entity for + // independent updates. 
+ int64_t _version; + uint64_t _cpu_share; }; -using TGEntityPtr = TaskGroupEntity*; +// TODO llj tg use PriorityTaskQueue to replace std::queue +using TaskGroupPipelineTaskEntity = TaskGroupEntity>; +using TGPTEntityPtr = TaskGroupPipelineTaskEntity*; + +using TaskGroupScanTaskEntity = TaskGroupEntity; +using TGSTEntityPtr = TaskGroupScanTaskEntity*; struct TgTrackerLimiterGroup { std::unordered_set> trackers; @@ -81,12 +99,17 @@ class TaskGroup : public std::enable_shared_from_this { public: explicit TaskGroup(const TaskGroupInfo& tg_info); - TaskGroupEntity* task_entity() { return &_task_entity; } + TaskGroupPipelineTaskEntity* task_entity() { return &_task_entity; } + TGSTEntityPtr local_scan_task_entity() { return &_local_scan_entity; } + + int64_t version() const { return _version; } uint64_t cpu_share() const { return _cpu_share.load(); } uint64_t id() const { return _id; } + std::string name() const { return _name; }; + bool enable_memory_overcommit() const { std::shared_lock r_lock(_mutex); return _enable_memory_overcommit; @@ -103,8 +126,6 @@ public: void check_and_update(const TaskGroupInfo& tg_info); - void update_cpu_share_unlock(const TaskGroupInfo& tg_info); - void add_mem_tracker_limiter(std::shared_ptr mem_tracker_ptr); void remove_mem_tracker_limiter(std::shared_ptr mem_tracker_ptr); @@ -119,12 +140,12 @@ private: mutable std::shared_mutex _mutex; // lock _name, _version, _cpu_share, _memory_limit const uint64_t _id; std::string _name; - std::atomic _cpu_share; + int64_t _version; int64_t _memory_limit; // bytes bool _enable_memory_overcommit; - int64_t _version; - TaskGroupEntity _task_entity; - + std::atomic _cpu_share; + TaskGroupPipelineTaskEntity _task_entity; + TaskGroupScanTaskEntity _local_scan_entity; std::vector _mem_tracker_limiter_pool; }; diff --git a/be/src/runtime/task_group/task_group_manager.cpp b/be/src/runtime/task_group/task_group_manager.cpp index 552ab2c0a9..6ce6d31604 100644 --- 
a/be/src/runtime/task_group/task_group_manager.cpp +++ b/be/src/runtime/task_group/task_group_manager.cpp @@ -20,19 +20,17 @@ #include #include +#include "pipeline/task_scheduler.h" +#include "runtime/exec_env.h" #include "runtime/memory/mem_tracker_limiter.h" #include "runtime/task_group/task_group.h" +#include "vec/exec/scan/scanner_scheduler.h" namespace doris::taskgroup { TaskGroupManager::TaskGroupManager() = default; TaskGroupManager::~TaskGroupManager() = default; -TaskGroupManager* TaskGroupManager::instance() { - static TaskGroupManager tgm; - return &tgm; -} - TaskGroupPtr TaskGroupManager::get_or_create_task_group(const TaskGroupInfo& task_group_info) { { std::shared_lock r_lock(_group_mutex); diff --git a/be/src/runtime/task_group/task_group_manager.h b/be/src/runtime/task_group/task_group_manager.h index 0b7472438c..375208dc6e 100644 --- a/be/src/runtime/task_group/task_group_manager.h +++ b/be/src/runtime/task_group/task_group_manager.h @@ -23,13 +23,14 @@ #include "task_group.h" -namespace doris::taskgroup { +namespace doris { +class ExecEnv; +namespace taskgroup { class TaskGroupManager { public: TaskGroupManager(); ~TaskGroupManager(); - static TaskGroupManager* instance(); TaskGroupPtr get_or_create_task_group(const TaskGroupInfo& task_group_info); @@ -41,4 +42,5 @@ private: std::unordered_map _task_groups; }; -} // namespace doris::taskgroup +} // namespace taskgroup +} // namespace doris diff --git a/be/src/util/mem_info.cpp b/be/src/util/mem_info.cpp index 8d81089323..3acdc48ff0 100644 --- a/be/src/util/mem_info.cpp +++ b/be/src/util/mem_info.cpp @@ -240,7 +240,8 @@ bool MemInfo::process_full_gc() { int64_t MemInfo::tg_hard_memory_limit_gc() { std::vector task_groups; - taskgroup::TaskGroupManager::instance()->get_resource_groups( + + ExecEnv::GetInstance()->task_group_manager()->get_resource_groups( [](const taskgroup::TaskGroupPtr& task_group) { return !task_group->enable_memory_overcommit(); }, @@ -260,7 +261,7 @@ int64_t 
MemInfo::tg_hard_memory_limit_gc() { int64_t MemInfo::tg_soft_memory_limit_gc(int64_t request_free_memory) { std::vector task_groups; - taskgroup::TaskGroupManager::instance()->get_resource_groups( + ExecEnv::GetInstance()->task_group_manager()->get_resource_groups( [](const taskgroup::TaskGroupPtr& task_group) { return task_group->enable_memory_overcommit(); }, diff --git a/be/src/vec/exec/scan/scan_task_queue.cpp b/be/src/vec/exec/scan/scan_task_queue.cpp new file mode 100644 index 0000000000..538f77211c --- /dev/null +++ b/be/src/vec/exec/scan/scan_task_queue.cpp @@ -0,0 +1,213 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "scan_task_queue.h" + +#include "pipeline/pipeline_task.h" +#include "runtime/task_group/task_group.h" +#include "vec/exec/scan/scanner_context.h" + +namespace doris { +namespace taskgroup { +static void empty_function() {} +ScanTask::ScanTask() : ScanTask(empty_function, nullptr, 1) {} + +ScanTask::ScanTask(WorkFunction scan_func, vectorized::ScannerContext* scanner_context, + int priority) + : scan_func(std::move(scan_func)), scanner_context(scanner_context), priority(priority) {} + +ScanTaskQueue::ScanTaskQueue() : _queue(config::doris_scanner_thread_pool_queue_size) {} + +Status ScanTaskQueue::try_push_back(ScanTask scan_task) { + if (_queue.try_put(std::move(scan_task))) { + VLOG_DEBUG << "try_push_back scan task " << scan_task.scanner_context->ctx_id << " " + << scan_task.priority; + return Status::OK(); + } else { + return Status::InternalError("failed to submit scan task to ScanTaskQueue"); + } +} +bool ScanTaskQueue::try_get(ScanTask* scan_task, uint32_t timeout_ms) { + auto r = _queue.blocking_get(scan_task, timeout_ms); + if (r) { + VLOG_DEBUG << "try get scan task " << scan_task->scanner_context->ctx_id << " " + << scan_task->priority; + } + return r; +} + +ScanTaskTaskGroupQueue::ScanTaskTaskGroupQueue(size_t core_size) : _core_size(core_size) {} +ScanTaskTaskGroupQueue::~ScanTaskTaskGroupQueue() = default; + +void ScanTaskTaskGroupQueue::close() { + std::unique_lock lock(_rs_mutex); + _closed = true; + _wait_task.notify_all(); +} + +bool ScanTaskTaskGroupQueue::take(ScanTask* scan_task) { + std::unique_lock lock(_rs_mutex); + taskgroup::TGSTEntityPtr entity = nullptr; + while (entity == nullptr) { + if (_closed) { + return false; + } + if (_group_entities.empty()) { + _wait_task.wait_for(lock, std::chrono::milliseconds(WAIT_CORE_TASK_TIMEOUT_MS * 5)); + } else { + entity = _next_tg_entity(); + if (!entity) { + _wait_task.wait_for(lock, std::chrono::milliseconds(WAIT_CORE_TASK_TIMEOUT_MS)); + } + } + } + DCHECK(entity->task_size() > 0); + 
if (entity->task_size() == 1) { + _dequeue_task_group(entity); + } + return entity->task_queue()->try_get(scan_task, WAIT_CORE_TASK_TIMEOUT_MS /* timeout_ms */); +} + +bool ScanTaskTaskGroupQueue::push_back(ScanTask scan_task) { + auto* entity = scan_task.scanner_context->get_task_group()->local_scan_task_entity(); + std::unique_lock lock(_rs_mutex); + auto status = entity->task_queue()->try_push_back(scan_task); + if (!status.ok()) { + LOG(WARNING) << "try_push_back scan task fail: " << status; + return false; + } + if (_group_entities.find(entity) == _group_entities.end()) { + _enqueue_task_group(entity); + } + _wait_task.notify_one(); + return true; +} + +void ScanTaskTaskGroupQueue::update_statistics(ScanTask scan_task, int64_t time_spent) { + auto* entity = scan_task.scanner_context->get_task_group()->local_scan_task_entity(); + std::unique_lock lock(_rs_mutex); + auto find_entity = _group_entities.find(entity); + bool is_in_queue = find_entity != _group_entities.end(); + VLOG_DEBUG << "scan task task group queue update_statistics " << entity->debug_string() + << ", in queue:" << is_in_queue << ", time_spent: " << time_spent; + if (is_in_queue) { + _group_entities.erase(entity); + } + entity->incr_runtime_ns(time_spent); + if (is_in_queue) { + _group_entities.emplace(entity); + _update_min_tg(); + } +} + +void ScanTaskTaskGroupQueue::update_tg_cpu_share(const taskgroup::TaskGroupInfo& task_group_info, + taskgroup::TGSTEntityPtr entity) { + std::unique_lock lock(_rs_mutex); + bool is_in_queue = _group_entities.find(entity) != _group_entities.end(); + if (is_in_queue) { + _group_entities.erase(entity); + _total_cpu_share -= entity->cpu_share(); + } + entity->check_and_update_cpu_share(task_group_info); + if (is_in_queue) { + _group_entities.emplace(entity); + _total_cpu_share += entity->cpu_share(); + } +} + +void ScanTaskTaskGroupQueue::_enqueue_task_group(TGSTEntityPtr tg_entity) { + _total_cpu_share += tg_entity->cpu_share(); + // TODO llj tg If submitted 
back to this queue from the scanner thread, `adjust_vruntime_ns` + // should be avoided. + /** + * If a task group entity leaves the task queue for a long time, its v runtime will be very + * small. This can cause it to preempt too much execution time. So, in order to avoid this + * situation, it is necessary to adjust the task group's v runtime. + * */ + auto old_v_ns = tg_entity->vruntime_ns(); + auto* min_entity = _min_tg_entity.load(); + if (min_entity) { + auto min_tg_v = min_entity->vruntime_ns(); + auto ideal_r = _ideal_runtime_ns(tg_entity) / 2; + uint64_t new_vruntime_ns = min_tg_v > ideal_r ? min_tg_v - ideal_r : min_tg_v; + if (new_vruntime_ns > old_v_ns) { + VLOG_DEBUG << tg_entity->debug_string() << ", adjust to new " << new_vruntime_ns; + tg_entity->adjust_vruntime_ns(new_vruntime_ns); + } + } else if (old_v_ns < _min_tg_v_runtime_ns) { + VLOG_DEBUG << tg_entity->debug_string() << ", adjust to " << _min_tg_v_runtime_ns; + tg_entity->adjust_vruntime_ns(_min_tg_v_runtime_ns); + } + _group_entities.emplace(tg_entity); + VLOG_DEBUG << "scan enqueue tg " << tg_entity->debug_string() + << ", group entity size: " << _group_entities.size(); + _update_min_tg(); +} + +void ScanTaskTaskGroupQueue::_dequeue_task_group(TGSTEntityPtr tg_entity) { + _total_cpu_share -= tg_entity->cpu_share(); + _group_entities.erase(tg_entity); + VLOG_DEBUG << "scan task group queue dequeue tg " << tg_entity->debug_string() + << ", group entity size: " << _group_entities.size(); + _update_min_tg(); +} + +TGSTEntityPtr ScanTaskTaskGroupQueue::_next_tg_entity() { + taskgroup::TGSTEntityPtr res = nullptr; + for (auto* entity : _group_entities) { + res = entity; + break; + } + return res; +} + +uint64_t ScanTaskTaskGroupQueue::_ideal_runtime_ns(TGSTEntityPtr tg_entity) const { + // Scan task does not have time slice, so we use pipeline task's instead.
+ return pipeline::PipelineTask::THREAD_TIME_SLICE * _core_size * tg_entity->cpu_share() / + _total_cpu_share; +} + +void ScanTaskTaskGroupQueue::_update_min_tg() { + auto* min_entity = _next_tg_entity(); + _min_tg_entity = min_entity; + if (min_entity) { + auto min_v_runtime = min_entity->vruntime_ns(); + if (min_v_runtime > _min_tg_v_runtime_ns) { + _min_tg_v_runtime_ns = min_v_runtime; + } + } +} + +bool ScanTaskTaskGroupQueue::TaskGroupSchedEntityComparator::operator()( + const taskgroup::TGSTEntityPtr& lhs_ptr, const taskgroup::TGSTEntityPtr& rhs_ptr) const { + auto lhs_val = lhs_ptr->vruntime_ns(); + auto rhs_val = rhs_ptr->vruntime_ns(); + if (lhs_val != rhs_val) { + return lhs_val < rhs_val; + } else { + auto l_share = lhs_ptr->cpu_share(); + auto r_share = rhs_ptr->cpu_share(); + if (l_share != r_share) { + return l_share < r_share; + } else { + return lhs_ptr->task_group_id() < rhs_ptr->task_group_id(); + } + } +} + +} // namespace taskgroup +} // namespace doris \ No newline at end of file diff --git a/be/src/vec/exec/scan/scan_task_queue.h b/be/src/vec/exec/scan/scan_task_queue.h new file mode 100644 index 0000000000..f3c3b792a4 --- /dev/null +++ b/be/src/vec/exec/scan/scan_task_queue.h @@ -0,0 +1,98 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. +#pragma once + +#include "olap/tablet.h" +#include "runtime/task_group/task_group.h" +#include "util/blocking_priority_queue.hpp" + +namespace doris { +namespace vectorized { +class ScannerContext; +}; + +namespace taskgroup { + +using WorkFunction = std::function; +static constexpr auto WAIT_CORE_TASK_TIMEOUT_MS = 100; + +// Like PriorityThreadPool::Task +struct ScanTask { + ScanTask(); + ScanTask(WorkFunction scan_func, vectorized::ScannerContext* scanner_context, int priority); + bool operator<(const ScanTask& o) const { return priority < o.priority; } + ScanTask& operator++() { + priority += 2; + return *this; + } + + WorkFunction scan_func; + vectorized::ScannerContext* scanner_context; + int priority; +}; + +// Like pipeline::PriorityTaskQueue use BlockingPriorityQueue directly? +class ScanTaskQueue { +public: + ScanTaskQueue(); + Status try_push_back(ScanTask); + bool try_get(ScanTask* scan_task, uint32_t timeout_ms); + int size() { return _queue.get_size(); } + +private: + BlockingPriorityQueue _queue; +}; + +// Like TaskGroupTaskQueue +class ScanTaskTaskGroupQueue { +public: + explicit ScanTaskTaskGroupQueue(size_t core_size); + ~ScanTaskTaskGroupQueue(); + + void close(); + bool take(ScanTask* scan_task); + bool push_back(ScanTask); + + void update_statistics(ScanTask task, int64_t time_spent); + + void update_tg_cpu_share(const taskgroup::TaskGroupInfo&, taskgroup::TGSTEntityPtr); + +private: + TGSTEntityPtr _task_entity(ScanTask& scan_task); + void _enqueue_task_group(TGSTEntityPtr); + void _dequeue_task_group(TGSTEntityPtr); + TGSTEntityPtr _next_tg_entity(); + uint64_t _ideal_runtime_ns(TGSTEntityPtr tg_entity) const; + void _update_min_tg(); + + // Like cfs rb tree in sched_entity + struct TaskGroupSchedEntityComparator { + bool operator()(const taskgroup::TGSTEntityPtr&, const taskgroup::TGSTEntityPtr&) const; + }; + using ResouceGroupSet = 
std::set; + ResouceGroupSet _group_entities; + std::condition_variable _wait_task; + std::mutex _rs_mutex; + bool _closed = false; + int _total_cpu_share = 0; + std::atomic _min_tg_entity = nullptr; + uint64_t _min_tg_v_runtime_ns = 0; + size_t _core_size; +}; + +} // namespace taskgroup +} // namespace doris diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp index 6cdd33c551..fed8215ad2 100644 --- a/be/src/vec/exec/scan/scanner_context.cpp +++ b/be/src/vec/exec/scan/scanner_context.cpp @@ -247,8 +247,10 @@ Status ScannerContext::_close_and_clear_scanners(VScanNode* node, RuntimeState* if (state->enable_profile()) { std::stringstream scanner_statistics; std::stringstream scanner_rows_read; + std::stringstream scanner_wait_worker_time; scanner_statistics << "["; scanner_rows_read << "["; + scanner_wait_worker_time << "["; for (auto finished_scanner_time : _finished_scanner_runtime) { scanner_statistics << PrettyPrinter::print(finished_scanner_time, TUnit::TIME_NS) << ", "; @@ -256,6 +258,10 @@ Status ScannerContext::_close_and_clear_scanners(VScanNode* node, RuntimeState* for (auto finished_scanner_rows : _finished_scanner_rows_read) { scanner_rows_read << PrettyPrinter::print(finished_scanner_rows, TUnit::UNIT) << ", "; } + for (auto finished_scanner_wait_time : _finished_scanner_wait_worker_time) { + scanner_wait_worker_time + << PrettyPrinter::print(finished_scanner_wait_time, TUnit::TIME_NS) << ", "; + } // Only unfinished scanners here for (auto& scanner : _scanners) { // Scanners are in ObjPool in ScanNode, @@ -265,11 +271,18 @@ Status ScannerContext::_close_and_clear_scanners(VScanNode* node, RuntimeState* << ", "; scanner_rows_read << PrettyPrinter::print(scanner->get_rows_read(), TUnit::UNIT) << ", "; + scanner_wait_worker_time + << PrettyPrinter::print(scanner->get_scanner_wait_worker_timer(), + TUnit::TIME_NS) + << ", "; } scanner_statistics << "]"; scanner_rows_read << "]"; + scanner_wait_worker_time << "]"; 
node->_scanner_profile->add_info_string("PerScannerRunningTime", scanner_statistics.str()); node->_scanner_profile->add_info_string("PerScannerRowsRead", scanner_rows_read.str()); + node->_scanner_profile->add_info_string("PerScannerWaitTime", + scanner_wait_worker_time.str()); } // Only unfinished scanners here for (auto& scanner : _scanners) { @@ -380,6 +393,8 @@ void ScannerContext::get_next_batch_of_scanners(std::list* current if (scanner->need_to_close()) { _finished_scanner_runtime.push_back(scanner->get_time_cost_ns()); _finished_scanner_rows_read.push_back(scanner->get_rows_read()); + _finished_scanner_wait_worker_time.push_back( + scanner->get_scanner_wait_worker_timer()); scanner->close(_state); } else { current_run->push_back(scanner); @@ -389,4 +404,8 @@ void ScannerContext::get_next_batch_of_scanners(std::list* current } } +taskgroup::TaskGroup* ScannerContext::get_task_group() const { + return _state->get_query_ctx()->get_task_group(); +} + } // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h index 84d8839bfe..5986384edd 100644 --- a/be/src/vec/exec/scan/scanner_context.h +++ b/be/src/vec/exec/scan/scanner_context.h @@ -40,6 +40,10 @@ class ThreadPoolToken; class RuntimeState; class TupleDescriptor; +namespace taskgroup { +class TaskGroup; +} // namespace taskgroup + namespace vectorized { class VScanner; @@ -149,6 +153,7 @@ public: } return thread_slot_num; } + taskgroup::TaskGroup* get_task_group() const; void reschedule_scanner_ctx(); @@ -248,6 +253,7 @@ protected: std::list _scanners; std::vector _finished_scanner_runtime; std::vector _finished_scanner_rows_read; + std::vector _finished_scanner_wait_worker_time; const int _num_parallel_instances; diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp index 91bed3ee2e..be9e4277c3 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.cpp +++ 
b/be/src/vec/exec/scan/scanner_scheduler.cpp @@ -36,10 +36,13 @@ #include "runtime/exec_env.h" #include "runtime/runtime_state.h" #include "runtime/thread_context.h" +#include "scan_task_queue.h" #include "util/async_io.h" // IWYU pragma: keep #include "util/blocking_queue.hpp" +#include "util/defer_op.h" #include "util/priority_thread_pool.hpp" #include "util/priority_work_stealing_thread_pool.hpp" +#include "util/runtime_profile.h" #include "util/thread.h" #include "util/threadpool.h" #include "vec/core/block.h" @@ -67,6 +70,7 @@ ScannerScheduler::~ScannerScheduler() { _local_scan_thread_pool->shutdown(); _remote_scan_thread_pool->shutdown(); _limited_scan_thread_pool->shutdown(); + _group_local_scan_thread_pool->shutdown(); _scheduler_pool->wait(); _local_scan_thread_pool->join(); @@ -75,6 +79,9 @@ ScannerScheduler::~ScannerScheduler() { delete _pending_queues[i]; } delete[] _pending_queues; + + _task_group_local_scan_queue->close(); + _group_local_scan_thread_pool->wait(); } Status ScannerScheduler::init(ExecEnv* env) { @@ -109,6 +116,19 @@ Status ScannerScheduler::init(ExecEnv* env) { .set_max_queue_size(config::doris_scanner_thread_pool_queue_size) .build(&_limited_scan_thread_pool); + // 5. 
task group local scan + _task_group_local_scan_queue = std::make_unique( + config::doris_scanner_thread_pool_thread_num); + ThreadPoolBuilder("local_scan_group") + .set_min_threads(config::doris_scanner_thread_pool_thread_num) + .set_max_threads(config::doris_scanner_thread_pool_thread_num) + .build(&_group_local_scan_thread_pool); + for (int i = 0; i < config::doris_scanner_thread_pool_thread_num; i++) { + _group_local_scan_thread_pool->submit_func([this] { + this->_task_group_scanner_scan(this, _task_group_local_scan_queue.get()); + }); + } + _is_init = true; return Status::OK(); } @@ -183,6 +203,7 @@ void ScannerScheduler::_schedule_scanners(ScannerContext* ctx) { auto submit_to_thread_pool = [&] { ctx->incr_num_scanner_scheduling(this_run.size()); if (ctx->thread_token != nullptr) { + // TODO llj tg how to treat this? while (iter != this_run.end()) { (*iter)->start_wait_worker_timer(); auto s = ctx->thread_token->submit_func( @@ -200,13 +221,21 @@ void ScannerScheduler::_schedule_scanners(ScannerContext* ctx) { TabletStorageType type = (*iter)->get_storage_type(); bool ret = false; if (type == TabletStorageType::STORAGE_TYPE_LOCAL) { - PriorityThreadPool::Task task; - task.work_function = [this, scanner = *iter, ctx] { - this->_scanner_scan(this, ctx, scanner); - }; - task.priority = nice; - task.queue_id = (*iter)->queue_id(); - ret = _local_scan_thread_pool->offer(task); + if (ctx->get_task_group() && config::enable_workload_group_for_scan) { + auto work_func = [this, scanner = *iter, ctx] { + this->_scanner_scan(this, ctx, scanner); + }; + taskgroup::ScanTask scan_task = {work_func, ctx, nice}; + ret = _task_group_local_scan_queue->push_back(scan_task); + } else { + PriorityThreadPool::Task task; + task.work_function = [this, scanner = *iter, ctx] { + this->_scanner_scan(this, ctx, scanner); + }; + task.priority = nice; + task.queue_id = (*iter)->queue_id(); + ret = _local_scan_thread_pool->offer(task); + } } else { ret = 
_remote_scan_thread_pool->submit_func([this, scanner = *iter, ctx] { this->_scanner_scan(this, ctx, scanner); @@ -321,6 +350,7 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, ScannerContext while (!eos && raw_bytes_read < raw_bytes_threshold && ((raw_rows_read < raw_rows_threshold && has_free_block) || num_rows_in_block < state->batch_size())) { + // TODO llj task group should should_yield? if (UNLIKELY(ctx->done())) { // No need to set status on error here. // Because done() maybe caused by "should_stop" @@ -388,4 +418,20 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, ScannerContext ctx->push_back_scanner_and_reschedule(scanner); } +void ScannerScheduler::_task_group_scanner_scan(ScannerScheduler* scheduler, + taskgroup::ScanTaskTaskGroupQueue* scan_queue) { + while (!_is_closed) { + taskgroup::ScanTask scan_task; + auto success = scan_queue->take(&scan_task); + if (success) { + int64_t time_spent = 0; + { + SCOPED_RAW_TIMER(&time_spent); + scan_task.scan_func(); + } + scan_queue->update_statistics(scan_task, time_spent); + } + } +} + } // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/scanner_scheduler.h b/be/src/vec/exec/scan/scanner_scheduler.h index d8f51d4ad0..d88c844a3e 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.h +++ b/be/src/vec/exec/scan/scanner_scheduler.h @@ -21,6 +21,7 @@ #include #include "common/status.h" +#include "scan_task_queue.h" #include "util/threadpool.h" #include "vec/exec/scan/vscanner.h" @@ -31,6 +32,9 @@ class PriorityThreadPool; namespace vectorized { class VScanner; } // namespace vectorized +namespace taskgroup { +class ScanTaskTaskGroupQueue; +} template class BlockingQueue; } // namespace doris @@ -67,6 +71,9 @@ public: std::unique_ptr new_limited_scan_pool_token(ThreadPool::ExecutionMode mode, int max_concurrency); + taskgroup::ScanTaskTaskGroupQueue* local_scan_task_queue() { + return _task_group_local_scan_queue.get(); + } private: // scheduling thread function @@ 
-76,6 +83,9 @@ private: // execution thread function void _scanner_scan(ScannerScheduler* scheduler, ScannerContext* ctx, VScannerSPtr scanner); + void _task_group_scanner_scan(ScannerScheduler* scheduler, + taskgroup::ScanTaskTaskGroupQueue* scan_queue); + private: // Scheduling queue number. // TODO: make it configurable. @@ -98,6 +108,9 @@ private: std::unique_ptr _remote_scan_thread_pool; std::unique_ptr _limited_scan_thread_pool; + std::unique_ptr _task_group_local_scan_queue; + std::unique_ptr _group_local_scan_thread_pool; + // true is the scheduler is closed. std::atomic_bool _is_closed = {false}; bool _is_init = false; diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h index 7103449940..f8e0342e9b 100644 --- a/be/src/vec/exec/scan/vscanner.h +++ b/be/src/vec/exec/scan/vscanner.h @@ -108,6 +108,8 @@ public: void update_wait_worker_timer() { _scanner_wait_worker_timer += _watch.elapsed_time(); } + int64_t get_scanner_wait_worker_timer() { return _scanner_wait_worker_timer; } + void update_scan_cpu_timer() { _scan_cpu_timer += _cpu_watch.elapsed_time(); } RuntimeState* runtime_state() { return _state; } diff --git a/docs/en/docs/admin-manual/query-profile.md b/docs/en/docs/admin-manual/query-profile.md index ac5f631e01..9bfd261023 100644 --- a/docs/en/docs/admin-manual/query-profile.md +++ b/docs/en/docs/admin-manual/query-profile.md @@ -210,7 +210,7 @@ OLAP_SCAN_NODE (id=0):(Active: 1.2ms,% non-child: 0.00%) - RowsReturnedRate: 6.979K /sec # RowsReturned/ActiveTime - TabletCount: 20 # The number of Tablets involved in this ScanNode. - TotalReadThroughput: 74.70 KB/sec # BytesRead divided by the total time spent in this node (from Open to Close). For IO bounded queries, this should be very close to the total throughput of all the disks - - ScannerBatchWaitTime: 426.886us # To count the time the transfer thread waits for the scaner thread to return rowbatch. 
+ - ScannerBatchWaitTime: 426.886us # To count the time the transfer thread waits for the scaner thread to return rowbatch. In pipeline, this value is always 0. - ScannerWorkerWaitTime: 17.745us # To count the time that the scanner thread waits for the available worker threads in the thread pool. OlapScanner: - BlockConvertTime: 8.941us # The time it takes to convert a vectorized Block into a RowBlock with a row structure. The vectorized Block is VectorizedRowBatch in V1 and RowBlockV2 in V2. diff --git a/docs/zh-CN/docs/admin-manual/query-profile.md b/docs/zh-CN/docs/admin-manual/query-profile.md index c305a6a07e..9960bc18a9 100644 --- a/docs/zh-CN/docs/admin-manual/query-profile.md +++ b/docs/zh-CN/docs/admin-manual/query-profile.md @@ -209,7 +209,7 @@ OLAP_SCAN_NODE (id=0):(Active: 1.2ms, % non-child: 0.00%) - RowsReturnedRate: 6.979K /sec # RowsReturned/ActiveTime - TabletCount : 20 # 该 ScanNode 涉及的 Tablet 数量。 - TotalReadThroughput: 74.70 KB/sec # BytesRead除以该节点运行的总时间(从Open到Close),对于IO受限的查询,接近磁盘的总吞吐量。 - - ScannerBatchWaitTime: 426.886us # 用于统计transfer 线程等待scaner 线程返回rowbatch的时间。 + - ScannerBatchWaitTime: 426.886us # 用于统计transfer 线程等待scaner 线程返回rowbatch的时间。在Pipeline调度中,此值无意义。 - ScannerWorkerWaitTime: 17.745us # 用于统计scanner thread 等待线程池中可用工作线程的时间。 OlapScanner: - BlockConvertTime: 8.941us # 将向量化Block转换为行结构的 RowBlock 的耗时。向量化 Block 在 V1 中为 VectorizedRowBatch,V2中为 RowBlockV2。 diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 8038b86b93..fd4e19f574 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -1498,7 +1498,7 @@ public class Config extends ConfigBase { public static boolean enable_pipeline_load = true; // enable_workload_group should be immutable and temporarily set to mutable during the development test phase - @ConfField(mutable = true, masterOnly = true, expType = 
ExperimentalType.EXPERIMENTAL) + @ConfField(mutable = true, expType = ExperimentalType.EXPERIMENTAL) public static boolean enable_workload_group = false; @ConfField(mutable = true)