diff --git a/be/src/common/config.h b/be/src/common/config.h index 9a6750067a..c3ae42329a 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -58,6 +58,16 @@ CONF_String(memory_mode, "moderate"); // it will be set to physical memory size. CONF_String(mem_limit, "80%"); +// Soft memory limit as a fraction of hard memory limit. +CONF_Double(soft_mem_limit_frac, "0.9"); + +// The maximum low water mark of the system `/proc/meminfo/MemAvailable`, Unit byte, default 1.6G, +// actual low water mark=min(1.6G, MemTotal * 10%), avoid wasting too much memory on machines +// with large memory larger than 16G. +// Turn up max. On machines with more than 16G memory, more memory buffers will be reserved for Full GC. +// Turn down max. will use as much memory as possible. +CONF_Int64(max_sys_mem_available_low_water_mark_bytes, "1717986918"); + // the port heartbeat service used CONF_Int32(heartbeat_service_port, "9050"); // the count of heart beat service @@ -461,8 +471,11 @@ CONF_String(buffer_pool_limit, "20%"); // This is the percentage of buffer_pool_limit CONF_String(buffer_pool_clean_pages_limit, "50%"); -// Sleep time in seconds between memory maintenance iterations -CONF_mInt64(memory_maintenance_sleep_time_s, "10"); +// Sleep time in milliseconds between memory maintenance iterations +CONF_mInt64(memory_maintenance_sleep_time_ms, "500"); + +// Sleep time in milliseconds between load channel memory refresh iterations +CONF_mInt64(load_channel_memory_refresh_sleep_time_ms, "100"); // Alignment CONF_Int32(memory_max_alignment, "16"); @@ -594,9 +607,6 @@ CONF_Bool(ignore_load_tablet_failure, "false"); // Whether to continue to start be when load tablet from header failed. CONF_mBool(ignore_rowset_stale_unconsistent_delete, "false"); -// Soft memory limit as a fraction of hard memory limit. 
-CONF_Double(soft_mem_limit_frac, "0.9"); - // Set max cache's size of query results, the unit is M byte CONF_Int32(query_cache_max_size_mb, "256"); diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp index 9a828b3d7a..d8e6ff0a69 100644 --- a/be/src/common/daemon.cpp +++ b/be/src/common/daemon.cpp @@ -51,6 +51,8 @@ #include "olap/options.h" #include "runtime/bufferpool/buffer_pool.h" #include "runtime/exec_env.h" +#include "runtime/fragment_mgr.h" +#include "runtime/load_channel_mgr.h" #include "runtime/memory/chunk_allocator.h" #include "runtime/user_function_cache.h" #include "util/cpu_info.h" @@ -180,9 +182,8 @@ void Daemon::tcmalloc_gc_thread() { #endif } -void Daemon::memory_maintenance_thread() { - while (!_stop_background_threads_latch.wait_for( - std::chrono::seconds(config::memory_maintenance_sleep_time_s))) { +void Daemon::buffer_pool_gc_thread() { + while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(10))) { ExecEnv* env = ExecEnv::GetInstance(); // ExecEnv may not have been created yet or this may be the catalogd or statestored, // which don't have ExecEnvs. @@ -195,6 +196,53 @@ void Daemon::memory_maintenance_thread() { } } +void Daemon::memory_maintenance_thread() { + int64_t interval_milliseconds = config::memory_maintenance_sleep_time_ms; + while (!_stop_background_threads_latch.wait_for( + std::chrono::milliseconds(interval_milliseconds))) { + // Refresh process memory metrics. + doris::PerfCounters::refresh_proc_status(); + doris::MemInfo::refresh_proc_meminfo(); + + // Refresh allocator memory metrics. +#if !defined(ADDRESS_SANITIZER) && !defined(LEAK_SANITIZER) && !defined(THREAD_SANITIZER) + doris::MemInfo::refresh_allocator_mem(); +#endif + doris::MemInfo::refresh_proc_mem_no_allocator_cache(); + LOG_EVERY_N(INFO, 10) << MemTrackerLimiter::process_mem_log_str(); + + // Refresh mem tracker each type metrics. 
+ doris::MemTrackerLimiter::refresh_global_counter(); + if (doris::config::memory_debug) { + doris::MemTrackerLimiter::print_log_process_usage("memory_debug", false); + } + doris::MemTrackerLimiter::enable_print_log_process_usage(); + + // If system available memory is not enough, or the process memory exceeds the limit, reduce refresh interval. + if (doris::MemInfo::sys_mem_available() < + doris::MemInfo::sys_mem_available_low_water_mark() || + doris::MemInfo::proc_mem_no_allocator_cache() >= doris::MemInfo::mem_limit()) { + interval_milliseconds = 100; + } else if (doris::MemInfo::sys_mem_available() < + doris::MemInfo::sys_mem_available_warning_water_mark() || + doris::MemInfo::proc_mem_no_allocator_cache() >= + doris::MemInfo::soft_mem_limit()) { + interval_milliseconds = 200; + } else { + interval_milliseconds = config::memory_maintenance_sleep_time_ms; + } + } +} + +void Daemon::load_channel_tracker_refresh_thread() { + // Refresh the memory statistics of the load channel tracker more frequently, + // which helps to accurately control the memory of LoadChannelMgr. The sleep interval is in milliseconds. + while (!_stop_background_threads_latch.wait_for( + std::chrono::milliseconds(config::load_channel_memory_refresh_sleep_time_ms))) { + doris::ExecEnv::GetInstance()->load_channel_mgr()->refresh_mem_tracker(); + } +} + /* * this thread will calculate some metrics at a fix interval(15 sec) * 1. 
push bytes per second @@ -359,10 +407,19 @@ void Daemon::start() { "Daemon", "tcmalloc_gc_thread", [this]() { this->tcmalloc_gc_thread(); }, &_tcmalloc_gc_thread); CHECK(st.ok()) << st.to_string(); + st = Thread::create( + "Daemon", "buffer_pool_gc_thread", [this]() { this->buffer_pool_gc_thread(); }, + &_buffer_pool_gc_thread); + CHECK(st.ok()) << st.to_string(); st = Thread::create( "Daemon", "memory_maintenance_thread", [this]() { this->memory_maintenance_thread(); }, &_memory_maintenance_thread); CHECK(st.ok()) << st.to_string(); + st = Thread::create( + "Daemon", "load_channel_tracker_refresh_thread", + [this]() { this->load_channel_tracker_refresh_thread(); }, + &_load_channel_tracker_refresh_thread); + CHECK(st.ok()) << st.to_string(); if (config::enable_metric_calculator) { CHECK(DorisMetrics::instance()->is_inited()) @@ -384,9 +441,15 @@ void Daemon::stop() { if (_tcmalloc_gc_thread) { _tcmalloc_gc_thread->join(); } + if (_buffer_pool_gc_thread) { + _buffer_pool_gc_thread->join(); + } if (_memory_maintenance_thread) { _memory_maintenance_thread->join(); } + if (_load_channel_tracker_refresh_thread) { + _load_channel_tracker_refresh_thread->join(); + } if (_calculate_metrics_thread) { _calculate_metrics_thread->join(); } diff --git a/be/src/common/daemon.h b/be/src/common/daemon.h index cd03625e4b..53cd925ef5 100644 --- a/be/src/common/daemon.h +++ b/be/src/common/daemon.h @@ -46,12 +46,17 @@ public: private: void tcmalloc_gc_thread(); + void buffer_pool_gc_thread(); void memory_maintenance_thread(); + void load_channel_tracker_refresh_thread(); void calculate_metrics_thread(); CountDownLatch _stop_background_threads_latch; scoped_refptr _tcmalloc_gc_thread; + // only buffer pool gc, will be removed after. 
+ scoped_refptr _buffer_pool_gc_thread; scoped_refptr _memory_maintenance_thread; + scoped_refptr _load_channel_tracker_refresh_thread; scoped_refptr _calculate_metrics_thread; }; } // namespace doris diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp index c89afe8169..892592b797 100644 --- a/be/src/olap/memtable.cpp +++ b/be/src/olap/memtable.cpp @@ -23,6 +23,7 @@ #include "olap/rowset/rowset_writer.h" #include "olap/schema.h" #include "olap/schema_change.h" +#include "runtime/load_channel_mgr.h" #include "runtime/tuple.h" #include "util/doris_metrics.h" #include "vec/aggregate_functions/aggregate_function_reader.h" @@ -53,8 +54,14 @@ MemTable::MemTable(TabletSharedPtr tablet, Schema* schema, const TabletSchema* t _delete_bitmap(delete_bitmap), _rowset_ids(rowset_ids), _cur_max_version(cur_max_version) { +#ifndef BE_TEST + _insert_mem_tracker_use_hook = std::make_unique( + fmt::format("MemTableHookInsert:TabletId={}", std::to_string(tablet_id())), nullptr, + ExecEnv::GetInstance()->load_channel_mgr()->mem_tracker_set()); +#else _insert_mem_tracker_use_hook = std::make_unique( fmt::format("MemTableHookInsert:TabletId={}", std::to_string(tablet_id()))); +#endif _buffer_mem_pool = std::make_unique(_insert_mem_tracker.get()); _table_mem_pool = std::make_unique(_insert_mem_tracker.get()); if (support_vec) { diff --git a/be/src/runtime/load_channel_mgr.cpp b/be/src/runtime/load_channel_mgr.cpp index d08104032d..b44fe23335 100644 --- a/be/src/runtime/load_channel_mgr.cpp +++ b/be/src/runtime/load_channel_mgr.cpp @@ -220,7 +220,7 @@ Status LoadChannelMgr::_start_load_channels_clean() { void LoadChannelMgr::_handle_mem_exceed_limit() { // Check the soft limit. 
DCHECK(_load_soft_mem_limit > 0); - int64_t process_mem_limit = MemInfo::mem_limit() * config::soft_mem_limit_frac; + int64_t process_mem_limit = MemInfo::soft_mem_limit(); if (_mem_tracker->consumption() < _load_soft_mem_limit && MemInfo::proc_mem_no_allocator_cache() < process_mem_limit) { return; diff --git a/be/src/runtime/load_channel_mgr.h b/be/src/runtime/load_channel_mgr.h index 2dcc79fab8..beaaf29a91 100644 --- a/be/src/runtime/load_channel_mgr.h +++ b/be/src/runtime/load_channel_mgr.h @@ -61,9 +61,11 @@ public: void refresh_mem_tracker() { int64_t mem_usage = 0; - std::lock_guard l(_lock); - for (auto& kv : _load_channels) { - mem_usage += kv.second->mem_consumption(); + { + std::lock_guard l(_lock); + for (auto& kv : _load_channels) { + mem_usage += kv.second->mem_consumption(); + } } _mem_tracker->set_consumption(mem_usage); } diff --git a/be/src/runtime/memory/jemalloc_hook.cpp b/be/src/runtime/memory/jemalloc_hook.cpp index 5bba0877a6..bf0c3e20bc 100644 --- a/be/src/runtime/memory/jemalloc_hook.cpp +++ b/be/src/runtime/memory/jemalloc_hook.cpp @@ -31,7 +31,7 @@ void* doris_malloc(size_t size) __THROW { TRY_CONSUME_MEM_TRACKER(je_nallocx(size, 0), nullptr); void* ptr = je_malloc(size); if (UNLIKELY(ptr == nullptr)) { - TRY_RELEASE_MEM_TRACKER(je_nallocx(size, 0)); + RELEASE_MEM_TRACKER(je_nallocx(size, 0)); } return ptr; } @@ -53,7 +53,7 @@ void* doris_realloc(void* p, size_t size) __THROW { TRY_CONSUME_MEM_TRACKER(je_nallocx(size, 0) - old_size, nullptr); void* ptr = je_realloc(p, size); if (UNLIKELY(ptr == nullptr)) { - TRY_RELEASE_MEM_TRACKER(je_nallocx(size, 0) - old_size); + RELEASE_MEM_TRACKER((je_nallocx(size, 0) - old_size)); } return ptr; } @@ -66,7 +66,7 @@ void* doris_calloc(size_t n, size_t size) __THROW { TRY_CONSUME_MEM_TRACKER(n * size, nullptr); void* ptr = je_calloc(n, size); if (UNLIKELY(ptr == nullptr)) { - TRY_RELEASE_MEM_TRACKER(n * size); + RELEASE_MEM_TRACKER(n * size); } else { CONSUME_MEM_TRACKER(je_malloc_usable_size(ptr) - n * 
size); } @@ -82,7 +82,7 @@ void* doris_memalign(size_t align, size_t size) __THROW { TRY_CONSUME_MEM_TRACKER(size, nullptr); void* ptr = je_aligned_alloc(align, size); if (UNLIKELY(ptr == nullptr)) { - TRY_RELEASE_MEM_TRACKER(size); + RELEASE_MEM_TRACKER(size); } else { CONSUME_MEM_TRACKER(je_malloc_usable_size(ptr) - size); } @@ -93,7 +93,7 @@ void* doris_aligned_alloc(size_t align, size_t size) __THROW { TRY_CONSUME_MEM_TRACKER(size, nullptr); void* ptr = je_aligned_alloc(align, size); if (UNLIKELY(ptr == nullptr)) { - TRY_RELEASE_MEM_TRACKER(size); + RELEASE_MEM_TRACKER(size); } else { CONSUME_MEM_TRACKER(je_malloc_usable_size(ptr) - size); } @@ -104,7 +104,7 @@ void* doris_valloc(size_t size) __THROW { TRY_CONSUME_MEM_TRACKER(size, nullptr); void* ptr = je_valloc(size); if (UNLIKELY(ptr == nullptr)) { - TRY_RELEASE_MEM_TRACKER(size); + RELEASE_MEM_TRACKER(size); } else { CONSUME_MEM_TRACKER(je_malloc_usable_size(ptr) - size); } @@ -115,7 +115,7 @@ void* doris_pvalloc(size_t size) __THROW { TRY_CONSUME_MEM_TRACKER(size, nullptr); void* ptr = je_valloc(size); if (UNLIKELY(ptr == nullptr)) { - TRY_RELEASE_MEM_TRACKER(size); + RELEASE_MEM_TRACKER(size); } else { CONSUME_MEM_TRACKER(je_malloc_usable_size(ptr) - size); } @@ -126,7 +126,7 @@ int doris_posix_memalign(void** r, size_t align, size_t size) __THROW { TRY_CONSUME_MEM_TRACKER(size, ENOMEM); int ret = je_posix_memalign(r, align, size); if (UNLIKELY(ret != 0)) { - TRY_RELEASE_MEM_TRACKER(size); + RELEASE_MEM_TRACKER(size); } else { CONSUME_MEM_TRACKER(je_malloc_usable_size(*r) - size); } diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp index c1ef640c7b..abb3d85d5c 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.cpp +++ b/be/src/runtime/memory/mem_tracker_limiter.cpp @@ -178,9 +178,9 @@ void MemTrackerLimiter::print_log_usage(const std::string& msg) { if (_enable_print_log_usage) { _enable_print_log_usage = false; std::string detail = msg; - 
detail += "\n " + MemTrackerLimiter::process_mem_log_str(); - detail += "\n" + get_stack_trace(); - detail += "\n " + log_usage(); + detail += "\nProcess Memory Summary:\n " + MemTrackerLimiter::process_mem_log_str(); + detail += "\nAlloc Stacktrace:\n" + get_stack_trace(); + detail += "\nMemory Tracker Summary: " + log_usage(); std::string child_trackers_usage; std::vector snapshots; MemTracker::make_group_snapshot(&snapshots, _group_num, _label); @@ -222,7 +222,7 @@ std::string MemTrackerLimiter::mem_limit_exceeded(const std::string& msg, std::string detail = fmt::format( "Memory limit exceeded:, {}>, executing msg:<{}>. backend {} " "process memory used {}, limit {}. If query tracker exceed, `set " - "exec_mem_limit=8G` to change limit, details mem usage see be.INFO.", + "exec_mem_limit=8G` to change limit, details see be.INFO.", _label, limit_exceeded_errmsg, msg, BackendOptions::get_localhost(), PerfCounters::get_vm_rss_str(), MemInfo::mem_limit_str()); return detail; diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h index 8685cdf953..dfba608144 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.h +++ b/be/src/runtime/memory/mem_tracker_limiter.h @@ -144,6 +144,16 @@ public: Status fragment_mem_limit_exceeded(RuntimeState* state, const std::string& msg, int64_t failed_allocation_size = 0); + static std::string process_mem_log_str() { + return fmt::format( + "physical memory {}, process memory used {} limit {}, sys mem available {} low " + "water mark {}", + PrettyPrinter::print(MemInfo::physical_mem(), TUnit::BYTES), + PerfCounters::get_vm_rss_str(), MemInfo::mem_limit_str(), + MemInfo::sys_mem_available_str(), + PrettyPrinter::print(MemInfo::sys_mem_available_low_water_mark(), TUnit::BYTES)); + } + std::string debug_string() { std::stringstream msg; msg << "limit: " << _limit << "; " @@ -178,23 +188,12 @@ private: static std::string process_limit_exceeded_errmsg_str(int64_t bytes) { return fmt::format( 
- "process memory used {} exceed limit {} or sys mem available {} less than min " - "reserve {}, failed " - "alloc size {}, tc/jemalloc allocator cache {}", + "process memory used {} exceed limit {} or sys mem available {} less than low " + "water mark {}, failed alloc size {}", PerfCounters::get_vm_rss_str(), MemInfo::mem_limit_str(), MemInfo::sys_mem_available_str(), PrettyPrinter::print(MemInfo::sys_mem_available_low_water_mark(), TUnit::BYTES), - print_bytes(bytes), MemInfo::allocator_cache_mem_str()); - } - - static std::string process_mem_log_str() { - return fmt::format( - "process memory used {} limit {}, sys mem available {} min reserve {}, tc/jemalloc " - "allocator cache {}", - PerfCounters::get_vm_rss_str(), MemInfo::mem_limit_str(), - MemInfo::sys_mem_available_str(), - PrettyPrinter::print(MemInfo::sys_mem_available_low_water_mark(), TUnit::BYTES), - MemInfo::allocator_cache_mem_str()); + print_bytes(bytes)); } private: @@ -239,6 +238,11 @@ inline bool MemTrackerLimiter::try_consume(int64_t bytes, std::string& failed_ms failed_msg = std::string(); return true; } + + if (config::memory_debug && bytes > 1073741824) { // 1G + print_log_process_usage(fmt::format("Alloc Large Memory, Try Alloc: {}", bytes)); + } + if (sys_mem_exceed_limit_check(bytes)) { failed_msg = process_limit_exceeded_errmsg_str(bytes); return false; diff --git a/be/src/runtime/memory/system_allocator.cpp b/be/src/runtime/memory/system_allocator.cpp index c0721a046e..34e379d9f1 100644 --- a/be/src/runtime/memory/system_allocator.cpp +++ b/be/src/runtime/memory/system_allocator.cpp @@ -46,7 +46,7 @@ uint8_t* SystemAllocator::allocate_via_malloc(size_t length) { auto err = fmt::format("fail to allocate mem via posix_memalign, res={}, errmsg={}.", res, strerror_r(res, buf, 64)); LOG(ERROR) << err; - if (enable_thread_cache_bad_alloc) throw std::bad_alloc {}; + if (enable_thread_catch_bad_alloc) throw std::bad_alloc {}; MemTrackerLimiter::print_log_process_usage(err); return nullptr; } 
diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp index ac853d7c87..f197cad79a 100644 --- a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp +++ b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp @@ -27,36 +27,38 @@ void ThreadMemTrackerMgr::attach_limiter_tracker( const std::shared_ptr& mem_tracker, const TUniqueId& fragment_instance_id) { DCHECK(mem_tracker); - flush_untracked_mem(); + flush_untracked_mem(); _fragment_instance_id = fragment_instance_id; _limiter_tracker = mem_tracker; _limiter_tracker_raw = mem_tracker.get(); + _check_limit = true; } void ThreadMemTrackerMgr::detach_limiter_tracker( const std::shared_ptr& old_mem_tracker) { - flush_untracked_mem(); + flush_untracked_mem(); _fragment_instance_id = TUniqueId(); _limiter_tracker = old_mem_tracker; _limiter_tracker_raw = old_mem_tracker.get(); } -void ThreadMemTrackerMgr::exceeded_cancel_task(const std::string& cancel_details) { - if (_fragment_instance_id != TUniqueId()) { - ExecEnv::GetInstance()->fragment_mgr()->cancel( - _fragment_instance_id, PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED, - cancel_details); - } +void ThreadMemTrackerMgr::cancel_fragment() { + ExecEnv::GetInstance()->fragment_mgr()->cancel(_fragment_instance_id, + PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED, + _exceed_mem_limit_msg); + _check_limit = false; // Make sure it will only be canceled once } void ThreadMemTrackerMgr::exceeded() { if (_cb_func != nullptr) { _cb_func(); } + _limiter_tracker_raw->print_log_usage(_exceed_mem_limit_msg); + if (is_attach_query()) { - exceeded_cancel_task(_exceed_mem_limit_msg); + // TODO wait gc + cancel_fragment(); } - _check_limit = false; // Make sure it will only be canceled once } } // namespace doris diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h b/be/src/runtime/memory/thread_mem_tracker_mgr.h index 27c2f892e1..86a5c51d6e 100644 --- a/be/src/runtime/memory/thread_mem_tracker_mgr.h +++ 
b/be/src/runtime/memory/thread_mem_tracker_mgr.h @@ -36,7 +36,7 @@ public: ~ThreadMemTrackerMgr() { // if _init == false, exec env is not initialized when init(). and never consumed mem tracker once. - if (_init) flush_untracked_mem(); + if (_init) flush_untracked_mem(); } // only for memory hook @@ -67,7 +67,7 @@ public: } int64_t stop_count_scope_mem() { - flush_untracked_mem(); + flush_untracked_mem(); _count_scope_mem = false; return _scope_mem; } @@ -77,11 +77,17 @@ public: // Note that, If call the memory allocation operation in Memory Hook, // such as calling LOG/iostream/sstream/stringstream/etc. related methods, // must increase the control to avoid entering infinite recursion, otherwise it may cause crash or stuck, + // Returns nothing; always consumes the mem tracker, no matter whether the limit is exceeded. void consume(int64_t size); + // If the memory exceeds the limit, returns false and does not consume the mem tracker. bool try_consume(int64_t size); - template - void flush_untracked_mem(); + // If Force is false: when the memory exceeds the limit, this alloc will be terminated and false + // will be returned. + // If Force is true: even if the memory is found to exceed the limit, the mem tracker is still consumed, + // because this alloc will still actually allocate memory; always returns true. 
+ template + bool flush_untracked_mem(); bool is_attach_query() { return _fragment_instance_id != TUniqueId(); } @@ -112,14 +118,12 @@ public: } private: - // If tryConsume fails due to task mem tracker exceeding the limit, the task must be canceled - void exceeded_cancel_task(const std::string& cancel_details); - + void cancel_fragment(); void exceeded(); void save_exceed_mem_limit_msg() { _exceed_mem_limit_msg = _limiter_tracker_raw->mem_limit_exceeded( - fmt::format("execute:<{}>", last_consumer_tracker()), _bad_consume_msg); + fmt::format("execute:<{}>", last_consumer_tracker()), _failed_consume_msg); } private: @@ -132,7 +136,7 @@ private: bool _count_scope_mem = false; int64_t _scope_mem = 0; - std::string _bad_consume_msg = std::string(); + std::string _failed_consume_msg = std::string(); std::string _exceed_mem_limit_msg = std::string(); std::shared_ptr _limiter_tracker; @@ -181,15 +185,29 @@ inline void ThreadMemTrackerMgr::consume(int64_t size) { _untracked_mem <= -config::mem_tracker_consume_min_size_bytes) && !_stop_consume && ExecEnv::GetInstance()->initialized()) { if (_check_limit) { - flush_untracked_mem(); + flush_untracked_mem(); } else { - flush_untracked_mem(); + flush_untracked_mem(); } } } -template -inline void ThreadMemTrackerMgr::flush_untracked_mem() { +inline bool ThreadMemTrackerMgr::try_consume(int64_t size) { + _untracked_mem += size; + if ((_untracked_mem >= config::mem_tracker_consume_min_size_bytes || + _untracked_mem <= -config::mem_tracker_consume_min_size_bytes) && + !_stop_consume && ExecEnv::GetInstance()->initialized()) { + if (_check_limit) { + return flush_untracked_mem(); + } else { + return flush_untracked_mem(); + } + } + return true; +} + +template +inline bool ThreadMemTrackerMgr::flush_untracked_mem() { // Temporary memory may be allocated during the consumption of the mem tracker, which will lead to entering // the Memory Hook again, so suspend consumption to avoid falling into an infinite loop. 
_stop_consume = true; @@ -198,13 +216,11 @@ inline void ThreadMemTrackerMgr::flush_untracked_mem() { old_untracked_mem = _untracked_mem; if (_count_scope_mem) _scope_mem += _untracked_mem; if (CheckLimit) { - if (!_limiter_tracker_raw->try_consume(old_untracked_mem, _bad_consume_msg)) { - // The memory has been allocated, so when TryConsume fails, need to continue to complete - // the consume to ensure the accuracy of the statistics. - _limiter_tracker_raw->consume(old_untracked_mem); + if (!_limiter_tracker_raw->try_consume(old_untracked_mem, _failed_consume_msg)) { + if (Force) _limiter_tracker_raw->consume(old_untracked_mem); save_exceed_mem_limit_msg(); - _limiter_tracker_raw->print_log_usage(_exceed_mem_limit_msg); exceeded(); + if (!Force) { _stop_consume = false; return false; } } } else { _limiter_tracker_raw->consume(old_untracked_mem); @@ -214,24 +230,7 @@ inline void ThreadMemTrackerMgr::flush_untracked_mem() { } _untracked_mem -= old_untracked_mem; _stop_consume = false; -} - -inline bool ThreadMemTrackerMgr::try_consume(int64_t size) { - if (!_stop_consume) { - // Temporary memory may be allocated during the consumption of the mem tracker, which will lead to entering - // the Memory Hook again, so suspend consumption to avoid falling into an infinite loop. 
- _stop_consume = true; - if (!_limiter_tracker_raw->try_consume(size, _bad_consume_msg)) { - save_exceed_mem_limit_msg(); - _stop_consume = false; - return false; - } - _stop_consume = false; - return true; - } else { - _untracked_mem += size; - return true; - } + return true; } } // namespace doris diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h index e586143b8e..cdc8d483f2 100644 --- a/be/src/runtime/thread_context.h +++ b/be/src/runtime/thread_context.h @@ -123,7 +123,7 @@ public: }; inline thread_local ThreadContextPtr thread_context_ptr; -inline thread_local bool enable_thread_cache_bad_alloc = false; +inline thread_local bool enable_thread_catch_bad_alloc = false; // To avoid performance problems caused by frequently calling `bthread_getspecific` to obtain bthread TLS // in tcmalloc hook, cache the key and value of bthread TLS in pthread TLS. @@ -290,6 +290,8 @@ private: // For the memory that cannot be counted by mem hook, manually count it into the mem tracker, such as mmap. 
#define CONSUME_THREAD_MEM_TRACKER(size) \ doris::thread_context()->thread_mem_tracker_mgr->consume(size) +#define TRY_CONSUME_THREAD_MEM_TRACKER(size) \ + doris::thread_context()->thread_mem_tracker_mgr->try_consume(size) #define RELEASE_THREAD_MEM_TRACKER(size) \ doris::thread_context()->thread_mem_tracker_mgr->consume(-size) @@ -305,7 +307,7 @@ private: #define RETURN_IF_CATCH_BAD_ALLOC(stmt) \ do { \ doris::thread_context()->thread_mem_tracker_mgr->clear_exceed_mem_limit_msg(); \ - if (doris::enable_thread_cache_bad_alloc) { \ + if (doris::enable_thread_catch_bad_alloc) { \ try { \ { stmt; } \ } catch (std::bad_alloc const& e) { \ @@ -317,8 +319,8 @@ private: } \ } else { \ try { \ - doris::enable_thread_cache_bad_alloc = true; \ - Defer defer {[&]() { doris::enable_thread_cache_bad_alloc = false; }}; \ + doris::enable_thread_catch_bad_alloc = true; \ + Defer defer {[&]() { doris::enable_thread_catch_bad_alloc = false; }}; \ { stmt; } \ } catch (std::bad_alloc const& e) { \ doris::thread_context()->thread_mem_tracker()->print_log_usage( \ @@ -348,7 +350,7 @@ private: #define TRY_CONSUME_MEM_TRACKER(size, fail_ret) \ do { \ if (doris::thread_context_ptr.init) { \ - if (doris::enable_thread_cache_bad_alloc) { \ + if (doris::enable_thread_catch_bad_alloc) { \ if (!doris::thread_context()->thread_mem_tracker_mgr->try_consume(size)) { \ return fail_ret; \ } \ @@ -367,25 +369,15 @@ private: doris::ThreadMemTrackerMgr::consume_no_attach(-size); \ } \ } while (0) -#define TRY_RELEASE_MEM_TRACKER(size) \ - do { \ - if (doris::thread_context_ptr.init) { \ - if (!doris::enable_thread_cache_bad_alloc) { \ - doris::thread_context()->thread_mem_tracker_mgr->consume(-size); \ - } \ - } else { \ - doris::ThreadMemTrackerMgr::consume_no_attach(-size); \ - } \ - } while (0) #else #define CONSUME_THREAD_MEM_TRACKER(size) (void)0 +#define TRY_CONSUME_THREAD_MEM_TRACKER(size) true #define RELEASE_THREAD_MEM_TRACKER(size) (void)0 #define THREAD_MEM_TRACKER_TRANSFER_TO(size, tracker) 
(void)0 #define THREAD_MEM_TRACKER_TRANSFER_FROM(size, tracker) (void)0 #define CONSUME_MEM_TRACKER(size) (void)0 #define TRY_CONSUME_MEM_TRACKER(size, fail_ret) (void)0 #define RELEASE_MEM_TRACKER(size) (void)0 -#define TRY_RELEASE_MEM_TRACKER(size) (void)0 #define RETURN_IF_CATCH_BAD_ALLOC(stmt) (stmt) #endif } // namespace doris diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp index 793fd4d4c8..be08539d5a 100644 --- a/be/src/service/doris_main.cpp +++ b/be/src/service/doris_main.cpp @@ -51,7 +51,6 @@ #include "olap/storage_engine.h" #include "runtime/exec_env.h" #include "runtime/heartbeat_flags.h" -#include "runtime/load_channel_mgr.h" #include "service/backend_options.h" #include "service/backend_service.h" #include "service/brpc_service.h" @@ -494,19 +493,7 @@ int main(int argc, char** argv) { #if defined(LEAK_SANITIZER) __lsan_do_leak_check(); #endif - doris::PerfCounters::refresh_proc_status(); - doris::MemInfo::refresh_proc_meminfo(); - doris::MemTrackerLimiter::refresh_global_counter(); - doris::ExecEnv::GetInstance()->load_channel_mgr()->refresh_mem_tracker(); -#if !defined(ADDRESS_SANITIZER) && !defined(LEAK_SANITIZER) && !defined(THREAD_SANITIZER) - doris::MemInfo::refresh_allocator_mem(); -#endif - doris::MemInfo::refresh_proc_mem_no_allocator_cache(); - if (doris::config::memory_debug) { - doris::MemTrackerLimiter::print_log_process_usage("memory_debug", false); - } - doris::MemTrackerLimiter::enable_print_log_process_usage(); - sleep(1); + sleep(10); } http_service.stop(); diff --git a/be/src/util/mem_info.cpp b/be/src/util/mem_info.cpp index 64fe58567d..ebc51a7e0b 100644 --- a/be/src/util/mem_info.cpp +++ b/be/src/util/mem_info.cpp @@ -45,6 +45,7 @@ bool MemInfo::_s_initialized = false; int64_t MemInfo::_s_physical_mem = -1; int64_t MemInfo::_s_mem_limit = -1; std::string MemInfo::_s_mem_limit_str = ""; +int64_t MemInfo::_s_soft_mem_limit = -1; int64_t MemInfo::_s_allocator_cache_mem = 0; std::string 
MemInfo::_s_allocator_cache_mem_str = ""; @@ -55,6 +56,7 @@ static std::unordered_map _mem_info_bytes; int64_t MemInfo::_s_sys_mem_available = 0; std::string MemInfo::_s_sys_mem_available_str = ""; int64_t MemInfo::_s_sys_mem_available_low_water_mark = 0; +int64_t MemInfo::_s_sys_mem_available_warning_water_mark = 0; void MemInfo::refresh_allocator_mem() { #if defined(ADDRESS_SANITIZER) || defined(LEAK_SANITIZER) || defined(THREAD_SANITIZER) @@ -109,11 +111,6 @@ void MemInfo::refresh_proc_meminfo() { _s_sys_mem_available = _mem_info_bytes["MemAvailable"]; _s_sys_mem_available_str = PrettyPrinter::print(_s_sys_mem_available, TUnit::BYTES); - - LOG_EVERY_N(INFO, 10) << fmt::format( - "Physical Memory: {}, Sys Mem Available {}, Tc/Jemalloc Allocator Cache {}", - PrettyPrinter::print(_mem_info_bytes["MemTotal"], TUnit::BYTES), - _s_sys_mem_available_str, MemInfo::allocator_cache_mem_str()); } void MemInfo::init() { @@ -143,6 +140,7 @@ void MemInfo::init() { _s_mem_limit = _s_physical_mem; } _s_mem_limit_str = PrettyPrinter::print(_s_mem_limit, TUnit::BYTES); + _s_soft_mem_limit = _s_mem_limit * config::soft_mem_limit_frac; std::string line; int64_t _s_vm_min_free_kbytes = 0; @@ -166,13 +164,15 @@ void MemInfo::init() { // https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/commit/?id=34e431b0ae398fc54ea69ff85ec700722c9da773 // // available_low_water_mark = p1 - p2 - // p1: max 3.2G, avoid wasting too much memory on machines with large memory larger than 32G. + // p1: upper sys_mem_available_low_water_mark, avoid wasting too much memory. // p2: vm/min_free_kbytes is usually 0.4% - 5% of the total memory, some cloud machines vm/min_free_kbytes is 5%, // in order to avoid wasting too much memory, available_low_water_mark minus 1% at most. 
int64_t p1 = std::min( - std::min(_s_physical_mem - _s_mem_limit, _s_physical_mem * 0.1), 3435973836L); + std::min(_s_physical_mem - _s_mem_limit, _s_physical_mem * 0.1), + config::max_sys_mem_available_low_water_mark_bytes); int64_t p2 = std::max(_s_vm_min_free_kbytes - _s_physical_mem * 0.01, 0); _s_sys_mem_available_low_water_mark = std::max(p1 - p2, 0); + _s_sys_mem_available_warning_water_mark = _s_sys_mem_available_low_water_mark + p1 * 2; LOG(INFO) << "Physical Memory: " << PrettyPrinter::print(_s_physical_mem, TUnit::BYTES) << ", Mem Limit: " << _s_mem_limit_str @@ -196,6 +196,7 @@ void MemInfo::init() { bool is_percent = true; _s_mem_limit = ParseUtil::parse_mem_spec(config::mem_limit, -1, _s_physical_mem, &is_percent); _s_mem_limit_str = PrettyPrinter::print(_s_mem_limit, TUnit::BYTES); + _s_soft_mem_limit = _s_mem_limit * config::soft_mem_limit_frac; LOG(INFO) << "Physical Memory: " << PrettyPrinter::print(_s_physical_mem, TUnit::BYTES); _s_initialized = true; diff --git a/be/src/util/mem_info.h b/be/src/util/mem_info.h index b6d2392a6e..52281f508e 100644 --- a/be/src/util/mem_info.h +++ b/be/src/util/mem_info.h @@ -58,6 +58,9 @@ public: static inline int64_t sys_mem_available_low_water_mark() { return _s_sys_mem_available_low_water_mark; } + static inline int64_t sys_mem_available_warning_water_mark() { + return _s_sys_mem_available_warning_water_mark; + } static inline int64_t get_tc_metrics(const std::string& name) { #ifndef USE_JEMALLOC @@ -99,6 +102,10 @@ public: DCHECK(_s_initialized); return _s_mem_limit_str; } + static inline int64_t soft_mem_limit() { + DCHECK(_s_initialized); + return _s_soft_mem_limit; + } static std::string debug_string(); @@ -107,6 +114,7 @@ private: static int64_t _s_physical_mem; static int64_t _s_mem_limit; static std::string _s_mem_limit_str; + static int64_t _s_soft_mem_limit; static int64_t _s_allocator_cache_mem; static std::string _s_allocator_cache_mem_str; @@ -116,6 +124,7 @@ private: static int64_t 
_s_sys_mem_available; static std::string _s_sys_mem_available_str; static int64_t _s_sys_mem_available_low_water_mark; + static int64_t _s_sys_mem_available_warning_water_mark; }; } // namespace doris diff --git a/be/src/vec/common/allocator.h b/be/src/vec/common/allocator.h index 423f1eb4db..22b69c1854 100644 --- a/be/src/vec/common/allocator.h +++ b/be/src/vec/common/allocator.h @@ -104,11 +104,22 @@ static constexpr size_t MALLOC_MIN_ALIGNMENT = 8; #define RETURN_BAD_ALLOC(err) \ do { \ - if (!doris::enable_thread_cache_bad_alloc) \ + LOG(WARNING) << err; \ + if (!doris::enable_thread_catch_bad_alloc) \ doris::MemTrackerLimiter::print_log_process_usage(err); \ throw std::bad_alloc {}; \ } while (0) +#define RETURN_BAD_ALLOC_IF_PRE_CATCH(err) \ + do { \ + LOG(WARNING) << err; \ + if (!doris::enable_thread_catch_bad_alloc) { \ + doris::MemTrackerLimiter::print_log_process_usage(err); \ + } else { \ + throw std::bad_alloc {}; \ + } \ + } while (0) + /** Responsible for allocating / freeing memory. Used, for example, in PODArray, Arena. * Also used in hash tables. * The interface is different from std::allocator @@ -134,7 +145,13 @@ public: alignment, size), doris::TStatusCode::VEC_BAD_ARGUMENTS); - CONSUME_THREAD_MEM_TRACKER(size); + if (!TRY_CONSUME_THREAD_MEM_TRACKER(size)) { + RETURN_BAD_ALLOC_IF_PRE_CATCH( + fmt::format("Allocator Pre Catch: Cannot mmap {}.", size)); + // memory exceeds the limit, consume mem tracker fails, but there is no external catch bad_alloc, + // alloc will continue to execute, so the consume memtracker is forced. 
+ CONSUME_THREAD_MEM_TRACKER(size); + } buf = mmap(get_mmap_hint(), size, PROT_READ | PROT_WRITE, mmap_flags, -1, 0); if (MAP_FAILED == buf) { RELEASE_THREAD_MEM_TRACKER(size); @@ -180,7 +197,7 @@ public: if (0 != munmap(buf, size)) { auto err = fmt::format("Allocator: Cannot munmap {}.", size); LOG(ERROR) << err; - if (!doris::enable_thread_cache_bad_alloc) + if (!doris::enable_thread_catch_bad_alloc) doris::MemTrackerLimiter::print_log_process_usage(err); throw std::bad_alloc {}; } else { @@ -218,7 +235,12 @@ public: memset(reinterpret_cast(buf) + old_size, 0, new_size - old_size); } else if (old_size >= MMAP_THRESHOLD && new_size >= MMAP_THRESHOLD) { /// Resize mmap'd memory region. - CONSUME_THREAD_MEM_TRACKER(new_size - old_size); + if (!TRY_CONSUME_THREAD_MEM_TRACKER(new_size - old_size)) { + RETURN_BAD_ALLOC_IF_PRE_CATCH(fmt::format( + "Allocator Pre Catch: Cannot mremap memory chunk from {} to {}.", old_size, + new_size)); + CONSUME_THREAD_MEM_TRACKER(new_size - old_size); + } // On apple and freebsd self-implemented mremap used (common/mremap.h) buf = clickhouse_mremap(buf, old_size, new_size, MREMAP_MAYMOVE, PROT_READ | PROT_WRITE, diff --git a/docs/en/docs/admin-manual/config/be-config.md b/docs/en/docs/admin-manual/config/be-config.md index 43b126a765..6e5eda0d1a 100644 --- a/docs/en/docs/admin-manual/config/be-config.md +++ b/docs/en/docs/admin-manual/config/be-config.md @@ -844,6 +844,12 @@ The number of sliced tablets, plan the layout of the tablet, and avoid too many * Description: Control gc of tcmalloc, in performance mode doirs releases memory of tcmalloc cache when usgae >= 90% * mem_limit, otherwise, doris releases memory of tcmalloc cache when usage >= 50% * mem_limit; * Default value: performance +### `max_sys_mem_available_low_water_mark_bytes` + +* Type: int64 +* Description: The maximum low water mark of the system `/proc/meminfo/MemAvailable`, Unit byte, default 1.6G, actual low water mark=min(1.6G, MemTotal * 10%), avoid wasting too much 
memory on machines with more than 16G of memory. Increasing this maximum reserves more memory buffer for Full GC on machines with more than 16G of memory; decreasing it allows as much memory as possible to be used. +* Default value: 1717986918 + ### `memory_limitation_per_thread_for_schema_change_bytes` Default: 2147483648 diff --git a/docs/zh-CN/docs/admin-manual/config/be-config.md b/docs/zh-CN/docs/admin-manual/config/be-config.md index b3d187e0d4..40277d6a8e 100644 --- a/docs/zh-CN/docs/admin-manual/config/be-config.md +++ b/docs/zh-CN/docs/admin-manual/config/be-config.md @@ -845,6 +845,12 @@ txn 管理器中每个 txn_partition_map 的最大 txns 数,这是一种自我 * 描述:控制tcmalloc的回收。如果配置为performance,内存使用超过mem_limit的90%时,doris会释放tcmalloc cache中的内存,如果配置为compact,内存使用超过mem_limit的50%时,doris会释放tcmalloc cache中的内存。 * 默认值:performance +### `max_sys_mem_available_low_water_mark_bytes` + +* 类型:int64 +* 描述:系统`/proc/meminfo/MemAvailable` 的最大低水位线,单位字节,默认1.6G,实际低水位线=min(1.6G,MemTotal * 10%),避免在大于16G的机器上浪费过多内存。调大max,在大于16G内存的机器上,将为Full GC预留更多的内存buffer;反之调小max,将尽可能充分使用内存。 +* 默认值:1717986918 + ### `memory_limitation_per_thread_for_schema_change_bytes` 默认值:2147483648