// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. #pragma once #include #include #include #include #include #include #include #include #include "common/exception.h" #include "common/logging.h" #include "gutil/macros.h" #include "runtime/exec_env.h" #include "runtime/memory/mem_tracker_limiter.h" #include "runtime/memory/thread_mem_tracker_mgr.h" #include "util/defer_op.h" // IWYU pragma: keep // Used to tracking query/load/compaction/e.g. execution thread memory usage. // This series of methods saves some information to the thread local context of the current worker thread, // including MemTracker, QueryID, etc. Use CONSUME_THREAD_MEM_TRACKER/RELEASE_THREAD_MEM_TRACKER in the code segment where // the macro is located to record the memory into MemTracker. // Not use it in rpc done.run(), because bthread_setspecific may have errors when UBSAN compiles. #if defined(USE_MEM_TRACKER) && !defined(BE_TEST) // Attach to query/load/compaction/e.g. when thread starts. // This will save some info about a working thread in the thread context. // Looking forward to tracking memory during thread execution into MemTrackerLimiter. #define SCOPED_ATTACH_TASK(arg1) auto VARNAME_LINENUM(attach_task) = AttachTask(arg1) #define SCOPED_ATTACH_TASK_WITH_ID(arg1, arg2) \ auto VARNAME_LINENUM(attach_task) = AttachTask(arg1, arg2) // Switch MemTrackerLimiter for count memory during thread execution. // Used after SCOPED_ATTACH_TASK, in order to count the memory into another // MemTrackerLimiter instead of the MemTrackerLimiter added by the attach task. #define SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(arg1) \ auto VARNAME_LINENUM(switch_mem_tracker) = SwitchThreadMemTrackerLimiter(arg1) // Looking forward to tracking memory during thread execution into MemTracker. // Usually used to record query more detailed memory, including ExecNode operators. #define SCOPED_CONSUME_MEM_TRACKER(mem_tracker) \ auto VARNAME_LINENUM(add_mem_consumer) = doris::AddThreadMemTrackerConsumer(mem_tracker) // Count a code segment memory (memory malloc - memory free) to int64_t // Usage example: int64_t scope_mem = 0; { SCOPED_MEM_COUNT(&scope_mem); xxx; xxx; } #define SCOPED_MEM_COUNT_BY_HOOK(scope_mem) \ auto VARNAME_LINENUM(scope_mem_count) = doris::ScopeMemCountByHook(scope_mem) // Count a code segment memory (memory malloc - memory free) to MemTracker. // Compared to count `scope_mem`, MemTracker is easier to observe from the outside and is thread-safe. // Usage example: std::unique_ptr tracker = std::make_unique("first_tracker"); // { SCOPED_CONSUME_MEM_TRACKER_BY_HOOK(_mem_tracker.get()); xxx; xxx; } #define SCOPED_CONSUME_MEM_TRACKER_BY_HOOK(mem_tracker) \ auto VARNAME_LINENUM(add_mem_consumer) = doris::AddThreadMemTrackerConsumerByHook(mem_tracker) #define DEFER_RELEASE_RESERVED() \ Defer VARNAME_LINENUM(defer) {[&]() { doris::thread_context()->release_reserved_memory(); }}; #define ORPHAN_TRACKER_CHECK() \ DCHECK(doris::k_doris_exit || !doris::config::enable_memory_orphan_check || \ doris::thread_context()->thread_mem_tracker()->label() != "Orphan") \ << doris::memory_orphan_check_msg #define MEMORY_ORPHAN_CHECK() \ DCHECK(doris::k_doris_exit || !doris::config::enable_memory_orphan_check) \ << doris::memory_orphan_check_msg; #else // thread context need to be initialized, required by Allocator and elsewhere. #define SCOPED_ATTACH_TASK(arg1, ...) \ auto VARNAME_LINENUM(scoped_tls_at) = doris::ScopedInitThreadContext() #define SCOPED_ATTACH_TASK_WITH_ID(arg1, arg2) \ auto VARNAME_LINENUM(scoped_tls_atwi) = doris::ScopedInitThreadContext() #define SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(arg1) \ auto VARNAME_LINENUM(scoped_tls_stmtl) = doris::ScopedInitThreadContext() #define SCOPED_CONSUME_MEM_TRACKER(mem_tracker) \ auto VARNAME_LINENUM(scoped_tls_cmt) = doris::ScopedInitThreadContext() #define SCOPED_MEM_COUNT_BY_HOOK(scope_mem) \ auto VARNAME_LINENUM(scoped_tls_mcbh) = doris::ScopedInitThreadContext() #define SCOPED_CONSUME_MEM_TRACKER_BY_HOOK(mem_tracker) \ auto VARNAME_LINENUM(scoped_tls_cmtbh) = doris::ScopedInitThreadContext() #define DEFER_RELEASE_RESERVED() (void)0 #define ORPHAN_TRACKER_CHECK() (void)0 #define MEMORY_ORPHAN_CHECK() (void)0 #endif #define SCOPED_SKIP_MEMORY_CHECK() \ auto VARNAME_LINENUM(scope_skip_memory_check) = doris::ScopeSkipMemoryCheck() #define SKIP_LARGE_MEMORY_CHECK(...) \ do { \ doris::ThreadLocalHandle::create_thread_local_if_not_exits(); \ doris::thread_context()->skip_large_memory_check++; \ DEFER({ \ doris::thread_context()->skip_large_memory_check--; \ doris::ThreadLocalHandle::del_thread_local_if_count_is_zero(); \ }); \ __VA_ARGS__; \ } while (0) namespace doris { class ThreadContext; class MemTracker; class RuntimeState; class QueryThreadContext; extern bthread_key_t btls_key; // Is true after ThreadContext construction. inline thread_local bool pthread_context_ptr_init = false; inline thread_local constinit ThreadContext* thread_context_ptr = nullptr; // use mem hook to consume thread mem tracker. inline thread_local bool use_mem_hook = false; static std::string memory_orphan_check_msg = "If you crash here, it means that SCOPED_ATTACH_TASK and " "SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER are not used correctly. starting position of " "each thread is expected to use SCOPED_ATTACH_TASK to bind a MemTrackerLimiter belonging " "to Query/Load/Compaction/Other Tasks, otherwise memory alloc using Doris Allocator in the " "thread will crash. If you want to switch MemTrackerLimiter during thread execution, " "please use SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER, do not repeat Attach. Of course, you " "can modify enable_memory_orphan_check=false in be.conf to avoid this crash."; // The thread context saves some info about a working thread. // 2 required info: // 1. thread_id: Current thread id, Auto generated. // 2. type(abolished): The type is a enum value indicating which type of task current thread is running. // For example: QUERY, LOAD, COMPACTION, ... // 3. task id: A unique id to identify this task. maybe query id, load job id, etc. // 4. ThreadMemTrackerMgr // // There may be other optional info to be added later. class ThreadContext { public: ThreadContext() { thread_mem_tracker_mgr = std::make_unique(); } ~ThreadContext() = default; void attach_task(const TUniqueId& task_id, const std::shared_ptr& mem_tracker) { #ifndef BE_TEST // will only attach_task at the beginning of the thread function, there should be no duplicate attach_task. DCHECK(mem_tracker); // Orphan is thread default tracker. DCHECK(thread_mem_tracker()->label() == "Orphan") << ", thread mem tracker label: " << thread_mem_tracker()->label() << ", attach mem tracker label: " << mem_tracker->label(); #endif _task_id = task_id; thread_mem_tracker_mgr->attach_limiter_tracker(mem_tracker); thread_mem_tracker_mgr->set_query_id(_task_id); thread_mem_tracker_mgr->enable_wait_gc(); thread_mem_tracker_mgr->reset_query_cancelled_flag(false); } void detach_task() { _task_id = TUniqueId(); thread_mem_tracker_mgr->detach_limiter_tracker(); thread_mem_tracker_mgr->set_query_id(TUniqueId()); thread_mem_tracker_mgr->disable_wait_gc(); } [[nodiscard]] const TUniqueId& task_id() const { return _task_id; } static std::string get_thread_id() { std::stringstream ss; ss << std::this_thread::get_id(); return ss.str(); } // After thread_mem_tracker_mgr is initialized, the current thread Hook starts to // consume/release mem_tracker. // Note that the use of shared_ptr will cause a crash. The guess is that there is an // intermediate state during the copy construction of shared_ptr. Shared_ptr is not equal // to nullptr, but the object it points to is not initialized. At this time, when the memory // is released somewhere, the hook is triggered to cause the crash. std::unique_ptr thread_mem_tracker_mgr; [[nodiscard]] MemTrackerLimiter* thread_mem_tracker() const { return thread_mem_tracker_mgr->limiter_mem_tracker_raw(); } QueryThreadContext query_thread_context(); void consume_memory(const int64_t size) const { #ifdef USE_MEM_TRACKER DCHECK(doris::k_doris_exit || !doris::config::enable_memory_orphan_check || thread_mem_tracker()->label() != "Orphan") << doris::memory_orphan_check_msg; #endif thread_mem_tracker_mgr->consume(size, skip_large_memory_check); } bool try_reserve_memory(const int64_t size) const { #ifdef USE_MEM_TRACKER DCHECK(doris::k_doris_exit || !doris::config::enable_memory_orphan_check || thread_mem_tracker()->label() != "Orphan") << doris::memory_orphan_check_msg; #endif return thread_mem_tracker_mgr->try_reserve(size); } void release_reserved_memory() const { #ifdef USE_MEM_TRACKER DCHECK(doris::k_doris_exit || !doris::config::enable_memory_orphan_check || thread_mem_tracker()->label() != "Orphan") << doris::memory_orphan_check_msg; #endif thread_mem_tracker_mgr->release_reserved(); } int thread_local_handle_count = 0; int skip_memory_check = 0; int skip_large_memory_check = 0; private: TUniqueId _task_id; }; class ThreadLocalHandle { public: static void create_thread_local_if_not_exits() { if (bthread_self() == 0) { if (!pthread_context_ptr_init) { thread_context_ptr = new ThreadContext(); pthread_context_ptr_init = true; } DCHECK(thread_context_ptr != nullptr); thread_context_ptr->thread_local_handle_count++; } else { // Avoid calling bthread_getspecific frequently to get bthread local. // Very frequent bthread_getspecific will slow, but create_thread_local_if_not_exits is not expected to be much. // Cache the pointer of bthread local in pthead local. auto* bthread_context = static_cast(bthread_getspecific(btls_key)); if (bthread_context == nullptr) { // If bthread_context == nullptr: // 1. First call to bthread_getspecific (and before any bthread_setspecific) returns NULL // 2. There are not enough reusable btls in btls pool. // else if bthread_context != nullptr: // 1. A new bthread starts, but get a reuses btls. bthread_context = new ThreadContext; // The brpc server should respond as quickly as possible. bthread_context->thread_mem_tracker_mgr->disable_wait_gc(); // set the data so that next time bthread_getspecific in the thread returns the data. CHECK(0 == bthread_setspecific(btls_key, bthread_context) || k_doris_exit); } DCHECK(bthread_context != nullptr); bthread_context->thread_local_handle_count++; } } // `create_thread_local_if_not_exits` and `del_thread_local_if_count_is_zero` should be used in pairs, // `del_thread_local_if_count_is_zero` should only be called if `create_thread_local_if_not_exits` returns true static void del_thread_local_if_count_is_zero() { if (pthread_context_ptr_init) { // in pthread thread_context_ptr->thread_local_handle_count--; if (thread_context_ptr->thread_local_handle_count == 0) { pthread_context_ptr_init = false; delete doris::thread_context_ptr; thread_context_ptr = nullptr; } } else if (bthread_self() != 0) { // in bthread auto* bthread_context = static_cast(bthread_getspecific(btls_key)); DCHECK(bthread_context != nullptr); bthread_context->thread_local_handle_count--; } else { LOG(FATAL) << "__builtin_unreachable"; __builtin_unreachable(); } } }; // must call create_thread_local_if_not_exits() before use thread_context(). static ThreadContext* thread_context() { if (pthread_context_ptr_init) { // in pthread DCHECK(bthread_self() == 0); DCHECK(thread_context_ptr != nullptr); return thread_context_ptr; } if (bthread_self() != 0) { // in bthread // bthread switching pthread may be very frequent, remember not to use lock or other time-consuming operations. auto* bthread_context = static_cast(bthread_getspecific(btls_key)); DCHECK(bthread_context != nullptr); return bthread_context; } // It means that use thread_context() but this thread not attached a query/load using SCOPED_ATTACH_TASK macro. LOG(FATAL) << "__builtin_unreachable, " << doris::memory_orphan_check_msg; __builtin_unreachable(); } class QueryThreadContext { public: QueryThreadContext() = default; QueryThreadContext(const TUniqueId& query_id, const std::shared_ptr& mem_tracker) : query_id(query_id), query_mem_tracker(mem_tracker) {} void init() { #ifndef BE_TEST ORPHAN_TRACKER_CHECK(); query_id = doris::thread_context()->task_id(); query_mem_tracker = doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker(); #else query_id = TUniqueId(); query_mem_tracker = doris::ExecEnv::GetInstance()->orphan_mem_tracker(); #endif } TUniqueId query_id; std::shared_ptr query_mem_tracker; }; class ScopeMemCountByHook { public: explicit ScopeMemCountByHook(int64_t* scope_mem) { ThreadLocalHandle::create_thread_local_if_not_exits(); _scope_mem = scope_mem; thread_context()->thread_mem_tracker_mgr->start_count_scope_mem(); use_mem_hook = true; } ~ScopeMemCountByHook() { use_mem_hook = false; *_scope_mem += thread_context()->thread_mem_tracker_mgr->stop_count_scope_mem(); ThreadLocalHandle::del_thread_local_if_count_is_zero(); } private: int64_t* _scope_mem = nullptr; }; // only hold thread context in scope. class ScopedInitThreadContext { public: explicit ScopedInitThreadContext() { ThreadLocalHandle::create_thread_local_if_not_exits(); } ~ScopedInitThreadContext() { ThreadLocalHandle::del_thread_local_if_count_is_zero(); } }; class AttachTask { public: explicit AttachTask(const std::shared_ptr& mem_tracker, const TUniqueId& task_id); explicit AttachTask(const std::shared_ptr& mem_tracker); explicit AttachTask(RuntimeState* runtime_state); explicit AttachTask(const QueryThreadContext& query_thread_context); ~AttachTask(); }; class SwitchThreadMemTrackerLimiter { public: explicit SwitchThreadMemTrackerLimiter(const std::shared_ptr& mem_tracker) { #ifndef BE_TEST DCHECK(mem_tracker); #endif ThreadLocalHandle::create_thread_local_if_not_exits(); _old_mem_tracker = thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker(); thread_context()->thread_mem_tracker_mgr->attach_limiter_tracker(mem_tracker); } explicit SwitchThreadMemTrackerLimiter(const QueryThreadContext& query_thread_context) { ThreadLocalHandle::create_thread_local_if_not_exits(); DCHECK(thread_context()->task_id() == query_thread_context.query_id); #ifndef BE_TEST DCHECK(query_thread_context.query_mem_tracker); #endif _old_mem_tracker = thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker(); thread_context()->thread_mem_tracker_mgr->attach_limiter_tracker( query_thread_context.query_mem_tracker); } ~SwitchThreadMemTrackerLimiter() { thread_context()->thread_mem_tracker_mgr->detach_limiter_tracker(_old_mem_tracker); ThreadLocalHandle::del_thread_local_if_count_is_zero(); } private: std::shared_ptr _old_mem_tracker; }; class AddThreadMemTrackerConsumer { public: // The owner and user of MemTracker are in the same thread, and the raw pointer is faster. // If mem_tracker is nullptr, do nothing. explicit AddThreadMemTrackerConsumer(MemTracker* mem_tracker); // The owner and user of MemTracker are in different threads. If mem_tracker is nullptr, do nothing. explicit AddThreadMemTrackerConsumer(const std::shared_ptr& mem_tracker); ~AddThreadMemTrackerConsumer(); private: std::shared_ptr _mem_tracker; // Avoid mem_tracker being released midway. bool _need_pop = false; }; class AddThreadMemTrackerConsumerByHook { public: explicit AddThreadMemTrackerConsumerByHook(const std::shared_ptr& mem_tracker); ~AddThreadMemTrackerConsumerByHook(); private: std::shared_ptr _mem_tracker; }; class ScopeSkipMemoryCheck { public: explicit ScopeSkipMemoryCheck() { ThreadLocalHandle::create_thread_local_if_not_exits(); doris::thread_context()->skip_memory_check++; } ~ScopeSkipMemoryCheck() { doris::thread_context()->skip_memory_check--; ThreadLocalHandle::del_thread_local_if_count_is_zero(); } }; // Basic macros for mem tracker, usually do not need to be modified and used. #if defined(USE_MEM_TRACKER) && !defined(BE_TEST) // used to fix the tracking accuracy of caches. #define THREAD_MEM_TRACKER_TRANSFER_TO(size, tracker) \ do { \ ORPHAN_TRACKER_CHECK(); \ doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker_raw()->transfer_to( \ size, tracker); \ } while (0) #define THREAD_MEM_TRACKER_TRANSFER_FROM(size, tracker) \ do { \ ORPHAN_TRACKER_CHECK(); \ tracker->transfer_to( \ size, doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker_raw()); \ } while (0) // Mem Hook to consume thread mem tracker #define CONSUME_THREAD_MEM_TRACKER_BY_HOOK(size) \ do { \ if (doris::use_mem_hook) { \ doris::thread_context()->consume_memory(size); \ } \ } while (0) #define RELEASE_THREAD_MEM_TRACKER_BY_HOOK(size) CONSUME_THREAD_MEM_TRACKER_BY_HOOK(-size) #define CONSUME_THREAD_MEM_TRACKER_BY_HOOK_WITH_FN(size_fn, ...) \ do { \ if (doris::use_mem_hook) { \ doris::thread_context()->consume_memory(size_fn(__VA_ARGS__)); \ } \ } while (0) #define RELEASE_THREAD_MEM_TRACKER_BY_HOOK_WITH_FN(size_fn, ...) \ do { \ if (doris::use_mem_hook) { \ doris::thread_context()->consume_memory(-size_fn(__VA_ARGS__)); \ } \ } while (0) // if use mem hook, avoid repeated consume. // must call create_thread_local_if_not_exits() before use thread_context(). #define CONSUME_THREAD_MEM_TRACKER(size) \ do { \ if (doris::use_mem_hook || size == 0) { \ break; \ } \ if (doris::pthread_context_ptr_init) { \ DCHECK(bthread_self() == 0); \ doris::thread_context_ptr->consume_memory(size); \ } else if (bthread_self() != 0) { \ static_cast(bthread_getspecific(doris::btls_key)) \ ->consume_memory(size); \ } else if (doris::ExecEnv::ready()) { \ MEMORY_ORPHAN_CHECK(); \ doris::ExecEnv::GetInstance()->orphan_mem_tracker_raw()->consume_no_update_peak(size); \ } \ } while (0) #define RELEASE_THREAD_MEM_TRACKER(size) CONSUME_THREAD_MEM_TRACKER(-size) #else #define THREAD_MEM_TRACKER_TRANSFER_TO(size, tracker) (void)0 #define THREAD_MEM_TRACKER_TRANSFER_FROM(size, tracker) (void)0 #define CONSUME_THREAD_MEM_TRACKER_BY_HOOK(size) (void)0 #define RELEASE_THREAD_MEM_TRACKER_BY_HOOK(size) (void)0 #define CONSUME_THREAD_MEM_TRACKER_BY_HOOK_WITH_FN(size_fn, ...) (void)0 #define RELEASE_THREAD_MEM_TRACKER_BY_HOOK_WITH_FN(size_fn, ...) (void)0 #define CONSUME_THREAD_MEM_TRACKER(size) (void)0 #define RELEASE_THREAD_MEM_TRACKER(size) (void)0 #endif } // namespace doris