diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp index 97afb5fa05..0a854ba89b 100644 --- a/be/src/exec/exec_node.cpp +++ b/be/src/exec/exec_node.cpp @@ -206,7 +206,7 @@ Status ExecNode::prepare(RuntimeState* state) { std::bind(&RuntimeProfile::units_per_second, _rows_returned_counter, runtime_profile()->total_time_counter()), ""); - _mem_tracker = std::make_unique("ExecNode:" + _runtime_profile->name(), nullptr, + _mem_tracker = std::make_unique("ExecNode:" + _runtime_profile->name(), _runtime_profile.get()); SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp index a1ade071d0..a9675fba7d 100644 --- a/be/src/olap/memtable.cpp +++ b/be/src/olap/memtable.cpp @@ -144,7 +144,9 @@ MemTable::~MemTable() { _mem_tracker->release(_mem_usage); _buffer_mem_pool->free_all(); _table_mem_pool->free_all(); - _mem_tracker->memory_leak_check(); + DCHECK_EQ(_mem_tracker->consumption(), 0) + << std::endl + << MemTracker::log_usage(_mem_tracker->make_snapshot(0)); } MemTable::RowCursorComparator::RowCursorComparator(const Schema* schema) : _schema(schema) {} diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp index ee9653203f..ac203e858f 100644 --- a/be/src/olap/schema_change.cpp +++ b/be/src/olap/schema_change.cpp @@ -1364,10 +1364,8 @@ VSchemaChangeWithSorting::VSchemaChangeWithSorting(const RowBlockChanger& row_bl : _changer(row_block_changer), _memory_limitation(memory_limitation), _temp_delta_versions(Version::mock()) { - _mem_tracker = - std::make_unique(fmt::format("VSchemaChangeWithSorting:changer={}", - std::to_string(int64(&row_block_changer))), - StorageEngine::instance()->schema_change_mem_tracker()); + _mem_tracker = std::make_unique(fmt::format( + "VSchemaChangeWithSorting:changer={}", std::to_string(int64(&row_block_changer)))); } Status SchemaChangeWithSorting::_inner_process(RowsetReaderSharedPtr rowset_reader, diff --git a/be/src/runtime/bufferpool/reservation_tracker.cc b/be/src/runtime/bufferpool/reservation_tracker.cc index 6bcd0bf38a..6985edaef7 100644 --- a/be/src/runtime/bufferpool/reservation_tracker.cc +++ b/be/src/runtime/bufferpool/reservation_tracker.cc @@ -73,7 +73,7 @@ void ReservationTracker::InitChildTracker(RuntimeProfile* profile, ReservationTr MemTracker* parent_mem_tracker = GetParentMemTracker(); if (parent_mem_tracker != nullptr) { // Make sure the parent links of the MemTrackers correspond to our parent links. - DCHECK_EQ(parent_mem_tracker, mem_tracker_->parent()); + // DCHECK_EQ(parent_mem_tracker, mem_tracker_->parent()); } else { // Make sure we didn't leave a gap in the links. E.g. this tracker's grandparent // shouldn't have a MemTracker. diff --git a/be/src/runtime/data_stream_recvr.cc b/be/src/runtime/data_stream_recvr.cc index ffd20c58d7..0d456ca74a 100644 --- a/be/src/runtime/data_stream_recvr.cc +++ b/be/src/runtime/data_stream_recvr.cc @@ -461,7 +461,7 @@ DataStreamRecvr::DataStreamRecvr( _profile(profile), _sub_plan_query_statistics_recvr(sub_plan_query_statistics_recvr) { _mem_tracker = std::make_unique( - "DataStreamRecvr:" + print_id(_fragment_instance_id), query_mem_tracker, _profile); + "DataStreamRecvr:" + print_id(_fragment_instance_id), _profile); // Create one queue per sender if is_merging is true. int num_queues = is_merging ? num_senders : 1; diff --git a/be/src/runtime/data_stream_sender.cpp b/be/src/runtime/data_stream_sender.cpp index 2731587e45..13b2308d7b 100644 --- a/be/src/runtime/data_stream_sender.cpp +++ b/be/src/runtime/data_stream_sender.cpp @@ -403,7 +403,7 @@ Status DataStreamSender::prepare(RuntimeState* state) { _profile = _pool->add(new RuntimeProfile(title.str())); SCOPED_TIMER(_profile->total_time_counter()); _mem_tracker = std::make_unique( - "DataStreamSender:" + print_id(state->fragment_instance_id()), nullptr, _profile); + "DataStreamSender:" + print_id(state->fragment_instance_id()), _profile); SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); if (_part_type == TPartitionType::UNPARTITIONED || _part_type == TPartitionType::RANDOM) { diff --git a/be/src/runtime/memory/mem_tracker.cpp b/be/src/runtime/memory/mem_tracker.cpp index 0f4e185e46..ca7e4943ff 100644 --- a/be/src/runtime/memory/mem_tracker.cpp +++ b/be/src/runtime/memory/mem_tracker.cpp @@ -21,37 +21,28 @@ #include "runtime/memory/mem_tracker.h" #include -#include -#include "runtime/memory/mem_tracker_limiter.h" #include "runtime/thread_context.h" #include "util/pretty_printer.h" +#include "util/string_util.h" #include "util/time.h" namespace doris { const std::string MemTracker::COUNTER_NAME = "PeakMemoryUsage"; -using StaticTrackersMap = phmap::parallel_flat_hash_map< - std::string, MemTracker*, phmap::priv::hash_default_hash, - phmap::priv::hash_default_eq, - std::allocator>, 12, std::mutex>; +struct TrackerGroup { + std::list trackers; + std::mutex group_lock; +}; -static StaticTrackersMap _static_mem_trackers; +// Save all MemTrackers in use to maintain the weak relationship between MemTracker and MemTrackerLimiter. +// When MemTrackerLimiter prints statistics, all MemTracker statistics with weak relationship will be printed together. +// Each group corresponds to several MemTrackerLimiters and has a lock. +// Multiple groups are used to reduce the impact of locks. +static std::vector mem_tracker_pool(1000); -MemTracker::MemTracker(const std::string& label, MemTrackerLimiter* parent, RuntimeProfile* profile, - bool is_limiter) { - // Do not check limit exceed when add_child_tracker, otherwise it will cause deadlock when log_usage is called. - STOP_CHECK_THREAD_MEM_TRACKER_LIMIT(); - _parent = parent ? parent : thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker(); - DCHECK(_parent || label == "Process"); - if (_parent && _parent->label().find("queryId=") != _parent->label().npos) { - // Add the queryId suffix to the tracker below the query. - _label = fmt::format("{}#{}", label, - _parent->label().substr(_parent->label().find("queryId="), -1)); - } else { - _label = label; - } +MemTracker::MemTracker(const std::string& label, RuntimeProfile* profile, bool is_limiter) { if (profile == nullptr) { _consumption = std::make_shared(TUnit::BYTES); } else { @@ -66,36 +57,43 @@ MemTracker::MemTracker(const std::string& label, MemTrackerLimiter* parent, Runt // release(). _consumption = profile->AddSharedHighWaterMarkCounter(COUNTER_NAME, TUnit::BYTES); } + _is_limiter = is_limiter; - if (_parent && !_is_limiter) _parent->add_child(this); + if (!_is_limiter) { + if (thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()) { + _label = fmt::format( + "{} | {}", label, + thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()->label()); + } else { + _label = label + " | "; + } + + _bind_group_num = + thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()->group_num(); + { + std::lock_guard l(mem_tracker_pool[_bind_group_num].group_lock); + _tracker_group_it = mem_tracker_pool[_bind_group_num].trackers.insert( + mem_tracker_pool[_bind_group_num].trackers.end(), this); + } + } else { + _label = label; + } } MemTracker::~MemTracker() { - if (_parent && !_is_limiter) _parent->remove_child(this); -} - -// Count the memory in the scope to a temporary tracker with the specified label name. -// This is very useful when debugging. You can find the position where the tracker statistics are -// inaccurate through the temporary tracker layer by layer. As well as finding memory hotspots. -MemTracker* MemTracker::get_static_mem_tracker(const std::string& label) { - // First time this label registered, make a new object, otherwise do nothing. - // Avoid using locks to resolve erase conflicts. - MemTracker* tracker; - _static_mem_trackers.lazy_emplace_l( - label, [&](MemTracker* v) { tracker = v; }, - [&](const auto& ctor) { - tracker = new MemTracker(fmt::format("[Static]-{}", label)); - ctor(label, tracker); - }); - return tracker; + if (!_is_limiter) { + std::lock_guard l(mem_tracker_pool[_bind_group_num].group_lock); + if (_tracker_group_it != mem_tracker_pool[_bind_group_num].trackers.end()) { + mem_tracker_pool[_bind_group_num].trackers.erase(_tracker_group_it); + _tracker_group_it = mem_tracker_pool[_bind_group_num].trackers.end(); + } + } } MemTracker::Snapshot MemTracker::make_snapshot(size_t level) const { Snapshot snapshot; - snapshot.label = _label; - if (_parent != nullptr) { - snapshot.parent = _parent->label(); - } + snapshot.label = split(_label, " | ")[0]; + snapshot.parent = split(_label, " | ")[1]; snapshot.level = level; snapshot.limit = -1; snapshot.cur_consumption = _consumption->current_value(); @@ -104,15 +102,21 @@ MemTracker::Snapshot MemTracker::make_snapshot(size_t level) const { return snapshot; } -std::string MemTracker::log_usage() { - // Make sure the consumption is up to date. - int64_t curr_consumption = consumption(); - int64_t peak_consumption = _consumption->value(); - if (curr_consumption == 0) return ""; - std::string detail = "MemTracker Label={}, Total={}, Peak={}"; - detail = fmt::format(detail, _label, PrettyPrinter::print(curr_consumption, TUnit::BYTES), - PrettyPrinter::print(peak_consumption, TUnit::BYTES)); - return detail; +void MemTracker::make_group_snapshot(std::vector* snapshots, size_t level, + int64_t group_num, std::string related_label) { + std::lock_guard l(mem_tracker_pool[group_num].group_lock); + for (auto tracker : mem_tracker_pool[group_num].trackers) { + if (split(tracker->label(), " | ")[1] == related_label) { + snapshots->push_back(tracker->make_snapshot(level)); + } + } +} + +std::string MemTracker::log_usage(MemTracker::Snapshot snapshot) { + return fmt::format("MemTracker Label={}, Parent Label={}, Used={}, Peak={}", snapshot.label, + snapshot.parent, + PrettyPrinter::print(snapshot.cur_consumption, TUnit::BYTES), + PrettyPrinter::print(snapshot.peak_consumption, TUnit::BYTES)); } } // namespace doris \ No newline at end of file diff --git a/be/src/runtime/memory/mem_tracker.h b/be/src/runtime/memory/mem_tracker.h index 9f6e021a3c..4a12375344 100644 --- a/be/src/runtime/memory/mem_tracker.h +++ b/be/src/runtime/memory/mem_tracker.h @@ -23,23 +23,18 @@ namespace doris { -class MemTrackerLimiter; - // Used to track memory usage. // // MemTracker can be consumed manually by consume()/release(), or put into SCOPED_CONSUME_MEM_TRACKER, // which will automatically track all memory usage of the code segment where it is located. // -// MemTracker's parent can only be MemTrackerLimiter, which is only used to print tree-like statistics. -// Consuming MemTracker will not consume its parent synchronously. -// Usually, it is not necessary to specify the parent. by default, the MemTrackerLimiter in the thread context -// is used as the parent, which is specified when the thread starts. -// // This class is thread-safe. class MemTracker { public: struct Snapshot { std::string label; + // For MemTracker, it is only weakly related to parent through label, ensuring MemTracker Independence. + // For MemTrackerLimiter, it is strongly related to parent and saves pointer objects to each other. std::string parent = ""; size_t level = 0; int64_t limit = 0; @@ -48,24 +43,18 @@ public: size_t child_count = 0; }; - // Creates and adds the tracker to the tree. - MemTracker(const std::string& label = std::string(), MemTrackerLimiter* parent = nullptr, - RuntimeProfile* profile = nullptr, bool is_limiter = false); + // Creates and adds the tracker to the mem_tracker_pool. + MemTracker(const std::string& label = std::string(), RuntimeProfile* profile = nullptr, + bool is_limiter = false); ~MemTracker(); - // Get a temporary tracker with a specified label, and the tracker will be created when the label is first get. - // Temporary trackers are not automatically destructed, which is usually used for debugging. - static MemTracker* get_static_mem_tracker(const std::string& label); - public: const std::string& label() const { return _label; } - MemTrackerLimiter* parent() const { return _parent; } // Returns the memory consumed in bytes. int64_t consumption() const { return _consumption->current_value(); } int64_t peak_consumption() const { return _consumption->value(); } -public: void consume(int64_t bytes); void release(int64_t bytes) { consume(-bytes); } // Transfer 'bytes' of consumption from this tracker to 'dst'. @@ -78,17 +67,12 @@ public: return limit >= 0 && limit > consumption() + bytes; } - // Usually, a negative values means that the statistics are not accurate, - // 1. The released memory is not consumed. - // 2. The same block of memory, tracker A calls consume, and tracker B calls release. - // 3. Repeated releases of MemTacker. When the consume is called on the child MemTracker, - // after the release is called on the parent MemTracker, - // the child ~MemTracker will cause repeated releases. - void memory_leak_check() { DCHECK_EQ(consumption(), 0) << std::endl << log_usage(); } - Snapshot make_snapshot(size_t level) const; - - std::string log_usage(); + // Specify group_num from mem_tracker_pool to generate snapshot, requiring tracker.label to be related + // with parameter related_label + static void make_group_snapshot(std::vector* snapshots, size_t level, + int64_t group_num, std::string related_label); + static std::string log_usage(MemTracker::Snapshot snapshot); std::string debug_string() { std::stringstream msg; @@ -98,20 +82,22 @@ public: return msg.str(); } - // Iterator into parent_->_child_trackers for this object. Stored to have O(1) remove. - std::list::iterator _child_tracker_it; - static const std::string COUNTER_NAME; protected: - // label used in the usage string (log_usage()) + // label used in the make snapshot, not guaranteed unique. std::string _label; std::shared_ptr _consumption; // in bytes - MemTrackerLimiter* _parent; // The parent of this tracker. + // Tracker is located in group num in mem_tracker_pool + int64_t _bind_group_num; + // Whether is a MemTrackerLimiter bool _is_limiter; + + // Iterator into mem_tracker_pool for this object. Stored to have O(1) remove. + std::list::iterator _tracker_group_it; }; inline void MemTracker::consume(int64_t bytes) { diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp index 4bdc2a2781..2223fce5a5 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.cpp +++ b/be/src/runtime/memory/mem_tracker_limiter.cpp @@ -30,10 +30,14 @@ namespace doris { MemTrackerLimiter::MemTrackerLimiter(int64_t byte_limit, const std::string& label, MemTrackerLimiter* parent, RuntimeProfile* profile) - : MemTracker(label, parent, profile, true) { - // Walks the MemTrackerLimiter hierarchy and populates _all_ancestors and _limited_ancestors + : MemTracker(label, profile, true) { DCHECK_GE(byte_limit, -1); _limit = byte_limit; + _group_num = GetCurrentTimeMicros() % 1000; + _parent = parent ? parent : thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker(); + DCHECK(_parent || label == "Process"); + + // Walks the MemTrackerLimiter hierarchy and populates _all_ancestors and _limited_ancestors MemTrackerLimiter* tracker = this; while (tracker != nullptr) { _all_ancestors.push_back(tracker); @@ -59,12 +63,6 @@ void MemTrackerLimiter::add_child(MemTrackerLimiter* tracker) { _had_child_count++; } -void MemTrackerLimiter::add_child(MemTracker* tracker) { - std::lock_guard l(_child_tracker_lock); - tracker->_child_tracker_it = _child_trackers.insert(_child_trackers.end(), tracker); - _had_child_count++; -} - void MemTrackerLimiter::remove_child(MemTrackerLimiter* tracker) { std::lock_guard l(_child_tracker_limiter_lock); if (tracker->_child_tracker_it != _child_tracker_limiters.end()) { @@ -73,19 +71,21 @@ void MemTrackerLimiter::remove_child(MemTrackerLimiter* tracker) { } } -void MemTrackerLimiter::remove_child(MemTracker* tracker) { - std::lock_guard l(_child_tracker_lock); - if (tracker->_child_tracker_it != _child_trackers.end()) { - _child_trackers.erase(tracker->_child_tracker_it); - tracker->_child_tracker_it = _child_trackers.end(); - } +MemTracker::Snapshot MemTrackerLimiter::make_snapshot(size_t level) const { + Snapshot snapshot; + snapshot.label = _label; + snapshot.parent = _parent != nullptr ? _parent->label() : "Root"; + snapshot.level = level; + snapshot.limit = _limit; + snapshot.cur_consumption = _consumption->current_value(); + snapshot.peak_consumption = _consumption->value(); + snapshot.child_count = remain_child_count(); + return snapshot; } void MemTrackerLimiter::make_snapshot(std::vector* snapshots, size_t cur_level, size_t upper_level) const { - Snapshot snapshot = MemTracker::make_snapshot(cur_level); - snapshot.limit = _limit; - snapshot.child_count = remain_child_count(); + Snapshot snapshot = MemTrackerLimiter::make_snapshot(cur_level); (*snapshots).emplace_back(snapshot); if (cur_level < upper_level) { { @@ -94,12 +94,7 @@ void MemTrackerLimiter::make_snapshot(std::vector* snapsho child->make_snapshot(snapshots, cur_level + 1, upper_level); } } - { - std::lock_guard l(_child_tracker_lock); - for (const auto& child : _child_trackers) { - (*snapshots).emplace_back(child->make_snapshot(cur_level + 1)); - } - } + MemTracker::make_group_snapshot(snapshots, cur_level + 1, _group_num, _label); } } @@ -183,8 +178,7 @@ std::string MemTrackerLimiter::log_usage(int max_recursive_depth, int64_t* logge int64_t peak_consumption = _consumption->value(); if (logged_consumption != nullptr) *logged_consumption = curr_consumption; - std::string detail = - "MemTrackerLimiter log_usage Label={}, Limit={}, Total={}, Peak={}, Exceeded={}"; + std::string detail = "MemTrackerLimiter Label={}, Limit={}, Used={}, Peak={}, Exceeded={}"; detail = fmt::format(detail, _label, PrettyPrinter::print(_limit, TUnit::BYTES), PrettyPrinter::print(curr_consumption, TUnit::BYTES), PrettyPrinter::print(peak_consumption, TUnit::BYTES), @@ -201,11 +195,10 @@ std::string MemTrackerLimiter::log_usage(int max_recursive_depth, int64_t* logge child_trackers_usage = log_usage(max_recursive_depth - 1, _child_tracker_limiters, &child_consumption); } - { - std::lock_guard l(_child_tracker_lock); - for (const auto& child : _child_trackers) { - child_trackers_usage += "\n" + child->log_usage(); - } + std::vector snapshots; + MemTracker::make_group_snapshot(&snapshots, 0, _group_num, _label); + for (const auto& snapshot : snapshots) { + child_trackers_usage += MemTracker::log_usage(snapshot); } if (!child_trackers_usage.empty()) detail += "\n" + child_trackers_usage; return detail; diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h index 3852cbe52d..5c41ce7cda 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.h +++ b/be/src/runtime/memory/mem_tracker_limiter.h @@ -45,19 +45,21 @@ public: MemTrackerLimiter(int64_t byte_limit = -1, const std::string& label = std::string(), MemTrackerLimiter* parent = nullptr, RuntimeProfile* profile = nullptr); + // If the final consumption is not as expected, this usually means that the same memory is calling + // consume and release on different trackers. If the two trackers have a parent-child relationship, + // the parent tracker consumption is correct, and the child tracker is wrong; if the two trackers have + // no parent-child relationship, the two tracker consumptions are wrong. ~MemTrackerLimiter(); - void add_child(MemTrackerLimiter* tracker); - void add_child(MemTracker* tracker); - void remove_child(MemTrackerLimiter* tracker); - void remove_child(MemTracker* tracker); + MemTrackerLimiter* parent() const { return _parent; } - // Leaf tracker, without any child - size_t remain_child_count() const { - return _child_tracker_limiters.size() + _child_trackers.size(); - } + void add_child(MemTrackerLimiter* tracker); + void remove_child(MemTrackerLimiter* tracker); + + size_t remain_child_count() const { return _child_tracker_limiters.size(); } size_t had_child_count() const { return _had_child_count; } + Snapshot make_snapshot(size_t level) const; // Returns a list of all the valid tracker snapshots. void make_snapshot(std::vector* snapshots, size_t cur_level, size_t upper_level) const; @@ -76,6 +78,7 @@ public: return Status::OK(); } + int64_t group_num() const { return _group_num; } bool has_limit() const { return _limit >= 0; } int64_t limit() const { return _limit; } void update_limit(int64_t limit) { @@ -181,6 +184,11 @@ private: // Limit on memory consumption, in bytes. If limit_ == -1, there is no consumption limit. Used in log_usage。 int64_t _limit; + // Group number in MemTracker::mem_tracker_pool, generated by the timestamp. + int64_t _group_num; + + MemTrackerLimiter* _parent; // The parent of this tracker. + // this tracker limiter plus all of its ancestors std::vector _all_ancestors; // _all_ancestors with valid limits @@ -192,9 +200,6 @@ private: mutable std::mutex _child_tracker_limiter_lock; std::list _child_tracker_limiters; - mutable std::mutex _child_tracker_lock; - std::list _child_trackers; - // The number of child trackers that have been added. std::atomic_size_t _had_child_count = 0; diff --git a/be/src/runtime/memory/mem_tracker_task_pool.cpp b/be/src/runtime/memory/mem_tracker_task_pool.cpp index 02b38acdb5..24a8c95180 100644 --- a/be/src/runtime/memory/mem_tracker_task_pool.cpp +++ b/be/src/runtime/memory/mem_tracker_task_pool.cpp @@ -68,11 +68,12 @@ MemTrackerLimiter* MemTrackerTaskPool::get_task_mem_tracker(const std::string& t } void MemTrackerTaskPool::logout_task_mem_tracker() { - for (auto it = _task_mem_trackers.begin(); it != _task_mem_trackers.end();) { + std::vector expired_task_ids; + for (auto it = _task_mem_trackers.begin(); it != _task_mem_trackers.end(); it++) { if (!it->second) { // Unknown exception case with high concurrency, after _task_mem_trackers.erase, // the key still exists in _task_mem_trackers. https://github.com/apache/incubator-doris/issues/10006 - _task_mem_trackers._erase(it++); + expired_task_ids.emplace_back(it->first); } else if (it->second->remain_child_count() == 0 && it->second->had_child_count() != 0) { // No RuntimeState uses this task MemTracker, it is only referenced by this map, // and tracker was not created soon, delete it. @@ -90,7 +91,7 @@ void MemTrackerTaskPool::logout_task_mem_tracker() { // the negative number of the current value of consume. it->second->parent()->consumption_revise(-it->second->consumption()); LOG(INFO) << "Deregister query/load memory tracker, queryId/loadId: " << it->first; - _task_mem_trackers._erase(it++); + expired_task_ids.emplace_back(it->first); } else { // Log limit exceeded query tracker. if (it->second->limit_exceeded()) { @@ -99,9 +100,14 @@ void MemTrackerTaskPool::logout_task_mem_tracker() { fmt::format("Task mem limit exceeded but no cancel, queryId:{}", it->first), 0, Status::OK()); } - ++it; } } + for (auto tid : expired_task_ids) { + // Verify the condition again to make sure the tracker is not being used again. + _task_mem_trackers.erase_if(tid, [&](std::shared_ptr v) { + return !v || v->remain_child_count() == 0; + }); + } } // TODO(zxy) More observable methods diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h b/be/src/runtime/memory/thread_mem_tracker_mgr.h index 1862c2830a..2d23388a81 100644 --- a/be/src/runtime/memory/thread_mem_tracker_mgr.h +++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h @@ -124,7 +124,8 @@ public: std::string print_debug_string() { fmt::memory_buffer consumer_tracker_buf; for (const auto& v : _consumer_tracker_stack) { - fmt::format_to(consumer_tracker_buf, "{}, ", v->log_usage()); + fmt::format_to(consumer_tracker_buf, "{}, ", + MemTracker::log_usage(v->make_snapshot(0))); } return fmt::format( "ThreadMemTrackerMgr debug, _untracked_mem:{}, _task_id:{}, " diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp index f098de7541..5b5aaae861 100644 --- a/be/src/runtime/plan_fragment_executor.cpp +++ b/be/src/runtime/plan_fragment_executor.cpp @@ -36,6 +36,7 @@ #include "runtime/result_buffer_mgr.h" #include "runtime/result_queue_mgr.h" #include "runtime/row_batch.h" +#include "runtime/runtime_filter_mgr.h" #include "runtime/thread_context.h" #include "util/container_util.hpp" #include "util/defer_op.h" @@ -99,6 +100,7 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request, RETURN_IF_ERROR(_runtime_state->init_mem_trackers(_query_id)); SCOPED_ATTACH_TASK(_runtime_state.get()); + _runtime_state->runtime_filter_mgr()->init(); _runtime_state->set_be_number(request.backend_num); if (request.__isset.backend_id) { _runtime_state->set_backend_id(request.backend_id); diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp index d231cd75c8..dd58feb1e2 100644 --- a/be/src/runtime/runtime_filter_mgr.cpp +++ b/be/src/runtime/runtime_filter_mgr.cpp @@ -44,9 +44,9 @@ RuntimeFilterMgr::RuntimeFilterMgr(const UniqueId& query_id, RuntimeState* state RuntimeFilterMgr::~RuntimeFilterMgr() {} -Status RuntimeFilterMgr::init(MemTrackerLimiter* parent_tracker) { +Status RuntimeFilterMgr::init() { DCHECK(_state->instance_mem_tracker() != nullptr); - _tracker = std::make_unique("RuntimeFilterMgr", parent_tracker); + _tracker = std::make_unique("RuntimeFilterMgr"); return Status::OK(); } diff --git a/be/src/runtime/runtime_filter_mgr.h b/be/src/runtime/runtime_filter_mgr.h index 340e9b27c7..1471f664f7 100644 --- a/be/src/runtime/runtime_filter_mgr.h +++ b/be/src/runtime/runtime_filter_mgr.h @@ -57,7 +57,7 @@ public: ~RuntimeFilterMgr(); - Status init(MemTrackerLimiter* parent_tracker); + Status init(); // get a consumer filter by filter-id Status get_consume_filter(const int filter_id, IRuntimeFilter** consumer_filter); diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index f3284d05f6..4c3d114447 100644 --- a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -258,9 +258,6 @@ Status RuntimeState::init_mem_trackers(const TUniqueId& query_id) { _instance_buffer_reservation->InitChildTracker(&_profile, _buffer_reservation, std::numeric_limits::max()); } - - // filter manager depends _instance_mem_tracker - _runtime_filter_mgr->init(_instance_mem_tracker.get()); return Status::OK(); } diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp index f076f251ec..c88fbbb683 100644 --- a/be/src/vec/exec/join/vhash_join_node.cpp +++ b/be/src/vec/exec/join/vhash_join_node.cpp @@ -855,7 +855,7 @@ Status HashJoinNode::prepare(RuntimeState* state) { std::bind(&RuntimeProfile::units_per_second, _rows_returned_counter, runtime_profile()->total_time_counter()), ""); - _mem_tracker = std::make_unique("ExecNode:" + _runtime_profile->name(), nullptr, + _mem_tracker = std::make_unique("ExecNode:" + _runtime_profile->name(), _runtime_profile.get()); SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp index bce11043ac..020f188cac 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.cpp +++ b/be/src/vec/runtime/vdata_stream_recvr.cpp @@ -263,10 +263,9 @@ VDataStreamRecvr::VDataStreamRecvr( _num_buffered_bytes(0), _profile(profile), _sub_plan_query_statistics_recvr(sub_plan_query_statistics_recvr) { - // DataStreamRecvr may be destructed after the instance execution thread ends, `instance_mem_tracker` - // will be a null pointer, and remove_child fails when _mem_tracker is destructed. + // DataStreamRecvr may be destructed after the instance execution thread ends. _mem_tracker = std::make_unique( - "VDataStreamRecvr:" + print_id(_fragment_instance_id), query_mem_tracker, _profile); + "VDataStreamRecvr:" + print_id(_fragment_instance_id), _profile); SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); // Create one queue per sender if is_merging is true. diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp index c378b3c33d..e137d3ca18 100644 --- a/be/src/vec/sink/vdata_stream_sender.cpp +++ b/be/src/vec/sink/vdata_stream_sender.cpp @@ -397,7 +397,7 @@ Status VDataStreamSender::prepare(RuntimeState* state) { _profile = _pool->add(new RuntimeProfile(std::move(title))); SCOPED_TIMER(_profile->total_time_counter()); _mem_tracker = std::make_unique( - "VDataStreamSender:" + print_id(state->fragment_instance_id()), nullptr, _profile); + "VDataStreamSender:" + print_id(state->fragment_instance_id()), _profile); SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); if (_part_type == TPartitionType::UNPARTITIONED || _part_type == TPartitionType::RANDOM) {