diff --git a/be/src/common/config.h b/be/src/common/config.h
index d46d7abd3d..b0257740fb 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -68,6 +68,13 @@ CONF_Double(soft_mem_limit_frac, "0.9");
 // Turn down max. will use as much memory as possible.
 CONF_Int64(max_sys_mem_available_low_water_mark_bytes, "1717986918");
 
+// The amount of memory a minor / full GC tries to release each time, as a percentage of the mem limit.
+CONF_mString(process_minor_gc_size, "10%");
+CONF_mString(process_full_gc_size, "20%");
+
+// The maximum time (in milliseconds) a thread waits for a full GC before its query is cancelled. Currently only queries wait for full GC.
+CONF_mInt32(thread_wait_gc_max_milliseconds, "1000");
+
 // the port heartbeat service used
 CONF_Int32(heartbeat_service_port, "9050");
 // the count of heart beat service
diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp
index 3687f99fc2..08a47b37f4 100644
--- a/be/src/common/daemon.cpp
+++ b/be/src/common/daemon.cpp
@@ -228,11 +228,13 @@ void Daemon::memory_maintenance_thread() {
                             doris::MemInfo::sys_mem_available_low_water_mark() ||
             doris::MemInfo::proc_mem_no_allocator_cache() >= doris::MemInfo::mem_limit()) {
             interval_milliseconds = 100;
+            doris::MemInfo::process_full_gc(); // low water mark or mem limit reached
         } else if (doris::MemInfo::sys_mem_available() <
                            doris::MemInfo::sys_mem_available_warning_water_mark() ||
                    doris::MemInfo::proc_mem_no_allocator_cache() >=
                            doris::MemInfo::soft_mem_limit()) {
             interval_milliseconds = 200;
+            doris::MemInfo::process_minor_gc(); // warning water mark or soft mem limit reached
         } else {
             interval_milliseconds = config::memory_maintenance_sleep_time_ms;
         }
diff --git a/be/src/olap/lru_cache.cpp b/be/src/olap/lru_cache.cpp
index a6505d9bbe..484660e1bb 100644
--- a/be/src/olap/lru_cache.cpp
+++ b/be/src/olap/lru_cache.cpp
@@ -523,6 +523,10 @@ int64_t ShardedLRUCache::prune_if(CacheValuePredicate pred) {
     return num_prune;
 }
 
+int64_t ShardedLRUCache::mem_consumption() { // current consumption of this cache's mem tracker
+    return _mem_tracker->consumption();
+}
+
 void ShardedLRUCache::update_cache_metrics() const {
     size_t total_capacity = 0;
     size_t total_usage = 0;
diff --git a/be/src/olap/lru_cache.h b/be/src/olap/lru_cache.h
index 7ae79ef3ba..480a61c712 100644
--- a/be/src/olap/lru_cache.h
+++ b/be/src/olap/lru_cache.h
@@ -217,6 +217,8 @@ public:
     // may hold lock for a long time to execute predicate.
     virtual int64_t prune_if(CacheValuePredicate pred) { return 0; }
 
+    virtual int64_t mem_consumption() = 0; // memory accounted to this cache, in bytes
+
 private:
     DISALLOW_COPY_AND_ASSIGN(Cache);
 };
@@ -370,6 +372,7 @@ public:
     virtual uint64_t new_id() override;
     virtual int64_t prune() override;
     virtual int64_t prune_if(CacheValuePredicate pred) override;
+    int64_t mem_consumption() override;
 
 private:
     void update_cache_metrics() const;
diff --git a/be/src/olap/page_cache.cpp b/be/src/olap/page_cache.cpp
index fa9f9010be..2813f85dd3 100644
--- a/be/src/olap/page_cache.cpp
+++ b/be/src/olap/page_cache.cpp
@@ -76,4 +76,13 @@ void StoragePageCache::insert(const CacheKey& key, const Slice& data, PageCacheH
     *handle = PageCacheHandle(cache, lru_handle);
 }
 
