diff --git a/be/src/common/config.h b/be/src/common/config.h
index 2061172b6a..aa193300a7 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -635,7 +635,7 @@ CONF_mInt32(remote_storage_read_buffer_mb, "16");
CONF_Bool(enable_tcmalloc_hook, "true");
// Print more detailed logs, more detailed records, etc.
-CONF_Bool(memory_debug, "false");
+CONF_mBool(memory_debug, "false");
// The minimum length when TCMalloc Hook consumes/releases MemTracker, consume size
// smaller than this value will continue to accumulate. specified as number of bytes.
diff --git a/be/src/exec/hash_table.cpp b/be/src/exec/hash_table.cpp
index b50b03460e..e0fbd8dd7b 100644
--- a/be/src/exec/hash_table.cpp
+++ b/be/src/exec/hash_table.cpp
@@ -175,8 +175,7 @@ Status HashTable::resize_buckets(int64_t num_buckets) {
int64_t old_num_buckets = _num_buckets;
int64_t delta_bytes = (num_buckets - old_num_buckets) * sizeof(Bucket);
- Status st = thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()->check_limit(
- delta_bytes);
+ Status st = thread_context()->thread_mem_tracker()->check_limit(delta_bytes);
if (!st) {
LOG_EVERY_N(WARNING, 100) << "resize bucket failed: " << st.to_string();
return st;
diff --git a/be/src/exec/partitioned_aggregation_node.cc b/be/src/exec/partitioned_aggregation_node.cc
index 49c02fa5a8..03cecb794a 100644
--- a/be/src/exec/partitioned_aggregation_node.cc
+++ b/be/src/exec/partitioned_aggregation_node.cc
@@ -910,15 +910,12 @@ Tuple* PartitionedAggregationNode::ConstructIntermediateTuple(
<< "to allocate $1 bytes for intermediate tuple. "
<< "Backend: " << BackendOptions::get_localhost() << ", "
<< "fragment: " << print_id(state_->fragment_instance_id()) << " "
- << "Used: "
- << thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()->consumption()
- << ", Limit: "
- << thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()->limit() << ". "
+ << "Used: " << thread_context()->thread_mem_tracker()->consumption()
+ << ", Limit: " << thread_context()->thread_mem_tracker()->limit() << ". "
<< "You can change the limit by session variable exec_mem_limit.";
string details = Substitute(str.str(), _id, tuple_data_size);
- *status = thread_context()
- ->_thread_mem_tracker_mgr->limiter_mem_tracker()
- ->fragment_mem_limit_exceeded(state_, details, tuple_data_size);
+ *status = thread_context()->thread_mem_tracker()->fragment_mem_limit_exceeded(
+ state_, details, tuple_data_size);
return nullptr;
}
memset(tuple_data, 0, fixed_size);
diff --git a/be/src/exec/partitioned_hash_table.cc b/be/src/exec/partitioned_hash_table.cc
index f9598b510b..1b819a1c39 100644
--- a/be/src/exec/partitioned_hash_table.cc
+++ b/be/src/exec/partitioned_hash_table.cc
@@ -307,8 +307,7 @@ Status PartitionedHashTableCtx::ExprValuesCache::Init(RuntimeState* state,
MAX_EXPR_VALUES_ARRAY_SIZE / expr_values_bytes_per_row_));
int mem_usage = MemUsage(capacity_, expr_values_bytes_per_row_, num_exprs_);
- if (UNLIKELY(!thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()->check_limit(
- mem_usage))) {
+ if (UNLIKELY(!thread_context()->thread_mem_tracker()->check_limit(mem_usage))) {
capacity_ = 0;
string details = Substitute(
"PartitionedHashTableCtx::ExprValuesCache failed to allocate $0 bytes", mem_usage);
diff --git a/be/src/http/default_path_handlers.cpp b/be/src/http/default_path_handlers.cpp
index 7edc3ec7f0..850f6e51b5 100644
--- a/be/src/http/default_path_handlers.cpp
+++ b/be/src/http/default_path_handlers.cpp
@@ -144,7 +144,8 @@ void mem_tracker_handler(const WebPageHandler::ArgumentMap& args, std::stringstr
} else {
(*output) << "
*Note: (see documentation for details)
\n";
(*output) << " 1.`/mem_tracker?type=global` to view the memory statistics of each "
- "type
\n";
+ "type, `global`life cycle is the same as the process, e.g. each Cache, "
+ "StorageEngine, each Manager.\n";
(*output) << " 2.`/mem_tracker` counts virtual memory, which is equal to `Actual "
"memory used` in `/memz`
\n";
(*output) << " 3.`process` is equal to the sum of all types of memory, "
diff --git a/be/src/http/ev_http_server.cpp b/be/src/http/ev_http_server.cpp
index 96a8709ace..3e7f55de04 100644
--- a/be/src/http/ev_http_server.cpp
+++ b/be/src/http/ev_http_server.cpp
@@ -102,7 +102,6 @@ void EvHttpServer::start() {
_event_bases.resize(_num_workers);
for (int i = 0; i < _num_workers; ++i) {
CHECK(_workers->submit_func([this, i]() {
- thread_context()->_thread_mem_tracker_mgr->set_check_attach(false);
            std::shared_ptr<event_base> base(event_base_new(), [](event_base* base) {
event_base_free(base);
});
diff --git a/be/src/olap/snapshot_manager.cpp b/be/src/olap/snapshot_manager.cpp
index 904ef62baf..4cbb0d394a 100644
--- a/be/src/olap/snapshot_manager.cpp
+++ b/be/src/olap/snapshot_manager.cpp
@@ -65,7 +65,7 @@ SnapshotManager* SnapshotManager::instance() {
Status SnapshotManager::make_snapshot(const TSnapshotRequest& request, string* snapshot_path,
bool* allow_incremental_clone) {
- SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
+ SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
Status res = Status::OK();
if (snapshot_path == nullptr) {
LOG(WARNING) << "output parameter cannot be null";
@@ -93,7 +93,7 @@ Status SnapshotManager::make_snapshot(const TSnapshotRequest& request, string* s
Status SnapshotManager::release_snapshot(const string& snapshot_path) {
    // If the requested snapshot_path is under the root/snapshot directory, it is considered
    // valid and can be deleted.
    // Otherwise the request is considered invalid and an error is returned.
- SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
+ SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
auto stores = StorageEngine::instance()->get_stores();
for (auto store : stores) {
std::string abs_path;
@@ -117,7 +117,7 @@ Status SnapshotManager::release_snapshot(const string& snapshot_path) {
Status SnapshotManager::convert_rowset_ids(const std::string& clone_dir, int64_t tablet_id,
int64_t replica_id, const int32_t& schema_hash) {
- SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
+ SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
Status res = Status::OK();
// check clone dir existed
if (!FileUtils::check_exist(clone_dir)) {
diff --git a/be/src/olap/snapshot_manager.h b/be/src/olap/snapshot_manager.h
index d4c71405b9..a7857c3ffb 100644
--- a/be/src/olap/snapshot_manager.h
+++ b/be/src/olap/snapshot_manager.h
@@ -64,7 +64,7 @@ public:
private:
SnapshotManager() : _snapshot_base_id(0) {
-        _mem_tracker = std::make_unique<MemTracker>("SnapshotManager");
+        _mem_tracker = std::make_shared<MemTracker>("SnapshotManager");
}
Status _calc_snapshot_id_path(const TabletSharedPtr& tablet, int64_t timeout_s,
@@ -98,7 +98,7 @@ private:
std::mutex _snapshot_mutex;
uint64_t _snapshot_base_id;
-    std::unique_ptr<MemTracker> _mem_tracker;
+    std::shared_ptr<MemTracker> _mem_tracker;
}; // SnapshotManager
} // namespace doris
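
Note on the unique_ptr -> shared_ptr switch above: SCOPED_CONSUME_MEM_TRACKER is now handed the
tracker's shared_ptr instead of a raw pointer, which matches the AddThreadMemTrackerConsumer
overload changed later in this patch (thread_context.cpp). A rough sketch of that RAII shape,
assembled from the hunks below and not a verbatim copy of the implementation:

    // Sketch: RAII consumer that pins the tracker for the scope via shared_ptr, so a
    // background thread cannot outlive the MemTracker it is consuming into.
    class AddThreadMemTrackerConsumer {
    public:
        explicit AddThreadMemTrackerConsumer(const std::shared_ptr<MemTracker>& mem_tracker)
                : _mem_tracker(mem_tracker) {
            _need_pop = thread_context()->thread_mem_tracker_mgr->push_consumer_tracker(
                    _mem_tracker.get());
        }
        ~AddThreadMemTrackerConsumer() {
            if (_need_pop) thread_context()->thread_mem_tracker_mgr->pop_consumer_tracker();
        }

    private:
        std::shared_ptr<MemTracker> _mem_tracker; // keeps the tracker alive for the whole scope
        bool _need_pop = false;
    };
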
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index 5eab6fa5f3..bd1095a1db 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -107,7 +107,7 @@ StorageEngine::StorageEngine(const EngineOptions& options)
_effective_cluster_id(-1),
_is_all_cluster_id_exist(true),
          _mem_tracker(std::make_shared<MemTracker>("StorageEngine")),
-          _segcompaction_mem_tracker(std::make_unique<MemTracker>("SegCompaction")),
+          _segcompaction_mem_tracker(std::make_shared<MemTracker>("SegCompaction")),
          _segment_meta_mem_tracker(std::make_unique<MemTracker>("SegmentMeta")),
_stop_background_threads_latch(1),
_tablet_manager(new TabletManager(config::tablet_map_shard_size)),
@@ -152,7 +152,7 @@ void StorageEngine::load_data_dirs(const std::vector<DataDir*>& data_dirs) {
    std::vector<std::thread> threads;
for (auto data_dir : data_dirs) {
threads.emplace_back([this, data_dir] {
- SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
+ SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
auto res = data_dir->load();
if (!res.ok()) {
LOG(WARNING) << "io error when init load tables. res=" << res
@@ -198,7 +198,7 @@ Status StorageEngine::_init_store_map() {
_tablet_manager.get(), _txn_manager.get());
tmp_stores.emplace_back(store);
threads.emplace_back([this, store, &error_msg_lock, &error_msg]() {
- SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
+ SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
auto st = store->init();
if (!st.ok()) {
{
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index 46607b241e..20069b6b40 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -180,7 +180,7 @@ public:
Status get_compaction_status_json(std::string* result);
MemTracker* segment_meta_mem_tracker() { return _segment_meta_mem_tracker.get(); }
- MemTracker* segcompaction_mem_tracker() { return _segcompaction_mem_tracker.get(); }
+    std::shared_ptr<MemTracker> segcompaction_mem_tracker() { return _segcompaction_mem_tracker; }
// check cumulative compaction config
void check_cumulative_compaction_config();
@@ -326,7 +326,7 @@ private:
// StorageEngine oneself
    std::shared_ptr<MemTracker> _mem_tracker;
    // Count the memory consumption of segment compaction tasks.
-    std::unique_ptr<MemTracker> _segcompaction_mem_tracker;
+    std::shared_ptr<MemTracker> _segcompaction_mem_tracker;
    // This mem tracker is only for tracking memory use by segment meta data such as footer or index page.
    // The memory consumed by querying is tracked in segment iterator.
    std::unique_ptr<MemTracker> _segment_meta_mem_tracker;
diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp
index 40e700736d..c877a8d0ec 100644
--- a/be/src/olap/tablet_manager.cpp
+++ b/be/src/olap/tablet_manager.cpp
@@ -73,7 +73,7 @@ DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(tablet_meta_mem_consumption, MetricUnit::BYTE
mem_consumption, Labels({{"type", "tablet_meta"}}));
TabletManager::TabletManager(int32_t tablet_map_lock_shard_size)
-        : _mem_tracker(std::make_unique<MemTracker>("TabletManager")),
+        : _mem_tracker(std::make_shared<MemTracker>("TabletManager")),
_tablets_shards_size(tablet_map_lock_shard_size),
_tablets_shards_mask(tablet_map_lock_shard_size - 1) {
CHECK_GT(_tablets_shards_size, 0);
@@ -224,7 +224,7 @@ bool TabletManager::_check_tablet_id_exist_unlocked(TTabletId tablet_id) {
}
Status TabletManager::create_tablet(const TCreateTabletReq& request, std::vector<DataDir*> stores) {
- SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
+ SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
DorisMetrics::instance()->create_tablet_requests_total->increment(1);
int64_t tablet_id = request.tablet_id;
@@ -433,7 +433,7 @@ Status TabletManager::drop_tablet(TTabletId tablet_id, TReplicaId replica_id,
LOG(INFO) << "tablet " << tablet_id << " is under clone, skip drop task";
return Status::Aborted("aborted");
}
- SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
+ SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
return _drop_tablet_unlocked(tablet_id, replica_id, false, is_drop_table_or_partition);
}
@@ -493,7 +493,7 @@ Status TabletManager::_drop_tablet_unlocked(TTabletId tablet_id, TReplicaId repl
Status TabletManager::drop_tablets_on_error_root_path(
        const std::vector<TabletInfo>& tablet_info_vec) {
- SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
+ SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
Status res = Status::OK();
if (tablet_info_vec.empty()) { // This is a high probability event
return res;
@@ -908,7 +908,7 @@ Status TabletManager::build_all_report_tablets_info(std::map
}
Status TabletManager::start_trash_sweep() {
- SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
+ SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
{
        std::vector<TabletSharedPtr>
                all_tablets; // we use this vector to save all tablet ptr for saving lock time.
@@ -1027,7 +1027,7 @@ void TabletManager::unregister_clone_tablet(int64_t tablet_id) {
void TabletManager::try_delete_unused_tablet_path(DataDir* data_dir, TTabletId tablet_id,
SchemaHash schema_hash,
const string& schema_hash_path) {
- SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
+ SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
// acquire the read lock, so that there is no creating tablet or load tablet from meta tasks
// create tablet and load tablet task should check whether the dir exists
tablets_shard& shard = _get_tablets_shard(tablet_id);
@@ -1089,7 +1089,7 @@ void TabletManager::get_partition_related_tablets(int64_t partition_id,
}
void TabletManager::do_tablet_meta_checkpoint(DataDir* data_dir) {
- SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
+ SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
    std::vector<TabletSharedPtr> related_tablets;
{
for (auto& tablets_shard : _tablets_shards) {
diff --git a/be/src/olap/tablet_manager.h b/be/src/olap/tablet_manager.h
index 1154a22d32..1bd62381cf 100644
--- a/be/src/olap/tablet_manager.h
+++ b/be/src/olap/tablet_manager.h
@@ -201,7 +201,7 @@ private:
};
// trace the memory use by meta of tablet
-    std::unique_ptr<MemTracker> _mem_tracker;
+    std::shared_ptr<MemTracker> _mem_tracker;
const int32_t _tablets_shards_size;
const int32_t _tablets_shards_mask;
diff --git a/be/src/runtime/buffered_block_mgr2.cc b/be/src/runtime/buffered_block_mgr2.cc
index ffa3c52b9b..0a17ad0b3a 100644
--- a/be/src/runtime/buffered_block_mgr2.cc
+++ b/be/src/runtime/buffered_block_mgr2.cc
@@ -250,9 +250,7 @@ int64_t BufferedBlockMgr2::available_buffers(Client* client) const {
int64_t BufferedBlockMgr2::remaining_unreserved_buffers() const {
int64_t num_buffers =
_free_io_buffers.size() + _unpinned_blocks.size() + _non_local_outstanding_writes;
- num_buffers +=
- thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()->spare_capacity() /
- max_block_size();
+ num_buffers += thread_context()->thread_mem_tracker()->spare_capacity() / max_block_size();
num_buffers -= _unfullfilled_reserved_buffers;
return num_buffers;
}
@@ -358,9 +356,7 @@ Status BufferedBlockMgr2::get_new_block(Client* client, Block* unpin_block, Bloc
if (len > 0 && len < _max_block_size) {
DCHECK(unpin_block == nullptr);
- Status st =
- thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()->check_limit(
- len);
+ Status st = thread_context()->thread_mem_tracker()->check_limit(len);
WARN_IF_ERROR(st, "get_new_block failed");
if (st) {
client->_tracker->consume(len);
@@ -986,8 +982,7 @@ Status BufferedBlockMgr2::find_buffer(unique_lock& lock, BufferDescriptor
// First, try to allocate a new buffer.
if (_free_io_buffers.size() < _block_write_threshold &&
- thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()->check_limit(
- _max_block_size)) {
+ thread_context()->thread_mem_tracker()->check_limit(_max_block_size)) {
_mem_tracker->consume(_max_block_size);
uint8_t* new_buffer = new uint8_t[_max_block_size];
*buffer_desc = _obj_pool.add(new BufferDescriptor(new_buffer, _max_block_size));
@@ -1155,11 +1150,9 @@ string BufferedBlockMgr2::debug_internal() const {
<< " Unfullfilled reserved buffers: " << _unfullfilled_reserved_buffers << endl
<< " BUffer Block Mgr Used memory: " << _mem_tracker->consumption()
<< " Instance remaining memory: "
- << thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()->spare_capacity()
- << " (#blocks="
- << (thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()->spare_capacity() /
- _max_block_size)
- << ")" << endl
+ << thread_context()->thread_mem_tracker()->spare_capacity() << " (#blocks="
+ << (thread_context()->thread_mem_tracker()->spare_capacity() / _max_block_size) << ")"
+ << endl
<< " Block write threshold: " << _block_write_threshold;
return ss.str();
}
diff --git a/be/src/runtime/disk_io_mgr.cc b/be/src/runtime/disk_io_mgr.cc
index 2f330f250c..702ee127a2 100644
--- a/be/src/runtime/disk_io_mgr.cc
+++ b/be/src/runtime/disk_io_mgr.cc
@@ -955,7 +955,6 @@ void DiskIoMgr::work_loop(DiskQueue* disk_queue) {
// 3. Perform the read or write as specified.
// Cancellation checking needs to happen in both steps 1 and 3.
- thread_context()->_thread_mem_tracker_mgr->set_check_attach(false);
while (!_shut_down) {
RequestContext* worker_context = nullptr;
;
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index bec666859c..e4cd3c7bab 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -179,8 +179,7 @@ Status ExecEnv::_init_mem_env() {
_orphan_mem_tracker =
            std::make_shared<MemTrackerLimiter>(MemTrackerLimiter::Type::GLOBAL, "Orphan");
_orphan_mem_tracker_raw = _orphan_mem_tracker.get();
- thread_context()->_thread_mem_tracker_mgr->init();
- thread_context()->_thread_mem_tracker_mgr->set_check_attach(false);
+ thread_context()->thread_mem_tracker_mgr->init();
#if defined(USE_MEM_TRACKER) && !defined(__SANITIZE_ADDRESS__) && !defined(ADDRESS_SANITIZER) && \
!defined(LEAK_SANITIZER) && !defined(THREAD_SANITIZER) && !defined(USE_JEMALLOC)
if (doris::config::enable_tcmalloc_hook) {
diff --git a/be/src/runtime/mem_pool.cpp b/be/src/runtime/mem_pool.cpp
index 6d2760b389..c289e29acc 100644
--- a/be/src/runtime/mem_pool.cpp
+++ b/be/src/runtime/mem_pool.cpp
@@ -133,9 +133,7 @@ Status MemPool::find_chunk(size_t min_size, bool check_limits) {
}
chunk_size = BitUtil::RoundUpToPowerOfTwo(chunk_size);
- if (check_limits &&
- !thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()->check_limit(
- chunk_size)) {
+ if (check_limits && !thread_context()->thread_mem_tracker()->check_limit(chunk_size)) {
return Status::MemoryAllocFailed("MemPool find new chunk {} bytes faild, exceed limit",
chunk_size);
}
diff --git a/be/src/runtime/memory/jemalloc_hook.cpp b/be/src/runtime/memory/jemalloc_hook.cpp
index 9a664cae17..5bba0877a6 100644
--- a/be/src/runtime/memory/jemalloc_hook.cpp
+++ b/be/src/runtime/memory/jemalloc_hook.cpp
@@ -28,16 +28,16 @@
extern "C" {
void* doris_malloc(size_t size) __THROW {
- MEM_MALLOC_HOOK(je_nallocx(size, 0));
+ TRY_CONSUME_MEM_TRACKER(je_nallocx(size, 0), nullptr);
void* ptr = je_malloc(size);
if (UNLIKELY(ptr == nullptr)) {
- MEM_FREE_HOOK(je_nallocx(size, 0));
+ TRY_RELEASE_MEM_TRACKER(je_nallocx(size, 0));
}
return ptr;
}
void doris_free(void* p) __THROW {
- MEM_FREE_HOOK(je_malloc_usable_size(p));
+ RELEASE_MEM_TRACKER(je_malloc_usable_size(p));
je_free(p);
}
@@ -50,10 +50,10 @@ void* doris_realloc(void* p, size_t size) __THROW {
int64_t old_size = je_malloc_usable_size(p);
#endif
- MEM_MALLOC_HOOK(je_nallocx(size, 0) - old_size);
+ TRY_CONSUME_MEM_TRACKER(je_nallocx(size, 0) - old_size, nullptr);
void* ptr = je_realloc(p, size);
if (UNLIKELY(ptr == nullptr)) {
- MEM_FREE_HOOK(je_nallocx(size, 0) - old_size);
+ TRY_RELEASE_MEM_TRACKER(je_nallocx(size, 0) - old_size);
}
return ptr;
}
@@ -63,72 +63,72 @@ void* doris_calloc(size_t n, size_t size) __THROW {
return nullptr;
}
- MEM_MALLOC_HOOK(n * size);
+ TRY_CONSUME_MEM_TRACKER(n * size, nullptr);
void* ptr = je_calloc(n, size);
if (UNLIKELY(ptr == nullptr)) {
- MEM_FREE_HOOK(n * size);
+ TRY_RELEASE_MEM_TRACKER(n * size);
} else {
- MEM_FREE_HOOK(je_malloc_usable_size(ptr) - n * size);
+ CONSUME_MEM_TRACKER(je_malloc_usable_size(ptr) - n * size);
}
return ptr;
}
void doris_cfree(void* ptr) __THROW {
- MEM_FREE_HOOK(je_malloc_usable_size(ptr));
+ RELEASE_MEM_TRACKER(je_malloc_usable_size(ptr));
je_free(ptr);
}
void* doris_memalign(size_t align, size_t size) __THROW {
- MEM_MALLOC_HOOK(size);
+ TRY_CONSUME_MEM_TRACKER(size, nullptr);
void* ptr = je_aligned_alloc(align, size);
if (UNLIKELY(ptr == nullptr)) {
- MEM_FREE_HOOK(size);
+ TRY_RELEASE_MEM_TRACKER(size);
} else {
- MEM_MALLOC_HOOK(je_malloc_usable_size(ptr) - size);
+ CONSUME_MEM_TRACKER(je_malloc_usable_size(ptr) - size);
}
return ptr;
}
void* doris_aligned_alloc(size_t align, size_t size) __THROW {
- MEM_MALLOC_HOOK(size);
+ TRY_CONSUME_MEM_TRACKER(size, nullptr);
void* ptr = je_aligned_alloc(align, size);
if (UNLIKELY(ptr == nullptr)) {
- MEM_FREE_HOOK(size);
+ TRY_RELEASE_MEM_TRACKER(size);
} else {
- MEM_MALLOC_HOOK(je_malloc_usable_size(ptr) - size);
+ CONSUME_MEM_TRACKER(je_malloc_usable_size(ptr) - size);
}
return ptr;
}
void* doris_valloc(size_t size) __THROW {
- MEM_MALLOC_HOOK(size);
+ TRY_CONSUME_MEM_TRACKER(size, nullptr);
void* ptr = je_valloc(size);
if (UNLIKELY(ptr == nullptr)) {
- MEM_FREE_HOOK(size);
+ TRY_RELEASE_MEM_TRACKER(size);
} else {
- MEM_MALLOC_HOOK(je_malloc_usable_size(ptr) - size);
+ CONSUME_MEM_TRACKER(je_malloc_usable_size(ptr) - size);
}
return ptr;
}
void* doris_pvalloc(size_t size) __THROW {
- MEM_MALLOC_HOOK(size);
+ TRY_CONSUME_MEM_TRACKER(size, nullptr);
void* ptr = je_valloc(size);
if (UNLIKELY(ptr == nullptr)) {
- MEM_FREE_HOOK(size);
+ TRY_RELEASE_MEM_TRACKER(size);
} else {
- MEM_MALLOC_HOOK(je_malloc_usable_size(ptr) - size);
+ CONSUME_MEM_TRACKER(je_malloc_usable_size(ptr) - size);
}
return ptr;
}
int doris_posix_memalign(void** r, size_t align, size_t size) __THROW {
- MEM_MALLOC_HOOK(size);
+ TRY_CONSUME_MEM_TRACKER(size, ENOMEM);
int ret = je_posix_memalign(r, align, size);
if (UNLIKELY(ret != 0)) {
- MEM_FREE_HOOK(size);
+ TRY_RELEASE_MEM_TRACKER(size);
} else {
- MEM_MALLOC_HOOK(je_malloc_usable_size(*r) - size);
+ CONSUME_MEM_TRACKER(je_malloc_usable_size(*r) - size);
}
return ret;
}
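
Taking doris_malloc as an example, the renamed hooks behave as follows once
TRY_CONSUME_MEM_TRACKER (defined in the thread_context.h hunk further down) is expanded; this is
an approximate, hand-expanded sketch for illustration only:

    void* doris_malloc(size_t size) __THROW {
        // TRY_CONSUME_MEM_TRACKER(je_nallocx(size, 0), nullptr), roughly expanded:
        if (doris::thread_context_ptr.init) {
            if (doris::enable_thread_cache_bad_alloc) {
                // Limit check before allocating: refuse with nullptr instead of over-consuming.
                if (!doris::thread_context()->thread_mem_tracker_mgr->try_consume(
                            je_nallocx(size, 0))) {
                    return nullptr;
                }
            } else {
                doris::thread_context()->thread_mem_tracker_mgr->consume(je_nallocx(size, 0));
            }
        } else {
            doris::ThreadMemTrackerMgr::consume_no_attach(je_nallocx(size, 0));
        }
        void* ptr = je_malloc(size);
        if (UNLIKELY(ptr == nullptr)) {
            // TRY_RELEASE_MEM_TRACKER: skips the release when enable_thread_cache_bad_alloc is
            // set (see its definition in the thread_context.h hunk below).
            TRY_RELEASE_MEM_TRACKER(je_nallocx(size, 0));
        }
        return ptr;
    }
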
diff --git a/be/src/runtime/memory/mem_tracker.cpp b/be/src/runtime/memory/mem_tracker.cpp
index bf7e308ff3..4248ae99f2 100644
--- a/be/src/runtime/memory/mem_tracker.cpp
+++ b/be/src/runtime/memory/mem_tracker.cpp
@@ -62,10 +62,9 @@ MemTracker::MemTracker(const std::string& label, RuntimeProfile* profile, MemTra
_parent_label = parent->label();
_parent_group_num = parent->group_num();
} else {
- DCHECK(thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker() != nullptr);
- _parent_label = thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()->label();
- _parent_group_num =
- thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()->group_num();
+ DCHECK(thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker() != nullptr);
+ _parent_label = thread_context()->thread_mem_tracker()->label();
+ _parent_group_num = thread_context()->thread_mem_tracker()->group_num();
}
{
        std::lock_guard<std::mutex> l(mem_tracker_pool[_parent_group_num].group_lock);
@@ -106,7 +105,7 @@ void MemTracker::make_group_snapshot(std::vector* snapshot
std::string MemTracker::log_usage(MemTracker::Snapshot snapshot) {
return fmt::format("MemTracker Label={}, Parent Label={}, Used={}({} B), Peak={}({} B)",
- snapshot.label, snapshot.type, print_bytes(snapshot.cur_consumption),
+ snapshot.label, snapshot.parent_label, print_bytes(snapshot.cur_consumption),
snapshot.cur_consumption, print_bytes(snapshot.peak_consumption),
snapshot.peak_consumption);
}
diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp
index 1fca6009ee..e2ec8bf028 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.cpp
+++ b/be/src/runtime/memory/mem_tracker_limiter.cpp
@@ -170,7 +170,7 @@ void MemTrackerLimiter::print_log_usage(const std::string& msg) {
std::string detail = msg;
detail += "\n " + MemTrackerLimiter::process_mem_log_str();
if (_enable_print_log_usage) {
- detail += log_usage();
+ detail += "\n " + log_usage();
std::string child_trackers_usage;
        std::vector<MemTracker::Snapshot> snapshots;
MemTracker::make_group_snapshot(&snapshots, _group_num, _label);
@@ -195,22 +195,26 @@ void MemTrackerLimiter::print_log_process_usage(const std::string& msg) {
MemTrackerLimiter::make_process_snapshots(&snapshots);
MemTrackerLimiter::make_type_snapshots(&snapshots, MemTrackerLimiter::Type::GLOBAL);
for (const auto& snapshot : snapshots) {
- detail += "\n " + MemTrackerLimiter::log_usage(snapshot);
+ if (snapshot.parent_label == "") {
+ detail += "\n " + MemTrackerLimiter::log_usage(snapshot);
+ } else {
+ detail += "\n " + MemTracker::log_usage(snapshot);
+ }
}
LOG(WARNING) << detail;
+ // LOG(WARNING) << boost::stacktrace::to_string(boost::stacktrace::stacktrace()); // TODO
}
std::string MemTrackerLimiter::mem_limit_exceeded(const std::string& msg,
- const std::string& limit_exceeded_errmsg_prefix) {
+ const std::string& limit_exceeded_errmsg) {
DCHECK(_limit != -1);
STOP_CHECK_THREAD_MEM_TRACKER_LIMIT();
std::string detail = fmt::format(
"Memory limit exceeded:, {}>, executing msg:<{}>. backend {} "
"process memory used {}, limit {}. If query tracker exceed, `set "
"exec_mem_limit=8G` to change limit, details mem usage see be.INFO.",
- _label, limit_exceeded_errmsg_prefix, msg, BackendOptions::get_localhost(),
+ _label, limit_exceeded_errmsg, msg, BackendOptions::get_localhost(),
PerfCounters::get_vm_rss_str(), MemInfo::mem_limit_str());
- print_log_usage(detail);
return detail;
}
@@ -218,6 +222,7 @@ Status MemTrackerLimiter::fragment_mem_limit_exceeded(RuntimeState* state, const
int64_t failed_alloc_size) {
auto failed_msg =
mem_limit_exceeded(msg, tracker_limit_exceeded_errmsg_str(failed_alloc_size, this));
+ print_log_usage(failed_msg);
state->log_error(failed_msg);
return Status::MemoryLimitExceeded(failed_msg);
}
diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h
index 2182e84f00..26489075da 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.h
+++ b/be/src/runtime/memory/mem_tracker_limiter.h
@@ -135,7 +135,7 @@ public:
// Log the memory usage when memory limit is exceeded.
std::string mem_limit_exceeded(const std::string& msg,
- const std::string& limit_exceeded_errmsg_prefix);
+ const std::string& limit_exceeded_errmsg);
Status fragment_mem_limit_exceeded(RuntimeState* state, const std::string& msg,
int64_t failed_allocation_size = 0);
@@ -237,8 +237,6 @@ inline bool MemTrackerLimiter::try_consume(int64_t bytes, std::string& failed_ms
_consumption->add(bytes); // No limit at this tracker.
} else {
if (!_consumption->try_add(bytes, _limit)) {
- // Failed for this mem tracker. Roll back the ones that succeeded.
- _consumption->add(-bytes);
failed_msg = tracker_limit_exceeded_errmsg_str(bytes, this);
return false;
}
diff --git a/be/src/runtime/memory/system_allocator.cpp b/be/src/runtime/memory/system_allocator.cpp
index f8dd402ad1..c0721a046e 100644
--- a/be/src/runtime/memory/system_allocator.cpp
+++ b/be/src/runtime/memory/system_allocator.cpp
@@ -45,8 +45,9 @@ uint8_t* SystemAllocator::allocate_via_malloc(size_t length) {
char buf[64];
auto err = fmt::format("fail to allocate mem via posix_memalign, res={}, errmsg={}.", res,
strerror_r(res, buf, 64));
- MemTrackerLimiter::print_log_process_usage(err);
LOG(ERROR) << err;
+ if (enable_thread_cache_bad_alloc) throw std::bad_alloc {};
+ MemTrackerLimiter::print_log_process_usage(err);
return nullptr;
}
return (uint8_t*)ptr;
diff --git a/be/src/runtime/memory/tcmalloc_hook.h b/be/src/runtime/memory/tcmalloc_hook.h
index 6ec9352ad3..afd15d0b32 100644
--- a/be/src/runtime/memory/tcmalloc_hook.h
+++ b/be/src/runtime/memory/tcmalloc_hook.h
@@ -36,11 +36,11 @@
// destructor to control the behavior of consume can lead to unexpected behavior,
// like this: if (LIKELY(doris::start_thread_mem_tracker)) {
void new_hook(const void* ptr, size_t size) {
- MEM_MALLOC_HOOK(tc_nallocx(size, 0));
+ CONSUME_MEM_TRACKER(tc_nallocx(size, 0));
}
void delete_hook(const void* ptr) {
-    MEM_FREE_HOOK(tc_malloc_size(const_cast<void*>(ptr)));
+    RELEASE_MEM_TRACKER(tc_malloc_size(const_cast<void*>(ptr)));
}
void init_hook() {
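
For context, new_hook/delete_hook are the callbacks that init_hook() registers with gperftools;
the body of init_hook is not part of this hunk. A sketch of that registration, assuming the
standard gperftools MallocHook API (<gperftools/malloc_hook.h>):

    void init_hook() {
        // gperftools runs these after each tcmalloc allocation / before each deallocation.
        MallocHook::AddNewHook(&new_hook);
        MallocHook::AddDeleteHook(&delete_hook);
    }
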
diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
index 4273860468..ac853d7c87 100644
--- a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
+++ b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
@@ -49,14 +49,12 @@ void ThreadMemTrackerMgr::exceeded_cancel_task(const std::string& cancel_details
}
}
-void ThreadMemTrackerMgr::exceeded(const std::string& failed_msg) {
+void ThreadMemTrackerMgr::exceeded() {
if (_cb_func != nullptr) {
_cb_func();
}
- auto cancel_msg = _limiter_tracker_raw->mem_limit_exceeded(
- fmt::format("execute:<{}>", last_consumer_tracker()), failed_msg);
if (is_attach_query()) {
- exceeded_cancel_task(cancel_msg);
+ exceeded_cancel_task(_exceed_mem_limit_msg);
}
_check_limit = false; // Make sure it will only be canceled once
}
diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h b/be/src/runtime/memory/thread_mem_tracker_mgr.h
index 1ab3e7bce4..27c2f892e1 100644
--- a/be/src/runtime/memory/thread_mem_tracker_mgr.h
+++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h
@@ -78,6 +78,7 @@ public:
// such as calling LOG/iostream/sstream/stringstream/etc. related methods,
// must increase the control to avoid entering infinite recursion, otherwise it may cause crash or stuck,
void consume(int64_t size);
+ bool try_consume(int64_t size);
    template <bool CheckLimit>
void flush_untracked_mem();
@@ -95,7 +96,8 @@ public:
bool check_limit() { return _check_limit; }
void set_check_limit(bool check_limit) { _check_limit = check_limit; }
- void set_check_attach(bool check_attach) { _check_attach = check_attach; }
+ std::string exceed_mem_limit_msg() { return _exceed_mem_limit_msg; }
+ void clear_exceed_mem_limit_msg() { _exceed_mem_limit_msg = ""; }
std::string print_debug_string() {
fmt::memory_buffer consumer_tracker_buf;
@@ -113,7 +115,12 @@ private:
// If tryConsume fails due to task mem tracker exceeding the limit, the task must be canceled
void exceeded_cancel_task(const std::string& cancel_details);
- void exceeded(const std::string& failed_msg);
+ void exceeded();
+
+ void save_exceed_mem_limit_msg() {
+ _exceed_mem_limit_msg = _limiter_tracker_raw->mem_limit_exceeded(
+ fmt::format("execute:<{}>", last_consumer_tracker()), _bad_consume_msg);
+ }
private:
// is false: ExecEnv::GetInstance()->initialized() = false when thread local is initialized
@@ -125,7 +132,8 @@ private:
bool _count_scope_mem = false;
int64_t _scope_mem = 0;
- std::string failed_msg = std::string();
+ std::string _bad_consume_msg = std::string();
+ std::string _exceed_mem_limit_msg = std::string();
    std::shared_ptr<MemTrackerLimiter> _limiter_tracker;
MemTrackerLimiter* _limiter_tracker_raw = nullptr;
@@ -135,7 +143,6 @@ private:
bool _check_limit = false;
// If there is a memory new/delete operation in the consume method, it may enter infinite recursion.
bool _stop_consume = false;
- bool _check_attach = true;
TUniqueId _fragment_instance_id = TUniqueId();
ExceedCallBack _cb_func = nullptr;
};
@@ -191,21 +198,13 @@ inline void ThreadMemTrackerMgr::flush_untracked_mem() {
old_untracked_mem = _untracked_mem;
if (_count_scope_mem) _scope_mem += _untracked_mem;
if (CheckLimit) {
-#ifndef BE_TEST
- // When all threads are started, `attach_limiter_tracker` is expected to be called to bind the limiter tracker.
- // If _check_attach is true and it is not in the brpc server (the protobuf will be operated when bthread is started),
- // it will check whether the tracker label is equal to the default "Process" when flushing.
- // If you do not want this check, set_check_attach=true
- // TODO(zxy) The current p0 test cannot guarantee that all threads are checked,
- // so disable it and try to open it when memory tracking is not on time.
- // DCHECK(!_check_attach || btls_key != EMPTY_BTLS_KEY ||
- // _limiter_tracker_raw->label() != "Process");
-#endif
- if (!_limiter_tracker_raw->try_consume(old_untracked_mem, failed_msg)) {
+ if (!_limiter_tracker_raw->try_consume(old_untracked_mem, _bad_consume_msg)) {
// The memory has been allocated, so when TryConsume fails, need to continue to complete
// the consume to ensure the accuracy of the statistics.
_limiter_tracker_raw->consume(old_untracked_mem);
- exceeded(failed_msg);
+ save_exceed_mem_limit_msg();
+ _limiter_tracker_raw->print_log_usage(_exceed_mem_limit_msg);
+ exceeded();
}
} else {
_limiter_tracker_raw->consume(old_untracked_mem);
@@ -217,4 +216,22 @@ inline void ThreadMemTrackerMgr::flush_untracked_mem() {
_stop_consume = false;
}
+inline bool ThreadMemTrackerMgr::try_consume(int64_t size) {
+ if (!_stop_consume) {
+ // Temporary memory may be allocated during the consumption of the mem tracker, which will lead to entering
+ // the Memory Hook again, so suspend consumption to avoid falling into an infinite loop.
+ _stop_consume = true;
+ if (!_limiter_tracker_raw->try_consume(size, _bad_consume_msg)) {
+ save_exceed_mem_limit_msg();
+ _stop_consume = false;
+ return false;
+ }
+ _stop_consume = false;
+ return true;
+ } else {
+ _untracked_mem += size;
+ return true;
+ }
+}
+
} // namespace doris
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index 07b04f5f09..b18671a9b8 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -281,7 +281,7 @@ Status RuntimeState::set_mem_limit_exceeded(const std::string& msg) {
Status RuntimeState::check_query_state(const std::string& msg) {
// TODO: it would be nice if this also checked for cancellation, but doing so breaks
// cases where we use Status::Cancelled("Cancelled") to indicate that the limit was reached.
- if (thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()->limit_exceeded()) {
+ if (thread_context()->thread_mem_tracker()->limit_exceeded()) {
RETURN_LIMIT_EXCEEDED(this, msg);
}
return query_status();
diff --git a/be/src/runtime/sorted_run_merger.cc b/be/src/runtime/sorted_run_merger.cc
index 32bdad9a6d..31ae06f47a 100644
--- a/be/src/runtime/sorted_run_merger.cc
+++ b/be/src/runtime/sorted_run_merger.cc
@@ -129,7 +129,7 @@ public:
*done = false;
_pull_task_thread =
std::thread(&SortedRunMerger::ParallelBatchedRowSupplier::process_sorted_run_task,
- this, thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker());
+ this, thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker());
RETURN_IF_ERROR(next(nullptr, done));
return Status::OK();
diff --git a/be/src/runtime/thread_context.cpp b/be/src/runtime/thread_context.cpp
index cd593f05ad..6caafdc5db 100644
--- a/be/src/runtime/thread_context.cpp
+++ b/be/src/runtime/thread_context.cpp
@@ -31,11 +31,11 @@ ThreadContextPtr::ThreadContextPtr() {
ScopeMemCount::ScopeMemCount(int64_t* scope_mem) {
_scope_mem = scope_mem;
- thread_context()->_thread_mem_tracker_mgr->start_count_scope_mem();
+ thread_context()->thread_mem_tracker_mgr->start_count_scope_mem();
}
ScopeMemCount::~ScopeMemCount() {
- *_scope_mem = thread_context()->_thread_mem_tracker_mgr->stop_count_scope_mem();
+ *_scope_mem += thread_context()->thread_mem_tracker_mgr->stop_count_scope_mem();
}
AttachTask::AttachTask(const std::shared_ptr<MemTrackerLimiter>& mem_tracker,
@@ -58,30 +58,29 @@ AttachTask::~AttachTask() {
SwitchThreadMemTrackerLimiter::SwitchThreadMemTrackerLimiter(
        const std::shared_ptr<MemTrackerLimiter>& mem_tracker) {
- _old_mem_tracker = thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker();
- thread_context()->_thread_mem_tracker_mgr->attach_limiter_tracker(mem_tracker, TUniqueId());
+ _old_mem_tracker = thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker();
+ thread_context()->thread_mem_tracker_mgr->attach_limiter_tracker(mem_tracker, TUniqueId());
}
SwitchThreadMemTrackerLimiter::~SwitchThreadMemTrackerLimiter() {
- thread_context()->_thread_mem_tracker_mgr->detach_limiter_tracker(_old_mem_tracker);
+ thread_context()->thread_mem_tracker_mgr->detach_limiter_tracker(_old_mem_tracker);
}
AddThreadMemTrackerConsumer::AddThreadMemTrackerConsumer(MemTracker* mem_tracker) {
- _need_pop = thread_context()->_thread_mem_tracker_mgr->push_consumer_tracker(mem_tracker);
+ _need_pop = thread_context()->thread_mem_tracker_mgr->push_consumer_tracker(mem_tracker);
}
AddThreadMemTrackerConsumer::AddThreadMemTrackerConsumer(
        const std::shared_ptr<MemTracker>& mem_tracker)
: _mem_tracker(mem_tracker) {
- _need_pop =
- thread_context()->_thread_mem_tracker_mgr->push_consumer_tracker(_mem_tracker.get());
+ _need_pop = thread_context()->thread_mem_tracker_mgr->push_consumer_tracker(_mem_tracker.get());
}
AddThreadMemTrackerConsumer::~AddThreadMemTrackerConsumer() {
#ifndef NDEBUG
DorisMetrics::instance()->add_thread_mem_tracker_consumer_count->increment(1);
#endif // NDEBUG
- if (_need_pop) thread_context()->_thread_mem_tracker_mgr->pop_consumer_tracker();
+ if (_need_pop) thread_context()->thread_mem_tracker_mgr->pop_consumer_tracker();
}
} // namespace doris
diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h
index d07cbe7056..68f09ce085 100644
--- a/be/src/runtime/thread_context.h
+++ b/be/src/runtime/thread_context.h
@@ -27,6 +27,7 @@
#include "gutil/macros.h"
#include "runtime/memory/thread_mem_tracker_mgr.h"
#include "runtime/threadlocal.h"
+#include "util/defer_op.h"
// Used to observe the memory usage of the specified code segment
#ifdef USE_MEM_TRACKER
@@ -66,16 +67,13 @@
auto VARNAME_LINENUM(stop_check_limit) = StopCheckThreadMemTrackerLimit()
// If the thread MemTrackerLimiter exceeds the limit, an error status is returned.
// Usually used after SCOPED_ATTACH_TASK, during query execution.
-#define RETURN_LIMIT_EXCEEDED(state, msg, ...) \
- return doris::thread_context() \
- ->_thread_mem_tracker_mgr->limiter_mem_tracker() \
- ->fragment_mem_limit_exceeded( \
- state, \
- fmt::format("exec node:<{}>, {}", \
- doris::thread_context() \
- ->_thread_mem_tracker_mgr->last_consumer_tracker(), \
- msg), \
- ##__VA_ARGS__);
+#define RETURN_LIMIT_EXCEEDED(state, msg, ...) \
+ return doris::thread_context()->thread_mem_tracker()->fragment_mem_limit_exceeded( \
+ state, \
+ fmt::format("exec node:<{}>, {}", \
+ doris::thread_context()->thread_mem_tracker_mgr->last_consumer_tracker(), \
+ msg), \
+ ##__VA_ARGS__);
#else
#define SCOPED_ATTACH_TASK(arg1, ...) (void)0
#define SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(mem_tracker_limiter) (void)0
@@ -125,6 +123,7 @@ public:
};
inline thread_local ThreadContextPtr thread_context_ptr;
+inline thread_local bool enable_thread_cache_bad_alloc = false;
// To avoid performance problems caused by frequently calling `bthread_getspecific` to obtain bthread TLS
// in tcmalloc hook, cache the key and value of bthread TLS in pthread TLS.
@@ -142,8 +141,8 @@ inline thread_local bthread_t bthread_id;
class ThreadContext {
public:
ThreadContext() {
- _thread_mem_tracker_mgr.reset(new ThreadMemTrackerMgr());
- if (ExecEnv::GetInstance()->initialized()) _thread_mem_tracker_mgr->init();
+ thread_mem_tracker_mgr.reset(new ThreadMemTrackerMgr());
+ if (ExecEnv::GetInstance()->initialized()) thread_mem_tracker_mgr->init();
}
~ThreadContext() { thread_context_ptr.init = false; }
@@ -154,18 +153,18 @@ public:
// will only attach_task at the beginning of the thread function, there should be no duplicate attach_task.
DCHECK(mem_tracker);
// Orphan is thread default tracker.
- DCHECK(_thread_mem_tracker_mgr->limiter_mem_tracker()->label() == "Orphan")
+ DCHECK(thread_mem_tracker()->label() == "Orphan")
<< ", attach mem tracker label: " << mem_tracker->label();
#endif
_task_id = task_id;
_fragment_instance_id = fragment_instance_id;
- _thread_mem_tracker_mgr->attach_limiter_tracker(mem_tracker, fragment_instance_id);
+ thread_mem_tracker_mgr->attach_limiter_tracker(mem_tracker, fragment_instance_id);
}
void detach_task() {
_task_id = "";
_fragment_instance_id = TUniqueId();
- _thread_mem_tracker_mgr->detach_limiter_tracker();
+ thread_mem_tracker_mgr->detach_limiter_tracker();
}
const TUniqueId& fragment_instance_id() const { return _fragment_instance_id; }
@@ -176,13 +175,16 @@ public:
return ss.str();
}
- // After _thread_mem_tracker_mgr is initialized, the current thread TCMalloc Hook starts to
+ // After thread_mem_tracker_mgr is initialized, the current thread TCMalloc Hook starts to
// consume/release mem_tracker.
// Note that the use of shared_ptr will cause a crash. The guess is that there is an
// intermediate state during the copy construction of shared_ptr. Shared_ptr is not equal
// to nullptr, but the object it points to is not initialized. At this time, when the memory
// is released somewhere, the TCMalloc hook is triggered to cause the crash.
- std::unique_ptr _thread_mem_tracker_mgr;
+ std::unique_ptr thread_mem_tracker_mgr;
+ MemTrackerLimiter* thread_mem_tracker() {
+ return thread_mem_tracker_mgr->limiter_mem_tracker_raw();
+ }
private:
std::string _task_id = "";
@@ -271,12 +273,12 @@ private:
class StopCheckThreadMemTrackerLimit {
public:
explicit StopCheckThreadMemTrackerLimit() {
- _pre = thread_context()->_thread_mem_tracker_mgr->check_limit();
- thread_context()->_thread_mem_tracker_mgr->set_check_limit(false);
+ _pre = thread_context()->thread_mem_tracker_mgr->check_limit();
+ thread_context()->thread_mem_tracker_mgr->set_check_limit(false);
}
~StopCheckThreadMemTrackerLimit() {
- thread_context()->_thread_mem_tracker_mgr->set_check_limit(_pre);
+ thread_context()->thread_mem_tracker_mgr->set_check_limit(_pre);
}
private:
@@ -287,45 +289,102 @@ private:
#ifdef USE_MEM_TRACKER
// For the memory that cannot be counted by mem hook, manually count it into the mem tracker, such as mmap.
#define CONSUME_THREAD_MEM_TRACKER(size) \
- doris::thread_context()->_thread_mem_tracker_mgr->consume(size)
+ doris::thread_context()->thread_mem_tracker_mgr->consume(size)
#define RELEASE_THREAD_MEM_TRACKER(size) \
- doris::thread_context()->_thread_mem_tracker_mgr->consume(-size)
+ doris::thread_context()->thread_mem_tracker_mgr->consume(-size)
// used to fix the tracking accuracy of caches.
-#define THREAD_MEM_TRACKER_TRANSFER_TO(size, tracker) \
- doris::thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker_raw()->transfer_to( \
+#define THREAD_MEM_TRACKER_TRANSFER_TO(size, tracker) \
+ doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker_raw()->transfer_to( \
size, tracker)
#define THREAD_MEM_TRACKER_TRANSFER_FROM(size, tracker) \
tracker->transfer_to( \
- size, doris::thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker_raw())
+ size, doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker_raw())
+
+// Consider catching other memory errors, such as memset failure, etc.
+#define RETURN_IF_CATCH_BAD_ALLOC(stmt) \
+ do { \
+ doris::thread_context()->thread_mem_tracker_mgr->clear_exceed_mem_limit_msg(); \
+ if (doris::enable_thread_cache_bad_alloc) { \
+ try { \
+ { stmt; } \
+ } catch (std::bad_alloc const& e) { \
+ doris::thread_context()->thread_mem_tracker()->print_log_usage( \
+ doris::thread_context()->thread_mem_tracker_mgr->exceed_mem_limit_msg()); \
+ return Status::MemoryLimitExceeded(fmt::format( \
+ "PreCatch {}, {}", e.what(), \
+ doris::thread_context()->thread_mem_tracker_mgr->exceed_mem_limit_msg())); \
+ } \
+ } else { \
+ try { \
+ doris::enable_thread_cache_bad_alloc = true; \
+ Defer defer {[&]() { doris::enable_thread_cache_bad_alloc = false; }}; \
+ { stmt; } \
+ } catch (std::bad_alloc const& e) { \
+ doris::thread_context()->thread_mem_tracker()->print_log_usage( \
+ doris::thread_context()->thread_mem_tracker_mgr->exceed_mem_limit_msg()); \
+ return Status::MemoryLimitExceeded(fmt::format( \
+ "PreCatch {}, {}", e.what(), \
+ doris::thread_context()->thread_mem_tracker_mgr->exceed_mem_limit_msg())); \
+ } \
+ } \
+ } while (0)
// Mem Hook to consume thread mem tracker
// TODO: In the original design, the MemTracker consume method is called before the memory is allocated.
// If the consume succeeds, the memory is actually allocated, otherwise an exception is thrown.
// But the statistics of memory through TCMalloc new/delete Hook are after the memory is actually allocated,
// which is different from the previous behavior.
-#define MEM_MALLOC_HOOK(size) \
+#define CONSUME_MEM_TRACKER(size) \
+ do { \
+ if (doris::thread_context_ptr.init) { \
+ doris::thread_context()->thread_mem_tracker_mgr->consume(size); \
+ } else { \
+ doris::ThreadMemTrackerMgr::consume_no_attach(size); \
+ } \
+ } while (0)
+// NOTE: LOG cannot be printed inside the mem hook. If a LOG statement triggers the mem hook
+// again, the nested LOG may cause an unknown crash.
+#define TRY_CONSUME_MEM_TRACKER(size, fail_ret) \
+ do { \
+ if (doris::thread_context_ptr.init) { \
+ if (doris::enable_thread_cache_bad_alloc) { \
+ if (!doris::thread_context()->thread_mem_tracker_mgr->try_consume(size)) { \
+ return fail_ret; \
+ } \
+ } else { \
+ doris::thread_context()->thread_mem_tracker_mgr->consume(size); \
+ } \
+ } else { \
+ doris::ThreadMemTrackerMgr::consume_no_attach(size); \
+ } \
+ } while (0)
+#define RELEASE_MEM_TRACKER(size) \
do { \
if (doris::thread_context_ptr.init) { \
- doris::thread_context()->_thread_mem_tracker_mgr->consume(size); \
+ doris::thread_context()->thread_mem_tracker_mgr->consume(-size); \
} else { \
- doris::ThreadMemTrackerMgr::consume_no_attach(size); \
+ doris::ThreadMemTrackerMgr::consume_no_attach(-size); \
} \
} while (0)
-#define MEM_FREE_HOOK(size) \
- do { \
- if (doris::thread_context_ptr.init) { \
- doris::thread_context()->_thread_mem_tracker_mgr->consume(-size); \
- } else { \
- doris::ThreadMemTrackerMgr::consume_no_attach(-size); \
- } \
+#define TRY_RELEASE_MEM_TRACKER(size) \
+ do { \
+ if (doris::thread_context_ptr.init) { \
+ if (!doris::enable_thread_cache_bad_alloc) { \
+ doris::thread_context()->thread_mem_tracker_mgr->consume(-size); \
+ } \
+ } else { \
+ doris::ThreadMemTrackerMgr::consume_no_attach(-size); \
+ } \
} while (0)
#else
#define CONSUME_THREAD_MEM_TRACKER(size) (void)0
#define RELEASE_THREAD_MEM_TRACKER(size) (void)0
#define THREAD_MEM_TRACKER_TRANSFER_TO(size, tracker) (void)0
#define THREAD_MEM_TRACKER_TRANSFER_FROM(size, tracker) (void)0
-#define MEM_MALLOC_HOOK(size) (void)0
-#define MEM_FREE_HOOK(size) (void)0
+#define CONSUME_MEM_TRACKER(size) (void)0
+#define TRY_CONSUME_MEM_TRACKER(size, fail_ret) (void)0
+#define RELEASE_MEM_TRACKER(size) (void)0
+#define TRY_RELEASE_MEM_TRACKER(size) (void)0
#endif
} // namespace doris
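
RETURN_IF_CATCH_BAD_ALLOC is only meaningful in functions that return Status; the sorter.cpp,
vhash_join_node.cpp and vaggregation_node.cpp hunks below show the real call sites. A minimal
hypothetical caller, just to show the intended pattern (copy_rows is illustrative and not part
of the patch):

    // Hypothetical caller: wrap an allocation-heavy statement so a std::bad_alloc thrown by
    // Allocator is turned into Status::MemoryLimitExceeded instead of aborting the BE process.
    Status copy_rows(doris::vectorized::MutableBlock& dst, const doris::vectorized::Block& src) {
        RETURN_IF_CATCH_BAD_ALLOC(dst.merge(src));
        return Status::OK();
    }
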
diff --git a/be/src/vec/common/allocator.h b/be/src/vec/common/allocator.h
index 36a11fc6f9..c1c7e81297 100644
--- a/be/src/vec/common/allocator.h
+++ b/be/src/vec/common/allocator.h
@@ -102,6 +102,14 @@ static constexpr size_t CHUNK_THRESHOLD = 1024;
static constexpr size_t MMAP_MIN_ALIGNMENT = 4096;
static constexpr size_t MALLOC_MIN_ALIGNMENT = 8;
+#define RETURN_BAD_ALLOC(err) \
+ do { \
+ LOG(ERROR) << err; \
+ if (doris::enable_thread_cache_bad_alloc) throw std::bad_alloc {}; \
+ doris::MemTrackerLimiter::print_log_process_usage(err); \
+ return nullptr; \
+ } while (0)
+
/** Responsible for allocating / freeing memory. Used, for example, in PODArray, Arena.
* Also used in hash tables.
* The interface is different from std::allocator
@@ -131,20 +139,14 @@ public:
buf = mmap(get_mmap_hint(), size, PROT_READ | PROT_WRITE, mmap_flags, -1, 0);
if (MAP_FAILED == buf) {
RELEASE_THREAD_MEM_TRACKER(size);
- auto err = fmt::format("Allocator: Cannot mmap {}.", size);
- doris::MemTrackerLimiter::print_log_process_usage(err);
- doris::vectorized::throwFromErrno(err,
- doris::TStatusCode::VEC_CANNOT_ALLOCATE_MEMORY);
+ RETURN_BAD_ALLOC(fmt::format("Allocator: Cannot mmap {}.", size));
}
/// No need for zero-fill, because mmap guarantees it.
} else if (!doris::config::disable_chunk_allocator_in_vec && size >= CHUNK_THRESHOLD) {
doris::Chunk chunk;
if (!doris::ChunkAllocator::instance()->allocate_align(size, &chunk)) {
- auto err = fmt::format("Allocator: Cannot allocate chunk {}.", size);
- doris::MemTrackerLimiter::print_log_process_usage(err);
- doris::vectorized::throwFromErrno(err,
- doris::TStatusCode::VEC_CANNOT_ALLOCATE_MEMORY);
+ RETURN_BAD_ALLOC(fmt::format("Allocator: Cannot allocate chunk {}.", size));
}
buf = chunk.data;
if constexpr (clear_memory) memset(buf, 0, chunk.size);
@@ -156,20 +158,15 @@ public:
buf = ::malloc(size);
if (nullptr == buf) {
- auto err = fmt::format("Allocator: Cannot malloc {}.", size);
- doris::MemTrackerLimiter::print_log_process_usage(err);
- doris::vectorized::throwFromErrno(
- err, doris::TStatusCode::VEC_CANNOT_ALLOCATE_MEMORY);
+ RETURN_BAD_ALLOC(fmt::format("Allocator: Cannot malloc {}.", size));
}
} else {
buf = nullptr;
int res = posix_memalign(&buf, alignment, size);
if (0 != res) {
- auto err = fmt::format("Cannot allocate memory (posix_memalign) {}.", size);
- doris::MemTrackerLimiter::print_log_process_usage(err);
- doris::vectorized::throwFromErrno(
- err, doris::TStatusCode::VEC_CANNOT_ALLOCATE_MEMORY, res);
+ RETURN_BAD_ALLOC(
+ fmt::format("Cannot allocate memory (posix_memalign) {}.", size));
}
if constexpr (clear_memory) memset(buf, 0, size);
@@ -183,8 +180,9 @@ public:
if (size >= MMAP_THRESHOLD) {
if (0 != munmap(buf, size)) {
auto err = fmt::format("Allocator: Cannot munmap {}.", size);
+ LOG(ERROR) << err;
+ if (doris::enable_thread_cache_bad_alloc) throw std::bad_alloc {};
doris::MemTrackerLimiter::print_log_process_usage(err);
- doris::vectorized::throwFromErrno(err, doris::TStatusCode::VEC_CANNOT_MUNMAP);
} else {
RELEASE_THREAD_MEM_TRACKER(size);
}
@@ -210,11 +208,8 @@ public:
/// Resize malloc'd memory region with no special alignment requirement.
void* new_buf = ::realloc(buf, new_size);
if (nullptr == new_buf) {
- auto err =
- fmt::format("Allocator: Cannot realloc from {} to {}.", old_size, new_size);
- doris::MemTrackerLimiter::print_log_process_usage(err);
- doris::vectorized::throwFromErrno(err,
- doris::TStatusCode::VEC_CANNOT_ALLOCATE_MEMORY);
+ RETURN_BAD_ALLOC(fmt::format("Allocator: Cannot realloc from {} to {}.", old_size,
+ new_size));
}
buf = new_buf;
@@ -230,10 +225,8 @@ public:
mmap_flags, -1, 0);
if (MAP_FAILED == buf) {
RELEASE_THREAD_MEM_TRACKER(new_size - old_size);
- auto err = fmt::format("Allocator: Cannot mremap memory chunk from {} to {}.",
- old_size, new_size);
- doris::MemTrackerLimiter::print_log_process_usage(err);
- doris::vectorized::throwFromErrno(err, doris::TStatusCode::VEC_CANNOT_MREMAP);
+ RETURN_BAD_ALLOC(fmt::format("Allocator: Cannot mremap memory chunk from {} to {}.",
+ old_size, new_size));
}
/// No need for zero-fill, because mmap guarantees it.
diff --git a/be/src/vec/common/sort/sorter.cpp b/be/src/vec/common/sort/sorter.cpp
index 94ad261165..f41ceeb5b7 100644
--- a/be/src/vec/common/sort/sorter.cpp
+++ b/be/src/vec/common/sort/sorter.cpp
@@ -17,6 +17,8 @@
#include "vec/common/sort/sorter.h"
+#include "runtime/thread_context.h"
+
namespace doris::vectorized {
void MergeSorterState::build_merge_tree(SortDescription& sort_description) {
@@ -136,8 +138,8 @@ Status FullSorter::append_block(Block* block) {
auto sz = block->rows();
for (int i = 0; i < data.size(); ++i) {
DCHECK(data[i].type->equals(*(arrival_data[i].type)));
- data[i].column->assume_mutable()->insert_range_from(
- *arrival_data[i].column->convert_to_full_column_if_const().get(), 0, sz);
+ RETURN_IF_CATCH_BAD_ALLOC(data[i].column->assume_mutable()->insert_range_from(
+ *arrival_data[i].column->convert_to_full_column_if_const().get(), 0, sz));
}
block->clear_column_data();
}
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp
index 8bea283f07..5a7d364401 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -81,7 +81,7 @@ struct ProcessHashTableBuild {
_build_side_compute_hash_timer(join_node->_build_side_compute_hash_timer) {}
template
- void run(HashTableContext& hash_table_ctx, ConstNullMapPtr null_map, bool* has_null_key) {
+ Status run(HashTableContext& hash_table_ctx, ConstNullMapPtr null_map, bool* has_null_key) {
using KeyGetter = typename HashTableContext::State;
using Mapped = typename HashTableContext::Mapped;
int64_t old_bucket_bytes = hash_table_ctx.hash_table.get_buffer_size_in_bytes();
@@ -99,7 +99,7 @@ struct ProcessHashTableBuild {
// only not build_unique, we need expanse hash table before insert data
if (!_join_node->_build_unique) {
// _rows contains null row, which will cause hash table resize to be large.
- hash_table_ctx.hash_table.expanse_for_add_elem(_rows);
+ RETURN_IF_CATCH_BAD_ALLOC(hash_table_ctx.hash_table.expanse_for_add_elem(_rows));
}
hash_table_ctx.hash_table.reset_resize_timer();
@@ -125,7 +125,7 @@ struct ProcessHashTableBuild {
if ((*null_map)[k]) {
DCHECK(has_null_key);
*has_null_key = true;
- return;
+ return Status::OK();
}
}
            if constexpr (IsSerializedHashTableContextTraits<KeyGetter>::value) {
@@ -187,6 +187,7 @@ struct ProcessHashTableBuild {
COUNTER_UPDATE(_join_node->_build_table_expanse_timer,
hash_table_ctx.hash_table.get_resize_timer_value());
+ return Status::OK();
}
private:
@@ -1279,7 +1280,7 @@ Status HashJoinNode::_materialize_build_side(RuntimeState* state) {
if (block.rows() != 0) {
SCOPED_TIMER(_build_side_merge_block_timer);
- mutable_block.merge(block);
+ RETURN_IF_CATCH_BAD_ALLOC(mutable_block.merge(block));
}
if (UNLIKELY(_mem_used - last_mem_used > BUILD_BLOCK_MAX_SIZE)) {
@@ -1464,19 +1465,20 @@ Status HashJoinNode::_process_build_block(RuntimeState* state, Block& block, uin
// Get the key column that needs to be built
Status st = _extract_join_column(block, null_map_val, raw_ptrs, res_col_ids);
- std::visit(
+ st = std::visit(
Overload {
[&](std::monostate& arg, auto has_null_value,
- auto short_circuit_for_null_in_build_side) {
+ auto short_circuit_for_null_in_build_side) -> Status {
LOG(FATAL) << "FATAL: uninited hash table";
__builtin_unreachable();
+ return Status::OK();
},
[&](auto&& arg, auto has_null_value,
- auto short_circuit_for_null_in_build_side) {
+ auto short_circuit_for_null_in_build_side) -> Status {
                        using HashTableCtxType = std::decay_t<decltype(arg)>;
                        ProcessHashTableBuild<HashTableCtxType> hash_table_build_process(
rows, block, raw_ptrs, this, state->batch_size(), offset);
- hash_table_build_process
+ return hash_table_build_process
.template run(
arg,
has_null_value || short_circuit_for_null_in_build_side
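
The Overload helper passed to std::visit above is the usual overloaded-lambda idiom; Doris has
its own definition, but the generic shape is roughly the following sketch (not the project's
verbatim code):

    // Generic sketch of the overloaded-lambda idiom used with std::visit: the helper inherits
    // operator() from every lambda, so the visitor dispatches on the variant's active alternative.
    template <typename... Callables>
    struct Overload : Callables... {
        using Callables::operator()...;
    };
    template <typename... Callables>
    Overload(Callables...) -> Overload<Callables...>; // C++17 deduction guide
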
diff --git a/be/src/vec/exec/vaggregation_node.cpp b/be/src/vec/exec/vaggregation_node.cpp
index 54a7659471..4d6965ed4d 100644
--- a/be/src/vec/exec/vaggregation_node.cpp
+++ b/be/src/vec/exec/vaggregation_node.cpp
@@ -1054,7 +1054,7 @@ Status AggregationNode::_pre_agg_with_serialized_key(doris::vectorized::Block* i
_agg_data._aggregated_method_variant);
if (!ret_flag) {
- _emplace_into_hash_table(_places.data(), key_columns, rows);
+ RETURN_IF_CATCH_BAD_ALLOC(_emplace_into_hash_table(_places.data(), key_columns, rows));
for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
_aggregate_evaluators[i]->execute_batch_add(in_block, _offsets_of_aggregate_states[i],
diff --git a/be/test/testutil/run_all_tests.cpp b/be/test/testutil/run_all_tests.cpp
index 570f428720..99a338a5aa 100644
--- a/be/test/testutil/run_all_tests.cpp
+++ b/be/test/testutil/run_all_tests.cpp
@@ -32,7 +32,7 @@ int main(int argc, char** argv) {
            std::make_shared<doris::MemTrackerLimiter>(doris::MemTrackerLimiter::Type::GLOBAL,
"Orphan");
doris::ExecEnv::GetInstance()->set_orphan_mem_tracker(orphan_mem_tracker);
- doris::thread_context()->_thread_mem_tracker_mgr->init();
+ doris::thread_context()->thread_mem_tracker_mgr->init();
doris::TabletSchemaCache::create_global_schema_cache();
doris::StoragePageCache::create_global_cache(1 << 30, 10);
doris::SegmentLoader::create_global_instance(1000);