[branch-2.1](memory) Fix reserve memory compatible with memory GC and logging (#37682)

pick
#36307
#36412
This commit is contained in:
Xinyi Zou
2024-07-12 11:43:26 +08:00
committed by GitHub
parent ffa9e49bc7
commit ef031c5fb2
15 changed files with 556 additions and 450 deletions

View File

@@ -19,16 +19,64 @@
#include <bvar/bvar.h>
#include "runtime/thread_context.h"
namespace doris {
std::mutex GlobalMemoryArbitrator::_reserved_trackers_lock;
std::unordered_map<std::string, MemTracker::MemCounter> GlobalMemoryArbitrator::_reserved_trackers;
bvar::PassiveStatus<int64_t> g_vm_rss_sub_allocator_cache(
"meminfo_vm_rss_sub_allocator_cache",
[](void*) { return GlobalMemoryArbitrator::vm_rss_sub_allocator_cache(); }, nullptr);
bvar::PassiveStatus<int64_t> g_process_memory_usage(
"meminfo_process_memory_usage",
[](void*) { return GlobalMemoryArbitrator::process_memory_usage(); }, nullptr);
bvar::PassiveStatus<int64_t> g_sys_mem_avail(
"meminfo_sys_mem_avail", [](void*) { return GlobalMemoryArbitrator::sys_mem_available(); },
nullptr);
std::atomic<int64_t> GlobalMemoryArbitrator::_s_vm_rss_sub_allocator_cache = -1;
std::atomic<int64_t> GlobalMemoryArbitrator::_s_process_reserved_memory = 0;
std::atomic<int64_t> GlobalMemoryArbitrator::refresh_interval_memory_growth = 0;
// Atomically reserves `bytes` of process memory ahead of an actual allocation,
// so the reservation is accounted for before the next RSS refresh.
// Returns false (and reserves nothing) when the reservation would drop
// sys-available memory under the low water mark or push the process over
// MemInfo::mem_limit().
bool GlobalMemoryArbitrator::try_reserve_process_memory(int64_t bytes) {
    // Pre-check against the low water mark; sys_mem_available() already
    // subtracts current reservations and growth since the last refresh.
    if (sys_mem_available() - bytes < MemInfo::sys_mem_available_low_water_mark()) {
        return false;
    }
    int64_t old_reserved_mem = _s_process_reserved_memory.load(std::memory_order_relaxed);
    int64_t new_reserved_mem = 0;
    // CAS loop: the limit check is re-evaluated against the freshest reserved
    // value on every retry, so concurrent reservers cannot jointly exceed
    // mem_limit(). On CAS failure, old_reserved_mem is reloaded automatically.
    do {
        new_reserved_mem = old_reserved_mem + bytes;
        if (UNLIKELY(vm_rss_sub_allocator_cache() +
                             refresh_interval_memory_growth.load(std::memory_order_relaxed) +
                             new_reserved_mem >=
                     MemInfo::mem_limit())) {
            return false;
        }
    } while (!_s_process_reserved_memory.compare_exchange_weak(old_reserved_mem, new_reserved_mem,
                                                               std::memory_order_relaxed));
    {
        // Attribute the reservation to the current thread's mem-tracker label so
        // it can be reported in memory snapshots and logs.
        std::lock_guard<std::mutex> l(_reserved_trackers_lock);
        _reserved_trackers[doris::thread_context()->thread_mem_tracker()->label()].add(bytes);
    }
    return true;
}
// Releases `bytes` previously obtained via try_reserve_process_memory(),
// decrementing both the global reserved counter and the per-label tracker
// of the calling thread. The per-label entry is dropped once it reaches zero.
// NOTE(review): assumes release happens on a thread whose mem-tracker label
// matches the reserving thread's — confirm against callers.
void GlobalMemoryArbitrator::release_process_reserved_memory(int64_t bytes) {
    _s_process_reserved_memory.fetch_sub(bytes, std::memory_order_relaxed);
    {
        std::lock_guard<std::mutex> l(_reserved_trackers_lock);
        auto label = doris::thread_context()->thread_mem_tracker()->label();
        auto it = _reserved_trackers.find(label);
        if (it == _reserved_trackers.end()) {
            DCHECK(false) << "release unknown reserved memory " << label << ", bytes: " << bytes;
            return;
        }
        // Reuse the iterator instead of re-hashing `label` on every access.
        it->second.sub(bytes);
        if (it->second.current_value() == 0) {
            _reserved_trackers.erase(it);
        }
    }
}
} // namespace doris

View File

@ -17,6 +17,7 @@
#pragma once
#include "runtime/memory/mem_tracker.h"
#include "util/mem_info.h"
namespace doris {
@ -30,14 +31,12 @@ public:
* accurate, since those pages are not really RSS but a memory
* that can be used at anytime via jemalloc.
*/
static inline void refresh_vm_rss_sub_allocator_cache() {
_s_vm_rss_sub_allocator_cache.store(
PerfCounters::get_vm_rss() - static_cast<int64_t>(MemInfo::allocator_cache_mem()),
std::memory_order_relaxed);
MemInfo::refresh_interval_memory_growth = 0;
}
static inline int64_t vm_rss_sub_allocator_cache() {
return _s_vm_rss_sub_allocator_cache.load(std::memory_order_relaxed);
return PerfCounters::get_vm_rss() - static_cast<int64_t>(MemInfo::allocator_cache_mem());
}
static inline void reset_refresh_interval_memory_growth() {
refresh_interval_memory_growth = 0;
}
// If need to use process memory in your execution logic, pls use it.
@ -45,32 +44,80 @@ public:
// add reserved memory and growth memory since the last vm_rss update.
static inline int64_t process_memory_usage() {
return vm_rss_sub_allocator_cache() +
MemInfo::refresh_interval_memory_growth.load(std::memory_order_relaxed) +
refresh_interval_memory_growth.load(std::memory_order_relaxed) +
process_reserved_memory();
}
static inline bool try_reserve_process_memory(int64_t bytes) {
if (MemInfo::sys_mem_available() - bytes < MemInfo::sys_mem_available_low_water_mark()) {
return false;
}
int64_t old_reserved_mem = _s_process_reserved_memory.load(std::memory_order_relaxed);
int64_t new_reserved_mem = 0;
do {
new_reserved_mem = old_reserved_mem + bytes;
if (UNLIKELY(vm_rss_sub_allocator_cache() +
MemInfo::refresh_interval_memory_growth.load(
std::memory_order_relaxed) +
new_reserved_mem >=
MemInfo::mem_limit())) {
return false;
}
} while (!_s_process_reserved_memory.compare_exchange_weak(
old_reserved_mem, new_reserved_mem, std::memory_order_relaxed));
return true;
// Short human-readable summary of current process memory usage,
// prefixed with "[ASAN]" in address-sanitizer builds.
static std::string process_memory_used_str() {
    std::string usage_str = fmt::format(
            "process memory used {}", PrettyPrinter::print(process_memory_usage(), TUnit::BYTES));
#ifdef ADDRESS_SANITIZER
    usage_str.insert(0, "[ASAN]");
#endif
    return usage_str;
}
static inline void release_process_reserved_memory(int64_t bytes) {
_s_process_reserved_memory.fetch_sub(bytes, std::memory_order_relaxed);
// Detailed breakdown of process memory usage for logs/error messages:
// total = vm/rss - allocator cache + reserved + growth awaiting refresh.
static std::string process_memory_used_details_str() {
    auto msg = fmt::format(
            "process memory used {}(= {}[vm/rss] - {}[tc/jemalloc_cache] + {}[reserved] + "
            "{}B[waiting_refresh])",
            PrettyPrinter::print(process_memory_usage(), TUnit::BYTES),
            PerfCounters::get_vm_rss_str(),
            PrettyPrinter::print(static_cast<uint64_t>(MemInfo::allocator_cache_mem()),
                                 TUnit::BYTES),
            PrettyPrinter::print(process_reserved_memory(), TUnit::BYTES),
            refresh_interval_memory_growth);
#ifdef ADDRESS_SANITIZER
    // Flag ASAN builds: RSS is inflated by sanitizer shadow memory.
    msg = "[ASAN]" + msg;
#endif
    return msg;
}
// System-available memory adjusted down by memory already promised out:
// current reservations plus allocations made since the last refresh of
// MemInfo::_s_sys_mem_available (so the value may be slightly stale).
static inline int64_t sys_mem_available() {
    return MemInfo::_s_sys_mem_available.load(std::memory_order_relaxed) -
           refresh_interval_memory_growth.load(std::memory_order_relaxed) -
           process_reserved_memory();
}
// Short human-readable summary of sys_mem_available() for logging.
static inline std::string sys_mem_available_str() {
    auto msg = fmt::format("sys available memory {}",
                           PrettyPrinter::print(sys_mem_available(), TUnit::BYTES));
#ifdef ADDRESS_SANITIZER
    // Flag ASAN builds, whose memory accounting differs from production.
    msg = "[ASAN]" + msg;
#endif
    return msg;
}
// Detailed breakdown of sys_mem_available() for logs/error messages:
// available = /proc available - reserved - growth awaiting refresh.
static inline std::string sys_mem_available_details_str() {
    auto msg = fmt::format(
            "sys available memory {}(= {}[proc/available] - {}[reserved] - "
            "{}B[waiting_refresh])",
            PrettyPrinter::print(sys_mem_available(), TUnit::BYTES),
            PrettyPrinter::print(MemInfo::_s_sys_mem_available.load(std::memory_order_relaxed),
                                 TUnit::BYTES),
            PrettyPrinter::print(process_reserved_memory(), TUnit::BYTES),
            refresh_interval_memory_growth);
#ifdef ADDRESS_SANITIZER
    // Flag ASAN builds, whose memory accounting differs from production.
    msg = "[ASAN]" + msg;
#endif
    return msg;
}
static bool try_reserve_process_memory(int64_t bytes);
static void release_process_reserved_memory(int64_t bytes);
// Appends one Snapshot per reserved-memory tracker label to `snapshots`,
// so per-label reservations show up in process memory dumps.
static inline void make_reserved_memory_snapshots(
        std::vector<MemTracker::Snapshot>* snapshots) {
    std::lock_guard<std::mutex> l(_reserved_trackers_lock);
    for (const auto& [label, counter] : _reserved_trackers) {
        MemTracker::Snapshot snapshot;
        snapshot.type = "reserved_memory";
        snapshot.label = label;
        snapshot.limit = -1; // reservations carry no per-label limit
        snapshot.cur_consumption = counter.current_value();
        snapshot.peak_consumption = counter.peak_value();
        snapshots->emplace_back(std::move(snapshot));
    }
}
static inline int64_t process_reserved_memory() {
@ -79,8 +126,7 @@ public:
static bool is_exceed_soft_mem_limit(int64_t bytes = 0) {
return process_memory_usage() + bytes >= MemInfo::soft_mem_limit() ||
MemInfo::sys_mem_available() - bytes <
MemInfo::sys_mem_available_warning_water_mark();
sys_mem_available() - bytes < MemInfo::sys_mem_available_warning_water_mark();
}
static bool is_exceed_hard_mem_limit(int64_t bytes = 0) {
@ -93,44 +139,45 @@ public:
// because `new/malloc` will trigger mem hook when using tcmalloc/jemalloc allocator cache,
// but it may not actually alloc physical memory, which is not expected in mem hook fail.
return process_memory_usage() + bytes >= MemInfo::mem_limit() ||
MemInfo::sys_mem_available() - bytes < MemInfo::sys_mem_available_low_water_mark();
sys_mem_available() - bytes < MemInfo::sys_mem_available_low_water_mark();
}
static std::string process_mem_log_str() {
return fmt::format(
"os physical memory {}. process memory used {}, limit {}, soft limit {}. sys "
"available memory {}, low water mark {}, warning water mark {}. Refresh interval "
"memory growth {} B",
PrettyPrinter::print(MemInfo::physical_mem(), TUnit::BYTES),
PerfCounters::get_vm_rss_str(), MemInfo::mem_limit_str(),
MemInfo::soft_mem_limit_str(), MemInfo::sys_mem_available_str(),
PrettyPrinter::print(MemInfo::sys_mem_available_low_water_mark(), TUnit::BYTES),
PrettyPrinter::print(MemInfo::sys_mem_available_warning_water_mark(), TUnit::BYTES),
MemInfo::refresh_interval_memory_growth);
}
static std::string process_limit_exceeded_errmsg_str() {
return fmt::format(
"process memory used {} exceed limit {} or sys available memory {} less than low "
"water mark {}",
PerfCounters::get_vm_rss_str(), MemInfo::mem_limit_str(),
MemInfo::sys_mem_available_str(),
PrettyPrinter::print(MemInfo::sys_mem_available_low_water_mark(), TUnit::BYTES));
}
static std::string process_soft_limit_exceeded_errmsg_str() {
return fmt::format(
"process memory used {} exceed soft limit {} or sys available memory {} less than "
"os physical memory {}. {}, limit {}, soft limit {}. {}, low water mark {}, "
"warning water mark {}.",
PerfCounters::get_vm_rss_str(), MemInfo::soft_mem_limit_str(),
MemInfo::sys_mem_available_str(),
PrettyPrinter::print(MemInfo::physical_mem(), TUnit::BYTES),
process_memory_used_details_str(), MemInfo::mem_limit_str(),
MemInfo::soft_mem_limit_str(), sys_mem_available_details_str(),
PrettyPrinter::print(MemInfo::sys_mem_available_low_water_mark(), TUnit::BYTES),
PrettyPrinter::print(MemInfo::sys_mem_available_warning_water_mark(),
TUnit::BYTES));
}
static std::string process_limit_exceeded_errmsg_str() {
return fmt::format(
"{} exceed limit {} or {} less than low water mark {}", process_memory_used_str(),
MemInfo::mem_limit_str(), sys_mem_available_str(),
PrettyPrinter::print(MemInfo::sys_mem_available_low_water_mark(), TUnit::BYTES));
}
static std::string process_soft_limit_exceeded_errmsg_str() {
return fmt::format("{} exceed soft limit {} or {} less than warning water mark {}.",
process_memory_used_str(), MemInfo::soft_mem_limit_str(),
sys_mem_available_str(),
PrettyPrinter::print(MemInfo::sys_mem_available_warning_water_mark(),
TUnit::BYTES));
}
// It is only used after the memory limit is exceeded. When multiple threads are waiting for the available memory of the process,
// avoid multiple threads starting at the same time and causing OOM.
static std::atomic<int64_t> refresh_interval_memory_growth;
private:
static std::atomic<int64_t> _s_vm_rss_sub_allocator_cache;
static std::atomic<int64_t> _s_process_reserved_memory;
static std::mutex _reserved_trackers_lock;
static std::unordered_map<std::string, MemTracker::MemCounter> _reserved_trackers;
};
} // namespace doris

View File

@ -216,6 +216,13 @@ void MemTrackerLimiter::make_process_snapshots(std::vector<MemTracker::Snapshot>
snapshot.peak_consumption = PerfCounters::get_vm_hwm();
(*snapshots).emplace_back(snapshot);
snapshot.type = "reserved memory";
snapshot.label = "";
snapshot.limit = -1;
snapshot.cur_consumption = GlobalMemoryArbitrator::process_reserved_memory();
snapshot.peak_consumption = -1;
(*snapshots).emplace_back(snapshot);
snapshot.type = "process virtual memory"; // from /proc VmSize VmPeak
snapshot.label = "";
snapshot.limit = -1;
@ -359,10 +366,10 @@ void MemTrackerLimiter::print_log_process_usage() {
std::string MemTrackerLimiter::tracker_limit_exceeded_str() {
std::string err_msg = fmt::format(
"memory tracker limit exceeded, tracker label:{}, type:{}, limit "
"{}, peak used {}, current used {}. backend {} process memory used {}.",
"{}, peak used {}, current used {}. backend {}, {}.",
label(), type_string(_type), print_bytes(limit()),
print_bytes(_consumption->peak_value()), print_bytes(_consumption->current_value()),
BackendOptions::get_localhost(), PerfCounters::get_vm_rss_str());
BackendOptions::get_localhost(), GlobalMemoryArbitrator::process_memory_used_str());
if (_type == Type::QUERY || _type == Type::LOAD) {
err_msg += fmt::format(
" exec node:<{}>, can `set exec_mem_limit=8G` to change limit, details see "
@ -377,23 +384,17 @@ std::string MemTrackerLimiter::tracker_limit_exceeded_str() {
}
int64_t MemTrackerLimiter::free_top_memory_query(int64_t min_free_mem,
const std::string& vm_rss_str,
const std::string& mem_available_str,
const std::string& cancel_reason,
RuntimeProfile* profile, Type type) {
return free_top_memory_query(
min_free_mem, type, ExecEnv::GetInstance()->mem_tracker_limiter_pool,
[&vm_rss_str, &mem_available_str, &type](int64_t mem_consumption,
const std::string& label) {
[&cancel_reason, &type](int64_t mem_consumption, const std::string& label) {
return fmt::format(
"Process has no memory available, cancel top memory used {}: "
"{} memory tracker <{}> consumption {}, backend {} "
"process memory used {} exceed limit {} or sys available memory {} "
"less than low water mark {}. Execute again after enough memory, "
"details see be.INFO.",
type_string(type), type_string(type), label, print_bytes(mem_consumption),
BackendOptions::get_localhost(), vm_rss_str, MemInfo::mem_limit_str(),
mem_available_str,
print_bytes(MemInfo::sys_mem_available_low_water_mark()));
"Process memory not enough, cancel top memory used {}: "
"<{}> consumption {}, backend {}, {}. Execute again "
"after enough memory, details see be.INFO.",
type_string(type), label, print_bytes(mem_consumption),
BackendOptions::get_localhost(), cancel_reason);
},
profile, GCType::PROCESS);
}
@ -504,23 +505,17 @@ int64_t MemTrackerLimiter::free_top_memory_query(
}
int64_t MemTrackerLimiter::free_top_overcommit_query(int64_t min_free_mem,
const std::string& vm_rss_str,
const std::string& mem_available_str,
const std::string& cancel_reason,
RuntimeProfile* profile, Type type) {
return free_top_overcommit_query(
min_free_mem, type, ExecEnv::GetInstance()->mem_tracker_limiter_pool,
[&vm_rss_str, &mem_available_str, &type](int64_t mem_consumption,
const std::string& label) {
[&cancel_reason, &type](int64_t mem_consumption, const std::string& label) {
return fmt::format(
"Process has less memory, cancel top memory overcommit {}: "
"{} memory tracker <{}> consumption {}, backend {} "
"process memory used {} exceed soft limit {} or sys available memory {} "
"less than warning water mark {}. Execute again after enough memory, "
"details see be.INFO.",
type_string(type), type_string(type), label, print_bytes(mem_consumption),
BackendOptions::get_localhost(), vm_rss_str, MemInfo::soft_mem_limit_str(),
mem_available_str,
print_bytes(MemInfo::sys_mem_available_warning_water_mark()));
"Process memory not enough, cancel top memory overcommit {}: "
"<{}> consumption {}, backend {}, {}. Execute again "
"after enough memory, details see be.INFO.",
type_string(type), label, print_bytes(mem_consumption),
BackendOptions::get_localhost(), cancel_reason);
},
profile, GCType::PROCESS);
}

View File

@ -141,7 +141,7 @@ public:
return true;
}
bool st = true;
if (is_overcommit_tracker() && config::enable_query_memory_overcommit) {
if (is_overcommit_tracker() && !config::enable_query_memory_overcommit) {
st = _consumption->try_add(bytes, _limit);
} else {
_consumption->add(bytes);
@ -192,9 +192,8 @@ public:
static void print_log_process_usage();
// Start canceling from the query with the largest memory usage until the memory of min_free_mem size is freed.
// vm_rss_str and mem_available_str recorded when gc is triggered, for log printing.
static int64_t free_top_memory_query(int64_t min_free_mem, const std::string& vm_rss_str,
const std::string& mem_available_str,
// cancel_reason recorded when gc is triggered, for log printing.
static int64_t free_top_memory_query(int64_t min_free_mem, const std::string& cancel_reason,
RuntimeProfile* profile, Type type = Type::QUERY);
static int64_t free_top_memory_query(
@ -202,16 +201,13 @@ public:
const std::function<std::string(int64_t, const std::string&)>& cancel_msg,
RuntimeProfile* profile, GCType gctype);
static int64_t free_top_memory_load(int64_t min_free_mem, const std::string& vm_rss_str,
const std::string& mem_available_str,
static int64_t free_top_memory_load(int64_t min_free_mem, const std::string& cancel_reason,
RuntimeProfile* profile) {
return free_top_memory_query(min_free_mem, vm_rss_str, mem_available_str, profile,
Type::LOAD);
return free_top_memory_query(min_free_mem, cancel_reason, profile, Type::LOAD);
}
// Start canceling from the query with the largest memory overcommit ratio until the memory
// of min_free_mem size is freed.
static int64_t free_top_overcommit_query(int64_t min_free_mem, const std::string& vm_rss_str,
const std::string& mem_available_str,
static int64_t free_top_overcommit_query(int64_t min_free_mem, const std::string& cancel_reason,
RuntimeProfile* profile, Type type = Type::QUERY);
static int64_t free_top_overcommit_query(
@ -219,11 +215,9 @@ public:
const std::function<std::string(int64_t, const std::string&)>& cancel_msg,
RuntimeProfile* profile, GCType gctype);
static int64_t free_top_overcommit_load(int64_t min_free_mem, const std::string& vm_rss_str,
const std::string& mem_available_str,
static int64_t free_top_overcommit_load(int64_t min_free_mem, const std::string& cancel_reason,
RuntimeProfile* profile) {
return free_top_overcommit_query(min_free_mem, vm_rss_str, mem_available_str, profile,
Type::LOAD);
return free_top_overcommit_query(min_free_mem, cancel_reason, profile, Type::LOAD);
}
// only for Type::QUERY or Type::LOAD.

View File

@@ -0,0 +1,271 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "runtime/memory/memory_arbitrator.h"
#include "runtime/memory/cache_manager.h"
#include "runtime/workload_group/workload_group.h"
#include "runtime/workload_group/workload_group_manager.h"
#include "util/mem_info.h"
#include "util/runtime_profile.h"
#include "util/stopwatch.hpp"
namespace doris {
// step1: free all cache
// step2: free resource groups memory that enable overcommit
// step3: free global top overcommit query, if enable query memory overcommit
// TODO Now, the meaning is different from java minor gc + full gc, more like small gc + large gc.
// Returns true as soon as freed memory exceeds MemInfo::process_minor_gc_size();
// `mem_info` is the cancel reason embedded in cancellation messages.
bool MemoryArbitrator::process_minor_gc(std::string mem_info) {
    MonotonicStopWatch watch;
    watch.start();
    int64_t freed_mem = 0;
    std::unique_ptr<RuntimeProfile> profile = std::make_unique<RuntimeProfile>("");
    // Runs on every exit path: ask the allocator to purge dirty pages and log
    // a summary of what this GC pass freed.
    Defer defer {[&]() {
        MemInfo::notify_je_purge_dirty_pages();
        std::stringstream ss;
        profile->pretty_print(&ss);
        LOG(INFO) << fmt::format(
                "[MemoryGC] end minor GC, free memory {}. cost(us): {}, details: {}",
                PrettyPrinter::print(freed_mem, TUnit::BYTES), watch.elapsed_time() / 1000,
                ss.str());
    }};
    // Step 1: prune stale cache entries only (full GC prunes everything).
    freed_mem += CacheManager::instance()->for_each_cache_prune_stale(profile.get());
    MemInfo::notify_je_purge_dirty_pages();
    if (freed_mem > MemInfo::process_minor_gc_size()) {
        return true;
    }
    // Step 2: reclaim the overcommitted portion of workload groups.
    if (config::enable_workload_group_memory_gc) {
        RuntimeProfile* tg_profile = profile->create_child("WorkloadGroup", true, true);
        freed_mem += tg_enable_overcommit_group_gc(MemInfo::process_minor_gc_size() - freed_mem,
                                                   tg_profile, true);
        if (freed_mem > MemInfo::process_minor_gc_size()) {
            return true;
        }
    }
    // Step 3: cancel the queries with the largest memory overcommit ratio.
    if (config::enable_query_memory_overcommit) {
        VLOG_NOTICE << MemTrackerLimiter::type_detail_usage(
                "[MemoryGC] before free top memory overcommit query in minor GC",
                MemTrackerLimiter::Type::QUERY);
        RuntimeProfile* toq_profile =
                profile->create_child("FreeTopOvercommitMemoryQuery", true, true);
        freed_mem += MemTrackerLimiter::free_top_overcommit_query(
                MemInfo::process_minor_gc_size() - freed_mem, mem_info, toq_profile);
        if (freed_mem > MemInfo::process_minor_gc_size()) {
            return true;
        }
    }
    return false;
}
// step1: free all cache
// step2: free resource groups memory that enable overcommit
// step3: free global top memory query
// step4: free top overcommit load, load retries are more expensive, So cancel at the end.
// step5: free top memory load
// Returns true as soon as freed memory exceeds MemInfo::process_full_gc_size();
// `mem_info` is the cancel reason embedded in cancellation messages.
bool MemoryArbitrator::process_full_gc(std::string mem_info) {
    MonotonicStopWatch watch;
    watch.start();
    int64_t freed_mem = 0;
    std::unique_ptr<RuntimeProfile> profile = std::make_unique<RuntimeProfile>("");
    // Runs on every exit path: ask the allocator to purge dirty pages and log
    // a summary of what this GC pass freed.
    Defer defer {[&]() {
        MemInfo::notify_je_purge_dirty_pages();
        std::stringstream ss;
        profile->pretty_print(&ss);
        LOG(INFO) << fmt::format(
                "[MemoryGC] end full GC, free Memory {}. cost(us): {}, details: {}",
                PrettyPrinter::print(freed_mem, TUnit::BYTES), watch.elapsed_time() / 1000,
                ss.str());
    }};
    // Step 1: prune ALL cache entries (minor GC prunes only stale ones).
    freed_mem += CacheManager::instance()->for_each_cache_prune_all(profile.get());
    MemInfo::notify_je_purge_dirty_pages();
    if (freed_mem > MemInfo::process_full_gc_size()) {
        return true;
    }
    // Step 2: reclaim the overcommitted portion of workload groups.
    if (config::enable_workload_group_memory_gc) {
        RuntimeProfile* tg_profile = profile->create_child("WorkloadGroup", true, true);
        freed_mem += tg_enable_overcommit_group_gc(MemInfo::process_full_gc_size() - freed_mem,
                                                   tg_profile, false);
        if (freed_mem > MemInfo::process_full_gc_size()) {
            return true;
        }
    }
    // Step 3: cancel the largest memory-consuming queries.
    VLOG_NOTICE << MemTrackerLimiter::type_detail_usage(
            "[MemoryGC] before free top memory query in full GC", MemTrackerLimiter::Type::QUERY);
    RuntimeProfile* tmq_profile = profile->create_child("FreeTopMemoryQuery", true, true);
    freed_mem += MemTrackerLimiter::free_top_memory_query(
            MemInfo::process_full_gc_size() - freed_mem, mem_info, tmq_profile);
    if (freed_mem > MemInfo::process_full_gc_size()) {
        return true;
    }
    // Step 4: loads are cancelled after queries because retrying them is more
    // expensive; first those with the largest overcommit ratio.
    if (config::enable_query_memory_overcommit) {
        VLOG_NOTICE << MemTrackerLimiter::type_detail_usage(
                "[MemoryGC] before free top memory overcommit load in full GC",
                MemTrackerLimiter::Type::LOAD);
        RuntimeProfile* tol_profile =
                profile->create_child("FreeTopMemoryOvercommitLoad", true, true);
        freed_mem += MemTrackerLimiter::free_top_overcommit_load(
                MemInfo::process_full_gc_size() - freed_mem, mem_info, tol_profile);
        if (freed_mem > MemInfo::process_full_gc_size()) {
            return true;
        }
    }
    // Step 5: finally, cancel the largest memory-consuming loads.
    VLOG_NOTICE << MemTrackerLimiter::type_detail_usage(
            "[MemoryGC] before free top memory load in full GC", MemTrackerLimiter::Type::LOAD);
    RuntimeProfile* tml_profile = profile->create_child("FreeTopMemoryLoad", true, true);
    freed_mem += MemTrackerLimiter::free_top_memory_load(
            MemInfo::process_full_gc_size() - freed_mem, mem_info, tml_profile);
    return freed_mem > MemInfo::process_full_gc_size();
}
// Reclaims memory from workload groups that do NOT enable memory overcommit:
// every group currently above its limit is asked to free (used - limit).
// Returns the total number of bytes reported freed.
int64_t MemoryArbitrator::tg_disable_overcommit_group_gc() {
    MonotonicStopWatch watch;
    watch.start();
    std::vector<WorkloadGroupPtr> task_groups;
    std::unique_ptr<RuntimeProfile> tg_profile = std::make_unique<RuntimeProfile>("WorkloadGroup");
    int64_t total_free_memory = 0;
    // Select groups with a valid memory limit that do not allow overcommit.
    ExecEnv::GetInstance()->workload_group_mgr()->get_related_workload_groups(
            [](const WorkloadGroupPtr& workload_group) {
                return workload_group->is_mem_limit_valid() &&
                       !workload_group->enable_memory_overcommit();
            },
            &task_groups);
    if (task_groups.empty()) {
        return 0;
    }
    // Keep only the groups currently above their limit.
    std::vector<WorkloadGroupPtr> task_groups_overcommit;
    for (const auto& workload_group : task_groups) {
        if (workload_group->memory_used() > workload_group->memory_limit()) {
            task_groups_overcommit.push_back(workload_group);
        }
    }
    if (task_groups_overcommit.empty()) {
        return 0;
    }
    LOG(INFO) << fmt::format(
            "[MemoryGC] start GC work load group that not enable overcommit, number of overcommit "
            "group: {}, "
            "if it exceeds the limit, try free size = (group used - group limit).",
            task_groups_overcommit.size());
    // Log a summary only if something was actually freed.
    Defer defer {[&]() {
        if (total_free_memory > 0) {
            std::stringstream ss;
            tg_profile->pretty_print(&ss);
            LOG(INFO) << fmt::format(
                    "[MemoryGC] end GC work load group that not enable overcommit, number of "
                    "overcommit group: {}, free memory {}. cost(us): {}, details: {}",
                    task_groups_overcommit.size(),
                    PrettyPrinter::print(total_free_memory, TUnit::BYTES),
                    watch.elapsed_time() / 1000, ss.str());
        }
    }};
    for (const auto& workload_group : task_groups_overcommit) {
        // memory_used() is re-read here; NOTE(review): usage may have dropped
        // below the limit since the filtering pass above, which would make the
        // requested free size negative — presumably gc_memory() tolerates that;
        // confirm.
        auto used = workload_group->memory_used();
        total_free_memory += workload_group->gc_memory(used - workload_group->memory_limit(),
                                                       tg_profile.get(), false);
    }
    return total_free_memory;
}
int64_t MemoryArbitrator::tg_enable_overcommit_group_gc(int64_t request_free_memory,
RuntimeProfile* profile, bool is_minor_gc) {
MonotonicStopWatch watch;
watch.start();
std::vector<WorkloadGroupPtr> task_groups;
ExecEnv::GetInstance()->workload_group_mgr()->get_related_workload_groups(
[](const WorkloadGroupPtr& workload_group) {
return workload_group->is_mem_limit_valid() &&
workload_group->enable_memory_overcommit();
},
&task_groups);
if (task_groups.empty()) {
return 0;
}
int64_t total_exceeded_memory = 0;
std::vector<int64_t> used_memorys;
std::vector<int64_t> exceeded_memorys;
for (const auto& workload_group : task_groups) {
int64_t used_memory = workload_group->memory_used();
int64_t exceeded = used_memory - workload_group->memory_limit();
int64_t exceeded_memory = exceeded > 0 ? exceeded : 0;
total_exceeded_memory += exceeded_memory;
used_memorys.emplace_back(used_memory);
exceeded_memorys.emplace_back(exceeded_memory);
}
int64_t total_free_memory = 0;
bool gc_all_exceeded = request_free_memory >= total_exceeded_memory;
std::string log_prefix = fmt::format(
"work load group that enable overcommit, number of group: {}, request_free_memory:{}, "
"total_exceeded_memory:{}",
task_groups.size(), request_free_memory, total_exceeded_memory);
if (gc_all_exceeded) {
LOG(INFO) << fmt::format(
"[MemoryGC] start GC {}, request more than exceeded, try free size = (group used - "
"group limit).",
log_prefix);
} else {
LOG(INFO) << fmt::format(
"[MemoryGC] start GC {}, request less than exceeded, try free size = ((group used "
"- group limit) / all group total_exceeded_memory) * request_free_memory.",
log_prefix);
}
Defer defer {[&]() {
if (total_free_memory > 0) {
std::stringstream ss;
profile->pretty_print(&ss);
LOG(INFO) << fmt::format(
"[MemoryGC] end GC {}, free memory {}. cost(us): {}, details: {}", log_prefix,
PrettyPrinter::print(total_free_memory, TUnit::BYTES),
watch.elapsed_time() / 1000, ss.str());
}
}};
for (int i = 0; i < task_groups.size(); ++i) {
if (exceeded_memorys[i] == 0) {
continue;
}
// todo: GC according to resource group priority
auto tg_need_free_memory = int64_t(
gc_all_exceeded ? exceeded_memorys[i]
: static_cast<double>(exceeded_memorys[i]) / total_exceeded_memory *
request_free_memory); // exceeded memory as a weight
auto workload_group = task_groups[i];
total_free_memory += workload_group->gc_memory(tg_need_free_memory, profile, is_minor_gc);
}
return total_free_memory;
}
} // namespace doris

View File

@@ -0,0 +1,40 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "runtime/memory/global_memory_arbitrator.h"
namespace doris {
// Entry points for process-level memory GC, run when the process approaches
// or exceeds its memory limits. All members are static; `mem_info` defaults
// capture the exceeded-limit message at the moment GC is triggered.
class MemoryArbitrator {
public:
    // Small GC: prune stale caches, then overcommitted workload groups, then
    // top overcommit queries. Returns true once enough memory was freed.
    static bool process_minor_gc(
            std::string mem_info =
                    doris::GlobalMemoryArbitrator::process_soft_limit_exceeded_errmsg_str());
    // Large GC: prune all caches, then workload groups, then top memory
    // queries and loads. Returns true once enough memory was freed.
    static bool process_full_gc(
            std::string mem_info =
                    doris::GlobalMemoryArbitrator::process_limit_exceeded_errmsg_str());
    // Frees (used - limit) from workload groups that do NOT enable memory
    // overcommit. Returns total bytes freed.
    static int64_t tg_disable_overcommit_group_gc();
    // Frees up to request_free_memory from workload groups that enable memory
    // overcommit, apportioned by each group's excess. Returns bytes freed.
    static int64_t tg_enable_overcommit_group_gc(int64_t request_free_memory,
                                                 RuntimeProfile* profile, bool is_minor_gc);
};
} // namespace doris