+// Prunes the cache that serves `page_type`.
+// Does nothing when no cache exists for that page type.
+void StoragePageCache::prune(segment_v2::PageTypePB page_type) {
+    auto cache = _get_page_cache(page_type);
+    if (cache != nullptr) {
+        cache->prune();
+    }
+}
+
 } // namespace doris
diff --git a/be/src/olap/page_cache.h b/be/src/olap/page_cache.h
index c1f0b48da3..6313e931e0 100644
--- a/be/src/olap/page_cache.h
+++ b/be/src/olap/page_cache.h
@@ -91,6 +91,16 @@ public:
         return _get_page_cache(page_type) != nullptr;
     }
 
+    // Prunes the cache that serves `page_type`.
+    void prune(segment_v2::PageTypePB page_type);
+
+    // Returns mem_consumption() of the cache serving `page_type`,
+    // or 0 when no cache exists for that page type.
+    int64_t get_page_cache_mem_consumption(segment_v2::PageTypePB page_type) {
+        auto cache = _get_page_cache(page_type);
+        return cache != nullptr ? cache->mem_consumption() : 0;
+    }
+
 private:
     StoragePageCache();
     static StoragePageCache* _s_instance;
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index b7ae9cf735..30d916a49c 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -818,6 +818,7 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi
                 }
             }
         }
