[fix](memory) Independent count exec node memory profile (#22598)

Count the exec node memory profile independently; follow-up to #22582.
Xinyi Zou
2023-08-06 10:56:31 +08:00
committed by GitHub
parent 1073a924c1
commit 96f42ca20a
12 changed files with 48 additions and 91 deletions
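
The core change is visible across all the diffs below: each operator stops handing its MemTracker a shared profile counter and instead publishes its own `PeakMemoryUsage` high-water-mark counter, refreshed while blocks are produced and once more on close. The following is a minimal, self-contained sketch of that pattern, not the Doris classes themselves; `SimpleMemTracker`, `HighWaterMarkCounter`, and `CountingNode` are simplified stand-ins for `MemTracker`, `RuntimeProfile::HighWaterMarkCounter`, and an `ExecNode`.

```cpp
// Sketch: a node owns its own memory tracker and pushes the tracker's peak
// consumption into a per-node high-water-mark counter on its profile.
#include <algorithm>
#include <atomic>
#include <cstdint>
#include <iostream>

// Stand-in for MemTracker: tracks current and peak consumption.
class SimpleMemTracker {
public:
    void consume(int64_t bytes) {
        int64_t cur = _current.fetch_add(bytes) + bytes;
        int64_t peak = _peak.load();
        while (cur > peak && !_peak.compare_exchange_weak(peak, cur)) {
        }
    }
    void release(int64_t bytes) { _current.fetch_sub(bytes); }
    int64_t peak_consumption() const { return _peak.load(); }

private:
    std::atomic<int64_t> _current {0};
    std::atomic<int64_t> _peak {0};
};

// Stand-in for RuntimeProfile::HighWaterMarkCounter: set() keeps the maximum.
class HighWaterMarkCounter {
public:
    void set(int64_t v) { _value = std::max(_value, v); }
    int64_t value() const { return _value; }

private:
    int64_t _value = 0;
};

// Stand-in for an exec node that reports its own peak memory usage.
class CountingNode {
public:
    void get_next() {
        _tracker.consume(4096);  // allocate a block
        _peak_counter.set(_tracker.peak_consumption());
        _tracker.release(4096);  // block handed off to the parent node
    }
    void close() {
        // Final refresh so the profile reflects the true per-node peak.
        _peak_counter.set(_tracker.peak_consumption());
        std::cout << "PeakMemoryUsage: " << _peak_counter.value() << " bytes\n";
    }

private:
    SimpleMemTracker _tracker;
    HighWaterMarkCounter _peak_counter;
};

int main() {
    CountingNode node;
    for (int i = 0; i < 3; ++i) node.get_next();
    node.close();  // prints "PeakMemoryUsage: 4096 bytes"
    return 0;
}
```

In the diffs below the same pattern appears as `AddHighWaterMarkCounter("PeakMemoryUsage", TUnit::BYTES, "MemoryUsage")` at prepare time and `_peak_memory_usage_counter->set(_mem_tracker->peak_consumption())` in `get_next_after_projects()` and `close()`, with analogous calls in `VDataStreamRecvr` and `VDataStreamSender`.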

View File

@ -90,6 +90,7 @@ ExecNode::ExecNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl
_rows_returned_counter(nullptr),
_rows_returned_rate(nullptr),
_memory_used_counter(nullptr),
_peak_memory_usage_counter(nullptr),
_is_closed(false),
_ref(0) {
if (tnode.__isset.output_tuple_id) {
@ -134,8 +135,10 @@ Status ExecNode::prepare(RuntimeState* state) {
std::bind<int64_t>(&RuntimeProfile::units_per_second, _rows_returned_counter,
runtime_profile()->total_time_counter()),
"");
_mem_tracker = std::make_unique<MemTracker>("ExecNode:" + _runtime_profile->name(),
_runtime_profile.get(), nullptr, "PeakMemoryUsage");
_memory_used_counter = ADD_LABEL_COUNTER(runtime_profile(), "MemoryUsage");
_peak_memory_usage_counter = _runtime_profile->AddHighWaterMarkCounter(
"PeakMemoryUsage", TUnit::BYTES, "MemoryUsage");
_mem_tracker = std::make_unique<MemTracker>("ExecNode:" + _runtime_profile->name());
for (auto& conjunct : _conjuncts) {
RETURN_IF_ERROR(conjunct->prepare(state, intermediate_row_desc()));
@ -204,6 +207,9 @@ Status ExecNode::close(RuntimeState* state) {
result = st;
}
}
if (_peak_memory_usage_counter) {
_peak_memory_usage_counter->set(_mem_tracker->peak_consumption());
}
release_resource(state);
return result;
}
@ -572,6 +578,7 @@ Status ExecNode::get_next_after_projects(
if (UNLIKELY(!status.ok())) return status;
return do_projections(&_origin_block, block);
}
_peak_memory_usage_counter->set(_mem_tracker->peak_consumption());
return func(state, block, eos);
}

View File

@ -277,9 +277,10 @@ protected:
RuntimeProfile::Counter* _rows_returned_counter;
RuntimeProfile::Counter* _rows_returned_rate;
// Account for peak memory used by this node
RuntimeProfile::Counter* _memory_used_counter;
RuntimeProfile::Counter* _projection_timer;
// Account for peak memory used by this node
RuntimeProfile::Counter* _peak_memory_usage_counter;
//
OpentelemetrySpan _span;

View File

@ -38,26 +38,6 @@ bvar::Adder<int64_t> g_memtracker_cnt("memtracker_cnt");
// Multiple groups are used to reduce the impact of locks.
std::vector<MemTracker::TrackerGroup> MemTracker::mem_tracker_pool(1000);
MemTracker::MemTracker(const std::string& label, RuntimeProfile* profile, MemTrackerLimiter* parent,
const std::string& profile_counter_name)
: _label(label) {
_consumption = std::make_shared<MemCounter>();
if (profile != nullptr) {
// By default, memory consumption is tracked via calls to consume()/release(), either to
// the tracker itself or to one of its descendents. Alternatively, a consumption metric
// can be specified, and then the metric's value is used as the consumption rather than
// the tally maintained by consume() and release(). A tcmalloc metric is used to track
// process memory consumption, since the process memory usage may be higher than the
// computed total memory (tcmalloc does not release deallocated memory immediately).
// Other consumption metrics are used in trackers below the process level to account
// for memory (such as free buffer pool buffers) that is not tracked by consume() and
// release().
_profile_counter =
profile->AddSharedHighWaterMarkCounter(profile_counter_name, TUnit::BYTES);
}
bind_parent(parent); // at the end
}
MemTracker::MemTracker(const std::string& label, MemTrackerLimiter* parent) : _label(label) {
_consumption = std::make_shared<MemCounter>();
bind_parent(parent);
@ -90,15 +70,6 @@ MemTracker::~MemTracker() {
}
}
void MemTracker::refresh_all_tracker_profile() {
for (unsigned i = 0; i < mem_tracker_pool.size(); ++i) {
std::lock_guard<std::mutex> l(mem_tracker_pool[i].group_lock);
for (auto tracker : mem_tracker_pool[i].trackers) {
tracker->refresh_profile_counter();
}
}
}
MemTracker::Snapshot MemTracker::make_snapshot() const {
Snapshot snapshot;
snapshot.label = _label;

View File

@ -34,7 +34,6 @@
// IWYU pragma: no_include <opentelemetry/common/threadlocal.h>
#include "common/compiler_util.h" // IWYU pragma: keep
#include "util/pretty_printer.h"
#include "util/runtime_profile.h"
namespace doris {
@ -114,8 +113,6 @@ public:
};
// Creates and adds the tracker to the mem_tracker_pool.
MemTracker(const std::string& label, RuntimeProfile* profile, MemTrackerLimiter* parent,
const std::string& profile_counter_name);
MemTracker(const std::string& label, MemTrackerLimiter* parent = nullptr);
// For MemTrackerLimiter
MemTracker() { _parent_group_num = -1; }
@ -150,14 +147,6 @@ public:
void set_consumption(int64_t bytes) { _consumption->set(bytes); }
void refresh_profile_counter() {
if (_profile_counter) {
_profile_counter->set(_consumption->current_value());
}
}
static void refresh_all_tracker_profile();
public:
virtual Snapshot make_snapshot() const;
// Specify group_num from mem_tracker_pool to generate snapshot.
@ -180,7 +169,6 @@ protected:
std::string _label;
std::shared_ptr<MemCounter> _consumption;
std::shared_ptr<RuntimeProfile::HighWaterMarkCounter> _profile_counter;
// Tracker is located in group num in mem_tracker_pool
int64_t _parent_group_num = 0;

View File

@ -61,15 +61,9 @@ static RuntimeProfile::Counter* freed_memory_counter =
static RuntimeProfile::Counter* cancel_tasks_counter =
ADD_COUNTER(free_top_memory_task_profile, "CancelTasksNum", TUnit::UNIT);
MemTrackerLimiter::MemTrackerLimiter(Type type, const std::string& label, int64_t byte_limit,
RuntimeProfile* profile,
const std::string& profile_counter_name) {
MemTrackerLimiter::MemTrackerLimiter(Type type, const std::string& label, int64_t byte_limit) {
DCHECK_GE(byte_limit, -1);
_consumption = std::make_shared<MemCounter>();
if (profile != nullptr) {
_profile_counter =
profile->AddSharedHighWaterMarkCounter(profile_counter_name, TUnit::BYTES);
}
_type = type;
_label = label;
_limit = byte_limit;
@ -130,15 +124,6 @@ void MemTrackerLimiter::refresh_global_counter() {
}
}
void MemTrackerLimiter::refresh_all_tracker_profile() {
for (unsigned i = 0; i < mem_tracker_limiter_pool.size(); ++i) {
std::lock_guard<std::mutex> l(mem_tracker_limiter_pool[i].group_lock);
for (auto tracker : mem_tracker_limiter_pool[i].trackers) {
tracker->refresh_profile_counter();
}
}
}
void MemTrackerLimiter::make_process_snapshots(std::vector<MemTracker::Snapshot>* snapshots) {
MemTrackerLimiter::refresh_global_counter();
int64_t process_mem_sum = 0;
@ -148,7 +133,7 @@ void MemTrackerLimiter::make_process_snapshots(std::vector<MemTracker::Snapshot>
snapshot.label = "";
snapshot.limit = -1;
snapshot.cur_consumption = it.second->current_value();
snapshot.peak_consumption = it.second->value();
snapshot.peak_consumption = it.second->peak_value();
(*snapshots).emplace_back(snapshot);
process_mem_sum += it.second->current_value();
}

View File

@ -36,12 +36,13 @@
#include "common/config.h"
#include "common/status.h"
#include "runtime/memory/mem_tracker.h"
#include "util/runtime_profile.h"
#include "util/string_util.h"
#include "util/uid_util.h"
namespace doris {
class RuntimeProfile;
constexpr auto MEM_TRACKER_GROUP_NUM = 1000;
namespace taskgroup {
@ -74,29 +75,20 @@ public:
std::mutex group_lock;
};
inline static std::unordered_map<Type, std::shared_ptr<RuntimeProfile::HighWaterMarkCounter>>
TypeMemSum = {{Type::GLOBAL,
std::make_shared<RuntimeProfile::HighWaterMarkCounter>(TUnit::BYTES)},
{Type::QUERY,
std::make_shared<RuntimeProfile::HighWaterMarkCounter>(TUnit::BYTES)},
{Type::LOAD,
std::make_shared<RuntimeProfile::HighWaterMarkCounter>(TUnit::BYTES)},
{Type::COMPACTION,
std::make_shared<RuntimeProfile::HighWaterMarkCounter>(TUnit::BYTES)},
{Type::SCHEMA_CHANGE,
std::make_shared<RuntimeProfile::HighWaterMarkCounter>(TUnit::BYTES)},
{Type::CLONE,
std::make_shared<RuntimeProfile::HighWaterMarkCounter>(TUnit::BYTES)},
{Type::EXPERIMENTAL,
std::make_shared<RuntimeProfile::HighWaterMarkCounter>(TUnit::BYTES)}};
inline static std::unordered_map<Type, std::shared_ptr<MemCounter>> TypeMemSum = {
{Type::GLOBAL, std::make_shared<MemCounter>()},
{Type::QUERY, std::make_shared<MemCounter>()},
{Type::LOAD, std::make_shared<MemCounter>()},
{Type::COMPACTION, std::make_shared<MemCounter>()},
{Type::SCHEMA_CHANGE, std::make_shared<MemCounter>()},
{Type::CLONE, std::make_shared<MemCounter>()},
{Type::EXPERIMENTAL, std::make_shared<MemCounter>()}};
public:
// A byte_limit of -1 means no consumption limit; the tracker only participates in process memory statistics.
MemTrackerLimiter(Type type, const std::string& label = std::string(), int64_t byte_limit = -1,
RuntimeProfile* profile = nullptr,
const std::string& profile_counter_name = "PeakMemoryUsage");
MemTrackerLimiter(Type type, const std::string& label = std::string(), int64_t byte_limit = -1);
~MemTrackerLimiter();
~MemTrackerLimiter() override;
static std::string type_string(Type type) {
switch (type) {
@ -151,7 +143,6 @@ public:
}
static void refresh_global_counter();
static void refresh_all_tracker_profile();
Snapshot make_snapshot() const override;
// Returns a list of all the valid tracker snapshots.

View File

@ -297,12 +297,12 @@ VDataStreamRecvr::VDataStreamRecvr(
_is_merging(is_merging),
_is_closed(false),
_profile(profile),
_peak_memory_usage_counter(nullptr),
_sub_plan_query_statistics_recvr(sub_plan_query_statistics_recvr),
_enable_pipeline(state->enable_pipeline_exec()) {
// DataStreamRecvr may be destructed after the instance execution thread ends.
_mem_tracker =
std::make_unique<MemTracker>("VDataStreamRecvr:" + print_id(_fragment_instance_id),
_profile, nullptr, "PeakMemoryUsage");
std::make_unique<MemTracker>("VDataStreamRecvr:" + print_id(_fragment_instance_id));
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
// Create one queue per sender if is_merging is true.
@ -322,6 +322,8 @@ VDataStreamRecvr::VDataStreamRecvr(
// Initialize the counters
_memory_usage_counter = ADD_LABEL_COUNTER(_profile, "MemoryUsage");
_blocks_memory_usage = _profile->AddHighWaterMarkCounter("Blocks", TUnit::BYTES, "MemoryUsage");
_peak_memory_usage_counter =
_profile->AddHighWaterMarkCounter("PeakMemoryUsage", TUnit::BYTES, "MemoryUsage");
_bytes_received_counter = ADD_COUNTER(_profile, "BytesReceived", TUnit::BYTES);
_local_bytes_received_counter = ADD_COUNTER(_profile, "LocalBytesReceived", TUnit::BYTES);
@ -385,6 +387,7 @@ bool VDataStreamRecvr::ready_to_read() {
}
Status VDataStreamRecvr::get_next(Block* block, bool* eos) {
_peak_memory_usage_counter->set(_mem_tracker->peak_consumption());
if (!_is_merging) {
block->clear();
return _sender_queues[0]->get_batch(block, eos);
@ -418,6 +421,9 @@ void VDataStreamRecvr::close() {
_mgr = nullptr;
_merger.reset();
if (_peak_memory_usage_counter) {
_peak_memory_usage_counter->set(_mem_tracker->peak_consumption());
}
}
} // namespace doris::vectorized

View File

@ -154,6 +154,7 @@ private:
RuntimeProfile::Counter* _decompress_bytes;
RuntimeProfile::Counter* _memory_usage_counter;
RuntimeProfile::HighWaterMarkCounter* _blocks_memory_usage;
RuntimeProfile::Counter* _peak_memory_usage_counter;
std::shared_ptr<QueryStatisticsRecvr> _sub_plan_query_statistics_recvr;

View File

@ -401,6 +401,7 @@ VDataStreamSender::VDataStreamSender(ObjectPool* pool, const RowDescriptor& row_
_split_block_hash_compute_timer(nullptr),
_split_block_distribute_by_channel_timer(nullptr),
_blocks_sent_counter(nullptr),
_peak_memory_usage_counter(nullptr),
_local_bytes_send_counter(nullptr),
_dest_node_id(0) {
_cur_pb_block = &_pb_block1;
@ -438,9 +439,8 @@ Status VDataStreamSender::prepare(RuntimeState* state) {
_dest_node_id, instances);
_profile = _pool->add(new RuntimeProfile(title));
SCOPED_TIMER(_profile->total_time_counter());
_mem_tracker = std::make_unique<MemTracker>(
"VDataStreamSender:" + print_id(state->fragment_instance_id()), _profile, nullptr,
"PeakMemoryUsage");
_mem_tracker = std::make_unique<MemTracker>("VDataStreamSender:" +
print_id(state->fragment_instance_id()));
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
if (_part_type == TPartitionType::UNPARTITIONED || _part_type == TPartitionType::RANDOM) {
@ -475,6 +475,9 @@ Status VDataStreamSender::prepare(RuntimeState* state) {
profile()->total_time_counter()),
"");
_local_bytes_send_counter = ADD_COUNTER(profile(), "LocalBytesSent", TUnit::BYTES);
_memory_usage_counter = ADD_LABEL_COUNTER(profile(), "MemoryUsage");
_peak_memory_usage_counter =
profile()->AddHighWaterMarkCounter("PeakMemoryUsage", TUnit::BYTES, "MemoryUsage");
return Status::OK();
}
@ -504,6 +507,7 @@ void VDataStreamSender::_handle_eof_channel(RuntimeState* state, ChannelPtrType
Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
SCOPED_TIMER(_profile->total_time_counter());
_peak_memory_usage_counter->set(_mem_tracker->peak_consumption());
bool all_receiver_eof = true;
for (auto channel : _channels) {
if (!channel->is_receiver_eof()) {
@ -719,6 +723,9 @@ Status VDataStreamSender::close(RuntimeState* state, Status exec_status) {
}
}
if (_peak_memory_usage_counter) {
_peak_memory_usage_counter->set(_mem_tracker->peak_consumption());
}
DataSink::close(state, exec_status);
return final_st;
}

View File

@ -189,6 +189,8 @@ protected:
RuntimeProfile::Counter* _split_block_distribute_by_channel_timer;
RuntimeProfile::Counter* _blocks_sent_counter;
RuntimeProfile::Counter* _merge_block_timer;
RuntimeProfile::Counter* _memory_usage_counter;
RuntimeProfile::Counter* _peak_memory_usage_counter;
std::unique_ptr<MemTracker> _mem_tracker;

View File

@ -182,8 +182,7 @@ There are two ways to configure BE configuration items:
* Type: string
* Description: Limits the percentage of the server's maximum memory that the BE process may use. It is used to prevent BE memory from occupying too much of the machine's memory. This parameter must be greater than 0. When the percentage is greater than 100%, the value defaults to 100%.
- `auto` means the process memory limit is equal to max(physical_mem * 0.9, physical_mem - 6.4G), where 6.4G is the maximum memory reserved for the system by default (see the worked example below).
* Default value: auto
* Default value: 80%
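A worked example of the `auto` rule: on a machine with 16 GB of physical memory, `auto` resolves to max(16G * 0.9, 16G - 6.4G) = max(14.4G, 9.6G) = 14.4G, so the BE process memory limit would be 14.4 GB.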
#### `cluster_id`

View File

@ -191,8 +191,7 @@ This configuration no longer takes effect after the BE restarts. To persist the change, use the following
* Type: string
* Description: Limits the percentage of the server's maximum memory that the BE process may use. It prevents BE memory from crowding out too much of the machine's memory. This parameter must be greater than 0; when the percentage is greater than 100%, the value defaults to 100%.
- `auto` equals max(physical_mem * 0.9, physical_mem - 6.4G), where 6.4G is the maximum memory reserved for the system by default.
* Default value: auto
* Default value: 80%
#### `cluster_id`