From 2ee1468576bb3a9883ae3214699f0b5fba5a459b Mon Sep 17 00:00:00 2001 From: Lijia Liu Date: Thu, 30 Mar 2023 10:49:50 +0800 Subject: [PATCH] [improvement](executor) Support task group schedule in pipeline engine (#17615) --- be/src/common/config.h | 1 + be/src/pipeline/pipeline_fragment_context.cpp | 6 +- be/src/pipeline/pipeline_fragment_context.h | 2 + be/src/pipeline/pipeline_task.cpp | 30 ++- be/src/pipeline/pipeline_task.h | 9 +- be/src/pipeline/task_queue.cpp | 183 ++++++++++++++++-- be/src/pipeline/task_queue.h | 104 ++++++++-- be/src/pipeline/task_scheduler.cpp | 6 +- be/src/pipeline/task_scheduler.h | 1 + be/src/runtime/CMakeLists.txt | 2 + be/src/runtime/exec_env.h | 5 + be/src/runtime/exec_env_init.cpp | 11 +- be/src/runtime/fragment_mgr.cpp | 164 ++++++---------- be/src/runtime/fragment_mgr.h | 4 + be/src/runtime/query_fragments_ctx.h | 7 + be/src/runtime/task_group/task_group.cpp | 65 +++++++ be/src/runtime/task_group/task_group.h | 82 ++++++++ .../runtime/task_group/task_group_manager.cpp | 52 +++++ .../runtime/task_group/task_group_manager.h | 49 +++++ regression-test/pipeline/p0/conf/be.conf | 2 +- regression-test/pipeline/p1/conf/be.conf | 2 +- 21 files changed, 632 insertions(+), 155 deletions(-) create mode 100644 be/src/runtime/task_group/task_group.cpp create mode 100644 be/src/runtime/task_group/task_group.h create mode 100644 be/src/runtime/task_group/task_group_manager.cpp create mode 100644 be/src/runtime/task_group/task_group_manager.h diff --git a/be/src/common/config.h b/be/src/common/config.h index 9ed5f36482..1d899eccd8 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -879,6 +879,7 @@ CONF_Bool(enable_java_support, "true"); CONF_Bool(enable_fuzzy_mode, "false"); CONF_Int32(pipeline_executor_size, "0"); +CONF_mInt16(pipeline_short_query_timeout_s, "20"); // Temp config. True to use optimization for bitmap_index apply predicate except leaf node of the and node. // Will remove after fully test. 
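Note on the new config knob: pipeline_short_query_timeout_s is read in FragmentMgr::_get_query_ctx (later in this patch) to pick the task group for a pipeline query — a positive execution_timeout at or below the threshold routes the query to the "short" group, everything else to the default group. A minimal, self-contained sketch of that routing decision; the constants mirror TaskGroupManager below, and the helper name is illustrative, not part of the patch:

    #include <cstdint>
    #include <iostream>

    // Mirrors TaskGroupManager::DEFAULT_TG_ID / SHORT_TG_ID from this patch.
    constexpr uint64_t kDefaultTgId = 0;
    constexpr uint64_t kShortTgId = 1;
    // Mirrors config::pipeline_short_query_timeout_s (default "20").
    constexpr int kShortQueryTimeoutS = 20;

    // Hypothetical helper: the same check _get_query_ctx performs inline.
    uint64_t pick_task_group_id(int execution_timeout_s) {
        if (execution_timeout_s > 0 && execution_timeout_s <= kShortQueryTimeoutS) {
            return kShortTgId; // short queries get the higher cpu_share (128)
        }
        return kDefaultTgId; // long/unbounded queries use the default share (64)
    }

    int main() {
        std::cout << pick_task_group_id(10) << '\n';   // 1 -> short_tg
        std::cout << pick_task_group_id(3600) << '\n'; // 0 -> default_tg
    }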
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 8d3c9fccf5..de9063452e 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -778,8 +778,12 @@ Status PipelineFragmentContext::submit() { int submit_tasks = 0; Status st; + auto* scheduler = _exec_env->pipeline_task_scheduler(); + if (get_task_group()) { + scheduler = _exec_env->pipeline_task_group_scheduler(); + } for (auto& task : _tasks) { - st = _exec_env->pipeline_task_scheduler()->schedule_task(task.get()); + st = scheduler->schedule_task(task.get()); if (!st) { cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, "submit context fail"); _total_tasks = submit_tasks; diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h index 5bd25e256d..31520d3cac 100644 --- a/be/src/pipeline/pipeline_fragment_context.h +++ b/be/src/pipeline/pipeline_fragment_context.h @@ -110,6 +110,8 @@ public: return _exec_status; } + taskgroup::TaskGroup* get_task_group() const { return _query_ctx->get_task_group(); } + private: Status _create_sink(const TDataSink& t_data_sink); Status _build_pipelines(ExecNode*, PipelinePtr); diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index fecb7d61b3..8121414a6b 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -17,7 +17,8 @@ #include "pipeline_task.h" -#include "pipeline/pipeline_fragment_context.h" +#include "pipeline_fragment_context.h" +#include "task_queue.h" namespace doris::pipeline { @@ -106,7 +107,7 @@ bool PipelineTask::has_dependency() { return false; } -Status PipelineTask::open() { +Status PipelineTask::_open() { SCOPED_TIMER(_task_profile->total_time_counter()); SCOPED_CPU_TIMER(_task_cpu_timer); SCOPED_TIMER(_open_timer); @@ -120,18 +121,27 @@ Status PipelineTask::open() { return Status::OK(); } +void PipelineTask::set_task_queue(TaskQueue* task_queue) { + _task_queue = task_queue; +} + Status PipelineTask::execute(bool* eos) { SCOPED_TIMER(_task_profile->total_time_counter()); SCOPED_CPU_TIMER(_task_cpu_timer); SCOPED_TIMER(_exec_timer); SCOPED_ATTACH_TASK(_state); int64_t time_spent = 0; + Defer defer {[&]() { + if (_task_queue) { + _task_queue->update_statistics(this, time_spent); + } + }}; // The status must be runnable *eos = false; if (!_opened) { { SCOPED_RAW_TIMER(&time_spent); - auto st = open(); + auto st = _open(); if (st.is()) { set_state(PipelineTaskState::BLOCKED_FOR_RF); return Status::OK(); @@ -193,6 +203,11 @@ Status PipelineTask::execute(bool* eos) { Status PipelineTask::finalize() { SCOPED_TIMER(_task_profile->total_time_counter()); SCOPED_CPU_TIMER(_task_cpu_timer); + Defer defer {[&]() { + if (_task_queue) { + _task_queue->update_statistics(this, _finalize_timer->value()); + } + }}; SCOPED_TIMER(_finalize_timer); return _sink->finalize(_state); } @@ -203,6 +218,11 @@ Status PipelineTask::try_close() { Status PipelineTask::close() { int64_t close_ns = 0; + Defer defer {[&]() { + if (_task_queue) { + _task_queue->update_statistics(this, close_ns); + } + }}; Status s; { SCOPED_RAW_TIMER(&close_ns); @@ -269,4 +289,8 @@ std::string PipelineTask::debug_string() const { return fmt::to_string(debug_string_buffer); } +taskgroup::TaskGroup* PipelineTask::get_task_group() const { + return _fragment_context->get_task_group(); +} + } // namespace doris::pipeline \ No newline at end of file diff --git a/be/src/pipeline/pipeline_task.h 
b/be/src/pipeline/pipeline_task.h index f7b0745ed8..df7e7de6a5 100644 --- a/be/src/pipeline/pipeline_task.h +++ b/be/src/pipeline/pipeline_task.h @@ -85,6 +85,8 @@ inline const char* get_state_name(PipelineTaskState idx) { __builtin_unreachable(); } +class TaskQueue; + // The class do the pipeline task. Minest schdule union by task scheduler class PipelineTask { public: @@ -179,10 +181,14 @@ public: uint32_t total_schedule_time() const { return _schedule_time; } + taskgroup::TaskGroup* get_task_group() const; + + void set_task_queue(TaskQueue* task_queue); + static constexpr auto THREAD_TIME_SLICE = 100'000'000L; private: - Status open(); + Status _open(); void _init_profile(); uint32_t _index; @@ -203,6 +209,7 @@ private: SourceState _data_state; std::unique_ptr _block; PipelineFragmentContext* _fragment_context; + TaskQueue* _task_queue = nullptr; RuntimeProfile* _parent_profile; std::unique_ptr _task_profile; diff --git a/be/src/pipeline/task_queue.cpp b/be/src/pipeline/task_queue.cpp index 6a05ee0945..0716afa924 100644 --- a/be/src/pipeline/task_queue.cpp +++ b/be/src/pipeline/task_queue.cpp @@ -17,9 +17,13 @@ #include "task_queue.h" +#include "runtime/task_group/task_group.h" + namespace doris { namespace pipeline { +TaskQueue::~TaskQueue() = default; + PipelineTask* SubWorkTaskQueue::try_take(bool is_steal) { if (_queue.empty()) { return nullptr; @@ -35,7 +39,7 @@ PipelineTask* SubWorkTaskQueue::try_take(bool is_steal) { //////////////////// WorkTaskQueue //////////////////// -WorkTaskQueue::WorkTaskQueue() : _closed(false) { +NormalWorkTaskQueue::NormalWorkTaskQueue() : _closed(false) { double factor = 1; for (int i = 0; i < SUB_QUEUE_LEVEL; ++i) { _sub_queues[i].set_factor_for_normal(factor); @@ -49,13 +53,13 @@ WorkTaskQueue::WorkTaskQueue() : _closed(false) { } } -void WorkTaskQueue::close() { +void NormalWorkTaskQueue::close() { std::unique_lock lock(_work_size_mutex); _closed = true; _wait_task.notify_all(); } -PipelineTask* WorkTaskQueue::try_take_unprotected(bool is_steal) { +PipelineTask* NormalWorkTaskQueue::try_take_unprotected(bool is_steal) { if (_total_task_size == 0 || _closed) { return nullptr; } @@ -86,7 +90,7 @@ PipelineTask* WorkTaskQueue::try_take_unprotected(bool is_steal) { return task; } -int WorkTaskQueue::_compute_level(PipelineTask* task) { +int NormalWorkTaskQueue::_compute_level(PipelineTask* task) { uint32_t schedule_time = task->total_schedule_time(); for (int i = 0; i < SUB_QUEUE_LEVEL - 1; ++i) { if (schedule_time <= _task_schedule_limit[i]) { @@ -96,13 +100,13 @@ int WorkTaskQueue::_compute_level(PipelineTask* task) { return SUB_QUEUE_LEVEL - 1; } -PipelineTask* WorkTaskQueue::try_take(bool is_steal) { +PipelineTask* NormalWorkTaskQueue::try_take(bool is_steal) { // TODO other efficient lock? e.g. 
if the lock cannot be acquired, return nullptr
     std::unique_lock<std::mutex> lock(_work_size_mutex);
     return try_take_unprotected(is_steal);
 }
 
-PipelineTask* WorkTaskQueue::take(uint32_t timeout_ms) {
+PipelineTask* NormalWorkTaskQueue::take(uint32_t timeout_ms) {
     std::unique_lock<std::mutex> lock(_work_size_mutex);
     auto task = try_take_unprotected(false);
     if (task) {
@@ -117,7 +121,7 @@
     }
 }
 
-Status WorkTaskQueue::push(PipelineTask* task) {
+Status NormalWorkTaskQueue::push(PipelineTask* task) {
     if (_closed) {
         return Status::InternalError("WorkTaskQueue closed");
     }
@@ -129,23 +133,27 @@
     return Status::OK();
 }
 
-////////////////// TaskQueue ////////////
+NormalTaskQueue::~NormalTaskQueue() = default;
 
-void TaskQueue::close() {
+NormalTaskQueue::NormalTaskQueue(size_t core_size) : TaskQueue(core_size), _closed(false) {
+    _async_queue.reset(new NormalWorkTaskQueue[core_size]);
+}
+
+void NormalTaskQueue::close() {
     _closed = true;
     for (int i = 0; i < _core_size; ++i) {
         _async_queue[i].close();
     }
 }
 
-PipelineTask* TaskQueue::try_take(size_t core_id) {
+PipelineTask* NormalTaskQueue::take(size_t core_id) {
     PipelineTask* task = nullptr;
     while (!_closed) {
         task = _async_queue[core_id].try_take(false);
         if (task) {
             break;
         }
-        task = steal_take(core_id);
+        task = _steal_take(core_id);
         if (task) {
             break;
         }
@@ -160,7 +168,7 @@
     return task;
 }
 
-PipelineTask* TaskQueue::steal_take(size_t core_id) {
+PipelineTask* NormalTaskQueue::_steal_take(size_t core_id) {
     DCHECK(core_id < _core_size);
     size_t next_id = core_id;
     for (size_t i = 1; i < _core_size; ++i) {
@@ -177,7 +185,7 @@
     return nullptr;
 }
 
-Status TaskQueue::push_back(PipelineTask* task) {
+Status NormalTaskQueue::push_back(PipelineTask* task) {
     int core_id = task->get_previous_core_id();
     if (core_id < 0) {
         core_id = _next_core.fetch_add(1) % _core_size;
@@ -185,11 +193,158 @@
     return push_back(task, core_id);
 }
 
-Status TaskQueue::push_back(PipelineTask* task, size_t core_id) {
+Status NormalTaskQueue::push_back(PipelineTask* task, size_t core_id) {
     DCHECK(core_id < _core_size);
     task->put_in_runnable_queue();
     return _async_queue[core_id].push(task);
 }
 
+bool TaskGroupTaskQueue::TaskGroupSchedEntityComparator::operator()(
+        const taskgroup::TGEntityPtr& lhs_ptr, const taskgroup::TGEntityPtr& rhs_ptr) const {
+    int64_t lhs_val = lhs_ptr->vruntime_ns();
+    int64_t rhs_val = rhs_ptr->vruntime_ns();
+    if (lhs_val != rhs_val) {
+        return lhs_val < rhs_val;
+    } else {
+        auto l_share = lhs_ptr->cpu_share();
+        auto r_share = rhs_ptr->cpu_share();
+        if (l_share != r_share) {
+            return l_share < r_share;
+        } else {
+            return lhs_ptr < rhs_ptr;
+        }
+    }
+}
+
+TaskGroupTaskQueue::TaskGroupTaskQueue(size_t core_size) : TaskQueue(core_size) {}
+
+TaskGroupTaskQueue::~TaskGroupTaskQueue() = default;
+
+void TaskGroupTaskQueue::close() {
+    std::unique_lock<std::mutex> lock(_rs_mutex);
+    _closed = true;
+    _wait_task.notify_all();
+}
+
+Status TaskGroupTaskQueue::push_back(PipelineTask* task) {
+    return _push_back<false>(task);
+}
+
+Status TaskGroupTaskQueue::push_back(PipelineTask* task, size_t core_id) {
+    return _push_back<true>(task);
+}
+
+template <bool from_worker>
+Status TaskGroupTaskQueue::_push_back(PipelineTask* task) {
+    auto* entity = task->get_task_group()->task_entity();
+    std::unique_lock<std::mutex> lock(_rs_mutex);
+    entity->push_back(task);
+    if (_group_entities.find(entity) == _group_entities.end()) {
+        _enqueue_task_group<from_worker>(entity);
+    }
+    _wait_task.notify_one();
+    return Status::OK();
+}
+
+// TODO pipeline support steal
+PipelineTask* TaskGroupTaskQueue::take(size_t core_id) {
+    std::unique_lock<std::mutex> lock(_rs_mutex);
+    taskgroup::TGEntityPtr entity = nullptr;
+    while (entity == nullptr) {
+        if (_closed) {
+            return nullptr;
+        }
+        if (_group_entities.empty()) {
+            _wait_task.wait(lock);
+        } else {
+            entity = _next_tg_entity();
+            if (!entity) {
+                _wait_task.wait_for(lock, std::chrono::milliseconds(WAIT_CORE_TASK_TIMEOUT_MS));
+            }
+        }
+    }
+    DCHECK(entity->task_size() > 0);
+    if (entity->task_size() == 1) {
+        _dequeue_task_group(entity);
+    }
+    return entity->take();
+}
+
+template <bool from_worker>
+void TaskGroupTaskQueue::_enqueue_task_group(taskgroup::TGEntityPtr tg_entity) {
+    _total_cpu_share += tg_entity->cpu_share();
+    if constexpr (!from_worker) {
+        /**
+         * If a task group entity stays out of the task queue for a long time, its vruntime
+         * becomes very small, which would let it preempt too much execution time once it
+         * re-enters. To avoid this, adjust the task group's vruntime when it is re-enqueued.
+         * */
+        auto old_v_ns = tg_entity->vruntime_ns();
+        auto* min_entity = _min_tg_entity.load();
+        if (min_entity) {
+            int64_t new_vruntime_ns = min_entity->vruntime_ns() - _ideal_runtime_ns(tg_entity) / 2;
+            if (new_vruntime_ns > old_v_ns) {
+                tg_entity->adjust_vruntime_ns(new_vruntime_ns);
+            }
+        } else if (old_v_ns < _min_tg_v_runtime_ns) {
+            tg_entity->adjust_vruntime_ns(_min_tg_v_runtime_ns);
+        }
+    }
+    _group_entities.emplace(tg_entity);
+    VLOG_DEBUG << "enqueue tg " << tg_entity->debug_string()
+               << ", group entity size: " << _group_entities.size();
+    _update_min_tg();
+}
+
+void TaskGroupTaskQueue::_dequeue_task_group(taskgroup::TGEntityPtr tg_entity) {
+    _total_cpu_share -= tg_entity->cpu_share();
+    _group_entities.erase(tg_entity);
+    VLOG_DEBUG << "dequeue tg " << tg_entity->debug_string()
+               << ", group entity size: " << _group_entities.size();
+    _update_min_tg();
+}
+
+void TaskGroupTaskQueue::_update_min_tg() {
+    auto* min_entity = _next_tg_entity();
+    _min_tg_entity = min_entity;
+    if (min_entity) {
+        auto min_v_runtime = min_entity->vruntime_ns();
+        if (min_v_runtime > _min_tg_v_runtime_ns) {
+            _min_tg_v_runtime_ns = min_v_runtime;
+        }
+    }
+}
+
+// Like calc_delta_fair in sched_fair.c; THREAD_TIME_SLICE may become a dynamic value.
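+// A worked example under this patch's defaults: with groups A (cpu_share = 64)
+// and B (cpu_share = 128) both runnable on core_size = 8 and THREAD_TIME_SLICE
+// = 100ms, _ideal_runtime_ns gives A 100ms * 8 * 64 / 192 ~= 267ms and B
+// ~= 533ms per scheduling round, i.e. CPU time proportional to cpu_share.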
+int64_t TaskGroupTaskQueue::_ideal_runtime_ns(taskgroup::TGEntityPtr tg_entity) const {
+    return PipelineTask::THREAD_TIME_SLICE * _core_size * tg_entity->cpu_share() / _total_cpu_share;
+}
+
+taskgroup::TGEntityPtr TaskGroupTaskQueue::_next_tg_entity() {
+    taskgroup::TGEntityPtr res = nullptr;
+    for (auto* entity : _group_entities) {
+        res = entity;
+        break;
+    }
+    return res;
+}
+
+void TaskGroupTaskQueue::update_statistics(PipelineTask* task, int64_t time_spent) {
+    std::unique_lock<std::mutex> lock(_rs_mutex);
+    auto* group = task->get_task_group();
+    auto* entity = group->task_entity();
+    auto find_entity = _group_entities.find(entity);
+    bool is_in_queue = find_entity != _group_entities.end();
+    VLOG_DEBUG << "update_statistics " << entity->debug_string() << ", in queue:" << is_in_queue;
+    if (is_in_queue) {
+        _group_entities.erase(entity);
+    }
+    entity->incr_runtime_ns(time_spent);
+    if (is_in_queue) {
+        _group_entities.emplace(entity);
+        _update_min_tg();
+    }
+}
+
 } // namespace pipeline
 } // namespace doris
\ No newline at end of file
diff --git a/be/src/pipeline/task_queue.h b/be/src/pipeline/task_queue.h
index 54e48efe42..6d225bc1a5 100644
--- a/be/src/pipeline/task_queue.h
+++ b/be/src/pipeline/task_queue.h
@@ -21,10 +21,39 @@
 #include "pipeline_task.h"
 
 namespace doris {
+namespace taskgroup {
+class TaskGroup;
+}
+
 namespace pipeline {
 
+class TaskQueue {
+public:
+    TaskQueue(size_t core_size) : _core_size(core_size) {}
+    virtual ~TaskQueue();
+    virtual void close() = 0;
+    // Get the task by core id.
+    // TODO: rethink whether taking by core id is useful.
+    virtual PipelineTask* take(size_t core_id) = 0;
+
+    // push from scheduler
+    virtual Status push_back(PipelineTask* task) = 0;
+
+    // push from worker
+    virtual Status push_back(PipelineTask* task, size_t core_id) = 0;
+
+    virtual void update_statistics(PipelineTask* task, int64_t time_spent) {}
+
+    int cores() const { return _core_size; }
+
+protected:
+    size_t _core_size;
+    static constexpr auto WAIT_CORE_TASK_TIMEOUT_MS = 100;
+};
+
 class SubWorkTaskQueue {
     friend class WorkTaskQueue;
+    friend class NormalWorkTaskQueue;
 
 public:
     void push_back(PipelineTask* task) { _queue.emplace(task); }
@@ -47,9 +76,9 @@ private:
 };
 
 // Each thread have private MLFQ
-class WorkTaskQueue {
+class NormalWorkTaskQueue {
 public:
-    explicit WorkTaskQueue();
+    explicit NormalWorkTaskQueue();
 
     void close();
 
@@ -61,9 +90,6 @@ public:
 
     Status push(PipelineTask* task);
 
-    // Get the each thread task size to do
-    size_t size() { return _total_task_size; }
-
 private:
     static constexpr auto LEVEL_QUEUE_TIME_FACTOR = 1.5;
     static constexpr size_t SUB_QUEUE_LEVEL = 5;
@@ -80,35 +106,73 @@ private:
 };
 
 // Need consider NUMA architecture
-class TaskQueue {
+class NormalTaskQueue : public TaskQueue {
 public:
-    explicit TaskQueue(size_t core_size) : _core_size(core_size), _closed(false) {
-        _async_queue.reset(new WorkTaskQueue[core_size]);
-    }
+    explicit NormalTaskQueue(size_t core_size);
 
-    ~TaskQueue() = default;
+    ~NormalTaskQueue() override;
 
-    void close();
+    void close() override;
 
     // Get the task by core id.
     // TODO: To think the logic is useful?
-    PipelineTask* try_take(size_t core_id);
-
-    PipelineTask* steal_take(size_t core_id);
+    PipelineTask* take(size_t core_id) override;
 
     // TODO combine these methods to `push_back(task, core_id = -1)`
-    Status push_back(PipelineTask* task);
+    Status push_back(PipelineTask* task) override;
 
-    Status push_back(PipelineTask* task, size_t core_id);
+    Status push_back(PipelineTask* task, size_t core_id) override;
 
-    int cores() const { return _core_size; }
+    // TODO pipeline: update NormalWorkTaskQueue by time_spent.
+    // void update_statistics(PipelineTask* task, int64_t time_spent) override;
 
 private:
-    std::unique_ptr<WorkTaskQueue[]> _async_queue;
-    size_t _core_size;
+    PipelineTask* _steal_take(size_t core_id);
+
+    std::unique_ptr<NormalWorkTaskQueue[]> _async_queue;
     std::atomic<size_t> _next_core = 0;
     std::atomic<bool> _closed;
-    static constexpr auto WAIT_CORE_TASK_TIMEOUT_MS = 100;
+};
+
+class TaskGroupTaskQueue : public TaskQueue {
+public:
+    explicit TaskGroupTaskQueue(size_t);
+    ~TaskGroupTaskQueue() override;
+
+    void close() override;
+
+    PipelineTask* take(size_t core_id) override;
+
+    // from TaskScheduler or BlockedTaskScheduler
+    Status push_back(PipelineTask* task) override;
+
+    // from worker
+    Status push_back(PipelineTask* task, size_t core_id) override;
+
+    void update_statistics(PipelineTask* task, int64_t time_spent) override;
+
+private:
+    template <bool from_worker>
+    Status _push_back(PipelineTask* task);
+    template <bool from_worker>
+    void _enqueue_task_group(taskgroup::TGEntityPtr);
+    void _dequeue_task_group(taskgroup::TGEntityPtr);
+    taskgroup::TGEntityPtr _next_tg_entity();
+    int64_t _ideal_runtime_ns(taskgroup::TGEntityPtr tg_entity) const;
+    void _update_min_tg();
+
+    // Ordered like the CFS red-black tree of sched_entity
+    struct TaskGroupSchedEntityComparator {
+        bool operator()(const taskgroup::TGEntityPtr&, const taskgroup::TGEntityPtr&) const;
+    };
+    using ResourceGroupSet = std::set<taskgroup::TGEntityPtr, TaskGroupSchedEntityComparator>;
+    ResourceGroupSet _group_entities;
+    std::condition_variable _wait_task;
+    std::mutex _rs_mutex;
+    bool _closed = false;
+    int _total_cpu_share = 0;
+    std::atomic<taskgroup::TGEntityPtr> _min_tg_entity = nullptr;
+    uint64_t _min_tg_v_runtime_ns = 0;
 };
 
 } // namespace pipeline
diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp
index 07a012df21..e90fb5f536 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -213,13 +213,13 @@ Status TaskScheduler::schedule_task(PipelineTask* task) {
 }
 
 void TaskScheduler::_do_work(size_t index) {
-    auto queue = _task_queue;
     const auto& marker = _markers[index];
     while (*marker) {
-        auto task = queue->try_take(index);
+        auto* task = _task_queue->take(index);
         if (!task) {
             continue;
         }
+        task->set_task_queue(_task_queue.get());
         auto* fragment_ctx = task->fragment_context();
         doris::signal::query_id_hi = fragment_ctx->get_query_id().hi;
         doris::signal::query_id_lo = fragment_ctx->get_query_id().lo;
@@ -283,7 +283,7 @@ void TaskScheduler::_do_work(size_t index) {
             _blocked_task_scheduler->add_blocked_task(task);
             break;
         case PipelineTaskState::RUNNABLE:
-            queue->push_back(task, index);
+            _task_queue->push_back(task, index);
             break;
         default:
             DCHECK(false) << "error state after run task, " << get_state_name(pipeline_state);
diff --git a/be/src/pipeline/task_scheduler.h b/be/src/pipeline/task_scheduler.h
index 4b47778345..7fcceddcf3 100644
--- a/be/src/pipeline/task_scheduler.h
+++ b/be/src/pipeline/task_scheduler.h
@@ -84,6 +84,7 @@ private:
     std::atomic<bool> _shutdown;
 
     void _do_work(size_t index);
+    // After _try_close_task returns, the task may already be destructed.
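+    // Workers must therefore not dereference the task pointer after this call;
+    // any logging or re-queueing that needs the task has to happen before it.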
    void _try_close_task(PipelineTask* task, PipelineTaskState state);
 };
 } // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt
index 28c7053b8a..d596c610ad 100644
--- a/be/src/runtime/CMakeLists.txt
+++ b/be/src/runtime/CMakeLists.txt
@@ -76,6 +76,8 @@ set(RUNTIME_FILES
     cache/result_node.cpp
     cache/result_cache.cpp
     block_spill_manager.cpp
+    task_group/task_group.cpp
+    task_group/task_group_manager.cpp
 )
 
 if (USE_JEMALLOC AND USE_MEM_TRACKER)
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index be7ce3dce9..4ce64ff153 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -109,6 +109,9 @@ public:
     ClientCache* broker_client_cache() { return _broker_client_cache; }
 
     pipeline::TaskScheduler* pipeline_task_scheduler() { return _pipeline_task_scheduler; }
+    pipeline::TaskScheduler* pipeline_task_group_scheduler() {
+        return _pipeline_task_group_scheduler;
+    }
 
     // using template to simplify client cache management
     template <typename T>
@@ -222,6 +225,8 @@ private:
     std::unordered_map> _download_cache_buf_map;
     FragmentMgr* _fragment_mgr = nullptr;
     pipeline::TaskScheduler* _pipeline_task_scheduler = nullptr;
+    pipeline::TaskScheduler* _pipeline_task_group_scheduler = nullptr;
+
     ResultCache* _result_cache = nullptr;
     TMasterInfo* _master_info = nullptr;
     LoadPathMgr* _load_path_mgr = nullptr;
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 344d3280bb..4287ed8034 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -160,10 +160,18 @@ Status ExecEnv::init_pipeline_task_scheduler() {
     if (executors_size <= 0) {
         executors_size = CpuInfo::num_cores();
     }
-    auto t_queue = std::make_shared<pipeline::TaskQueue>(executors_size);
+
+    // TODO pipeline task group: combine the two blocked schedulers.
+    auto t_queue = std::make_shared<pipeline::NormalTaskQueue>(executors_size);
     auto b_scheduler = std::make_shared<pipeline::BlockedTaskScheduler>(t_queue);
     _pipeline_task_scheduler = new pipeline::TaskScheduler(this, b_scheduler, t_queue);
     RETURN_IF_ERROR(_pipeline_task_scheduler->start());
+
+    auto tg_queue = std::make_shared<pipeline::TaskGroupTaskQueue>(executors_size);
+    auto tg_b_scheduler = std::make_shared<pipeline::BlockedTaskScheduler>(tg_queue);
+    _pipeline_task_group_scheduler = new pipeline::TaskScheduler(this, tg_b_scheduler, tg_queue);
+    RETURN_IF_ERROR(_pipeline_task_group_scheduler->start());
+
     return Status::OK();
 }
 
@@ -338,6 +346,7 @@ void ExecEnv::_destroy() {
     SAFE_DELETE(_load_path_mgr);
     SAFE_DELETE(_master_info);
     SAFE_DELETE(_pipeline_task_scheduler);
+    SAFE_DELETE(_pipeline_task_group_scheduler);
     SAFE_DELETE(_fragment_mgr);
     SAFE_DELETE(_broker_client_cache);
     SAFE_DELETE(_frontend_client_cache);
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index c8970358a1..64b84a262b 100755
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -43,6 +43,7 @@
 #include "runtime/runtime_filter_mgr.h"
 #include "runtime/stream_load/new_load_stream_mgr.h"
 #include "runtime/stream_load/stream_load_context.h"
+#include "runtime/task_group/task_group_manager.h"
 #include "runtime/thread_context.h"
 #include "service/backend_options.h"
 #include "util/doris_metrics.h"
@@ -583,35 +584,13 @@ void FragmentMgr::remove_pipeline_context(
     }
 }
 
-Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params,
-                                       const FinishCallback& cb) {
-    auto tracer = telemetry::is_current_span_valid() ? telemetry::get_tracer("tracer")
-                                                     : telemetry::get_noop_tracer();
-    VLOG_ROW << "exec_plan_fragment params is "
-             << apache::thrift::ThriftDebugString(params).c_str();
-    // sometimes TExecPlanFragmentParams debug string is too long and glog
-    // will truncate the log line, so print query options seperately for debuggin purpose
-    VLOG_ROW << "query options is "
-             << apache::thrift::ThriftDebugString(params.query_options).c_str();
-    START_AND_SCOPE_SPAN(tracer, span, "FragmentMgr::exec_plan_fragment");
-    const TUniqueId& fragment_instance_id = params.params.fragment_instance_id;
-    {
-        std::lock_guard<std::mutex> lock(_lock);
-        auto iter = _fragment_map.find(fragment_instance_id);
-        if (iter != _fragment_map.end()) {
-            // Duplicated
-            return Status::OK();
-        }
-    }
-
-    std::shared_ptr<FragmentExecState> exec_state;
-    std::shared_ptr<QueryFragmentsCtx> fragments_ctx;
-    bool pipeline_engine_enabled = params.query_options.__isset.enable_pipeline_engine &&
-                                   params.query_options.enable_pipeline_engine;
+template <typename Params>
+Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, bool pipeline,
+                                   std::shared_ptr<QueryFragmentsCtx>& fragments_ctx) {
     if (params.is_simplified_param) {
         // Get common components from _fragments_ctx_map
         std::lock_guard<std::mutex> lock(_lock);
-        auto search = _fragments_ctx_map.find(params.params.query_id);
+        auto search = _fragments_ctx_map.find(query_id);
         if (search == _fragments_ctx_map.end()) {
             return Status::InternalError(
                     "Failed to get query fragments context. Query may be "
                     "timeout or be cancelled. host: {}",
                     BackendOptions::get_localhost());
         }
         fragments_ctx = search->second;
     } else {
         // This may be a first fragment request of the query.
         // Create the query fragments context.
         fragments_ctx.reset(new QueryFragmentsCtx(params.fragment_num_on_host, _exec_env));
-        fragments_ctx->query_id = params.params.query_id;
+        fragments_ctx->query_id = query_id;
         RETURN_IF_ERROR(DescriptorTbl::create(&(fragments_ctx->obj_pool), params.desc_tbl,
                                               &(fragments_ctx->desc_tbl)));
         fragments_ctx->coord_addr = params.coord;
@@ -639,8 +618,7 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params,
             fragments_ctx->set_rsc_info = true;
         }
 
-        fragments_ctx->get_shared_hash_table_controller()->set_pipeline_engine_enabled(
-                pipeline_engine_enabled);
+        fragments_ctx->get_shared_hash_table_controller()->set_pipeline_engine_enabled(pipeline);
 
         fragments_ctx->timeout_second = params.query_options.execution_timeout;
         _set_scan_concurrency(params, fragments_ctx.get());
@@ -672,11 +650,24 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params,
             fragments_ctx->query_mem_tracker->enable_print_log_usage();
         }
 
+        if (pipeline) {
+            int ts = fragments_ctx->timeout_second;
+            taskgroup::TaskGroupPtr tg;
+            auto ts_id = taskgroup::TaskGroupManager::DEFAULT_TG_ID;
+            if (ts > 0 && ts <= config::pipeline_short_query_timeout_s) {
+                ts_id = taskgroup::TaskGroupManager::SHORT_TG_ID;
+            }
+            tg = taskgroup::TaskGroupManager::instance()->get_task_group(ts_id);
+            fragments_ctx->set_task_group(tg);
+            LOG(INFO) << "Query/load id: " << print_id(fragments_ctx->query_id)
+                      << " use task group: " << tg->debug_string();
+        }
+
         {
+            // Find _fragments_ctx_map again, in case some other request has already
+            // created the query fragments context.
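+            // (Double-checked registration: if a concurrent request registered a
+            // context first, the branch below adopts it so that all fragments of
+            // one query share a single QueryFragmentsCtx.)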
            std::lock_guard<std::mutex> lock(_lock);
-            auto search = _fragments_ctx_map.find(params.params.query_id);
+            auto search = _fragments_ctx_map.find(query_id);
             if (search == _fragments_ctx_map.end()) {
                 _fragments_ctx_map.insert(std::make_pair(fragments_ctx->query_id, fragments_ctx));
                 LOG(INFO) << "Register query/load memory tracker, query/load id: "
                           << print_id(fragments_ctx->query_id)
                           << " limit: " << PrettyPrinter::print(bytes_limit, TUnit::BYTES);
             } else {
                 // Already has a query fragments context, use it
                 fragments_ctx = search->second;
             }
         }
     }
+    return Status::OK();
+}
+
+Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params,
+                                       const FinishCallback& cb) {
+    auto tracer = telemetry::is_current_span_valid() ? telemetry::get_tracer("tracer")
+                                                     : telemetry::get_noop_tracer();
+    VLOG_ROW << "exec_plan_fragment params is "
+             << apache::thrift::ThriftDebugString(params).c_str();
+    // sometimes TExecPlanFragmentParams debug string is too long and glog
+    // will truncate the log line, so print query options separately for debugging purposes
+    VLOG_ROW << "query options is "
+             << apache::thrift::ThriftDebugString(params.query_options).c_str();
+    START_AND_SCOPE_SPAN(tracer, span, "FragmentMgr::exec_plan_fragment");
+    const TUniqueId& fragment_instance_id = params.params.fragment_instance_id;
+    {
+        std::lock_guard<std::mutex> lock(_lock);
+        auto iter = _fragment_map.find(fragment_instance_id);
+        if (iter != _fragment_map.end()) {
+            // Duplicated
+            return Status::OK();
+        }
+    }
+
+    std::shared_ptr<FragmentExecState> exec_state;
+    std::shared_ptr<QueryFragmentsCtx> fragments_ctx;
+    bool pipeline_engine_enabled = params.query_options.__isset.enable_pipeline_engine &&
+                                   params.query_options.enable_pipeline_engine;
+    RETURN_IF_ERROR(
+            _get_query_ctx(params, params.params.query_id, pipeline_engine_enabled, fragments_ctx));
 
     fragments_ctx->fragment_ids.push_back(fragment_instance_id);
 
     exec_state.reset(
@@ -787,84 +808,7 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
 
     std::shared_ptr<FragmentExecState> exec_state;
     std::shared_ptr<QueryFragmentsCtx> fragments_ctx;
-    if (params.is_simplified_param) {
-        // Get common components from _fragments_ctx_map
-        std::lock_guard<std::mutex> lock(_lock);
-        auto search = _fragments_ctx_map.find(params.query_id);
-        if (search == _fragments_ctx_map.end()) {
-            return Status::InternalError(
-                    "Failed to get query fragments context. Query may be "
-                    "timeout or be cancelled. host: {}",
-                    BackendOptions::get_localhost());
-        }
-        fragments_ctx = search->second;
-    } else {
-        // This may be a first fragment request of the query.
-        // Create the query fragments context.
- fragments_ctx.reset(new QueryFragmentsCtx(params.fragment_num_on_host, _exec_env)); - fragments_ctx->query_id = params.query_id; - RETURN_IF_ERROR(DescriptorTbl::create(&(fragments_ctx->obj_pool), params.desc_tbl, - &(fragments_ctx->desc_tbl))); - fragments_ctx->coord_addr = params.coord; - LOG(INFO) << "query_id: " - << UniqueId(fragments_ctx->query_id.hi, fragments_ctx->query_id.lo) - << " coord_addr " << fragments_ctx->coord_addr - << " total fragment num on current host: " << params.fragment_num_on_host; - fragments_ctx->query_globals = params.query_globals; - - if (params.__isset.resource_info) { - fragments_ctx->user = params.resource_info.user; - fragments_ctx->group = params.resource_info.group; - fragments_ctx->set_rsc_info = true; - } - - fragments_ctx->get_shared_hash_table_controller()->set_pipeline_engine_enabled(true); - fragments_ctx->timeout_second = params.query_options.execution_timeout; - _set_scan_concurrency(params, fragments_ctx.get()); - - bool has_query_mem_tracker = - params.query_options.__isset.mem_limit && (params.query_options.mem_limit > 0); - int64_t bytes_limit = has_query_mem_tracker ? params.query_options.mem_limit : -1; - if (bytes_limit > MemInfo::mem_limit()) { - VLOG_NOTICE << "Query memory limit " << PrettyPrinter::print(bytes_limit, TUnit::BYTES) - << " exceeds process memory limit of " - << PrettyPrinter::print(MemInfo::mem_limit(), TUnit::BYTES) - << ". Using process memory limit instead"; - bytes_limit = MemInfo::mem_limit(); - } - if (params.query_options.query_type == TQueryType::SELECT) { - fragments_ctx->query_mem_tracker = std::make_shared( - MemTrackerLimiter::Type::QUERY, - fmt::format("Query#Id={}", print_id(fragments_ctx->query_id)), bytes_limit); - } else if (params.query_options.query_type == TQueryType::LOAD) { - fragments_ctx->query_mem_tracker = std::make_shared( - MemTrackerLimiter::Type::LOAD, - fmt::format("Load#Id={}", print_id(fragments_ctx->query_id)), bytes_limit); - } else { // EXTERNAL - fragments_ctx->query_mem_tracker = std::make_shared( - MemTrackerLimiter::Type::LOAD, - fmt::format("External#Id={}", print_id(fragments_ctx->query_id)), bytes_limit); - } - if (params.query_options.__isset.is_report_success && - params.query_options.is_report_success) { - fragments_ctx->query_mem_tracker->enable_print_log_usage(); - } - { - // Find _fragments_ctx_map again, in case some other request has already - // create the query fragments context. 
-            std::lock_guard<std::mutex> lock(_lock);
-            auto search = _fragments_ctx_map.find(params.query_id);
-            if (search == _fragments_ctx_map.end()) {
-                _fragments_ctx_map.insert(std::make_pair(fragments_ctx->query_id, fragments_ctx));
-                LOG(INFO) << "Register query/load memory tracker, query/load id: "
-                          << print_id(fragments_ctx->query_id)
-                          << " limit: " << PrettyPrinter::print(bytes_limit, TUnit::BYTES);
-            } else {
-                // Already has a query fragments context, use it
-                fragments_ctx = search->second;
-            }
-        }
-    }
+    RETURN_IF_ERROR(_get_query_ctx(params, params.query_id, true, fragments_ctx));
 
     for (size_t i = 0; i < params.local_params.size(); i++) {
         const auto& local_params = params.local_params[i];
 
         const TUniqueId& fragment_instance_id = local_params.fragment_instance_id;
         {
             std::lock_guard<std::mutex> lock(_lock);
-            auto iter = _fragment_map.find(fragment_instance_id);
-            if (iter != _fragment_map.end()) {
+            auto iter = _pipeline_map.find(fragment_instance_id);
+            if (iter != _pipeline_map.end()) {
                 // Duplicated
                 continue;
             }
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index 9c066c58f3..2e42fab174 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -151,6 +151,10 @@ private:
                                  RuntimeState* state, QueryFragmentsCtx* fragments_ctx);
 
+    template <typename Params>
+    Status _get_query_ctx(const Params& params, TUniqueId query_id, bool pipeline,
+                          std::shared_ptr<QueryFragmentsCtx>& fragments_ctx);
+
     // This is input params
     ExecEnv* _exec_env;
diff --git a/be/src/runtime/query_fragments_ctx.h b/be/src/runtime/query_fragments_ctx.h
index 86fcd02f8d..bbf5be6890 100644
--- a/be/src/runtime/query_fragments_ctx.h
+++ b/be/src/runtime/query_fragments_ctx.h
@@ -29,6 +29,7 @@
 #include "runtime/exec_env.h"
 #include "runtime/memory/mem_tracker_limiter.h"
 #include "runtime/runtime_predicate.h"
+#include "task_group/task_group.h"
 #include "util/pretty_printer.h"
 #include "util/threadpool.h"
 #include "vec/exec/scan/scanner_scheduler.h"
@@ -130,6 +131,10 @@ public:
 
     vectorized::RuntimePredicate& get_runtime_predicate() { return _runtime_predicate; }
 
+    void set_task_group(taskgroup::TaskGroupPtr& tg) { _task_group = tg; }
+
+    taskgroup::TaskGroup* get_task_group() const { return _task_group.get(); }
+
 public:
     TUniqueId query_id;
     DescriptorTbl* desc_tbl;
@@ -175,6 +180,8 @@ private:
     std::shared_ptr<vectorized::SharedHashTableController> _shared_hash_table_controller;
     std::shared_ptr<vectorized::SharedScannerController> _shared_scanner_controller;
     vectorized::RuntimePredicate _runtime_predicate;
+
+    taskgroup::TaskGroupPtr _task_group;
 };
 
 } // namespace doris
diff --git a/be/src/runtime/task_group/task_group.cpp b/be/src/runtime/task_group/task_group.cpp
new file mode 100644
index 0000000000..63f0a564fa
--- /dev/null
+++ b/be/src/runtime/task_group/task_group.cpp
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "task_group.h"
+
+#include "pipeline/pipeline_task.h"
+
+namespace doris {
+namespace taskgroup {
+
+pipeline::PipelineTask* TaskGroupEntity::take() {
+    if (_queue.empty()) {
+        return nullptr;
+    }
+    auto task = _queue.front();
+    _queue.pop();
+    return task;
+}
+
+void TaskGroupEntity::incr_runtime_ns(uint64_t runtime_ns) {
+    auto v_time = runtime_ns / _tg->share();
+    _vruntime_ns += v_time;
+}
+
+void TaskGroupEntity::adjust_vruntime_ns(uint64_t vruntime_ns) {
+    VLOG_DEBUG << "adjust " << debug_string() << " vtime to " << vruntime_ns;
+    _vruntime_ns = vruntime_ns;
+}
+
+void TaskGroupEntity::push_back(pipeline::PipelineTask* task) {
+    _queue.emplace(task);
+}
+
+uint64_t TaskGroupEntity::cpu_share() const {
+    return _tg->share();
+}
+
+std::string TaskGroupEntity::debug_string() const {
+    return fmt::format("TGE[id = {}, cpu_share = {}, task size: {}, v_time:{}ns]", _tg->id(),
+                       cpu_share(), _queue.size(), _vruntime_ns);
+}
+
+TaskGroup::TaskGroup(uint64_t id, std::string name, uint64_t share)
+        : _id(id), _name(name), _share(share), _task_entity(this) {}
+
+std::string TaskGroup::debug_string() const {
+    return fmt::format("TG[id = {}, name = {}, share = {}]", _id, _name, share());
+}
+
+} // namespace taskgroup
+} // namespace doris
diff --git a/be/src/runtime/task_group/task_group.h b/be/src/runtime/task_group/task_group.h
new file mode 100644
index 0000000000..62f06eb2dd
--- /dev/null
+++ b/be/src/runtime/task_group/task_group.h
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
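+// A TaskGroup owns the scheduling parameters (id, name, cpu_share); its
+// embedded TaskGroupEntity is the unit ordered by vruntime in the scheduler.
+// incr_runtime_ns(t) advances vruntime by t / cpu_share, so 1ms of CPU moves
+// a share-64 entity by 15625ns but a share-128 entity by only 7812ns; the
+// entity with the smallest vruntime is picked next.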
+#pragma once
+
+#include <queue>
+
+#include "olap/olap_define.h"
+
+namespace doris {
+
+namespace pipeline {
+class PipelineTask;
+}
+
+class QueryFragmentsCtx;
+
+namespace taskgroup {
+
+class TaskGroup;
+
+class TaskGroupEntity {
+public:
+    explicit TaskGroupEntity(taskgroup::TaskGroup* ts) : _tg(ts) {}
+    void push_back(pipeline::PipelineTask* task);
+    uint64_t vruntime_ns() const { return _vruntime_ns; }
+
+    pipeline::PipelineTask* take();
+
+    void incr_runtime_ns(uint64_t runtime_ns);
+
+    void adjust_vruntime_ns(uint64_t vruntime_ns);
+
+    size_t task_size() const { return _queue.size(); }
+
+    uint64_t cpu_share() const;
+
+    std::string debug_string() const;
+
+private:
+    // TODO pipeline use MLFQ
+    std::queue<pipeline::PipelineTask*> _queue;
+    taskgroup::TaskGroup* _tg;
+    uint64_t _vruntime_ns = 0;
+};
+
+using TGEntityPtr = TaskGroupEntity*;
+
+class TaskGroup {
+public:
+    TaskGroup(uint64_t id, std::string name, uint64_t cpu_share);
+
+    TaskGroupEntity* task_entity() { return &_task_entity; }
+
+    uint64_t share() const { return _share; }
+    uint64_t id() const { return _id; }
+
+    std::string debug_string() const;
+
+private:
+    uint64_t _id;
+    std::string _name;
+    uint64_t _share;
+    TaskGroupEntity _task_entity;
+};
+
+using TaskGroupPtr = std::shared_ptr<TaskGroup>;
+
+} // namespace taskgroup
+} // namespace doris
diff --git a/be/src/runtime/task_group/task_group_manager.cpp b/be/src/runtime/task_group/task_group_manager.cpp
new file mode 100644
index 0000000000..c359470579
--- /dev/null
+++ b/be/src/runtime/task_group/task_group_manager.cpp
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
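+// Both built-in task groups are created eagerly in the constructor, so
+// get_task_group() can always fall back to the default group for unknown ids.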
+
+#include "task_group_manager.h"
+
+namespace doris::taskgroup {
+
+TaskGroupManager::TaskGroupManager() {
+    _create_default_task_group();
+    _create_short_task_group();
+}
+
+TaskGroupManager::~TaskGroupManager() = default;
+
+TaskGroupManager* TaskGroupManager::instance() {
+    static TaskGroupManager tgm;
+    return &tgm;
+}
+
+TaskGroupPtr TaskGroupManager::get_task_group(uint64_t id) {
+    std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
+    if (_task_groups.count(id)) {
+        return _task_groups[id];
+    } else {
+        return _task_groups[DEFAULT_TG_ID];
+    }
+}
+
+void TaskGroupManager::_create_default_task_group() {
+    _task_groups[DEFAULT_TG_ID] =
+            std::make_shared<TaskGroup>(DEFAULT_TG_ID, "default_tg", DEFAULT_TG_CPU_SHARE);
+}
+
+void TaskGroupManager::_create_short_task_group() {
+    _task_groups[SHORT_TG_ID] =
+            std::make_shared<TaskGroup>(SHORT_TG_ID, "short_tg", SHORT_TG_CPU_SHARE);
+}
+
+} // namespace doris::taskgroup
diff --git a/be/src/runtime/task_group/task_group_manager.h b/be/src/runtime/task_group/task_group_manager.h
new file mode 100644
index 0000000000..4754e949fd
--- /dev/null
+++ b/be/src/runtime/task_group/task_group_manager.h
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
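+// Only two built-in groups exist for now: DEFAULT (cpu_share 64) and SHORT
+// (cpu_share 128); when both are runnable, short queries get roughly twice
+// the pipeline CPU time of default ones.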
+#pragma once
+
+#include <shared_mutex>
+#include <unordered_map>
+
+#include "task_group.h"
+
+namespace doris::taskgroup {
+
+class TaskGroupManager {
+public:
+    TaskGroupManager();
+    ~TaskGroupManager();
+    static TaskGroupManager* instance();
+
+    // TODO pipeline task group
+    TaskGroupPtr get_task_group(uint64_t id);
+
+    static constexpr uint64_t DEFAULT_TG_ID = 0;
+    static constexpr uint64_t DEFAULT_TG_CPU_SHARE = 64;
+
+    static constexpr uint64_t SHORT_TG_ID = 1;
+    static constexpr uint64_t SHORT_TG_CPU_SHARE = 128;
+
+private:
+    void _create_default_task_group();
+    void _create_short_task_group();
+
+    std::shared_mutex _group_mutex;
+    std::unordered_map<uint64_t, TaskGroupPtr> _task_groups;
+};
+
+} // namespace doris::taskgroup
diff --git a/regression-test/pipeline/p0/conf/be.conf b/regression-test/pipeline/p0/conf/be.conf
index 3ee7730e39..036e8e26fa 100644
--- a/regression-test/pipeline/p0/conf/be.conf
+++ b/regression-test/pipeline/p0/conf/be.conf
@@ -65,7 +65,7 @@ log_buffer_level = -1
 storage_root_path=/mnt/ssd01/cluster_storage/doris.SSD/P0/cluster1
 disable_auto_compaction=true
-tablet_map_shard_size=4
+tablet_map_shard_size=256
 priority_networks=172.19.0.0/24
 fragment_pool_thread_num_max=5000
 enable_fuzzy_mode=true
diff --git a/regression-test/pipeline/p1/conf/be.conf b/regression-test/pipeline/p1/conf/be.conf
index 1ff72c537c..aa6ff8d54c 100644
--- a/regression-test/pipeline/p1/conf/be.conf
+++ b/regression-test/pipeline/p1/conf/be.conf
@@ -64,6 +64,6 @@ priority_networks=172.19.0.0/24
 # palo_cgroups
 disable_auto_compaction=true
-tablet_map_shard_size=4
+tablet_map_shard_size=256
 fragment_pool_thread_num_max=5000
 enable_fuzzy_mode=true
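Appendix (illustrative sketch, not part of the patch): the fairness property of TaskGroupTaskQueue can be sanity-checked in isolation. The snippet below reimplements only the comparator ordering and the vruntime update of TaskGroupEntity::incr_runtime_ns with the built-in 64/128 shares; all names are invented for the example. Running it shows accumulated CPU time converging to the 1:2 cpu_share ratio.

    #include <cstdint>
    #include <iostream>
    #include <set>

    struct Entity {
        uint64_t id;
        uint64_t share;        // cpu_share
        uint64_t vruntime = 0; // share-weighted runtime, as in TaskGroupEntity
        uint64_t wall_ns = 0;  // unweighted CPU time actually received
    };

    struct ByVruntime { // mirrors TaskGroupSchedEntityComparator
        bool operator()(const Entity* a, const Entity* b) const {
            if (a->vruntime != b->vruntime) return a->vruntime < b->vruntime;
            if (a->share != b->share) return a->share < b->share;
            return a < b; // final tie-break on address, as in the patch
        }
    };

    int main() {
        Entity def_tg {0, 64};
        Entity short_tg {1, 128};
        std::set<Entity*, ByVruntime> runnable {&def_tg, &short_tg};
        constexpr uint64_t slice = 100'000'000; // PipelineTask::THREAD_TIME_SLICE
        for (int i = 0; i < 300; ++i) {
            Entity* e = *runnable.begin();   // min-vruntime entity, like take()
            runnable.erase(e);               // remove before mutating the sort key
            e->wall_ns += slice;
            e->vruntime += slice / e->share; // incr_runtime_ns
            runnable.insert(e);              // re-insert, like update_statistics
        }
        // Prints roughly 10000ms vs 20000ms: the 64:128 share ratio.
        std::cout << "default_tg: " << def_tg.wall_ns / 1'000'000 << "ms, "
                  << "short_tg: " << short_tg.wall_ns / 1'000'000 << "ms\n";
    }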