diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp index bcb0771eda..63e9935c48 100644 --- a/be/src/exec/exec_node.cpp +++ b/be/src/exec/exec_node.cpp @@ -90,6 +90,7 @@ ExecNode::ExecNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl _rows_returned_counter(nullptr), _rows_returned_rate(nullptr), _memory_used_counter(nullptr), + _peak_memory_usage_counter(nullptr), _is_closed(false), _ref(0) { if (tnode.__isset.output_tuple_id) { @@ -134,8 +135,10 @@ Status ExecNode::prepare(RuntimeState* state) { std::bind(&RuntimeProfile::units_per_second, _rows_returned_counter, runtime_profile()->total_time_counter()), ""); - _mem_tracker = std::make_unique("ExecNode:" + _runtime_profile->name(), - _runtime_profile.get(), nullptr, "PeakMemoryUsage"); + _memory_used_counter = ADD_LABEL_COUNTER(runtime_profile(), "MemoryUsage"); + _peak_memory_usage_counter = _runtime_profile->AddHighWaterMarkCounter( + "PeakMemoryUsage", TUnit::BYTES, "MemoryUsage"); + _mem_tracker = std::make_unique("ExecNode:" + _runtime_profile->name()); for (auto& conjunct : _conjuncts) { RETURN_IF_ERROR(conjunct->prepare(state, intermediate_row_desc())); @@ -204,6 +207,9 @@ Status ExecNode::close(RuntimeState* state) { result = st; } } + if (_peak_memory_usage_counter) { + _peak_memory_usage_counter->set(_mem_tracker->peak_consumption()); + } release_resource(state); return result; } @@ -572,6 +578,7 @@ Status ExecNode::get_next_after_projects( if (UNLIKELY(!status.ok())) return status; return do_projections(&_origin_block, block); } + _peak_memory_usage_counter->set(_mem_tracker->peak_consumption()); return func(state, block, eos); } diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h index 55cdea53b6..ad7eb83074 100644 --- a/be/src/exec/exec_node.h +++ b/be/src/exec/exec_node.h @@ -277,9 +277,10 @@ protected: RuntimeProfile::Counter* _rows_returned_counter; RuntimeProfile::Counter* _rows_returned_rate; - // Account for peak memory used by this node RuntimeProfile::Counter* _memory_used_counter; RuntimeProfile::Counter* _projection_timer; + // Account for peak memory used by this node + RuntimeProfile::Counter* _peak_memory_usage_counter; // OpentelemetrySpan _span; diff --git a/be/src/runtime/memory/mem_tracker.cpp b/be/src/runtime/memory/mem_tracker.cpp index 981a4e63be..df8e9d80a4 100644 --- a/be/src/runtime/memory/mem_tracker.cpp +++ b/be/src/runtime/memory/mem_tracker.cpp @@ -38,26 +38,6 @@ bvar::Adder g_memtracker_cnt("memtracker_cnt"); // Multiple groups are used to reduce the impact of locks. std::vector MemTracker::mem_tracker_pool(1000); -MemTracker::MemTracker(const std::string& label, RuntimeProfile* profile, MemTrackerLimiter* parent, - const std::string& profile_counter_name) - : _label(label) { - _consumption = std::make_shared(); - if (profile != nullptr) { - // By default, memory consumption is tracked via calls to consume()/release(), either to - // the tracker itself or to one of its descendents. Alternatively, a consumption metric - // can be specified, and then the metric's value is used as the consumption rather than - // the tally maintained by consume() and release(). A tcmalloc metric is used to track - // process memory consumption, since the process memory usage may be higher than the - // computed total memory (tcmalloc does not release deallocated memory immediately). - // Other consumption metrics are used in trackers below the process level to account - // for memory (such as free buffer pool buffers) that is not tracked by consume() and - // release(). - _profile_counter = - profile->AddSharedHighWaterMarkCounter(profile_counter_name, TUnit::BYTES); - } - bind_parent(parent); // at the end -} - MemTracker::MemTracker(const std::string& label, MemTrackerLimiter* parent) : _label(label) { _consumption = std::make_shared(); bind_parent(parent); @@ -90,15 +70,6 @@ MemTracker::~MemTracker() { } } -void MemTracker::refresh_all_tracker_profile() { - for (unsigned i = 0; i < mem_tracker_pool.size(); ++i) { - std::lock_guard l(mem_tracker_pool[i].group_lock); - for (auto tracker : mem_tracker_pool[i].trackers) { - tracker->refresh_profile_counter(); - } - } -} - MemTracker::Snapshot MemTracker::make_snapshot() const { Snapshot snapshot; snapshot.label = _label; diff --git a/be/src/runtime/memory/mem_tracker.h b/be/src/runtime/memory/mem_tracker.h index b83502c6c6..94d8360157 100644 --- a/be/src/runtime/memory/mem_tracker.h +++ b/be/src/runtime/memory/mem_tracker.h @@ -34,7 +34,6 @@ // IWYU pragma: no_include #include "common/compiler_util.h" // IWYU pragma: keep #include "util/pretty_printer.h" -#include "util/runtime_profile.h" namespace doris { @@ -114,8 +113,6 @@ public: }; // Creates and adds the tracker to the mem_tracker_pool. - MemTracker(const std::string& label, RuntimeProfile* profile, MemTrackerLimiter* parent, - const std::string& profile_counter_name); MemTracker(const std::string& label, MemTrackerLimiter* parent = nullptr); // For MemTrackerLimiter MemTracker() { _parent_group_num = -1; } @@ -150,14 +147,6 @@ public: void set_consumption(int64_t bytes) { _consumption->set(bytes); } - void refresh_profile_counter() { - if (_profile_counter) { - _profile_counter->set(_consumption->current_value()); - } - } - - static void refresh_all_tracker_profile(); - public: virtual Snapshot make_snapshot() const; // Specify group_num from mem_tracker_pool to generate snapshot. @@ -180,7 +169,6 @@ protected: std::string _label; std::shared_ptr _consumption; - std::shared_ptr _profile_counter; // Tracker is located in group num in mem_tracker_pool int64_t _parent_group_num = 0; diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp index c939402bb4..fccf29f7f4 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.cpp +++ b/be/src/runtime/memory/mem_tracker_limiter.cpp @@ -61,15 +61,9 @@ static RuntimeProfile::Counter* freed_memory_counter = static RuntimeProfile::Counter* cancel_tasks_counter = ADD_COUNTER(free_top_memory_task_profile, "CancelTasksNum", TUnit::UNIT); -MemTrackerLimiter::MemTrackerLimiter(Type type, const std::string& label, int64_t byte_limit, - RuntimeProfile* profile, - const std::string& profile_counter_name) { +MemTrackerLimiter::MemTrackerLimiter(Type type, const std::string& label, int64_t byte_limit) { DCHECK_GE(byte_limit, -1); _consumption = std::make_shared(); - if (profile != nullptr) { - _profile_counter = - profile->AddSharedHighWaterMarkCounter(profile_counter_name, TUnit::BYTES); - } _type = type; _label = label; _limit = byte_limit; @@ -130,15 +124,6 @@ void MemTrackerLimiter::refresh_global_counter() { } } -void MemTrackerLimiter::refresh_all_tracker_profile() { - for (unsigned i = 0; i < mem_tracker_limiter_pool.size(); ++i) { - std::lock_guard l(mem_tracker_limiter_pool[i].group_lock); - for (auto tracker : mem_tracker_limiter_pool[i].trackers) { - tracker->refresh_profile_counter(); - } - } -} - void MemTrackerLimiter::make_process_snapshots(std::vector* snapshots) { MemTrackerLimiter::refresh_global_counter(); int64_t process_mem_sum = 0; @@ -148,7 +133,7 @@ void MemTrackerLimiter::make_process_snapshots(std::vector snapshot.label = ""; snapshot.limit = -1; snapshot.cur_consumption = it.second->current_value(); - snapshot.peak_consumption = it.second->value(); + snapshot.peak_consumption = it.second->peak_value(); (*snapshots).emplace_back(snapshot); process_mem_sum += it.second->current_value(); } diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h index 218f9e166d..bf456c3455 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.h +++ b/be/src/runtime/memory/mem_tracker_limiter.h @@ -36,12 +36,13 @@ #include "common/config.h" #include "common/status.h" #include "runtime/memory/mem_tracker.h" -#include "util/runtime_profile.h" #include "util/string_util.h" #include "util/uid_util.h" namespace doris { +class RuntimeProfile; + constexpr auto MEM_TRACKER_GROUP_NUM = 1000; namespace taskgroup { @@ -74,29 +75,20 @@ public: std::mutex group_lock; }; - inline static std::unordered_map> - TypeMemSum = {{Type::GLOBAL, - std::make_shared(TUnit::BYTES)}, - {Type::QUERY, - std::make_shared(TUnit::BYTES)}, - {Type::LOAD, - std::make_shared(TUnit::BYTES)}, - {Type::COMPACTION, - std::make_shared(TUnit::BYTES)}, - {Type::SCHEMA_CHANGE, - std::make_shared(TUnit::BYTES)}, - {Type::CLONE, - std::make_shared(TUnit::BYTES)}, - {Type::EXPERIMENTAL, - std::make_shared(TUnit::BYTES)}}; + inline static std::unordered_map> TypeMemSum = { + {Type::GLOBAL, std::make_shared()}, + {Type::QUERY, std::make_shared()}, + {Type::LOAD, std::make_shared()}, + {Type::COMPACTION, std::make_shared()}, + {Type::SCHEMA_CHANGE, std::make_shared()}, + {Type::CLONE, std::make_shared()}, + {Type::EXPERIMENTAL, std::make_shared()}}; public: // byte_limit equal to -1 means no consumption limit, only participate in process memory statistics. - MemTrackerLimiter(Type type, const std::string& label = std::string(), int64_t byte_limit = -1, - RuntimeProfile* profile = nullptr, - const std::string& profile_counter_name = "PeakMemoryUsage"); + MemTrackerLimiter(Type type, const std::string& label = std::string(), int64_t byte_limit = -1); - ~MemTrackerLimiter(); + ~MemTrackerLimiter() override; static std::string type_string(Type type) { switch (type) { @@ -151,7 +143,6 @@ public: } static void refresh_global_counter(); - static void refresh_all_tracker_profile(); Snapshot make_snapshot() const override; // Returns a list of all the valid tracker snapshots. diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp index 45b910cf82..e577fb341f 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.cpp +++ b/be/src/vec/runtime/vdata_stream_recvr.cpp @@ -297,12 +297,12 @@ VDataStreamRecvr::VDataStreamRecvr( _is_merging(is_merging), _is_closed(false), _profile(profile), + _peak_memory_usage_counter(nullptr), _sub_plan_query_statistics_recvr(sub_plan_query_statistics_recvr), _enable_pipeline(state->enable_pipeline_exec()) { // DataStreamRecvr may be destructed after the instance execution thread ends. _mem_tracker = - std::make_unique("VDataStreamRecvr:" + print_id(_fragment_instance_id), - _profile, nullptr, "PeakMemoryUsage"); + std::make_unique("VDataStreamRecvr:" + print_id(_fragment_instance_id)); SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); // Create one queue per sender if is_merging is true. @@ -322,6 +322,8 @@ VDataStreamRecvr::VDataStreamRecvr( // Initialize the counters _memory_usage_counter = ADD_LABEL_COUNTER(_profile, "MemoryUsage"); _blocks_memory_usage = _profile->AddHighWaterMarkCounter("Blocks", TUnit::BYTES, "MemoryUsage"); + _peak_memory_usage_counter = + _profile->AddHighWaterMarkCounter("PeakMemoryUsage", TUnit::BYTES, "MemoryUsage"); _bytes_received_counter = ADD_COUNTER(_profile, "BytesReceived", TUnit::BYTES); _local_bytes_received_counter = ADD_COUNTER(_profile, "LocalBytesReceived", TUnit::BYTES); @@ -385,6 +387,7 @@ bool VDataStreamRecvr::ready_to_read() { } Status VDataStreamRecvr::get_next(Block* block, bool* eos) { + _peak_memory_usage_counter->set(_mem_tracker->peak_consumption()); if (!_is_merging) { block->clear(); return _sender_queues[0]->get_batch(block, eos); @@ -418,6 +421,9 @@ void VDataStreamRecvr::close() { _mgr = nullptr; _merger.reset(); + if (_peak_memory_usage_counter) { + _peak_memory_usage_counter->set(_mem_tracker->peak_consumption()); + } } } // namespace doris::vectorized diff --git a/be/src/vec/runtime/vdata_stream_recvr.h b/be/src/vec/runtime/vdata_stream_recvr.h index 3e85a649c5..229d87d638 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.h +++ b/be/src/vec/runtime/vdata_stream_recvr.h @@ -154,6 +154,7 @@ private: RuntimeProfile::Counter* _decompress_bytes; RuntimeProfile::Counter* _memory_usage_counter; RuntimeProfile::HighWaterMarkCounter* _blocks_memory_usage; + RuntimeProfile::Counter* _peak_memory_usage_counter; std::shared_ptr _sub_plan_query_statistics_recvr; diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp index ec7226dec7..df705ee37b 100644 --- a/be/src/vec/sink/vdata_stream_sender.cpp +++ b/be/src/vec/sink/vdata_stream_sender.cpp @@ -401,6 +401,7 @@ VDataStreamSender::VDataStreamSender(ObjectPool* pool, const RowDescriptor& row_ _split_block_hash_compute_timer(nullptr), _split_block_distribute_by_channel_timer(nullptr), _blocks_sent_counter(nullptr), + _peak_memory_usage_counter(nullptr), _local_bytes_send_counter(nullptr), _dest_node_id(0) { _cur_pb_block = &_pb_block1; @@ -438,9 +439,8 @@ Status VDataStreamSender::prepare(RuntimeState* state) { _dest_node_id, instances); _profile = _pool->add(new RuntimeProfile(title)); SCOPED_TIMER(_profile->total_time_counter()); - _mem_tracker = std::make_unique( - "VDataStreamSender:" + print_id(state->fragment_instance_id()), _profile, nullptr, - "PeakMemoryUsage"); + _mem_tracker = std::make_unique("VDataStreamSender:" + + print_id(state->fragment_instance_id())); SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); if (_part_type == TPartitionType::UNPARTITIONED || _part_type == TPartitionType::RANDOM) { @@ -475,6 +475,9 @@ Status VDataStreamSender::prepare(RuntimeState* state) { profile()->total_time_counter()), ""); _local_bytes_send_counter = ADD_COUNTER(profile(), "LocalBytesSent", TUnit::BYTES); + _memory_usage_counter = ADD_LABEL_COUNTER(profile(), "MemoryUsage"); + _peak_memory_usage_counter = + profile()->AddHighWaterMarkCounter("PeakMemoryUsage", TUnit::BYTES, "MemoryUsage"); return Status::OK(); } @@ -504,6 +507,7 @@ void VDataStreamSender::_handle_eof_channel(RuntimeState* state, ChannelPtrType Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) { SCOPED_TIMER(_profile->total_time_counter()); + _peak_memory_usage_counter->set(_mem_tracker->peak_consumption()); bool all_receiver_eof = true; for (auto channel : _channels) { if (!channel->is_receiver_eof()) { @@ -719,6 +723,9 @@ Status VDataStreamSender::close(RuntimeState* state, Status exec_status) { } } + if (_peak_memory_usage_counter) { + _peak_memory_usage_counter->set(_mem_tracker->peak_consumption()); + } DataSink::close(state, exec_status); return final_st; } diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h index e77518df2c..77dfe0b20a 100644 --- a/be/src/vec/sink/vdata_stream_sender.h +++ b/be/src/vec/sink/vdata_stream_sender.h @@ -189,6 +189,8 @@ protected: RuntimeProfile::Counter* _split_block_distribute_by_channel_timer; RuntimeProfile::Counter* _blocks_sent_counter; RuntimeProfile::Counter* _merge_block_timer; + RuntimeProfile::Counter* _memory_usage_counter; + RuntimeProfile::Counter* _peak_memory_usage_counter; std::unique_ptr _mem_tracker; diff --git a/docs/en/docs/admin-manual/config/be-config.md b/docs/en/docs/admin-manual/config/be-config.md index f558d40118..f007b708ba 100644 --- a/docs/en/docs/admin-manual/config/be-config.md +++ b/docs/en/docs/admin-manual/config/be-config.md @@ -182,8 +182,7 @@ There are two ways to configure BE configuration items: * Type: string * Description: Limit the percentage of the server's maximum memory used by the BE process. It is used to prevent BE memory from occupying to many the machine's memory. This parameter must be greater than 0. When the percentage is greater than 100%, the value will default to 100%. - - `auto` means process mem limit is equal to max(physical_mem * 0.9, physical_mem - 6.4G), 6.4G is the maximum memory reserved for the system by default. -* Default value: auto +* Default value: 80% #### `cluster_id` diff --git a/docs/zh-CN/docs/admin-manual/config/be-config.md b/docs/zh-CN/docs/admin-manual/config/be-config.md index edb4e3e10d..e040c5e1b4 100644 --- a/docs/zh-CN/docs/admin-manual/config/be-config.md +++ b/docs/zh-CN/docs/admin-manual/config/be-config.md @@ -191,8 +191,7 @@ BE 重启后该配置将失效。如果想持久化修改结果,使用如下 * 类型:string * 描述:限制BE进程使用服务器最大内存百分比。用于防止BE内存挤占太多的机器内存,该参数必须大于0,当百分大于100%之后,该值会默认为100%。 - - `auto` 等于 max(physical_mem * 0.9, physical_mem - 6.4G),6.4G是默认为系统预留的最大内存。 -* 默认值:auto +* 默认值:80% #### `cluster_id`