From 990479e1776ae20be92d2d3be178a2e9faa42afa Mon Sep 17 00:00:00 2001 From: Xinyi Zou Date: Mon, 27 Mar 2023 09:06:03 +0800 Subject: [PATCH] [refactor](memory) Query waits for memory free in Allocator, after memory exceed limit. (#18075) Previously, after process memory exceeded the limit, a query waited for memory to be freed inside the mem hook. This change moves that wait into the Allocator, which is more controllable and safer. --- be/src/runtime/memory/mem_tracker_limiter.h | 20 +++++----- .../runtime/memory/thread_mem_tracker_mgr.cpp | 22 +++------- .../runtime/memory/thread_mem_tracker_mgr.h | 3 +- be/src/vec/common/allocator.h | 40 +++++++++++++++++++ 4 files changed, 58 insertions(+), 27 deletions(-) diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h index 5140eec228..32e212155e 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.h +++ b/be/src/runtime/memory/mem_tracker_limiter.h @@ -189,6 +189,16 @@ public: MemInfo::refresh_interval_memory_growth); } + static std::string process_limit_exceeded_errmsg_str(int64_t bytes) { + return fmt::format( + "process memory used {} exceed limit {} or sys mem available {} less than low " + "water mark {}, failed alloc size {}", + PerfCounters::get_vm_rss_str(), MemInfo::mem_limit_str(), + MemInfo::sys_mem_available_str(), + PrettyPrinter::print(MemInfo::sys_mem_available_low_water_mark(), TUnit::BYTES), + print_bytes(bytes)); + } + std::string debug_string() { std::stringstream msg; msg << "limit: " << _limit << "; " @@ -221,16 +231,6 @@ private: print_bytes(exceed_tracker->_consumption->current_value())); } - static std::string process_limit_exceeded_errmsg_str(int64_t bytes) { - return fmt::format( - "process memory used {} exceed limit {} or sys mem available {} less than low " - "water mark {}, failed alloc size {}", - PerfCounters::get_vm_rss_str(), MemInfo::mem_limit_str(), - MemInfo::sys_mem_available_str(), - PrettyPrinter::print(MemInfo::sys_mem_available_low_water_mark(), TUnit::BYTES), - 
print_bytes(bytes)); - } - private: Type _type; diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp index d45d6b8cb5..6057ce0325 100644 --- a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp +++ b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp @@ -45,10 +45,11 @@ void ThreadMemTrackerMgr::detach_limiter_tracker( _limiter_tracker_raw = old_mem_tracker.get(); } -void ThreadMemTrackerMgr::cancel_fragment() { - ExecEnv::GetInstance()->fragment_mgr()->cancel(_fragment_instance_id, - PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED, - _exceed_mem_limit_msg); +void ThreadMemTrackerMgr::cancel_fragment(const std::string& exceed_msg) { + if (_check_limit) { + ExecEnv::GetInstance()->fragment_mgr()->cancel( + _fragment_instance_id, PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED, exceed_msg); + } _check_limit = false; // Make sure it will only be canceled once } @@ -59,18 +60,7 @@ void ThreadMemTrackerMgr::exceeded(int64_t size) { _limiter_tracker_raw->print_log_usage(_exceed_mem_limit_msg); if (is_attach_query()) { - if (_is_process_exceed && _wait_gc) { - int64_t wait_milliseconds = config::thread_wait_gc_max_milliseconds; - while (wait_milliseconds > 0) { - std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Check every 100 ms. - if (!MemTrackerLimiter::sys_mem_exceed_limit_check(size)) { - MemInfo::refresh_interval_memory_growth += size; - return; // Process memory is sufficient, no cancel query. 
- } - wait_milliseconds -= 100; - } - } - cancel_fragment(); + cancel_fragment(_exceed_mem_limit_msg); } } diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h b/be/src/runtime/memory/thread_mem_tracker_mgr.h index 04ec86f7f8..428e2595e9 100644 --- a/be/src/runtime/memory/thread_mem_tracker_mgr.h +++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h @@ -97,6 +97,8 @@ public: std::string exceed_mem_limit_msg() { return _exceed_mem_limit_msg; } void clear_exceed_mem_limit_msg() { _exceed_mem_limit_msg = ""; } void disable_wait_gc() { _wait_gc = false; } + bool wait_gc() { return _wait_gc; } + void cancel_fragment(const std::string& exceed_msg); std::string print_debug_string() { fmt::memory_buffer consumer_tracker_buf; @@ -111,7 +113,6 @@ public: } private: - void cancel_fragment(); void exceeded(int64_t size); void save_exceed_mem_limit_msg() { diff --git a/be/src/vec/common/allocator.h b/be/src/vec/common/allocator.h index 6287c837b6..7e72893a37 100644 --- a/be/src/vec/common/allocator.h +++ b/be/src/vec/common/allocator.h @@ -131,8 +131,45 @@ static constexpr size_t MALLOC_MIN_ALIGNMENT = 8; template class Allocator { public: + void sys_memory_check(size_t size) { + if (doris::MemTrackerLimiter::sys_mem_exceed_limit_check(size)) { + if (doris::thread_context()->thread_mem_tracker_mgr->is_attach_query() && + doris::thread_context()->thread_mem_tracker_mgr->wait_gc()) { + int64_t wait_milliseconds = doris::config::thread_wait_gc_max_milliseconds; + while (wait_milliseconds > 0) { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + if (!doris::MemTrackerLimiter::sys_mem_exceed_limit_check(size)) { + doris::MemInfo::refresh_interval_memory_growth += size; + break; + } + wait_milliseconds -= 100; + } + if (wait_milliseconds <= 0) { + auto err_msg = fmt::format( + "Allocator Sys Memory Check Failed In Query/Load: Cannot alloc {}, " + "{}.", + size, + doris::MemTrackerLimiter::process_limit_exceeded_errmsg_str(size)); + 
doris::thread_context()->thread_mem_tracker_mgr->disable_wait_gc(); + if (!doris::enable_thread_catch_bad_alloc) { + doris::thread_context()->thread_mem_tracker_mgr->cancel_fragment(err_msg); + } else { + LOG(WARNING) << err_msg; + throw std::bad_alloc {}; + } + } + } else if (doris::enable_thread_catch_bad_alloc) { + LOG(WARNING) << fmt::format( + "Allocator Sys Memory Check Failed: Cannot alloc {}, {}.", size, + doris::MemTrackerLimiter::process_limit_exceeded_errmsg_str(size)); + throw std::bad_alloc {}; + } + } + } + /// Allocate memory range. void* alloc(size_t size, size_t alignment = 0) { + sys_memory_check(size); void* buf; if (size >= MMAP_THRESHOLD) { @@ -219,6 +256,7 @@ public: /// BTW, it's not possible to change alignment while doing realloc. } else if (old_size < CHUNK_THRESHOLD && new_size < CHUNK_THRESHOLD && alignment <= MALLOC_MIN_ALIGNMENT) { + sys_memory_check(new_size); /// Resize malloc'd memory region with no special alignment requirement. void* new_buf = ::realloc(buf, new_size); if (nullptr == new_buf) { @@ -231,6 +269,7 @@ public: if (new_size > old_size) memset(reinterpret_cast(buf) + old_size, 0, new_size - old_size); } else if (old_size >= MMAP_THRESHOLD && new_size >= MMAP_THRESHOLD) { + sys_memory_check(new_size); /// Resize mmap'd memory region. if (!TRY_CONSUME_THREAD_MEM_TRACKER(new_size - old_size)) { RETURN_BAD_ALLOC_IF_PRE_CATCH(fmt::format( @@ -257,6 +296,7 @@ public: memset(reinterpret_cast(buf) + old_size, 0, new_size - old_size); } } else { + sys_memory_check(new_size); // CHUNK_THRESHOLD <= old_size <= MMAP_THRESHOLD use system realloc is slow, use ChunkAllocator. // Big allocs that requires a copy. void* new_buf = alloc(new_size, alignment);