diff --git a/be/src/io/fs/stream_load_pipe.cpp b/be/src/io/fs/stream_load_pipe.cpp index f93926d612..3e2f6bdf3e 100644 --- a/be/src/io/fs/stream_load_pipe.cpp +++ b/be/src/io/fs/stream_load_pipe.cpp @@ -43,7 +43,6 @@ StreamLoadPipe::StreamLoadPipe(size_t max_buffered_bytes, size_t min_chunk_size, _use_proto(use_proto) {} StreamLoadPipe::~StreamLoadPipe() { - SCOPED_TRACK_MEMORY_TO_UNKNOWN(); while (!_buf_queue.empty()) { _buf_queue.pop_front(); } diff --git a/be/src/olap/schema.h b/be/src/olap/schema.h index d220ebbee0..ce5c6705e7 100644 --- a/be/src/olap/schema.h +++ b/be/src/olap/schema.h @@ -50,7 +50,7 @@ using SchemaSPtr = std::shared_ptr; class Schema { public: Schema(TabletSchemaSPtr tablet_schema) { - SCOPED_MEM_COUNT(&_mem_size); + SCOPED_MEM_COUNT_BY_HOOK(&_mem_size); size_t num_columns = tablet_schema->num_columns(); // ignore this column if (tablet_schema->columns().back().name() == BeConsts::ROW_STORE_COL) { @@ -86,7 +86,7 @@ public: // All the columns of one table may exist in the columns param, but col_ids is only a subset. Schema(const std::vector& columns, const std::vector& col_ids) { - SCOPED_MEM_COUNT(&_mem_size); + SCOPED_MEM_COUNT_BY_HOOK(&_mem_size); size_t num_key_columns = 0; _unique_ids.resize(columns.size()); for (size_t i = 0; i < columns.size(); ++i) { @@ -109,7 +109,7 @@ public: // Only for UT Schema(const std::vector& columns, size_t num_key_columns) { - SCOPED_MEM_COUNT(&_mem_size); + SCOPED_MEM_COUNT_BY_HOOK(&_mem_size); std::vector col_ids(columns.size()); _unique_ids.resize(columns.size()); for (uint32_t cid = 0; cid < columns.size(); ++cid) { @@ -121,7 +121,7 @@ public: } Schema(const std::vector& cols, size_t num_key_columns) { - SCOPED_MEM_COUNT(&_mem_size); + SCOPED_MEM_COUNT_BY_HOOK(&_mem_size); std::vector col_ids(cols.size()); _unique_ids.resize(cols.size()); for (uint32_t cid = 0; cid < cols.size(); ++cid) { diff --git a/be/src/olap/tablet_schema.cpp b/be/src/olap/tablet_schema.cpp index 2bbe0d498a..d2b16a907a 100644 --- a/be/src/olap/tablet_schema.cpp +++ b/be/src/olap/tablet_schema.cpp @@ -720,7 +720,7 @@ void TabletSchema::clear_columns() { } void TabletSchema::init_from_pb(const TabletSchemaPB& schema) { - SCOPED_MEM_COUNT(&_mem_size); + SCOPED_MEM_COUNT_BY_HOOK(&_mem_size); _keys_type = schema.keys_type(); _num_columns = 0; _num_variant_columns = 0; diff --git a/be/src/runtime/buffer_control_block.cpp b/be/src/runtime/buffer_control_block.cpp index 5e8efda14f..17f9088902 100644 --- a/be/src/runtime/buffer_control_block.cpp +++ b/be/src/runtime/buffer_control_block.cpp @@ -43,7 +43,6 @@ void GetResultBatchCtx::on_failure(const Status& status) { status.to_protobuf(result->mutable_status()); { // call by result sink - SCOPED_TRACK_MEMORY_TO_UNKNOWN(); done->Run(); } delete this; @@ -57,10 +56,7 @@ void GetResultBatchCtx::on_close(int64_t packet_seq, QueryStatistics* statistics } result->set_packet_seq(packet_seq); result->set_eos(true); - { - SCOPED_TRACK_MEMORY_TO_UNKNOWN(); - done->Run(); - } + { done->Run(); } delete this; } @@ -85,10 +81,7 @@ void GetResultBatchCtx::on_data(const std::unique_ptr& t_resul result->set_eos(eos); } st.to_protobuf(result->mutable_status()); - { - SCOPED_TRACK_MEMORY_TO_UNKNOWN(); - done->Run(); - } + { done->Run(); } delete this; } diff --git a/be/src/runtime/memory/jemalloc_hook.cpp b/be/src/runtime/memory/jemalloc_hook.cpp index 2529244425..445d60d382 100644 --- a/be/src/runtime/memory/jemalloc_hook.cpp +++ b/be/src/runtime/memory/jemalloc_hook.cpp @@ -40,16 +40,18 @@ extern "C" { // mem hook should avoid nesting new/malloc. void* doris_malloc(size_t size) __THROW { - CONSUME_MEM_TRACKER(jenallocx(size, 0)); + CONSUME_THREAD_MEM_TRACKER_BY_HOOK_WITH_FN([](size_t size) { return jenallocx(size, 0); }, + size); void* ptr = jemalloc(size); if (UNLIKELY(ptr == nullptr)) { - RELEASE_MEM_TRACKER(jenallocx(size, 0)); + RELEASE_THREAD_MEM_TRACKER_BY_HOOK_WITH_FN([](size_t size) { return jenallocx(size, 0); }, + size); } return ptr; } void doris_free(void* p) __THROW { - RELEASE_MEM_TRACKER(jemalloc_usable_size(p)); + RELEASE_THREAD_MEM_TRACKER_BY_HOOK_WITH_FN([](void* p) { return jemalloc_usable_size(p); }, p); jefree(p); } @@ -60,14 +62,20 @@ void* doris_realloc(void* p, size_t size) __THROW { #if USE_MEM_TRACKER int64_t old_size = jemalloc_usable_size(p); -#endif - - CONSUME_MEM_TRACKER(jenallocx(size, 0) - old_size); + CONSUME_THREAD_MEM_TRACKER_BY_HOOK_WITH_FN( + [](size_t size, int64_t old_size) { return jenallocx(size, 0) - old_size; }, size, + old_size); void* ptr = jerealloc(p, size); if (UNLIKELY(ptr == nullptr)) { - RELEASE_MEM_TRACKER(jenallocx(size, 0) - old_size); + RELEASE_THREAD_MEM_TRACKER_BY_HOOK_WITH_FN( + [](size_t size, int64_t old_size) { return jenallocx(size, 0) - old_size; }, size, + old_size); } return ptr; +#else + void* ptr = jerealloc(p, size); + return ptr; +#endif } void* doris_calloc(size_t n, size_t size) __THROW { @@ -75,72 +83,80 @@ void* doris_calloc(size_t n, size_t size) __THROW { return nullptr; } - CONSUME_MEM_TRACKER(n * size); + CONSUME_THREAD_MEM_TRACKER_BY_HOOK(n * size); void* ptr = jecalloc(n, size); if (UNLIKELY(ptr == nullptr)) { - RELEASE_MEM_TRACKER(n * size); + RELEASE_THREAD_MEM_TRACKER_BY_HOOK(n * size); } else { - CONSUME_MEM_TRACKER(jemalloc_usable_size(ptr) - n * size); + CONSUME_THREAD_MEM_TRACKER_BY_HOOK_WITH_FN( + [](void* ptr, size_t size) { return jemalloc_usable_size(ptr) - size; }, ptr, + n * size); } return ptr; } void doris_cfree(void* ptr) __THROW { - RELEASE_MEM_TRACKER(jemalloc_usable_size(ptr)); + RELEASE_THREAD_MEM_TRACKER_BY_HOOK_WITH_FN([](void* ptr) { return jemalloc_usable_size(ptr); }, + ptr); jefree(ptr); } void* doris_memalign(size_t align, size_t size) __THROW { - CONSUME_MEM_TRACKER(size); + CONSUME_THREAD_MEM_TRACKER_BY_HOOK(size); void* ptr = jealigned_alloc(align, size); if (UNLIKELY(ptr == nullptr)) { - RELEASE_MEM_TRACKER(size); + RELEASE_THREAD_MEM_TRACKER_BY_HOOK(size); } else { - CONSUME_MEM_TRACKER(jemalloc_usable_size(ptr) - size); + CONSUME_THREAD_MEM_TRACKER_BY_HOOK_WITH_FN( + [](void* ptr, size_t size) { return jemalloc_usable_size(ptr) - size; }, ptr, size); } return ptr; } void* doris_aligned_alloc(size_t align, size_t size) __THROW { - CONSUME_MEM_TRACKER(size); + CONSUME_THREAD_MEM_TRACKER_BY_HOOK(size); void* ptr = jealigned_alloc(align, size); if (UNLIKELY(ptr == nullptr)) { - RELEASE_MEM_TRACKER(size); + RELEASE_THREAD_MEM_TRACKER_BY_HOOK(size); } else { - CONSUME_MEM_TRACKER(jemalloc_usable_size(ptr) - size); + CONSUME_THREAD_MEM_TRACKER_BY_HOOK_WITH_FN( + [](void* ptr, size_t size) { return jemalloc_usable_size(ptr) - size; }, ptr, size); } return ptr; } void* doris_valloc(size_t size) __THROW { - CONSUME_MEM_TRACKER(size); + CONSUME_THREAD_MEM_TRACKER_BY_HOOK(size); void* ptr = jevalloc(size); if (UNLIKELY(ptr == nullptr)) { - RELEASE_MEM_TRACKER(size); + RELEASE_THREAD_MEM_TRACKER_BY_HOOK(size); } else { - CONSUME_MEM_TRACKER(jemalloc_usable_size(ptr) - size); + CONSUME_THREAD_MEM_TRACKER_BY_HOOK_WITH_FN( + [](void* ptr, size_t size) { return jemalloc_usable_size(ptr) - size; }, ptr, size); } return ptr; } void* doris_pvalloc(size_t size) __THROW { - CONSUME_MEM_TRACKER(size); + CONSUME_THREAD_MEM_TRACKER_BY_HOOK(size); void* ptr = jevalloc(size); if (UNLIKELY(ptr == nullptr)) { - RELEASE_MEM_TRACKER(size); + RELEASE_THREAD_MEM_TRACKER_BY_HOOK(size); } else { - CONSUME_MEM_TRACKER(jemalloc_usable_size(ptr) - size); + CONSUME_THREAD_MEM_TRACKER_BY_HOOK_WITH_FN( + [](void* ptr, size_t size) { return jemalloc_usable_size(ptr) - size; }, ptr, size); } return ptr; } int doris_posix_memalign(void** r, size_t align, size_t size) __THROW { - CONSUME_MEM_TRACKER(size); + CONSUME_THREAD_MEM_TRACKER_BY_HOOK(size); int ret = jeposix_memalign(r, align, size); if (UNLIKELY(ret != 0)) { - RELEASE_MEM_TRACKER(size); + RELEASE_THREAD_MEM_TRACKER_BY_HOOK(size); } else { - CONSUME_MEM_TRACKER(jemalloc_usable_size(*r) - size); + CONSUME_THREAD_MEM_TRACKER_BY_HOOK_WITH_FN( + [](void* ptr, size_t size) { return jemalloc_usable_size(ptr) - size; }, *r, size); } return ret; } diff --git a/be/src/runtime/memory/mem_tracker.cpp b/be/src/runtime/memory/mem_tracker.cpp index df8e9d80a4..3d23bd447c 100644 --- a/be/src/runtime/memory/mem_tracker.cpp +++ b/be/src/runtime/memory/mem_tracker.cpp @@ -47,7 +47,7 @@ void MemTracker::bind_parent(MemTrackerLimiter* parent) { if (parent) { _parent_label = parent->label(); _parent_group_num = parent->group_num(); - } else if (thread_context_ptr.init) { + } else if (is_thread_context_init()) { _parent_label = thread_context()->thread_mem_tracker()->label(); _parent_group_num = thread_context()->thread_mem_tracker()->group_num(); } diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp index 268b0ec38e..ad90df8114 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.cpp +++ b/be/src/runtime/memory/mem_tracker_limiter.cpp @@ -136,7 +136,7 @@ void MemTrackerLimiter::refresh_global_counter() { void MemTrackerLimiter::make_process_snapshots(std::vector* snapshots) { MemTrackerLimiter::refresh_global_counter(); - int64_t process_mem_sum = 0; + int64_t all_tracker_mem_sum = 0; Snapshot snapshot; for (auto it : MemTrackerLimiter::TypeMemSum) { snapshot.type = type_string(it.first); @@ -145,7 +145,7 @@ void MemTrackerLimiter::make_process_snapshots(std::vector snapshot.cur_consumption = it.second->current_value(); snapshot.peak_consumption = it.second->peak_value(); (*snapshots).emplace_back(snapshot); - process_mem_sum += it.second->current_value(); + all_tracker_mem_sum += it.second->current_value(); } snapshot.type = "tc/jemalloc_free_memory"; @@ -154,14 +154,28 @@ void MemTrackerLimiter::make_process_snapshots(std::vector snapshot.cur_consumption = MemInfo::allocator_cache_mem(); snapshot.peak_consumption = -1; (*snapshots).emplace_back(snapshot); - process_mem_sum += MemInfo::allocator_cache_mem(); + all_tracker_mem_sum += MemInfo::allocator_cache_mem(); - snapshot.type = "process"; + snapshot.type = "all_tracker_sum (is virtual memory)"; snapshot.label = ""; snapshot.limit = -1; - snapshot.cur_consumption = process_mem_sum; + snapshot.cur_consumption = all_tracker_mem_sum; snapshot.peak_consumption = -1; (*snapshots).emplace_back(snapshot); + + snapshot.type = "process resident memory (from /proc VmRSS VmHWM)"; + snapshot.label = ""; + snapshot.limit = -1; + snapshot.cur_consumption = PerfCounters::get_vm_rss(); + snapshot.peak_consumption = PerfCounters::get_vm_hwm(); + (*snapshots).emplace_back(snapshot); + + snapshot.type = "process virtual memory (from /proc VmSize VmPeak)"; + snapshot.label = ""; + snapshot.limit = -1; + snapshot.cur_consumption = PerfCounters::get_vm_size(); + snapshot.peak_consumption = PerfCounters::get_vm_peak(); + (*snapshots).emplace_back(snapshot); } void MemTrackerLimiter::make_type_snapshots(std::vector* snapshots, @@ -343,7 +357,9 @@ std::string MemTrackerLimiter::tracker_limit_exceeded_str() { err_msg += fmt::format( " exec node:<{}>, can `set exec_mem_limit=8G` to change limit, details see " "be.INFO.", - doris::thread_context()->thread_mem_tracker_mgr->last_consumer_tracker()); + doris::is_thread_context_init() + ? doris::thread_context()->thread_mem_tracker_mgr->last_consumer_tracker() + : ""); } else if (_type == Type::SCHEMA_CHANGE) { err_msg += fmt::format( " can modify `memory_limitation_per_thread_for_schema_change_bytes` in be.conf to " diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h index 936faa1592..df20448c01 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.h +++ b/be/src/runtime/memory/mem_tracker_limiter.h @@ -155,6 +155,9 @@ public: // Transfer 'bytes' of consumption from this tracker to 'dst'. void transfer_to(int64_t size, MemTrackerLimiter* dst) { + if (label() == dst->label()) { + return; + } cache_consume(-size); dst->cache_consume(size); } diff --git a/be/src/runtime/memory/tcmalloc_hook.h b/be/src/runtime/memory/tcmalloc_hook.h index afd15d0b32..9cc88be1b0 100644 --- a/be/src/runtime/memory/tcmalloc_hook.h +++ b/be/src/runtime/memory/tcmalloc_hook.h @@ -36,11 +36,11 @@ // destructor to control the behavior of consume can lead to unexpected behavior, // like this: if (LIKELY(doris::start_thread_mem_tracker)) { void new_hook(const void* ptr, size_t size) { - CONSUME_MEM_TRACKER(tc_nallocx(size, 0)); + CONSUME_THREAD_MEM_TRACKER_BY_HOOK(tc_nallocx(size, 0)); } void delete_hook(const void* ptr) { - RELEASE_MEM_TRACKER(tc_malloc_size(const_cast(ptr))); + RELEASE_THREAD_MEM_TRACKER_BY_HOOK(tc_malloc_size(const_cast(ptr))); } void init_hook() { diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h b/be/src/runtime/memory/thread_mem_tracker_mgr.h index 487f82ac54..4cb22b9e1a 100644 --- a/be/src/runtime/memory/thread_mem_tracker_mgr.h +++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h @@ -40,11 +40,13 @@ namespace doris { // Memory Hook is counted in the memory tracker of the current thread. class ThreadMemTrackerMgr { public: - ThreadMemTrackerMgr() {} + ThreadMemTrackerMgr() = default; ~ThreadMemTrackerMgr() { // if _init == false, exec env is not initialized when init(). and never consumed mem tracker once. - if (_init) flush_untracked_mem(); + if (_init) { + flush_untracked_mem(); + } } bool init(); @@ -77,7 +79,7 @@ public: // such as calling LOG/iostream/sstream/stringstream/etc. related methods, // must increase the control to avoid entering infinite recursion, otherwise it may cause crash or stuck, // Returns whether the memory exceeds limit, and will consume mem trcker no matter whether the limit is exceeded. - void consume(int64_t size, bool large_memory_check = false); + void consume(int64_t size, int skip_large_memory_check = 0); void flush_untracked_mem(); bool is_attach_query() { return _fragment_instance_id != TUniqueId(); } @@ -92,7 +94,7 @@ public: } void disable_wait_gc() { _wait_gc = false; } - bool wait_gc() { return _wait_gc; } + [[nodiscard]] bool wait_gc() const { return _wait_gc; } void cancel_instance(const std::string& exceed_msg); std::string print_debug_string() { @@ -161,9 +163,9 @@ inline void ThreadMemTrackerMgr::pop_consumer_tracker() { _consumer_tracker_stack.pop_back(); } -inline void ThreadMemTrackerMgr::consume(int64_t size, bool large_memory_check) { +inline void ThreadMemTrackerMgr::consume(int64_t size, int skip_large_memory_check) { _untracked_mem += size; - if (!ExecEnv::ready()) { + if (!_init && !ExecEnv::ready()) { return; } // When some threads `0 < _untracked_mem < config::mem_tracker_consume_min_size_bytes` @@ -176,7 +178,7 @@ inline void ThreadMemTrackerMgr::consume(int64_t size, bool large_memory_check) flush_untracked_mem(); } - if (large_memory_check && doris::config::large_memory_check_bytes > 0 && + if (skip_large_memory_check == 0 && doris::config::large_memory_check_bytes > 0 && size > doris::config::large_memory_check_bytes) { _stop_consume = true; LOG(WARNING) << fmt::format( diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index 84329be070..daf3e1bc75 100644 --- a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -334,7 +334,7 @@ Status RuntimeState::check_query_state(const std::string& msg) { // // If the thread MemTrackerLimiter exceeds the limit, an error status is returned. // Usually used after SCOPED_ATTACH_TASK, during query execution. - if (thread_context()->thread_mem_tracker()->limit_exceeded() && + if (is_thread_context_init() && thread_context()->thread_mem_tracker()->limit_exceeded() && !config::enable_query_memory_overcommit) { auto failed_msg = fmt::format("{}, {}", msg, diff --git a/be/src/runtime/thread_context.cpp b/be/src/runtime/thread_context.cpp index a092385f41..cd03c67b99 100644 --- a/be/src/runtime/thread_context.cpp +++ b/be/src/runtime/thread_context.cpp @@ -23,22 +23,15 @@ namespace doris { class MemTracker; -DEFINE_STATIC_THREAD_LOCAL(ThreadContext, ThreadContextPtr, _ptr); - -ThreadContextPtr::ThreadContextPtr() { - INIT_STATIC_THREAD_LOCAL(ThreadContext, _ptr); - init = true; -} - AttachTask::AttachTask(const std::shared_ptr& mem_tracker, const TUniqueId& task_id, const TUniqueId& fragment_instance_id) { - SwitchBthreadLocal::switch_to_bthread_local(); + ThreadLocalHandle::create_thread_local_if_not_exits(); signal::set_signal_task_id(task_id); thread_context()->attach_task(task_id, fragment_instance_id, mem_tracker); } AttachTask::AttachTask(RuntimeState* runtime_state) { - SwitchBthreadLocal::switch_to_bthread_local(); + ThreadLocalHandle::create_thread_local_if_not_exits(); signal::set_signal_task_id(runtime_state->query_id()); thread_context()->attach_task(runtime_state->query_id(), runtime_state->fragment_instance_id(), runtime_state->query_mem_tracker()); @@ -46,11 +39,11 @@ AttachTask::AttachTask(RuntimeState* runtime_state) { AttachTask::~AttachTask() { thread_context()->detach_task(); - SwitchBthreadLocal::switch_back_pthread_local(); + ThreadLocalHandle::del_thread_local_if_count_is_zero(); } AddThreadMemTrackerConsumer::AddThreadMemTrackerConsumer(MemTracker* mem_tracker) { - SwitchBthreadLocal::switch_to_bthread_local(); + ThreadLocalHandle::create_thread_local_if_not_exits(); if (mem_tracker) { _need_pop = thread_context()->thread_mem_tracker_mgr->push_consumer_tracker(mem_tracker); } @@ -59,7 +52,7 @@ AddThreadMemTrackerConsumer::AddThreadMemTrackerConsumer(MemTracker* mem_tracker AddThreadMemTrackerConsumer::AddThreadMemTrackerConsumer( const std::shared_ptr& mem_tracker) : _mem_tracker(mem_tracker) { - SwitchBthreadLocal::switch_to_bthread_local(); + ThreadLocalHandle::create_thread_local_if_not_exits(); if (_mem_tracker) { _need_pop = thread_context()->thread_mem_tracker_mgr->push_consumer_tracker(_mem_tracker.get()); @@ -70,7 +63,22 @@ AddThreadMemTrackerConsumer::~AddThreadMemTrackerConsumer() { if (_need_pop) { thread_context()->thread_mem_tracker_mgr->pop_consumer_tracker(); } - SwitchBthreadLocal::switch_back_pthread_local(); + ThreadLocalHandle::del_thread_local_if_count_is_zero(); +} + +AddThreadMemTrackerConsumerByHook::AddThreadMemTrackerConsumerByHook( + const std::shared_ptr& mem_tracker) + : _mem_tracker(mem_tracker) { + ThreadLocalHandle::create_thread_local_if_not_exits(); + DCHECK(mem_tracker != nullptr); + use_mem_hook = true; + thread_context()->thread_mem_tracker_mgr->push_consumer_tracker(_mem_tracker.get()); +} + +AddThreadMemTrackerConsumerByHook::~AddThreadMemTrackerConsumerByHook() { + thread_context()->thread_mem_tracker_mgr->pop_consumer_tracker(); + use_mem_hook = false; + ThreadLocalHandle::del_thread_local_if_count_is_zero(); } } // namespace doris diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h index a5c9473d66..1999549049 100644 --- a/be/src/runtime/thread_context.h +++ b/be/src/runtime/thread_context.h @@ -33,64 +33,76 @@ #include "runtime/exec_env.h" #include "runtime/memory/mem_tracker_limiter.h" #include "runtime/memory/thread_mem_tracker_mgr.h" -#include "runtime/threadlocal.h" #include "util/defer_op.h" // IWYU pragma: keep -// Used to observe the memory usage of the specified code segment -#if defined(USE_MEM_TRACKER) && !defined(UNDEFINED_BEHAVIOR_SANITIZER) -// Count a code segment memory (memory malloc - memory free) to int64_t -// Usage example: int64_t scope_mem = 0; { SCOPED_MEM_COUNT(&scope_mem); xxx; xxx; } -#define SCOPED_MEM_COUNT(scope_mem) \ - auto VARNAME_LINENUM(scope_mem_count) = doris::ScopeMemCount(scope_mem) - -// Count a code segment memory (memory malloc - memory free) to MemTracker. -// Compared to count `scope_mem`, MemTracker is easier to observe from the outside and is thread-safe. -// Usage example: std::unique_ptr tracker = std::make_unique("first_tracker"); -// { SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); xxx; xxx; } -// Usually used to record query more detailed memory, including ExecNode operators. -#define SCOPED_CONSUME_MEM_TRACKER(mem_tracker) \ - auto VARNAME_LINENUM(add_mem_consumer) = doris::AddThreadMemTrackerConsumer(mem_tracker) -#else -#define SCOPED_MEM_COUNT(scope_mem) (void)0 -#define SCOPED_CONSUME_MEM_TRACKER(mem_tracker) (void)0 -#endif - -// Used to observe query/load/compaction/e.g. execution thread memory usage and respond when memory exceeds the limit. -#if defined(USE_MEM_TRACKER) && !defined(UNDEFINED_BEHAVIOR_SANITIZER) +// Used to tracking query/load/compaction/e.g. execution thread memory usage. +// This series of methods saves some information to the thread local context of the current worker thread, +// including MemTracker, QueryID, etc. Use CONSUME_THREAD_MEM_TRACKER/RELEASE_THREAD_MEM_TRACKER in the code segment where +// the macro is located to record the memory into MemTracker. +// Not use it in rpc done.run(), because bthread_setspecific may have errors when UBSAN compiles. +#if defined(USE_MEM_TRACKER) && !defined(UNDEFINED_BEHAVIOR_SANITIZER) && !defined(BE_TEST) // Attach to query/load/compaction/e.g. when thread starts. // This will save some info about a working thread in the thread context. -// And count the memory during thread execution (is actually also the code segment that executes the function) -// to specify MemTrackerLimiter, and expect to handle when the memory exceeds the limit, for example cancel query. -// Usage is similar to SCOPED_CONSUME_MEM_TRACKER. +// Looking forward to tracking memory during thread execution into MemTrackerLimiter. #define SCOPED_ATTACH_TASK(arg1) auto VARNAME_LINENUM(attach_task) = AttachTask(arg1) - #define SCOPED_ATTACH_TASK_WITH_ID(arg1, arg2, arg3) \ auto VARNAME_LINENUM(attach_task) = AttachTask(arg1, arg2, arg3) // Switch MemTrackerLimiter for count memory during thread execution. -// Usually used after SCOPED_ATTACH_TASK, in order to count the memory of the specified code segment into another +// Used after SCOPED_ATTACH_TASK, in order to count the memory into another // MemTrackerLimiter instead of the MemTrackerLimiter added by the attach task. -// Note that, not use it in rpc done.run(), because bthread_setspecific may have errors when UBSAN compiles. #define SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(mem_tracker_limiter) \ auto VARNAME_LINENUM(switch_mem_tracker) = SwitchThreadMemTrackerLimiter(mem_tracker_limiter) -// Usually used to exclude a part of memory in query or load mem tracker and track it to Orphan Mem Tracker. -// Note that, not check whether it is currently a Bthread and switch Bthread Local, because this is usually meaningless, -// if used in Bthread, and pthread switching is expected, use SwitchThreadMemTrackerLimiter. -#define SCOPED_TRACK_MEMORY_TO_UNKNOWN() \ - auto VARNAME_LINENUM(track_memory_to_unknown) = TrackMemoryToUnknown() +// Looking forward to tracking memory during thread execution into MemTracker. +// Usually used to record query more detailed memory, including ExecNode operators. +#define SCOPED_CONSUME_MEM_TRACKER(mem_tracker) \ + auto VARNAME_LINENUM(add_mem_consumer) = doris::AddThreadMemTrackerConsumer(mem_tracker) #else #define SCOPED_ATTACH_TASK(arg1, ...) (void)0 #define SCOPED_ATTACH_TASK_WITH_ID(arg1, arg2, arg3) (void)0 #define SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(mem_tracker_limiter) (void)0 -#define SCOPED_TRACK_MEMORY_TO_UNKNOWN() (void)0 +#define SCOPED_CONSUME_MEM_TRACKER(mem_tracker) (void)0 #endif -#define SKIP_MEMORY_CHECK(...) \ - do { \ - doris::thread_context()->skip_memory_check++; \ - DEFER({ doris::thread_context()->skip_memory_check--; }); \ - __VA_ARGS__; \ +// Used to tracking the memory usage of the specified code segment use by mem hook. +#if defined(USE_MEM_TRACKER) +// Count a code segment memory (memory malloc - memory free) to int64_t +// Usage example: int64_t scope_mem = 0; { SCOPED_MEM_COUNT(&scope_mem); xxx; xxx; } +#define SCOPED_MEM_COUNT_BY_HOOK(scope_mem) \ + auto VARNAME_LINENUM(scope_mem_count) = doris::ScopeMemCountByHook(scope_mem) + +// Count a code segment memory (memory malloc - memory free) to MemTracker. +// Compared to count `scope_mem`, MemTracker is easier to observe from the outside and is thread-safe. +// Usage example: std::unique_ptr tracker = std::make_unique("first_tracker"); +// { SCOPED_CONSUME_MEM_TRACKER_BY_HOOK(_mem_tracker.get()); xxx; xxx; } +#define SCOPED_CONSUME_MEM_TRACKER_BY_HOOK(mem_tracker) \ + auto VARNAME_LINENUM(add_mem_consumer) = doris::AddThreadMemTrackerConsumerByHook(mem_tracker) +#else +#define SCOPED_MEM_COUNT_BY_HOOK(scope_mem) (void)0 +#define SCOPED_CONSUME_MEM_TRACKER_BY_HOOK(mem_tracker) (void)0 +#endif + +#define SKIP_MEMORY_CHECK(...) \ + do { \ + doris::ThreadLocalHandle::create_thread_local_if_not_exits(); \ + doris::thread_context()->skip_memory_check++; \ + DEFER({ \ + doris::thread_context()->skip_memory_check--; \ + doris::ThreadLocalHandle::del_thread_local_if_count_is_zero(); \ + }); \ + __VA_ARGS__; \ + } while (0) + +#define SKIP_LARGE_MEMORY_CHECK(...) \ + do { \ + doris::ThreadLocalHandle::create_thread_local_if_not_exits(); \ + doris::thread_context()->skip_large_memory_check++; \ + DEFER({ \ + doris::thread_context()->skip_large_memory_check--; \ + doris::ThreadLocalHandle::del_thread_local_if_count_is_zero(); \ + }); \ + __VA_ARGS__; \ } while (0) namespace doris { @@ -101,63 +113,26 @@ class RuntimeState; extern bthread_key_t btls_key; -// Using gcc11 compiles thread_local variable on lower versions of GLIBC will report an error, -// see https://github.com/apache/doris/pull/7911 -// -// If we want to avoid this error, -// 1. For non-trivial variables in thread_local, such as std::string, you need to store them as pointers to -// ensure that thread_local is trivial, these non-trivial pointers will uniformly call destructors elsewhere. -// 2. The default destructor of the thread_local variable cannot be overridden. -// -// This is difficult to implement. Because the destructor is not overwritten, it means that the outside cannot -// be notified when the thread terminates, and the non-trivial pointers in thread_local cannot be released in time. -// The func provided by pthread and std::thread doesn't help either. -// -// So, kudu Class-scoped static thread local implementation was introduced. Solve the above problem by -// Thread-scoped thread local + Class-scoped thread local. -// -// This may look very trick, but it's the best way I can find. -// -// refer to: -// https://gcc.gnu.org/onlinedocs/gcc-3.3.1/gcc/Thread-Local.html -// https://stackoverflow.com/questions/12049684/ -// https://sourceware.org/glibc/wiki/Destructor%20support%20for%20thread_local%20variables -// https://www.jianshu.com/p/756240e837dd -// https://man7.org/linux/man-pages/man3/pthread_tryjoin_np.3.html -class ThreadContextPtr { -public: - ThreadContextPtr(); - // Cannot add destructor `~ThreadContextPtr`, otherwise it will no longer be of type POD, the reason is as above. - - // TCMalloc hook is triggered during ThreadContext construction, which may lead to deadlock. - bool init = false; - - DECLARE_STATIC_THREAD_LOCAL(ThreadContext, _ptr); -}; - -inline thread_local ThreadContextPtr thread_context_ptr; - -// To avoid performance problems caused by frequently calling `bthread_getspecific` to obtain bthread TLS -// in tcmalloc hook, cache the key and value of bthread TLS in pthread TLS. -inline thread_local ThreadContext* bthread_context; -inline thread_local bthread_t bthread_id; +// Is true after ThreadContext construction. +inline thread_local bool pthread_context_ptr_init = false; +inline thread_local constinit ThreadContext* thread_context_ptr; +// use mem hook to consume thread mem tracker. +inline thread_local bool use_mem_hook = false; // The thread context saves some info about a working thread. // 2 required info: // 1. thread_id: Current thread id, Auto generated. -// 2. type: The type is a enum value indicating which type of task current thread is running. +// 2. type(abolished): The type is a enum value indicating which type of task current thread is running. // For example: QUERY, LOAD, COMPACTION, ... // 3. task id: A unique id to identify this task. maybe query id, load job id, etc. +// 4. ThreadMemTrackerMgr // // There may be other optional info to be added later. class ThreadContext { public: - ThreadContext() { - thread_mem_tracker_mgr.reset(new ThreadMemTrackerMgr()); - if (ExecEnv::ready()) thread_mem_tracker_mgr->init(); - } + ThreadContext() { thread_mem_tracker_mgr = std::make_unique(); } - ~ThreadContext() { thread_context_ptr.init = false; } + ~ThreadContext() = default; void attach_task(const TUniqueId& task_id, const TUniqueId& fragment_instance_id, const std::shared_ptr& mem_tracker) { @@ -179,118 +154,140 @@ public: thread_mem_tracker_mgr->detach_limiter_tracker(); } - const TUniqueId& task_id() const { return _task_id; } - const TUniqueId& fragment_instance_id() const { return _fragment_instance_id; } + [[nodiscard]] const TUniqueId& task_id() const { return _task_id; } + [[nodiscard]] const TUniqueId& fragment_instance_id() const { return _fragment_instance_id; } - std::string get_thread_id() { + static std::string get_thread_id() { std::stringstream ss; ss << std::this_thread::get_id(); return ss.str(); } - // After thread_mem_tracker_mgr is initialized, the current thread TCMalloc Hook starts to + // After thread_mem_tracker_mgr is initialized, the current thread Hook starts to // consume/release mem_tracker. // Note that the use of shared_ptr will cause a crash. The guess is that there is an // intermediate state during the copy construction of shared_ptr. Shared_ptr is not equal // to nullptr, but the object it points to is not initialized. At this time, when the memory - // is released somewhere, the TCMalloc hook is triggered to cause the crash. + // is released somewhere, the hook is triggered to cause the crash. std::unique_ptr thread_mem_tracker_mgr; - MemTrackerLimiter* thread_mem_tracker() { + [[nodiscard]] MemTrackerLimiter* thread_mem_tracker() const { return thread_mem_tracker_mgr->limiter_mem_tracker_raw(); } void consume_memory(const int64_t size) const { - thread_mem_tracker_mgr->consume(size, large_memory_check); + thread_mem_tracker_mgr->consume(size, skip_large_memory_check); } - int switch_bthread_local_count = 0; + int thread_local_handle_count = 0; int skip_memory_check = 0; - bool large_memory_check = true; + int skip_large_memory_check = 0; private: TUniqueId _task_id; TUniqueId _fragment_instance_id; }; -// Switch thread context from pthread local to bthread local context. -// Cache the pointer of bthread local in pthead local, -// Avoid calling bthread_getspecific frequently to get bthread local, which has performance problems. -class SwitchBthreadLocal { +class ThreadLocalHandle { public: - static void switch_to_bthread_local() { - if (bthread_self() != 0) { - // Very frequent bthread_getspecific will slow, but switch_to_bthread_local is not expected to be much. - bthread_context = static_cast(bthread_getspecific(btls_key)); + static void create_thread_local_if_not_exits() { + if (bthread_self() == 0) { + if (!pthread_context_ptr_init) { + thread_context_ptr = new ThreadContext(); + pthread_context_ptr_init = true; + } + DCHECK(thread_context_ptr != nullptr); + thread_context_ptr->thread_local_handle_count++; + } else { + // Avoid calling bthread_getspecific frequently to get bthread local. + // Very frequent bthread_getspecific will slow, but create_thread_local_if_not_exits is not expected to be much. + // Cache the pointer of bthread local in pthead local. + auto* bthread_context = static_cast(bthread_getspecific(btls_key)); if (bthread_context == nullptr) { - // A new bthread starts, two scenarios: + // If bthread_context == nullptr: // 1. First call to bthread_getspecific (and before any bthread_setspecific) returns NULL // 2. There are not enough reusable btls in btls pool. - // else, two scenarios: + // else if bthread_context != nullptr: // 1. A new bthread starts, but get a reuses btls. - // 2. A pthread switch occurs. Because the pthread switch cannot be accurately identified at the moment. - // So tracker call reset 0 like reuses btls. - // during this period, stop the use of thread_context. - thread_context_ptr.init = false; bthread_context = new ThreadContext; // The brpc server should respond as quickly as possible. bthread_context->thread_mem_tracker_mgr->disable_wait_gc(); // set the data so that next time bthread_getspecific in the thread returns the data. CHECK(0 == bthread_setspecific(btls_key, bthread_context) || k_doris_exit); - thread_context_ptr.init = true; } - bthread_id = bthread_self(); - bthread_context->switch_bthread_local_count++; + DCHECK(bthread_context != nullptr); + bthread_context->thread_local_handle_count++; } } - // `switch_to_bthread_local` and `switch_back_pthread_local` should be used in pairs, - // `switch_to_bthread_local` should only be called if `switch_to_bthread_local` returns true - static void switch_back_pthread_local() { - if (bthread_self() != 0) { - if (!bthread_equal(bthread_self(), bthread_id)) { - bthread_id = bthread_self(); - bthread_context = static_cast(bthread_getspecific(btls_key)); - DCHECK(bthread_context != nullptr); - } - bthread_context->switch_bthread_local_count--; - if (bthread_context->switch_bthread_local_count == 0) { - bthread_context = thread_context_ptr._ptr; + // `create_thread_local_if_not_exits` and `del_thread_local_if_count_is_zero` should be used in pairs, + // `del_thread_local_if_count_is_zero` should only be called if `create_thread_local_if_not_exits` returns true + static void del_thread_local_if_count_is_zero() { + if (pthread_context_ptr_init) { + // in pthread + thread_context_ptr->thread_local_handle_count--; + if (thread_context_ptr->thread_local_handle_count == 0) { + pthread_context_ptr_init = false; + delete doris::thread_context_ptr; + thread_context_ptr = nullptr; } + } else if (bthread_self() != 0) { + // in bthread + auto* bthread_context = static_cast(bthread_getspecific(btls_key)); + DCHECK(bthread_context != nullptr); + bthread_context->thread_local_handle_count--; + } else { + LOG(FATAL) << "__builtin_unreachable"; + __builtin_unreachable(); } } }; -// Note: All use of thread_context() in bthread requires the use of SwitchBthreadLocal. -static ThreadContext* thread_context() { +[[maybe_unused]] static bool is_thread_context_init() { + if (pthread_context_ptr_init) { + // in pthread + DCHECK(bthread_self() == 0); + DCHECK(thread_context_ptr != nullptr); + return true; + } if (bthread_self() != 0) { // in bthread - if (!bthread_equal(bthread_self(), bthread_id)) { - // bthread switching pthread may be very frequent, remember not to use lock or other time-consuming operations. - bthread_id = bthread_self(); - bthread_context = static_cast(bthread_getspecific(btls_key)); - // if nullptr, a new bthread task start and no reusable bthread local, - // or bthread switch pthread but not call switch_to_bthread_local, use pthread local context - // else, bthread switch pthread and called switch_to_bthread_local, use bthread local context. - if (bthread_context == nullptr) { - bthread_context = thread_context_ptr._ptr; - } - } - return bthread_context; - } else { - // in pthread - return thread_context_ptr._ptr; + return static_cast(bthread_getspecific(btls_key)) != nullptr; } + return false; } -class ScopeMemCount { +// must call create_thread_local_if_not_exits() before use thread_context(). +static ThreadContext* thread_context() { + if (pthread_context_ptr_init) { + // in pthread + DCHECK(bthread_self() == 0); + DCHECK(thread_context_ptr != nullptr); + return thread_context_ptr; + } + if (bthread_self() != 0) { + // in bthread + // bthread switching pthread may be very frequent, remember not to use lock or other time-consuming operations. + auto* bthread_context = static_cast(bthread_getspecific(btls_key)); + DCHECK(bthread_context != nullptr); + return bthread_context; + } + LOG(FATAL) << "__builtin_unreachable"; + __builtin_unreachable(); +} + +class ScopeMemCountByHook { public: - explicit ScopeMemCount(int64_t* scope_mem) { + explicit ScopeMemCountByHook(int64_t* scope_mem) { + ThreadLocalHandle::create_thread_local_if_not_exits(); + use_mem_hook = true; _scope_mem = scope_mem; thread_context()->thread_mem_tracker_mgr->start_count_scope_mem(); } - ~ScopeMemCount() { + ~ScopeMemCountByHook() { *_scope_mem += thread_context()->thread_mem_tracker_mgr->stop_count_scope_mem(); + use_mem_hook = false; + ThreadLocalHandle::del_thread_local_if_count_is_zero(); } private: @@ -311,44 +308,20 @@ public: class SwitchThreadMemTrackerLimiter { public: explicit SwitchThreadMemTrackerLimiter(const std::shared_ptr& mem_tracker) { - SwitchBthreadLocal::switch_to_bthread_local(); + ThreadLocalHandle::create_thread_local_if_not_exits(); _old_mem_tracker = thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker(); thread_context()->thread_mem_tracker_mgr->attach_limiter_tracker(mem_tracker, TUniqueId()); } ~SwitchThreadMemTrackerLimiter() { thread_context()->thread_mem_tracker_mgr->detach_limiter_tracker(_old_mem_tracker); - SwitchBthreadLocal::switch_back_pthread_local(); + ThreadLocalHandle::del_thread_local_if_count_is_zero(); } private: std::shared_ptr _old_mem_tracker; }; -class TrackMemoryToUnknown { -public: - explicit TrackMemoryToUnknown() { - if (bthread_self() != 0) { - _tid = std::this_thread::get_id(); // save pthread id - } - _old_mem_tracker = thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker(); - thread_context()->thread_mem_tracker_mgr->attach_limiter_tracker( - ExecEnv::GetInstance()->orphan_mem_tracker(), TUniqueId()); - } - - ~TrackMemoryToUnknown() { - if (bthread_self() != 0) { - // make sure pthread is not switch, if switch, mem tracker will be wrong, but not crash in release - DCHECK(_tid == std::this_thread::get_id()); - } - thread_context()->thread_mem_tracker_mgr->detach_limiter_tracker(_old_mem_tracker); - } - -private: - std::shared_ptr _old_mem_tracker; - std::thread::id _tid; -}; - class AddThreadMemTrackerConsumer { public: // The owner and user of MemTracker are in the same thread, and the raw pointer is faster. @@ -365,48 +338,94 @@ private: bool _need_pop = false; }; +class AddThreadMemTrackerConsumerByHook { +public: + explicit AddThreadMemTrackerConsumerByHook(const std::shared_ptr& mem_tracker); + ~AddThreadMemTrackerConsumerByHook(); + +private: + std::shared_ptr _mem_tracker = nullptr; +}; + // Basic macros for mem tracker, usually do not need to be modified and used. -#ifdef USE_MEM_TRACKER -// For the memory that cannot be counted by mem hook, manually count it into the mem tracker, such as mmap. -#define CONSUME_THREAD_MEM_TRACKER(size) doris::thread_context()->consume_memory(size) -#define RELEASE_THREAD_MEM_TRACKER(size) doris::thread_context()->consume_memory(-size) - +#if defined(USE_MEM_TRACKER) && !defined(BE_TEST) // used to fix the tracking accuracy of caches. -#define THREAD_MEM_TRACKER_TRANSFER_TO(size, tracker) \ - doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker_raw()->transfer_to( \ - size, tracker) -#define THREAD_MEM_TRACKER_TRANSFER_FROM(size, tracker) \ - tracker->transfer_to( \ - size, doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker_raw()) +#define THREAD_MEM_TRACKER_TRANSFER_TO(size, tracker) \ + do { \ + if (is_thread_context_init()) { \ + doris::thread_context() \ + ->thread_mem_tracker_mgr->limiter_mem_tracker_raw() \ + ->transfer_to(size, tracker); \ + } else { \ + doris::ExecEnv::GetInstance()->orphan_mem_tracker_raw()->transfer_to(size, tracker); \ + } \ + } while (0) -// Mem Hook to consume thread mem tracker -// TODO: In the original design, the MemTracker consume method is called before the memory is allocated. -// If the consume succeeds, the memory is actually allocated, otherwise an exception is thrown. -// But the statistics of memory through TCMalloc new/delete Hook are after the memory is actually allocated, -// which is different from the previous behavior. -#define CONSUME_MEM_TRACKER(size) \ +#define THREAD_MEM_TRACKER_TRANSFER_FROM(size, tracker) \ + do { \ + if (is_thread_context_init()) { \ + tracker->transfer_to( \ + size, \ + doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker_raw()); \ + } else { \ + tracker->transfer_to(size, doris::ExecEnv::GetInstance()->orphan_mem_tracker_raw()); \ + } \ + } while (0) + +// Mem Hook to consume thread mem tracker, not use in brpc thread. +#define CONSUME_THREAD_MEM_TRACKER_BY_HOOK(size) \ + do { \ + if (doris::use_mem_hook) { \ + DCHECK(doris::pthread_context_ptr_init); \ + DCHECK(bthread_self() == 0); \ + doris::thread_context_ptr->consume_memory(size); \ + } \ + } while (0) +#define RELEASE_THREAD_MEM_TRACKER_BY_HOOK(size) CONSUME_THREAD_MEM_TRACKER_BY_HOOK(-size) +#define CONSUME_THREAD_MEM_TRACKER_BY_HOOK_WITH_FN(size_fn, ...) \ + do { \ + if (doris::use_mem_hook) { \ + DCHECK(doris::pthread_context_ptr_init); \ + DCHECK(bthread_self() == 0); \ + doris::thread_context_ptr->consume_memory(size_fn(__VA_ARGS__)); \ + } \ + } while (0) +#define RELEASE_THREAD_MEM_TRACKER_BY_HOOK_WITH_FN(size_fn, ...) \ + do { \ + if (doris::use_mem_hook) { \ + DCHECK(doris::pthread_context_ptr_init); \ + DCHECK(bthread_self() == 0); \ + doris::thread_context_ptr->consume_memory(-size_fn(__VA_ARGS__)); \ + } \ + } while (0) + +// if use mem hook, avoid repeated consume. +// must call create_thread_local_if_not_exits() before use thread_context(). +#define CONSUME_THREAD_MEM_TRACKER(size) \ do { \ - if (doris::thread_context_ptr.init) { \ - doris::thread_context()->consume_memory(size); \ + if (doris::use_mem_hook) { \ + break; \ + } \ + if (doris::pthread_context_ptr_init) { \ + DCHECK(bthread_self() == 0); \ + doris::thread_context_ptr->consume_memory(size); \ + } else if (bthread_self() != 0) { \ + static_cast(bthread_getspecific(doris::btls_key)) \ + ->consume_memory(size); \ } else if (doris::ExecEnv::ready()) { \ doris::ExecEnv::GetInstance()->orphan_mem_tracker_raw()->consume_no_update_peak(size); \ } \ } while (0) -#define RELEASE_MEM_TRACKER(size) \ - do { \ - if (doris::thread_context_ptr.init) { \ - doris::thread_context()->consume_memory(-size); \ - } else if (doris::ExecEnv::ready()) { \ - doris::ExecEnv::GetInstance()->orphan_mem_tracker_raw()->consume_no_update_peak( \ - -size); \ - } \ - } while (0) +#define RELEASE_THREAD_MEM_TRACKER(size) CONSUME_THREAD_MEM_TRACKER(-size) + #else -#define CONSUME_THREAD_MEM_TRACKER(size) (void)0 -#define RELEASE_THREAD_MEM_TRACKER(size) (void)0 #define THREAD_MEM_TRACKER_TRANSFER_TO(size, tracker) (void)0 #define THREAD_MEM_TRACKER_TRANSFER_FROM(size, tracker) (void)0 -#define CONSUME_MEM_TRACKER(size) (void)0 -#define RELEASE_MEM_TRACKER(size) (void)0 +#define CONSUME_THREAD_MEM_TRACKER_BY_HOOK(size) (void)0 +#define RELEASE_THREAD_MEM_TRACKER_BY_HOOK(size) (void)0 +#define CONSUME_THREAD_MEM_TRACKER_BY_HOOK_WITH_FN(size) (void)0 +#define RELEASE_THREAD_MEM_TRACKER_BY_HOOK_WITH_FN(size) (void)0 +#define CONSUME_THREAD_MEM_TRACKER(size) (void)0 +#define RELEASE_THREAD_MEM_TRACKER(size) (void)0 #endif } // namespace doris diff --git a/be/src/runtime/threadlocal.cc b/be/src/runtime/threadlocal.cc deleted file mode 100644 index 69b5760773..0000000000 --- a/be/src/runtime/threadlocal.cc +++ /dev/null @@ -1,84 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. -#include "runtime/threadlocal.h" - -#include - -#include -#include -#include -#include - -#include "common/logging.h" -#include "util/errno.h" - -namespace doris { - -// One key used by the entire process to attach destructors on thread exit. -static pthread_key_t destructors_key; - -// The above key must only be initialized once per process. -static std::once_flag once; - -namespace { - -// List of destructors for all thread locals instantiated on a given thread. -struct PerThreadDestructorList { - void (*destructor)(void*); - void* arg; - PerThreadDestructorList* next; -}; - -} // anonymous namespace - -// Call all the destructors associated with all THREAD_LOCAL instances in this -// thread. -static void invoke_destructors(void* t) { - PerThreadDestructorList* d = reinterpret_cast(t); - while (d != nullptr) { - d->destructor(d->arg); - PerThreadDestructorList* next = d->next; - delete d; - d = next; - } -} - -// This key must be initialized only once. -static void create_key() { - int ret = pthread_key_create(&destructors_key, &invoke_destructors); - // Linux supports up to 1024 keys, we will use only one for all thread locals. - CHECK_EQ(0, ret) << "pthread_key_create() failed, cannot add destructor to thread: " - << "error " << ret << ": " << errno_to_string(ret); -} - -// Adds a destructor to the list. -void add_destructor(void (*destructor)(void*), void* arg) { - std::call_once(once, create_key); - - // Returns NULL if nothing is set yet. - std::unique_ptr p(new PerThreadDestructorList()); - p->destructor = destructor; - p->arg = arg; - p->next = reinterpret_cast(pthread_getspecific(destructors_key)); - int ret = pthread_setspecific(destructors_key, p.release()); - // The only time this check should fail is if we are out of memory, or if - // somehow key creation failed, which should be caught by the above CHECK. - CHECK_EQ(0, ret) << "pthread_setspecific() failed, cannot update destructor list: " - << "error " << ret << ": " << errno_to_string(ret); -} - -} // namespace doris diff --git a/be/src/runtime/threadlocal.h b/be/src/runtime/threadlocal.h deleted file mode 100644 index 3842b2fcad..0000000000 --- a/be/src/runtime/threadlocal.h +++ /dev/null @@ -1,129 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -// Reference from kudu, Solve the problem of gcc11 compiling -// non-trivial thread_local variables on lower versions of GLIBC. -// see https://github.com/apache/doris/pull/7911 -// -// Block-scoped static thread local implementation. -// -// Usage is similar to a C++11 thread_local. The BLOCK_STATIC_THREAD_LOCAL macro -// defines a thread-local pointer to the specified type, which is lazily -// instantiated by any thread entering the block for the first time. The -// constructor for the type T is invoked at macro execution time, as expected, -// and its destructor is invoked when the corresponding thread's Runnable -// returns, or when the thread exits. -// -// Inspired by Poco , -// Andrew Tomazos , and -// the C++11 thread_local API. -// -// Example usage: -// -// // Invokes a 3-arg constructor on SomeClass: -// BLOCK_STATIC_THREAD_LOCAL(SomeClass, instance, arg1, arg2, arg3); -// instance->DoSomething(); -// - -#pragma once - -#include - -#include "gutil/port.h" - -#define BLOCK_STATIC_THREAD_LOCAL(T, t, ...) \ - static __thread T* t; \ - do { \ - if (PREDICT_FALSE(t == NULL)) { \ - t = new T(__VA_ARGS__); \ - add_destructor(destroy, t); \ - } \ - } while (false) - -// Class-scoped static thread local implementation. -// -// Very similar in implementation to the above block-scoped version, but -// requires a bit more syntax and vigilance to use properly. -// -// DECLARE_STATIC_THREAD_LOCAL(Type, instance_var_) must be placed in the -// class header, as usual for variable declarations. -// -// Because these variables are static, they must also be defined in the impl -// file with DEFINE_STATIC_THREAD_LOCAL(Type, Classname, instance_var_), -// which is very much like defining any static member, i.e. int Foo::member_. -// -// Finally, each thread must initialize the instance before using it by calling -// INIT_STATIC_THREAD_LOCAL(Type, instance_var_, ...). This is a cheap -// call, and may be invoked at the top of any method which may reference a -// thread-local variable. -// -// Due to all of these requirements, you should probably declare TLS members -// as private. -// -// Example usage: -// -// // foo.h -// #include "kudu/utils/file.h" -// class Foo { -// public: -// void DoSomething(std::string s); -// private: -// DECLARE_STATIC_THREAD_LOCAL(utils::File, file_); -// }; -// -// // foo.cc -// #include "kudu/foo.h" -// DEFINE_STATIC_THREAD_LOCAL(utils::File, Foo, file_); -// void Foo::WriteToFile(std::string s) { -// // Call constructor if necessary. -// INIT_STATIC_THREAD_LOCAL(utils::File, file_, "/tmp/file_location.txt"); -// file_->Write(s); -// } - -// Goes in the class declaration (usually in a header file). -// dtor must be destructed _after_ t, so it gets defined first. -// Uses a mangled variable name for dtor since it must also be a member of the -// class. -#define DECLARE_STATIC_THREAD_LOCAL(T, t) static __thread T* t - -// You must also define the instance in the .cc file. -#define DEFINE_STATIC_THREAD_LOCAL(T, Class, t) __thread T* Class::t - -// Must be invoked at least once by each thread that will access t. -#define INIT_STATIC_THREAD_LOCAL(T, t, ...) \ - do { \ - if (PREDICT_FALSE(t == NULL)) { \ - t = new T(__VA_ARGS__); \ - add_destructor(destroy, t); \ - } \ - } while (false) - -// Internal implementation below. - -namespace doris { - -// Add a destructor to the list. -void add_destructor(void (*destructor)(void*), void* arg); - -// Destroy the passed object of type T. -template -void destroy(void* t) { - // With tcmalloc, this should be pretty cheap (same thread as new). - delete reinterpret_cast(t); -} - -} // namespace doris diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp index e1fc8f63b3..7da5c74683 100644 --- a/be/src/service/doris_main.cpp +++ b/be/src/service/doris_main.cpp @@ -479,6 +479,8 @@ int main(int argc, char** argv) { exit(-1); } + doris::ThreadLocalHandle::create_thread_local_if_not_exits(); + // init exec env auto exec_env(doris::ExecEnv::GetInstance()); status = doris::ExecEnv::init(doris::ExecEnv::GetInstance(), paths, broken_paths); @@ -580,6 +582,7 @@ int main(int argc, char** argv) { brpc_service.reset(nullptr); LOG(INFO) << "Brpc service stopped"; exec_env->destroy(); + doris::ThreadLocalHandle::del_thread_local_if_count_is_zero(); LOG(INFO) << "Doris main exited."; return 0; } diff --git a/be/src/service/point_query_executor.cpp b/be/src/service/point_query_executor.cpp index 9b5790bf45..580e0b66df 100644 --- a/be/src/service/point_query_executor.cpp +++ b/be/src/service/point_query_executor.cpp @@ -51,7 +51,7 @@ Reusable::~Reusable() {} constexpr static int s_preallocted_blocks_num = 64; Status Reusable::init(const TDescriptorTable& t_desc_tbl, const std::vector& output_exprs, size_t block_size) { - SCOPED_MEM_COUNT(&_mem_size); + SCOPED_MEM_COUNT_BY_HOOK(&_mem_size); _runtime_state = RuntimeState::create_unique(); RETURN_IF_ERROR(DescriptorTbl::create(_runtime_state->obj_pool(), t_desc_tbl, &_desc_tbl)); _runtime_state->set_desc_tbl(_desc_tbl); diff --git a/be/src/util/perf_counters.cpp b/be/src/util/perf_counters.cpp index 74cae81783..67a9c013e1 100644 --- a/be/src/util/perf_counters.cpp +++ b/be/src/util/perf_counters.cpp @@ -48,6 +48,9 @@ static std::unordered_map _process_state; int64_t PerfCounters::_vm_rss = 0; std::string PerfCounters::_vm_rss_str = ""; +int64_t PerfCounters::_vm_hwm = 0; +int64_t PerfCounters::_vm_size = 0; +int64_t PerfCounters::_vm_peak = 0; // This is the order of the counters in /proc/self/io enum PERF_IO_IDX { @@ -579,14 +582,18 @@ void PerfCounters::refresh_proc_status() { if (statusinfo.is_open()) statusinfo.close(); + _vm_size = parse_bytes("status/VmSize"); + _vm_peak = parse_bytes("status/VmPeak"); _vm_rss = parse_bytes("status/VmRSS"); _vm_rss_str = PrettyPrinter::print(_vm_rss, TUnit::BYTES); + _vm_hwm = parse_bytes("status/VmHWM"); } void PerfCounters::get_proc_status(ProcStatus* out) { out->vm_size = parse_bytes("status/VmSize"); out->vm_peak = parse_bytes("status/VmPeak"); out->vm_rss = parse_bytes("status/VmRSS"); + out->vm_hwm = parse_bytes("status/VmHWM"); } } // namespace doris diff --git a/be/src/util/perf_counters.h b/be/src/util/perf_counters.h index 9f0936193d..52fe46acb6 100644 --- a/be/src/util/perf_counters.h +++ b/be/src/util/perf_counters.h @@ -104,6 +104,7 @@ public: int64_t vm_size = 0; int64_t vm_peak = 0; int64_t vm_rss = 0; + int64_t vm_hwm = 0; }; static int parse_int(const std::string& state_key); @@ -118,6 +119,9 @@ public: // Return the process actual physical memory in bytes. static inline int64_t get_vm_rss() { return _vm_rss; } static inline std::string get_vm_rss_str() { return _vm_rss_str; } + static inline int64_t get_vm_hwm() { return _vm_hwm; } + static inline int64_t get_vm_size() { return _vm_size; } + static inline int64_t get_vm_peak() { return _vm_peak; } private: // Copy constructor and assignment not allowed @@ -164,6 +168,9 @@ private: static int64_t _vm_rss; static std::string _vm_rss_str; + static int64_t _vm_hwm; + static int64_t _vm_size; + static int64_t _vm_peak; }; } // namespace doris diff --git a/be/src/util/thread.cpp b/be/src/util/thread.cpp index cf7bd63a87..e6d5d10e73 100644 --- a/be/src/util/thread.cpp +++ b/be/src/util/thread.cpp @@ -59,6 +59,7 @@ #include "gutil/stringprintf.h" #include "gutil/strings/substitute.h" #include "http/web_page_handler.h" +#include "runtime/thread_context.h" #include "util/debug/sanitizer_scopes.h" #include "util/easy_json.h" #include "util/os_util.h" @@ -479,6 +480,9 @@ void* Thread::supervise_thread(void* arg) { // already incremented the reference count in StartThread. Thread::_tls = t; + // Create thread context, there is no need to create it when func is executed. + ThreadLocalHandle::create_thread_local_if_not_exits(); + // Publish our tid to '_tid', which unblocks any callers waiting in // WaitForTid(). Release_Store(&t->_tid, system_tid); @@ -514,6 +518,8 @@ void Thread::finish_thread(void* arg) { // NOTE: the above 'Release' call could be the last reference to 'this', // so 'this' could be destructed at this point. Do not add any code // following here! + + ThreadLocalHandle::del_thread_local_if_count_is_zero(); } void Thread::init_threadmgr() { diff --git a/be/src/vec/common/allocator.cpp b/be/src/vec/common/allocator.cpp index b4025807be..3d6d2a7a11 100644 --- a/be/src/vec/common/allocator.cpp +++ b/be/src/vec/common/allocator.cpp @@ -38,33 +38,45 @@ template void Allocator::sys_memory_check(size_t size) const { - if (doris::thread_context()->skip_memory_check) return; + if (doris::is_thread_context_init() && doris::thread_context()->skip_memory_check != 0) { + return; + } if (doris::MemTrackerLimiter::sys_mem_exceed_limit_check(size)) { // Only thread attach query, and has not completely waited for thread_wait_gc_max_milliseconds, // will wait for gc, asynchronous cancel or throw bad::alloc. // Otherwise, if the external catch, directly throw bad::alloc. - auto err_msg = fmt::format( - "Allocator sys memory check failed: Cannot alloc:{}, consuming " - "tracker:<{}>, peak used {}, current used {}, exec node:<{}>, {}.", - size, doris::thread_context()->thread_mem_tracker()->label(), - doris::thread_context()->thread_mem_tracker()->peak_consumption(), - doris::thread_context()->thread_mem_tracker()->consumption(), - doris::thread_context()->thread_mem_tracker_mgr->last_consumer_tracker(), - doris::MemTrackerLimiter::process_limit_exceeded_errmsg_str()); + std::string err_msg; + if (doris::is_thread_context_init()) { + err_msg += fmt::format( + "Allocator sys memory check failed: Cannot alloc:{}, consuming " + "tracker:<{}>, peak used {}, current used {}, exec node:<{}>, {}.", + size, doris::thread_context()->thread_mem_tracker()->label(), + doris::thread_context()->thread_mem_tracker()->peak_consumption(), + doris::thread_context()->thread_mem_tracker()->consumption(), + doris::thread_context()->thread_mem_tracker_mgr->last_consumer_tracker(), + doris::MemTrackerLimiter::process_limit_exceeded_errmsg_str()); + } else { + err_msg += fmt::format( + "Allocator sys memory check failed: Cannot alloc:{}, consuming " + "tracker:<{}>, {}.", + size, "Orphan", doris::MemTrackerLimiter::process_limit_exceeded_errmsg_str()); + } + if (size > 1024l * 1024 * 1024 && !doris::enable_thread_catch_bad_alloc && !doris::config::disable_memory_gc) { // 1G err_msg += "\nAlloc Stacktrace:\n" + doris::get_stack_trace(); } // TODO, Save the query context in the thread context, instead of finding whether the query id is canceled in fragment_mgr. - if (doris::ExecEnv::GetInstance()->fragment_mgr()->query_is_canceled( + if (doris::is_thread_context_init() && + doris::ExecEnv::GetInstance()->fragment_mgr()->query_is_canceled( doris::thread_context()->task_id())) { if (doris::enable_thread_catch_bad_alloc) { throw doris::Exception(doris::ErrorCode::MEM_ALLOC_FAILED, err_msg); } return; } - if (!doris::config::disable_memory_gc && + if (doris::is_thread_context_init() && !doris::config::disable_memory_gc && doris::thread_context()->thread_mem_tracker_mgr->is_attach_query() && doris::thread_context()->thread_mem_tracker_mgr->wait_gc()) { int64_t wait_milliseconds = 0; @@ -120,7 +132,12 @@ void Allocator::sys_memory_check(size_t template void Allocator::memory_tracker_check(size_t size) const { - if (doris::thread_context()->skip_memory_check) return; + if (doris::is_thread_context_init() && doris::thread_context()->skip_memory_check != 0) { + return; + } + if (!doris::is_thread_context_init()) { + return; + } auto st = doris::thread_context()->thread_mem_tracker()->check_limit(size); if (!st) { auto err_msg = fmt::format("Allocator mem tracker check failed, {}", st.to_string()); diff --git a/be/src/vec/common/allocator.h b/be/src/vec/common/allocator.h index fedfc04f53..8162c94fca 100644 --- a/be/src/vec/common/allocator.h +++ b/be/src/vec/common/allocator.h @@ -97,6 +97,7 @@ public: /// Allocate memory range. void* alloc_impl(size_t size, size_t alignment = 0) { memory_check(size); + consume_memory(size); void* buf; if (use_mmap && size >= doris::config::mmap_threshold) { @@ -106,7 +107,6 @@ public: "Too large alignment {}: more than page size when allocating {}.", alignment, size); - consume_memory(size); buf = mmap(nullptr, size, PROT_READ | PROT_WRITE, mmap_flags, -1, 0); if (MAP_FAILED == buf) { release_memory(size); @@ -122,6 +122,7 @@ public: buf = ::malloc(size); if (nullptr == buf) { + release_memory(size); throw_bad_alloc(fmt::format("Allocator: Cannot malloc {}.", size)); } } else { @@ -129,6 +130,7 @@ public: int res = posix_memalign(&buf, alignment, size); if (0 != res) { + release_memory(size); throw_bad_alloc( fmt::format("Cannot allocate memory (posix_memalign) {}.", size)); } @@ -145,12 +147,11 @@ public: DCHECK(size != -1); if (0 != munmap(buf, size)) { throw_bad_alloc(fmt::format("Allocator: Cannot munmap {}.", size)); - } else { - release_memory(size); } } else { ::free(buf); } + release_memory(size); } /** Enlarge memory range. @@ -161,13 +162,18 @@ public: if (old_size == new_size) { /// nothing to do. /// BTW, it's not possible to change alignment while doing realloc. - } else if (!use_mmap || (old_size < doris::config::mmap_threshold && - new_size < doris::config::mmap_threshold && - alignment <= MALLOC_MIN_ALIGNMENT)) { - memory_check(new_size); + return buf; + } + memory_check(new_size); + consume_memory(new_size - old_size); + + if (!use_mmap || + (old_size < doris::config::mmap_threshold && new_size < doris::config::mmap_threshold && + alignment <= MALLOC_MIN_ALIGNMENT)) { /// Resize malloc'd memory region with no special alignment requirement. void* new_buf = ::realloc(buf, new_size); if (nullptr == new_buf) { + release_memory(new_size - old_size); throw_bad_alloc(fmt::format("Allocator: Cannot realloc from {} to {}.", old_size, new_size)); } @@ -178,9 +184,7 @@ public: memset(reinterpret_cast(buf) + old_size, 0, new_size - old_size); } else if (old_size >= doris::config::mmap_threshold && new_size >= doris::config::mmap_threshold) { - memory_check(new_size); /// Resize mmap'd memory region. - consume_memory(new_size - old_size); // On apple and freebsd self-implemented mremap used (common/mremap.h) buf = clickhouse_mremap(buf, old_size, new_size, MREMAP_MAYMOVE, PROT_READ | PROT_WRITE, mmap_flags, -1, 0); @@ -199,7 +203,6 @@ public: memset(reinterpret_cast(buf) + old_size, 0, new_size - old_size); } } else { - memory_check(new_size); // Big allocs that requires a copy. void* new_buf = alloc(new_size, alignment); memcpy(new_buf, buf, std::min(old_size, new_size)); diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp index c25f4a9ea6..b65e361942 100644 --- a/be/src/vec/sink/vdata_stream_sender.cpp +++ b/be/src/vec/sink/vdata_stream_sender.cpp @@ -178,7 +178,6 @@ Status Channel::send_remote_block(PBlock* block, bool eos, Status exec_s _send_remote_block_callback = DummyBrpcCallback::create_shared(); } else { RETURN_IF_ERROR(_wait_last_brpc()); - SCOPED_TRACK_MEMORY_TO_UNKNOWN(); _send_remote_block_callback->cntl_->Reset(); } VLOG_ROW << "Channel::send_batch() instance_id=" << print_id(_fragment_instance_id) diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp index 25443ab314..ae9c4a38e3 100644 --- a/be/src/vec/sink/vtablet_sink.cpp +++ b/be/src/vec/sink/vtablet_sink.cpp @@ -76,7 +76,6 @@ #include "runtime/descriptors.h" #include "runtime/exec_env.h" #include "runtime/runtime_state.h" -#include "runtime/thread_context.h" #include "service/backend_options.h" #include "service/brpc.h" #include "util/binary_cast.hpp" diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp b/be/src/vec/sink/writer/vtablet_writer.cpp index 22b72f28cb..703959da80 100644 --- a/be/src/vec/sink/writer/vtablet_writer.cpp +++ b/be/src/vec/sink/writer/vtablet_writer.cpp @@ -274,9 +274,9 @@ VNodeChannel::VNodeChannel(VTabletWriter* parent, IndexChannel* index_channel, i _node_id(node_id), _is_incremental(is_incremental) { _cur_add_block_request = std::make_shared(); - _node_channel_tracker = std::make_shared(fmt::format( - "NodeChannel:indexID={}:threadId={}", std::to_string(_index_channel->_index_id), - thread_context()->get_thread_id())); + _node_channel_tracker = std::make_shared( + fmt::format("NodeChannel:indexID={}:threadId={}", + std::to_string(_index_channel->_index_id), ThreadContext::get_thread_id())); } VNodeChannel::~VNodeChannel() = default; diff --git a/be/src/vec/sink/writer/vtablet_writer.h b/be/src/vec/sink/writer/vtablet_writer.h index 3b8df40bee..dc68d4d61a 100644 --- a/be/src/vec/sink/writer/vtablet_writer.h +++ b/be/src/vec/sink/writer/vtablet_writer.h @@ -152,7 +152,6 @@ public: // plz follow this order: reset() -> set_in_flight() -> send brpc batch void reset() { - SCOPED_TRACK_MEMORY_TO_UNKNOWN(); ::doris::DummyBrpcCallback::cntl_->Reset(); cid = ::doris::DummyBrpcCallback::cntl_->call_id(); } diff --git a/be/test/testutil/run_all_tests.cpp b/be/test/testutil/run_all_tests.cpp index 6dd65bf2ec..c9ea1c552e 100644 --- a/be/test/testutil/run_all_tests.cpp +++ b/be/test/testutil/run_all_tests.cpp @@ -39,6 +39,7 @@ #include "util/mem_info.h" int main(int argc, char** argv) { + doris::ThreadLocalHandle::create_thread_local_if_not_exits(); doris::ExecEnv::GetInstance()->init_mem_tracker(); doris::thread_context()->thread_mem_tracker_mgr->init(); doris::ExecEnv::GetInstance()->set_cache_manager(doris::CacheManager::create_global_instance());