+        fragments_ctx->fragment_ids.push_back(fragment_instance_id);
 
         exec_state.reset(new FragmentExecState(fragments_ctx->query_id,
                                                params.params.fragment_instance_id,
                                                params.backend_num,
@@ -941,6 +942,24 @@ void FragmentMgr::cancel(const TUniqueId& fragment_id, const PPlanFragmentCancel
     }
 }
 
+// Cancels every fragment instance recorded for `query_id`.
+// The ids are copied under `_lock` and cancelled after it has been released.
+// Does nothing if the query is not present in `_fragments_ctx_map`.
+void FragmentMgr::cancel_query(const TUniqueId& query_id, const PPlanFragmentCancelReason& reason,
+                               const std::string& msg) {
+    std::vector<TUniqueId> cancel_fragment_ids;
+    {
+        std::lock_guard lock(_lock);
+        auto ctx = _fragments_ctx_map.find(query_id);
+        if (ctx != _fragments_ctx_map.end()) {
+            cancel_fragment_ids = ctx->second->fragment_ids;
+        }
+    }
+    for (const auto& fragment_id : cancel_fragment_ids) {
+        cancel(fragment_id, reason, msg);
+    }
+}
+
 void FragmentMgr::cancel_worker() {
     LOG(INFO) << "FragmentMgr cancel worker start working.";
     do {
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index 4894ccddbf..51a5ea2dd5 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -87,6 +87,10 @@ public:
     void cancel(const TUniqueId& fragment_id, const PPlanFragmentCancelReason& reason,
                 const std::string& msg = "");
 
+    // Cancels all fragment instances recorded for `query_id`.
+    void cancel_query(const TUniqueId& query_id, const PPlanFragmentCancelReason& reason,
+                      const std::string& msg = "");
+
     void cancel_worker();
 
     virtual void debug(std::stringstream& ss);
diff --git a/be/src/runtime/memory/chunk_allocator.cpp b/be/src/runtime/memory/chunk_allocator.cpp
index 6ac8021648..a99b8b0873 100644
--- a/be/src/runtime/memory/chunk_allocator.cpp
+++ b/be/src/runtime/memory/chunk_allocator.cpp
@@ -120,6 +120,21 @@ public:
         _chunk_lists[idx].push_back(ptr);
     }
 
+    // Frees every cached chunk and empties all free lists.
+    void clear() {
+        std::lock_guard<SpinLock> l(_lock);
+        // Iterate the container itself instead of a hard-coded 64 so the loop can
+        // never index past the end of _chunk_lists.
+        for (auto& chunk_list : _chunk_lists) {
+            for (auto ptr : chunk_list) {
+                ::free(ptr);
+            }
+            // Drop the elements and give the (now unused) capacity back as well.
+            chunk_list.clear();
+            chunk_list.shrink_to_fit();
+        }
+    }
+
 private:
     SpinLock _lock;
     std::vector> _chunk_lists;
@@ -256,4 +271,15 @@ void ChunkAllocator::free(uint8_t* data, size_t size) {
     free(chunk);
 }
 
+// Clears every arena's cached chunks, then invokes THREAD_MEM_TRACKER_TRANSFER_FROM
+// with the full consumption of _mem_tracker.
+// NOTE(review): _reserved_bytes is not adjusted here although mem_consumption()
+// reports it -- confirm whether it should be reset once the arenas are empty.
+void ChunkAllocator::clear() {
+    for (auto& arena : _arenas) {
+        arena->clear();
+    }
+    THREAD_MEM_TRACKER_TRANSFER_FROM(_mem_tracker->consumption(), _mem_tracker.get());
+}
+
 } // namespace doris
diff --git a/be/src/runtime/memory/chunk_allocator.h b/be/src/runtime/memory/chunk_allocator.h
index de9ff70487..8602815a57 100644
--- a/be/src/runtime/memory/chunk_allocator.h
+++ b/be/src/runtime/memory/chunk_allocator.h
@@ -72,6 +72,10 @@ public:
     // otherwise the capacity of chunk allocator will be wrong.
     void free(uint8_t* data, size_t size);
 
+    void clear(); // frees the chunks cached in every arena
+
+    int64_t mem_consumption() { return _reserved_bytes; }
+
 private:
     ChunkAllocator(size_t reserve_limit);
 
diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp
index 16f74267a3..8e96a58178 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.cpp
+++ b/be/src/runtime/memory/mem_tracker_limiter.cpp
@@ -20,8 +20,10 @@
 
 #include
 #include
+#include <queue>
 
 #include "gutil/once.h"
+#include "runtime/fragment_mgr.h"
 #include "runtime/runtime_state.h"
 #include "runtime/thread_context.h"
 #include "util/pretty_printer.h"
@@ -240,42 +242,70 @@ Status MemTrackerLimiter::fragment_mem_limit_exceeded(RuntimeState* state, const
     return Status::MemoryLimitExceeded(failed_msg);
 }
 
-// TODO(zxy) More observable methods
-// /// Logs the usage of 'limit' number of queries based on maximum total memory
-// /// consumption.
-// std::string MemTracker::LogTopNQueries(int limit) {
-//     if (limit == 0) return "";
-//     priority_queue, std::vector>,
-//                    std::greater>>
-//             min_pq;
-//     GetTopNQueries(min_pq, limit);
-//     std::vector usage_strings(min_pq.size());
-//     while (!min_pq.empty()) {
-//         usage_strings.push_back(min_pq.top().second);
-//         min_pq.pop();
-//     }
-//     std::reverse(usage_strings.begin(), usage_strings.end());
-//     return join(usage_strings, "\n");
-// }
+// Cancels the queries with the largest memory consumption until at least
+// min_free_mem bytes are expected to be released; returns the total consumption
+// of the cancelled queries. Cancels nothing when min_free_mem <= 0.
+int64_t MemTrackerLimiter::free_top_query(int64_t min_free_mem) {
+    // <consumption in bytes, tracker label>; std::greater keeps the smallest on top.
+    using QueryUsage = std::pair<int64_t, std::string>;
+    std::priority_queue<QueryUsage, std::vector<QueryUsage>, std::greater<QueryUsage>> min_pq;
+    // Sum of the consumption of everything currently held in min_pq.
+    int64_t prepare_free_mem = 0;
 
-// /// Helper function for LogTopNQueries that iterates through the MemTracker hierarchy
-// /// and populates 'min_pq' with 'limit' number of elements (that contain state related
-// /// to query MemTrackers) based on maximum total memory consumption.
-// void MemTracker::GetTopNQueries(
-//         priority_queue, std::vector>,
-//                        greater>>& min_pq,
-//         int limit) {
-//     list> children;
-//     {
-//         lock_guard l(child_trackers_lock_);
-//         children = child_trackers_;
-//     }
-//     for (const auto& child_weak : children) {
-//         shared_ptr child = child_weak.lock();
-//         if (child) {
-//             child->GetTopNQueries(min_pq, limit);
-//         }
-//     }
-// }
+    auto label_to_queryid = [](const std::string& label) -> TUniqueId {
+        // NOTE(review): assumes every query tracker label contains "#Id=<id>";
+        // otherwise the [1] access is out of range -- confirm against label creation.
+        auto queryid = split(label, "#Id=")[1];
+        TUniqueId querytid;
+        parse_id(queryid, &querytid);
+        return querytid;
+    };
+
+    // Pass 1: pick candidates while holding each group lock. No cancellation is
+    // issued here, so no group lock is held while FragmentMgr does its work.
+    for (unsigned i = 1; i < mem_tracker_limiter_pool.size(); ++i) {
+        std::lock_guard l(mem_tracker_limiter_pool[i].group_lock);
+        for (auto tracker : mem_tracker_limiter_pool[i].trackers) {
+            if (tracker->type() != Type::QUERY) {
+                continue;
+            }
+            // Read once; the value may change between calls.
+            const int64_t consumption = tracker->consumption();
+            min_pq.emplace(consumption, tracker->label());
+            prepare_free_mem += consumption;
+            // Evict the smallest candidates that are not needed to reach min_free_mem.
+            // The emptiness check keeps top() from ever running on an empty queue.
+            while (!min_pq.empty() && prepare_free_mem - min_pq.top().first >= min_free_mem) {
+                prepare_free_mem -= min_pq.top().first;
+                min_pq.pop();
+            }
+        }
+    }
+
+    // Pass 2: cancel what is left, smallest first (pop order of the min-heap).
+    std::vector<std::string> usage_strings;
+    int64_t freed_mem = 0;
+    while (!min_pq.empty()) {
+        const QueryUsage& top = min_pq.top();
+        ExecEnv::GetInstance()->fragment_mgr()->cancel_query(
+                label_to_queryid(top.second), PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED,
+                fmt::format("Process has no memory available, cancel top memory usage query: "
+                            "query memory tracker <{}> consumption {}, backend {} "
+                            "process memory used {} exceed limit {} or sys mem available {} "
+                            "less than low water mark {}. Execute again after enough memory, "
+                            "details see be.INFO.",
+                            top.second, print_bytes(top.first), BackendOptions::get_localhost(),
+                            PerfCounters::get_vm_rss_str(), MemInfo::mem_limit_str(),
+                            MemInfo::sys_mem_available_str(),
+                            print_bytes(MemInfo::sys_mem_available_low_water_mark())));
+        freed_mem += top.first;
+        usage_strings.push_back(fmt::format("{} memory usage {} Bytes", top.second, top.first));
+        min_pq.pop();
+    }
+    if (!usage_strings.empty()) {
+        LOG(INFO) << "Process GC Free Top Memory Usage Query: " << join(usage_strings, ",");
+    }
+    return freed_mem;
+}
 
 } // namespace doris
diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h
index 5ebda8230b..243fc57355 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.h
+++ b/be/src/runtime/memory/mem_tracker_limiter.h
@@ -145,14 +145,18 @@ public:
     Status fragment_mem_limit_exceeded(RuntimeState* state, const std::string& msg,
                                        int64_t failed_allocation_size = 0);
 
+    // Start canceling from the query with the largest memory usage until the memory of min_free_mem size is released.
+    static int64_t free_top_query(int64_t min_free_mem);
+
     static std::string process_mem_log_str() {
         return fmt::format(
                 "physical memory {}, process memory used {} limit {}, sys mem available {} low "
-                "water mark {}",
+                "water mark {}, refresh interval memory growth {} B",
                 PrettyPrinter::print(MemInfo::physical_mem(), TUnit::BYTES),
                 PerfCounters::get_vm_rss_str(), MemInfo::mem_limit_str(),
                 MemInfo::sys_mem_available_str(),
-                PrettyPrinter::print(MemInfo::sys_mem_available_low_water_mark(), TUnit::BYTES));
+                PrettyPrinter::print(MemInfo::sys_mem_available_low_water_mark(), TUnit::BYTES),
+                MemInfo::refresh_interval_memory_growth.load());
     }
 
     std::string debug_string() {
@@ -170,7 +174,7 @@ private:
     // Increases consumption of this tracker by 'bytes' only if will not exceeding limit.
     // Returns true if the consumption was successfully updated.
     WARN_UNUSED_RESULT
-    bool try_consume(int64_t bytes, std::string& failed_msg);
+    bool try_consume(int64_t bytes, std::string& failed_msg, bool& is_process_exceed);
 
     // When the accumulated untracked memory value exceeds the upper limit,
     // the current value is returned and set to 0.
@@ -233,7 +237,8 @@ inline void MemTrackerLimiter::cache_consume(int64_t bytes) {
     consume(consume_bytes);
 }
 
-inline bool MemTrackerLimiter::try_consume(int64_t bytes, std::string& failed_msg) {
+inline bool MemTrackerLimiter::try_consume(int64_t bytes, std::string& failed_msg,
+                                           bool& is_process_exceed) {
     if (bytes <= 0) {
         release(-bytes);
         failed_msg = std::string();
@@ -246,6 +251,7 @@ inline bool MemTrackerLimiter::try_consume(int64_t bytes, std::string& failed_ms
 
     if (sys_mem_exceed_limit_check(bytes)) {
         failed_msg = process_limit_exceeded_errmsg_str(bytes);
+        is_process_exceed = true; // the process-wide check failed, not this tracker's limit
         return false;
     }
 
@@ -254,6 +260,7 @@ inline bool MemTrackerLimiter::try_consume(int64_t bytes, std::string& failed_ms
     } else {
         if (!_consumption->try_add(bytes, _limit)) {
             failed_msg = tracker_limit_exceeded_errmsg_str(bytes, this);
+            is_process_exceed = false; // this tracker's own limit was exceeded
             return false;
         }
     }
diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
index f197cad79a..d45d6b8cb5 100644
--- a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
+++ b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
@@ -17,6 +17,9 @@
 
 #include "runtime/memory/thread_mem_tracker_mgr.h"
 
+#include <chrono>
+#include <thread>
+
 #include "runtime/exec_env.h"
 #include "runtime/fragment_mgr.h"
 #include "service/backend_options.h"
@@ -49,14 +52,30 @@ void ThreadMemTrackerMgr::cancel_fragment() {
     _check_limit = false; // Make sure it will only be canceled once
 }
 
-void ThreadMemTrackerMgr::exceeded() {
+// Invoked when consuming `size` bytes failed a limit check.
+// For query threads hit by the process-wide limit, waits up to
+// config::thread_wait_gc_max_milliseconds for memory to free up before cancelling.
+void ThreadMemTrackerMgr::exceeded(int64_t size) {
     if (_cb_func != nullptr) {
         _cb_func();
     }
     _limiter_tracker_raw->print_log_usage(_exceed_mem_limit_msg);
 
     if (is_attach_query()) {
-        // TODO wait gc
+        if (_is_process_exceed && _wait_gc) {
+            // Poll in steps of at most 100 ms and never sleep past the configured maximum.
+            int64_t wait_milliseconds = config::thread_wait_gc_max_milliseconds;
+            while (wait_milliseconds > 0) {
+                const int64_t step_milliseconds = wait_milliseconds < 100 ? wait_milliseconds : 100;
+                std::this_thread::sleep_for(std::chrono::milliseconds(step_milliseconds));
+                if (!MemTrackerLimiter::sys_mem_exceed_limit_check(size)) {
+                    // Account for `size` so other waiting threads see less available memory.
+                    MemInfo::refresh_interval_memory_growth += size;
+                    return; // Process memory is sufficient, no cancel query.
+                }
+                wait_milliseconds -= step_milliseconds;
+            }
+        }
         cancel_fragment();
     }
 }
diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h b/be/src/runtime/memory/thread_mem_tracker_mgr.h
index 86a5c51d6e..151f79cfcb 100644
--- a/be/src/runtime/memory/thread_mem_tracker_mgr.h
+++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h
@@ -104,6 +104,7 @@ public:
     void set_check_limit(bool check_limit) { _check_limit = check_limit; }
     std::string exceed_mem_limit_msg() { return _exceed_mem_limit_msg; }
     void clear_exceed_mem_limit_msg() { _exceed_mem_limit_msg = ""; }
+    void disable_wait_gc() { _wait_gc = false; } // exceeded() then cancels without waiting for GC
 
     std::string print_debug_string() {
         fmt::memory_buffer consumer_tracker_buf;
@@ -119,7 +120,7 @@ public:
 
 private:
     void cancel_fragment();
-    void exceeded();
+    void exceeded(int64_t size);
 
     void save_exceed_mem_limit_msg() {
         _exceed_mem_limit_msg = _limiter_tracker_raw->mem_limit_exceeded(
@@ -138,6 +139,8 @@ private:
 
     std::string _failed_consume_msg = std::string();
     std::string _exceed_mem_limit_msg = std::string();
+    bool _is_process_exceed = false; // written by try_consume() when a limit check fails
+    bool _wait_gc = true; // whether exceeded() may wait for GC before cancelling
 
     std::shared_ptr _limiter_tracker;
     MemTrackerLimiter* _limiter_tracker_raw = nullptr;
@@ -216,10 +219,11 @@ inline bool ThreadMemTrackerMgr::flush_untracked_mem() {
     old_untracked_mem = _untracked_mem;
     if (_count_scope_mem) _scope_mem += _untracked_mem;
     if (CheckLimit) {
-        if (!_limiter_tracker_raw->try_consume(old_untracked_mem, _failed_consume_msg)) {
+        if (!_limiter_tracker_raw->try_consume(old_untracked_mem, _failed_consume_msg,
+                                               _is_process_exceed)) {
             if (Force) _limiter_tracker_raw->consume(old_untracked_mem);
             save_exceed_mem_limit_msg();
-            exceeded();
+            exceeded(old_untracked_mem);
             if (!Force) return false;
         }
     } else {
diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp
index f386da763d..d7ba70403e 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -276,6 +276,7 @@ Status PlanFragmentExecutor::open_vectorized_internal() {
         SCOPED_CPU_TIMER(_fragment_cpu_timer);
         SCOPED_TIMER(profile()->total_time_counter());
         RETURN_IF_ERROR(_plan->open(_runtime_state.get()));
+        RETURN_IF_CANCELLED(_runtime_state);
     }
     if (_sink == nullptr) {
         return Status::OK();
@@ -289,6 +290,7 @@ Status PlanFragmentExecutor::open_vectorized_internal() {
     auto sink_send_span_guard = Defer {[this]() { this->_sink->end_send_span(); }};
     while (true) {
         doris::vectorized::Block* block;
+        RETURN_IF_CANCELLED(_runtime_state);
 
         {
             SCOPED_CPU_TIMER(_fragment_cpu_timer);
diff --git a/be/src/runtime/query_fragments_ctx.h b/be/src/runtime/query_fragments_ctx.h
index a6e00b718d..5d592ab22e 100644
--- a/be/src/runtime/query_fragments_ctx.h
+++ b/be/src/runtime/query_fragments_ctx.h
@@ -140,6 +140,8 @@ public:
     // MemTracker that is shared by all fragment instances running on this host.
     std::shared_ptr query_mem_tracker;
 
+    std::vector<TUniqueId> fragment_ids; // fragment instance ids recorded for this query
+
 private:
     ExecEnv* _exec_env;
     DateTimeValue _start_time;
diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h
index fd16150f46..e72840373f 100644
--- a/be/src/runtime/thread_context.h
+++ b/be/src/runtime/thread_context.h
@@ -205,6 +205,8 @@ static void pthread_attach_bthread() {
         // 2. A pthread switch occurs. Because the pthread switch cannot be accurately identified at the moment.
         // So tracker call reset 0 like reuses btls.
         bthread_context = new ThreadContext;
+        // The brpc server should respond as quickly as possible.
+        bthread_context->thread_mem_tracker_mgr->disable_wait_gc();
         // set the data so that next time bthread_getspecific in the thread returns the data.
         CHECK_EQ(0, bthread_setspecific(btls_key, bthread_context));
     }
diff --git a/be/src/util/mem_info.cpp b/be/src/util/mem_info.cpp
index 6a2f57cc4b..453c1cd2bd 100644
--- a/be/src/util/mem_info.cpp
+++ b/be/src/util/mem_info.cpp
@@ -34,6 +34,7 @@
 
 #include "common/config.h"
 #include "gutil/strings/split.h"
+#include "olap/page_cache.h"
 #include "util/cgroup_util.h"
 #include "util/parse_util.h"
 #include "util/pretty_printer.h"
@@ -51,12 +52,15 @@ int64_t MemInfo::_s_allocator_cache_mem = 0;
 std::string MemInfo::_s_allocator_cache_mem_str = "";
 int64_t MemInfo::_s_virtual_memory_used = 0;
 int64_t MemInfo::_s_proc_mem_no_allocator_cache = -1;
+std::atomic<int64_t> MemInfo::refresh_interval_memory_growth = 0;
 
 static std::unordered_map _mem_info_bytes;
 int64_t MemInfo::_s_sys_mem_available = 0;
 std::string MemInfo::_s_sys_mem_available_str = "";
 int64_t MemInfo::_s_sys_mem_available_low_water_mark = 0;
 int64_t MemInfo::_s_sys_mem_available_warning_water_mark = 0;
+int64_t MemInfo::_s_process_minor_gc_size = -1;
+int64_t MemInfo::_s_process_full_gc_size = -1;
 
 void MemInfo::refresh_allocator_mem() {
 #if defined(ADDRESS_SANITIZER) || defined(LEAK_SANITIZER) || defined(THREAD_SANITIZER)
@@ -85,6 +89,47 @@ void MemInfo::refresh_allocator_mem() {
 #endif
 }
 
+// Minor GC: clears the chunk allocator first and, unless that alone exceeds
+// _s_process_minor_gc_size, also prunes the DATA_PAGE page cache.
+void MemInfo::process_minor_gc() {
+    // TODO, free more cache, and should free a certain percentage of capacity, not all.
+    int64_t freed_mem = 0;
+    Defer defer {[&]() {
+        LOG(INFO) << fmt::format("Process Minor GC Free Memory {} Bytes", freed_mem);
+    }};
+
+    freed_mem += ChunkAllocator::instance()->mem_consumption();
+    ChunkAllocator::instance()->clear();
+    if (freed_mem > _s_process_minor_gc_size) {
+        return;
+    }
+    freed_mem +=
+            StoragePageCache::instance()->get_page_cache_mem_consumption(segment_v2::DATA_PAGE);
+    StoragePageCache::instance()->prune(segment_v2::DATA_PAGE);
+}
+
+// Full GC: prunes the DATA_PAGE page cache, then clears the chunk allocator, and
+// finally cancels top-memory queries for whatever is still missing from
+// _s_process_full_gc_size. Stops as soon as the target has been exceeded.
+void MemInfo::process_full_gc() {
+    int64_t freed_mem = 0;
+    Defer defer {
+            [&]() { LOG(INFO) << fmt::format("Process Full GC Free Memory {} Bytes", freed_mem); }};
+
+    freed_mem +=
+            StoragePageCache::instance()->get_page_cache_mem_consumption(segment_v2::DATA_PAGE);
+    StoragePageCache::instance()->prune(segment_v2::DATA_PAGE);
+    if (freed_mem > _s_process_full_gc_size) {
+        return;
+    }
+    freed_mem += ChunkAllocator::instance()->mem_consumption();
+    ChunkAllocator::instance()->clear();
+    if (freed_mem > _s_process_full_gc_size) {
+        return;
+    }
+    freed_mem += MemTrackerLimiter::free_top_query(_s_process_full_gc_size - freed_mem);
+}
+
 #ifndef __APPLE__
 void MemInfo::refresh_proc_meminfo() {
     std::ifstream meminfo("/proc/meminfo", std::ios::in);
@@ -143,6 +188,11 @@ void MemInfo::init() {
     _s_mem_limit_str = PrettyPrinter::print(_s_mem_limit, TUnit::BYTES);
     _s_soft_mem_limit = _s_mem_limit * config::soft_mem_limit_frac;
 
+    _s_process_minor_gc_size =
+            ParseUtil::parse_mem_spec(config::process_minor_gc_size, -1, _s_mem_limit, &is_percent);
+    _s_process_full_gc_size =
+            ParseUtil::parse_mem_spec(config::process_full_gc_size, -1, _s_mem_limit, &is_percent);
+
     std::string line;
     int64_t _s_vm_min_free_kbytes = 0;
     std::ifstream vminfo("/proc/sys/vm/min_free_kbytes", std::ios::in);
diff --git a/be/src/util/mem_info.h b/be/src/util/mem_info.h
index 52281f508e..bd76c6124c 100644
--- a/be/src/util/mem_info.h
+++ b/be/src/util/mem_info.h
@@ -53,7 +53,9 @@ public:
 
     static void refresh_proc_meminfo();
 
-    static inline int64_t sys_mem_available() { return _s_sys_mem_available; }
+    static inline int64_t sys_mem_available() {
+        return _s_sys_mem_available - refresh_interval_memory_growth;
+    }
     static inline std::string sys_mem_available_str() { return _s_sys_mem_available_str; }
     static inline int64_t sys_mem_available_low_water_mark() {
         return _s_sys_mem_available_low_water_mark;
@@ -83,7 +85,9 @@ public:
     static inline size_t allocator_virtual_mem() { return _s_virtual_memory_used; }
     static inline size_t allocator_cache_mem() { return _s_allocator_cache_mem; }
     static inline std::string allocator_cache_mem_str() { return _s_allocator_cache_mem_str; }
-    static inline int64_t proc_mem_no_allocator_cache() { return _s_proc_mem_no_allocator_cache; }
+    static inline int64_t proc_mem_no_allocator_cache() {
+        return _s_proc_mem_no_allocator_cache + refresh_interval_memory_growth;
+    }
 
     // Tcmalloc property `generic.total_physical_bytes` records the total length of the virtual memory
     // obtained by the process malloc, not the physical memory actually used by the process in the OS.
@@ -92,6 +96,7 @@ public:
     static inline void refresh_proc_mem_no_allocator_cache() {
         _s_proc_mem_no_allocator_cache =
                 PerfCounters::get_vm_rss() - static_cast(_s_allocator_cache_mem);
+        refresh_interval_memory_growth = 0; // the refreshed RSS supersedes the interim growth
     }
 
     static inline int64_t mem_limit() {
@@ -109,6 +114,13 @@ public:
 
     static std::string debug_string();
 
+    static void process_minor_gc();
+    static void process_full_gc();
+
+    // Only used after the memory limit has been exceeded: when multiple threads are waiting for process memory to
+    // become available, it keeps them from all resuming at the same time and causing an OOM.
+    static std::atomic<int64_t> refresh_interval_memory_growth;
+
 private:
     static bool _s_initialized;
     static int64_t _s_physical_mem;
@@ -125,6 +137,8 @@ private:
     static std::string _s_sys_mem_available_str;
     static int64_t _s_sys_mem_available_low_water_mark;
     static int64_t _s_sys_mem_available_warning_water_mark;
+    static int64_t _s_process_minor_gc_size;
+    static int64_t _s_process_full_gc_size;
 };
 
 } // namespace doris