diff --git a/be/src/common/config.h b/be/src/common/config.h index 1cf3807958..142731b537 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -74,6 +74,10 @@ CONF_Int64(max_sys_mem_available_low_water_mark_bytes, "1717986918"); // The size of the memory that gc wants to release each time, as a percentage of the mem limit. CONF_mString(process_minor_gc_size, "10%"); CONF_mString(process_full_gc_size, "20%"); +// Some caches have their own gc threads, such as segment cache. +// For caches that do not have a separate gc thread, perform regular gc in the memory maintenance thread. +// Currently only storage page cache, chunk allocator, more in the future. +CONF_mInt32(cache_gc_interval_s, "60"); // If true, when the process does not exceed the soft mem limit, the query memory will not be limited; // when the process memory exceeds the soft mem limit, the query with the largest ratio between the currently @@ -477,7 +481,7 @@ CONF_Bool(madvise_huge_pages, "false"); CONF_Bool(mmap_buffers, "false"); // Sleep time in milliseconds between memory maintenance iterations -CONF_mInt64(memory_maintenance_sleep_time_ms, "500"); +CONF_mInt32(memory_maintenance_sleep_time_ms, "500"); // Sleep time in milliseconds between load channel memory refresh iterations CONF_mInt64(load_channel_memory_refresh_sleep_time_ms, "100"); diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp index 00dd7263c9..d6b6568e3b 100644 --- a/be/src/common/daemon.cpp +++ b/be/src/common/daemon.cpp @@ -165,7 +165,9 @@ void Daemon::tcmalloc_gc_thread() { } void Daemon::memory_maintenance_thread() { - int64_t interval_milliseconds = config::memory_maintenance_sleep_time_ms; + int32_t interval_milliseconds = config::memory_maintenance_sleep_time_ms; + int32_t cache_gc_interval_ms = config::cache_gc_interval_s * 1000; + int64_t cache_gc_freed_mem = 0; while (!_stop_background_threads_latch.wait_for( std::chrono::milliseconds(interval_milliseconds))) { if (!MemInfo::initialized()) { @@ 
-180,29 +182,49 @@ void Daemon::memory_maintenance_thread() { doris::MemInfo::refresh_allocator_mem(); #endif doris::MemInfo::refresh_proc_mem_no_allocator_cache(); - LOG_EVERY_N(INFO, 10) << MemTrackerLimiter::process_mem_log_str(); // Refresh mem tracker each type metrics. doris::MemTrackerLimiter::refresh_global_counter(); - if (doris::config::memory_debug) { - doris::MemTrackerLimiter::print_log_process_usage("memory_debug", false); - } - doris::MemTrackerLimiter::enable_print_log_process_usage(); // If system available memory is not enough, or the process memory exceeds the limit, reduce refresh interval. if (doris::MemInfo::sys_mem_available() < doris::MemInfo::sys_mem_available_low_water_mark() || doris::MemInfo::proc_mem_no_allocator_cache() >= doris::MemInfo::mem_limit()) { - interval_milliseconds = 100; - doris::MemInfo::process_full_gc(); + doris::MemTrackerLimiter::print_log_process_usage("process full gc", false); + interval_milliseconds = std::min(100, config::memory_maintenance_sleep_time_ms); + if (doris::MemInfo::process_full_gc()) { + // If there is not enough memory to be gc, the process memory usage will not be printed in the next continuous gc. 
+ doris::MemTrackerLimiter::enable_print_log_process_usage(); + } + cache_gc_interval_ms = config::cache_gc_interval_s * 1000; } else if (doris::MemInfo::sys_mem_available() < doris::MemInfo::sys_mem_available_warning_water_mark() || doris::MemInfo::proc_mem_no_allocator_cache() >= doris::MemInfo::soft_mem_limit()) { - interval_milliseconds = 200; - doris::MemInfo::process_minor_gc(); + doris::MemTrackerLimiter::print_log_process_usage("process minor gc", false); + interval_milliseconds = std::min(200, config::memory_maintenance_sleep_time_ms); + if (doris::MemInfo::process_minor_gc()) { + doris::MemTrackerLimiter::enable_print_log_process_usage(); + } + cache_gc_interval_ms = config::cache_gc_interval_s * 1000; } else { + doris::MemTrackerLimiter::enable_print_log_process_usage(); interval_milliseconds = config::memory_maintenance_sleep_time_ms; + if (doris::config::memory_debug) { + LOG_EVERY_N(WARNING, 20) << doris::MemTrackerLimiter::log_process_usage_str( + "memory debug", false); // default 10s print once + } else { + LOG_EVERY_N(INFO, 10) + << MemTrackerLimiter::process_mem_log_str(); // default 5s print once + } + cache_gc_interval_ms -= interval_milliseconds; + if (cache_gc_interval_ms < 0) { + cache_gc_freed_mem = 0; + doris::MemInfo::process_cache_gc(cache_gc_freed_mem); + LOG(INFO) << fmt::format("Process regular GC Cache, Free Memory {} Bytes", + cache_gc_freed_mem); // default 60s print once + cache_gc_interval_ms = config::cache_gc_interval_s * 1000; + } } } } diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp index 77126eba2d..938f72d2a0 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.cpp +++ b/be/src/runtime/memory/mem_tracker_limiter.cpp @@ -23,6 +23,7 @@ #include #include "runtime/fragment_mgr.h" +#include "runtime/load_channel_mgr.h" #include "runtime/runtime_state.h" #include "runtime/thread_context.h" #include "util/pretty_printer.h" @@ -195,26 +196,35 @@ void 
MemTrackerLimiter::print_log_usage(const std::string& msg) { } } +std::string MemTrackerLimiter::log_process_usage_str(const std::string& msg, bool with_stacktrace) { + std::string detail = msg; + detail += "\nProcess Memory Summary:\n " + MemTrackerLimiter::process_mem_log_str(); + if (with_stacktrace) detail += "\nAlloc Stacktrace:\n" + get_stack_trace(); + std::vector snapshots; + MemTrackerLimiter::make_process_snapshots(&snapshots); + MemTrackerLimiter::make_type_snapshots(&snapshots, MemTrackerLimiter::Type::GLOBAL); + + // Add additional tracker printed when memory exceeds limit. + snapshots.emplace_back( + ExecEnv::GetInstance()->load_channel_mgr()->mem_tracker()->make_snapshot()); + + detail += "\nMemory Tracker Summary:"; + for (const auto& snapshot : snapshots) { + if (snapshot.label == "" && snapshot.parent_label == "") { + detail += "\n " + MemTrackerLimiter::type_log_usage(snapshot); + } else if (snapshot.parent_label == "") { + detail += "\n " + MemTrackerLimiter::log_usage(snapshot); + } else { + detail += "\n " + MemTracker::log_usage(snapshot); + } + } + return detail; +} + void MemTrackerLimiter::print_log_process_usage(const std::string& msg, bool with_stacktrace) { if (MemTrackerLimiter::_enable_print_log_process_usage) { MemTrackerLimiter::_enable_print_log_process_usage = false; - std::string detail = msg; - detail += "\nProcess Memory Summary:\n " + MemTrackerLimiter::process_mem_log_str(); - if (with_stacktrace) detail += "\nAlloc Stacktrace:\n" + get_stack_trace(); - std::vector snapshots; - MemTrackerLimiter::make_process_snapshots(&snapshots); - MemTrackerLimiter::make_type_snapshots(&snapshots, MemTrackerLimiter::Type::GLOBAL); - detail += "\nMemory Tracker Summary:"; - for (const auto& snapshot : snapshots) { - if (snapshot.label == "" && snapshot.parent_label == "") { - detail += "\n " + MemTrackerLimiter::type_log_usage(snapshot); - } else if (snapshot.parent_label == "") { - detail += "\n " + MemTrackerLimiter::log_usage(snapshot); 
- } else { - detail += "\n " + MemTracker::log_usage(snapshot); - } - } - LOG(WARNING) << detail; + LOG(WARNING) << log_process_usage_str(msg, with_stacktrace); } } @@ -252,6 +262,10 @@ int64_t MemTrackerLimiter::free_top_memory_query(int64_t min_free_mem, Type type int64_t freed_mem = 0; while (!min_pq.empty()) { TUniqueId cancelled_queryid = label_to_queryid(min_pq.top().second); + if (cancelled_queryid == TUniqueId()) { + min_pq.pop(); + continue; + } ExecEnv::GetInstance()->fragment_mgr()->cancel_query( cancelled_queryid, PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED, fmt::format("Process has no memory available, cancel top memory usage {}: " @@ -344,6 +358,10 @@ int64_t MemTrackerLimiter::free_top_overcommit_query(int64_t min_free_mem, Type int64_t freed_mem = 0; while (!max_pq.empty()) { TUniqueId cancelled_queryid = label_to_queryid(max_pq.top().second); + if (cancelled_queryid == TUniqueId()) { + max_pq.pop(); + continue; + } int64_t query_mem = query_consumption[max_pq.top().second]; ExecEnv::GetInstance()->fragment_mgr()->cancel_query( cancelled_queryid, PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED, @@ -354,7 +372,7 @@ int64_t MemTrackerLimiter::free_top_overcommit_query(int64_t min_free_mem, Type "details see be.INFO.", TypeString[type], TypeString[type], max_pq.top().second, print_bytes(query_mem), BackendOptions::get_localhost(), - PerfCounters::get_vm_rss_str(), print_bytes(MemInfo::soft_mem_limit()), + PerfCounters::get_vm_rss_str(), MemInfo::soft_mem_limit_str(), MemInfo::sys_mem_available_str(), print_bytes(MemInfo::sys_mem_available_warning_water_mark()))); diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h index 9523dd12e8..2e059f8770 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.h +++ b/be/src/runtime/memory/mem_tracker_limiter.h @@ -136,6 +136,7 @@ public: void print_log_usage(const std::string& msg); void enable_print_log_usage() { _enable_print_log_usage = true; } static void 
enable_print_log_process_usage() { _enable_print_log_process_usage = true; } + static std::string log_process_usage_str(const std::string& msg, bool with_stacktrace = true); static void print_log_process_usage(const std::string& msg, bool with_stacktrace = true); // Log the memory usage when memory limit is exceeded. @@ -157,6 +158,9 @@ public: } // only for Type::QUERY or Type::LOAD. static TUniqueId label_to_queryid(const std::string& label) { + if (label.rfind("Query#Id=", 0) != 0 && label.rfind("Load#Id=", 0) != 0) { + return TUniqueId(); + } auto queryid = split(label, "#Id=")[1]; TUniqueId querytid; parse_id(queryid, &querytid); @@ -165,12 +169,14 @@ public: static std::string process_mem_log_str() { return fmt::format( - "OS physical memory {}, process memory used {} limit {}, sys mem available {} low " - "water mark {}, refresh interval memory growth {} B", + "OS physical memory {}. Process memory usage {}, limit {}, soft limit {}. Sys " + "available memory {}, low water mark {}, warning water mark {}. 
Refresh interval " + "memory growth {} B", PrettyPrinter::print(MemInfo::physical_mem(), TUnit::BYTES), PerfCounters::get_vm_rss_str(), MemInfo::mem_limit_str(), - MemInfo::sys_mem_available_str(), + MemInfo::soft_mem_limit_str(), MemInfo::sys_mem_available_str(), PrettyPrinter::print(MemInfo::sys_mem_available_low_water_mark(), TUnit::BYTES), + PrettyPrinter::print(MemInfo::sys_mem_available_warning_water_mark(), TUnit::BYTES), MemInfo::refresh_interval_memory_growth); } diff --git a/be/src/util/mem_info.cpp b/be/src/util/mem_info.cpp index 82e4f125db..b7e68674e8 100644 --- a/be/src/util/mem_info.cpp +++ b/be/src/util/mem_info.cpp @@ -47,6 +47,7 @@ int64_t MemInfo::_s_physical_mem = -1; int64_t MemInfo::_s_mem_limit = -1; std::string MemInfo::_s_mem_limit_str = ""; int64_t MemInfo::_s_soft_mem_limit = -1; +std::string MemInfo::_s_soft_mem_limit_str = ""; int64_t MemInfo::_s_allocator_cache_mem = 0; std::string MemInfo::_s_allocator_cache_mem_str = ""; @@ -89,61 +90,66 @@ void MemInfo::refresh_allocator_mem() { #endif } +void MemInfo::process_cache_gc(int64_t& freed_mem) { + // TODO, free more cache, and should free a certain percentage of capacity, not all. + freed_mem += ChunkAllocator::instance()->mem_consumption(); + ChunkAllocator::instance()->clear(); + freed_mem += + StoragePageCache::instance()->get_page_cache_mem_consumption(segment_v2::DATA_PAGE); + StoragePageCache::instance()->prune(segment_v2::DATA_PAGE); +} + // step1: free all cache // step2: free top overcommit query, if enable query memroy overcommit -void MemInfo::process_minor_gc() { - // TODO, free more cache, and should free a certain percentage of capacity, not all. 
+bool MemInfo::process_minor_gc() { int64_t freed_mem = 0; Defer defer {[&]() { LOG(INFO) << fmt::format("Process Minor GC Free Memory {} Bytes", freed_mem); }}; - freed_mem += ChunkAllocator::instance()->mem_consumption(); - ChunkAllocator::instance()->clear(); + MemInfo::process_cache_gc(freed_mem); if (freed_mem > _s_process_minor_gc_size) { - return; + return true; } - freed_mem += - StoragePageCache::instance()->get_page_cache_mem_consumption(segment_v2::DATA_PAGE); - StoragePageCache::instance()->prune(segment_v2::DATA_PAGE); if (config::enable_query_memroy_overcommit) { freed_mem += MemTrackerLimiter::free_top_overcommit_query(_s_process_minor_gc_size - freed_mem); } + if (freed_mem > _s_process_minor_gc_size) { + return true; + } + return false; } // step1: free all cache // step2: free top memory query // step3: free top overcommit load, load retries are more expensive, So cancel at the end. // step4: free top memory load -void MemInfo::process_full_gc() { +bool MemInfo::process_full_gc() { int64_t freed_mem = 0; Defer defer { [&]() { LOG(INFO) << fmt::format("Process Full GC Free Memory {} Bytes", freed_mem); }}; - freed_mem += - StoragePageCache::instance()->get_page_cache_mem_consumption(segment_v2::DATA_PAGE); - StoragePageCache::instance()->prune(segment_v2::DATA_PAGE); + MemInfo::process_cache_gc(freed_mem); if (freed_mem > _s_process_full_gc_size) { - return; - } - freed_mem += ChunkAllocator::instance()->mem_consumption(); - ChunkAllocator::instance()->clear(); - if (freed_mem > _s_process_full_gc_size) { - return; + return true; } freed_mem += MemTrackerLimiter::free_top_memory_query(_s_process_full_gc_size - freed_mem); if (freed_mem > _s_process_full_gc_size) { - return; + return true; } if (config::enable_query_memroy_overcommit) { freed_mem += MemTrackerLimiter::free_top_overcommit_load(_s_process_full_gc_size - freed_mem); if (freed_mem > _s_process_full_gc_size) { - return; + return true; } } freed_mem += 
MemTrackerLimiter::free_top_memory_load(_s_process_full_gc_size - freed_mem); + if (freed_mem > _s_process_full_gc_size) { + return true; + } + return false; } #ifndef __APPLE__ @@ -203,6 +209,7 @@ void MemInfo::init() { } _s_mem_limit_str = PrettyPrinter::print(_s_mem_limit, TUnit::BYTES); _s_soft_mem_limit = _s_mem_limit * config::soft_mem_limit_frac; + _s_soft_mem_limit_str = PrettyPrinter::print(_s_soft_mem_limit, TUnit::BYTES); _s_process_minor_gc_size = ParseUtil::parse_mem_spec(config::process_minor_gc_size, -1, _s_mem_limit, &is_percent); @@ -239,7 +246,7 @@ void MemInfo::init() { config::max_sys_mem_available_low_water_mark_bytes); int64_t p2 = std::max(_s_vm_min_free_kbytes - _s_physical_mem * 0.01, 0); _s_sys_mem_available_low_water_mark = std::max(p1 - p2, 0); - _s_sys_mem_available_warning_water_mark = _s_sys_mem_available_low_water_mark + p1 * 2; + _s_sys_mem_available_warning_water_mark = _s_sys_mem_available_low_water_mark + p1; LOG(INFO) << "Physical Memory: " << PrettyPrinter::print(_s_physical_mem, TUnit::BYTES) << ", Mem Limit: " << _s_mem_limit_str @@ -264,6 +271,7 @@ void MemInfo::init() { _s_mem_limit = ParseUtil::parse_mem_spec(config::mem_limit, -1, _s_physical_mem, &is_percent); _s_mem_limit_str = PrettyPrinter::print(_s_mem_limit, TUnit::BYTES); _s_soft_mem_limit = _s_mem_limit * config::soft_mem_limit_frac; + _s_soft_mem_limit_str = PrettyPrinter::print(_s_soft_mem_limit, TUnit::BYTES); LOG(INFO) << "Physical Memory: " << PrettyPrinter::print(_s_physical_mem, TUnit::BYTES); _s_initialized = true; diff --git a/be/src/util/mem_info.h b/be/src/util/mem_info.h index bd76c6124c..2cb17eb6b8 100644 --- a/be/src/util/mem_info.h +++ b/be/src/util/mem_info.h @@ -111,11 +111,16 @@ public: DCHECK(_s_initialized); return _s_soft_mem_limit; } + static inline std::string soft_mem_limit_str() { + DCHECK(_s_initialized); + return _s_soft_mem_limit_str; + } static std::string debug_string(); - static void process_minor_gc(); - static void 
process_full_gc(); + static void process_cache_gc(int64_t& freed_mem); + static bool process_minor_gc(); + static bool process_full_gc(); // It is only used after the memory limit is exceeded. When multiple threads are waiting for the available memory of the process, // avoid multiple threads starting at the same time and causing OOM. @@ -127,6 +132,7 @@ private: static int64_t _s_mem_limit; static std::string _s_mem_limit_str; static int64_t _s_soft_mem_limit; + static std::string _s_soft_mem_limit_str; static int64_t _s_allocator_cache_mem; static std::string _s_allocator_cache_mem_str;