diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp
index aa986d46ec..330f3e916e 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -1500,6 +1500,7 @@ void PublishVersionWorkerPool::publish_version_callback(const TAgentTaskRequest&
EnginePublishVersionTask engine_task(publish_version_req, &error_tablet_ids, &succ_tablets,
&discontinuous_version_tablets,
&table_id_to_num_delta_rows);
+ SCOPED_ATTACH_TASK(engine_task.mem_tracker());
status = engine_task.execute();
if (status.ok()) {
break;
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 1d26085db3..9364303a3e 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -132,6 +132,8 @@ DEFINE_mBool(disable_memory_gc, "false");
DEFINE_mInt64(large_memory_check_bytes, "2147483648");
+DEFINE_mBool(enable_memory_orphan_check, "true");
+
// The maximum time a thread waits for full GC. Currently only query will wait for full gc.
DEFINE_mInt32(thread_wait_gc_max_milliseconds, "1000");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 5650db764f..01d8a123a4 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -178,6 +178,9 @@ DECLARE_mBool(disable_memory_gc);
// If is -1, disable large memory check.
DECLARE_mInt64(large_memory_check_bytes);
+// Default is true. If enabled, any memory that ends up tracked by the Orphan mem tracker will be reported as an error.
+DECLARE_mBool(enable_memory_orphan_check);
+
// The maximum time a thread waits for a full GC. Currently only query will wait for full gc.
DECLARE_mInt32(thread_wait_gc_max_milliseconds);
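
For context on what enable_memory_orphan_check implies, here is a hedged sketch (simplified stand-ins, not the real Doris types or the actual check site) of how such a flag can surface memory that gets charged to the catch-all Orphan tracker because the allocating thread never attached a task tracker:

    #include <atomic>
    #include <cstdint>
    #include <cstdio>

    namespace config { inline bool enable_memory_orphan_check = true; }  // assumed flag name

    // Simplified stand-in for the Orphan MemTrackerLimiter.
    struct OrphanTrackerSketch {
        std::atomic<int64_t> consumption{0};
        void consume(int64_t bytes) {
            consumption.fetch_add(bytes, std::memory_order_relaxed);
            // With the check enabled, memory attributed to the orphan tracker is treated
            // as a tracking gap: the allocating thread should have attached a task tracker.
            if (config::enable_memory_orphan_check && bytes > 0) {
                std::fprintf(stderr, "[orphan check] %lld bytes charged to Orphan tracker\n",
                             static_cast<long long>(bytes));
            }
        }
    };
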
diff --git a/be/src/http/default_path_handlers.cpp b/be/src/http/default_path_handlers.cpp
index 39a75b7cca..6e4ce97b90 100644
--- a/be/src/http/default_path_handlers.cpp
+++ b/be/src/http/default_path_handlers.cpp
@@ -153,11 +153,8 @@ void mem_tracker_handler(const WebPageHandler::ArgumentMap& args, std::stringstr
} else if (iter->second == "schema_change") {
MemTrackerLimiter::make_type_snapshots(&snapshots,
MemTrackerLimiter::Type::SCHEMA_CHANGE);
- } else if (iter->second == "clone") {
- MemTrackerLimiter::make_type_snapshots(&snapshots, MemTrackerLimiter::Type::CLONE);
- } else if (iter->second == "experimental") {
- MemTrackerLimiter::make_type_snapshots(&snapshots,
- MemTrackerLimiter::Type::EXPERIMENTAL);
+ } else if (iter->second == "other") {
+ MemTrackerLimiter::make_type_snapshots(&snapshots, MemTrackerLimiter::Type::OTHER);
}
} else {
(*output) << "*Notice:\n";
diff --git a/be/src/io/fs/local_file_system.cpp b/be/src/io/fs/local_file_system.cpp
index 55e9466bde..2c4b6bb5f8 100644
--- a/be/src/io/fs/local_file_system.cpp
+++ b/be/src/io/fs/local_file_system.cpp
@@ -300,13 +300,11 @@ Status LocalFileSystem::md5sum_impl(const Path& file, std::string* md5sum) {
return localfs_error(errno, fmt::format("failed to stat file {}", file.native()));
}
size_t file_len = statbuf.st_size;
- CONSUME_THREAD_MEM_TRACKER(file_len);
void* buf = mmap(nullptr, file_len, PROT_READ, MAP_SHARED, fd, 0);
unsigned char result[MD5_DIGEST_LENGTH];
MD5((unsigned char*)buf, file_len, result);
munmap(buf, file_len);
- RELEASE_THREAD_MEM_TRACKER(file_len);
std::stringstream ss;
for (int32_t i = 0; i < MD5_DIGEST_LENGTH; i++) {
diff --git a/be/src/io/fs/s3_file_bufferpool.cpp b/be/src/io/fs/s3_file_bufferpool.cpp
index 4efbd43942..c683d2eabf 100644
--- a/be/src/io/fs/s3_file_bufferpool.cpp
+++ b/be/src/io/fs/s3_file_bufferpool.cpp
@@ -29,6 +29,7 @@
#include "io/cache/block/block_file_segment.h"
#include "io/fs/s3_common.h"
#include "runtime/exec_env.h"
+#include "runtime/thread_context.h"
#include "util/defer_op.h"
#include "util/slice.h"
#include "vec/common/arena.h"
@@ -83,7 +84,10 @@ FileBuffer::FileBuffer(BufferType type, std::function all
_inner_data(std::make_unique()),
_capacity(_inner_data->size()) {}
-FileBuffer::~FileBuffer() = default;
+FileBuffer::~FileBuffer() {
+ SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->s3_file_buffer_tracker());
+ _inner_data.reset();
+}
/**
* 0. check if file cache holder allocated
* 1. update the cache's type to index cache
@@ -251,6 +255,7 @@ FileBufferBuilder& FileBufferBuilder::set_allocate_file_segments_holder(
}
Status FileBufferBuilder::build(std::shared_ptr<FileBuffer>* buf) {
+ SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->s3_file_buffer_tracker());
OperationState state(_sync_after_complete_task, _is_cancelled);
if (_type == BufferType::UPLOAD) {
diff --git a/be/src/io/fs/stream_load_pipe.cpp b/be/src/io/fs/stream_load_pipe.cpp
index 014d1240c5..cd5ee5a8a0 100644
--- a/be/src/io/fs/stream_load_pipe.cpp
+++ b/be/src/io/fs/stream_load_pipe.cpp
@@ -50,7 +50,6 @@ StreamLoadPipe::~StreamLoadPipe() {
Status StreamLoadPipe::read_at_impl(size_t /*offset*/, Slice result, size_t* bytes_read,
const IOContext* /*io_ctx*/) {
- SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
*bytes_read = 0;
size_t bytes_req = result.size;
char* to = result.data;
diff --git a/be/src/olap/calc_delete_bitmap_executor.cpp b/be/src/olap/calc_delete_bitmap_executor.cpp
index 363a3d4019..03bbbd2792 100644
--- a/be/src/olap/calc_delete_bitmap_executor.cpp
+++ b/be/src/olap/calc_delete_bitmap_executor.cpp
@@ -38,9 +38,11 @@ Status CalcDeleteBitmapToken::submit(TabletSharedPtr tablet, RowsetSharedPtr cur
{
std::shared_lock rlock(_lock);
RETURN_IF_ERROR(_status);
+ _query_thread_context.init();
}
return _thread_token->submit_func([=, this]() {
+ SCOPED_ATTACH_TASK(_query_thread_context);
auto st = tablet->calc_segment_delete_bitmap(cur_rowset, cur_segment, target_rowsets,
delete_bitmap, end_version, rowset_writer);
if (!st.ok()) {
diff --git a/be/src/olap/calc_delete_bitmap_executor.h b/be/src/olap/calc_delete_bitmap_executor.h
index 02e8867151..c6adea6d2d 100644
--- a/be/src/olap/calc_delete_bitmap_executor.h
+++ b/be/src/olap/calc_delete_bitmap_executor.h
@@ -66,6 +66,7 @@ private:
// Records the current status of the calc delete bitmap job.
// Note: Once its value is set to Failed, it cannot return to SUCCESS.
Status _status;
+ QueryThreadContext _query_thread_context;
};
// CalcDeleteBitmapExecutor is responsible for calc delete bitmap concurrently.
diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index ae6bded1b7..09e128d933 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -75,9 +75,8 @@ Compaction::Compaction(const TabletSharedPtr& tablet, const std::string& label)
_input_row_num(0),
_input_num_segments(0),
_input_index_size(0),
- _state(CompactionState::INITED),
- _allow_delete_in_cumu_compaction(config::enable_delete_when_cumu_compaction) {
- _mem_tracker = std::make_shared<MemTrackerLimiter>(MemTrackerLimiter::Type::COMPACTION, label);
+ _state(CompactionState::INITED) {
+ _mem_tracker = MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::COMPACTION, label);
init_profile(label);
}
diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h
index 5effb6102f..d2419d2d81 100644
--- a/be/src/olap/delta_writer.h
+++ b/be/src/olap/delta_writer.h
@@ -43,7 +43,6 @@ namespace doris {
class FlushToken;
class MemTable;
-class MemTracker;
class StorageEngine;
class TupleDescriptor;
class SlotDescriptor;
diff --git a/be/src/olap/delta_writer_v2.cpp b/be/src/olap/delta_writer_v2.cpp
index 1dbc96a294..b2f275f18d 100644
--- a/be/src/olap/delta_writer_v2.cpp
+++ b/be/src/olap/delta_writer_v2.cpp
@@ -145,7 +145,6 @@ Status DeltaWriterV2::append(const vectorized::Block* block) {
Status DeltaWriterV2::write(const vectorized::Block* block, const std::vector& row_idxs,
bool is_append) {
- SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
if (UNLIKELY(row_idxs.empty() && !is_append)) {
return Status::OK();
}
diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index 3e6381a0e6..7cddd2abe0 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -65,14 +65,7 @@ MemTable::MemTable(int64_t tablet_id, const TabletSchema* tablet_schema,
_total_size_of_aggregate_states(0),
_mem_usage(0) {
g_memtable_cnt << 1;
-#ifndef BE_TEST
- _insert_mem_tracker_use_hook = std::make_unique<MemTracker>(
- fmt::format("MemTableHookInsert:TabletId={}", std::to_string(tablet_id)),
- ExecEnv::GetInstance()->memtable_memory_limiter()->mem_tracker());
-#else
- _insert_mem_tracker_use_hook = std::make_unique<MemTracker>(
- fmt::format("MemTableHookInsert:TabletId={}", std::to_string(tablet_id)));
-#endif
+ _query_thread_context.init();
_arena = std::make_unique();
_vec_row_comparator = std::make_shared<RowInBlockComparator>(_tablet_schema);
// TODO: Support ZOrderComparator in the future
@@ -138,6 +131,7 @@ void MemTable::_init_agg_functions(const vectorized::Block* block) {
}
MemTable::~MemTable() {
+ SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_thread_context.query_mem_tracker);
g_memtable_input_block_allocated_size << -_input_mutable_block.allocated_bytes();
g_memtable_cnt << -1;
if (_keys_type != KeysType::DUP_KEYS) {
@@ -161,6 +155,13 @@ MemTable::~MemTable() {
<< std::endl
<< MemTracker::log_usage(_insert_mem_tracker->make_snapshot());
DCHECK_EQ(_flush_mem_tracker->consumption(), 0);
+ _arena.reset();
+ _agg_buffer_pool.clear();
+ _vec_row_comparator.reset();
+ _row_in_blocks.clear();
+ _agg_functions.clear();
+ _input_mutable_block.clear();
+ _output_mutable_block.clear();
}
int RowInBlockComparator::operator()(const RowInBlock* left, const RowInBlock* right) const {
@@ -170,7 +171,6 @@ int RowInBlockComparator::operator()(const RowInBlock* left, const RowInBlock* r
void MemTable::insert(const vectorized::Block* input_block, const std::vector& row_idxs,
bool is_append) {
- SCOPED_CONSUME_MEM_TRACKER(_insert_mem_tracker_use_hook.get());
vectorized::Block target_block = *input_block;
target_block = input_block->copy_block(_column_offset);
if (_is_first_insertion) {
@@ -473,7 +473,6 @@ void MemTable::_aggregate() {
}
void MemTable::shrink_memtable_by_agg() {
- SCOPED_CONSUME_MEM_TRACKER(_insert_mem_tracker_use_hook.get());
if (_keys_type == KeysType::DUP_KEYS) {
return;
}
diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h
index 9e31e02b2b..6fa140846a 100644
--- a/be/src/olap/memtable.h
+++ b/be/src/olap/memtable.h
@@ -34,6 +34,7 @@
#include "olap/partial_update_info.h"
#include "olap/tablet_schema.h"
#include "runtime/memory/mem_tracker.h"
+#include "runtime/thread_context.h"
#include "vec/aggregate_functions/aggregate_function.h"
#include "vec/common/arena.h"
#include "vec/core/block.h"
@@ -197,6 +198,8 @@ public:
std::shared_ptr<MemTracker> flush_mem_tracker() { return _flush_mem_tracker; }
+ QueryThreadContext query_thread_context() { return _query_thread_context; }
+
private:
// for vectorized
void _aggregate_two_row_in_block(vectorized::MutableBlock& mutable_block, RowInBlock* new_row,
@@ -211,16 +214,14 @@ private:
std::shared_ptr<RowInBlockComparator> _vec_row_comparator;
+ QueryThreadContext _query_thread_context;
+
// `_insert_manual_mem_tracker` manually records the memory value of memtable insert()
// `_flush_hook_mem_tracker` automatically records the memory value of memtable flush() through mem hook.
// Is used to flush when _insert_manual_mem_tracker larger than write_buffer_size and run flush memtable
// when the sum of all memtable (_insert_manual_mem_tracker + _flush_hook_mem_tracker) exceeds the limit.
std::shared_ptr<MemTracker> _insert_mem_tracker;
std::shared_ptr<MemTracker> _flush_mem_tracker;
- // It is only used for verification when the value of `_insert_manual_mem_tracker` is suspected to be wrong.
- // The memory value automatically tracked by the mem hook is 20% less than the manually recorded
- // value in the memtable, because some freed memory is not allocated in the DeltaWriter.
- std::unique_ptr<MemTracker> _insert_mem_tracker_use_hook;
// Only the rows will be inserted into SkipList can allocate memory from _arena.
// In this way, we can make MemTable::memory_usage() to be more accurate, and eventually
// reduce the number of segment files that are generated by current load
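The MemTable changes above follow one pattern: snapshot the query's mem-tracker context on the thread that creates the memtable (`_query_thread_context.init()`), then re-attach that tracker on whatever thread later flushes or destroys it, so allocation and release land on the same query tracker. A minimal sketch of the idea, with simplified stand-ins for QueryThreadContext and the scoped-attach macros (these are assumptions for illustration, not the Doris classes):

    #include <cstdint>
    #include <memory>
    #include <string>
    #include <utility>

    struct MemTrackerSketch {            // stand-in for the query MemTrackerLimiter
        std::string label;
    };

    struct ScopedAttachSketch {          // stand-in for SCOPED_ATTACH_TASK / SCOPED_SWITCH_...
        explicit ScopedAttachSketch(std::shared_ptr<MemTrackerSketch> t) : _prev(tls_tracker) {
            tls_tracker = std::move(t);
        }
        ~ScopedAttachSketch() { tls_tracker = std::move(_prev); }
        static thread_local std::shared_ptr<MemTrackerSketch> tls_tracker;
    private:
        std::shared_ptr<MemTrackerSketch> _prev;
    };
    thread_local std::shared_ptr<MemTrackerSketch> ScopedAttachSketch::tls_tracker;

    struct QueryThreadContextSketch {    // stand-in for QueryThreadContext
        std::shared_ptr<MemTrackerSketch> query_mem_tracker;
        void init() { query_mem_tracker = ScopedAttachSketch::tls_tracker; }  // snapshot current
    };

    class MemTableSketch {
    public:
        MemTableSketch() { _ctx.init(); }            // runs on the load/sink thread
        ~MemTableSketch() {                          // may run on a flush thread
            ScopedAttachSketch guard(_ctx.query_mem_tracker);
            _buf.reset();                            // release is charged to the query tracker
        }
        QueryThreadContextSketch query_thread_context() const { return _ctx; }
    private:
        QueryThreadContextSketch _ctx;
        std::unique_ptr<std::string> _buf = std::make_unique<std::string>(1024, 'x');
    };

The flush path does the equivalent of `SCOPED_ATTACH_TASK(memtable->query_thread_context())`, which is what the memtable_flush_executor change below wires up.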
diff --git a/be/src/olap/memtable_flush_executor.cpp b/be/src/olap/memtable_flush_executor.cpp
index 4d0af7496d..3317371387 100644
--- a/be/src/olap/memtable_flush_executor.cpp
+++ b/be/src/olap/memtable_flush_executor.cpp
@@ -137,6 +137,7 @@ Status FlushToken::_do_flush_memtable(MemTable* memtable, int32_t segment_id, in
<< ", rows: " << memtable->stat().raw_rows;
int64_t duration_ns;
SCOPED_RAW_TIMER(&duration_ns);
+ SCOPED_ATTACH_TASK(memtable->query_thread_context());
signal::set_signal_task_id(_rowset_writer->load_id());
{
SCOPED_CONSUME_MEM_TRACKER(memtable->flush_mem_tracker());
diff --git a/be/src/olap/memtable_memory_limiter.cpp b/be/src/olap/memtable_memory_limiter.cpp
index 696ca4a953..f0a46aaa45 100644
--- a/be/src/olap/memtable_memory_limiter.cpp
+++ b/be/src/olap/memtable_memory_limiter.cpp
@@ -38,7 +38,6 @@ bvar::Status g_memtable_flush_memory("mm_limiter_mem_flush", 0);
bvar::Status g_memtable_load_memory("mm_limiter_mem_load", 0);
bvar::Status g_load_hard_mem_limit("mm_limiter_limit_hard", 0);
bvar::Status g_load_soft_mem_limit("mm_limiter_limit_soft", 0);
-bvar::Status g_orphan_memory("mm_limiter_mem_orphan", 0);
// Calculate the total memory limit of all load tasks on this BE
static int64_t calc_process_max_load_memory(int64_t process_mem_limit) {
@@ -63,8 +62,10 @@ Status MemTableMemoryLimiter::init(int64_t process_mem_limit) {
_load_hard_mem_limit * config::load_process_safe_mem_permit_percent / 100;
g_load_hard_mem_limit.set_value(_load_hard_mem_limit);
g_load_soft_mem_limit.set_value(_load_soft_mem_limit);
- _mem_tracker = std::make_unique<MemTrackerLimiter>(MemTrackerLimiter::Type::LOAD,
- "MemTableMemoryLimiter");
+ _memtable_tracker_set =
+ MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::LOAD, "MemTableTrackerSet");
+ _mem_tracker = std::make_unique<MemTracker>("AllMemTableMemory",
+ ExecEnv::GetInstance()->details_mem_tracker_set());
REGISTER_HOOK_METRIC(memtable_memory_limiter_mem_consumption,
[this]() { return _mem_tracker->consumption(); });
_log_timer.start();
@@ -257,8 +258,7 @@ void MemTableMemoryLimiter::_refresh_mem_tracker() {
g_memtable_flush_memory.set_value(_flush_mem_usage);
g_memtable_load_memory.set_value(_mem_usage);
VLOG_DEBUG << "refreshed mem_tracker, num writers: " << _writers.size();
- THREAD_MEM_TRACKER_TRANSFER_TO(_mem_usage - _mem_tracker->consumption(), _mem_tracker.get());
- g_orphan_memory.set_value(ExecEnv::GetInstance()->orphan_mem_tracker()->consumption());
+ _mem_tracker->set_consumption(_mem_usage);
if (!_hard_limit_reached()) {
_hard_limit_end_cond.notify_all();
}
diff --git a/be/src/olap/memtable_memory_limiter.h b/be/src/olap/memtable_memory_limiter.h
index 25a263675d..895b0bbe2c 100644
--- a/be/src/olap/memtable_memory_limiter.h
+++ b/be/src/olap/memtable_memory_limiter.h
@@ -45,7 +45,8 @@ public:
void refresh_mem_tracker();
- MemTrackerLimiter* mem_tracker() { return _mem_tracker.get(); }
+ MemTrackerLimiter* memtable_tracker_set() { return _memtable_tracker_set.get(); }
+ MemTracker* mem_tracker() { return _mem_tracker.get(); }
int64_t mem_usage() const { return _mem_usage; }
@@ -67,7 +68,10 @@ private:
int64_t _write_mem_usage = 0;
int64_t _active_mem_usage = 0;
- std::unique_ptr<MemTrackerLimiter> _mem_tracker;
+ // mem tracker collection of all mem tables.
+ std::shared_ptr<MemTrackerLimiter> _memtable_tracker_set;
+ // sum of all mem table memory.
+ std::unique_ptr<MemTracker> _mem_tracker;
int64_t _load_hard_mem_limit = -1;
int64_t _load_soft_mem_limit = -1;
int64_t _load_safe_mem_permit = -1;
diff --git a/be/src/olap/memtable_writer.cpp b/be/src/olap/memtable_writer.cpp
index 6a5a5a5ebf..dde76fec38 100644
--- a/be/src/olap/memtable_writer.cpp
+++ b/be/src/olap/memtable_writer.cpp
@@ -189,12 +189,12 @@ void MemTableWriter::_reset_mem_table() {
fmt::format("MemTableManualInsert:TabletId={}:MemTableNum={}#loadID={}",
std::to_string(tablet_id()), _mem_table_num,
UniqueId(_req.load_id).to_string()),
- ExecEnv::GetInstance()->memtable_memory_limiter()->mem_tracker());
+ ExecEnv::GetInstance()->memtable_memory_limiter()->memtable_tracker_set());
auto mem_table_flush_tracker = std::make_shared<MemTracker>(
fmt::format("MemTableHookFlush:TabletId={}:MemTableNum={}#loadID={}",
std::to_string(tablet_id()), _mem_table_num++,
UniqueId(_req.load_id).to_string()),
- ExecEnv::GetInstance()->memtable_memory_limiter()->mem_tracker());
+ ExecEnv::GetInstance()->memtable_memory_limiter()->memtable_tracker_set());
#else
auto mem_table_insert_tracker = std::make_shared<MemTracker>(fmt::format(
"MemTableManualInsert:TabletId={}:MemTableNum={}#loadID={}",
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index e81dcd643f..d3150c17e1 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -277,6 +277,9 @@ void StorageEngine::_cache_clean_callback() {
<< "], force set to 3600 ";
interval = 3600;
}
+ if (config::disable_memory_gc) {
+ continue;
+ }
CacheManager::instance()->for_each_cache_prune_stale();
diff --git a/be/src/olap/page_cache.cpp b/be/src/olap/page_cache.cpp
index fc87b498ba..3476ddb2a3 100644
--- a/be/src/olap/page_cache.cpp
+++ b/be/src/olap/page_cache.cpp
@@ -70,7 +70,7 @@ void StoragePageCache::insert(const CacheKey& key, DataPage* data, PageCacheHand
}
auto* cache = _get_page_cache(page_type);
- auto* lru_handle = cache->insert(key.encode(), data, data->capacity(), 0, priority);
+ auto* lru_handle = cache->insert_no_tracking(key.encode(), data, data->capacity(), priority);
*handle = PageCacheHandle(cache, lru_handle);
}
diff --git a/be/src/olap/page_cache.h b/be/src/olap/page_cache.h
index 97ffe9fba5..9b36bfc690 100644
--- a/be/src/olap/page_cache.h
+++ b/be/src/olap/page_cache.h
@@ -43,8 +43,8 @@ public:
PageBase() = default;
PageBase(size_t b, const std::shared_ptr<MemTrackerLimiter>& mem_tracker)
- : LRUCacheValueBase(mem_tracker), _size(b), _capacity(b) {
- SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker);
+ : LRUCacheValueBase(), _size(b), _capacity(b), _mem_tracker_by_allocator(mem_tracker) {
+ SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker_by_allocator);
_data = reinterpret_cast<char*>(TAllocator::alloc(_capacity, ALLOCATOR_ALIGNMENT_16));
}
@@ -54,7 +54,7 @@ public:
~PageBase() override {
if (_data != nullptr) {
DCHECK(_capacity != 0 && _size != 0);
- SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker);
+ SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker_by_allocator);
TAllocator::free(_data, _capacity);
}
}
@@ -73,6 +73,7 @@ private:
// Effective size, smaller than capacity, such as data page remove checksum suffix.
size_t _size = 0;
size_t _capacity = 0;
+ std::shared_ptr<MemTrackerLimiter> _mem_tracker_by_allocator;
};
using DataPage = PageBase>;
@@ -109,7 +110,9 @@ public:
DataPageCache(size_t capacity, uint32_t num_shards)
: LRUCachePolicy(CachePolicy::CacheType::DATA_PAGE_CACHE, capacity,
LRUCacheType::SIZE, config::data_page_cache_stale_sweep_time_sec,
- num_shards) {}
+ num_shards) {
+ init_mem_tracker_by_allocator();
+ }
};
class IndexPageCache : public LRUCachePolicy {
@@ -117,7 +120,9 @@ public:
IndexPageCache(size_t capacity, uint32_t num_shards)
: LRUCachePolicy(CachePolicy::CacheType::INDEXPAGE_CACHE, capacity,
LRUCacheType::SIZE, config::index_page_cache_stale_sweep_time_sec,
- num_shards) {}
+ num_shards) {
+ init_mem_tracker_by_allocator();
+ }
};
class PKIndexPageCache : public LRUCachePolicy {
@@ -125,7 +130,9 @@ public:
PKIndexPageCache(size_t capacity, uint32_t num_shards)
: LRUCachePolicy(CachePolicy::CacheType::PK_INDEX_PAGE_CACHE, capacity,
LRUCacheType::SIZE,
- config::pk_index_page_cache_stale_sweep_time_sec, num_shards) {}
+ config::pk_index_page_cache_stale_sweep_time_sec, num_shards) {
+ init_mem_tracker_by_allocator();
+ }
};
static constexpr uint32_t kDefaultNumShards = 16;
@@ -162,7 +169,7 @@ public:
segment_v2::PageTypePB page_type, bool in_memory = false);
std::shared_ptr<MemTrackerLimiter> mem_tracker(segment_v2::PageTypePB page_type) {
- return _get_page_cache(page_type)->mem_tracker();
+ return _get_page_cache(page_type)->mem_tracker_by_allocator();
}
private:
diff --git a/be/src/olap/push_handler.cpp b/be/src/olap/push_handler.cpp
index 0f4385cb64..09e4228c37 100644
--- a/be/src/olap/push_handler.cpp
+++ b/be/src/olap/push_handler.cpp
@@ -371,15 +371,17 @@ Status PushBrokerReader::init() {
fragment_params.protocol_version = PaloInternalServiceVersion::V1;
TQueryOptions query_options;
TQueryGlobals query_globals;
+ std::shared_ptr<MemTrackerLimiter> tracker = MemTrackerLimiter::create_shared(
+ MemTrackerLimiter::Type::LOAD,
+ fmt::format("PushBrokerReader:dummy_id={}", print_id(dummy_id)));
_runtime_state = RuntimeState::create_unique(params, query_options, query_globals,
- ExecEnv::GetInstance(), nullptr);
+ ExecEnv::GetInstance(), nullptr, tracker);
DescriptorTbl* desc_tbl = nullptr;
Status status = DescriptorTbl::create(_runtime_state->obj_pool(), _t_desc_tbl, &desc_tbl);
if (UNLIKELY(!status.ok())) {
return Status::Error("Failed to create descriptor table, msg: {}", status);
}
_runtime_state->set_desc_tbl(desc_tbl);
- _runtime_state->init_mem_trackers(dummy_id, "PushBrokerReader");
_runtime_profile = _runtime_state->runtime_profile();
_runtime_profile->set_name("PushBrokerReader");
diff --git a/be/src/olap/rowset/segcompaction.cpp b/be/src/olap/rowset/segcompaction.cpp
index f017f53448..2de881ee13 100644
--- a/be/src/olap/rowset/segcompaction.cpp
+++ b/be/src/olap/rowset/segcompaction.cpp
@@ -210,7 +210,7 @@ Status SegcompactionWorker::_create_segment_writer_for_segcompaction(
}
Status SegcompactionWorker::_do_compact_segments(SegCompactionCandidatesSharedPtr segments) {
- SCOPED_CONSUME_MEM_TRACKER(StorageEngine::instance()->segcompaction_mem_tracker());
+ SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->segcompaction_mem_tracker());
/* throttle segcompaction task if memory depleted */
if (MemInfo::is_exceed_soft_mem_limit(GB_EXCHANGE_BYTE)) {
return Status::Error("skip segcompaction due to memory shortage");
diff --git a/be/src/olap/rowset/segment_v2/page_io.cpp b/be/src/olap/rowset/segment_v2/page_io.cpp
index ac489d451e..cf6e054161 100644
--- a/be/src/olap/rowset/segment_v2/page_io.cpp
+++ b/be/src/olap/rowset/segment_v2/page_io.cpp
@@ -147,11 +147,7 @@ Status PageIO::read_and_decompress_page(const PageReadOptions& opts, PageHandle*
if (opts.use_page_cache && cache) {
page_mem_tracker = cache->mem_tracker(opts.type);
} else {
- if (is_thread_context_init()) {
- page_mem_tracker = thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker();
- } else {
- page_mem_tracker = ExecEnv::GetInstance()->orphan_mem_tracker();
- }
+ page_mem_tracker = thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker();
}
// hold compressed page at first, reset to decompressed page later
diff --git a/be/src/olap/rowset/segment_v2/segment.cpp b/be/src/olap/rowset/segment_v2/segment.cpp
index 17539012a7..ac222ed088 100644
--- a/be/src/olap/rowset/segment_v2/segment.cpp
+++ b/be/src/olap/rowset/segment_v2/segment.cpp
@@ -95,16 +95,12 @@ Segment::Segment(uint32_t segment_id, RowsetId rowset_id, TabletSchemaSPtr table
: _segment_id(segment_id),
_meta_mem_usage(0),
_rowset_id(rowset_id),
- _tablet_schema(tablet_schema),
- _segment_meta_mem_tracker(StorageEngine::instance()->segment_meta_mem_tracker()) {
+ _tablet_schema(tablet_schema) {
g_total_segment_num << 1;
}
Segment::~Segment() {
g_total_segment_num << -1;
-#ifndef BE_TEST
- _segment_meta_mem_tracker->release(_meta_mem_usage);
-#endif
}
Status Segment::_open() {
@@ -296,7 +292,6 @@ Status Segment::_load_pk_bloom_filter() {
return _load_pk_bf_once.call([this] {
RETURN_IF_ERROR(_pk_index_reader->parse_bf(_file_reader, *_pk_index_meta));
_meta_mem_usage += _pk_index_reader->get_bf_memory_size();
- _segment_meta_mem_tracker->consume(_pk_index_reader->get_bf_memory_size());
return Status::OK();
});
}();
@@ -333,7 +328,6 @@ Status Segment::_load_index_impl() {
_pk_index_reader.reset(new PrimaryKeyIndexReader());
RETURN_IF_ERROR(_pk_index_reader->parse_index(_file_reader, *_pk_index_meta));
_meta_mem_usage += _pk_index_reader->get_memory_size();
- _segment_meta_mem_tracker->consume(_pk_index_reader->get_memory_size());
return Status::OK();
} else {
// read and parse short key index page
@@ -356,7 +350,6 @@ Status Segment::_load_index_impl() {
DCHECK(footer.has_short_key_page_footer());
_meta_mem_usage += body.get_size();
- _segment_meta_mem_tracker->consume(body.get_size());
_sk_index_decoder.reset(new ShortKeyIndexDecoder);
return _sk_index_decoder->parse(body, footer.short_key_page_footer());
}
diff --git a/be/src/olap/rowset/segment_v2/segment.h b/be/src/olap/rowset/segment_v2/segment.h
index f9d426c8de..819a28648a 100644
--- a/be/src/olap/rowset/segment_v2/segment.h
+++ b/be/src/olap/rowset/segment_v2/segment.h
@@ -209,6 +209,12 @@ private:
io::FileReaderSPtr _file_reader;
uint32_t _segment_id;
uint32_t _num_rows;
+
+ // Only used to track the memory of segment meta data such as the footer or index pages.
+ // The memory consumed by queries is tracked in the segment iterator.
+ // TODO: Segment::_meta_mem_usage has an unknown value overflow, which causes the SegmentMeta
+ // mem tracker to show values like `-2912341218700198079`, so it is temporarily put in the
+ // experimental-type tracker. We may have to use the ColumnReader count as the segment meta size.
int64_t _meta_mem_usage;
RowsetId _rowset_id;
@@ -244,8 +250,6 @@ private:
std::unique_ptr _sk_index_decoder;
// primary key index reader
std::unique_ptr _pk_index_reader;
- // Segment may be destructed after StorageEngine, in order to exit gracefully.
- std::shared_ptr _segment_meta_mem_tracker;
std::mutex _open_lock;
// inverted index file reader
std::shared_ptr _inverted_index_file_reader;
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index 832c554e45..f8df3fd0a6 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -74,7 +74,6 @@
#include "olap/tablet_meta_manager.h"
#include "olap/task/engine_task.h"
#include "olap/txn_manager.h"
-#include "runtime/memory/mem_tracker.h"
#include "runtime/stream_load/stream_load_recorder.h"
#include "util/doris_metrics.h"
#include "util/metrics.h"
@@ -121,9 +120,6 @@ StorageEngine::StorageEngine(const EngineOptions& options)
_effective_cluster_id(-1),
_is_all_cluster_id_exist(true),
_stopped(false),
- _segcompaction_mem_tracker(std::make_shared<MemTracker>("SegCompaction")),
- _segment_meta_mem_tracker(std::make_shared<MemTracker>(
- "SegmentMeta", ExecEnv::GetInstance()->experimental_mem_tracker())),
_stop_background_threads_latch(1),
_tablet_manager(new TabletManager(*this, config::tablet_map_shard_size)),
_txn_manager(new TxnManager(*this, config::txn_map_shard_size, config::txn_shard_size)),
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index 6ffa08b7cd..ff5e0e3e8c 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -61,7 +61,6 @@ class BaseCompaction;
class CumulativeCompaction;
class SingleReplicaCompaction;
class CumulativeCompactionPolicy;
-class MemTracker;
class StreamLoadRecorder;
class TCloneReq;
class TCreateTabletReq;
@@ -192,9 +191,6 @@ public:
Status get_compaction_status_json(std::string* result);
- std::shared_ptr<MemTracker> segment_meta_mem_tracker() { return _segment_meta_mem_tracker; }
- std::shared_ptr<MemTracker> segcompaction_mem_tracker() { return _segcompaction_mem_tracker; }
-
// check cumulative compaction config
void check_cumulative_compaction_config();
@@ -365,15 +361,6 @@ private:
std::mutex _quering_rowsets_mutex;
std::unordered_map _querying_rowsets;
- // Count the memory consumption of segment compaction tasks.
- std::shared_ptr<MemTracker> _segcompaction_mem_tracker;
- // This mem tracker is only for tracking memory use by segment meta data such as footer or index page.
- // The memory consumed by querying is tracked in segment iterator.
- // TODO: Segment::_meta_mem_usage Unknown value overflow, causes the value of SegmentMeta mem tracker
- // is similar to `-2912341218700198079`. So, temporarily put it in experimental type tracker.
- // maybe have to use ColumnReader count as segment meta size.
- std::shared_ptr<MemTracker> _segment_meta_mem_tracker;
-
CountDownLatch _stop_background_threads_latch;
scoped_refptr _unused_rowset_monitor_thread;
// thread to monitor snapshot expiry
diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp
index f5e081aab5..ea0c5bffcf 100644
--- a/be/src/olap/tablet_manager.cpp
+++ b/be/src/olap/tablet_manager.cpp
@@ -89,7 +89,7 @@ bvar::Adder g_tablet_meta_schema_columns_count("tablet_meta_schema_colu
TabletManager::TabletManager(StorageEngine& engine, int32_t tablet_map_lock_shard_size)
: _engine(engine),
_tablet_meta_mem_tracker(std::make_shared<MemTracker>(
- "TabletMeta", ExecEnv::GetInstance()->experimental_mem_tracker())),
+ "TabletMeta(experimental)", ExecEnv::GetInstance()->details_mem_tracker_set())),
_tablets_shards_size(tablet_map_lock_shard_size),
_tablets_shards_mask(tablet_map_lock_shard_size - 1) {
CHECK_GT(_tablets_shards_size, 0);
diff --git a/be/src/olap/task/engine_alter_tablet_task.cpp b/be/src/olap/task/engine_alter_tablet_task.cpp
index 30c3e95809..21f63d58e7 100644
--- a/be/src/olap/task/engine_alter_tablet_task.cpp
+++ b/be/src/olap/task/engine_alter_tablet_task.cpp
@@ -44,7 +44,6 @@ EngineAlterTabletTask::EngineAlterTabletTask(const TAlterTabletReqV2& request)
}
Status EngineAlterTabletTask::execute() {
- SCOPED_ATTACH_TASK(_mem_tracker);
DorisMetrics::instance()->create_rollup_requests_total->increment(1);
Status res = Status::OK();
try {
diff --git a/be/src/olap/task/engine_batch_load_task.cpp b/be/src/olap/task/engine_batch_load_task.cpp
index 5c6df55d3f..6265b8a03b 100644
--- a/be/src/olap/task/engine_batch_load_task.cpp
+++ b/be/src/olap/task/engine_batch_load_task.cpp
@@ -63,7 +63,7 @@ using namespace ErrorCode;
EngineBatchLoadTask::EngineBatchLoadTask(TPushReq& push_req, std::vector* tablet_infos)
: _push_req(push_req), _tablet_infos(tablet_infos) {
- _mem_tracker = std::make_shared<MemTrackerLimiter>(
+ _mem_tracker = MemTrackerLimiter::create_shared(
MemTrackerLimiter::Type::LOAD,
fmt::format("EngineBatchLoadTask#pushType={}:tabletId={}", _push_req.push_type,
std::to_string(_push_req.tablet_id)));
diff --git a/be/src/olap/task/engine_checksum_task.cpp b/be/src/olap/task/engine_checksum_task.cpp
index b7a5545c37..ed7937f0ff 100644
--- a/be/src/olap/task/engine_checksum_task.cpp
+++ b/be/src/olap/task/engine_checksum_task.cpp
@@ -44,7 +44,7 @@ namespace doris {
EngineChecksumTask::EngineChecksumTask(TTabletId tablet_id, TSchemaHash schema_hash,
TVersion version, uint32_t* checksum)
: _tablet_id(tablet_id), _schema_hash(schema_hash), _version(version), _checksum(checksum) {
- _mem_tracker = std::make_shared<MemTrackerLimiter>(
+ _mem_tracker = MemTrackerLimiter::create_shared(
MemTrackerLimiter::Type::LOAD,
"EngineChecksumTask#tabletId=" + std::to_string(tablet_id));
}
diff --git a/be/src/olap/task/engine_clone_task.cpp b/be/src/olap/task/engine_clone_task.cpp
index c5d1d85877..03b07608f8 100644
--- a/be/src/olap/task/engine_clone_task.cpp
+++ b/be/src/olap/task/engine_clone_task.cpp
@@ -148,8 +148,8 @@ EngineCloneTask::EngineCloneTask(const TCloneReq& clone_req, const TMasterInfo&
_tablet_infos(tablet_infos),
_signature(signature),
_master_info(master_info) {
- _mem_tracker = std::make_shared<MemTrackerLimiter>(
- MemTrackerLimiter::Type::CLONE,
+ _mem_tracker = MemTrackerLimiter::create_shared(
+ MemTrackerLimiter::Type::OTHER,
"EngineCloneTask#tabletId=" + std::to_string(_clone_req.tablet_id));
}
diff --git a/be/src/olap/task/engine_index_change_task.cpp b/be/src/olap/task/engine_index_change_task.cpp
index 2ff41eccd8..78cc3ef629 100644
--- a/be/src/olap/task/engine_index_change_task.cpp
+++ b/be/src/olap/task/engine_index_change_task.cpp
@@ -24,7 +24,7 @@ namespace doris {
EngineIndexChangeTask::EngineIndexChangeTask(
const TAlterInvertedIndexReq& alter_inverted_index_request)
: _alter_inverted_index_req(alter_inverted_index_request) {
- _mem_tracker = std::make_shared<MemTrackerLimiter>(
+ _mem_tracker = MemTrackerLimiter::create_shared(
MemTrackerLimiter::Type::SCHEMA_CHANGE,
fmt::format("EngineIndexChangeTask#tabletId={}",
std::to_string(_alter_inverted_index_req.tablet_id)),
diff --git a/be/src/olap/task/engine_publish_version_task.cpp b/be/src/olap/task/engine_publish_version_task.cpp
index 95a438e6ee..96cad7f934 100644
--- a/be/src/olap/task/engine_publish_version_task.cpp
+++ b/be/src/olap/task/engine_publish_version_task.cpp
@@ -80,7 +80,10 @@ EnginePublishVersionTask::EnginePublishVersionTask(
_error_tablet_ids(error_tablet_ids),
_succ_tablets(succ_tablets),
_discontinuous_version_tablets(discontinuous_version_tablets),
- _table_id_to_num_delta_rows(table_id_to_num_delta_rows) {}
+ _table_id_to_num_delta_rows(table_id_to_num_delta_rows) {
+ _mem_tracker = MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::SCHEMA_CHANGE,
+ "TabletPublishTxnTask");
+}
void EnginePublishVersionTask::add_error_tablet_id(int64_t tablet_id) {
std::lock_guard lck(_tablet_ids_mutex);
@@ -352,12 +355,15 @@ TabletPublishTxnTask::TabletPublishTxnTask(EnginePublishVersionTask* engine_task
_partition_id(partition_id),
_transaction_id(transaction_id),
_version(version),
- _tablet_info(tablet_info) {
+ _tablet_info(tablet_info),
+ _mem_tracker(MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::SCHEMA_CHANGE,
+ "TabletPublishTxnTask")) {
_stats.submit_time_us = MonotonicMicros();
}
void TabletPublishTxnTask::handle() {
std::shared_lock migration_rlock(_tablet->get_migration_lock(), std::chrono::seconds(5));
+ SCOPED_ATTACH_TASK(_mem_tracker);
if (!migration_rlock.owns_lock()) {
_result = Status::Error("got migration_rlock failed");
LOG(WARNING) << "failed to publish version. tablet_id=" << _tablet_info.tablet_id
@@ -408,6 +414,7 @@ void TabletPublishTxnTask::handle() {
void AsyncTabletPublishTask::handle() {
std::shared_lock migration_rlock(_tablet->get_migration_lock(), std::chrono::seconds(5));
+ SCOPED_ATTACH_TASK(_mem_tracker);
if (!migration_rlock.owns_lock()) {
LOG(WARNING) << "failed to publish version. tablet_id=" << _tablet->tablet_id()
<< ", txn_id=" << _transaction_id << ", got migration_rlock failed";
diff --git a/be/src/olap/task/engine_publish_version_task.h b/be/src/olap/task/engine_publish_version_task.h
index 5683e58316..2b8b5540b2 100644
--- a/be/src/olap/task/engine_publish_version_task.h
+++ b/be/src/olap/task/engine_publish_version_task.h
@@ -33,6 +33,7 @@
#include "olap/rowset/rowset.h"
#include "olap/tablet.h"
#include "olap/task/engine_task.h"
+#include "runtime/memory/mem_tracker_limiter.h"
#include "util/time.h"
namespace doris {
@@ -82,6 +83,7 @@ private:
TabletInfo _tablet_info;
TabletPublishStatistics _stats;
Status _result;
+ std::shared_ptr<MemTrackerLimiter> _mem_tracker;
};
class EnginePublishVersionTask : public EngineTask {
@@ -116,7 +118,9 @@ public:
: _tablet(std::move(tablet)),
_partition_id(partition_id),
_transaction_id(transaction_id),
- _version(version) {
+ _version(version),
+ _mem_tracker(MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::SCHEMA_CHANGE,
+ "AsyncTabletPublishTask")) {
_stats.submit_time_us = MonotonicMicros();
}
~AsyncTabletPublishTask() = default;
@@ -129,6 +133,7 @@ private:
int64_t _transaction_id;
int64_t _version;
TabletPublishStatistics _stats;
+ std::shared_ptr<MemTrackerLimiter> _mem_tracker;
};
} // namespace doris
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index 3b02373ecb..9f692d0bee 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -314,7 +314,6 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
}
});
{
- SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
auto send_remote_block_closure =
AutoReleaseClosure>::
@@ -393,7 +392,6 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
}
});
{
- SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
auto send_remote_block_closure =
AutoReleaseClosure>::
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
index 87eae081d0..3207c10958 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
@@ -201,7 +201,6 @@ Status PartitionedAggSinkLocalState::setup_in_memory_agg_op(RuntimeState* state)
_runtime_state = RuntimeState::create_unique(
nullptr, state->fragment_instance_id(), state->query_id(), state->fragment_id(),
state->query_options(), TQueryGlobals {}, state->exec_env(), state->get_query_ctx());
- _runtime_state->set_query_mem_tracker(state->query_mem_tracker());
_runtime_state->set_task_execution_context(state->get_task_execution_context().lock());
_runtime_state->set_be_number(state->be_number());
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
index 916a4e8268..960decdb95 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
@@ -158,7 +158,6 @@ Status PartitionedAggLocalState::setup_in_memory_agg_op(RuntimeState* state) {
_runtime_state = RuntimeState::create_unique(
nullptr, state->fragment_instance_id(), state->query_id(), state->fragment_id(),
state->query_options(), TQueryGlobals {}, state->exec_env(), state->get_query_ctx());
- _runtime_state->set_query_mem_tracker(state->query_mem_tracker());
_runtime_state->set_task_execution_context(state->get_task_execution_context().lock());
_runtime_state->set_be_number(state->be_number());
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index 25ac9d333b..c0f5e3b65b 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -551,8 +551,6 @@ Status PartitionedHashJoinProbeOperatorX::_setup_internal_operators(
nullptr, state->fragment_instance_id(), state->query_id(), state->fragment_id(),
state->query_options(), TQueryGlobals {}, state->exec_env(), state->get_query_ctx());
- local_state._runtime_state->set_query_mem_tracker(state->query_mem_tracker());
-
local_state._runtime_state->set_task_execution_context(
state->get_task_execution_context().lock());
local_state._runtime_state->set_be_number(state->be_number());
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
index 6e70ba5fca..c586a8e5e5 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
@@ -86,7 +86,6 @@ Status SpillSortSinkLocalState::setup_in_memory_sort_op(RuntimeState* state) {
_runtime_state = RuntimeState::create_unique(
nullptr, state->fragment_instance_id(), state->query_id(), state->fragment_id(),
state->query_options(), TQueryGlobals {}, state->exec_env(), state->get_query_ctx());
- _runtime_state->set_query_mem_tracker(state->query_mem_tracker());
_runtime_state->set_task_execution_context(state->get_task_execution_context().lock());
_runtime_state->set_be_number(state->be_number());
diff --git a/be/src/pipeline/exec/spill_sort_source_operator.cpp b/be/src/pipeline/exec/spill_sort_source_operator.cpp
index c772b6b279..d249b3be56 100644
--- a/be/src/pipeline/exec/spill_sort_source_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_source_operator.cpp
@@ -200,7 +200,6 @@ Status SpillSortLocalState::setup_in_memory_sort_op(RuntimeState* state) {
_runtime_state = RuntimeState::create_unique(
nullptr, state->fragment_instance_id(), state->query_id(), state->fragment_id(),
state->query_options(), TQueryGlobals {}, state->exec_env(), state->get_query_ctx());
- _runtime_state->set_query_mem_tracker(state->query_mem_tracker());
_runtime_state->set_task_execution_context(state->get_task_execution_context().lock());
_runtime_state->set_be_number(state->be_number());
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 55c5d95bb9..c273a0c380 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -134,18 +134,25 @@ PipelineFragmentContext::PipelineFragmentContext(
_create_time(MonotonicNanos()) {
_fragment_watcher.start();
_start_time = VecDateTimeValue::local_time();
+ _query_thread_context = {query_id, _query_ctx->query_mem_tracker};
}
PipelineFragmentContext::~PipelineFragmentContext() {
+ // The memory released at the end of the query is recorded in the query mem tracker.
+ SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_thread_context.query_mem_tracker);
auto st = _query_ctx->exec_status();
+ _query_ctx.reset();
+ _tasks.clear();
if (_runtime_state != nullptr) {
- // The memory released by the query end is recorded in the query mem tracker, main memory in _runtime_state.
- SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_runtime_state->query_mem_tracker());
_call_back(_runtime_state.get(), &st);
_runtime_state.reset();
} else {
_call_back(_runtime_state.get(), &st);
}
+ _root_pipeline.reset();
+ _pipelines.clear();
+ _sink.reset();
+ _multi_cast_stream_sink_senders.clear();
}
bool PipelineFragmentContext::is_timeout(const VecDateTimeValue& now) const {
@@ -250,10 +257,9 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re
request.query_options, _query_ctx->query_globals, _exec_env, _query_ctx.get());
_runtime_state->set_task_execution_context(shared_from_this());
- _runtime_state->set_query_mem_tracker(_query_ctx->query_mem_tracker);
// TODO: should be combined with the plan_fragment_executor.prepare function
- SCOPED_ATTACH_TASK(_runtime_state.get());
+ SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_runtime_state->query_mem_tracker());
_runtime_state->set_be_number(local_params.backend_num);
if (request.__isset.backend_id) {
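
The explicit `_query_ctx.reset()` / `_tasks.clear()` / `_pipelines.clear()` calls in the destructor above matter because of C++ destruction order: non-static members are destroyed only after the destructor body returns, which is after the scoped mem-tracker guard has already been popped. A small self-contained illustration (the guard type is a stand-in for the tracker-switch macro):

    #include <cstdio>
    #include <memory>
    #include <vector>

    struct ScopedTrackerGuard {              // stand-in for SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER
        ScopedTrackerGuard() { std::puts("query tracker attached"); }
        ~ScopedTrackerGuard() { std::puts("query tracker detached"); }
    };

    struct FragmentContextSketch {
        std::vector<std::unique_ptr<int>> tasks;
        ~FragmentContextSketch() {
            ScopedTrackerGuard guard;
            tasks.clear();   // freed here, while the query tracker is still attached
            // Members NOT cleared in the body are destroyed after it returns,
            // i.e. after "query tracker detached" has already been printed.
        }
    };

    int main() {
        FragmentContextSketch ctx;
        ctx.tasks.push_back(std::make_unique<int>(42));
    }  // output: query tracker attached / query tracker detached
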
diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h
index 96936233b3..c75a95b908 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -190,6 +190,8 @@ protected:
std::shared_ptr<QueryContext> _query_ctx;
+ QueryThreadContext _query_thread_context;
+
MonotonicStopWatch _fragment_watcher;
RuntimeProfile::Counter* _start_timer = nullptr;
RuntimeProfile::Counter* _prepare_timer = nullptr;
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index a6b6329cc0..f31a39df31 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -211,6 +211,7 @@ void PipelineTask::set_task_queue(TaskQueue* task_queue) {
Status PipelineTask::execute(bool* eos) {
SCOPED_TIMER(_task_profile->total_time_counter());
SCOPED_TIMER(_exec_timer);
+ SCOPED_ATTACH_TASK(_state);
int64_t time_spent = 0;
ThreadCpuStopWatch cpu_time_stop_watch;
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index f55df4fcb6..744ce754a5 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -110,10 +110,11 @@ PipelineXFragmentContext::PipelineXFragmentContext(
call_back, report_status_cb) {}
PipelineXFragmentContext::~PipelineXFragmentContext() {
+ // The memory released at the end of the query is recorded in the query mem tracker.
+ SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_thread_context.query_mem_tracker);
auto st = _query_ctx->exec_status();
+ _tasks.clear();
if (!_task_runtime_states.empty()) {
- // The memory released by the query end is recorded in the query mem tracker, main memory in _runtime_state.
- SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_runtime_state->query_mem_tracker());
for (auto& runtime_state : _task_runtime_states) {
_call_back(runtime_state.get(), &st);
runtime_state.reset();
@@ -122,6 +123,9 @@ PipelineXFragmentContext::~PipelineXFragmentContext() {
_call_back(nullptr, &st);
}
_runtime_state.reset();
+ _runtime_filter_states.clear();
+ _runtime_filter_mgr_map.clear();
+ _op_id_to_le_state.clear();
}
void PipelineXFragmentContext::cancel(const PPlanFragmentCancelReason& reason,
@@ -184,9 +188,8 @@ Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& r
_runtime_state = RuntimeState::create_unique(request.query_id, request.fragment_id,
request.query_options, _query_ctx->query_globals,
_exec_env, _query_ctx.get());
- _runtime_state->set_query_mem_tracker(_query_ctx->query_mem_tracker);
- SCOPED_ATTACH_TASK(_runtime_state.get());
+ SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_runtime_state->query_mem_tracker());
if (request.__isset.backend_id) {
_runtime_state->set_backend_id(request.backend_id);
}
@@ -547,8 +550,8 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
}
// build local_runtime_filter_mgr for each instance
- runtime_filter_mgr =
- std::make_unique<RuntimeFilterMgr>(request.query_id, filterparams.get());
+ runtime_filter_mgr = std::make_unique<RuntimeFilterMgr>(
+ request.query_id, filterparams.get(), _query_ctx->query_mem_tracker);
filterparams->runtime_filter_mgr = runtime_filter_mgr.get();
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
index 9d5338e7f5..94cd106bbc 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
@@ -203,6 +203,7 @@ Status PipelineXTask::_open() {
Status PipelineXTask::execute(bool* eos) {
SCOPED_TIMER(_task_profile->total_time_counter());
SCOPED_TIMER(_exec_timer);
+ SCOPED_ATTACH_TASK(_state);
int64_t time_spent = 0;
ThreadCpuStopWatch cpu_time_stop_watch;
diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp
index 8819067e59..205eaa686b 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -227,6 +227,10 @@ Status TaskScheduler::schedule_task(PipelineTask* task) {
// after _close_task, task maybe destructed.
void _close_task(PipelineTask* task, PipelineTaskState state, Status exec_status) {
+ // Have to attach the memory tracker here, because closing the task also releases some memory.
+ // That memory should be counted against the query, otherwise the query's memory will not
+ // decrease when part of the task finishes.
+ SCOPED_ATTACH_TASK(task->runtime_state());
// close_a_pipeline may delete fragment context and will core in some defer
// code, because the defer code will access the fragment context itself.
auto lock_for_context = task->fragment_context()->shared_from_this();
@@ -271,10 +275,6 @@ void TaskScheduler::_do_work(size_t index) {
bool canceled = fragment_ctx->is_canceled();
auto state = task->get_state();
- // Has to attach memory tracker here, because the close task will also release some memory.
- // Should count the memory to the query or the query's memory will not decrease when part of
- // task finished.
- SCOPED_ATTACH_TASK(task->runtime_state());
// If the state is PENDING_FINISH, then the task is come from blocked queue, its is_pending_finish
// has to return false. The task is finished and need to close now.
if (state == PipelineTaskState::PENDING_FINISH) {
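
On why the attach moved from the scheduler loop into _close_task itself: closing can free a noticeable amount of memory, and it must be charged to the query no matter which path reaches the close. A hedged sketch of the general shape (the names and the thread-local slot are assumptions for illustration, not the Doris API):

    #include <memory>
    #include <utility>

    struct QueryTrackerSketch {};                                  // stand-in query tracker
    thread_local std::shared_ptr<QueryTrackerSketch> tls_tracker;  // assumed thread-local slot

    // Run `fn` with the query's tracker attached and restore the previous one afterwards,
    // so memory freed inside `fn` (e.g. a task's close path) is deducted from the query
    // rather than falling through to a global/orphan tracker. Exception handling omitted.
    template <typename Fn>
    void run_attached(std::shared_ptr<QueryTrackerSketch> tracker, Fn&& fn) {
        auto prev = std::move(tls_tracker);
        tls_tracker = std::move(tracker);
        std::forward<Fn>(fn)();
        tls_tracker = std::move(prev);
    }

    // Usage shape: a close path would call run_attached(task_tracker, [&] { /* release */ });
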
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index fb270ac187..9785613a48 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -86,6 +86,7 @@ class RoutineLoadTaskExecutor;
class SmallFileMgr;
class BlockSpillManager;
class BackendServiceClient;
+class ThreadContext;
class TPaloBrokerServiceClient;
class PBackendService_Stub;
class PFunctionService_Stub;
@@ -145,6 +146,7 @@ public:
static Result get_tablet(int64_t tablet_id);
static bool ready() { return _s_ready.load(std::memory_order_acquire); }
+ static bool tracking_memory() { return _s_tracking_memory.load(std::memory_order_acquire); }
const std::string& token() const;
ExternalScanContextMgr* external_scan_context_mgr() { return _external_scan_context_mgr; }
vectorized::VDataStreamMgr* vstream_mgr() { return _vstream_mgr; }
@@ -167,12 +169,27 @@ public:
return nullptr;
}
+ ThreadContext* env_thread_context() { return _env_thread_context; }
+ // Saves all MemTrackerLimiters currently in use.
+ // Each group holds several MemTrackerLimiters and is protected by its own lock.
+ // Multiple groups are used to reduce lock contention.
+ std::vector<TrackerLimiterGroup> mem_tracker_limiter_pool;
void init_mem_tracker();
std::shared_ptr<MemTrackerLimiter> orphan_mem_tracker() { return _orphan_mem_tracker; }
MemTrackerLimiter* orphan_mem_tracker_raw() { return _orphan_mem_tracker_raw; }
- MemTrackerLimiter* experimental_mem_tracker() { return _experimental_mem_tracker.get(); }
+ MemTrackerLimiter* details_mem_tracker_set() { return _details_mem_tracker_set.get(); }
std::shared_ptr<MemTracker> page_no_cache_mem_tracker() { return _page_no_cache_mem_tracker; }
MemTracker* brpc_iobuf_block_memory_tracker() { return _brpc_iobuf_block_memory_tracker.get(); }
+ std::shared_ptr<MemTrackerLimiter> segcompaction_mem_tracker() {
+ return _segcompaction_mem_tracker;
+ }
+ std::shared_ptr<MemTrackerLimiter> rowid_storage_reader_tracker() {
+ return _rowid_storage_reader_tracker;
+ }
+ std::shared_ptr<MemTrackerLimiter> subcolumns_tree_tracker() {
+ return _subcolumns_tree_tracker;
+ }
+ std::shared_ptr<MemTrackerLimiter> s3_file_buffer_tracker() { return _s3_file_buffer_tracker; }
ThreadPool* send_batch_thread_pool() { return _send_batch_thread_pool.get(); }
ThreadPool* buffered_reader_prefetch_thread_pool() {
@@ -302,6 +319,7 @@ private:
void _deregister_metrics();
inline static std::atomic_bool _s_ready {false};
+ inline static std::atomic_bool _s_tracking_memory {false};
std::vector _store_paths;
std::vector _spill_store_paths;
@@ -316,6 +334,7 @@ private:
ClientCache* _frontend_client_cache = nullptr;
ClientCache* _broker_client_cache = nullptr;
+ ThreadContext* _env_thread_context = nullptr;
// The default tracker consumed by mem hook. If the thread does not attach other trackers,
// by default all consumption will be passed to the process tracker through the orphan tracker.
// In real time, `consumption of all limiter trackers` + `orphan tracker consumption` = `process tracker consumption`.
@@ -323,10 +342,17 @@ private:
// and the consumption of the orphan mem tracker is close to 0, but greater than 0.
std::shared_ptr<MemTrackerLimiter> _orphan_mem_tracker;
MemTrackerLimiter* _orphan_mem_tracker_raw = nullptr;
- std::shared_ptr<MemTrackerLimiter> _experimental_mem_tracker;
+ std::shared_ptr<MemTrackerLimiter> _details_mem_tracker_set;
// page size not in cache, data page/index page/etc.
std::shared_ptr<MemTracker> _page_no_cache_mem_tracker;
std::shared_ptr<MemTracker> _brpc_iobuf_block_memory_tracker;
+ // Count the memory consumption of segment compaction tasks.
+ std::shared_ptr<MemTrackerLimiter> _segcompaction_mem_tracker;
+
+ // TODO, looking forward to more accurate tracking.
+ std::shared_ptr<MemTrackerLimiter> _rowid_storage_reader_tracker;
+ std::shared_ptr<MemTrackerLimiter> _subcolumns_tree_tracker;
+ std::shared_ptr<MemTrackerLimiter> _s3_file_buffer_tracker;
std::unique_ptr<ThreadPool> _send_batch_thread_pool;
// Threadpool used to prefetch remote file for buffered reader
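
The mem_tracker_limiter_pool above groups trackers so that registration and iteration do not serialize on a single global mutex. A simplified sketch of that sharded layout (the group count and names are illustrative stand-ins, not the real constants or classes):

    #include <cstddef>
    #include <cstdint>
    #include <list>
    #include <memory>
    #include <mutex>
    #include <vector>

    struct MemTrackerLimiterSketch { int64_t limit = -1; };

    // One group = its own lock plus the trackers that hashed into it.
    struct TrackerLimiterGroupSketch {
        std::mutex lock;
        std::list<std::weak_ptr<MemTrackerLimiterSketch>> trackers;
    };

    constexpr std::size_t kGroupNum = 1000;                       // assumed group count
    std::vector<TrackerLimiterGroupSketch> tracker_pool(kGroupNum);

    void register_tracker(const std::shared_ptr<MemTrackerLimiterSketch>& tracker,
                          std::size_t group_hint) {
        auto& group = tracker_pool[group_hint % kGroupNum];
        std::lock_guard<std::mutex> guard(group.lock);            // contends only within one group
        group.trackers.push_back(tracker);
    }
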
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index c171461363..d2e7e47f9e 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -371,6 +371,7 @@ Status ExecEnv::_init_mem_env() {
// 1. init mem tracker
init_mem_tracker();
thread_context()->thread_mem_tracker_mgr->init();
+ _env_thread_context = thread_context();
#if defined(USE_MEM_TRACKER) && !defined(__SANITIZE_ADDRESS__) && !defined(ADDRESS_SANITIZER) && \
!defined(LEAK_SANITIZER) && !defined(THREAD_SANITIZER) && !defined(USE_JEMALLOC)
init_hook();
@@ -492,15 +493,26 @@ Status ExecEnv::_init_mem_env() {
}
void ExecEnv::init_mem_tracker() {
+ mem_tracker_limiter_pool.resize(MEM_TRACKER_GROUP_NUM,
+ TrackerLimiterGroup()); // before all mem tracker init.
+ _s_tracking_memory = true;
_orphan_mem_tracker =
- std::make_shared<MemTrackerLimiter>(MemTrackerLimiter::Type::GLOBAL, "Orphan");
+ MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "Orphan");
_orphan_mem_tracker_raw = _orphan_mem_tracker.get();
- _experimental_mem_tracker = std::make_shared<MemTrackerLimiter>(
- MemTrackerLimiter::Type::EXPERIMENTAL, "ExperimentalSet");
+ _details_mem_tracker_set =
+ MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "DetailsTrackerSet");
_page_no_cache_mem_tracker =
- std::make_shared<MemTracker>("PageNoCache", _orphan_mem_tracker_raw);
+ std::make_shared<MemTracker>("PageNoCache", _details_mem_tracker_set.get());
_brpc_iobuf_block_memory_tracker =
- std::make_shared<MemTracker>("IOBufBlockMemory", _orphan_mem_tracker_raw);
+ std::make_shared<MemTracker>("IOBufBlockMemory", _details_mem_tracker_set.get());
+ _segcompaction_mem_tracker =
+ MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "SegCompaction");
+ _rowid_storage_reader_tracker =
+ MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "RowIdStorageReader");
+ _subcolumns_tree_tracker =
+ MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "SubcolumnsTree");
+ _s3_file_buffer_tracker =
+ MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "S3FileBuffer");
}
void ExecEnv::_register_metrics() {
@@ -566,12 +578,6 @@ void ExecEnv::destroy() {
_deregister_metrics();
SAFE_DELETE(_load_channel_mgr);
- // shared_ptr maybe no need to be reset
- // _brpc_iobuf_block_memory_tracker.reset();
- // _page_no_cache_mem_tracker.reset();
- // _experimental_mem_tracker.reset();
- // _orphan_mem_tracker.reset();
-
SAFE_DELETE(_spill_stream_mgr);
SAFE_DELETE(_block_spill_mgr);
SAFE_DELETE(_inverted_index_query_cache);
@@ -652,6 +658,7 @@ void ExecEnv::destroy() {
// dns cache is a global instance and need to be released at last
SAFE_DELETE(_dns_cache);
+ _s_tracking_memory = false;
LOG(INFO) << "Doris exec environment is destroyed.";
}
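
The new _s_tracking_memory flag brackets the window in which the global trackers exist: it is set to true in init_mem_tracker() right after the tracker pool is allocated, and back to false at the end of destroy(), so background code can call ExecEnv::tracking_memory() before touching any tracker. A minimal stand-alone version of that gate (the class is a stand-in, not ExecEnv itself):

    #include <atomic>

    class ExecEnvSketch {
    public:
        static bool tracking_memory() { return _s_tracking_memory.load(std::memory_order_acquire); }
        static void init_mem_tracker() {
            // allocate the tracker pool, then open the tracking window
            _s_tracking_memory.store(true, std::memory_order_release);
            // ... create the named global trackers ...
        }
        static void destroy() {
            // ... tear everything else down first ...
            _s_tracking_memory.store(false, std::memory_order_release);
        }
    private:
        inline static std::atomic_bool _s_tracking_memory{false};
    };

    // Callers: if (ExecEnvSketch::tracking_memory()) { /* safe to consume/release on trackers */ }
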
diff --git a/be/src/runtime/fold_constant_executor.cpp b/be/src/runtime/fold_constant_executor.cpp
index 9120fa266d..69fe217da7 100644
--- a/be/src/runtime/fold_constant_executor.cpp
+++ b/be/src/runtime/fold_constant_executor.cpp
@@ -80,7 +80,7 @@ Status FoldConstantExecutor::fold_constant_vexpr(const TFoldConstantParams& para
// init
RETURN_IF_ERROR(_init(query_globals, params.query_options));
// only after init operation, _mem_tracker is ready
- SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
+ SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker);
for (const auto& m : expr_map) {
PExprResultMap pexpr_result_map;
@@ -144,8 +144,12 @@ Status FoldConstantExecutor::_init(const TQueryGlobals& query_globals,
TExecPlanFragmentParams fragment_params;
fragment_params.params = params;
fragment_params.protocol_version = PaloInternalServiceVersion::V1;
- _runtime_state = RuntimeState::create_unique(fragment_params.params, query_options,
- query_globals, ExecEnv::GetInstance(), nullptr);
+ _mem_tracker = MemTrackerLimiter::create_shared(
+ MemTrackerLimiter::Type::SCHEMA_CHANGE,
+ fmt::format("FoldConstant:query_id={}", print_id(_query_id)));
+ _runtime_state =
+ RuntimeState::create_unique(fragment_params.params, query_options, query_globals,
+ ExecEnv::GetInstance(), nullptr, _mem_tracker);
DescriptorTbl* desc_tbl = nullptr;
Status status =
DescriptorTbl::create(_runtime_state->obj_pool(), TDescriptorTable(), &desc_tbl);
@@ -154,11 +158,9 @@ Status FoldConstantExecutor::_init(const TQueryGlobals& query_globals,
return status;
}
_runtime_state->set_desc_tbl(desc_tbl);
- _runtime_state->init_mem_trackers(_query_id, "FoldConstant");
_runtime_profile = _runtime_state->runtime_profile();
_runtime_profile->set_name("FoldConstantExpr");
- _mem_tracker = std::make_unique<MemTracker>("FoldConstantExpr");
return Status::OK();
}
diff --git a/be/src/runtime/fold_constant_executor.h b/be/src/runtime/fold_constant_executor.h
index 2c7f17d404..9c39c73772 100644
--- a/be/src/runtime/fold_constant_executor.h
+++ b/be/src/runtime/fold_constant_executor.h
@@ -58,7 +58,7 @@ private:
std::string& result);
std::unique_ptr<RuntimeState> _runtime_state;
- std::unique_ptr<MemTracker> _mem_tracker;
+ std::shared_ptr<MemTrackerLimiter> _mem_tracker;
RuntimeProfile* _runtime_profile = nullptr;
ObjectPool _pool;
TUniqueId _query_id;
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index de964b6da4..0ec721c125 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -448,10 +448,6 @@ static void empty_function(RuntimeState*, Status*) {}
void FragmentMgr::_exec_actual(std::shared_ptr fragment_executor,
const FinishCallback& cb) {
-#ifndef BE_TEST
- SCOPED_ATTACH_TASK(fragment_executor->runtime_state());
-#endif
-
VLOG_DEBUG << fmt::format("Instance {}|{} executing", print_id(fragment_executor->query_id()),
print_id(fragment_executor->fragment_instance_id()));
@@ -626,6 +622,7 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo
query_ctx = QueryContext::create_shared(query_id, params.fragment_num_on_host, _exec_env,
params.query_options, params.coord, pipeline,
params.is_nereids);
+ SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(query_ctx->query_mem_tracker);
RETURN_IF_ERROR(DescriptorTbl::create(&(query_ctx->obj_pool), params.desc_tbl,
&(query_ctx->desc_tbl)));
// set file scan range params
@@ -703,6 +700,7 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params,
RETURN_IF_ERROR(
_get_query_ctx(params, params.params.query_id, pipeline_engine_enabled, query_ctx));
+ SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(query_ctx->query_mem_tracker);
{
// Need lock here, because it will modify fragment ids and std::vector may resize and reallocate
// memory, but query_is_canncelled will traverse the vector, it will core.
@@ -746,8 +744,12 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params,
if (!current_thread_pool) {
current_thread_pool = _thread_pool.get();
}
- auto st = current_thread_pool->submit_func(
- [this, fragment_executor, cb] { _exec_actual(fragment_executor, cb); });
+ auto st = current_thread_pool->submit_func([this, fragment_executor, cb]() {
+#ifndef BE_TEST
+ SCOPED_ATTACH_TASK(fragment_executor->runtime_state());
+#endif
+ _exec_actual(fragment_executor, cb);
+ });
if (!st.ok()) {
{
// Remove the exec state added
@@ -798,6 +800,7 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
std::shared_ptr<QueryContext> query_ctx;
RETURN_IF_ERROR(_get_query_ctx(params, params.query_id, true, query_ctx));
+ SCOPED_ATTACH_TASK_WITH_ID(query_ctx->query_mem_tracker, params.query_id);
const bool enable_pipeline_x = params.query_options.__isset.enable_pipeline_x_engine &&
params.query_options.enable_pipeline_x_engine;
if (enable_pipeline_x) {
@@ -976,6 +979,16 @@ void FragmentMgr::_set_scan_concurrency(const Param& params, QueryContext* query
#endif
}
+std::shared_ptr<QueryContext> FragmentMgr::get_query_context(const TUniqueId& query_id) {
+ std::lock_guard<std::mutex> state_lock(_lock);
+ auto ctx = _query_ctx_map.find(query_id);
+ if (ctx != _query_ctx_map.end()) {
+ return ctx->second;
+ } else {
+ return nullptr;
+ }
+}
+
void FragmentMgr::cancel_query(const TUniqueId& query_id, const PPlanFragmentCancelReason& reason,
const std::string& msg) {
std::shared_ptr<QueryContext> query_ctx;
@@ -1341,6 +1354,7 @@ Status FragmentMgr::apply_filterv2(const PPublishFilterRequestV2* request,
std::shared_ptr<PlanFragmentExecutor> fragment_executor;
std::shared_ptr<pipeline::PipelineFragmentContext> pip_context;
+ QueryThreadContext query_thread_context;
RuntimeFilterMgr* runtime_filter_mgr = nullptr;
ObjectPool* pool = nullptr;
@@ -1356,6 +1370,8 @@ Status FragmentMgr::apply_filterv2(const PPublishFilterRequestV2* request,
DCHECK(pip_context != nullptr);
runtime_filter_mgr = pip_context->get_query_ctx()->runtime_filter_mgr();
pool = &pip_context->get_query_ctx()->obj_pool;
+ query_thread_context = {pip_context->get_query_ctx()->query_id(),
+ pip_context->get_query_ctx()->query_mem_tracker};
} else {
std::unique_lock<std::mutex> lock(_lock);
auto iter = _fragment_instance_map.find(tfragment_instance_id);
@@ -1370,8 +1386,12 @@ Status FragmentMgr::apply_filterv2(const PPublishFilterRequestV2* request,
DCHECK(fragment_executor != nullptr);
runtime_filter_mgr = fragment_executor->get_query_ctx()->runtime_filter_mgr();
pool = &fragment_executor->get_query_ctx()->obj_pool;
+ query_thread_context = {fragment_executor->get_query_ctx()->query_id(),
+ fragment_executor->get_query_ctx()->query_mem_tracker};
}
+ SCOPED_ATTACH_TASK(query_thread_context);
+
// 1. get the target filters
std::vector filters;
RETURN_IF_ERROR(runtime_filter_mgr->get_consume_filters(request->filter_id(), filters));
@@ -1421,6 +1441,7 @@ Status FragmentMgr::merge_filter(const PMergeFilterRequest* request,
// when filter_controller->merge is still in progress
query_ctx = iter->second;
}
+ SCOPED_ATTACH_TASK_WITH_ID(query_ctx->query_mem_tracker, query_ctx->query_id());
auto merge_status = filter_controller->merge(request, attach_data, opt_remote_rf);
DCHECK(merge_status.ok());
return merge_status;
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index 0bae4939b6..76e9a50eb5 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -134,6 +134,8 @@ public:
ThreadPool* get_thread_pool() { return _thread_pool.get(); }
+ std::shared_ptr<QueryContext> get_query_context(const TUniqueId& query_id);
+
int32_t running_query_num() {
std::unique_lock<std::mutex> ctx_lock(_lock);
return _query_ctx_map.size();
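
The new FragmentMgr::get_query_context() lets background and RPC paths resolve a query id to its QueryContext, and with it the query mem tracker. The following is a hedged sketch of a caller, mirroring how exec_plan_fragment and merge_filter above attach the tracker; handle_rpc_for_query is hypothetical, the macro and accessors are the ones used in this patch.

    #include "runtime/exec_env.h"
    #include "runtime/fragment_mgr.h"
    #include "runtime/thread_context.h"

    // Sketch only: rebind an arbitrary worker thread to the query's mem tracker.
    void handle_rpc_for_query(const doris::TUniqueId& query_id) {
        auto query_ctx = doris::ExecEnv::GetInstance()->fragment_mgr()->get_query_context(query_id);
        if (query_ctx == nullptr) {
            return; // query already finished, nothing to attribute the memory to
        }
        // Attach the query id and its tracker to the current thread for this scope.
        SCOPED_ATTACH_TASK_WITH_ID(query_ctx->query_mem_tracker, query_id);
        // ... memory allocated here is charged to the query, not to Orphan ...
    }
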
diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp
index 646b58c5fd..6d8873602d 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -186,7 +186,8 @@ void LoadBlockQueue::_cancel_without_lock(const Status& st) {
Status GroupCommitTable::get_first_block_load_queue(
int64_t table_id, int64_t base_schema_version, const UniqueId& load_id,
- std::shared_ptr<LoadBlockQueue>& load_block_queue, int be_exe_version) {
+ std::shared_ptr<LoadBlockQueue>& load_block_queue, int be_exe_version,
+ std::shared_ptr<MemTrackerLimiter> mem_tracker) {
DCHECK(table_id == _table_id);
{
std::unique_lock<std::mutex> l(_lock);
@@ -212,7 +213,7 @@ Status GroupCommitTable::get_first_block_load_queue(
if (!_is_creating_plan_fragment) {
_is_creating_plan_fragment = true;
RETURN_IF_ERROR(_thread_pool->submit_func([&] {
- auto st = _create_group_commit_load(be_exe_version);
+ auto st = _create_group_commit_load(be_exe_version, mem_tracker);
if (!st.ok()) {
LOG(WARNING) << "create group commit load error, st=" << st.to_string();
std::unique_lock<std::mutex> l(_lock);
@@ -228,7 +229,9 @@ Status GroupCommitTable::get_first_block_load_queue(
std::to_string(_table_id));
}
-Status GroupCommitTable::_create_group_commit_load(int be_exe_version) {
+Status GroupCommitTable::_create_group_commit_load(int be_exe_version,
+ std::shared_ptr<MemTrackerLimiter> mem_tracker) {
+ SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(mem_tracker);
Status st = Status::OK();
TStreamLoadPutRequest request;
UniqueId load_id = UniqueId::gen_uid();
@@ -483,7 +486,8 @@ Status GroupCommitMgr::get_first_block_load_queue(int64_t db_id, int64_t table_i
int64_t base_schema_version,
const UniqueId& load_id,
std::shared_ptr<LoadBlockQueue>& load_block_queue,
- int be_exe_version) {
+ int be_exe_version,
+ std::shared_ptr<MemTrackerLimiter> mem_tracker) {
std::shared_ptr<GroupCommitTable> group_commit_table;
{
std::lock_guard<std::mutex> wlock(_lock);
@@ -495,7 +499,7 @@ Status GroupCommitMgr::get_first_block_load_queue(int64_t db_id, int64_t table_i
group_commit_table = _table_map[table_id];
}
RETURN_IF_ERROR(group_commit_table->get_first_block_load_queue(
- table_id, base_schema_version, load_id, load_block_queue, be_exe_version));
+ table_id, base_schema_version, load_id, load_block_queue, be_exe_version, mem_tracker));
return Status::OK();
}
diff --git a/be/src/runtime/group_commit_mgr.h b/be/src/runtime/group_commit_mgr.h
index f77012bc2d..76a890f7a8 100644
--- a/be/src/runtime/group_commit_mgr.h
+++ b/be/src/runtime/group_commit_mgr.h
@@ -129,12 +129,14 @@ public:
Status get_first_block_load_queue(int64_t table_id, int64_t base_schema_version,
const UniqueId& load_id,
std::shared_ptr<LoadBlockQueue>& load_block_queue,
- int be_exe_version);
+ int be_exe_version,
+ std::shared_ptr<MemTrackerLimiter> mem_tracker);
Status get_load_block_queue(const TUniqueId& instance_id,
std::shared_ptr<LoadBlockQueue>& load_block_queue);
private:
- Status _create_group_commit_load(int be_exe_version);
+ Status _create_group_commit_load(int be_exe_version,
+ std::shared_ptr<MemTrackerLimiter> mem_tracker);
Status _exec_plan_fragment(int64_t db_id, int64_t table_id, const std::string& label,
int64_t txn_id, bool is_pipeline,
const TExecPlanFragmentParams& params,
@@ -171,7 +173,8 @@ public:
Status get_first_block_load_queue(int64_t db_id, int64_t table_id, int64_t base_schema_version,
const UniqueId& load_id,
std::shared_ptr<LoadBlockQueue>& load_block_queue,
- int be_exe_version);
+ int be_exe_version,
+ std::shared_ptr<MemTrackerLimiter> mem_tracker);
std::promise debug_promise;
std::future debug_future = debug_promise.get_future();
diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp
index 5969461f35..146575feac 100644
--- a/be/src/runtime/load_channel.cpp
+++ b/be/src/runtime/load_channel.cpp
@@ -22,8 +22,11 @@
#include "bvar/bvar.h"
#include "olap/storage_engine.h"
+#include "runtime/exec_env.h"
+#include "runtime/fragment_mgr.h"
#include "runtime/memory/mem_tracker.h"
#include "runtime/tablets_channel.h"
+#include "runtime/thread_context.h"
namespace doris {
@@ -37,6 +40,17 @@ LoadChannel::LoadChannel(const UniqueId& load_id, int64_t timeout_s, bool is_hig
_sender_ip(sender_ip),
_backend_id(backend_id),
_enable_profile(enable_profile) {
+ std::shared_ptr<QueryContext> query_context =
+ ExecEnv::GetInstance()->fragment_mgr()->get_query_context(_load_id.to_thrift());
+ if (query_context != nullptr) {
+ _query_thread_context = {_load_id.to_thrift(), query_context->query_mem_tracker};
+ } else {
+ _query_thread_context = {
+ _load_id.to_thrift(),
+ MemTrackerLimiter::create_shared(
+ MemTrackerLimiter::Type::LOAD,
+ fmt::format("(FromLoadChannel)Load#Id={}", _load_id.to_string()))};
+ }
g_loadchannel_cnt << 1;
// _last_updated_time should be set before being inserted to
// _load_channels in load_channel_mgr, or it may be erased
@@ -73,6 +87,7 @@ void LoadChannel::_init_profile() {
}
Status LoadChannel::open(const PTabletWriterOpenRequest& params) {
+ SCOPED_ATTACH_TASK(_query_thread_context);
int64_t index_id = params.index_id();
std::shared_ptr<TabletsChannel> channel;
{
@@ -129,6 +144,7 @@ Status LoadChannel::add_batch(const PTabletWriterAddBlockRequest& request,
PTabletWriterAddBlockResult* response) {
SCOPED_TIMER(_add_batch_timer);
COUNTER_UPDATE(_add_batch_times, 1);
+ SCOPED_ATTACH_TASK(_query_thread_context);
int64_t index_id = request.index_id();
// 1. get tablets channel
std::shared_ptr<TabletsChannel> channel;
diff --git a/be/src/runtime/load_channel.h b/be/src/runtime/load_channel.h
index fc19e94215..4a437e5190 100644
--- a/be/src/runtime/load_channel.h
+++ b/be/src/runtime/load_channel.h
@@ -33,7 +33,7 @@
#include "common/status.h"
#include "olap/memtable_memory_limiter.h"
#include "runtime/exec_env.h"
-#include "runtime/memory/mem_tracker.h"
+#include "runtime/thread_context.h"
#include "util/runtime_profile.h"
#include "util/spinlock.h"
#include "util/thrift_util.h"
@@ -115,6 +115,8 @@ private:
// set to true if at least one tablets channel has been opened
bool _opened = false;
+ QueryThreadContext _query_thread_context;
+
std::atomic _last_updated_time;
// the timeout of this load job.
diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp
index 430abe609b..037d4764f5 100644
--- a/be/src/runtime/load_stream.cpp
+++ b/be/src/runtime/load_stream.cpp
@@ -34,6 +34,7 @@
#include "gutil/ref_counted.h"
#include "olap/tablet_fwd.h"
#include "olap/tablet_schema.h"
+#include "runtime/fragment_mgr.h"
#include "runtime/load_channel.h"
#include "runtime/load_stream_mgr.h"
#include "runtime/load_stream_writer.h"
@@ -331,6 +332,24 @@ LoadStream::LoadStream(PUniqueId load_id, LoadStreamMgr* load_stream_mgr, bool e
_profile = std::make_unique("LoadStream");
_append_data_timer = ADD_TIMER(_profile, "AppendDataTime");
_close_wait_timer = ADD_TIMER(_profile, "CloseWaitTime");
+ TUniqueId load_tid = ((UniqueId)load_id).to_thrift();
+#ifndef BE_TEST
+ std::shared_ptr<QueryContext> query_context =
+ ExecEnv::GetInstance()->fragment_mgr()->get_query_context(load_tid);
+ if (query_context != nullptr) {
+ _query_thread_context = {load_tid, query_context->query_mem_tracker};
+ } else {
+ _query_thread_context = {load_tid, MemTrackerLimiter::create_shared(
+ MemTrackerLimiter::Type::LOAD,
+ fmt::format("(FromLoadStream)Load#Id={}",
+ ((UniqueId)load_id).to_string()))};
+ }
+#else
+ _query_thread_context = {load_tid, MemTrackerLimiter::create_shared(
+ MemTrackerLimiter::Type::LOAD,
+ fmt::format("(FromLoadStream)Load#Id={}",
+ ((UniqueId)load_id).to_string()))};
+#endif
}
LoadStream::~LoadStream() {
@@ -522,6 +541,7 @@ int LoadStream::on_received_messages(StreamId id, butil::IOBuf* const messages[]
void LoadStream::_dispatch(StreamId id, const PStreamHeader& hdr, butil::IOBuf* data) {
VLOG_DEBUG << PStreamHeader_Opcode_Name(hdr.opcode()) << " from " << hdr.src_id()
<< " with tablet " << hdr.tablet_id();
+ SCOPED_ATTACH_TASK(_query_thread_context);
// CLOSE_LOAD message should not be fault injected,
// otherwise the message will be ignored and causing close wait timeout
if (hdr.opcode() != PStreamHeader::CLOSE_LOAD) {
diff --git a/be/src/runtime/load_stream.h b/be/src/runtime/load_stream.h
index c3fad94693..be1cb7756a 100644
--- a/be/src/runtime/load_stream.h
+++ b/be/src/runtime/load_stream.h
@@ -166,6 +166,7 @@ private:
RuntimeProfile::Counter* _append_data_timer = nullptr;
RuntimeProfile::Counter* _close_wait_timer = nullptr;
LoadStreamMgr* _load_stream_mgr = nullptr;
+ QueryThreadContext _query_thread_context;
};
using LoadStreamSharedPtr = std::shared_ptr<LoadStream>;
diff --git a/be/src/runtime/load_stream_writer.cpp b/be/src/runtime/load_stream_writer.cpp
index 15a608b751..90de07556e 100644
--- a/be/src/runtime/load_stream_writer.cpp
+++ b/be/src/runtime/load_stream_writer.cpp
@@ -73,6 +73,7 @@ LoadStreamWriter::LoadStreamWriter(WriteRequest* context, RuntimeProfile* profil
: _req(*context), _rowset_writer(nullptr) {
_rowset_builder =
std::make_unique<RowsetBuilder>(*StorageEngine::instance(), *context, profile);
+ _query_thread_context.init(); // from load stream
}
LoadStreamWriter::~LoadStreamWriter() = default;
@@ -85,6 +86,7 @@ Status LoadStreamWriter::init() {
}
Status LoadStreamWriter::append_data(uint32_t segid, uint64_t offset, butil::IOBuf buf) {
+ SCOPED_ATTACH_TASK(_query_thread_context);
io::FileWriter* file_writer = nullptr;
{
std::lock_guard<std::mutex> lock_guard(_lock);
@@ -119,6 +121,7 @@ Status LoadStreamWriter::append_data(uint32_t segid, uint64_t offset, butil::IOB
}
Status LoadStreamWriter::close_segment(uint32_t segid) {
+ SCOPED_ATTACH_TASK(_query_thread_context);
io::FileWriter* file_writer = nullptr;
{
std::lock_guard<std::mutex> lock_guard(_lock);
@@ -152,6 +155,7 @@ Status LoadStreamWriter::close_segment(uint32_t segid) {
Status LoadStreamWriter::add_segment(uint32_t segid, const SegmentStatistics& stat,
TabletSchemaSPtr flush_schema) {
+ SCOPED_ATTACH_TASK(_query_thread_context);
io::FileWriter* file_writer = nullptr;
{
std::lock_guard<std::mutex> lock_guard(_lock);
@@ -185,6 +189,7 @@ Status LoadStreamWriter::add_segment(uint32_t segid, const SegmentStatistics& st
Status LoadStreamWriter::close() {
std::lock_guard<std::mutex> l(_lock);
+ SCOPED_ATTACH_TASK(_query_thread_context);
if (!_is_init) {
// if this delta writer is not initialized, but close() is called.
// which means this tablet has no data loaded, but at least one tablet
diff --git a/be/src/runtime/load_stream_writer.h b/be/src/runtime/load_stream_writer.h
index 4f35950a93..9e3fce3c7d 100644
--- a/be/src/runtime/load_stream_writer.h
+++ b/be/src/runtime/load_stream_writer.h
@@ -81,6 +81,7 @@ private:
std::unordered_map _segment_stat_map;
std::mutex _segment_stat_map_lock;
std::vector _segment_file_writers;
+ QueryThreadContext _query_thread_context;
};
using LoadStreamWriterSharedPtr = std::shared_ptr<LoadStreamWriter>;
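
LoadChannel, LoadStream and LoadStreamWriter all follow the same recipe: resolve the load id to a QueryContext if one exists, otherwise create a standalone LOAD tracker, store the pair in a QueryThreadContext, and re-attach it in every brpc entry point. Here is a condensed, hedged sketch of that recipe; MyLoadHandler and its methods are hypothetical, the macros and types are the ones used in the hunks above.

    #include <fmt/format.h>
    #include "runtime/exec_env.h"
    #include "runtime/fragment_mgr.h"
    #include "runtime/thread_context.h"
    #include "util/uid_util.h"

    class MyLoadHandler {
    public:
        explicit MyLoadHandler(const doris::TUniqueId& load_id) {
            auto query_ctx = doris::ExecEnv::GetInstance()->fragment_mgr()->get_query_context(load_id);
            if (query_ctx != nullptr) {
                // Reuse the tracker that QueryContext already created for this load.
                _query_thread_context = {load_id, query_ctx->query_mem_tracker};
            } else {
                // Fall back to a dedicated LOAD tracker so nothing lands in Orphan.
                _query_thread_context = {load_id,
                                         doris::MemTrackerLimiter::create_shared(
                                                 doris::MemTrackerLimiter::Type::LOAD,
                                                 fmt::format("Sketch#Id={}", doris::print_id(load_id)))};
            }
        }

        void handle_add_block() {
            // Each RPC callback may run on a different thread, so re-attach every time.
            SCOPED_ATTACH_TASK(_query_thread_context);
            // ... append data, flush memtables, etc. ...
        }

    private:
        doris::QueryThreadContext _query_thread_context;
    };
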
diff --git a/be/src/runtime/memory/cache_policy.h b/be/src/runtime/memory/cache_policy.h
index 46eef92214..ceec3bd435 100644
--- a/be/src/runtime/memory/cache_policy.h
+++ b/be/src/runtime/memory/cache_policy.h
@@ -17,6 +17,7 @@
#pragma once
+#include "runtime/exec_env.h"
#include "runtime/memory/mem_tracker_limiter.h"
#include "util/runtime_profile.h"
@@ -94,7 +95,7 @@ public:
virtual void prune_all(bool force) = 0;
CacheType type() { return _type; }
- std::shared_ptr<MemTrackerLimiter> mem_tracker() { return _mem_tracker; }
+ MemTracker* mem_tracker() { return _mem_tracker.get(); }
int64_t mem_consumption() { return _mem_tracker->consumption(); }
bool enable_prune() const { return _enable_prune; }
RuntimeProfile* profile() { return _profile.get(); }
@@ -110,13 +111,15 @@ protected:
_cost_timer = ADD_TIMER(_profile, "CostTime");
}
- void init_mem_tracker(const std::string& name) {
- _mem_tracker = std::make_shared<MemTrackerLimiter>(MemTrackerLimiter::Type::GLOBAL, name);
+ void init_mem_tracker(const std::string& type_name) {
+ _mem_tracker =
+ std::make_unique<MemTracker>(fmt::format("{}[{}]", type_string(_type), type_name),
+ ExecEnv::GetInstance()->details_mem_tracker_set());
}
CacheType _type;
- std::shared_ptr<MemTrackerLimiter> _mem_tracker;
+ std::unique_ptr<MemTracker> _mem_tracker;
std::unique_ptr<RuntimeProfile> _profile;
RuntimeProfile::Counter* _prune_stale_number_counter = nullptr;
diff --git a/be/src/runtime/memory/lru_cache_policy.h b/be/src/runtime/memory/lru_cache_policy.h
index 430f33396e..0d9524f221 100644
--- a/be/src/runtime/memory/lru_cache_policy.h
+++ b/be/src/runtime/memory/lru_cache_policy.h
@@ -45,8 +45,7 @@ public:
CHECK(ExecEnv::GetInstance()->get_dummy_lru_cache());
_cache = ExecEnv::GetInstance()->get_dummy_lru_cache();
}
- init_mem_tracker(
- fmt::format("{}[{}]", type_string(_type), lru_cache_type_string(_lru_cache_type)));
+ init_mem_tracker(lru_cache_type_string(_lru_cache_type));
}
LRUCachePolicy(CacheType type, size_t capacity, LRUCacheType lru_cache_type,
@@ -64,8 +63,7 @@ public:
CHECK(ExecEnv::GetInstance()->get_dummy_lru_cache());
_cache = ExecEnv::GetInstance()->get_dummy_lru_cache();
}
- init_mem_tracker(
- fmt::format("{}[{}]", type_string(_type), lru_cache_type_string(_lru_cache_type)));
+ init_mem_tracker(lru_cache_type_string(_lru_cache_type));
}
~LRUCachePolicy() override { _cache.reset(); }
@@ -93,19 +91,35 @@ public:
}
}
+ void init_mem_tracker_by_allocator() {
+ _mem_tracker_by_allocator = MemTrackerLimiter::create_shared(
+ MemTrackerLimiter::Type::GLOBAL,
+ fmt::format("{}[{}](AllocByAllocator)", type_string(_type),
+ lru_cache_type_string(_lru_cache_type)));
+ }
+ std::shared_ptr<MemTrackerLimiter> mem_tracker_by_allocator() const {
+ DCHECK(_mem_tracker_by_allocator != nullptr);
+ return _mem_tracker_by_allocator;
+ }
+
// Insert and cache value destroy will manually consume/release tracking_bytes to the mem tracker.
- // If memory is allocated from Allocator, tracking_bytes will is 0, no longer manual tracking.
- // If lru cache is LRUCacheType::SIZE, tracking_bytes will be equal to charge.
+ // If the lru cache is LRUCacheType::SIZE, tracking_bytes is usually equal to charge.
Cache::Handle* insert(const CacheKey& key, void* value, size_t charge, size_t tracking_bytes,
CachePriority priority = CachePriority::NORMAL) {
size_t bytes_with_handle = _get_bytes_with_handle(key, charge, tracking_bytes);
- if (value != nullptr && tracking_bytes > 0) {
- ((LRUCacheValueBase*)value)->mem_tracker()->cache_consume(bytes_with_handle);
+ if (value != nullptr) { // if tracking_bytes = 0, only tracking handle size.
+ ((LRUCacheValueBase*)value)->mem_tracker()->consume(bytes_with_handle);
((LRUCacheValueBase*)value)->set_tracking_bytes(bytes_with_handle);
}
return _cache->insert(key, value, charge, priority);
}
+ Cache::Handle* insert_no_tracking(const CacheKey& key, void* value, size_t charge,
+ CachePriority priority = CachePriority::NORMAL) {
+ DCHECK(_mem_tracker_by_allocator != nullptr); // must be tracked in the Allocator.
+ return _cache->insert(key, value, charge, priority);
+ }
+
Cache::Handle* lookup(const CacheKey& key) { return _cache->lookup(key); }
void release(Cache::Handle* handle) { _cache->release(handle); }
@@ -200,6 +214,7 @@ private:
// compatible with ShardedLRUCache usage, but will not actually cache.
std::shared_ptr<Cache> _cache;
LRUCacheType _lru_cache_type;
+ std::shared_ptr<MemTrackerLimiter> _mem_tracker_by_allocator;
};
} // namespace doris
diff --git a/be/src/runtime/memory/lru_cache_value_base.h b/be/src/runtime/memory/lru_cache_value_base.h
index 2bb06dfaad..08a689f3fd 100644
--- a/be/src/runtime/memory/lru_cache_value_base.h
+++ b/be/src/runtime/memory/lru_cache_value_base.h
@@ -25,27 +25,24 @@ namespace doris {
// Base of the lru cache value.
class LRUCacheValueBase {
public:
- LRUCacheValueBase() = delete;
+ LRUCacheValueBase() = default;
LRUCacheValueBase(CachePolicy::CacheType type) {
_mem_tracker = CacheManager::instance()->get_cache(type)->mem_tracker();
}
- LRUCacheValueBase(const std::shared_ptr<MemTracker>& mem_tracker)
- : _mem_tracker(mem_tracker) {}
virtual ~LRUCacheValueBase() {
if (_tracking_bytes > 0) {
- // value not alloc use Allocator
- _mem_tracker->cache_consume(-_tracking_bytes);
+ _mem_tracker->consume(-_tracking_bytes);
}
}
void set_tracking_bytes(size_t tracking_bytes) { this->_tracking_bytes = tracking_bytes; }
- std::shared_ptr<MemTracker> mem_tracker() { return _mem_tracker; }
+ MemTracker* mem_tracker() const { return _mem_tracker; }
protected:
size_t _tracking_bytes = 0;
- std::shared_ptr<MemTracker> _mem_tracker = nullptr;
+ MemTracker* _mem_tracker = nullptr;
};
} // namespace doris
diff --git a/be/src/runtime/memory/mem_tracker.cpp b/be/src/runtime/memory/mem_tracker.cpp
index 3d23bd447c..27b16c76f2 100644
--- a/be/src/runtime/memory/mem_tracker.cpp
+++ b/be/src/runtime/memory/mem_tracker.cpp
@@ -47,7 +47,7 @@ void MemTracker::bind_parent(MemTrackerLimiter* parent) {
if (parent) {
_parent_label = parent->label();
_parent_group_num = parent->group_num();
- } else if (is_thread_context_init()) {
+ } else {
_parent_label = thread_context()->thread_mem_tracker()->label();
_parent_group_num = thread_context()->thread_mem_tracker()->group_num();
}
diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp
index 83b81f5160..3edbd21c74 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.cpp
+++ b/be/src/runtime/memory/mem_tracker_limiter.cpp
@@ -27,6 +27,7 @@
#include
#include "bvar/bvar.h"
+#include "common/config.h"
#include "olap/memtable_memory_limiter.h"
#include "runtime/exec_env.h"
#include "runtime/fragment_mgr.h"
@@ -43,12 +44,6 @@ namespace doris {
bvar::Adder<int64_t> g_memtrackerlimiter_cnt("memtrackerlimiter_cnt");
constexpr auto GC_MAX_SEEK_TRACKER = 1000;
-// Save all MemTrackerLimiters in use.
-// Each group corresponds to several MemTrackerLimiters and has a lock.
-// Multiple groups are used to reduce the impact of locks.
-std::vector<TrackerLimiterGroup> MemTrackerLimiter::mem_tracker_limiter_pool(
- MEM_TRACKER_GROUP_NUM);
-
std::atomic<bool> MemTrackerLimiter::_enable_print_log_process_usage {true};
// Reset before each free
@@ -83,35 +78,69 @@ MemTrackerLimiter::MemTrackerLimiter(Type type, const std::string& label, int64_
if (_type == Type::LOAD || _type == Type::QUERY) {
_query_statistics = std::make_shared<QueryStatistics>();
}
-
- {
- std::lock_guard<std::mutex> l(mem_tracker_limiter_pool[_group_num].group_lock);
- _tracker_limiter_group_it = mem_tracker_limiter_pool[_group_num].trackers.insert(
- mem_tracker_limiter_pool[_group_num].trackers.end(), this);
- }
g_memtrackerlimiter_cnt << 1;
}
+std::shared_ptr<MemTrackerLimiter> MemTrackerLimiter::create_shared(MemTrackerLimiter::Type type,
+ const std::string& label,
+ int64_t byte_limit) {
+ auto tracker = std::make_shared<MemTrackerLimiter>(type, label, byte_limit);
+#ifndef BE_TEST
+ DCHECK(ExecEnv::tracking_memory());
+ std::lock_guard<std::mutex> l(
+ ExecEnv::GetInstance()->mem_tracker_limiter_pool[tracker->group_num()].group_lock);
+ tracker->tracker_limiter_group_it =
+ ExecEnv::GetInstance()->mem_tracker_limiter_pool[tracker->group_num()].trackers.insert(
+ ExecEnv::GetInstance()
+ ->mem_tracker_limiter_pool[tracker->group_num()]
+ .trackers.end(),
+ tracker);
+#endif
+ return tracker;
+}
+
MemTrackerLimiter::~MemTrackerLimiter() {
- if (_type == Type::GLOBAL) {
- return;
- }
consume(_untracked_mem);
- // mem hook record tracker cannot guarantee that the final consumption is 0,
- // nor can it guarantee that the memory alloc and free are recorded in a one-to-one correspondence.
- // In order to ensure `consumption of all limiter trackers` + `orphan tracker consumption` = `process tracker consumption`
- // in real time. Merge its consumption into orphan when parent is process, to avoid repetition.
- if (ExecEnv::ready()) {
- ExecEnv::GetInstance()->orphan_mem_tracker()->consume(_consumption->current_value());
+ static std::string mem_tracker_inaccurate_msg =
+ ", mem tracker not equal to 0 when mem tracker destruct, this usually means that "
+ "memory tracking is inaccurate and SCOPED_ATTACH_TASK and "
+ "SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER are not used correctly. "
+ "1. For query and load, memory leaks may have occurred, it is expected that the query "
+ "mem tracker will be bound to the thread context using SCOPED_ATTACH_TASK and "
+ "SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER before all memory alloc and free. "
+ "2. If a memory alloc is recorded by this tracker, it is expected that be "
+ "recorded in this tracker when memory is freed. "
+ "3. Merge the remaining memory tracking value by "
+ "this tracker into Orphan, if you observe that Orphan is not equal to 0 in the mem "
+ "tracker web or log, this indicates that there may be a memory leak. "
+ "4. If you need to "
+ "transfer memory tracking value between two trackers, can use transfer_to.";
+ if (_consumption->current_value() != 0) {
+ // TODO, expect mem tracker equal to 0 at the task end.
+ if (doris::config::enable_memory_orphan_check && _type == Type::QUERY) {
+ LOG(INFO) << "mem tracker label: " << _label
+ << ", consumption: " << _consumption->current_value()
+ << ", peak consumption: " << _consumption->peak_value()
+ << mem_tracker_inaccurate_msg;
+ }
+ if (ExecEnv::tracking_memory()) {
+ ExecEnv::GetInstance()->orphan_mem_tracker()->consume(_consumption->current_value());
+ }
+ _consumption->set(0);
}
- _consumption->set(0);
- {
- std::lock_guard<std::mutex> l(mem_tracker_limiter_pool[_group_num].group_lock);
- if (_tracker_limiter_group_it != mem_tracker_limiter_pool[_group_num].trackers.end()) {
- mem_tracker_limiter_pool[_group_num].trackers.erase(_tracker_limiter_group_it);
- _tracker_limiter_group_it = mem_tracker_limiter_pool[_group_num].trackers.end();
+#ifndef BE_TEST
+ if (ExecEnv::tracking_memory()) {
+ std::lock_guard<std::mutex> l(
+ ExecEnv::GetInstance()->mem_tracker_limiter_pool[_group_num].group_lock);
+ if (tracker_limiter_group_it !=
+ ExecEnv::GetInstance()->mem_tracker_limiter_pool[_group_num].trackers.end()) {
+ ExecEnv::GetInstance()->mem_tracker_limiter_pool[_group_num].trackers.erase(
+ tracker_limiter_group_it);
+ tracker_limiter_group_it =
+ ExecEnv::GetInstance()->mem_tracker_limiter_pool[_group_num].trackers.end();
}
}
+#endif
g_memtrackerlimiter_cnt << -1;
}
@@ -127,12 +156,16 @@ MemTracker::Snapshot MemTrackerLimiter::make_snapshot() const {
void MemTrackerLimiter::refresh_global_counter() {
std::unordered_map<Type, int64_t> type_mem_sum = {
- {Type::GLOBAL, 0}, {Type::QUERY, 0}, {Type::LOAD, 0}, {Type::COMPACTION, 0},
- {Type::SCHEMA_CHANGE, 0}, {Type::CLONE, 0}}; // No need refresh Type::EXPERIMENTAL
- for (unsigned i = 0; i < mem_tracker_limiter_pool.size(); ++i) {
- std::lock_guard<std::mutex> l(mem_tracker_limiter_pool[i].group_lock);
- for (auto tracker : mem_tracker_limiter_pool[i].trackers) {
- type_mem_sum[tracker->type()] += tracker->consumption();
+ {Type::GLOBAL, 0}, {Type::QUERY, 0}, {Type::LOAD, 0},
+ {Type::COMPACTION, 0}, {Type::SCHEMA_CHANGE, 0}, {Type::OTHER, 0}};
+ // always ExecEnv::ready(), because Daemon::_stop_background_threads_latch
+ for (auto& group : ExecEnv::GetInstance()->mem_tracker_limiter_pool) {
+ std::lock_guard<std::mutex> l(group.group_lock);
+ for (auto trackerWptr : group.trackers) {
+ auto tracker = trackerWptr.lock();
+ if (tracker != nullptr) {
+ type_mem_sum[tracker->type()] += tracker->consumption();
+ }
}
}
for (auto it : type_mem_sum) {
@@ -187,16 +220,22 @@ void MemTrackerLimiter::make_process_snapshots(std::vector<MemTracker::Snapshot>* snapshots) {
void MemTrackerLimiter::make_type_snapshots(std::vector<MemTracker::Snapshot>* snapshots,
MemTrackerLimiter::Type type) {
if (type == Type::GLOBAL) {
- std::lock_guard<std::mutex> l(mem_tracker_limiter_pool[0].group_lock);
- for (auto tracker : mem_tracker_limiter_pool[0].trackers) {
- (*snapshots).emplace_back(tracker->make_snapshot());
- MemTracker::make_group_snapshot(snapshots, tracker->group_num(), tracker->label());
+ std::lock_guard<std::mutex> l(
+ ExecEnv::GetInstance()->mem_tracker_limiter_pool[0].group_lock);
+ for (auto trackerWptr : ExecEnv::GetInstance()->mem_tracker_limiter_pool[0].trackers) {
+ auto tracker = trackerWptr.lock();
+ if (tracker != nullptr) {
+ (*snapshots).emplace_back(tracker->make_snapshot());
+ MemTracker::make_group_snapshot(snapshots, tracker->group_num(), tracker->label());
+ }
}
} else {
- for (unsigned i = 1; i < mem_tracker_limiter_pool.size(); ++i) {
- std::lock_guard<std::mutex> l(mem_tracker_limiter_pool[i].group_lock);
- for (auto tracker : mem_tracker_limiter_pool[i].trackers) {
- if (tracker->type() == type) {
+ for (unsigned i = 1; i < ExecEnv::GetInstance()->mem_tracker_limiter_pool.size(); ++i) {
+ std::lock_guard<std::mutex> l(
+ ExecEnv::GetInstance()->mem_tracker_limiter_pool[i].group_lock);
+ for (auto trackerWptr : ExecEnv::GetInstance()->mem_tracker_limiter_pool[i].trackers) {
+ auto tracker = trackerWptr.lock();
+ if (tracker != nullptr && tracker->type() == type) {
(*snapshots).emplace_back(tracker->make_snapshot());
MemTracker::make_group_snapshot(snapshots, tracker->group_num(),
tracker->label());
@@ -210,10 +249,14 @@ void MemTrackerLimiter::make_top_consumption_snapshots(std::vector<MemTracker::Snapshot>* snapshots,
std::priority_queue<MemTracker::Snapshot> max_pq;
// not include global type.
- for (unsigned i = 1; i < mem_tracker_limiter_pool.size(); ++i) {
- std::lock_guard<std::mutex> l(mem_tracker_limiter_pool[i].group_lock);
- for (auto* tracker : mem_tracker_limiter_pool[i].trackers) {
- max_pq.emplace(tracker->make_snapshot());
+ for (unsigned i = 1; i < ExecEnv::GetInstance()->mem_tracker_limiter_pool.size(); ++i) {
+ std::lock_guard<std::mutex> l(
+ ExecEnv::GetInstance()->mem_tracker_limiter_pool[i].group_lock);
+ for (auto trackerWptr : ExecEnv::GetInstance()->mem_tracker_limiter_pool[i].trackers) {
+ auto tracker = trackerWptr.lock();
+ if (tracker != nullptr) {
+ max_pq.emplace(tracker->make_snapshot());
+ }
}
}
@@ -240,10 +283,12 @@ std::string MemTrackerLimiter::type_log_usage(MemTracker::Snapshot snapshot) {
std::string MemTrackerLimiter::type_detail_usage(const std::string& msg, Type type) {
std::string detail = fmt::format("{}, Type:{}, Memory Tracker Summary", msg, type_string(type));
- for (unsigned i = 1; i < mem_tracker_limiter_pool.size(); ++i) {
- std::lock_guard<std::mutex> l(mem_tracker_limiter_pool[i].group_lock);
- for (auto tracker : mem_tracker_limiter_pool[i].trackers) {
- if (tracker->type() == type) {
+ for (unsigned i = 1; i < ExecEnv::GetInstance()->mem_tracker_limiter_pool.size(); ++i) {
+ std::lock_guard<std::mutex> l(
+ ExecEnv::GetInstance()->mem_tracker_limiter_pool[i].group_lock);
+ for (auto trackerWptr : ExecEnv::GetInstance()->mem_tracker_limiter_pool[i].trackers) {
+ auto tracker = trackerWptr.lock();
+ if (tracker != nullptr && tracker->type() == type) {
detail += "\n " + MemTrackerLimiter::log_usage(tracker->make_snapshot());
}
}
@@ -285,9 +330,9 @@ std::string MemTrackerLimiter::log_process_usage_str() {
detail += "\nMemory Tracker Summary:";
for (const auto& snapshot : snapshots) {
- if (snapshot.label == "" && snapshot.parent_label == "") {
+ if (snapshot.label.empty() && snapshot.parent_label.empty()) {
detail += "\n " + MemTrackerLimiter::type_log_usage(snapshot);
- } else if (snapshot.parent_label == "") {
+ } else if (snapshot.parent_label.empty()) {
detail += "\n " + MemTrackerLimiter::log_usage(snapshot);
} else {
detail += "\n " + MemTracker::log_usage(snapshot);
@@ -313,11 +358,8 @@ bool MemTrackerLimiter::sys_mem_exceed_limit_check(int64_t bytes) {
// tcmalloc/jemalloc allocator cache does not participate in the mem check as part of the process physical memory.
// because `new/malloc` will trigger mem hook when using tcmalloc/jemalloc allocator cache,
// but it may not actually alloc physical memory, which is not expected in mem hook fail.
- if (MemInfo::proc_mem_no_allocator_cache() + bytes >= MemInfo::mem_limit() ||
- MemInfo::sys_mem_available() < MemInfo::sys_mem_available_low_water_mark()) {
- return true;
- }
- return false;
+ return MemInfo::proc_mem_no_allocator_cache() + bytes >= MemInfo::mem_limit() ||
+ MemInfo::sys_mem_available() < MemInfo::sys_mem_available_low_water_mark();
}
std::string MemTrackerLimiter::process_mem_log_str() {
@@ -362,9 +404,7 @@ std::string MemTrackerLimiter::tracker_limit_exceeded_str() {
err_msg += fmt::format(
" exec node:<{}>, can `set exec_mem_limit=8G` to change limit, details see "
"be.INFO.",
- doris::is_thread_context_init()
- ? doris::thread_context()->thread_mem_tracker_mgr->last_consumer_tracker()
- : "");
+ doris::thread_context()->thread_mem_tracker_mgr->last_consumer_tracker());
} else if (_type == Type::SCHEMA_CHANGE) {
err_msg += fmt::format(
" can modify `memory_limitation_per_thread_for_schema_change_bytes` in be.conf to "
@@ -378,7 +418,7 @@ int64_t MemTrackerLimiter::free_top_memory_query(int64_t min_free_mem,
const std::string& mem_available_str,
RuntimeProfile* profile, Type type) {
return free_top_memory_query(
- min_free_mem, type, mem_tracker_limiter_pool,
+ min_free_mem, type, ExecEnv::GetInstance()->mem_tracker_limiter_pool,
[&vm_rss_str, &mem_available_str, &type](int64_t mem_consumption,
const std::string& label) {
return fmt::format(
@@ -395,16 +435,8 @@ int64_t MemTrackerLimiter::free_top_memory_query(int64_t min_free_mem,
profile, GCType::PROCESS);
}
-int64_t MemTrackerLimiter::tg_free_top_memory_query(
- int64_t min_free_mem, Type type, std::vector<WgTrackerLimiterGroup>& tracker_groups,
- const std::function<std::string(int64_t, const std::string&)>& cancel_msg,
- RuntimeProfile* profile, GCType gctype) {
- return free_top_memory_query(min_free_mem, type, tracker_groups, cancel_msg, profile, gctype);
-}
-
-template <typename TrackerGroups>
int64_t MemTrackerLimiter::free_top_memory_query(
- int64_t min_free_mem, Type type, std::vector<TrackerGroups>& tracker_groups,
+ int64_t min_free_mem, Type type, std::vector<TrackerLimiterGroup>& tracker_groups,
const std::function<std::string(int64_t, const std::string&)>& cancel_msg,
RuntimeProfile* profile, GCType GCtype) {
using MemTrackerMinQueue = std::priority_queue<std::pair<int64_t, std::string>,
std::vector<std::pair<int64_t, std::string>>,
std::greater<std::pair<int64_t, std::string>>>;
@@ -435,8 +467,9 @@ int64_t MemTrackerLimiter::free_top_memory_query(
break;
}
std::lock_guard<std::mutex> l(tracker_groups[i].group_lock);
- for (auto tracker : tracker_groups[i].trackers) {
- if (tracker->type() == type) {
+ for (auto trackerWptr : tracker_groups[i].trackers) {
+ auto tracker = trackerWptr.lock();
+ if (tracker != nullptr && tracker->type() == type) {
seek_num++;
if (tracker->is_query_cancelled()) {
canceling_task.push_back(fmt::format("{}:{} Bytes", tracker->label(),
@@ -475,7 +508,7 @@ int64_t MemTrackerLimiter::free_top_memory_query(
<< min_pq.size() << " tasks will be canceled, " << prepare_free_mem
<< " memory size prepare free; " << canceling_task.size()
<< " tasks is being canceled and has not been completed yet;"
- << (canceling_task.size() > 0 ? " consist of: " + join(canceling_task, ",") : "");
+ << (!canceling_task.empty() ? " consist of: " + join(canceling_task, ",") : "");
std::vector<std::string> usage_strings;
{
@@ -512,7 +545,7 @@ int64_t MemTrackerLimiter::free_top_overcommit_query(int64_t min_free_mem,
const std::string& mem_available_str,
RuntimeProfile* profile, Type type) {
return free_top_overcommit_query(
- min_free_mem, type, mem_tracker_limiter_pool,
+ min_free_mem, type, ExecEnv::GetInstance()->mem_tracker_limiter_pool,
[&vm_rss_str, &mem_available_str, &type](int64_t mem_consumption,
const std::string& label) {
return fmt::format(
@@ -529,17 +562,8 @@ int64_t MemTrackerLimiter::free_top_overcommit_query(int64_t min_free_mem,
profile, GCType::PROCESS);
}
-int64_t MemTrackerLimiter::tg_free_top_overcommit_query(
- int64_t min_free_mem, Type type, std::vector<WgTrackerLimiterGroup>& tracker_groups,
- const std::function<std::string(int64_t, const std::string&)>& cancel_msg,
- RuntimeProfile* profile, GCType gctype) {
- return free_top_overcommit_query(min_free_mem, type, tracker_groups, cancel_msg, profile,
- gctype);
-}
-
-template <typename TrackerGroups>
int64_t MemTrackerLimiter::free_top_overcommit_query(
- int64_t min_free_mem, Type type, std::vector<TrackerGroups>& tracker_groups,
+ int64_t min_free_mem, Type type, std::vector<TrackerLimiterGroup>& tracker_groups,
const std::function<std::string(int64_t, const std::string&)>& cancel_msg,
RuntimeProfile* profile, GCType GCtype) {
std::priority_queue<std::pair<int64_t, std::string>> max_pq;
@@ -567,8 +591,9 @@ int64_t MemTrackerLimiter::free_top_overcommit_query(
break;
}
std::lock_guard<std::mutex> l(tracker_groups[i].group_lock);
- for (auto tracker : tracker_groups[i].trackers) {
- if (tracker->type() == type) {
+ for (auto trackerWptr : tracker_groups[i].trackers) {
+ auto tracker = trackerWptr.lock();
+ if (tracker != nullptr && tracker->type() == type) {
seek_num++;
// 32M small query does not cancel
if (tracker->consumption() <= 33554432 ||
@@ -598,10 +623,10 @@ int64_t MemTrackerLimiter::free_top_overcommit_query(
<< query_consumption.size() << " tasks can be canceled; " << small_num
<< " small tasks that were skipped; " << canceling_task.size()
<< " tasks is being canceled and has not been completed yet;"
- << (canceling_task.size() > 0 ? " consist of: " + join(canceling_task, ",") : "");
+ << (!canceling_task.empty() ? " consist of: " + join(canceling_task, ",") : "");
// Minor gc does not cancel when there is only one query.
- if (query_consumption.size() == 0) {
+ if (query_consumption.empty()) {
LOG(INFO) << log_prefix << "finished, no task need be canceled.";
return 0;
}
diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h
index b10f41f669..d22a79c73f 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.h
+++ b/be/src/runtime/memory/mem_tracker_limiter.h
@@ -45,9 +45,18 @@ class RuntimeProfile;
constexpr auto MEM_TRACKER_GROUP_NUM = 1000;
-struct WgTrackerLimiterGroup;
-class WorkloadGroup;
-using WorkloadGroupPtr = std::shared_ptr<WorkloadGroup>;
+struct TrackerLimiterGroup {
+ // Note: in order to let ExecEnv::mem_tracker_limiter_pool support resize,
+ // the copy construction of TrackerLimiterGroup is disabled,
+ // so a TrackerLimiterGroup can never be copied; always pass it by reference.
+ TrackerLimiterGroup() = default;
+ TrackerLimiterGroup(TrackerLimiterGroup&&) noexcept {}
+ TrackerLimiterGroup(const TrackerLimiterGroup&) {}
+ TrackerLimiterGroup& operator=(const TrackerLimiterGroup&) { return *this; }
+
+ std::list<std::weak_ptr<MemTrackerLimiter>> trackers;
+ std::mutex group_lock;
+};
// Track and limit the memory usage of process and query.
// Contains an limit, arranged into a tree structure.
@@ -63,31 +72,26 @@ public:
LOAD = 2, // Count the memory consumption of all Load tasks.
COMPACTION = 3, // Count the memory consumption of all Base and Cumulative tasks.
SCHEMA_CHANGE = 4, // Count the memory consumption of all SchemaChange tasks.
- CLONE = 5, // Count the memory consumption of all EngineCloneTask. Note: Memory that does not contain make/release snapshots.
- EXPERIMENTAL =
- 6 // Experimental memory statistics, usually inaccurate, used for debugging, and expect to add other types in the future.
+ OTHER = 5
};
// TODO There are more and more GC codes and there should be a separate manager class.
enum class GCType { PROCESS = 0, WORK_LOAD_GROUP = 1 };
- struct TrackerLimiterGroup {
- std::list<MemTrackerLimiter*> trackers;
- std::mutex group_lock;
- };
-
inline static std::unordered_map> TypeMemSum = {
{Type::GLOBAL, std::make_shared()},
{Type::QUERY, std::make_shared()},
{Type::LOAD, std::make_shared()},
{Type::COMPACTION, std::make_shared()},
{Type::SCHEMA_CHANGE, std::make_shared()},
- {Type::CLONE, std::make_shared()},
- {Type::EXPERIMENTAL, std::make_shared()}};
+ {Type::OTHER, std::make_shared()}};
public:
+ static std::shared_ptr<MemTrackerLimiter> create_shared(
+ MemTrackerLimiter::Type type, const std::string& label = std::string(),
+ int64_t byte_limit = -1);
// byte_limit equal to -1 means no consumption limit, only participate in process memory statistics.
- MemTrackerLimiter(Type type, const std::string& label = std::string(), int64_t byte_limit = -1);
+ MemTrackerLimiter(Type type, const std::string& label, int64_t byte_limit);
~MemTrackerLimiter() override;
@@ -103,10 +107,8 @@ public:
return "compaction";
case Type::SCHEMA_CHANGE:
return "schema_change";
- case Type::CLONE:
- return "clone";
- case Type::EXPERIMENTAL:
- return "experimental";
+ case Type::OTHER:
+ return "other";
default:
LOG(FATAL) << "not match type of mem tracker limiter :" << static_cast(type);
}
@@ -148,9 +150,6 @@ public:
void set_is_query_cancelled(bool is_cancelled) { _is_query_cancelled.store(is_cancelled); }
public:
- // If need to consume the tracker frequently, use it
- void cache_consume(int64_t bytes);
-
// Transfer 'bytes' of consumption from this tracker to 'dst'.
void transfer_to(int64_t size, MemTrackerLimiter* dst) {
if (label() == dst->label()) {
@@ -186,14 +185,8 @@ public:
const std::string& mem_available_str,
RuntimeProfile* profile, Type type = Type::QUERY);
- template <typename TrackerGroups>
static int64_t free_top_memory_query(
- int64_t min_free_mem, Type type, std::vector<TrackerGroups>& tracker_groups,
- const std::function<std::string(int64_t, const std::string&)>& cancel_msg,
- RuntimeProfile* profile, GCType gctype);
-
- static int64_t tg_free_top_memory_query(
- int64_t min_free_mem, Type type, std::vector<WgTrackerLimiterGroup>& tracker_groups,
+ int64_t min_free_mem, Type type, std::vector<TrackerLimiterGroup>& tracker_groups,
const std::function<std::string(int64_t, const std::string&)>& cancel_msg,
RuntimeProfile* profile, GCType gctype);
@@ -209,14 +202,8 @@ public:
const std::string& mem_available_str,
RuntimeProfile* profile, Type type = Type::QUERY);
- template <typename TrackerGroups>
static int64_t free_top_overcommit_query(
- int64_t min_free_mem, Type type, std::vector<TrackerGroups>& tracker_groups,
- const std::function<std::string(int64_t, const std::string&)>& cancel_msg,
- RuntimeProfile* profile, GCType gctype);
-
- static int64_t tg_free_top_overcommit_query(
- int64_t min_free_mem, Type type, std::vector<WgTrackerLimiterGroup>& tracker_groups,
+ int64_t min_free_mem, Type type, std::vector<TrackerLimiterGroup>& tracker_groups,
const std::function<std::string(int64_t, const std::string&)>& cancel_msg,
RuntimeProfile* profile, GCType gctype);
@@ -253,9 +240,16 @@ public:
return msg.str();
}
+ // Iterator into mem_tracker_limiter_pool for this object. Stored to have O(1) remove.
+ std::list<std::weak_ptr<MemTrackerLimiter>>::iterator tracker_limiter_group_it;
+ std::list<std::weak_ptr<MemTrackerLimiter>>::iterator tg_tracker_limiter_group_it;
+
private:
friend class ThreadMemTrackerMgr;
+ // If need to consume the tracker frequently, use it
+ void cache_consume(int64_t bytes);
+
// When the accumulated untracked memory value exceeds the upper limit,
// the current value is returned and set to 0.
// Thread safety.
@@ -267,7 +261,7 @@ private:
// Limit on memory consumption, in bytes.
int64_t _limit;
- // Group number in MemTracker::mem_tracker_limiter_pool and MemTracker::mem_tracker_pool, generated by the timestamp.
+ // Group number in mem_tracker_limiter_pool and mem_tracker_pool, generated by the timestamp.
int64_t _group_num;
// Consume size smaller than mem_tracker_consume_min_size_bytes will continue to accumulate
@@ -280,11 +274,6 @@ private:
// Avoid frequent printing.
bool _enable_print_log_usage = false;
static std::atomic<bool> _enable_print_log_process_usage;
-
- static std::vector<TrackerLimiterGroup> mem_tracker_limiter_pool;
-
- // Iterator into mem_tracker_limiter_pool for this object. Stored to have O(1) remove.
- std::list<MemTrackerLimiter*>::iterator _tracker_limiter_group_it;
};
inline int64_t MemTrackerLimiter::add_untracked_mem(int64_t bytes) {
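
mem_tracker_limiter_pool now stores std::weak_ptr<MemTrackerLimiter>, and every reader locks each entry before use, so a tracker whose owners have already released it is skipped instead of dereferenced. The following is a self-contained illustration of that pattern in plain C++; Tracker and Registry are invented for the example and are not Doris types.

    #include <cstdint>
    #include <iostream>
    #include <list>
    #include <memory>
    #include <mutex>

    struct Tracker {
        int64_t consumption = 0;
    };

    struct Registry {
        std::mutex lock;
        std::list<std::weak_ptr<Tracker>> trackers;

        int64_t sum_consumption() {
            std::lock_guard<std::mutex> l(lock);
            int64_t total = 0;
            for (auto& wptr : trackers) {
                // lock() returns nullptr once the owning shared_ptr has died,
                // so destroyed trackers are skipped rather than dereferenced.
                if (auto tracker = wptr.lock()) {
                    total += tracker->consumption;
                }
            }
            return total;
        }
    };

    int main() {
        Registry registry;
        auto t1 = std::make_shared<Tracker>();
        t1->consumption = 100;
        registry.trackers.push_back(t1);
        {
            auto t2 = std::make_shared<Tracker>();
            t2->consumption = 50;
            registry.trackers.push_back(t2);
        } // t2 destroyed here; its weak_ptr expires
        std::cout << registry.sum_consumption() << std::endl; // prints 100
    }
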
diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
index 8596951acf..bc962b5148 100644
--- a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
+++ b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
@@ -42,24 +42,20 @@ private:
};
void ThreadMemTrackerMgr::attach_limiter_tracker(
- const std::shared_ptr<MemTrackerLimiter>& mem_tracker, const TUniqueId& query_id) {
+ const std::shared_ptr<MemTrackerLimiter>& mem_tracker) {
DCHECK(mem_tracker);
CHECK(init());
flush_untracked_mem();
- _query_id = query_id;
_limiter_tracker = mem_tracker;
_limiter_tracker_raw = mem_tracker.get();
- _wait_gc = true;
}
void ThreadMemTrackerMgr::detach_limiter_tracker(
const std::shared_ptr<MemTrackerLimiter>& old_mem_tracker) {
CHECK(init());
flush_untracked_mem();
- _query_id = TUniqueId();
_limiter_tracker = old_mem_tracker;
_limiter_tracker_raw = old_mem_tracker.get();
- _wait_gc = false;
}
void ThreadMemTrackerMgr::cancel_query(const std::string& exceed_msg) {
diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h b/be/src/runtime/memory/thread_mem_tracker_mgr.h
index 5f65d890f5..40fe6e1303 100644
--- a/be/src/runtime/memory/thread_mem_tracker_mgr.h
+++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h
@@ -52,8 +52,7 @@ public:
bool init();
// After attach, the current thread Memory Hook starts to consume/release task mem_tracker
- void attach_limiter_tracker(const std::shared_ptr<MemTrackerLimiter>& mem_tracker,
- const TUniqueId& query_id);
+ void attach_limiter_tracker(const std::shared_ptr<MemTrackerLimiter>& mem_tracker);
void detach_limiter_tracker(const std::shared_ptr<MemTrackerLimiter>& old_mem_tracker =
ExecEnv::GetInstance()->orphan_mem_tracker());
@@ -64,7 +63,12 @@ public:
return _consumer_tracker_stack.empty() ? "" : _consumer_tracker_stack.back()->label();
}
+ void set_query_id(const TUniqueId& query_id) { _query_id = query_id; }
+
+ TUniqueId query_id() { return _query_id; }
+
void start_count_scope_mem() {
+ CHECK(init());
_scope_mem = 0;
_count_scope_mem = true;
}
@@ -97,6 +101,7 @@ public:
return _limiter_tracker_raw;
}
+ void enable_wait_gc() { _wait_gc = true; }
void disable_wait_gc() { _wait_gc = false; }
[[nodiscard]] bool wait_gc() const { return _wait_gc; }
void cancel_query(const std::string& exceed_msg);
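
attach_limiter_tracker() no longer carries the query id or toggles _wait_gc; callers set those separately through set_query_id() and enable_wait_gc(). Below is a hedged sketch of what an attach/detach scope now does by hand, assuming the SCOPED_* macros wire these calls together in roughly this order; the wrapper function attach_for_scope is hypothetical, the member functions are the ones declared above.

    #include "runtime/thread_context.h"

    void attach_for_scope(const std::shared_ptr<doris::MemTrackerLimiter>& tracker,
                          const doris::TUniqueId& query_id) {
        auto& mgr = doris::thread_context()->thread_mem_tracker_mgr;
        mgr->set_query_id(query_id);          // used to cancel the query on limit exceeded
        mgr->enable_wait_gc();                // this task is allowed to wait for full GC
        mgr->attach_limiter_tracker(tracker); // flushes untracked mem, then switches tracker
        // ... task work ...
        mgr->detach_limiter_tracker();        // default argument restores the Orphan tracker
    }
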
diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp
index c50b137ba6..7bf4f3846d 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -97,17 +97,17 @@ PlanFragmentExecutor::PlanFragmentExecutor(ExecEnv* exec_env,
_start_time = VecDateTimeValue::local_time();
_query_statistics = std::make_shared<QueryStatistics>();
_query_ctx->register_query_statistics(_query_statistics);
+ _query_thread_context = {_query_ctx->query_id(), query_ctx->query_mem_tracker};
}
PlanFragmentExecutor::~PlanFragmentExecutor() {
- if (_runtime_state != nullptr) {
- // The memory released by the query end is recorded in the query mem tracker, main memory in _runtime_state.
- SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_runtime_state->query_mem_tracker());
- close();
- _runtime_state.reset();
- } else {
- close();
- }
+ // The memory released by the query end is recorded in the query mem tracker.
+ SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_thread_context.query_mem_tracker);
+ close();
+ _query_ctx.reset();
+ _query_statistics.reset();
+ _sink.reset();
+ _runtime_state.reset();
// at this point, the report thread should have been stopped
DCHECK(!_report_thread_active);
}
@@ -129,9 +129,8 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request) {
_runtime_state = RuntimeState::create_unique(params, request.query_options, query_globals,
_exec_env, _query_ctx.get());
_runtime_state->set_task_execution_context(shared_from_this());
- _runtime_state->set_query_mem_tracker(_query_ctx->query_mem_tracker);
- SCOPED_ATTACH_TASK(_runtime_state.get());
+ SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_runtime_state->query_mem_tracker());
_runtime_state->set_be_number(request.backend_num);
if (request.__isset.backend_id) {
_runtime_state->set_backend_id(request.backend_id);
diff --git a/be/src/runtime/plan_fragment_executor.h b/be/src/runtime/plan_fragment_executor.h
index 5529d1ba3b..89b2534b61 100644
--- a/be/src/runtime/plan_fragment_executor.h
+++ b/be/src/runtime/plan_fragment_executor.h
@@ -215,6 +215,8 @@ private:
RuntimeProfile::Counter* _fragment_cpu_timer = nullptr;
+ QueryThreadContext _query_thread_context;
+
// If set the true, this plan fragment will be executed only after FE send execution start rpc.
bool _need_wait_execution_trigger = false;
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index 2e3fcd613c..76f262df0a 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -23,6 +23,7 @@
#include "pipeline/pipeline_fragment_context.h"
#include "pipeline/pipeline_x/dependency.h"
#include "runtime/runtime_query_statistics_mgr.h"
+#include "runtime/thread_context.h"
#include "runtime/workload_group/workload_group_manager.h"
#include "util/mem_info.h"
@@ -48,6 +49,8 @@ QueryContext::QueryContext(TUniqueId query_id, int total_fragment_num, ExecEnv*
_is_pipeline(is_pipeline),
_is_nereids(is_nereids),
_query_options(query_options) {
+ _init_query_mem_tracker();
+ SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(query_mem_tracker);
this->coord_addr = coord_addr;
_start_time = VecDateTimeValue::local_time();
_shared_hash_table_controller.reset(new vectorized::SharedHashTableController());
@@ -55,12 +58,17 @@ QueryContext::QueryContext(TUniqueId query_id, int total_fragment_num, ExecEnv*
_execution_dependency =
pipeline::Dependency::create_unique(-1, -1, "ExecutionDependency", this);
_runtime_filter_mgr = std::make_unique<RuntimeFilterMgr>(
- TUniqueId(), RuntimeFilterParamsContext::create(this));
+ TUniqueId(), RuntimeFilterParamsContext::create(this), query_mem_tracker);
timeout_second = query_options.execution_timeout;
- bool has_query_mem_tracker = query_options.__isset.mem_limit && (query_options.mem_limit > 0);
- int64_t _bytes_limit = has_query_mem_tracker ? query_options.mem_limit : -1;
+ register_memory_statistics();
+ register_cpu_statistics();
+}
+
+void QueryContext::_init_query_mem_tracker() {
+ bool has_query_mem_limit = _query_options.__isset.mem_limit && (_query_options.mem_limit > 0);
+ int64_t _bytes_limit = has_query_mem_limit ? _query_options.mem_limit : -1;
if (_bytes_limit > MemInfo::mem_limit()) {
VLOG_NOTICE << "Query memory limit " << PrettyPrinter::print(_bytes_limit, TUnit::BYTES)
<< " exceeds process memory limit of "
@@ -68,28 +76,26 @@ QueryContext::QueryContext(TUniqueId query_id, int total_fragment_num, ExecEnv*
<< ". Using process memory limit instead";
_bytes_limit = MemInfo::mem_limit();
}
- if (query_options.query_type == TQueryType::SELECT) {
- query_mem_tracker = std::make_shared<MemTrackerLimiter>(
+ if (_query_options.query_type == TQueryType::SELECT) {
+ query_mem_tracker = MemTrackerLimiter::create_shared(
MemTrackerLimiter::Type::QUERY, fmt::format("Query#Id={}", print_id(_query_id)),
_bytes_limit);
- } else if (query_options.query_type == TQueryType::LOAD) {
- query_mem_tracker = std::make_shared<MemTrackerLimiter>(
+ } else if (_query_options.query_type == TQueryType::LOAD) {
+ query_mem_tracker = MemTrackerLimiter::create_shared(
MemTrackerLimiter::Type::LOAD, fmt::format("Load#Id={}", print_id(_query_id)),
_bytes_limit);
} else { // EXTERNAL
- query_mem_tracker = std::make_shared<MemTrackerLimiter>(
+ query_mem_tracker = MemTrackerLimiter::create_shared(
MemTrackerLimiter::Type::LOAD, fmt::format("External#Id={}", print_id(_query_id)),
_bytes_limit);
}
- if (query_options.__isset.is_report_success && query_options.is_report_success) {
+ if (_query_options.__isset.is_report_success && _query_options.is_report_success) {
query_mem_tracker->enable_print_log_usage();
}
-
- register_memory_statistics();
- register_cpu_statistics();
}
QueryContext::~QueryContext() {
+ SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(query_mem_tracker);
// query mem tracker consumption is equal to 0, it means that after QueryContext is created,
// it is found that query already exists in _query_ctx_map, and query mem tracker is not used.
// query mem tracker consumption is not equal to 0 after use, because there is memory consumed
@@ -133,6 +139,13 @@ QueryContext::~QueryContext() {
LOG(WARNING) << "Dump trace log failed bacause " << e.what();
}
}
+ _runtime_filter_mgr.reset();
+ _execution_dependency.reset();
+ _shared_hash_table_controller.reset();
+ _shared_scanner_controller.reset();
+ _runtime_predicates.clear();
+ file_scan_range_params_map.clear();
+ obj_pool.clear();
}
void QueryContext::set_ready_to_execute(bool is_cancelled) {
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 5dd0999a63..b4120b6942 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -295,6 +295,8 @@ private:
std::atomic<bool> _ready_to_execute {false};
std::atomic<bool> _is_cancelled {false};
+ void _init_query_mem_tracker();
+
std::shared_ptr<vectorized::SharedHashTableController> _shared_hash_table_controller;
std::shared_ptr _shared_scanner_controller;
std::unordered_map _runtime_predicates;
diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp
index 362cb34b42..42fbfb418f 100644
--- a/be/src/runtime/runtime_filter_mgr.cpp
+++ b/be/src/runtime/runtime_filter_mgr.cpp
@@ -51,11 +51,19 @@ struct AsyncRPCContext {
brpc::CallId cid;
};
-RuntimeFilterMgr::RuntimeFilterMgr(const UniqueId& query_id, RuntimeFilterParamsContext* state) {
+RuntimeFilterMgr::RuntimeFilterMgr(const UniqueId& query_id, RuntimeFilterParamsContext* state,
+ const std::shared_ptr<MemTrackerLimiter>& query_mem_tracker) {
_state = state;
_state->runtime_filter_mgr = this;
- _tracker = std::make_unique("RuntimeFilterMgr",
- ExecEnv::GetInstance()->experimental_mem_tracker());
+ _query_mem_tracker = query_mem_tracker;
+ _tracker = std::make_unique("RuntimeFilterMgr(experimental)",
+ _query_mem_tracker.get());
+}
+
+RuntimeFilterMgr::~RuntimeFilterMgr() {
+ CHECK(_query_mem_tracker != nullptr);
+ SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_mem_tracker);
+ _pool.clear();
}
Status RuntimeFilterMgr::get_consume_filters(const int filter_id,
@@ -262,8 +270,8 @@ Status RuntimeFilterMergeControllerEntity::init(UniqueId query_id,
const TRuntimeFilterParams& runtime_filter_params,
const TQueryOptions& query_options) {
_query_id = query_id;
- _mem_tracker = std::make_shared("RuntimeFilterMergeControllerEntity",
- ExecEnv::GetInstance()->experimental_mem_tracker());
+ _mem_tracker = std::make_shared("RuntimeFilterMergeControllerEntity(experimental)",
+ ExecEnv::GetInstance()->details_mem_tracker_set());
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
if (runtime_filter_params.__isset.rid_to_runtime_filter) {
for (auto& filterid_to_desc : runtime_filter_params.rid_to_runtime_filter) {
diff --git a/be/src/runtime/runtime_filter_mgr.h b/be/src/runtime/runtime_filter_mgr.h
index d29978d3b6..bd543e782a 100644
--- a/be/src/runtime/runtime_filter_mgr.h
+++ b/be/src/runtime/runtime_filter_mgr.h
@@ -46,6 +46,7 @@ class PPublishFilterRequestV2;
class PMergeFilterRequest;
class IRuntimeFilter;
class MemTracker;
+class MemTrackerLimiter;
class RuntimeState;
enum class RuntimeFilterRole;
class RuntimePredicateWrapper;
@@ -73,9 +74,10 @@ struct LocalMergeFilters {
// RuntimeFilterMgr will be destroyed when RuntimeState is destroyed
class RuntimeFilterMgr {
public:
- RuntimeFilterMgr(const UniqueId& query_id, RuntimeFilterParamsContext* state);
+ RuntimeFilterMgr(const UniqueId& query_id, RuntimeFilterParamsContext* state,
+ const std::shared_ptr<MemTrackerLimiter>& query_mem_tracker);
- ~RuntimeFilterMgr() = default;
+ ~RuntimeFilterMgr();
Status get_consume_filters(const int filter_id, std::vector& consumer_filters);
@@ -118,6 +120,7 @@ private:
RuntimeFilterParamsContext* _state = nullptr;
std::unique_ptr<MemTracker> _tracker;
+ std::shared_ptr<MemTrackerLimiter> _query_mem_tracker;
ObjectPool _pool;
TNetworkAddress _merge_addr;
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index df1b166c5f..fcbf20c0f7 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -52,7 +52,8 @@ using namespace ErrorCode;
// for ut only
RuntimeState::RuntimeState(const TUniqueId& fragment_instance_id,
const TQueryOptions& query_options, const TQueryGlobals& query_globals,
- ExecEnv* exec_env)
+ ExecEnv* exec_env,
+ const std::shared_ptr<MemTrackerLimiter>& query_mem_tracker)
: _profile("Fragment " + print_id(fragment_instance_id)),
_load_channel_profile(""),
_obj_pool(new ObjectPool()),
@@ -72,13 +73,21 @@ RuntimeState::RuntimeState(const TUniqueId& fragment_instance_id,
_error_log_file(nullptr) {
Status status = init(fragment_instance_id, query_options, query_globals, exec_env);
DCHECK(status.ok());
- _runtime_filter_mgr.reset(
- new RuntimeFilterMgr(TUniqueId(), RuntimeFilterParamsContext::create(this)));
+ _query_mem_tracker = query_mem_tracker;
+#ifdef BE_TEST
+ if (_query_mem_tracker == nullptr) {
+ init_mem_trackers();
+ }
+#endif
+ DCHECK(_query_mem_tracker != nullptr && _query_mem_tracker->label() != "Orphan");
+ _runtime_filter_mgr.reset(new RuntimeFilterMgr(
+ TUniqueId(), RuntimeFilterParamsContext::create(this), _query_mem_tracker));
}
RuntimeState::RuntimeState(const TPlanFragmentExecParams& fragment_exec_params,
const TQueryOptions& query_options, const TQueryGlobals& query_globals,
- ExecEnv* exec_env, QueryContext* ctx)
+ ExecEnv* exec_env, QueryContext* ctx,
+ const std::shared_ptr<MemTrackerLimiter>& query_mem_tracker)
: _profile("Fragment " + print_id(fragment_exec_params.fragment_instance_id)),
_load_channel_profile(""),
_obj_pool(new ObjectPool()),
@@ -100,8 +109,21 @@ RuntimeState::RuntimeState(const TPlanFragmentExecParams& fragment_exec_params,
Status status =
init(fragment_exec_params.fragment_instance_id, query_options, query_globals, exec_env);
DCHECK(status.ok());
+ if (query_mem_tracker != nullptr) {
+ _query_mem_tracker = query_mem_tracker;
+ } else {
+ DCHECK(ctx != nullptr);
+ _query_mem_tracker = ctx->query_mem_tracker;
+ }
+#ifdef BE_TEST
+ if (_query_mem_tracker == nullptr) {
+ init_mem_trackers();
+ }
+#endif
+ DCHECK(_query_mem_tracker != nullptr && _query_mem_tracker->label() != "Orphan");
_runtime_filter_mgr = std::make_unique<RuntimeFilterMgr>(
- fragment_exec_params.query_id, RuntimeFilterParamsContext::create(this));
+ fragment_exec_params.query_id, RuntimeFilterParamsContext::create(this),
+ _query_mem_tracker);
if (fragment_exec_params.__isset.runtime_filter_params) {
_query_ctx->runtime_filter_mgr()->set_runtime_filter_params(
fragment_exec_params.runtime_filter_params);
@@ -141,8 +163,15 @@ RuntimeState::RuntimeState(const TUniqueId& instance_id, const TUniqueId& query_
_query_ctx(ctx) {
[[maybe_unused]] auto status = init(instance_id, query_options, query_globals, exec_env);
DCHECK(status.ok());
- _runtime_filter_mgr.reset(
- new RuntimeFilterMgr(query_id, RuntimeFilterParamsContext::create(this)));
+ _query_mem_tracker = ctx->query_mem_tracker;
+#ifdef BE_TEST
+ if (_query_mem_tracker == nullptr) {
+ init_mem_trackers();
+ }
+#endif
+ DCHECK(_query_mem_tracker != nullptr && _query_mem_tracker->label() != "Orphan");
+ _runtime_filter_mgr.reset(new RuntimeFilterMgr(
+ query_id, RuntimeFilterParamsContext::create(this), _query_mem_tracker));
}
RuntimeState::RuntimeState(pipeline::PipelineXFragmentContext*, const TUniqueId& instance_id,
@@ -171,6 +200,13 @@ RuntimeState::RuntimeState(pipeline::PipelineXFragmentContext*, const TUniqueId&
_error_log_file(nullptr),
_query_ctx(ctx) {
[[maybe_unused]] auto status = init(instance_id, query_options, query_globals, exec_env);
+ _query_mem_tracker = ctx->query_mem_tracker;
+#ifdef BE_TEST
+ if (_query_mem_tracker == nullptr) {
+ init_mem_trackers();
+ }
+#endif
+ DCHECK(_query_mem_tracker != nullptr && _query_mem_tracker->label() != "Orphan");
DCHECK(status.ok());
}
@@ -200,8 +236,15 @@ RuntimeState::RuntimeState(const TUniqueId& query_id, int32_t fragment_id,
// TODO: do we really need instance id?
Status status = init(TUniqueId(), query_options, query_globals, exec_env);
DCHECK(status.ok());
- _runtime_filter_mgr.reset(
- new RuntimeFilterMgr(query_id, RuntimeFilterParamsContext::create(this)));
+ _query_mem_tracker = ctx->query_mem_tracker;
+#ifdef BE_TEST
+ if (_query_mem_tracker == nullptr) {
+ init_mem_trackers();
+ }
+#endif
+ DCHECK(_query_mem_tracker != nullptr && _query_mem_tracker->label() != "Orphan");
+ _runtime_filter_mgr.reset(new RuntimeFilterMgr(
+ query_id, RuntimeFilterParamsContext::create(this), _query_mem_tracker));
}
RuntimeState::RuntimeState(const TQueryGlobals& query_globals)
@@ -236,6 +279,7 @@ RuntimeState::RuntimeState(const TQueryGlobals& query_globals)
_nano_seconds = 0;
}
TimezoneUtils::find_cctz_time_zone(_timezone, _timezone_obj);
+ init_mem_trackers("");
}
RuntimeState::RuntimeState()
@@ -252,9 +296,11 @@ RuntimeState::RuntimeState()
_nano_seconds = 0;
TimezoneUtils::find_cctz_time_zone(_timezone, _timezone_obj);
_exec_env = ExecEnv::GetInstance();
+ init_mem_trackers("");
}
RuntimeState::~RuntimeState() {
+ SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_mem_tracker);
// close error log file
if (_error_log_file != nullptr && _error_log_file->is_open()) {
_error_log_file->close();
@@ -316,15 +362,13 @@ Status RuntimeState::init(const TUniqueId& fragment_instance_id, const TQueryOpt
return Status::OK();
}
-void RuntimeState::init_mem_trackers(const TUniqueId& id, const std::string& name) {
- _query_mem_tracker = std::make_shared<MemTrackerLimiter>(
- MemTrackerLimiter::Type::EXPERIMENTAL, fmt::format("{}#Id={}", name, print_id(id)));
+void RuntimeState::init_mem_trackers(const std::string& name, const TUniqueId& id) {
+ _query_mem_tracker = MemTrackerLimiter::create_shared(
+ MemTrackerLimiter::Type::OTHER, fmt::format("{}#Id={}", name, print_id(id)));
}
std::shared_ptr