From 97fcad76f83adccc43eaff995bf1d570ded30605 Mon Sep 17 00:00:00 2001 From: Xinyi Zou Date: Mon, 16 Jan 2023 16:30:35 +0800 Subject: [PATCH] [enhancement](memtracker) Improve readability (#15716) --- be/src/exec/exec_node.cpp | 15 ++----------- be/src/exec/exec_node.h | 12 ++++------- be/src/http/default_path_handlers.cpp | 6 ++---- be/src/olap/delta_writer.cpp | 4 ++-- be/src/olap/memtable.cpp | 4 ++-- be/src/olap/olap_server.cpp | 10 ++------- be/src/olap/push_handler.cpp | 4 ++-- be/src/olap/storage_engine.cpp | 7 ++----- be/src/olap/storage_engine.h | 2 -- be/src/olap/tablet_manager.cpp | 11 +++++++++- be/src/olap/tablet_manager.h | 1 + be/src/olap/task/engine_batch_load_task.cpp | 2 +- be/src/olap/task/engine_checksum_task.cpp | 2 +- be/src/pipeline/exec/operator.h | 10 +++++---- be/src/pipeline/pipeline_fragment_context.cpp | 1 - be/src/runtime/exec_env.h | 7 +++---- be/src/runtime/exec_env_init.cpp | 12 ++++++++--- be/src/runtime/load_channel_mgr.cpp | 7 +++---- be/src/runtime/load_channel_mgr.h | 14 ++++++------- be/src/runtime/memory/mem_tracker.cpp | 14 ++++++++++--- be/src/runtime/memory/mem_tracker.h | 14 ++++++------- be/src/runtime/memory/mem_tracker_limiter.cpp | 21 ++++++++++++------- be/src/runtime/memory/mem_tracker_limiter.h | 15 ++++++------- be/src/runtime/plan_fragment_executor.cpp | 1 - be/src/runtime/runtime_filter_mgr.cpp | 6 ++++-- be/src/runtime/runtime_state.cpp | 2 -- be/src/runtime/runtime_state.h | 8 ------- be/src/vec/exec/join/vhash_join_node.cpp | 2 -- be/src/vec/exec/join/vjoin_node_base.cpp | 2 -- .../vec/exec/join/vnested_loop_join_node.cpp | 3 --- be/src/vec/exec/scan/new_es_scan_node.cpp | 1 - be/src/vec/exec/scan/new_jdbc_scan_node.cpp | 1 - be/src/vec/exec/scan/new_odbc_scan_node.cpp | 1 - be/src/vec/exec/scan/new_olap_scan_node.cpp | 1 - be/src/vec/exec/scan/scanner_scheduler.cpp | 1 - be/src/vec/exec/scan/vscan_node.cpp | 4 ---- be/src/vec/exec/vaggregation_node.cpp | 14 +++++-------- be/src/vec/exec/vanalytic_eval_node.cpp | 16 ++------------ be/src/vec/exec/vbroker_scan_node.cpp | 4 ---- be/src/vec/exec/vexchange_node.cpp | 3 --- be/src/vec/exec/vmysql_scan_node.cpp | 2 -- be/src/vec/exec/vrepeat_node.cpp | 2 -- be/src/vec/exec/vschema_scan_node.cpp | 2 -- be/src/vec/exec/vset_operation_node.cpp | 2 -- be/src/vec/exec/vsort_node.cpp | 6 +----- be/src/vec/exec/vtable_function_node.cpp | 1 - be/src/vec/runtime/vdata_stream_recvr.cpp | 5 +++-- be/src/vec/sink/vdata_stream_sender.cpp | 3 ++- be/test/testutil/run_all_tests.cpp | 5 +---- 49 files changed, 114 insertions(+), 179 deletions(-) diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp index 9bd0c5c494..e0c85d9816 100644 --- a/be/src/exec/exec_node.cpp +++ b/be/src/exec/exec_node.cpp @@ -142,18 +142,8 @@ Status ExecNode::prepare(RuntimeState* state) { std::bind(&RuntimeProfile::units_per_second, _rows_returned_counter, runtime_profile()->total_time_counter()), ""); - _mem_tracker_held = - std::make_unique("ExecNode:" + _runtime_profile->name(), - _runtime_profile.get(), nullptr, "PeakMemoryUsage"); - // Only when the query profile is enabled, the node allocated memory will be track through the mem hook, - // otherwise _mem_tracker_growh is nullptr, and SCOPED_CONSUME_MEM_TRACKER will do nothing. - if (state->query_options().__isset.is_report_success && - state->query_options().is_report_success) { - _mem_tracker_growh = std::make_shared( - "ExecNode:MemoryOnlyTrackAlloc:" + _runtime_profile->name(), _runtime_profile.get(), - nullptr, "MemoryOnlyTrackAllocNoConsiderFree", true); - } - SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); + _mem_tracker = std::make_unique("ExecNode:" + _runtime_profile->name(), + _runtime_profile.get(), nullptr, "PeakMemoryUsage"); if (_vconjunct_ctx_ptr) { RETURN_IF_ERROR((*_vconjunct_ctx_ptr)->prepare(state, intermediate_row_desc())); @@ -176,7 +166,6 @@ Status ExecNode::prepare(RuntimeState* state) { } Status ExecNode::alloc_resource(doris::RuntimeState* state) { - SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); if (_vconjunct_ctx_ptr) { RETURN_IF_ERROR((*_vconjunct_ctx_ptr)->open(state)); } diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h index 353db077f3..76ab99fd42 100644 --- a/be/src/exec/exec_node.h +++ b/be/src/exec/exec_node.h @@ -229,9 +229,7 @@ public: RuntimeProfile* runtime_profile() const { return _runtime_profile.get(); } RuntimeProfile::Counter* memory_used_counter() const { return _memory_used_counter; } - MemTracker* mem_tracker_held() const { return _mem_tracker_held.get(); } - MemTracker* mem_tracker_growh() const { return _mem_tracker_growh.get(); } - std::shared_ptr mem_tracker_growh_shared() const { return _mem_tracker_growh; } + MemTracker* mem_tracker() const { return _mem_tracker.get(); } OpentelemetrySpan get_next_span() { return _get_next_span; } @@ -279,11 +277,9 @@ protected: std::unique_ptr _runtime_profile; - // Record the memory size held by this node. - std::unique_ptr _mem_tracker_held; - // Record the memory size allocated by this node. - // Similar to tcmalloc heap profile growh, only track memory alloc, not track memory free. - std::shared_ptr _mem_tracker_growh; + // Record this node memory size. it is expected that artificial guarantees are accurate, + // which will providea reference for operator memory. + std::unique_ptr _mem_tracker; RuntimeProfile::Counter* _rows_returned_counter; RuntimeProfile::Counter* _rows_returned_rate; diff --git a/be/src/http/default_path_handlers.cpp b/be/src/http/default_path_handlers.cpp index 42a29d21e9..e0ddfbbb37 100644 --- a/be/src/http/default_path_handlers.cpp +++ b/be/src/http/default_path_handlers.cpp @@ -147,11 +147,9 @@ void mem_tracker_handler(const WebPageHandler::ArgumentMap& args, std::stringstr MemTrackerLimiter::Type::SCHEMA_CHANGE); } else if (iter->second == "clone") { MemTrackerLimiter::make_type_snapshots(&snapshots, MemTrackerLimiter::Type::CLONE); - } else if (iter->second == "batch_load") { - MemTrackerLimiter::make_type_snapshots(&snapshots, MemTrackerLimiter::Type::BATCHLOAD); - } else if (iter->second == "consistency") { + } else if (iter->second == "experimental") { MemTrackerLimiter::make_type_snapshots(&snapshots, - MemTrackerLimiter::Type::CONSISTENCY); + MemTrackerLimiter::Type::EXPERIMENTAL); } } else { (*output) << "

*Note: (see documentation for details)

\n"; diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp index c001b67cad..101b46bf6a 100644 --- a/be/src/olap/delta_writer.cpp +++ b/be/src/olap/delta_writer.cpp @@ -245,11 +245,11 @@ void DeltaWriter::_reset_mem_table() { auto mem_table_insert_tracker = std::make_shared( fmt::format("MemTableManualInsert:TabletId={}:MemTableNum={}#loadID={}", std::to_string(tablet_id()), _mem_table_num, _load_id.to_string()), - nullptr, ExecEnv::GetInstance()->load_channel_mgr()->mem_tracker_set()); + ExecEnv::GetInstance()->load_channel_mgr()->mem_tracker()); auto mem_table_flush_tracker = std::make_shared( fmt::format("MemTableHookFlush:TabletId={}:MemTableNum={}#loadID={}", std::to_string(tablet_id()), _mem_table_num++, _load_id.to_string()), - nullptr, ExecEnv::GetInstance()->load_channel_mgr()->mem_tracker_set()); + ExecEnv::GetInstance()->load_channel_mgr()->mem_tracker()); #else auto mem_table_insert_tracker = std::make_shared( fmt::format("MemTableManualInsert:TabletId={}:MemTableNum={}#loadID={}", diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp index ea1188e874..53d785f2c5 100644 --- a/be/src/olap/memtable.cpp +++ b/be/src/olap/memtable.cpp @@ -57,8 +57,8 @@ MemTable::MemTable(TabletSharedPtr tablet, Schema* schema, const TabletSchema* t _cur_max_version(cur_max_version) { #ifndef BE_TEST _insert_mem_tracker_use_hook = std::make_unique( - fmt::format("MemTableHookInsert:TabletId={}", std::to_string(tablet_id())), nullptr, - ExecEnv::GetInstance()->load_channel_mgr()->mem_tracker_set()); + fmt::format("MemTableHookInsert:TabletId={}", std::to_string(tablet_id())), + ExecEnv::GetInstance()->load_channel_mgr()->mem_tracker()); #else _insert_mem_tracker_use_hook = std::make_unique( fmt::format("MemTableHookInsert:TabletId={}", std::to_string(tablet_id()))); diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp index 2d5d12c860..7ab1bf1a97 100644 --- a/be/src/olap/olap_server.cpp +++ b/be/src/olap/olap_server.cpp @@ -120,20 +120,14 @@ Status StorageEngine::start_bg_threads() { scoped_refptr path_scan_thread; RETURN_IF_ERROR(Thread::create( "StorageEngine", "path_scan_thread", - [this, data_dir]() { - SCOPED_CONSUME_MEM_TRACKER(_mem_tracker); - this->_path_scan_thread_callback(data_dir); - }, + [this, data_dir]() { this->_path_scan_thread_callback(data_dir); }, &path_scan_thread)); _path_scan_threads.emplace_back(path_scan_thread); scoped_refptr path_gc_thread; RETURN_IF_ERROR(Thread::create( "StorageEngine", "path_gc_thread", - [this, data_dir]() { - SCOPED_CONSUME_MEM_TRACKER(_mem_tracker); - this->_path_gc_thread_callback(data_dir); - }, + [this, data_dir]() { this->_path_gc_thread_callback(data_dir); }, &path_gc_thread)); _path_gc_threads.emplace_back(path_gc_thread); } diff --git a/be/src/olap/push_handler.cpp b/be/src/olap/push_handler.cpp index ad3bb25e79..49b01f6418 100644 --- a/be/src/olap/push_handler.cpp +++ b/be/src/olap/push_handler.cpp @@ -833,8 +833,8 @@ Status PushBrokerReader::init(const Schema* schema, const TBrokerScanRange& t_sc } _runtime_profile = _runtime_state->runtime_profile(); _runtime_profile->set_name("PushBrokerReader"); - _mem_pool.reset(new MemPool(_runtime_state->scanner_mem_tracker().get())); - _tuple_buffer_pool.reset(new MemPool(_runtime_state->scanner_mem_tracker().get())); + _mem_pool.reset(new MemPool()); + _tuple_buffer_pool.reset(new MemPool()); _counter.reset(new ScannerCounter()); diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp index ffa137c26f..cea5856dbb 100644 --- a/be/src/olap/storage_engine.cpp +++ b/be/src/olap/storage_engine.cpp @@ -108,7 +108,6 @@ StorageEngine::StorageEngine(const EngineOptions& options) _effective_cluster_id(-1), _is_all_cluster_id_exist(true), _stopped(false), - _mem_tracker(std::make_shared("StorageEngine")), _segcompaction_mem_tracker(std::make_shared("SegCompaction")), _segment_meta_mem_tracker(std::make_shared("SegmentMeta")), _stop_background_threads_latch(1), @@ -149,8 +148,7 @@ StorageEngine::~StorageEngine() { void StorageEngine::load_data_dirs(const std::vector& data_dirs) { std::vector threads; for (auto data_dir : data_dirs) { - threads.emplace_back([this, data_dir] { - SCOPED_CONSUME_MEM_TRACKER(_mem_tracker); + threads.emplace_back([data_dir] { auto res = data_dir->load(); if (!res.ok()) { LOG(WARNING) << "io error when init load tables. res=" << res @@ -195,8 +193,7 @@ Status StorageEngine::_init_store_map() { DataDir* store = new DataDir(path.path, path.capacity_bytes, path.storage_medium, _tablet_manager.get(), _txn_manager.get()); tmp_stores.emplace_back(store); - threads.emplace_back([this, store, &error_msg_lock, &error_msg]() { - SCOPED_CONSUME_MEM_TRACKER(_mem_tracker); + threads.emplace_back([store, &error_msg_lock, &error_msg]() { auto st = store->init(); if (!st.ok()) { { diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h index 910c926c69..dcaa78ba56 100644 --- a/be/src/olap/storage_engine.h +++ b/be/src/olap/storage_engine.h @@ -322,8 +322,6 @@ private: // map, if we use RowsetId as the key, we need custom hash func std::unordered_map _unused_rowsets; - // StorageEngine oneself - std::shared_ptr _mem_tracker; // Count the memory consumption of segment compaction tasks. std::shared_ptr _segcompaction_mem_tracker; // This mem tracker is only for tracking memory use by segment meta data such as footer or index page. diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp index 03d6b0f4d1..9d8633dabc 100644 --- a/be/src/olap/tablet_manager.cpp +++ b/be/src/olap/tablet_manager.cpp @@ -74,7 +74,9 @@ DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(tablet_meta_mem_consumption, MetricUnit::BYTE mem_consumption, Labels({{"type", "tablet_meta"}})); TabletManager::TabletManager(int32_t tablet_map_lock_shard_size) - : _mem_tracker(std::make_shared("TabletManager")), + : _mem_tracker(std::make_shared( + "TabletManager", ExecEnv::GetInstance()->experimental_mem_tracker())), + _tablet_meta_mem_tracker(std::make_shared("TabletMeta")), _tablets_shards_size(tablet_map_lock_shard_size), _tablets_shards_mask(tablet_map_lock_shard_size - 1) { CHECK_GT(_tablets_shards_size, 0); @@ -207,6 +209,10 @@ Status TabletManager::_add_tablet_to_map_unlocked(TTabletId tablet_id, tablet_map_t& tablet_map = _get_tablet_map(tablet_id); tablet_map[tablet_id] = tablet; _add_tablet_to_partition(tablet); + // TODO: remove multiply 2 of tablet meta mem size + // Because table schema will copy in tablet, there will be double mem cost + // so here multiply 2 + _tablet_meta_mem_tracker->consume(tablet->tablet_meta()->mem_size() * 2); VLOG_NOTICE << "add tablet to map successfully." << " tablet_id=" << tablet_id; @@ -489,6 +495,7 @@ Status TabletManager::_drop_tablet_unlocked(TTabletId tablet_id, TReplicaId repl } to_drop_tablet->deregister_tablet_from_dir(); + _tablet_meta_mem_tracker->release(to_drop_tablet->tablet_meta()->mem_size() * 2); return Status::OK(); } @@ -718,6 +725,7 @@ Status TabletManager::load_tablet_from_meta(DataDir* data_dir, TTabletId tablet_ TSchemaHash schema_hash, const string& meta_binary, bool update_meta, bool force, bool restore, bool check_path) { + SCOPED_CONSUME_MEM_TRACKER(_mem_tracker); TabletMetaSharedPtr tablet_meta(new TabletMeta()); Status status = tablet_meta->deserialize(meta_binary); if (!status.ok()) { @@ -800,6 +808,7 @@ Status TabletManager::load_tablet_from_meta(DataDir* data_dir, TTabletId tablet_ Status TabletManager::load_tablet_from_dir(DataDir* store, TTabletId tablet_id, SchemaHash schema_hash, const string& schema_hash_path, bool force, bool restore) { + SCOPED_CONSUME_MEM_TRACKER(_mem_tracker); LOG(INFO) << "begin to load tablet from dir. " << " tablet_id=" << tablet_id << " schema_hash=" << schema_hash << " path = " << schema_hash_path << " force = " << force << " restore = " << restore; diff --git a/be/src/olap/tablet_manager.h b/be/src/olap/tablet_manager.h index 58a60cbee7..30a64937cc 100644 --- a/be/src/olap/tablet_manager.h +++ b/be/src/olap/tablet_manager.h @@ -202,6 +202,7 @@ private: // trace the memory use by meta of tablet std::shared_ptr _mem_tracker; + std::shared_ptr _tablet_meta_mem_tracker; const int32_t _tablets_shards_size; const int32_t _tablets_shards_mask; diff --git a/be/src/olap/task/engine_batch_load_task.cpp b/be/src/olap/task/engine_batch_load_task.cpp index bd3abf3efc..a9135df050 100644 --- a/be/src/olap/task/engine_batch_load_task.cpp +++ b/be/src/olap/task/engine_batch_load_task.cpp @@ -50,7 +50,7 @@ using namespace ErrorCode; EngineBatchLoadTask::EngineBatchLoadTask(TPushReq& push_req, std::vector* tablet_infos) : _push_req(push_req), _tablet_infos(tablet_infos) { _mem_tracker = std::make_shared( - MemTrackerLimiter::Type::BATCHLOAD, + MemTrackerLimiter::Type::LOAD, fmt::format("EngineBatchLoadTask#pushType={}:tabletId={}", _push_req.push_type, std::to_string(_push_req.tablet_id))); } diff --git a/be/src/olap/task/engine_checksum_task.cpp b/be/src/olap/task/engine_checksum_task.cpp index d1e8ae7eb2..c298b104c5 100644 --- a/be/src/olap/task/engine_checksum_task.cpp +++ b/be/src/olap/task/engine_checksum_task.cpp @@ -26,7 +26,7 @@ EngineChecksumTask::EngineChecksumTask(TTabletId tablet_id, TSchemaHash schema_h TVersion version, uint32_t* checksum) : _tablet_id(tablet_id), _schema_hash(schema_hash), _version(version), _checksum(checksum) { _mem_tracker = std::make_shared( - MemTrackerLimiter::Type::CONSISTENCY, + MemTrackerLimiter::Type::LOAD, "EngineChecksumTask#tabletId=" + std::to_string(tablet_id)); } diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index 0b7ebfdadc..e2fde2d9e5 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -262,8 +262,9 @@ public: _runtime_profile.reset(new RuntimeProfile( fmt::format("{} (id={})", _operator_builder->get_name(), _operator_builder->id()))); _sink->profile()->insert_child_head(_runtime_profile.get(), true); - _mem_tracker = std::make_unique("DataSinkOperator:" + _runtime_profile->name(), - _runtime_profile.get()); + _mem_tracker = + std::make_unique("DataSinkOperator:" + _runtime_profile->name(), + _runtime_profile.get(), nullptr, "PeakMemoryUsage"); return Status::OK(); } @@ -319,8 +320,9 @@ public: _runtime_profile.reset(new RuntimeProfile( fmt::format("{} (id={})", _operator_builder->get_name(), _operator_builder->id()))); _node->runtime_profile()->insert_child_head(_runtime_profile.get(), true); - _mem_tracker = std::make_unique(get_name() + ": " + _runtime_profile->name(), - _runtime_profile.get()); + _mem_tracker = + std::make_unique(get_name() + ": " + _runtime_profile->name(), + _runtime_profile.get(), nullptr, "PeakMemoryUsage"); _node->increase_ref(); return Status::OK(); } diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index d87b47db28..52fb8d2175 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -181,7 +181,6 @@ Status PipelineFragmentContext::prepare(const doris::TExecPlanFragmentParams& re // TODO should be combine with plan_fragment_executor.prepare funciton SCOPED_ATTACH_TASK(get_runtime_state()); - _runtime_state->init_scanner_mem_trackers(); _runtime_state->runtime_filter_mgr()->init(); _runtime_state->set_be_number(request.backend_num); diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index c47d15d9fa..f90cb98ba6 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -120,12 +120,10 @@ public: return nullptr; } - void set_orphan_mem_tracker(const std::shared_ptr& orphan_tracker) { - _orphan_mem_tracker = orphan_tracker; - _orphan_mem_tracker_raw = orphan_tracker.get(); - } + void init_mem_tracker(); std::shared_ptr orphan_mem_tracker() { return _orphan_mem_tracker; } MemTrackerLimiter* orphan_mem_tracker_raw() { return _orphan_mem_tracker_raw; } + MemTrackerLimiter* experimental_mem_tracker() { return _experimental_mem_tracker.get(); } ThreadResourceMgr* thread_mgr() { return _thread_mgr; } ThreadPool* send_batch_thread_pool() { return _send_batch_thread_pool.get(); } ThreadPool* download_cache_thread_pool() { return _download_cache_thread_pool.get(); } @@ -215,6 +213,7 @@ private: // and the consumption of the orphan mem tracker is close to 0, but greater than 0. std::shared_ptr _orphan_mem_tracker; MemTrackerLimiter* _orphan_mem_tracker_raw; + std::shared_ptr _experimental_mem_tracker; std::unique_ptr _send_batch_thread_pool; diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index 22fa7f9123..d557e8e0ef 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -166,9 +166,7 @@ Status ExecEnv::_init_mem_env() { bool is_percent = false; std::stringstream ss; // 1. init mem tracker - _orphan_mem_tracker = - std::make_shared(MemTrackerLimiter::Type::GLOBAL, "Orphan"); - _orphan_mem_tracker_raw = _orphan_mem_tracker.get(); + init_mem_tracker(); thread_context()->thread_mem_tracker_mgr->init(); #if defined(USE_MEM_TRACKER) && !defined(__SANITIZE_ADDRESS__) && !defined(ADDRESS_SANITIZER) && \ !defined(LEAK_SANITIZER) && !defined(THREAD_SANITIZER) && !defined(USE_JEMALLOC) @@ -236,6 +234,14 @@ Status ExecEnv::_init_mem_env() { return Status::OK(); } +void ExecEnv::init_mem_tracker() { + _orphan_mem_tracker = + std::make_shared(MemTrackerLimiter::Type::GLOBAL, "Orphan"); + _orphan_mem_tracker_raw = _orphan_mem_tracker.get(); + _experimental_mem_tracker = std::make_shared( + MemTrackerLimiter::Type::EXPERIMENTAL, "ExperimentalSet"); +} + void ExecEnv::init_download_cache_buf() { std::unique_ptr download_cache_buf(new char[config::download_cache_buffer_size]); memset(download_cache_buf.get(), 0, config::download_cache_buffer_size); diff --git a/be/src/runtime/load_channel_mgr.cpp b/be/src/runtime/load_channel_mgr.cpp index 8d26c47e02..bd00718472 100644 --- a/be/src/runtime/load_channel_mgr.cpp +++ b/be/src/runtime/load_channel_mgr.cpp @@ -75,9 +75,8 @@ LoadChannelMgr::~LoadChannelMgr() { Status LoadChannelMgr::init(int64_t process_mem_limit) { _load_hard_mem_limit = calc_process_max_load_memory(process_mem_limit); _load_soft_mem_limit = _load_hard_mem_limit * config::load_process_soft_mem_limit_percent / 100; - _mem_tracker = std::make_unique("LoadChannelMgr"); - _mem_tracker_set = std::make_unique(MemTrackerLimiter::Type::LOAD, - "LoadChannelMgrTrackerSet"); + _mem_tracker = + std::make_unique(MemTrackerLimiter::Type::LOAD, "LoadChannelMgr"); REGISTER_HOOK_METRIC(load_channel_mem_consumption, [this]() { return _mem_tracker->consumption(); }); _last_success_channel = new_lru_cache("LastestSuccessChannelCache", 1024); @@ -105,7 +104,7 @@ Status LoadChannelMgr::open(const PTabletWriterOpenRequest& params) { auto channel_mem_tracker = std::make_unique( fmt::format("LoadChannel#senderIp={}#loadID={}", params.sender_ip(), load_id.to_string()), - nullptr, ExecEnv::GetInstance()->load_channel_mgr()->mem_tracker_set()); + ExecEnv::GetInstance()->load_channel_mgr()->mem_tracker()); #else auto channel_mem_tracker = std::make_unique(fmt::format( "LoadChannel#senderIp={}#loadID={}", params.sender_ip(), load_id.to_string())); diff --git a/be/src/runtime/load_channel_mgr.h b/be/src/runtime/load_channel_mgr.h index 967617e9bc..a825b74bca 100644 --- a/be/src/runtime/load_channel_mgr.h +++ b/be/src/runtime/load_channel_mgr.h @@ -58,7 +58,7 @@ public: std::lock_guard l(_lock); _refresh_mem_tracker_without_lock(); } - MemTrackerLimiter* mem_tracker_set() { return _mem_tracker_set.get(); } + MemTrackerLimiter* mem_tracker() { return _mem_tracker.get(); } private: template @@ -74,11 +74,12 @@ private: // lock should be held when calling this method void _refresh_mem_tracker_without_lock() { - int64_t mem_usage = 0; + _mem_usage = 0; for (auto& kv : _load_channels) { - mem_usage += kv.second->mem_consumption(); + _mem_usage += kv.second->mem_consumption(); } - _mem_tracker->set_consumption(mem_usage); + THREAD_MEM_TRACKER_TRANSFER_TO(_mem_usage - _mem_tracker->consumption(), + _mem_tracker.get()); } protected: @@ -89,9 +90,8 @@ protected: Cache* _last_success_channel = nullptr; // check the total load channel mem consumption of this Backend - std::unique_ptr _mem_tracker; - // Associate load channel tracker and memtable tracker, avoid default association to Orphan tracker. - std::unique_ptr _mem_tracker_set; + int64_t _mem_usage = 0; + std::unique_ptr _mem_tracker; int64_t _load_hard_mem_limit = -1; int64_t _load_soft_mem_limit = -1; bool _soft_reduce_mem_in_progress = false; diff --git a/be/src/runtime/memory/mem_tracker.cpp b/be/src/runtime/memory/mem_tracker.cpp index 7d1c3de7c1..1888f366a5 100644 --- a/be/src/runtime/memory/mem_tracker.cpp +++ b/be/src/runtime/memory/mem_tracker.cpp @@ -40,8 +40,8 @@ struct TrackerGroup { static std::vector mem_tracker_pool(1000); MemTracker::MemTracker(const std::string& label, RuntimeProfile* profile, MemTrackerLimiter* parent, - const std::string& profile_counter_name, bool only_track_alloc) - : _label(label), _only_track_alloc(only_track_alloc) { + const std::string& profile_counter_name) + : _label(label) { if (profile == nullptr) { _consumption = std::make_shared(TUnit::BYTES); } else { @@ -56,7 +56,15 @@ MemTracker::MemTracker(const std::string& label, RuntimeProfile* profile, MemTra // release(). _consumption = profile->AddSharedHighWaterMarkCounter(profile_counter_name, TUnit::BYTES); } + bind_parent(parent); +} +MemTracker::MemTracker(const std::string& label, MemTrackerLimiter* parent) : _label(label) { + _consumption = std::make_shared(TUnit::BYTES); + bind_parent(parent); +} + +void MemTracker::bind_parent(MemTrackerLimiter* parent) { if (parent) { _parent_label = parent->label(); _parent_group_num = parent->group_num(); @@ -95,7 +103,7 @@ void MemTracker::make_group_snapshot(std::vector* snapshot int64_t group_num, std::string parent_label) { std::lock_guard l(mem_tracker_pool[group_num].group_lock); for (auto tracker : mem_tracker_pool[group_num].trackers) { - if (tracker->parent_label() == parent_label) { + if (tracker->parent_label() == parent_label && tracker->consumption() != 0) { snapshots->push_back(tracker->make_snapshot()); } } diff --git a/be/src/runtime/memory/mem_tracker.h b/be/src/runtime/memory/mem_tracker.h index 46d1ec24d4..509362397a 100644 --- a/be/src/runtime/memory/mem_tracker.h +++ b/be/src/runtime/memory/mem_tracker.h @@ -44,10 +44,9 @@ public: }; // Creates and adds the tracker to the mem_tracker_pool. - MemTracker(const std::string& label, RuntimeProfile* profile = nullptr, - MemTrackerLimiter* parent = nullptr, - const std::string& profile_counter_name = "PeakMemoryUsage", - bool only_track_alloc = false); + MemTracker(const std::string& label, RuntimeProfile* profile, MemTrackerLimiter* parent, + const std::string& profile_counter_name); + MemTracker(const std::string& label, MemTrackerLimiter* parent = nullptr); // For MemTrackerLimiter MemTracker() { _parent_group_num = -1; } @@ -61,13 +60,13 @@ public: public: const std::string& label() const { return _label; } const std::string& parent_label() const { return _parent_label; } + const std::string& set_parent_label() const { return _parent_label; } // Returns the memory consumed in bytes. int64_t consumption() const { return _consumption->current_value(); } int64_t peak_consumption() const { return _consumption->value(); } void consume(int64_t bytes) { if (bytes == 0) return; - if (bytes < 0 && _only_track_alloc) return; _consumption->add(bytes); } void release(int64_t bytes) { consume(-bytes); } @@ -89,6 +88,8 @@ public: } protected: + void bind_parent(MemTrackerLimiter* parent); + // label used in the make snapshot, not guaranteed unique. std::string _label; @@ -96,10 +97,9 @@ protected: // Tracker is located in group num in mem_tracker_pool int64_t _parent_group_num = 0; + // Use _parent_label to correlate with parent limiter tracker. std::string _parent_label = "-"; - bool _only_track_alloc = false; - // Iterator into mem_tracker_pool for this object. Stored to have O(1) remove. std::list::iterator _tracker_group_it; }; diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp index 05e53baaf9..87ef52fbed 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.cpp +++ b/be/src/runtime/memory/mem_tracker_limiter.cpp @@ -97,9 +97,8 @@ MemTracker::Snapshot MemTrackerLimiter::make_snapshot() const { void MemTrackerLimiter::refresh_global_counter() { std::unordered_map type_mem_sum = { - {Type::GLOBAL, 0}, {Type::QUERY, 0}, {Type::LOAD, 0}, - {Type::COMPACTION, 0}, {Type::SCHEMA_CHANGE, 0}, {Type::CLONE, 0}, - {Type::BATCHLOAD, 0}, {Type::CONSISTENCY, 0}}; + {Type::GLOBAL, 0}, {Type::QUERY, 0}, {Type::LOAD, 0}, {Type::COMPACTION, 0}, + {Type::SCHEMA_CHANGE, 0}, {Type::CLONE, 0}}; // No need refresh Type::EXPERIMENTAL for (unsigned i = 0; i < mem_tracker_limiter_pool.size(); ++i) { std::lock_guard l(mem_tracker_limiter_pool[i].group_lock); for (auto tracker : mem_tracker_limiter_pool[i].trackers) { @@ -327,6 +326,11 @@ int64_t MemTrackerLimiter::free_top_overcommit_query(int64_t min_free_mem) { } } + // Minor gc does not cancel when there is only one query. full gc conver. + if (query_consumption.size() <= 1) { + return 0; + } + std::priority_queue> max_pq; // Min-heap to Max-heap. while (!min_pq.empty()) { @@ -341,15 +345,16 @@ int64_t MemTrackerLimiter::free_top_overcommit_query(int64_t min_free_mem) { int64_t query_mem = query_consumption[max_pq.top().second]; ExecEnv::GetInstance()->fragment_mgr()->cancel_query( cancelled_queryid, PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED, - fmt::format("Process has no memory available, cancel top memory usage query: " + fmt::format("Process has less memory, cancel top memory overcommit query: " "query memory tracker <{}> consumption {}, backend {} " - "process memory used {} exceed limit {} or sys mem available {} " - "less than low water mark {}. Execute again after enough memory, " + "process memory used {} exceed soft limit {} or sys mem available {} " + "less than warning water mark {}. Execute again after enough memory, " "details see be.INFO.", max_pq.top().second, print_bytes(query_mem), BackendOptions::get_localhost(), PerfCounters::get_vm_rss_str(), - MemInfo::mem_limit_str(), MemInfo::sys_mem_available_str(), - print_bytes(MemInfo::sys_mem_available_low_water_mark()))); + print_bytes(MemInfo::soft_mem_limit()), + MemInfo::sys_mem_available_str(), + print_bytes(MemInfo::sys_mem_available_warning_water_mark()))); usage_strings.push_back(fmt::format("{} memory usage {} Bytes, overcommit ratio: {}", max_pq.top().second, query_mem, max_pq.top().first)); diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h index 7696908dab..bb346a2a3b 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.h +++ b/be/src/runtime/memory/mem_tracker_limiter.h @@ -46,8 +46,8 @@ public: COMPACTION = 3, // Count the memory consumption of all Base and Cumulative tasks. SCHEMA_CHANGE = 4, // Count the memory consumption of all SchemaChange tasks. CLONE = 5, // Count the memory consumption of all EngineCloneTask. Note: Memory that does not contain make/release snapshots. - BATCHLOAD = 6, // Count the memory consumption of all EngineBatchLoadTask. - CONSISTENCY = 7 // Count the memory consumption of all EngineChecksumTask. + EXPERIMENTAL = + 6 // Experimental memory statistics, usually inaccurate, used for debugging, and expect to add other types in the future. }; inline static std::unordered_map> @@ -63,14 +63,11 @@ public: std::make_shared(TUnit::BYTES)}, {Type::CLONE, std::make_shared(TUnit::BYTES)}, - {Type::BATCHLOAD, - std::make_shared(TUnit::BYTES)}, - {Type::CONSISTENCY, + {Type::EXPERIMENTAL, std::make_shared(TUnit::BYTES)}}; - inline static const std::string TypeString[] = {"global", "query", "load", - "compaction", "schema_change", "clone", - "batch_load", "consistency"}; + inline static const std::string TypeString[] = { + "global", "query", "load", "compaction", "schema_change", "clone", "experimental"}; public: // byte_limit equal to -1 means no consumption limit, only participate in process memory statistics. @@ -161,7 +158,7 @@ public: static std::string process_mem_log_str() { return fmt::format( - "physical memory {}, process memory used {} limit {}, sys mem available {} low " + "OS physical memory {}, process memory used {} limit {}, sys mem available {} low " "water mark {}, refresh interval memory growth {} B", PrettyPrinter::print(MemInfo::physical_mem(), TUnit::BYTES), PerfCounters::get_vm_rss_str(), MemInfo::mem_limit_str(), diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp index f36cefaf51..748191b177 100644 --- a/be/src/runtime/plan_fragment_executor.cpp +++ b/be/src/runtime/plan_fragment_executor.cpp @@ -100,7 +100,6 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request, _runtime_state->set_tracer(std::move(tracer)); SCOPED_ATTACH_TASK(_runtime_state.get()); - _runtime_state->init_scanner_mem_trackers(); _runtime_state->runtime_filter_mgr()->init(); _runtime_state->set_be_number(request.backend_num); if (request.__isset.backend_id) { diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp index bf0bfd0d9d..4d5478a596 100644 --- a/be/src/runtime/runtime_filter_mgr.cpp +++ b/be/src/runtime/runtime_filter_mgr.cpp @@ -45,7 +45,8 @@ RuntimeFilterMgr::~RuntimeFilterMgr() {} Status RuntimeFilterMgr::init() { DCHECK(_state->query_mem_tracker() != nullptr); - _tracker = std::make_unique("RuntimeFilterMgr"); + _tracker = std::make_unique("RuntimeFilterMgr", + ExecEnv::GetInstance()->experimental_mem_tracker()); return Status::OK(); } @@ -161,7 +162,8 @@ Status RuntimeFilterMergeControllerEntity::init(UniqueId query_id, UniqueId frag const TQueryOptions& query_options) { _query_id = query_id; _fragment_instance_id = fragment_instance_id; - _mem_tracker = std::make_shared("RuntimeFilterMergeControllerEntity"); + _mem_tracker = std::make_shared("RuntimeFilterMergeControllerEntity", + ExecEnv::GetInstance()->experimental_mem_tracker()); SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); for (auto& filterid_to_desc : runtime_filter_params.rid_to_runtime_filter) { int filter_id = filterid_to_desc.first; diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index 044a619e03..c3bd7d0c3f 100644 --- a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -216,8 +216,6 @@ Status RuntimeState::init(const TUniqueId& fragment_instance_id, const TQueryOpt Status RuntimeState::init_mem_trackers(const TUniqueId& query_id) { _query_mem_tracker = std::make_shared( MemTrackerLimiter::Type::QUERY, fmt::format("TestQuery#Id={}", print_id(query_id))); - _scanner_mem_tracker = - std::make_shared(fmt::format("TestScanner#QueryId={}", print_id(query_id))); return Status::OK(); } diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index ee22760ab1..d72631de92 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -75,11 +75,6 @@ public: Status init(const TUniqueId& fragment_instance_id, const TQueryOptions& query_options, const TQueryGlobals& query_globals, ExecEnv* exec_env); - // after SCOPED_ATTACH_TASK; - void init_scanner_mem_trackers() { - _scanner_mem_tracker = std::make_shared( - fmt::format("Scanner#QueryId={}", print_id(_query_id))); - } // for ut and non-query. Status init_mem_trackers(const TUniqueId& query_id = TUniqueId()); @@ -111,7 +106,6 @@ public: const TUniqueId& fragment_instance_id() const { return _fragment_instance_id; } ExecEnv* exec_env() { return _exec_env; } std::shared_ptr query_mem_tracker() { return _query_mem_tracker; } - std::shared_ptr scanner_mem_tracker() { return _scanner_mem_tracker; } ThreadResourceMgr::ResourcePool* resource_pool() { return _resource_pool; } void set_fragment_root_id(PlanNodeId id) { @@ -431,8 +425,6 @@ private: static const int DEFAULT_BATCH_SIZE = 2048; std::shared_ptr _query_mem_tracker; - // Count the memory consumption of Scanner - std::shared_ptr _scanner_mem_tracker; // put runtime state before _obj_pool, so that it will be deconstructed after // _obj_pool. Because some of object in _obj_pool will use profile when deconstructing. diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp index c6e6ecea45..fd87c56f98 100644 --- a/be/src/vec/exec/join/vhash_join_node.cpp +++ b/be/src/vec/exec/join/vhash_join_node.cpp @@ -356,7 +356,6 @@ Status HashJoinNode::init(const TPlanNode& tnode, RuntimeState* state) { Status HashJoinNode::prepare(RuntimeState* state) { RETURN_IF_ERROR(VJoinNodeBase::prepare(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); auto* memory_usage = runtime_profile()->create_child("MemoryUsage", true, true); runtime_profile()->add_child(memory_usage, false, nullptr); @@ -662,7 +661,6 @@ Status HashJoinNode::open(RuntimeState* state) { START_AND_SCOPE_SPAN(state->get_tracer(), span, "HashJoinNode::open"); SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_ERROR(VJoinNodeBase::open(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); RETURN_IF_CANCELLED(state); return Status::OK(); } diff --git a/be/src/vec/exec/join/vjoin_node_base.cpp b/be/src/vec/exec/join/vjoin_node_base.cpp index 5e5c24fcfc..e186f15d8c 100644 --- a/be/src/vec/exec/join/vjoin_node_base.cpp +++ b/be/src/vec/exec/join/vjoin_node_base.cpp @@ -176,7 +176,6 @@ Status VJoinNodeBase::init(const TPlanNode& tnode, RuntimeState* state) { Status VJoinNodeBase::open(RuntimeState* state) { START_AND_SCOPE_SPAN(state->get_tracer(), span, "VJoinNodeBase::open"); RETURN_IF_ERROR(ExecNode::open(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); RETURN_IF_CANCELLED(state); std::promise thread_status; @@ -220,7 +219,6 @@ void VJoinNodeBase::_reset_tuple_is_null_column() { void VJoinNodeBase::_probe_side_open_thread(RuntimeState* state, std::promise* status) { START_AND_SCOPE_SPAN(state->get_tracer(), span, "VJoinNodeBase::_hash_table_build_thread"); SCOPED_ATTACH_TASK(state); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh_shared()); status->set_value(child(0)->open(state)); } diff --git a/be/src/vec/exec/join/vnested_loop_join_node.cpp b/be/src/vec/exec/join/vnested_loop_join_node.cpp index 39ae99a36e..00ed59bcbd 100644 --- a/be/src/vec/exec/join/vnested_loop_join_node.cpp +++ b/be/src/vec/exec/join/vnested_loop_join_node.cpp @@ -103,7 +103,6 @@ Status VNestedLoopJoinNode::init(const TPlanNode& tnode, RuntimeState* state) { Status VNestedLoopJoinNode::prepare(RuntimeState* state) { SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_ERROR(VJoinNodeBase::prepare(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); _build_timer = ADD_TIMER(runtime_profile(), "BuildTime"); _build_rows_counter = ADD_COUNTER(runtime_profile(), "BuildRows", TUnit::UNIT); @@ -237,7 +236,6 @@ Status VNestedLoopJoinNode::get_next(RuntimeState* state, Block* block, bool* eo SCOPED_TIMER(_runtime_profile->total_time_counter()); SCOPED_TIMER(_probe_timer); RETURN_IF_CANCELLED(state); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); while (need_more_input_data()) { RETURN_IF_ERROR(_fresh_left_block(state)); @@ -605,7 +603,6 @@ Status VNestedLoopJoinNode::open(RuntimeState* state) { START_AND_SCOPE_SPAN(state->get_tracer(), span, "VNestedLoopJoinNode::open") SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_ERROR(VJoinNodeBase::open(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); RETURN_IF_CANCELLED(state); // We can close the right child to release its resources because its input has been // fully consumed. diff --git a/be/src/vec/exec/scan/new_es_scan_node.cpp b/be/src/vec/exec/scan/new_es_scan_node.cpp index 61f7f4022c..5b22c2ef19 100644 --- a/be/src/vec/exec/scan/new_es_scan_node.cpp +++ b/be/src/vec/exec/scan/new_es_scan_node.cpp @@ -76,7 +76,6 @@ Status NewEsScanNode::init(const TPlanNode& tnode, RuntimeState* state) { Status NewEsScanNode::prepare(RuntimeState* state) { VLOG_CRITICAL << NEW_SCAN_NODE_TYPE << "::prepare"; RETURN_IF_ERROR(VScanNode::prepare(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id); if (_tuple_desc == nullptr) { diff --git a/be/src/vec/exec/scan/new_jdbc_scan_node.cpp b/be/src/vec/exec/scan/new_jdbc_scan_node.cpp index b328151a92..19fd93b9d3 100644 --- a/be/src/vec/exec/scan/new_jdbc_scan_node.cpp +++ b/be/src/vec/exec/scan/new_jdbc_scan_node.cpp @@ -37,7 +37,6 @@ std::string NewJdbcScanNode::get_name() { Status NewJdbcScanNode::prepare(RuntimeState* state) { VLOG_CRITICAL << "VNewJdbcScanNode::Prepare"; RETURN_IF_ERROR(VScanNode::prepare(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); return Status::OK(); } diff --git a/be/src/vec/exec/scan/new_odbc_scan_node.cpp b/be/src/vec/exec/scan/new_odbc_scan_node.cpp index 2bd6ad4c34..dbbf57a120 100644 --- a/be/src/vec/exec/scan/new_odbc_scan_node.cpp +++ b/be/src/vec/exec/scan/new_odbc_scan_node.cpp @@ -38,7 +38,6 @@ std::string NewOdbcScanNode::get_name() { Status NewOdbcScanNode::prepare(RuntimeState* state) { VLOG_CRITICAL << NEW_SCAN_NODE_TYPE << "::prepare"; RETURN_IF_ERROR(VScanNode::prepare(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); return Status::OK(); } diff --git a/be/src/vec/exec/scan/new_olap_scan_node.cpp b/be/src/vec/exec/scan/new_olap_scan_node.cpp index a84443d38d..c6df1722cb 100644 --- a/be/src/vec/exec/scan/new_olap_scan_node.cpp +++ b/be/src/vec/exec/scan/new_olap_scan_node.cpp @@ -46,7 +46,6 @@ Status NewOlapScanNode::collect_query_statistics(QueryStatistics* statistics) { } Status NewOlapScanNode::prepare(RuntimeState* state) { - SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); RETURN_IF_ERROR(VScanNode::prepare(state)); return Status::OK(); } diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp index f5e270c606..fe7356d0e8 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.cpp +++ b/be/src/vec/exec/scan/scanner_scheduler.cpp @@ -191,7 +191,6 @@ void ScannerScheduler::_schedule_scanners(ScannerContext* ctx) { void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, ScannerContext* ctx, VScanner* scanner) { SCOPED_ATTACH_TASK(scanner->runtime_state()); - SCOPED_CONSUME_MEM_TRACKER(scanner->runtime_state()->scanner_mem_tracker()); Thread::set_self_name("_scanner_scan"); scanner->update_wait_worker_timer(); scanner->start_scan_cpu_timer(); diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp index 78e7d91bd3..ebd4132281 100644 --- a/be/src/vec/exec/scan/vscan_node.cpp +++ b/be/src/vec/exec/scan/vscan_node.cpp @@ -75,7 +75,6 @@ Status VScanNode::init(const TPlanNode& tnode, RuntimeState* state) { Status VScanNode::prepare(RuntimeState* state) { RETURN_IF_ERROR(ExecNode::prepare(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); // init profile for runtime filter for (auto& rf_ctx : _runtime_filter_ctxs) { @@ -85,7 +84,6 @@ Status VScanNode::prepare(RuntimeState* state) { } Status VScanNode::open(RuntimeState* state) { - SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); START_AND_SCOPE_SPAN(state->get_tracer(), span, "VScanNode::open"); SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_CANCELLED(state); @@ -96,7 +94,6 @@ Status VScanNode::alloc_resource(RuntimeState* state) { if (_opened) { return Status::OK(); } - SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); _input_tuple_desc = state->desc_tbl().get_tuple_descriptor(_input_tuple_id); _output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id); START_AND_SCOPE_SPAN(state->get_tracer(), span, "VScanNode::alloc_resource"); @@ -125,7 +122,6 @@ Status VScanNode::get_next(RuntimeState* state, vectorized::Block* block, bool* INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span, "VScanNode::get_next"); SCOPED_TIMER(_get_next_timer); SCOPED_TIMER(_runtime_profile->total_time_counter()); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); // in inverted index apply logic, in order to optimize query performance, // we built some temporary columns into block, these columns only used in scan node level, // remove them when query leave scan node to avoid other nodes use block->columns() to make a wrong decision diff --git a/be/src/vec/exec/vaggregation_node.cpp b/be/src/vec/exec/vaggregation_node.cpp index 89504b20ef..6b33cba1d3 100644 --- a/be/src/vec/exec/vaggregation_node.cpp +++ b/be/src/vec/exec/vaggregation_node.cpp @@ -446,7 +446,6 @@ Status AggregationNode::prepare_profile(RuntimeState* state) { } Status AggregationNode::prepare(RuntimeState* state) { - SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_ERROR(ExecNode::prepare(state)); @@ -456,7 +455,6 @@ Status AggregationNode::prepare(RuntimeState* state) { Status AggregationNode::alloc_resource(doris::RuntimeState* state) { RETURN_IF_ERROR(ExecNode::alloc_resource(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); RETURN_IF_ERROR(VExpr::open(_probe_expr_ctxs, state)); @@ -534,7 +532,6 @@ Status AggregationNode::get_next(RuntimeState* state, Block* block, bool* eos) { _children[0]->get_next_span(), _child_eos); }; { - SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); if (_preagg_block.rows() != 0) { RETURN_IF_ERROR(do_pre_agg(&_preagg_block, block)); } else { @@ -542,7 +539,6 @@ Status AggregationNode::get_next(RuntimeState* state, Block* block, bool* eos) { } } } else { - SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); RETURN_IF_ERROR(pull(state, block, eos)); } return Status::OK(); @@ -758,7 +754,7 @@ Status AggregationNode::_merge_without_key(Block* block) { void AggregationNode::_update_memusage_without_key() { auto arena_memory_usage = _agg_arena_pool->size() - _mem_usage_record.used_in_arena; - mem_tracker_held()->consume(arena_memory_usage); + mem_tracker()->consume(arena_memory_usage); _serialize_key_arena_memory_usage->add(arena_memory_usage); _mem_usage_record.used_in_arena = _agg_arena_pool->size(); } @@ -1370,9 +1366,9 @@ void AggregationNode::_update_memusage_with_serialized_key() { auto arena_memory_usage = _agg_arena_pool->size() + _aggregate_data_container->memory_usage() - _mem_usage_record.used_in_arena; - mem_tracker_held()->consume(arena_memory_usage); - mem_tracker_held()->consume(data.get_buffer_size_in_bytes() - - _mem_usage_record.used_in_state); + mem_tracker()->consume(arena_memory_usage); + mem_tracker()->consume(data.get_buffer_size_in_bytes() - + _mem_usage_record.used_in_state); _serialize_key_arena_memory_usage->add(arena_memory_usage); COUNTER_UPDATE(_hash_table_memory_usage, data.get_buffer_size_in_bytes() - _mem_usage_record.used_in_state); @@ -1399,7 +1395,7 @@ void AggregationNode::_close_with_serialized_key() { } void AggregationNode::release_tracker() { - mem_tracker_held()->release(_mem_usage_record.used_in_state + _mem_usage_record.used_in_arena); + mem_tracker()->release(_mem_usage_record.used_in_state + _mem_usage_record.used_in_arena); } void AggregationNode::_release_mem() { diff --git a/be/src/vec/exec/vanalytic_eval_node.cpp b/be/src/vec/exec/vanalytic_eval_node.cpp index b2235de9ab..237d583cc0 100644 --- a/be/src/vec/exec/vanalytic_eval_node.cpp +++ b/be/src/vec/exec/vanalytic_eval_node.cpp @@ -140,7 +140,6 @@ Status VAnalyticEvalNode::init(const TPlanNode& tnode, RuntimeState* state) { } Status VAnalyticEvalNode::prepare(RuntimeState* state) { - SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_ERROR(ExecNode::prepare(state)); DCHECK(child(0)->row_desc().is_prefix_of(_row_descriptor)); @@ -211,7 +210,6 @@ Status VAnalyticEvalNode::prepare(RuntimeState* state) { Status VAnalyticEvalNode::open(RuntimeState* state) { RETURN_IF_ERROR(alloc_resource(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); RETURN_IF_ERROR(child(0)->open(state)); return Status::OK(); } @@ -226,13 +224,11 @@ Status VAnalyticEvalNode::close(RuntimeState* state) { Status VAnalyticEvalNode::alloc_resource(RuntimeState* state) { { - SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); START_AND_SCOPE_SPAN(state->get_tracer(), span, "VAnalyticEvalNode::open"); SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_ERROR(ExecNode::alloc_resource(state)); RETURN_IF_CANCELLED(state); } - SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); RETURN_IF_ERROR(VExpr::open(_partition_by_eq_expr_ctxs, state)); RETURN_IF_ERROR(VExpr::open(_order_by_eq_expr_ctxs, state)); for (size_t i = 0; i < _agg_functions_size; ++i) { @@ -322,14 +318,12 @@ Status VAnalyticEvalNode::get_next(RuntimeState* state, vectorized::Block* block return Status::OK(); } size_t current_block_rows = _input_blocks[_output_block_index].rows(); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); RETURN_IF_ERROR(_executor.get_next(current_block_rows)); if (_window_end_position == current_block_rows) { break; } } RETURN_IF_ERROR(_output_current_block(block)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); RETURN_IF_ERROR(VExprContext::filter_block(_vconjunct_ctx_ptr, block, block->columns())); reached_limit(block, eos); return Status::OK(); @@ -393,8 +387,6 @@ Status VAnalyticEvalNode::_consumed_block_and_init_partition(RuntimeState* state found_partition_end = _get_partition_by_end(); //claculate new partition end } - SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); - if (_input_eos && _input_total_rows == 0) { *eos = true; return Status::OK(); @@ -528,8 +520,6 @@ Status VAnalyticEvalNode::sink(doris::RuntimeState* /*state*/, vectorized::Block return Status::OK(); } - SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); - input_block_first_row_positions.emplace_back(_input_total_rows); size_t block_rows = input_block->rows(); _input_total_rows += block_rows; @@ -566,7 +556,7 @@ Status VAnalyticEvalNode::sink(doris::RuntimeState* /*state*/, vectorized::Block _ordey_by_column_idxs[i] = result_col_id; } - mem_tracker_held()->consume(input_block->allocated_bytes()); + mem_tracker()->consume(input_block->allocated_bytes()); _blocks_memory_usage->add(input_block->allocated_bytes()); //TODO: if need improvement, the is a tips to maintain a free queue, @@ -627,11 +617,9 @@ void VAnalyticEvalNode::_insert_result_info(int64_t current_block_rows) { } Status VAnalyticEvalNode::_output_current_block(Block* block) { - SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); - block->swap(std::move(_input_blocks[_output_block_index])); _blocks_memory_usage->add(-block->allocated_bytes()); - mem_tracker_held()->consume(-block->allocated_bytes()); + mem_tracker()->consume(-block->allocated_bytes()); if (_origin_cols.size() < block->columns()) { block->erase_not_in(_origin_cols); } diff --git a/be/src/vec/exec/vbroker_scan_node.cpp b/be/src/vec/exec/vbroker_scan_node.cpp index ac96f97965..a94b73592a 100644 --- a/be/src/vec/exec/vbroker_scan_node.cpp +++ b/be/src/vec/exec/vbroker_scan_node.cpp @@ -59,7 +59,6 @@ Status VBrokerScanNode::init(const TPlanNode& tnode, RuntimeState* state) { Status VBrokerScanNode::prepare(RuntimeState* state) { VLOG_QUERY << "VBrokerScanNode prepare"; RETURN_IF_ERROR(ScanNode::prepare(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); // get tuple desc _runtime_state = state; _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id); @@ -85,7 +84,6 @@ Status VBrokerScanNode::open(RuntimeState* state) { START_AND_SCOPE_SPAN(state->get_tracer(), span, "VBrokerScanNode::open"); SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_ERROR(ExecNode::open(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); RETURN_IF_CANCELLED(state); RETURN_IF_ERROR(start_scanners()); @@ -109,7 +107,6 @@ Status VBrokerScanNode::start_scanners() { Status VBrokerScanNode::get_next(RuntimeState* state, vectorized::Block* block, bool* eos) { INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span, "VBrokerScanNode::get_next"); SCOPED_TIMER(_runtime_profile->total_time_counter()); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); // check if CANCELLED. if (state->is_cancelled()) { std::unique_lock l(_batch_queue_lock); @@ -271,7 +268,6 @@ Status VBrokerScanNode::scanner_scan(const TBrokerScanRange& scan_range, Scanner void VBrokerScanNode::scanner_worker(int start_idx, int length) { START_AND_SCOPE_SPAN(_runtime_state->get_tracer(), span, "VBrokerScanNode::scanner_worker"); SCOPED_ATTACH_TASK(_runtime_state); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh_shared()); Thread::set_self_name("vbroker_scanner"); Status status = Status::OK(); ScannerCounter counter; diff --git a/be/src/vec/exec/vexchange_node.cpp b/be/src/vec/exec/vexchange_node.cpp index 272116cbe1..069caaf635 100644 --- a/be/src/vec/exec/vexchange_node.cpp +++ b/be/src/vec/exec/vexchange_node.cpp @@ -54,7 +54,6 @@ Status VExchangeNode::init(const TPlanNode& tnode, RuntimeState* state) { Status VExchangeNode::prepare(RuntimeState* state) { RETURN_IF_ERROR(ExecNode::prepare(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); DCHECK_GT(_num_senders, 0); _sub_plan_query_statistics_recvr.reset(new QueryStatisticsRecvr()); _stream_recvr = state->exec_env()->vstream_mgr()->create_recvr( @@ -83,7 +82,6 @@ Status VExchangeNode::alloc_resource(RuntimeState* state) { Status VExchangeNode::open(RuntimeState* state) { START_AND_SCOPE_SPAN(state->get_tracer(), span, "VExchangeNode::open"); SCOPED_TIMER(_runtime_profile->total_time_counter()); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); RETURN_IF_ERROR(ExecNode::open(state)); return Status::OK(); @@ -92,7 +90,6 @@ Status VExchangeNode::open(RuntimeState* state) { Status VExchangeNode::get_next(RuntimeState* state, Block* block, bool* eos) { INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span, "VExchangeNode::get_next"); SCOPED_TIMER(runtime_profile()->total_time_counter()); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); if (_is_merging && state->enable_pipeline_exec() && !_is_ready) { RETURN_IF_ERROR(_stream_recvr->create_merger(_vsort_exec_exprs.lhs_ordering_expr_ctxs(), _is_asc_order, _nulls_first, diff --git a/be/src/vec/exec/vmysql_scan_node.cpp b/be/src/vec/exec/vmysql_scan_node.cpp index 3fb03bd19c..dcc7593ee9 100644 --- a/be/src/vec/exec/vmysql_scan_node.cpp +++ b/be/src/vec/exec/vmysql_scan_node.cpp @@ -49,7 +49,6 @@ Status VMysqlScanNode::prepare(RuntimeState* state) { } RETURN_IF_ERROR(ScanNode::prepare(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); // get tuple desc _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id); @@ -103,7 +102,6 @@ Status VMysqlScanNode::open(RuntimeState* state) { START_AND_SCOPE_SPAN(state->get_tracer(), span, "VMysqlScanNode::open"); SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_ERROR(ExecNode::open(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); VLOG_CRITICAL << "MysqlScanNode::Open"; if (!_is_init) { diff --git a/be/src/vec/exec/vrepeat_node.cpp b/be/src/vec/exec/vrepeat_node.cpp index 8a090ad0d6..c3fe0e8d0f 100644 --- a/be/src/vec/exec/vrepeat_node.cpp +++ b/be/src/vec/exec/vrepeat_node.cpp @@ -44,7 +44,6 @@ Status VRepeatNode::prepare(RuntimeState* state) { SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_ERROR(ExecNode::prepare(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); _output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id); if (_output_tuple_desc == nullptr) { return Status::InternalError("Failed to get tuple descriptor."); @@ -72,7 +71,6 @@ Status VRepeatNode::alloc_resource(RuntimeState* state) { START_AND_SCOPE_SPAN(state->get_tracer(), span, "VRepeatNode::open"); SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_ERROR(ExecNode::alloc_resource(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); RETURN_IF_ERROR(VExpr::open(_expr_ctxs, state)); return Status::OK(); } diff --git a/be/src/vec/exec/vschema_scan_node.cpp b/be/src/vec/exec/vschema_scan_node.cpp index 9e3e6a1c42..65588eb11b 100644 --- a/be/src/vec/exec/vschema_scan_node.cpp +++ b/be/src/vec/exec/vschema_scan_node.cpp @@ -122,7 +122,6 @@ Status VSchemaScanNode::open(RuntimeState* state) { SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_CANCELLED(state); RETURN_IF_ERROR(ExecNode::open(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); if (_scanner_param.user) { TSetSessionParams param; @@ -145,7 +144,6 @@ Status VSchemaScanNode::prepare(RuntimeState* state) { } START_AND_SCOPE_SPAN(state->get_tracer(), span, "VSchemaScanNode::prepare"); RETURN_IF_ERROR(ScanNode::prepare(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); // new one mem pool _tuple_pool.reset(new (std::nothrow) MemPool()); diff --git a/be/src/vec/exec/vset_operation_node.cpp b/be/src/vec/exec/vset_operation_node.cpp index 9ad65dbf1c..253d360569 100644 --- a/be/src/vec/exec/vset_operation_node.cpp +++ b/be/src/vec/exec/vset_operation_node.cpp @@ -193,7 +193,6 @@ Status VSetOperationNode::open(RuntimeState* state) { START_AND_SCOPE_SPAN(state->get_tracer(), span, "VSetOperationNode::open"); SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_ERROR(ExecNode::open(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); // TODO: build the hash table in a thread to open other children asynchronously. RETURN_IF_ERROR(hash_table_build(state)); @@ -234,7 +233,6 @@ template Status VSetOperationNode::prepare(RuntimeState* state) { SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_ERROR(ExecNode::prepare(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); _build_timer = ADD_TIMER(runtime_profile(), "BuildTime"); _probe_timer = ADD_TIMER(runtime_profile(), "ProbeTime"); _pull_timer = ADD_TIMER(runtime_profile(), "PullTime"); diff --git a/be/src/vec/exec/vsort_node.cpp b/be/src/vec/exec/vsort_node.cpp index 4ebdd05506..fd7c88af75 100644 --- a/be/src/vec/exec/vsort_node.cpp +++ b/be/src/vec/exec/vsort_node.cpp @@ -87,7 +87,6 @@ Status VSortNode::init(const TPlanNode& tnode, RuntimeState* state) { } Status VSortNode::prepare(RuntimeState* state) { - SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_ERROR(ExecNode::prepare(state)); _runtime_profile->add_info_string("TOP-N", _limit == -1 ? "false" : "true"); @@ -102,7 +101,6 @@ Status VSortNode::prepare(RuntimeState* state) { Status VSortNode::alloc_resource(doris::RuntimeState* state) { RETURN_IF_ERROR(ExecNode::alloc_resource(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); RETURN_IF_ERROR(_vsort_exec_exprs.open(state)); RETURN_IF_CANCELLED(state); RETURN_IF_ERROR(state->check_query_state("vsort, while open.")); @@ -161,13 +159,12 @@ Status VSortNode::open(RuntimeState* state) { _children[0], std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)), child(0)->get_next_span(), eos); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); RETURN_IF_ERROR(sink(state, upstream_block.get(), eos)); } while (!eos); child(0)->close(state); - mem_tracker_held()->consume(_sorter->data_size()); + mem_tracker()->consume(_sorter->data_size()); COUNTER_UPDATE(_sort_blocks_memory_usage, _sorter->data_size()); return Status::OK(); @@ -183,7 +180,6 @@ Status VSortNode::pull(doris::RuntimeState* state, vectorized::Block* output_blo } Status VSortNode::get_next(RuntimeState* state, Block* block, bool* eos) { - SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span, "VSortNode::get_next"); SCOPED_TIMER(_runtime_profile->total_time_counter()); diff --git a/be/src/vec/exec/vtable_function_node.cpp b/be/src/vec/exec/vtable_function_node.cpp index c3e9f2a829..f7fe4f71d6 100644 --- a/be/src/vec/exec/vtable_function_node.cpp +++ b/be/src/vec/exec/vtable_function_node.cpp @@ -83,7 +83,6 @@ bool VTableFunctionNode::_is_inner_and_empty() { Status VTableFunctionNode::prepare(RuntimeState* state) { SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_ERROR(ExecNode::prepare(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); _num_rows_filtered_counter = ADD_COUNTER(_runtime_profile, "RowsFiltered", TUnit::UNIT); diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp index 3efb84b4c0..f58e3d1652 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.cpp +++ b/be/src/vec/runtime/vdata_stream_recvr.cpp @@ -308,8 +308,9 @@ VDataStreamRecvr::VDataStreamRecvr( _sub_plan_query_statistics_recvr(sub_plan_query_statistics_recvr), _enable_pipeline(state->enable_pipeline_exec()) { // DataStreamRecvr may be destructed after the instance execution thread ends. - _mem_tracker = std::make_unique( - "VDataStreamRecvr:" + print_id(_fragment_instance_id), _profile); + _mem_tracker = + std::make_unique("VDataStreamRecvr:" + print_id(_fragment_instance_id), + _profile, nullptr, "PeakMemoryUsage"); SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); // Create one queue per sender if is_merging is true. diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp index 1d36902561..8b87ff4840 100644 --- a/be/src/vec/sink/vdata_stream_sender.cpp +++ b/be/src/vec/sink/vdata_stream_sender.cpp @@ -421,7 +421,8 @@ Status VDataStreamSender::prepare(RuntimeState* state) { _profile = _pool->add(new RuntimeProfile(title)); SCOPED_TIMER(_profile->total_time_counter()); _mem_tracker = std::make_unique( - "VDataStreamSender:" + print_id(state->fragment_instance_id()), _profile); + "VDataStreamSender:" + print_id(state->fragment_instance_id()), _profile, nullptr, + "PeakMemoryUsage"); SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); if (_part_type == TPartitionType::UNPARTITIONED || _part_type == TPartitionType::RANDOM) { diff --git a/be/test/testutil/run_all_tests.cpp b/be/test/testutil/run_all_tests.cpp index 99a338a5aa..cd6499831f 100644 --- a/be/test/testutil/run_all_tests.cpp +++ b/be/test/testutil/run_all_tests.cpp @@ -28,10 +28,7 @@ #include "util/mem_info.h" int main(int argc, char** argv) { - std::shared_ptr orphan_mem_tracker = - std::make_shared(doris::MemTrackerLimiter::Type::GLOBAL, - "Orphan"); - doris::ExecEnv::GetInstance()->set_orphan_mem_tracker(orphan_mem_tracker); + doris::ExecEnv::GetInstance()->init_mem_tracker(); doris::thread_context()->thread_mem_tracker_mgr->init(); doris::TabletSchemaCache::create_global_schema_cache(); doris::StoragePageCache::create_global_cache(1 << 30, 10);