From ef031c5fb2551837592bf0951022d8d663bbcef1 Mon Sep 17 00:00:00 2001 From: Xinyi Zou Date: Fri, 12 Jul 2024 11:43:26 +0800 Subject: [PATCH] [branch-2.1](memory) Fix reserve memory compatible with memory GC and logging (#37682) pick #36307 #36412 --- be/src/common/daemon.cpp | 23 +- be/src/http/default_path_handlers.cpp | 3 + be/src/olap/memtable_memory_limiter.cpp | 19 +- .../memory/global_memory_arbitrator.cpp | 50 ++- .../runtime/memory/global_memory_arbitrator.h | 161 ++++++---- be/src/runtime/memory/mem_tracker_limiter.cpp | 51 ++-- be/src/runtime/memory/mem_tracker_limiter.h | 22 +- be/src/runtime/memory/memory_arbitrator.cpp | 271 +++++++++++++++++ be/src/runtime/memory/memory_arbitrator.h | 40 +++ .../runtime/workload_group/workload_group.cpp | 22 +- .../workload_group/workload_group_manager.cpp | 13 +- be/src/util/mem_info.cpp | 286 +----------------- be/src/util/mem_info.h | 28 +- be/src/vec/common/allocator.cpp | 12 +- be/src/vec/sink/writer/vtablet_writer.cpp | 5 +- 15 files changed, 556 insertions(+), 450 deletions(-) create mode 100644 be/src/runtime/memory/memory_arbitrator.cpp create mode 100644 be/src/runtime/memory/memory_arbitrator.h diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp index 4787f60365..77d0fdaf0e 100644 --- a/be/src/common/daemon.cpp +++ b/be/src/common/daemon.cpp @@ -50,6 +50,7 @@ #include "runtime/memory/global_memory_arbitrator.h" #include "runtime/memory/mem_tracker.h" #include "runtime/memory/mem_tracker_limiter.h" +#include "runtime/memory/memory_arbitrator.h" #include "runtime/runtime_query_statistics_mgr.h" #include "runtime/workload_group/workload_group_manager.h" #include "util/cpu_info.h" @@ -192,7 +193,7 @@ void Daemon::memory_maintenance_thread() { // Refresh process memory metrics. doris::PerfCounters::refresh_proc_status(); doris::MemInfo::refresh_proc_meminfo(); - doris::GlobalMemoryArbitrator::refresh_vm_rss_sub_allocator_cache(); + doris::GlobalMemoryArbitrator::reset_refresh_interval_memory_growth(); // Update and print memory stat when the memory changes by 256M. if (abs(last_print_proc_mem - PerfCounters::get_vm_rss()) > 268435456) { @@ -229,11 +230,11 @@ void Daemon::memory_gc_thread() { if (config::disable_memory_gc) { continue; } - auto sys_mem_available = doris::MemInfo::sys_mem_available(); + auto sys_mem_available = doris::GlobalMemoryArbitrator::sys_mem_available(); auto process_memory_usage = doris::GlobalMemoryArbitrator::process_memory_usage(); // GC excess memory for resource groups that not enable overcommit - auto tg_free_mem = doris::MemInfo::tg_disable_overcommit_group_gc(); + auto tg_free_mem = doris::MemoryArbitrator::tg_disable_overcommit_group_gc(); sys_mem_available += tg_free_mem; process_memory_usage -= tg_free_mem; @@ -241,13 +242,13 @@ void Daemon::memory_gc_thread() { (sys_mem_available < doris::MemInfo::sys_mem_available_low_water_mark() || process_memory_usage >= doris::MemInfo::mem_limit())) { // No longer full gc and minor gc during sleep. 
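// Illustrative arithmetic only (hypothetical thresholds, not values taken from this patch):
// with mem_limit = 100 GB, soft_mem_limit = 90 GB, sys_mem_available low water mark = 1.6 GB
// and warning water mark = 3.2 GB, a process_memory_usage of 101 GB or a sys_mem_available
// of 1 GB takes this full GC branch, while 92 GB used or 3 GB available only takes the
// minor GC branch further below.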
+ std::string mem_info = + doris::GlobalMemoryArbitrator::process_limit_exceeded_errmsg_str(); memory_full_gc_sleep_time_ms = memory_gc_sleep_time_ms; memory_minor_gc_sleep_time_ms = memory_gc_sleep_time_ms; - LOG(INFO) << fmt::format( - "[MemoryGC] start full GC, {}.", - doris::GlobalMemoryArbitrator::process_limit_exceeded_errmsg_str()); + LOG(INFO) << fmt::format("[MemoryGC] start full GC, {}.", mem_info); doris::MemTrackerLimiter::print_log_process_usage(); - if (doris::MemInfo::process_full_gc()) { + if (doris::MemoryArbitrator::process_full_gc(std::move(mem_info))) { // If there is not enough memory to be gc, the process memory usage will not be printed in the next continuous gc. doris::MemTrackerLimiter::enable_print_log_process_usage(); } @@ -255,12 +256,12 @@ void Daemon::memory_gc_thread() { (sys_mem_available < doris::MemInfo::sys_mem_available_warning_water_mark() || process_memory_usage >= doris::MemInfo::soft_mem_limit())) { // No minor gc during sleep, but full gc is possible. + std::string mem_info = + doris::GlobalMemoryArbitrator::process_soft_limit_exceeded_errmsg_str(); memory_minor_gc_sleep_time_ms = memory_gc_sleep_time_ms; - LOG(INFO) << fmt::format( - "[MemoryGC] start minor GC, {}.", - doris::GlobalMemoryArbitrator::process_soft_limit_exceeded_errmsg_str()); + LOG(INFO) << fmt::format("[MemoryGC] start minor GC, {}.", mem_info); doris::MemTrackerLimiter::print_log_process_usage(); - if (doris::MemInfo::process_minor_gc()) { + if (doris::MemoryArbitrator::process_minor_gc(std::move(mem_info))) { doris::MemTrackerLimiter::enable_print_log_process_usage(); } } else { diff --git a/be/src/http/default_path_handlers.cpp b/be/src/http/default_path_handlers.cpp index 6e4ce97b90..5c697539fb 100644 --- a/be/src/http/default_path_handlers.cpp +++ b/be/src/http/default_path_handlers.cpp @@ -40,6 +40,7 @@ #include "gutil/strings/substitute.h" #include "http/action/tablets_info_action.h" #include "http/web_page_handler.h" +#include "runtime/memory/global_memory_arbitrator.h" #include "runtime/memory/mem_tracker.h" #include "runtime/memory/mem_tracker_limiter.h" #include "util/easy_json.h" @@ -155,6 +156,8 @@ void mem_tracker_handler(const WebPageHandler::ArgumentMap& args, std::stringstr MemTrackerLimiter::Type::SCHEMA_CHANGE); } else if (iter->second == "other") { MemTrackerLimiter::make_type_snapshots(&snapshots, MemTrackerLimiter::Type::OTHER); + } else if (iter->second == "reserved_memory") { + GlobalMemoryArbitrator::make_reserved_memory_snapshots(&snapshots); } } else { (*output) << "
<h4>*Notice:</h4>
\n"; diff --git a/be/src/olap/memtable_memory_limiter.cpp b/be/src/olap/memtable_memory_limiter.cpp index dc128137ae..1eaad31ec2 100644 --- a/be/src/olap/memtable_memory_limiter.cpp +++ b/be/src/olap/memtable_memory_limiter.cpp @@ -80,8 +80,8 @@ void MemTableMemoryLimiter::register_writer(std::weak_ptr writer int64_t MemTableMemoryLimiter::_avail_mem_lack() { // reserve a small amount of memory so we do not trigger MinorGC auto reserved_mem = doris::MemInfo::sys_mem_available_low_water_mark(); - auto avail_mem_lack = - doris::MemInfo::sys_mem_available_warning_water_mark() - MemInfo::sys_mem_available(); + auto avail_mem_lack = doris::MemInfo::sys_mem_available_warning_water_mark() - + doris::GlobalMemoryArbitrator::sys_mem_available(); return avail_mem_lack + reserved_mem; } @@ -225,14 +225,13 @@ void MemTableMemoryLimiter::refresh_mem_tracker() { _log_timer.reset(); // if not exist load task, this log should not be printed. if (_mem_usage != 0) { - LOG(INFO) << ss.str() << ", process mem: " << PerfCounters::get_vm_rss_str() - << " (without allocator cache: " - << PrettyPrinter::print_bytes(GlobalMemoryArbitrator::process_memory_usage()) - << "), load mem: " << PrettyPrinter::print_bytes(_mem_tracker->consumption()) - << ", memtable writers num: " << _writers.size() - << " (active: " << PrettyPrinter::print_bytes(_active_mem_usage) - << ", write: " << PrettyPrinter::print_bytes(_write_mem_usage) - << ", flush: " << PrettyPrinter::print_bytes(_flush_mem_usage) << ")"; + LOG(INFO) << fmt::format( + "{}, {}, load mem: {}, memtable writers num: {} (active: {}, write: {}, flush: {})", + ss.str(), GlobalMemoryArbitrator::process_memory_used_details_str(), + PrettyPrinter::print_bytes(_mem_tracker->consumption()), _writers.size(), + PrettyPrinter::print_bytes(_active_mem_usage), + PrettyPrinter::print_bytes(_write_mem_usage), + PrettyPrinter::print_bytes(_flush_mem_usage)); } } diff --git a/be/src/runtime/memory/global_memory_arbitrator.cpp b/be/src/runtime/memory/global_memory_arbitrator.cpp index dc686f7c5a..35fa350987 100644 --- a/be/src/runtime/memory/global_memory_arbitrator.cpp +++ b/be/src/runtime/memory/global_memory_arbitrator.cpp @@ -19,16 +19,64 @@ #include +#include "runtime/thread_context.h" + namespace doris { +std::mutex GlobalMemoryArbitrator::_reserved_trackers_lock; +std::unordered_map GlobalMemoryArbitrator::_reserved_trackers; + bvar::PassiveStatus g_vm_rss_sub_allocator_cache( "meminfo_vm_rss_sub_allocator_cache", [](void*) { return GlobalMemoryArbitrator::vm_rss_sub_allocator_cache(); }, nullptr); bvar::PassiveStatus g_process_memory_usage( "meminfo_process_memory_usage", [](void*) { return GlobalMemoryArbitrator::process_memory_usage(); }, nullptr); +bvar::PassiveStatus g_sys_mem_avail( + "meminfo_sys_mem_avail", [](void*) { return GlobalMemoryArbitrator::sys_mem_available(); }, + nullptr); -std::atomic GlobalMemoryArbitrator::_s_vm_rss_sub_allocator_cache = -1; std::atomic GlobalMemoryArbitrator::_s_process_reserved_memory = 0; +std::atomic GlobalMemoryArbitrator::refresh_interval_memory_growth = 0; + +bool GlobalMemoryArbitrator::try_reserve_process_memory(int64_t bytes) { + if (sys_mem_available() - bytes < MemInfo::sys_mem_available_low_water_mark()) { + return false; + } + int64_t old_reserved_mem = _s_process_reserved_memory.load(std::memory_order_relaxed); + int64_t new_reserved_mem = 0; + do { + new_reserved_mem = old_reserved_mem + bytes; + if (UNLIKELY(vm_rss_sub_allocator_cache() + + refresh_interval_memory_growth.load(std::memory_order_relaxed) + + 
new_reserved_mem >= + MemInfo::mem_limit())) { + return false; + } + } while (!_s_process_reserved_memory.compare_exchange_weak(old_reserved_mem, new_reserved_mem, + std::memory_order_relaxed)); + { + std::lock_guard l(_reserved_trackers_lock); + _reserved_trackers[doris::thread_context()->thread_mem_tracker()->label()].add(bytes); + } + return true; +} + +void GlobalMemoryArbitrator::release_process_reserved_memory(int64_t bytes) { + _s_process_reserved_memory.fetch_sub(bytes, std::memory_order_relaxed); + { + std::lock_guard l(_reserved_trackers_lock); + auto label = doris::thread_context()->thread_mem_tracker()->label(); + auto it = _reserved_trackers.find(label); + if (it == _reserved_trackers.end()) { + DCHECK(false) << "release unknown reserved memory " << label << ", bytes: " << bytes; + return; + } + _reserved_trackers[label].sub(bytes); + if (_reserved_trackers[label].current_value() == 0) { + _reserved_trackers.erase(it); + } + } +} } // namespace doris diff --git a/be/src/runtime/memory/global_memory_arbitrator.h b/be/src/runtime/memory/global_memory_arbitrator.h index b1879cb1a7..f8fda18d0e 100644 --- a/be/src/runtime/memory/global_memory_arbitrator.h +++ b/be/src/runtime/memory/global_memory_arbitrator.h @@ -17,6 +17,7 @@ #pragma once +#include "runtime/memory/mem_tracker.h" #include "util/mem_info.h" namespace doris { @@ -30,14 +31,12 @@ public: * accurate, since those pages are not really RSS but a memory * that can be used at anytime via jemalloc. */ - static inline void refresh_vm_rss_sub_allocator_cache() { - _s_vm_rss_sub_allocator_cache.store( - PerfCounters::get_vm_rss() - static_cast(MemInfo::allocator_cache_mem()), - std::memory_order_relaxed); - MemInfo::refresh_interval_memory_growth = 0; - } static inline int64_t vm_rss_sub_allocator_cache() { - return _s_vm_rss_sub_allocator_cache.load(std::memory_order_relaxed); + return PerfCounters::get_vm_rss() - static_cast(MemInfo::allocator_cache_mem()); + } + + static inline void reset_refresh_interval_memory_growth() { + refresh_interval_memory_growth = 0; } // If need to use process memory in your execution logic, pls use it. @@ -45,32 +44,80 @@ public: // add reserved memory and growth memory since the last vm_rss update. 
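// Usage sketch (illustrative, not part of this patch). The caller must run on a thread with
// an attached thread context, because the reservation is recorded under the current thread
// mem tracker label; `estimated_bytes` is a hypothetical size chosen by the caller:
//
//   int64_t estimated_bytes = 64 * 1024 * 1024;
//   if (doris::GlobalMemoryArbitrator::try_reserve_process_memory(estimated_bytes)) {
//       // ... allocate / build the large data structure ...
//       doris::GlobalMemoryArbitrator::release_process_reserved_memory(estimated_bytes);
//   } else {
//       // not enough head room: fall back, spill, or retry after GC has freed memory
//   }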
static inline int64_t process_memory_usage() { return vm_rss_sub_allocator_cache() + - MemInfo::refresh_interval_memory_growth.load(std::memory_order_relaxed) + + refresh_interval_memory_growth.load(std::memory_order_relaxed) + process_reserved_memory(); } - static inline bool try_reserve_process_memory(int64_t bytes) { - if (MemInfo::sys_mem_available() - bytes < MemInfo::sys_mem_available_low_water_mark()) { - return false; - } - int64_t old_reserved_mem = _s_process_reserved_memory.load(std::memory_order_relaxed); - int64_t new_reserved_mem = 0; - do { - new_reserved_mem = old_reserved_mem + bytes; - if (UNLIKELY(vm_rss_sub_allocator_cache() + - MemInfo::refresh_interval_memory_growth.load( - std::memory_order_relaxed) + - new_reserved_mem >= - MemInfo::mem_limit())) { - return false; - } - } while (!_s_process_reserved_memory.compare_exchange_weak( - old_reserved_mem, new_reserved_mem, std::memory_order_relaxed)); - return true; + static std::string process_memory_used_str() { + auto msg = fmt::format("process memory used {}", + PrettyPrinter::print(process_memory_usage(), TUnit::BYTES)); +#ifdef ADDRESS_SANITIZER + msg = "[ASAN]" + msg; +#endif + return msg; } - static inline void release_process_reserved_memory(int64_t bytes) { - _s_process_reserved_memory.fetch_sub(bytes, std::memory_order_relaxed); + static std::string process_memory_used_details_str() { + auto msg = fmt::format( + "process memory used {}(= {}[vm/rss] - {}[tc/jemalloc_cache] + {}[reserved] + " + "{}B[waiting_refresh])", + PrettyPrinter::print(process_memory_usage(), TUnit::BYTES), + PerfCounters::get_vm_rss_str(), + PrettyPrinter::print(static_cast(MemInfo::allocator_cache_mem()), + TUnit::BYTES), + PrettyPrinter::print(process_reserved_memory(), TUnit::BYTES), + refresh_interval_memory_growth); +#ifdef ADDRESS_SANITIZER + msg = "[ASAN]" + msg; +#endif + return msg; + } + + static inline int64_t sys_mem_available() { + return MemInfo::_s_sys_mem_available.load(std::memory_order_relaxed) - + refresh_interval_memory_growth.load(std::memory_order_relaxed) - + process_reserved_memory(); + } + + static inline std::string sys_mem_available_str() { + auto msg = fmt::format("sys available memory {}", + PrettyPrinter::print(sys_mem_available(), TUnit::BYTES)); +#ifdef ADDRESS_SANITIZER + msg = "[ASAN]" + msg; +#endif + return msg; + } + + static inline std::string sys_mem_available_details_str() { + auto msg = fmt::format( + "sys available memory {}(= {}[proc/available] - {}[reserved] - " + "{}B[waiting_refresh])", + PrettyPrinter::print(sys_mem_available(), TUnit::BYTES), + PrettyPrinter::print(MemInfo::_s_sys_mem_available.load(std::memory_order_relaxed), + TUnit::BYTES), + PrettyPrinter::print(process_reserved_memory(), TUnit::BYTES), + refresh_interval_memory_growth); +#ifdef ADDRESS_SANITIZER + msg = "[ASAN]" + msg; +#endif + return msg; + } + + static bool try_reserve_process_memory(int64_t bytes); + static void release_process_reserved_memory(int64_t bytes); + + static inline void make_reserved_memory_snapshots( + std::vector* snapshots) { + std::lock_guard l(_reserved_trackers_lock); + for (const auto& pair : _reserved_trackers) { + MemTracker::Snapshot snapshot; + snapshot.type = "reserved_memory"; + snapshot.label = pair.first; + snapshot.limit = -1; + snapshot.cur_consumption = pair.second.current_value(); + snapshot.peak_consumption = pair.second.peak_value(); + (*snapshots).emplace_back(snapshot); + } } static inline int64_t process_reserved_memory() { @@ -79,8 +126,7 @@ public: static bool 
is_exceed_soft_mem_limit(int64_t bytes = 0) { return process_memory_usage() + bytes >= MemInfo::soft_mem_limit() || - MemInfo::sys_mem_available() - bytes < - MemInfo::sys_mem_available_warning_water_mark(); + sys_mem_available() - bytes < MemInfo::sys_mem_available_warning_water_mark(); } static bool is_exceed_hard_mem_limit(int64_t bytes = 0) { @@ -93,44 +139,45 @@ public: // because `new/malloc` will trigger mem hook when using tcmalloc/jemalloc allocator cache, // but it may not actually alloc physical memory, which is not expected in mem hook fail. return process_memory_usage() + bytes >= MemInfo::mem_limit() || - MemInfo::sys_mem_available() - bytes < MemInfo::sys_mem_available_low_water_mark(); + sys_mem_available() - bytes < MemInfo::sys_mem_available_low_water_mark(); } static std::string process_mem_log_str() { return fmt::format( - "os physical memory {}. process memory used {}, limit {}, soft limit {}. sys " - "available memory {}, low water mark {}, warning water mark {}. Refresh interval " - "memory growth {} B", - PrettyPrinter::print(MemInfo::physical_mem(), TUnit::BYTES), - PerfCounters::get_vm_rss_str(), MemInfo::mem_limit_str(), - MemInfo::soft_mem_limit_str(), MemInfo::sys_mem_available_str(), - PrettyPrinter::print(MemInfo::sys_mem_available_low_water_mark(), TUnit::BYTES), - PrettyPrinter::print(MemInfo::sys_mem_available_warning_water_mark(), TUnit::BYTES), - MemInfo::refresh_interval_memory_growth); - } - - static std::string process_limit_exceeded_errmsg_str() { - return fmt::format( - "process memory used {} exceed limit {} or sys available memory {} less than low " - "water mark {}", - PerfCounters::get_vm_rss_str(), MemInfo::mem_limit_str(), - MemInfo::sys_mem_available_str(), - PrettyPrinter::print(MemInfo::sys_mem_available_low_water_mark(), TUnit::BYTES)); - } - - static std::string process_soft_limit_exceeded_errmsg_str() { - return fmt::format( - "process memory used {} exceed soft limit {} or sys available memory {} less than " + "os physical memory {}. {}, limit {}, soft limit {}. {}, low water mark {}, " "warning water mark {}.", - PerfCounters::get_vm_rss_str(), MemInfo::soft_mem_limit_str(), - MemInfo::sys_mem_available_str(), + PrettyPrinter::print(MemInfo::physical_mem(), TUnit::BYTES), + process_memory_used_details_str(), MemInfo::mem_limit_str(), + MemInfo::soft_mem_limit_str(), sys_mem_available_details_str(), + PrettyPrinter::print(MemInfo::sys_mem_available_low_water_mark(), TUnit::BYTES), PrettyPrinter::print(MemInfo::sys_mem_available_warning_water_mark(), TUnit::BYTES)); } + static std::string process_limit_exceeded_errmsg_str() { + return fmt::format( + "{} exceed limit {} or {} less than low water mark {}", process_memory_used_str(), + MemInfo::mem_limit_str(), sys_mem_available_str(), + PrettyPrinter::print(MemInfo::sys_mem_available_low_water_mark(), TUnit::BYTES)); + } + + static std::string process_soft_limit_exceeded_errmsg_str() { + return fmt::format("{} exceed soft limit {} or {} less than warning water mark {}.", + process_memory_used_str(), MemInfo::soft_mem_limit_str(), + sys_mem_available_str(), + PrettyPrinter::print(MemInfo::sys_mem_available_warning_water_mark(), + TUnit::BYTES)); + } + + // It is only used after the memory limit is exceeded. When multiple threads are waiting for the available memory of the process, + // avoid multiple threads starting at the same time and causing OOM. 
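// Illustrative arithmetic only (hypothetical numbers): with vm_rss = 80 GB, allocator cache
// = 5 GB, reserved memory = 2 GB and 0.5 GB allocated since the last /proc refresh,
// process_memory_usage() = 80 - 5 + 2 + 0.5 = 77.5 GB, and sys_mem_available() subtracts the
// same 2 GB + 0.5 GB from /proc MemAvailable, so the limit checks above treat reserved and
// not-yet-refreshed growth as memory that is already in use.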
+ static std::atomic refresh_interval_memory_growth; + private: - static std::atomic _s_vm_rss_sub_allocator_cache; static std::atomic _s_process_reserved_memory; + + static std::mutex _reserved_trackers_lock; + static std::unordered_map _reserved_trackers; }; } // namespace doris diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp index b84f7c5495..489d59ab1b 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.cpp +++ b/be/src/runtime/memory/mem_tracker_limiter.cpp @@ -216,6 +216,13 @@ void MemTrackerLimiter::make_process_snapshots(std::vector snapshot.peak_consumption = PerfCounters::get_vm_hwm(); (*snapshots).emplace_back(snapshot); + snapshot.type = "reserved memory"; + snapshot.label = ""; + snapshot.limit = -1; + snapshot.cur_consumption = GlobalMemoryArbitrator::process_reserved_memory(); + snapshot.peak_consumption = -1; + (*snapshots).emplace_back(snapshot); + snapshot.type = "process virtual memory"; // from /proc VmSize VmPeak snapshot.label = ""; snapshot.limit = -1; @@ -359,10 +366,10 @@ void MemTrackerLimiter::print_log_process_usage() { std::string MemTrackerLimiter::tracker_limit_exceeded_str() { std::string err_msg = fmt::format( "memory tracker limit exceeded, tracker label:{}, type:{}, limit " - "{}, peak used {}, current used {}. backend {} process memory used {}.", + "{}, peak used {}, current used {}. backend {}, {}.", label(), type_string(_type), print_bytes(limit()), print_bytes(_consumption->peak_value()), print_bytes(_consumption->current_value()), - BackendOptions::get_localhost(), PerfCounters::get_vm_rss_str()); + BackendOptions::get_localhost(), GlobalMemoryArbitrator::process_memory_used_str()); if (_type == Type::QUERY || _type == Type::LOAD) { err_msg += fmt::format( " exec node:<{}>, can `set exec_mem_limit=8G` to change limit, details see " @@ -377,23 +384,17 @@ std::string MemTrackerLimiter::tracker_limit_exceeded_str() { } int64_t MemTrackerLimiter::free_top_memory_query(int64_t min_free_mem, - const std::string& vm_rss_str, - const std::string& mem_available_str, + const std::string& cancel_reason, RuntimeProfile* profile, Type type) { return free_top_memory_query( min_free_mem, type, ExecEnv::GetInstance()->mem_tracker_limiter_pool, - [&vm_rss_str, &mem_available_str, &type](int64_t mem_consumption, - const std::string& label) { + [&cancel_reason, &type](int64_t mem_consumption, const std::string& label) { return fmt::format( - "Process has no memory available, cancel top memory used {}: " - "{} memory tracker <{}> consumption {}, backend {} " - "process memory used {} exceed limit {} or sys available memory {} " - "less than low water mark {}. Execute again after enough memory, " - "details see be.INFO.", - type_string(type), type_string(type), label, print_bytes(mem_consumption), - BackendOptions::get_localhost(), vm_rss_str, MemInfo::mem_limit_str(), - mem_available_str, - print_bytes(MemInfo::sys_mem_available_low_water_mark())); + "Process memory not enough, cancel top memory used {}: " + "<{}> consumption {}, backend {}, {}. 
Execute again " + "after enough memory, details see be.INFO.", + type_string(type), label, print_bytes(mem_consumption), + BackendOptions::get_localhost(), cancel_reason); }, profile, GCType::PROCESS); } @@ -504,23 +505,17 @@ int64_t MemTrackerLimiter::free_top_memory_query( } int64_t MemTrackerLimiter::free_top_overcommit_query(int64_t min_free_mem, - const std::string& vm_rss_str, - const std::string& mem_available_str, + const std::string& cancel_reason, RuntimeProfile* profile, Type type) { return free_top_overcommit_query( min_free_mem, type, ExecEnv::GetInstance()->mem_tracker_limiter_pool, - [&vm_rss_str, &mem_available_str, &type](int64_t mem_consumption, - const std::string& label) { + [&cancel_reason, &type](int64_t mem_consumption, const std::string& label) { return fmt::format( - "Process has less memory, cancel top memory overcommit {}: " - "{} memory tracker <{}> consumption {}, backend {} " - "process memory used {} exceed soft limit {} or sys available memory {} " - "less than warning water mark {}. Execute again after enough memory, " - "details see be.INFO.", - type_string(type), type_string(type), label, print_bytes(mem_consumption), - BackendOptions::get_localhost(), vm_rss_str, MemInfo::soft_mem_limit_str(), - mem_available_str, - print_bytes(MemInfo::sys_mem_available_warning_water_mark())); + "Process memory not enough, cancel top memory overcommit {}: " + "<{}> consumption {}, backend {}, {}. Execute again " + "after enough memory, details see be.INFO.", + type_string(type), label, print_bytes(mem_consumption), + BackendOptions::get_localhost(), cancel_reason); }, profile, GCType::PROCESS); } diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h index 3a891ca3a1..2c4221373b 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.h +++ b/be/src/runtime/memory/mem_tracker_limiter.h @@ -141,7 +141,7 @@ public: return true; } bool st = true; - if (is_overcommit_tracker() && config::enable_query_memory_overcommit) { + if (is_overcommit_tracker() && !config::enable_query_memory_overcommit) { st = _consumption->try_add(bytes, _limit); } else { _consumption->add(bytes); @@ -192,9 +192,8 @@ public: static void print_log_process_usage(); // Start canceling from the query with the largest memory usage until the memory of min_free_mem size is freed. - // vm_rss_str and mem_available_str recorded when gc is triggered, for log printing. - static int64_t free_top_memory_query(int64_t min_free_mem, const std::string& vm_rss_str, - const std::string& mem_available_str, + // cancel_reason recorded when gc is triggered, for log printing. + static int64_t free_top_memory_query(int64_t min_free_mem, const std::string& cancel_reason, RuntimeProfile* profile, Type type = Type::QUERY); static int64_t free_top_memory_query( @@ -202,16 +201,13 @@ public: const std::function& cancel_msg, RuntimeProfile* profile, GCType gctype); - static int64_t free_top_memory_load(int64_t min_free_mem, const std::string& vm_rss_str, - const std::string& mem_available_str, + static int64_t free_top_memory_load(int64_t min_free_mem, const std::string& cancel_reason, RuntimeProfile* profile) { - return free_top_memory_query(min_free_mem, vm_rss_str, mem_available_str, profile, - Type::LOAD); + return free_top_memory_query(min_free_mem, cancel_reason, profile, Type::LOAD); } // Start canceling from the query with the largest memory overcommit ratio until the memory // of min_free_mem size is freed. 
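// Caller sketch (illustrative, not part of this patch): the GC path builds the cancel reason
// once and reuses it for every cancelled query; `profile` is a hypothetical RuntimeProfile
// owned by the caller:
//
//   std::string reason = GlobalMemoryArbitrator::process_limit_exceeded_errmsg_str();
//   int64_t freed = MemTrackerLimiter::free_top_memory_query(
//           MemInfo::process_full_gc_size(), reason, profile.get());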
- static int64_t free_top_overcommit_query(int64_t min_free_mem, const std::string& vm_rss_str, - const std::string& mem_available_str, + static int64_t free_top_overcommit_query(int64_t min_free_mem, const std::string& cancel_reason, RuntimeProfile* profile, Type type = Type::QUERY); static int64_t free_top_overcommit_query( @@ -219,11 +215,9 @@ public: const std::function& cancel_msg, RuntimeProfile* profile, GCType gctype); - static int64_t free_top_overcommit_load(int64_t min_free_mem, const std::string& vm_rss_str, - const std::string& mem_available_str, + static int64_t free_top_overcommit_load(int64_t min_free_mem, const std::string& cancel_reason, RuntimeProfile* profile) { - return free_top_overcommit_query(min_free_mem, vm_rss_str, mem_available_str, profile, - Type::LOAD); + return free_top_overcommit_query(min_free_mem, cancel_reason, profile, Type::LOAD); } // only for Type::QUERY or Type::LOAD. diff --git a/be/src/runtime/memory/memory_arbitrator.cpp b/be/src/runtime/memory/memory_arbitrator.cpp new file mode 100644 index 0000000000..a99f358526 --- /dev/null +++ b/be/src/runtime/memory/memory_arbitrator.cpp @@ -0,0 +1,271 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "runtime/memory/memory_arbitrator.h" + +#include "runtime/memory/cache_manager.h" +#include "runtime/workload_group/workload_group.h" +#include "runtime/workload_group/workload_group_manager.h" +#include "util/mem_info.h" +#include "util/runtime_profile.h" +#include "util/stopwatch.hpp" + +namespace doris { + +// step1: free all cache +// step2: free resource groups memory that enable overcommit +// step3: free global top overcommit query, if enable query memory overcommit +// TODO Now, the meaning is different from java minor gc + full gc, more like small gc + large gc. +bool MemoryArbitrator::process_minor_gc(std::string mem_info) { + MonotonicStopWatch watch; + watch.start(); + int64_t freed_mem = 0; + std::unique_ptr profile = std::make_unique(""); + + Defer defer {[&]() { + MemInfo::notify_je_purge_dirty_pages(); + std::stringstream ss; + profile->pretty_print(&ss); + LOG(INFO) << fmt::format( + "[MemoryGC] end minor GC, free memory {}. 
cost(us): {}, details: {}", + PrettyPrinter::print(freed_mem, TUnit::BYTES), watch.elapsed_time() / 1000, + ss.str()); + }}; + + freed_mem += CacheManager::instance()->for_each_cache_prune_stale(profile.get()); + MemInfo::notify_je_purge_dirty_pages(); + if (freed_mem > MemInfo::process_minor_gc_size()) { + return true; + } + + if (config::enable_workload_group_memory_gc) { + RuntimeProfile* tg_profile = profile->create_child("WorkloadGroup", true, true); + freed_mem += tg_enable_overcommit_group_gc(MemInfo::process_minor_gc_size() - freed_mem, + tg_profile, true); + if (freed_mem > MemInfo::process_minor_gc_size()) { + return true; + } + } + + if (config::enable_query_memory_overcommit) { + VLOG_NOTICE << MemTrackerLimiter::type_detail_usage( + "[MemoryGC] before free top memory overcommit query in minor GC", + MemTrackerLimiter::Type::QUERY); + RuntimeProfile* toq_profile = + profile->create_child("FreeTopOvercommitMemoryQuery", true, true); + freed_mem += MemTrackerLimiter::free_top_overcommit_query( + MemInfo::process_minor_gc_size() - freed_mem, mem_info, toq_profile); + if (freed_mem > MemInfo::process_minor_gc_size()) { + return true; + } + } + return false; +} + +// step1: free all cache +// step2: free resource groups memory that enable overcommit +// step3: free global top memory query +// step4: free top overcommit load, load retries are more expensive, So cancel at the end. +// step5: free top memory load +bool MemoryArbitrator::process_full_gc(std::string mem_info) { + MonotonicStopWatch watch; + watch.start(); + int64_t freed_mem = 0; + std::unique_ptr profile = std::make_unique(""); + + Defer defer {[&]() { + MemInfo::notify_je_purge_dirty_pages(); + std::stringstream ss; + profile->pretty_print(&ss); + LOG(INFO) << fmt::format( + "[MemoryGC] end full GC, free Memory {}. 
cost(us): {}, details: {}", + PrettyPrinter::print(freed_mem, TUnit::BYTES), watch.elapsed_time() / 1000, + ss.str()); + }}; + + freed_mem += CacheManager::instance()->for_each_cache_prune_all(profile.get()); + MemInfo::notify_je_purge_dirty_pages(); + if (freed_mem > MemInfo::process_full_gc_size()) { + return true; + } + + if (config::enable_workload_group_memory_gc) { + RuntimeProfile* tg_profile = profile->create_child("WorkloadGroup", true, true); + freed_mem += tg_enable_overcommit_group_gc(MemInfo::process_full_gc_size() - freed_mem, + tg_profile, false); + if (freed_mem > MemInfo::process_full_gc_size()) { + return true; + } + } + + VLOG_NOTICE << MemTrackerLimiter::type_detail_usage( + "[MemoryGC] before free top memory query in full GC", MemTrackerLimiter::Type::QUERY); + RuntimeProfile* tmq_profile = profile->create_child("FreeTopMemoryQuery", true, true); + freed_mem += MemTrackerLimiter::free_top_memory_query( + MemInfo::process_full_gc_size() - freed_mem, mem_info, tmq_profile); + if (freed_mem > MemInfo::process_full_gc_size()) { + return true; + } + + if (config::enable_query_memory_overcommit) { + VLOG_NOTICE << MemTrackerLimiter::type_detail_usage( + "[MemoryGC] before free top memory overcommit load in full GC", + MemTrackerLimiter::Type::LOAD); + RuntimeProfile* tol_profile = + profile->create_child("FreeTopMemoryOvercommitLoad", true, true); + freed_mem += MemTrackerLimiter::free_top_overcommit_load( + MemInfo::process_full_gc_size() - freed_mem, mem_info, tol_profile); + if (freed_mem > MemInfo::process_full_gc_size()) { + return true; + } + } + + VLOG_NOTICE << MemTrackerLimiter::type_detail_usage( + "[MemoryGC] before free top memory load in full GC", MemTrackerLimiter::Type::LOAD); + RuntimeProfile* tml_profile = profile->create_child("FreeTopMemoryLoad", true, true); + freed_mem += MemTrackerLimiter::free_top_memory_load( + MemInfo::process_full_gc_size() - freed_mem, mem_info, tml_profile); + return freed_mem > MemInfo::process_full_gc_size(); +} + +int64_t MemoryArbitrator::tg_disable_overcommit_group_gc() { + MonotonicStopWatch watch; + watch.start(); + std::vector task_groups; + std::unique_ptr tg_profile = std::make_unique("WorkloadGroup"); + int64_t total_free_memory = 0; + + ExecEnv::GetInstance()->workload_group_mgr()->get_related_workload_groups( + [](const WorkloadGroupPtr& workload_group) { + return workload_group->is_mem_limit_valid() && + !workload_group->enable_memory_overcommit(); + }, + &task_groups); + if (task_groups.empty()) { + return 0; + } + + std::vector task_groups_overcommit; + for (const auto& workload_group : task_groups) { + if (workload_group->memory_used() > workload_group->memory_limit()) { + task_groups_overcommit.push_back(workload_group); + } + } + if (task_groups_overcommit.empty()) { + return 0; + } + + LOG(INFO) << fmt::format( + "[MemoryGC] start GC work load group that not enable overcommit, number of overcommit " + "group: {}, " + "if it exceeds the limit, try free size = (group used - group limit).", + task_groups_overcommit.size()); + + Defer defer {[&]() { + if (total_free_memory > 0) { + std::stringstream ss; + tg_profile->pretty_print(&ss); + LOG(INFO) << fmt::format( + "[MemoryGC] end GC work load group that not enable overcommit, number of " + "overcommit group: {}, free memory {}. 
cost(us): {}, details: {}", + task_groups_overcommit.size(), + PrettyPrinter::print(total_free_memory, TUnit::BYTES), + watch.elapsed_time() / 1000, ss.str()); + } + }}; + + for (const auto& workload_group : task_groups_overcommit) { + auto used = workload_group->memory_used(); + total_free_memory += workload_group->gc_memory(used - workload_group->memory_limit(), + tg_profile.get(), false); + } + return total_free_memory; +} + +int64_t MemoryArbitrator::tg_enable_overcommit_group_gc(int64_t request_free_memory, + RuntimeProfile* profile, bool is_minor_gc) { + MonotonicStopWatch watch; + watch.start(); + std::vector task_groups; + ExecEnv::GetInstance()->workload_group_mgr()->get_related_workload_groups( + [](const WorkloadGroupPtr& workload_group) { + return workload_group->is_mem_limit_valid() && + workload_group->enable_memory_overcommit(); + }, + &task_groups); + if (task_groups.empty()) { + return 0; + } + + int64_t total_exceeded_memory = 0; + std::vector used_memorys; + std::vector exceeded_memorys; + for (const auto& workload_group : task_groups) { + int64_t used_memory = workload_group->memory_used(); + int64_t exceeded = used_memory - workload_group->memory_limit(); + int64_t exceeded_memory = exceeded > 0 ? exceeded : 0; + total_exceeded_memory += exceeded_memory; + used_memorys.emplace_back(used_memory); + exceeded_memorys.emplace_back(exceeded_memory); + } + + int64_t total_free_memory = 0; + bool gc_all_exceeded = request_free_memory >= total_exceeded_memory; + std::string log_prefix = fmt::format( + "work load group that enable overcommit, number of group: {}, request_free_memory:{}, " + "total_exceeded_memory:{}", + task_groups.size(), request_free_memory, total_exceeded_memory); + if (gc_all_exceeded) { + LOG(INFO) << fmt::format( + "[MemoryGC] start GC {}, request more than exceeded, try free size = (group used - " + "group limit).", + log_prefix); + } else { + LOG(INFO) << fmt::format( + "[MemoryGC] start GC {}, request less than exceeded, try free size = ((group used " + "- group limit) / all group total_exceeded_memory) * request_free_memory.", + log_prefix); + } + + Defer defer {[&]() { + if (total_free_memory > 0) { + std::stringstream ss; + profile->pretty_print(&ss); + LOG(INFO) << fmt::format( + "[MemoryGC] end GC {}, free memory {}. cost(us): {}, details: {}", log_prefix, + PrettyPrinter::print(total_free_memory, TUnit::BYTES), + watch.elapsed_time() / 1000, ss.str()); + } + }}; + + for (int i = 0; i < task_groups.size(); ++i) { + if (exceeded_memorys[i] == 0) { + continue; + } + + // todo: GC according to resource group priority + auto tg_need_free_memory = int64_t( + gc_all_exceeded ? exceeded_memorys[i] + : static_cast(exceeded_memorys[i]) / total_exceeded_memory * + request_free_memory); // exceeded memory as a weight + auto workload_group = task_groups[i]; + total_free_memory += workload_group->gc_memory(tg_need_free_memory, profile, is_minor_gc); + } + return total_free_memory; +} + +} // namespace doris diff --git a/be/src/runtime/memory/memory_arbitrator.h b/be/src/runtime/memory/memory_arbitrator.h new file mode 100644 index 0000000000..2a936b8ba0 --- /dev/null +++ b/be/src/runtime/memory/memory_arbitrator.h @@ -0,0 +1,40 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "runtime/memory/global_memory_arbitrator.h" + +namespace doris { + +class MemoryArbitrator { +public: + static bool process_minor_gc( + std::string mem_info = + doris::GlobalMemoryArbitrator::process_soft_limit_exceeded_errmsg_str()); + static bool process_full_gc( + std::string mem_info = + doris::GlobalMemoryArbitrator::process_limit_exceeded_errmsg_str()); + + static int64_t tg_disable_overcommit_group_gc(); + static int64_t tg_enable_overcommit_group_gc(int64_t request_free_memory, + RuntimeProfile* profile, bool is_minor_gc); + +private: +}; + +} // namespace doris diff --git a/be/src/runtime/workload_group/workload_group.cpp b/be/src/runtime/workload_group/workload_group.cpp index 5a57152801..f1c49d9763 100644 --- a/be/src/runtime/workload_group/workload_group.cpp +++ b/be/src/runtime/workload_group/workload_group.cpp @@ -173,18 +173,20 @@ int64_t WorkloadGroup::gc_memory(int64_t need_free_mem, RuntimeProfile* profile, MemTracker::print_bytes(_memory_limit), BackendOptions::get_localhost()); } } - std::string process_mem_usage_str = GlobalMemoryArbitrator::process_mem_log_str(); - auto cancel_top_overcommit_str = [cancel_str, process_mem_usage_str](int64_t mem_consumption, - const std::string& label) { + auto cancel_top_overcommit_str = [cancel_str](int64_t mem_consumption, + const std::string& label) { return fmt::format( - "{} cancel top memory overcommit tracker <{}> consumption {}. details:{}", - cancel_str, label, MemTracker::print_bytes(mem_consumption), process_mem_usage_str); + "{} cancel top memory overcommit tracker <{}> consumption {}. details:{}, Execute " + "again after enough memory, details see be.INFO.", + cancel_str, label, MemTracker::print_bytes(mem_consumption), + GlobalMemoryArbitrator::process_limit_exceeded_errmsg_str()); }; - auto cancel_top_usage_str = [cancel_str, process_mem_usage_str](int64_t mem_consumption, - const std::string& label) { - return fmt::format("{} cancel top memory used tracker <{}> consumption {}. details:{}", - cancel_str, label, MemTracker::print_bytes(mem_consumption), - process_mem_usage_str); + auto cancel_top_usage_str = [cancel_str](int64_t mem_consumption, const std::string& label) { + return fmt::format( + "{} cancel top memory used tracker <{}> consumption {}. details:{}, Execute again " + "after enough memory, details see be.INFO.", + cancel_str, label, MemTracker::print_bytes(mem_consumption), + GlobalMemoryArbitrator::process_soft_limit_exceeded_errmsg_str()); }; LOG(INFO) << fmt::format( diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp index dc4c73782e..7a93015030 100644 --- a/be/src/runtime/workload_group/workload_group_manager.cpp +++ b/be/src/runtime/workload_group/workload_group_manager.cpp @@ -193,14 +193,11 @@ void WorkloadGroupMgr::refresh_wg_memory_info() { // we count these cache memories equally on workload groups. 
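// Illustrative only (hypothetical numbers): if proc_vm_rss = 50 GB while the sum of all query
// trackers is 40 GB, ratio = 1.25, i.e. 20% of RSS is not attributed to any query; ratios at
// or below 1.25 only trigger the periodic summary log below.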
double ratio = (double)proc_vm_rss / (double)all_queries_mem_used; if (ratio <= 1.25) { - std::string debug_msg = fmt::format( - "\nProcess Memory Summary: process_vm_rss: {}, process mem: {}, sys mem available: " - "{}, all quries mem: {}", - PrettyPrinter::print(proc_vm_rss, TUnit::BYTES), - PrettyPrinter::print(doris::GlobalMemoryArbitrator::process_memory_usage(), - TUnit::BYTES), - doris::MemInfo::sys_mem_available_str(), - PrettyPrinter::print(all_queries_mem_used, TUnit::BYTES)); + std::string debug_msg = + fmt::format("\nProcess Memory Summary: {}, {}, all quries mem: {}", + doris::GlobalMemoryArbitrator::process_memory_used_details_str(), + doris::GlobalMemoryArbitrator::sys_mem_available_details_str(), + PrettyPrinter::print(all_queries_mem_used, TUnit::BYTES)); LOG_EVERY_T(INFO, 10) << debug_msg; } diff --git a/be/src/util/mem_info.cpp b/be/src/util/mem_info.cpp index a3d391a00b..45e609d710 100644 --- a/be/src/util/mem_info.cpp +++ b/be/src/util/mem_info.cpp @@ -39,33 +39,20 @@ #include "common/config.h" #include "common/status.h" #include "gutil/strings/split.h" -#include "runtime/exec_env.h" -#include "runtime/memory/cache_manager.h" -#include "runtime/memory/mem_tracker_limiter.h" -#include "runtime/workload_group/workload_group.h" -#include "runtime/workload_group/workload_group_manager.h" #include "util/cgroup_util.h" -#include "util/defer_op.h" #include "util/parse_util.h" #include "util/pretty_printer.h" -#include "util/runtime_profile.h" -#include "util/stopwatch.hpp" #include "util/string_parser.hpp" namespace doris { -bvar::PassiveStatus g_sys_mem_avail( - "meminfo_sys_mem_avail", [](void*) { return MemInfo::sys_mem_available(); }, nullptr); - bool MemInfo::_s_initialized = false; std::atomic MemInfo::_s_physical_mem = std::numeric_limits::max(); std::atomic MemInfo::_s_mem_limit = std::numeric_limits::max(); std::atomic MemInfo::_s_soft_mem_limit = std::numeric_limits::max(); std::atomic MemInfo::_s_allocator_cache_mem = 0; -std::string MemInfo::_s_allocator_cache_mem_str = ""; std::atomic MemInfo::_s_virtual_memory_used = 0; -std::atomic MemInfo::refresh_interval_memory_growth = 0; int64_t MemInfo::_s_cgroup_mem_limit = std::numeric_limits::max(); int64_t MemInfo::_s_cgroup_mem_usage = std::numeric_limits::min(); @@ -99,9 +86,6 @@ void MemInfo::refresh_allocator_mem() { get_je_metrics("stats.metadata") + get_je_all_arena_metrics("pdirty") * get_page_size(), std::memory_order_relaxed); - _s_allocator_cache_mem_str = PrettyPrinter::print( - static_cast(_s_allocator_cache_mem.load(std::memory_order_relaxed)), - TUnit::BYTES); _s_virtual_memory_used.store(get_je_metrics("stats.mapped"), std::memory_order_relaxed); #else _s_allocator_cache_mem.store(get_tc_metrics("tcmalloc.pageheap_free_bytes") + @@ -109,265 +93,12 @@ void MemInfo::refresh_allocator_mem() { get_tc_metrics("tcmalloc.transfer_cache_free_bytes") + get_tc_metrics("tcmalloc.thread_cache_free_bytes"), std::memory_order_relaxed); - _s_allocator_cache_mem_str = PrettyPrinter::print( - static_cast(_s_allocator_cache_mem.load(std::memory_order_relaxed)), - TUnit::BYTES); _s_virtual_memory_used.store(get_tc_metrics("generic.total_physical_bytes") + get_tc_metrics("tcmalloc.pageheap_unmapped_bytes"), std::memory_order_relaxed); #endif } -// step1: free all cache -// step2: free resource groups memory that enable overcommit -// step3: free global top overcommit query, if enable query memory overcommit -// TODO Now, the meaning is different from java minor gc + full gc, more like small gc + large gc. 
-bool MemInfo::process_minor_gc() { - MonotonicStopWatch watch; - watch.start(); - int64_t freed_mem = 0; - std::unique_ptr profile = std::make_unique(""); - std::string pre_vm_rss = PerfCounters::get_vm_rss_str(); - std::string pre_sys_mem_available = MemInfo::sys_mem_available_str(); - - Defer defer {[&]() { - MemInfo::notify_je_purge_dirty_pages(); - std::stringstream ss; - profile->pretty_print(&ss); - LOG(INFO) << fmt::format( - "[MemoryGC] end minor GC, free memory {}. cost(us): {}, details: {}", - PrettyPrinter::print(freed_mem, TUnit::BYTES), watch.elapsed_time() / 1000, - ss.str()); - }}; - - freed_mem += CacheManager::instance()->for_each_cache_prune_stale(profile.get()); - MemInfo::notify_je_purge_dirty_pages(); - if (freed_mem > MemInfo::process_minor_gc_size()) { - return true; - } - - if (config::enable_workload_group_memory_gc) { - RuntimeProfile* tg_profile = profile->create_child("WorkloadGroup", true, true); - freed_mem += tg_enable_overcommit_group_gc(MemInfo::process_minor_gc_size() - freed_mem, - tg_profile, true); - if (freed_mem > MemInfo::process_minor_gc_size()) { - return true; - } - } - - if (config::enable_query_memory_overcommit) { - VLOG_NOTICE << MemTrackerLimiter::type_detail_usage( - "[MemoryGC] before free top memory overcommit query in minor GC", - MemTrackerLimiter::Type::QUERY); - RuntimeProfile* toq_profile = - profile->create_child("FreeTopOvercommitMemoryQuery", true, true); - freed_mem += MemTrackerLimiter::free_top_overcommit_query( - MemInfo::process_minor_gc_size() - freed_mem, pre_vm_rss, pre_sys_mem_available, - toq_profile); - if (freed_mem > MemInfo::process_minor_gc_size()) { - return true; - } - } - return false; -} - -// step1: free all cache -// step2: free resource groups memory that enable overcommit -// step3: free global top memory query -// step4: free top overcommit load, load retries are more expensive, So cancel at the end. -// step5: free top memory load -bool MemInfo::process_full_gc() { - MonotonicStopWatch watch; - watch.start(); - int64_t freed_mem = 0; - std::unique_ptr profile = std::make_unique(""); - std::string pre_vm_rss = PerfCounters::get_vm_rss_str(); - std::string pre_sys_mem_available = MemInfo::sys_mem_available_str(); - - Defer defer {[&]() { - MemInfo::notify_je_purge_dirty_pages(); - std::stringstream ss; - profile->pretty_print(&ss); - LOG(INFO) << fmt::format( - "[MemoryGC] end full GC, free Memory {}. 
cost(us): {}, details: {}", - PrettyPrinter::print(freed_mem, TUnit::BYTES), watch.elapsed_time() / 1000, - ss.str()); - }}; - - freed_mem += CacheManager::instance()->for_each_cache_prune_all(profile.get()); - MemInfo::notify_je_purge_dirty_pages(); - if (freed_mem > MemInfo::process_full_gc_size()) { - return true; - } - - if (config::enable_workload_group_memory_gc) { - RuntimeProfile* tg_profile = profile->create_child("WorkloadGroup", true, true); - freed_mem += tg_enable_overcommit_group_gc(MemInfo::process_full_gc_size() - freed_mem, - tg_profile, false); - if (freed_mem > MemInfo::process_full_gc_size()) { - return true; - } - } - - VLOG_NOTICE << MemTrackerLimiter::type_detail_usage( - "[MemoryGC] before free top memory query in full GC", MemTrackerLimiter::Type::QUERY); - RuntimeProfile* tmq_profile = profile->create_child("FreeTopMemoryQuery", true, true); - freed_mem += MemTrackerLimiter::free_top_memory_query( - MemInfo::process_full_gc_size() - freed_mem, pre_vm_rss, pre_sys_mem_available, - tmq_profile); - if (freed_mem > MemInfo::process_full_gc_size()) { - return true; - } - - if (config::enable_query_memory_overcommit) { - VLOG_NOTICE << MemTrackerLimiter::type_detail_usage( - "[MemoryGC] before free top memory overcommit load in full GC", - MemTrackerLimiter::Type::LOAD); - RuntimeProfile* tol_profile = - profile->create_child("FreeTopMemoryOvercommitLoad", true, true); - freed_mem += MemTrackerLimiter::free_top_overcommit_load( - MemInfo::process_full_gc_size() - freed_mem, pre_vm_rss, pre_sys_mem_available, - tol_profile); - if (freed_mem > MemInfo::process_full_gc_size()) { - return true; - } - } - - VLOG_NOTICE << MemTrackerLimiter::type_detail_usage( - "[MemoryGC] before free top memory load in full GC", MemTrackerLimiter::Type::LOAD); - RuntimeProfile* tml_profile = profile->create_child("FreeTopMemoryLoad", true, true); - freed_mem += - MemTrackerLimiter::free_top_memory_load(MemInfo::process_full_gc_size() - freed_mem, - pre_vm_rss, pre_sys_mem_available, tml_profile); - return freed_mem > MemInfo::process_full_gc_size(); -} - -int64_t MemInfo::tg_disable_overcommit_group_gc() { - MonotonicStopWatch watch; - watch.start(); - std::vector task_groups; - std::unique_ptr tg_profile = std::make_unique("WorkloadGroup"); - int64_t total_free_memory = 0; - - ExecEnv::GetInstance()->workload_group_mgr()->get_related_workload_groups( - [](const WorkloadGroupPtr& workload_group) { - return workload_group->is_mem_limit_valid() && - !workload_group->enable_memory_overcommit(); - }, - &task_groups); - if (task_groups.empty()) { - return 0; - } - - std::vector task_groups_overcommit; - for (const auto& workload_group : task_groups) { - if (workload_group->memory_used() > workload_group->memory_limit()) { - task_groups_overcommit.push_back(workload_group); - } - } - if (task_groups_overcommit.empty()) { - return 0; - } - - LOG(INFO) << fmt::format( - "[MemoryGC] start GC work load group that not enable overcommit, number of overcommit " - "group: {}, " - "if it exceeds the limit, try free size = (group used - group limit).", - task_groups_overcommit.size()); - - Defer defer {[&]() { - if (total_free_memory > 0) { - std::stringstream ss; - tg_profile->pretty_print(&ss); - LOG(INFO) << fmt::format( - "[MemoryGC] end GC work load group that not enable overcommit, number of " - "overcommit group: {}, free memory {}. 
cost(us): {}, details: {}", - task_groups_overcommit.size(), - PrettyPrinter::print(total_free_memory, TUnit::BYTES), - watch.elapsed_time() / 1000, ss.str()); - } - }}; - - for (const auto& workload_group : task_groups_overcommit) { - auto used = workload_group->memory_used(); - total_free_memory += workload_group->gc_memory(used - workload_group->memory_limit(), - tg_profile.get(), false); - } - return total_free_memory; -} - -int64_t MemInfo::tg_enable_overcommit_group_gc(int64_t request_free_memory, RuntimeProfile* profile, - bool is_minor_gc) { - MonotonicStopWatch watch; - watch.start(); - std::vector task_groups; - ExecEnv::GetInstance()->workload_group_mgr()->get_related_workload_groups( - [](const WorkloadGroupPtr& workload_group) { - return workload_group->is_mem_limit_valid() && - workload_group->enable_memory_overcommit(); - }, - &task_groups); - if (task_groups.empty()) { - return 0; - } - - int64_t total_exceeded_memory = 0; - std::vector used_memorys; - std::vector exceeded_memorys; - for (const auto& workload_group : task_groups) { - int64_t used_memory = workload_group->memory_used(); - int64_t exceeded = used_memory - workload_group->memory_limit(); - int64_t exceeded_memory = exceeded > 0 ? exceeded : 0; - total_exceeded_memory += exceeded_memory; - used_memorys.emplace_back(used_memory); - exceeded_memorys.emplace_back(exceeded_memory); - } - - int64_t total_free_memory = 0; - bool gc_all_exceeded = request_free_memory >= total_exceeded_memory; - std::string log_prefix = fmt::format( - "work load group that enable overcommit, number of group: {}, request_free_memory:{}, " - "total_exceeded_memory:{}", - task_groups.size(), request_free_memory, total_exceeded_memory); - if (gc_all_exceeded) { - LOG(INFO) << fmt::format( - "[MemoryGC] start GC {}, request more than exceeded, try free size = (group used - " - "group limit).", - log_prefix); - } else { - LOG(INFO) << fmt::format( - "[MemoryGC] start GC {}, request less than exceeded, try free size = ((group used " - "- group limit) / all group total_exceeded_memory) * request_free_memory.", - log_prefix); - } - - Defer defer {[&]() { - if (total_free_memory > 0) { - std::stringstream ss; - profile->pretty_print(&ss); - LOG(INFO) << fmt::format( - "[MemoryGC] end GC {}, free memory {}. cost(us): {}, details: {}", log_prefix, - PrettyPrinter::print(total_free_memory, TUnit::BYTES), - watch.elapsed_time() / 1000, ss.str()); - } - }}; - - for (int i = 0; i < task_groups.size(); ++i) { - if (exceeded_memorys[i] == 0) { - continue; - } - - // todo: GC according to resource group priority - auto tg_need_free_memory = int64_t( - gc_all_exceeded ? 
exceeded_memorys[i] - : static_cast(exceeded_memorys[i]) / total_exceeded_memory * - request_free_memory); // exceeded memory as a weight - auto workload_group = task_groups[i]; - total_free_memory += workload_group->gc_memory(tg_need_free_memory, profile, is_minor_gc); - } - return total_free_memory; -} - #ifndef __APPLE__ void MemInfo::refresh_proc_meminfo() { std::ifstream meminfo("/proc/meminfo", std::ios::in); @@ -546,13 +277,15 @@ void MemInfo::init() { getline(vminfo, line); boost::algorithm::trim(line); StringParser::ParseResult result; - int64_t mem_value = StringParser::string_to_int(line.data(), line.size(), &result); + auto mem_value = StringParser::string_to_int(line.data(), line.size(), &result); if (result == StringParser::PARSE_SUCCESS) { _s_vm_min_free_kbytes = mem_value * 1024L; } } - if (vminfo.is_open()) vminfo.close(); + if (vminfo.is_open()) { + vminfo.close(); + } // Redhat 4.x OS, `/proc/meminfo` has no `MemAvailable`. if (_mem_info_bytes.find("MemAvailable") != _mem_info_bytes.end()) { @@ -576,7 +309,9 @@ void MemInfo::init() { std::string hugepage_enable; // If file not exist, getline returns an empty string. getline(sys_transparent_hugepage, hugepage_enable); - if (sys_transparent_hugepage.is_open()) sys_transparent_hugepage.close(); + if (sys_transparent_hugepage.is_open()) { + sys_transparent_hugepage.close(); + } if (hugepage_enable == "[always] madvise never") { std::cout << "[WARNING!] /sys/kernel/mm/transparent_hugepage/enabled: " << hugepage_enable << ", Doris not recommend turning on THP, which may cause the BE process to use " @@ -591,7 +326,9 @@ void MemInfo::init() { std::ifstream sys_vm("/proc/sys/vm/overcommit_memory", std::ios::in); std::string vm_overcommit; getline(sys_vm, vm_overcommit); - if (sys_vm.is_open()) sys_vm.close(); + if (sys_vm.is_open()) { + sys_vm.close(); + } if (!vm_overcommit.empty() && std::stoi(vm_overcommit) == 2) { std::cout << "[WARNING!] 
/proc/sys/vm/overcommit_memory: " << vm_overcommit << ", expect is 1, memory limit check is handed over to Doris Allocator, " @@ -632,12 +369,11 @@ void MemInfo::init() { std::string MemInfo::debug_string() { DCHECK(_s_initialized); - CGroupUtil util; std::stringstream stream; stream << "Physical Memory: " << PrettyPrinter::print(_s_physical_mem, TUnit::BYTES) << std::endl; stream << "Memory Limt: " << PrettyPrinter::print(_s_mem_limit, TUnit::BYTES) << std::endl; - stream << "CGroup Info: " << util.debug_string() << std::endl; + stream << "CGroup Info: " << doris::CGroupUtil::debug_string() << std::endl; return stream.str(); } diff --git a/be/src/util/mem_info.h b/be/src/util/mem_info.h index dc4b0e0d29..1b92d0eb9f 100644 --- a/be/src/util/mem_info.h +++ b/be/src/util/mem_info.h @@ -73,19 +73,6 @@ public: static void refresh_proc_meminfo(); - static inline int64_t sys_mem_available() { - return _s_sys_mem_available.load(std::memory_order_relaxed) - - refresh_interval_memory_growth; - } - static inline std::string sys_mem_available_str() { -#ifdef ADDRESS_SANITIZER - return "[ASAN]" + PrettyPrinter::print(_s_sys_mem_available.load(std::memory_order_relaxed), - TUnit::BYTES); -#else - return PrettyPrinter::print(_s_sys_mem_available.load(std::memory_order_relaxed), - TUnit::BYTES); -#endif - } static inline int64_t sys_mem_available_low_water_mark() { return _s_sys_mem_available_low_water_mark; } @@ -157,7 +144,6 @@ public: static inline size_t allocator_cache_mem() { return _s_allocator_cache_mem.load(std::memory_order_relaxed); } - static inline std::string allocator_cache_mem_str() { return _s_allocator_cache_mem_str; } // Tcmalloc property `generic.total_physical_bytes` records the total length of the virtual memory // obtained by the process malloc, not the physical memory actually used by the process in the OS. @@ -183,25 +169,15 @@ public: static std::string debug_string(); - static bool process_minor_gc(); - static bool process_full_gc(); - - static int64_t tg_disable_overcommit_group_gc(); - static int64_t tg_enable_overcommit_group_gc(int64_t request_free_memory, - RuntimeProfile* profile, bool is_minor_gc); - - // It is only used after the memory limit is exceeded. When multiple threads are waiting for the available memory of the process, - // avoid multiple threads starting at the same time and causing OOM. 
- static std::atomic refresh_interval_memory_growth; - private: + friend class GlobalMemoryArbitrator; + static bool _s_initialized; static std::atomic _s_physical_mem; static std::atomic _s_mem_limit; static std::atomic _s_soft_mem_limit; static std::atomic _s_allocator_cache_mem; - static std::string _s_allocator_cache_mem_str; static std::atomic _s_virtual_memory_used; static int64_t _s_cgroup_mem_limit; diff --git a/be/src/vec/common/allocator.cpp b/be/src/vec/common/allocator.cpp index 88c92e4bd8..39abfa3926 100644 --- a/be/src/vec/common/allocator.cpp +++ b/be/src/vec/common/allocator.cpp @@ -86,7 +86,7 @@ void Allocator::sys_memory_check(size_t while (wait_milliseconds < doris::config::thread_wait_gc_max_milliseconds) { std::this_thread::sleep_for(std::chrono::milliseconds(100)); if (!doris::GlobalMemoryArbitrator::is_exceed_hard_mem_limit(size)) { - doris::MemInfo::refresh_interval_memory_growth += size; + doris::GlobalMemoryArbitrator::refresh_interval_memory_growth += size; break; } if (doris::thread_context()->thread_mem_tracker_mgr->is_query_cancelled()) { @@ -189,13 +189,9 @@ template void Allocator::throw_bad_alloc( const std::string& err) const { LOG(WARNING) << err - << fmt::format( - " os physical memory {}. process memory used {}, sys available memory " - "{}, Stacktrace: {}", - doris::PrettyPrinter::print(doris::MemInfo::physical_mem(), - doris::TUnit::BYTES), - doris::PerfCounters::get_vm_rss_str(), - doris::MemInfo::sys_mem_available_str(), doris::get_stack_trace()); + << fmt::format("{}, Stacktrace: {}", + doris::GlobalMemoryArbitrator::process_mem_log_str(), + doris::get_stack_trace()); doris::MemTrackerLimiter::print_log_process_usage(); throw doris::Exception(doris::ErrorCode::MEM_ALLOC_FAILED, err); } diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp b/be/src/vec/sink/writer/vtablet_writer.cpp index 5a061cc51d..60bd4eafa8 100644 --- a/be/src/vec/sink/writer/vtablet_writer.cpp +++ b/be/src/vec/sink/writer/vtablet_writer.cpp @@ -62,6 +62,7 @@ #include "exec/tablet_info.h" #include "runtime/descriptors.h" #include "runtime/exec_env.h" +#include "runtime/memory/memory_arbitrator.h" #include "runtime/runtime_state.h" #include "runtime/thread_context.h" #include "service/backend_options.h" @@ -554,7 +555,7 @@ Status VNodeChannel::add_block(vectorized::Block* block, const Payload* payload) int VNodeChannel::try_send_and_fetch_status(RuntimeState* state, std::unique_ptr& thread_pool_token) { DBUG_EXECUTE_IF("VNodeChannel.try_send_and_fetch_status_full_gc", - { MemInfo::process_full_gc(); }); + { MemoryArbitrator::process_full_gc(); }); if (_cancelled || _send_finished) { // not run return 0; @@ -875,7 +876,7 @@ void VNodeChannel::cancel(const std::string& cancel_msg) { } Status VNodeChannel::close_wait(RuntimeState* state) { - DBUG_EXECUTE_IF("VNodeChannel.close_wait_full_gc", { MemInfo::process_full_gc(); }); + DBUG_EXECUTE_IF("VNodeChannel.close_wait_full_gc", { MemoryArbitrator::process_full_gc(); }); SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get()); // set _is_closed to true finally Defer set_closed {[&]() {
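// End-to-end sketch (illustrative, not part of this patch): after this change, callers use
// MemoryArbitrator for GC and GlobalMemoryArbitrator for accounting, e.g.
//
//   if (doris::GlobalMemoryArbitrator::is_exceed_hard_mem_limit()) {
//       // the default argument supplies process_limit_exceeded_errmsg_str() as cancel reason
//       doris::MemoryArbitrator::process_full_gc();
//   }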