diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp
index 4787f60365..77d0fdaf0e 100644
--- a/be/src/common/daemon.cpp
+++ b/be/src/common/daemon.cpp
@@ -50,6 +50,7 @@
#include "runtime/memory/global_memory_arbitrator.h"
#include "runtime/memory/mem_tracker.h"
#include "runtime/memory/mem_tracker_limiter.h"
+#include "runtime/memory/memory_arbitrator.h"
#include "runtime/runtime_query_statistics_mgr.h"
#include "runtime/workload_group/workload_group_manager.h"
#include "util/cpu_info.h"
@@ -192,7 +193,7 @@ void Daemon::memory_maintenance_thread() {
// Refresh process memory metrics.
doris::PerfCounters::refresh_proc_status();
doris::MemInfo::refresh_proc_meminfo();
- doris::GlobalMemoryArbitrator::refresh_vm_rss_sub_allocator_cache();
+ doris::GlobalMemoryArbitrator::reset_refresh_interval_memory_growth();
// Update and print memory stat when the memory changes by 256M.
if (abs(last_print_proc_mem - PerfCounters::get_vm_rss()) > 268435456) {
@@ -229,11 +230,11 @@ void Daemon::memory_gc_thread() {
if (config::disable_memory_gc) {
continue;
}
- auto sys_mem_available = doris::MemInfo::sys_mem_available();
+ auto sys_mem_available = doris::GlobalMemoryArbitrator::sys_mem_available();
auto process_memory_usage = doris::GlobalMemoryArbitrator::process_memory_usage();
// GC excess memory for resource groups that not enable overcommit
- auto tg_free_mem = doris::MemInfo::tg_disable_overcommit_group_gc();
+ auto tg_free_mem = doris::MemoryArbitrator::tg_disable_overcommit_group_gc();
sys_mem_available += tg_free_mem;
process_memory_usage -= tg_free_mem;
@@ -241,13 +242,13 @@ void Daemon::memory_gc_thread() {
(sys_mem_available < doris::MemInfo::sys_mem_available_low_water_mark() ||
process_memory_usage >= doris::MemInfo::mem_limit())) {
// No longer full gc and minor gc during sleep.
+ std::string mem_info =
+ doris::GlobalMemoryArbitrator::process_limit_exceeded_errmsg_str();
memory_full_gc_sleep_time_ms = memory_gc_sleep_time_ms;
memory_minor_gc_sleep_time_ms = memory_gc_sleep_time_ms;
- LOG(INFO) << fmt::format(
- "[MemoryGC] start full GC, {}.",
- doris::GlobalMemoryArbitrator::process_limit_exceeded_errmsg_str());
+ LOG(INFO) << fmt::format("[MemoryGC] start full GC, {}.", mem_info);
doris::MemTrackerLimiter::print_log_process_usage();
- if (doris::MemInfo::process_full_gc()) {
+ if (doris::MemoryArbitrator::process_full_gc(std::move(mem_info))) {
// If there is not enough memory to be gc, the process memory usage will not be printed in the next continuous gc.
doris::MemTrackerLimiter::enable_print_log_process_usage();
}
@@ -255,12 +256,12 @@ void Daemon::memory_gc_thread() {
(sys_mem_available < doris::MemInfo::sys_mem_available_warning_water_mark() ||
process_memory_usage >= doris::MemInfo::soft_mem_limit())) {
// No minor gc during sleep, but full gc is possible.
+ std::string mem_info =
+ doris::GlobalMemoryArbitrator::process_soft_limit_exceeded_errmsg_str();
memory_minor_gc_sleep_time_ms = memory_gc_sleep_time_ms;
- LOG(INFO) << fmt::format(
- "[MemoryGC] start minor GC, {}.",
- doris::GlobalMemoryArbitrator::process_soft_limit_exceeded_errmsg_str());
+ LOG(INFO) << fmt::format("[MemoryGC] start minor GC, {}.", mem_info);
doris::MemTrackerLimiter::print_log_process_usage();
- if (doris::MemInfo::process_minor_gc()) {
+ if (doris::MemoryArbitrator::process_minor_gc(std::move(mem_info))) {
doris::MemTrackerLimiter::enable_print_log_process_usage();
}
} else {
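
For reference, a minimal sketch of the threshold checks the GC thread above performs, using only the accessors this patch introduces; the workload-group adjustment (`tg_free_mem`) and the sleep-time bookkeeping are omitted, so this is an illustration of the branch conditions, not a drop-in replacement.

```cpp
// Minimal sketch (not part of the patch): the quantities compared by memory_gc_thread().
#include "runtime/memory/global_memory_arbitrator.h"
#include "util/mem_info.h"

namespace doris {
// 2 = full GC, 1 = minor GC, 0 = no GC.
inline int gc_level() {
    // sys_mem_available() = /proc MemAvailable - reserved - refresh_interval_memory_growth
    int64_t sys_mem_available = GlobalMemoryArbitrator::sys_mem_available();
    // process_memory_usage() = (vm_rss - allocator cache) + reserved + refresh_interval_memory_growth
    int64_t process_memory_usage = GlobalMemoryArbitrator::process_memory_usage();
    if (sys_mem_available < MemInfo::sys_mem_available_low_water_mark() ||
        process_memory_usage >= MemInfo::mem_limit()) {
        return 2;
    }
    if (sys_mem_available < MemInfo::sys_mem_available_warning_water_mark() ||
        process_memory_usage >= MemInfo::soft_mem_limit()) {
        return 1;
    }
    return 0;
}
} // namespace doris
```
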
diff --git a/be/src/http/default_path_handlers.cpp b/be/src/http/default_path_handlers.cpp
index 6e4ce97b90..5c697539fb 100644
--- a/be/src/http/default_path_handlers.cpp
+++ b/be/src/http/default_path_handlers.cpp
@@ -40,6 +40,7 @@
#include "gutil/strings/substitute.h"
#include "http/action/tablets_info_action.h"
#include "http/web_page_handler.h"
+#include "runtime/memory/global_memory_arbitrator.h"
#include "runtime/memory/mem_tracker.h"
#include "runtime/memory/mem_tracker_limiter.h"
#include "util/easy_json.h"
@@ -155,6 +156,8 @@ void mem_tracker_handler(const WebPageHandler::ArgumentMap& args, std::stringstr
MemTrackerLimiter::Type::SCHEMA_CHANGE);
} else if (iter->second == "other") {
MemTrackerLimiter::make_type_snapshots(&snapshots, MemTrackerLimiter::Type::OTHER);
+ } else if (iter->second == "reserved_memory") {
+ GlobalMemoryArbitrator::make_reserved_memory_snapshots(&snapshots);
}
} else {
(*output) << "
*Notice:
\n";
diff --git a/be/src/olap/memtable_memory_limiter.cpp b/be/src/olap/memtable_memory_limiter.cpp
index dc128137ae..1eaad31ec2 100644
--- a/be/src/olap/memtable_memory_limiter.cpp
+++ b/be/src/olap/memtable_memory_limiter.cpp
@@ -80,8 +80,8 @@ void MemTableMemoryLimiter::register_writer(std::weak_ptr writer
int64_t MemTableMemoryLimiter::_avail_mem_lack() {
// reserve a small amount of memory so we do not trigger MinorGC
auto reserved_mem = doris::MemInfo::sys_mem_available_low_water_mark();
- auto avail_mem_lack =
- doris::MemInfo::sys_mem_available_warning_water_mark() - MemInfo::sys_mem_available();
+ auto avail_mem_lack = doris::MemInfo::sys_mem_available_warning_water_mark() -
+ doris::GlobalMemoryArbitrator::sys_mem_available();
return avail_mem_lack + reserved_mem;
}
@@ -225,14 +225,13 @@ void MemTableMemoryLimiter::refresh_mem_tracker() {
_log_timer.reset();
// if not exist load task, this log should not be printed.
if (_mem_usage != 0) {
- LOG(INFO) << ss.str() << ", process mem: " << PerfCounters::get_vm_rss_str()
- << " (without allocator cache: "
- << PrettyPrinter::print_bytes(GlobalMemoryArbitrator::process_memory_usage())
- << "), load mem: " << PrettyPrinter::print_bytes(_mem_tracker->consumption())
- << ", memtable writers num: " << _writers.size()
- << " (active: " << PrettyPrinter::print_bytes(_active_mem_usage)
- << ", write: " << PrettyPrinter::print_bytes(_write_mem_usage)
- << ", flush: " << PrettyPrinter::print_bytes(_flush_mem_usage) << ")";
+ LOG(INFO) << fmt::format(
+ "{}, {}, load mem: {}, memtable writers num: {} (active: {}, write: {}, flush: {})",
+ ss.str(), GlobalMemoryArbitrator::process_memory_used_details_str(),
+ PrettyPrinter::print_bytes(_mem_tracker->consumption()), _writers.size(),
+ PrettyPrinter::print_bytes(_active_mem_usage),
+ PrettyPrinter::print_bytes(_write_mem_usage),
+ PrettyPrinter::print_bytes(_flush_mem_usage));
}
}
diff --git a/be/src/runtime/memory/global_memory_arbitrator.cpp b/be/src/runtime/memory/global_memory_arbitrator.cpp
index dc686f7c5a..35fa350987 100644
--- a/be/src/runtime/memory/global_memory_arbitrator.cpp
+++ b/be/src/runtime/memory/global_memory_arbitrator.cpp
@@ -19,16 +19,64 @@
#include
+#include "runtime/thread_context.h"
+
namespace doris {
+std::mutex GlobalMemoryArbitrator::_reserved_trackers_lock;
+std::unordered_map<std::string, MemTracker::MemCounter> GlobalMemoryArbitrator::_reserved_trackers;
+
bvar::PassiveStatus<int64_t> g_vm_rss_sub_allocator_cache(
"meminfo_vm_rss_sub_allocator_cache",
[](void*) { return GlobalMemoryArbitrator::vm_rss_sub_allocator_cache(); }, nullptr);
bvar::PassiveStatus<int64_t> g_process_memory_usage(
"meminfo_process_memory_usage",
[](void*) { return GlobalMemoryArbitrator::process_memory_usage(); }, nullptr);
+bvar::PassiveStatus<int64_t> g_sys_mem_avail(
+ "meminfo_sys_mem_avail", [](void*) { return GlobalMemoryArbitrator::sys_mem_available(); },
+ nullptr);
-std::atomic<int64_t> GlobalMemoryArbitrator::_s_vm_rss_sub_allocator_cache = -1;
std::atomic<int64_t> GlobalMemoryArbitrator::_s_process_reserved_memory = 0;
+std::atomic<int64_t> GlobalMemoryArbitrator::refresh_interval_memory_growth = 0;
+
+bool GlobalMemoryArbitrator::try_reserve_process_memory(int64_t bytes) {
+ if (sys_mem_available() - bytes < MemInfo::sys_mem_available_low_water_mark()) {
+ return false;
+ }
+ int64_t old_reserved_mem = _s_process_reserved_memory.load(std::memory_order_relaxed);
+ int64_t new_reserved_mem = 0;
+ do {
+ new_reserved_mem = old_reserved_mem + bytes;
+ if (UNLIKELY(vm_rss_sub_allocator_cache() +
+ refresh_interval_memory_growth.load(std::memory_order_relaxed) +
+ new_reserved_mem >=
+ MemInfo::mem_limit())) {
+ return false;
+ }
+ } while (!_s_process_reserved_memory.compare_exchange_weak(old_reserved_mem, new_reserved_mem,
+ std::memory_order_relaxed));
+ {
+ std::lock_guard l(_reserved_trackers_lock);
+ _reserved_trackers[doris::thread_context()->thread_mem_tracker()->label()].add(bytes);
+ }
+ return true;
+}
+
+void GlobalMemoryArbitrator::release_process_reserved_memory(int64_t bytes) {
+ _s_process_reserved_memory.fetch_sub(bytes, std::memory_order_relaxed);
+ {
+ std::lock_guard l(_reserved_trackers_lock);
+ auto label = doris::thread_context()->thread_mem_tracker()->label();
+ auto it = _reserved_trackers.find(label);
+ if (it == _reserved_trackers.end()) {
+ DCHECK(false) << "release unknown reserved memory " << label << ", bytes: " << bytes;
+ return;
+ }
+ _reserved_trackers[label].sub(bytes);
+ if (_reserved_trackers[label].current_value() == 0) {
+ _reserved_trackers.erase(it);
+ }
+ }
+}
} // namespace doris
diff --git a/be/src/runtime/memory/global_memory_arbitrator.h b/be/src/runtime/memory/global_memory_arbitrator.h
index b1879cb1a7..f8fda18d0e 100644
--- a/be/src/runtime/memory/global_memory_arbitrator.h
+++ b/be/src/runtime/memory/global_memory_arbitrator.h
@@ -17,6 +17,7 @@
#pragma once
+#include "runtime/memory/mem_tracker.h"
#include "util/mem_info.h"
namespace doris {
@@ -30,14 +31,12 @@ public:
* accurate, since those pages are not really RSS but a memory
* that can be used at anytime via jemalloc.
*/
- static inline void refresh_vm_rss_sub_allocator_cache() {
- _s_vm_rss_sub_allocator_cache.store(
- PerfCounters::get_vm_rss() - static_cast<int64_t>(MemInfo::allocator_cache_mem()),
- std::memory_order_relaxed);
- MemInfo::refresh_interval_memory_growth = 0;
- }
static inline int64_t vm_rss_sub_allocator_cache() {
- return _s_vm_rss_sub_allocator_cache.load(std::memory_order_relaxed);
+ return PerfCounters::get_vm_rss() - static_cast<int64_t>(MemInfo::allocator_cache_mem());
+ }
+
+ static inline void reset_refresh_interval_memory_growth() {
+ refresh_interval_memory_growth = 0;
}
// If need to use process memory in your execution logic, pls use it.
@@ -45,32 +44,80 @@ public:
// add reserved memory and growth memory since the last vm_rss update.
static inline int64_t process_memory_usage() {
return vm_rss_sub_allocator_cache() +
- MemInfo::refresh_interval_memory_growth.load(std::memory_order_relaxed) +
+ refresh_interval_memory_growth.load(std::memory_order_relaxed) +
process_reserved_memory();
}
- static inline bool try_reserve_process_memory(int64_t bytes) {
- if (MemInfo::sys_mem_available() - bytes < MemInfo::sys_mem_available_low_water_mark()) {
- return false;
- }
- int64_t old_reserved_mem = _s_process_reserved_memory.load(std::memory_order_relaxed);
- int64_t new_reserved_mem = 0;
- do {
- new_reserved_mem = old_reserved_mem + bytes;
- if (UNLIKELY(vm_rss_sub_allocator_cache() +
- MemInfo::refresh_interval_memory_growth.load(
- std::memory_order_relaxed) +
- new_reserved_mem >=
- MemInfo::mem_limit())) {
- return false;
- }
- } while (!_s_process_reserved_memory.compare_exchange_weak(
- old_reserved_mem, new_reserved_mem, std::memory_order_relaxed));
- return true;
+ static std::string process_memory_used_str() {
+ auto msg = fmt::format("process memory used {}",
+ PrettyPrinter::print(process_memory_usage(), TUnit::BYTES));
+#ifdef ADDRESS_SANITIZER
+ msg = "[ASAN]" + msg;
+#endif
+ return msg;
}
- static inline void release_process_reserved_memory(int64_t bytes) {
- _s_process_reserved_memory.fetch_sub(bytes, std::memory_order_relaxed);
+ static std::string process_memory_used_details_str() {
+ auto msg = fmt::format(
+ "process memory used {}(= {}[vm/rss] - {}[tc/jemalloc_cache] + {}[reserved] + "
+ "{}B[waiting_refresh])",
+ PrettyPrinter::print(process_memory_usage(), TUnit::BYTES),
+ PerfCounters::get_vm_rss_str(),
+ PrettyPrinter::print(static_cast<int64_t>(MemInfo::allocator_cache_mem()),
+ TUnit::BYTES),
+ PrettyPrinter::print(process_reserved_memory(), TUnit::BYTES),
+ refresh_interval_memory_growth);
+#ifdef ADDRESS_SANITIZER
+ msg = "[ASAN]" + msg;
+#endif
+ return msg;
+ }
+
+ static inline int64_t sys_mem_available() {
+ return MemInfo::_s_sys_mem_available.load(std::memory_order_relaxed) -
+ refresh_interval_memory_growth.load(std::memory_order_relaxed) -
+ process_reserved_memory();
+ }
+
+ static inline std::string sys_mem_available_str() {
+ auto msg = fmt::format("sys available memory {}",
+ PrettyPrinter::print(sys_mem_available(), TUnit::BYTES));
+#ifdef ADDRESS_SANITIZER
+ msg = "[ASAN]" + msg;
+#endif
+ return msg;
+ }
+
+ static inline std::string sys_mem_available_details_str() {
+ auto msg = fmt::format(
+ "sys available memory {}(= {}[proc/available] - {}[reserved] - "
+ "{}B[waiting_refresh])",
+ PrettyPrinter::print(sys_mem_available(), TUnit::BYTES),
+ PrettyPrinter::print(MemInfo::_s_sys_mem_available.load(std::memory_order_relaxed),
+ TUnit::BYTES),
+ PrettyPrinter::print(process_reserved_memory(), TUnit::BYTES),
+ refresh_interval_memory_growth);
+#ifdef ADDRESS_SANITIZER
+ msg = "[ASAN]" + msg;
+#endif
+ return msg;
+ }
+
+ static bool try_reserve_process_memory(int64_t bytes);
+ static void release_process_reserved_memory(int64_t bytes);
+
+ static inline void make_reserved_memory_snapshots(
+ std::vector<MemTracker::Snapshot>* snapshots) {
+ std::lock_guard l(_reserved_trackers_lock);
+ for (const auto& pair : _reserved_trackers) {
+ MemTracker::Snapshot snapshot;
+ snapshot.type = "reserved_memory";
+ snapshot.label = pair.first;
+ snapshot.limit = -1;
+ snapshot.cur_consumption = pair.second.current_value();
+ snapshot.peak_consumption = pair.second.peak_value();
+ (*snapshots).emplace_back(snapshot);
+ }
}
static inline int64_t process_reserved_memory() {
@@ -79,8 +126,7 @@ public:
static bool is_exceed_soft_mem_limit(int64_t bytes = 0) {
return process_memory_usage() + bytes >= MemInfo::soft_mem_limit() ||
- MemInfo::sys_mem_available() - bytes <
- MemInfo::sys_mem_available_warning_water_mark();
+ sys_mem_available() - bytes < MemInfo::sys_mem_available_warning_water_mark();
}
static bool is_exceed_hard_mem_limit(int64_t bytes = 0) {
@@ -93,44 +139,45 @@ public:
// because `new/malloc` will trigger mem hook when using tcmalloc/jemalloc allocator cache,
// but it may not actually alloc physical memory, which is not expected in mem hook fail.
return process_memory_usage() + bytes >= MemInfo::mem_limit() ||
- MemInfo::sys_mem_available() - bytes < MemInfo::sys_mem_available_low_water_mark();
+ sys_mem_available() - bytes < MemInfo::sys_mem_available_low_water_mark();
}
static std::string process_mem_log_str() {
return fmt::format(
- "os physical memory {}. process memory used {}, limit {}, soft limit {}. sys "
- "available memory {}, low water mark {}, warning water mark {}. Refresh interval "
- "memory growth {} B",
- PrettyPrinter::print(MemInfo::physical_mem(), TUnit::BYTES),
- PerfCounters::get_vm_rss_str(), MemInfo::mem_limit_str(),
- MemInfo::soft_mem_limit_str(), MemInfo::sys_mem_available_str(),
- PrettyPrinter::print(MemInfo::sys_mem_available_low_water_mark(), TUnit::BYTES),
- PrettyPrinter::print(MemInfo::sys_mem_available_warning_water_mark(), TUnit::BYTES),
- MemInfo::refresh_interval_memory_growth);
- }
-
- static std::string process_limit_exceeded_errmsg_str() {
- return fmt::format(
- "process memory used {} exceed limit {} or sys available memory {} less than low "
- "water mark {}",
- PerfCounters::get_vm_rss_str(), MemInfo::mem_limit_str(),
- MemInfo::sys_mem_available_str(),
- PrettyPrinter::print(MemInfo::sys_mem_available_low_water_mark(), TUnit::BYTES));
- }
-
- static std::string process_soft_limit_exceeded_errmsg_str() {
- return fmt::format(
- "process memory used {} exceed soft limit {} or sys available memory {} less than "
+ "os physical memory {}. {}, limit {}, soft limit {}. {}, low water mark {}, "
"warning water mark {}.",
- PerfCounters::get_vm_rss_str(), MemInfo::soft_mem_limit_str(),
- MemInfo::sys_mem_available_str(),
+ PrettyPrinter::print(MemInfo::physical_mem(), TUnit::BYTES),
+ process_memory_used_details_str(), MemInfo::mem_limit_str(),
+ MemInfo::soft_mem_limit_str(), sys_mem_available_details_str(),
+ PrettyPrinter::print(MemInfo::sys_mem_available_low_water_mark(), TUnit::BYTES),
PrettyPrinter::print(MemInfo::sys_mem_available_warning_water_mark(),
TUnit::BYTES));
}
+ static std::string process_limit_exceeded_errmsg_str() {
+ return fmt::format(
+ "{} exceed limit {} or {} less than low water mark {}", process_memory_used_str(),
+ MemInfo::mem_limit_str(), sys_mem_available_str(),
+ PrettyPrinter::print(MemInfo::sys_mem_available_low_water_mark(), TUnit::BYTES));
+ }
+
+ static std::string process_soft_limit_exceeded_errmsg_str() {
+ return fmt::format("{} exceed soft limit {} or {} less than warning water mark {}.",
+ process_memory_used_str(), MemInfo::soft_mem_limit_str(),
+ sys_mem_available_str(),
+ PrettyPrinter::print(MemInfo::sys_mem_available_warning_water_mark(),
+ TUnit::BYTES));
+ }
+
+ // Only used after the memory limit is exceeded. When multiple threads are waiting for
+ // available process memory, this avoids having them all start at the same time and cause OOM.
+ static std::atomic<int64_t> refresh_interval_memory_growth;
+
private:
- static std::atomic<int64_t> _s_vm_rss_sub_allocator_cache;
static std::atomic<int64_t> _s_process_reserved_memory;
+
+ static std::mutex _reserved_trackers_lock;
+ static std::unordered_map<std::string, MemTracker::MemCounter> _reserved_trackers;
};
} // namespace doris
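
A usage sketch of the reservation API declared above. The `ScopedProcessMemoryReservation` guard is hypothetical and not part of this patch; only the two `GlobalMemoryArbitrator` calls come from the header.

```cpp
// Hypothetical RAII wrapper around the reservation API added in this patch.
#include "runtime/memory/global_memory_arbitrator.h"

namespace doris {
class ScopedProcessMemoryReservation {
public:
    explicit ScopedProcessMemoryReservation(int64_t bytes)
            : _bytes(bytes), _ok(GlobalMemoryArbitrator::try_reserve_process_memory(bytes)) {}
    ~ScopedProcessMemoryReservation() {
        if (_ok) {
            // Reservations are accounted per thread mem tracker label in
            // global_memory_arbitrator.cpp, so release on the reserving thread.
            GlobalMemoryArbitrator::release_process_reserved_memory(_bytes);
        }
    }
    bool ok() const { return _ok; }

private:
    int64_t _bytes;
    bool _ok;
};
} // namespace doris
```

A caller would check `ok()` before relying on the reservation; when it returns false, the process is already near the hard limit (sys available memory below the low water mark, or the projected usage at or above `mem_limit()`), and the allocation should be treated as an overload condition.
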
diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp
index b84f7c5495..489d59ab1b 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.cpp
+++ b/be/src/runtime/memory/mem_tracker_limiter.cpp
@@ -216,6 +216,13 @@ void MemTrackerLimiter::make_process_snapshots(std::vector
snapshot.peak_consumption = PerfCounters::get_vm_hwm();
(*snapshots).emplace_back(snapshot);
+ snapshot.type = "reserved memory";
+ snapshot.label = "";
+ snapshot.limit = -1;
+ snapshot.cur_consumption = GlobalMemoryArbitrator::process_reserved_memory();
+ snapshot.peak_consumption = -1;
+ (*snapshots).emplace_back(snapshot);
+
snapshot.type = "process virtual memory"; // from /proc VmSize VmPeak
snapshot.label = "";
snapshot.limit = -1;
@@ -359,10 +366,10 @@ void MemTrackerLimiter::print_log_process_usage() {
std::string MemTrackerLimiter::tracker_limit_exceeded_str() {
std::string err_msg = fmt::format(
"memory tracker limit exceeded, tracker label:{}, type:{}, limit "
- "{}, peak used {}, current used {}. backend {} process memory used {}.",
+ "{}, peak used {}, current used {}. backend {}, {}.",
label(), type_string(_type), print_bytes(limit()),
print_bytes(_consumption->peak_value()), print_bytes(_consumption->current_value()),
- BackendOptions::get_localhost(), PerfCounters::get_vm_rss_str());
+ BackendOptions::get_localhost(), GlobalMemoryArbitrator::process_memory_used_str());
if (_type == Type::QUERY || _type == Type::LOAD) {
err_msg += fmt::format(
" exec node:<{}>, can `set exec_mem_limit=8G` to change limit, details see "
@@ -377,23 +384,17 @@ std::string MemTrackerLimiter::tracker_limit_exceeded_str() {
}
int64_t MemTrackerLimiter::free_top_memory_query(int64_t min_free_mem,
- const std::string& vm_rss_str,
- const std::string& mem_available_str,
+ const std::string& cancel_reason,
RuntimeProfile* profile, Type type) {
return free_top_memory_query(
min_free_mem, type, ExecEnv::GetInstance()->mem_tracker_limiter_pool,
- [&vm_rss_str, &mem_available_str, &type](int64_t mem_consumption,
- const std::string& label) {
+ [&cancel_reason, &type](int64_t mem_consumption, const std::string& label) {
return fmt::format(
- "Process has no memory available, cancel top memory used {}: "
- "{} memory tracker <{}> consumption {}, backend {} "
- "process memory used {} exceed limit {} or sys available memory {} "
- "less than low water mark {}. Execute again after enough memory, "
- "details see be.INFO.",
- type_string(type), type_string(type), label, print_bytes(mem_consumption),
- BackendOptions::get_localhost(), vm_rss_str, MemInfo::mem_limit_str(),
- mem_available_str,
- print_bytes(MemInfo::sys_mem_available_low_water_mark()));
+ "Process memory not enough, cancel top memory used {}: "
+ "<{}> consumption {}, backend {}, {}. Execute again "
+ "after enough memory, details see be.INFO.",
+ type_string(type), label, print_bytes(mem_consumption),
+ BackendOptions::get_localhost(), cancel_reason);
},
profile, GCType::PROCESS);
}
@@ -504,23 +505,17 @@ int64_t MemTrackerLimiter::free_top_memory_query(
}
int64_t MemTrackerLimiter::free_top_overcommit_query(int64_t min_free_mem,
- const std::string& vm_rss_str,
- const std::string& mem_available_str,
+ const std::string& cancel_reason,
RuntimeProfile* profile, Type type) {
return free_top_overcommit_query(
min_free_mem, type, ExecEnv::GetInstance()->mem_tracker_limiter_pool,
- [&vm_rss_str, &mem_available_str, &type](int64_t mem_consumption,
- const std::string& label) {
+ [&cancel_reason, &type](int64_t mem_consumption, const std::string& label) {
return fmt::format(
- "Process has less memory, cancel top memory overcommit {}: "
- "{} memory tracker <{}> consumption {}, backend {} "
- "process memory used {} exceed soft limit {} or sys available memory {} "
- "less than warning water mark {}. Execute again after enough memory, "
- "details see be.INFO.",
- type_string(type), type_string(type), label, print_bytes(mem_consumption),
- BackendOptions::get_localhost(), vm_rss_str, MemInfo::soft_mem_limit_str(),
- mem_available_str,
- print_bytes(MemInfo::sys_mem_available_warning_water_mark()));
+ "Process memory not enough, cancel top memory overcommit {}: "
+ "<{}> consumption {}, backend {}, {}. Execute again "
+ "after enough memory, details see be.INFO.",
+ type_string(type), label, print_bytes(mem_consumption),
+ BackendOptions::get_localhost(), cancel_reason);
},
profile, GCType::PROCESS);
}
diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h
index 3a891ca3a1..2c4221373b 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.h
+++ b/be/src/runtime/memory/mem_tracker_limiter.h
@@ -141,7 +141,7 @@ public:
return true;
}
bool st = true;
- if (is_overcommit_tracker() && config::enable_query_memory_overcommit) {
+ if (is_overcommit_tracker() && !config::enable_query_memory_overcommit) {
st = _consumption->try_add(bytes, _limit);
} else {
_consumption->add(bytes);
@@ -192,9 +192,8 @@ public:
static void print_log_process_usage();
// Start canceling from the query with the largest memory usage until the memory of min_free_mem size is freed.
- // vm_rss_str and mem_available_str recorded when gc is triggered, for log printing.
- static int64_t free_top_memory_query(int64_t min_free_mem, const std::string& vm_rss_str,
- const std::string& mem_available_str,
+ // cancel_reason is recorded when GC is triggered, for log printing.
+ static int64_t free_top_memory_query(int64_t min_free_mem, const std::string& cancel_reason,
RuntimeProfile* profile, Type type = Type::QUERY);
static int64_t free_top_memory_query(
@@ -202,16 +201,13 @@ public:
const std::function<std::string(int64_t, const std::string&)>& cancel_msg,
RuntimeProfile* profile, GCType gctype);
- static int64_t free_top_memory_load(int64_t min_free_mem, const std::string& vm_rss_str,
- const std::string& mem_available_str,
+ static int64_t free_top_memory_load(int64_t min_free_mem, const std::string& cancel_reason,
RuntimeProfile* profile) {
- return free_top_memory_query(min_free_mem, vm_rss_str, mem_available_str, profile,
- Type::LOAD);
+ return free_top_memory_query(min_free_mem, cancel_reason, profile, Type::LOAD);
}
// Start canceling from the query with the largest memory overcommit ratio until the memory
// of min_free_mem size is freed.
- static int64_t free_top_overcommit_query(int64_t min_free_mem, const std::string& vm_rss_str,
- const std::string& mem_available_str,
+ static int64_t free_top_overcommit_query(int64_t min_free_mem, const std::string& cancel_reason,
RuntimeProfile* profile, Type type = Type::QUERY);
static int64_t free_top_overcommit_query(
@@ -219,11 +215,9 @@ public:
const std::function<std::string(int64_t, const std::string&)>& cancel_msg,
RuntimeProfile* profile, GCType gctype);
- static int64_t free_top_overcommit_load(int64_t min_free_mem, const std::string& vm_rss_str,
- const std::string& mem_available_str,
+ static int64_t free_top_overcommit_load(int64_t min_free_mem, const std::string& cancel_reason,
RuntimeProfile* profile) {
- return free_top_overcommit_query(min_free_mem, vm_rss_str, mem_available_str, profile,
- Type::LOAD);
+ return free_top_overcommit_query(min_free_mem, cancel_reason, profile, Type::LOAD);
}
// only for Type::QUERY or Type::LOAD.
diff --git a/be/src/runtime/memory/memory_arbitrator.cpp b/be/src/runtime/memory/memory_arbitrator.cpp
new file mode 100644
index 0000000000..a99f358526
--- /dev/null
+++ b/be/src/runtime/memory/memory_arbitrator.cpp
@@ -0,0 +1,271 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/memory/memory_arbitrator.h"
+
+#include "runtime/memory/cache_manager.h"
+#include "runtime/workload_group/workload_group.h"
+#include "runtime/workload_group/workload_group_manager.h"
+#include "util/mem_info.h"
+#include "util/runtime_profile.h"
+#include "util/stopwatch.hpp"
+
+namespace doris {
+
+// step1: free all caches
+// step2: free memory of resource groups that enable overcommit
+// step3: free the global top overcommit query, if query memory overcommit is enabled
+// TODO: the meaning now differs from Java's minor GC + full GC; it is closer to a small GC vs. a large GC.
+bool MemoryArbitrator::process_minor_gc(std::string mem_info) {
+ MonotonicStopWatch watch;
+ watch.start();
+ int64_t freed_mem = 0;
+ std::unique_ptr<RuntimeProfile> profile = std::make_unique<RuntimeProfile>("");
+
+ Defer defer {[&]() {
+ MemInfo::notify_je_purge_dirty_pages();
+ std::stringstream ss;
+ profile->pretty_print(&ss);
+ LOG(INFO) << fmt::format(
+ "[MemoryGC] end minor GC, free memory {}. cost(us): {}, details: {}",
+ PrettyPrinter::print(freed_mem, TUnit::BYTES), watch.elapsed_time() / 1000,
+ ss.str());
+ }};
+
+ freed_mem += CacheManager::instance()->for_each_cache_prune_stale(profile.get());
+ MemInfo::notify_je_purge_dirty_pages();
+ if (freed_mem > MemInfo::process_minor_gc_size()) {
+ return true;
+ }
+
+ if (config::enable_workload_group_memory_gc) {
+ RuntimeProfile* tg_profile = profile->create_child("WorkloadGroup", true, true);
+ freed_mem += tg_enable_overcommit_group_gc(MemInfo::process_minor_gc_size() - freed_mem,
+ tg_profile, true);
+ if (freed_mem > MemInfo::process_minor_gc_size()) {
+ return true;
+ }
+ }
+
+ if (config::enable_query_memory_overcommit) {
+ VLOG_NOTICE << MemTrackerLimiter::type_detail_usage(
+ "[MemoryGC] before free top memory overcommit query in minor GC",
+ MemTrackerLimiter::Type::QUERY);
+ RuntimeProfile* toq_profile =
+ profile->create_child("FreeTopOvercommitMemoryQuery", true, true);
+ freed_mem += MemTrackerLimiter::free_top_overcommit_query(
+ MemInfo::process_minor_gc_size() - freed_mem, mem_info, toq_profile);
+ if (freed_mem > MemInfo::process_minor_gc_size()) {
+ return true;
+ }
+ }
+ return false;
+}
+
+// step1: free all caches
+// step2: free memory of resource groups that enable overcommit
+// step3: free the global top memory query
+// step4: free the top overcommit load; load retries are more expensive, so cancel them last
+// step5: free the top memory load
+bool MemoryArbitrator::process_full_gc(std::string mem_info) {
+ MonotonicStopWatch watch;
+ watch.start();
+ int64_t freed_mem = 0;
+ std::unique_ptr<RuntimeProfile> profile = std::make_unique<RuntimeProfile>("");
+
+ Defer defer {[&]() {
+ MemInfo::notify_je_purge_dirty_pages();
+ std::stringstream ss;
+ profile->pretty_print(&ss);
+ LOG(INFO) << fmt::format(
+ "[MemoryGC] end full GC, free Memory {}. cost(us): {}, details: {}",
+ PrettyPrinter::print(freed_mem, TUnit::BYTES), watch.elapsed_time() / 1000,
+ ss.str());
+ }};
+
+ freed_mem += CacheManager::instance()->for_each_cache_prune_all(profile.get());
+ MemInfo::notify_je_purge_dirty_pages();
+ if (freed_mem > MemInfo::process_full_gc_size()) {
+ return true;
+ }
+
+ if (config::enable_workload_group_memory_gc) {
+ RuntimeProfile* tg_profile = profile->create_child("WorkloadGroup", true, true);
+ freed_mem += tg_enable_overcommit_group_gc(MemInfo::process_full_gc_size() - freed_mem,
+ tg_profile, false);
+ if (freed_mem > MemInfo::process_full_gc_size()) {
+ return true;
+ }
+ }
+
+ VLOG_NOTICE << MemTrackerLimiter::type_detail_usage(
+ "[MemoryGC] before free top memory query in full GC", MemTrackerLimiter::Type::QUERY);
+ RuntimeProfile* tmq_profile = profile->create_child("FreeTopMemoryQuery", true, true);
+ freed_mem += MemTrackerLimiter::free_top_memory_query(
+ MemInfo::process_full_gc_size() - freed_mem, mem_info, tmq_profile);
+ if (freed_mem > MemInfo::process_full_gc_size()) {
+ return true;
+ }
+
+ if (config::enable_query_memory_overcommit) {
+ VLOG_NOTICE << MemTrackerLimiter::type_detail_usage(
+ "[MemoryGC] before free top memory overcommit load in full GC",
+ MemTrackerLimiter::Type::LOAD);
+ RuntimeProfile* tol_profile =
+ profile->create_child("FreeTopMemoryOvercommitLoad", true, true);
+ freed_mem += MemTrackerLimiter::free_top_overcommit_load(
+ MemInfo::process_full_gc_size() - freed_mem, mem_info, tol_profile);
+ if (freed_mem > MemInfo::process_full_gc_size()) {
+ return true;
+ }
+ }
+
+ VLOG_NOTICE << MemTrackerLimiter::type_detail_usage(
+ "[MemoryGC] before free top memory load in full GC", MemTrackerLimiter::Type::LOAD);
+ RuntimeProfile* tml_profile = profile->create_child("FreeTopMemoryLoad", true, true);
+ freed_mem += MemTrackerLimiter::free_top_memory_load(
+ MemInfo::process_full_gc_size() - freed_mem, mem_info, tml_profile);
+ return freed_mem > MemInfo::process_full_gc_size();
+}
+
+int64_t MemoryArbitrator::tg_disable_overcommit_group_gc() {
+ MonotonicStopWatch watch;
+ watch.start();
+ std::vector<WorkloadGroupPtr> task_groups;
+ std::unique_ptr<RuntimeProfile> tg_profile = std::make_unique<RuntimeProfile>("WorkloadGroup");
+ int64_t total_free_memory = 0;
+
+ ExecEnv::GetInstance()->workload_group_mgr()->get_related_workload_groups(
+ [](const WorkloadGroupPtr& workload_group) {
+ return workload_group->is_mem_limit_valid() &&
+ !workload_group->enable_memory_overcommit();
+ },
+ &task_groups);
+ if (task_groups.empty()) {
+ return 0;
+ }
+
+ std::vector<WorkloadGroupPtr> task_groups_overcommit;
+ for (const auto& workload_group : task_groups) {
+ if (workload_group->memory_used() > workload_group->memory_limit()) {
+ task_groups_overcommit.push_back(workload_group);
+ }
+ }
+ if (task_groups_overcommit.empty()) {
+ return 0;
+ }
+
+ LOG(INFO) << fmt::format(
+ "[MemoryGC] start GC work load group that not enable overcommit, number of overcommit "
+ "group: {}, "
+ "if it exceeds the limit, try free size = (group used - group limit).",
+ task_groups_overcommit.size());
+
+ Defer defer {[&]() {
+ if (total_free_memory > 0) {
+ std::stringstream ss;
+ tg_profile->pretty_print(&ss);
+ LOG(INFO) << fmt::format(
+ "[MemoryGC] end GC work load group that not enable overcommit, number of "
+ "overcommit group: {}, free memory {}. cost(us): {}, details: {}",
+ task_groups_overcommit.size(),
+ PrettyPrinter::print(total_free_memory, TUnit::BYTES),
+ watch.elapsed_time() / 1000, ss.str());
+ }
+ }};
+
+ for (const auto& workload_group : task_groups_overcommit) {
+ auto used = workload_group->memory_used();
+ total_free_memory += workload_group->gc_memory(used - workload_group->memory_limit(),
+ tg_profile.get(), false);
+ }
+ return total_free_memory;
+}
+
+int64_t MemoryArbitrator::tg_enable_overcommit_group_gc(int64_t request_free_memory,
+ RuntimeProfile* profile, bool is_minor_gc) {
+ MonotonicStopWatch watch;
+ watch.start();
+ std::vector<WorkloadGroupPtr> task_groups;
+ ExecEnv::GetInstance()->workload_group_mgr()->get_related_workload_groups(
+ [](const WorkloadGroupPtr& workload_group) {
+ return workload_group->is_mem_limit_valid() &&
+ workload_group->enable_memory_overcommit();
+ },
+ &task_groups);
+ if (task_groups.empty()) {
+ return 0;
+ }
+
+ int64_t total_exceeded_memory = 0;
+ std::vector<int64_t> used_memorys;
+ std::vector<int64_t> exceeded_memorys;
+ for (const auto& workload_group : task_groups) {
+ int64_t used_memory = workload_group->memory_used();
+ int64_t exceeded = used_memory - workload_group->memory_limit();
+ int64_t exceeded_memory = exceeded > 0 ? exceeded : 0;
+ total_exceeded_memory += exceeded_memory;
+ used_memorys.emplace_back(used_memory);
+ exceeded_memorys.emplace_back(exceeded_memory);
+ }
+
+ int64_t total_free_memory = 0;
+ bool gc_all_exceeded = request_free_memory >= total_exceeded_memory;
+ std::string log_prefix = fmt::format(
+ "work load group that enable overcommit, number of group: {}, request_free_memory:{}, "
+ "total_exceeded_memory:{}",
+ task_groups.size(), request_free_memory, total_exceeded_memory);
+ if (gc_all_exceeded) {
+ LOG(INFO) << fmt::format(
+ "[MemoryGC] start GC {}, request more than exceeded, try free size = (group used - "
+ "group limit).",
+ log_prefix);
+ } else {
+ LOG(INFO) << fmt::format(
+ "[MemoryGC] start GC {}, request less than exceeded, try free size = ((group used "
+ "- group limit) / all group total_exceeded_memory) * request_free_memory.",
+ log_prefix);
+ }
+
+ Defer defer {[&]() {
+ if (total_free_memory > 0) {
+ std::stringstream ss;
+ profile->pretty_print(&ss);
+ LOG(INFO) << fmt::format(
+ "[MemoryGC] end GC {}, free memory {}. cost(us): {}, details: {}", log_prefix,
+ PrettyPrinter::print(total_free_memory, TUnit::BYTES),
+ watch.elapsed_time() / 1000, ss.str());
+ }
+ }};
+
+ for (int i = 0; i < task_groups.size(); ++i) {
+ if (exceeded_memorys[i] == 0) {
+ continue;
+ }
+
+ // todo: GC according to resource group priority
+ auto tg_need_free_memory = int64_t(
+ gc_all_exceeded ? exceeded_memorys[i]
+ : static_cast(exceeded_memorys[i]) / total_exceeded_memory *
+ request_free_memory); // exceeded memory as a weight
+ auto workload_group = task_groups[i];
+ total_free_memory += workload_group->gc_memory(tg_need_free_memory, profile, is_minor_gc);
+ }
+ return total_free_memory;
+}
+
+} // namespace doris
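
A standalone sketch of the per-group free-size weighting applied in `tg_enable_overcommit_group_gc()` above when the requested free size is smaller than the total exceeded memory; the helper name and the example numbers are illustrative only.

```cpp
// Sketch of the weighting: each group frees (its excess / total excess) * request_free_memory.
#include <cstdint>

int64_t tg_need_free_memory(int64_t exceeded, int64_t total_exceeded, int64_t request_free) {
    bool gc_all_exceeded = request_free >= total_exceeded;
    return gc_all_exceeded ? exceeded
                           : static_cast<int64_t>(static_cast<double>(exceeded) /
                                                  total_exceeded * request_free);
}

// Example: two groups exceed their limits by 2 GB and 6 GB (total 8 GB). A request to
// free 4 GB asks them to free 1 GB and 3 GB respectively, proportional to their excess.
```
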
diff --git a/be/src/runtime/memory/memory_arbitrator.h b/be/src/runtime/memory/memory_arbitrator.h
new file mode 100644
index 0000000000..2a936b8ba0
--- /dev/null
+++ b/be/src/runtime/memory/memory_arbitrator.h
@@ -0,0 +1,40 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "runtime/memory/global_memory_arbitrator.h"
+
+namespace doris {
+
+class MemoryArbitrator {
+public:
+ static bool process_minor_gc(
+ std::string mem_info =
+ doris::GlobalMemoryArbitrator::process_soft_limit_exceeded_errmsg_str());
+ static bool process_full_gc(
+ std::string mem_info =
+ doris::GlobalMemoryArbitrator::process_limit_exceeded_errmsg_str());
+
+ static int64_t tg_disable_overcommit_group_gc();
+ static int64_t tg_enable_overcommit_group_gc(int64_t request_free_memory,
+ RuntimeProfile* profile, bool is_minor_gc);
+
+private:
+};
+
+} // namespace doris
diff --git a/be/src/runtime/workload_group/workload_group.cpp b/be/src/runtime/workload_group/workload_group.cpp
index 5a57152801..f1c49d9763 100644
--- a/be/src/runtime/workload_group/workload_group.cpp
+++ b/be/src/runtime/workload_group/workload_group.cpp
@@ -173,18 +173,20 @@ int64_t WorkloadGroup::gc_memory(int64_t need_free_mem, RuntimeProfile* profile,
MemTracker::print_bytes(_memory_limit), BackendOptions::get_localhost());
}
}
- std::string process_mem_usage_str = GlobalMemoryArbitrator::process_mem_log_str();
- auto cancel_top_overcommit_str = [cancel_str, process_mem_usage_str](int64_t mem_consumption,
- const std::string& label) {
+ auto cancel_top_overcommit_str = [cancel_str](int64_t mem_consumption,
+ const std::string& label) {
return fmt::format(
- "{} cancel top memory overcommit tracker <{}> consumption {}. details:{}",
- cancel_str, label, MemTracker::print_bytes(mem_consumption), process_mem_usage_str);
+ "{} cancel top memory overcommit tracker <{}> consumption {}. details:{}, Execute "
+ "again after enough memory, details see be.INFO.",
+ cancel_str, label, MemTracker::print_bytes(mem_consumption),
+ GlobalMemoryArbitrator::process_limit_exceeded_errmsg_str());
};
- auto cancel_top_usage_str = [cancel_str, process_mem_usage_str](int64_t mem_consumption,
- const std::string& label) {
- return fmt::format("{} cancel top memory used tracker <{}> consumption {}. details:{}",
- cancel_str, label, MemTracker::print_bytes(mem_consumption),
- process_mem_usage_str);
+ auto cancel_top_usage_str = [cancel_str](int64_t mem_consumption, const std::string& label) {
+ return fmt::format(
+ "{} cancel top memory used tracker <{}> consumption {}. details:{}, Execute again "
+ "after enough memory, details see be.INFO.",
+ cancel_str, label, MemTracker::print_bytes(mem_consumption),
+ GlobalMemoryArbitrator::process_soft_limit_exceeded_errmsg_str());
};
LOG(INFO) << fmt::format(
diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp
index dc4c73782e..7a93015030 100644
--- a/be/src/runtime/workload_group/workload_group_manager.cpp
+++ b/be/src/runtime/workload_group/workload_group_manager.cpp
@@ -193,14 +193,11 @@ void WorkloadGroupMgr::refresh_wg_memory_info() {
// we count these cache memories equally on workload groups.
double ratio = (double)proc_vm_rss / (double)all_queries_mem_used;
if (ratio <= 1.25) {
- std::string debug_msg = fmt::format(
- "\nProcess Memory Summary: process_vm_rss: {}, process mem: {}, sys mem available: "
- "{}, all quries mem: {}",
- PrettyPrinter::print(proc_vm_rss, TUnit::BYTES),
- PrettyPrinter::print(doris::GlobalMemoryArbitrator::process_memory_usage(),
- TUnit::BYTES),
- doris::MemInfo::sys_mem_available_str(),
- PrettyPrinter::print(all_queries_mem_used, TUnit::BYTES));
+ std::string debug_msg =
+ fmt::format("\nProcess Memory Summary: {}, {}, all quries mem: {}",
+ doris::GlobalMemoryArbitrator::process_memory_used_details_str(),
+ doris::GlobalMemoryArbitrator::sys_mem_available_details_str(),
+ PrettyPrinter::print(all_queries_mem_used, TUnit::BYTES));
LOG_EVERY_T(INFO, 10) << debug_msg;
}
diff --git a/be/src/util/mem_info.cpp b/be/src/util/mem_info.cpp
index a3d391a00b..45e609d710 100644
--- a/be/src/util/mem_info.cpp
+++ b/be/src/util/mem_info.cpp
@@ -39,33 +39,20 @@
#include "common/config.h"
#include "common/status.h"
#include "gutil/strings/split.h"
-#include "runtime/exec_env.h"
-#include "runtime/memory/cache_manager.h"
-#include "runtime/memory/mem_tracker_limiter.h"
-#include "runtime/workload_group/workload_group.h"
-#include "runtime/workload_group/workload_group_manager.h"
#include "util/cgroup_util.h"
-#include "util/defer_op.h"
#include "util/parse_util.h"
#include "util/pretty_printer.h"
-#include "util/runtime_profile.h"
-#include "util/stopwatch.hpp"
#include "util/string_parser.hpp"
namespace doris {
-bvar::PassiveStatus g_sys_mem_avail(
- "meminfo_sys_mem_avail", [](void*) { return MemInfo::sys_mem_available(); }, nullptr);
-
bool MemInfo::_s_initialized = false;
std::atomic<int64_t> MemInfo::_s_physical_mem = std::numeric_limits<int64_t>::max();
std::atomic<int64_t> MemInfo::_s_mem_limit = std::numeric_limits<int64_t>::max();
std::atomic<int64_t> MemInfo::_s_soft_mem_limit = std::numeric_limits<int64_t>::max();
std::atomic<int64_t> MemInfo::_s_allocator_cache_mem = 0;
-std::string MemInfo::_s_allocator_cache_mem_str = "";
std::atomic<int64_t> MemInfo::_s_virtual_memory_used = 0;
-std::atomic<int64_t> MemInfo::refresh_interval_memory_growth = 0;
int64_t MemInfo::_s_cgroup_mem_limit = std::numeric_limits<int64_t>::max();
int64_t MemInfo::_s_cgroup_mem_usage = std::numeric_limits<int64_t>::min();
@@ -99,9 +86,6 @@ void MemInfo::refresh_allocator_mem() {
get_je_metrics("stats.metadata") +
get_je_all_arena_metrics("pdirty") * get_page_size(),
std::memory_order_relaxed);
- _s_allocator_cache_mem_str = PrettyPrinter::print(
- static_cast<int64_t>(_s_allocator_cache_mem.load(std::memory_order_relaxed)),
- TUnit::BYTES);
_s_virtual_memory_used.store(get_je_metrics("stats.mapped"), std::memory_order_relaxed);
#else
_s_allocator_cache_mem.store(get_tc_metrics("tcmalloc.pageheap_free_bytes") +
@@ -109,265 +93,12 @@ void MemInfo::refresh_allocator_mem() {
get_tc_metrics("tcmalloc.transfer_cache_free_bytes") +
get_tc_metrics("tcmalloc.thread_cache_free_bytes"),
std::memory_order_relaxed);
- _s_allocator_cache_mem_str = PrettyPrinter::print(
- static_cast<int64_t>(_s_allocator_cache_mem.load(std::memory_order_relaxed)),
- TUnit::BYTES);
_s_virtual_memory_used.store(get_tc_metrics("generic.total_physical_bytes") +
get_tc_metrics("tcmalloc.pageheap_unmapped_bytes"),
std::memory_order_relaxed);
#endif
}
-// step1: free all cache
-// step2: free resource groups memory that enable overcommit
-// step3: free global top overcommit query, if enable query memory overcommit
-// TODO Now, the meaning is different from java minor gc + full gc, more like small gc + large gc.
-bool MemInfo::process_minor_gc() {
- MonotonicStopWatch watch;
- watch.start();
- int64_t freed_mem = 0;
- std::unique_ptr<RuntimeProfile> profile = std::make_unique<RuntimeProfile>("");
- std::string pre_vm_rss = PerfCounters::get_vm_rss_str();
- std::string pre_sys_mem_available = MemInfo::sys_mem_available_str();
-
- Defer defer {[&]() {
- MemInfo::notify_je_purge_dirty_pages();
- std::stringstream ss;
- profile->pretty_print(&ss);
- LOG(INFO) << fmt::format(
- "[MemoryGC] end minor GC, free memory {}. cost(us): {}, details: {}",
- PrettyPrinter::print(freed_mem, TUnit::BYTES), watch.elapsed_time() / 1000,
- ss.str());
- }};
-
- freed_mem += CacheManager::instance()->for_each_cache_prune_stale(profile.get());
- MemInfo::notify_je_purge_dirty_pages();
- if (freed_mem > MemInfo::process_minor_gc_size()) {
- return true;
- }
-
- if (config::enable_workload_group_memory_gc) {
- RuntimeProfile* tg_profile = profile->create_child("WorkloadGroup", true, true);
- freed_mem += tg_enable_overcommit_group_gc(MemInfo::process_minor_gc_size() - freed_mem,
- tg_profile, true);
- if (freed_mem > MemInfo::process_minor_gc_size()) {
- return true;
- }
- }
-
- if (config::enable_query_memory_overcommit) {
- VLOG_NOTICE << MemTrackerLimiter::type_detail_usage(
- "[MemoryGC] before free top memory overcommit query in minor GC",
- MemTrackerLimiter::Type::QUERY);
- RuntimeProfile* toq_profile =
- profile->create_child("FreeTopOvercommitMemoryQuery", true, true);
- freed_mem += MemTrackerLimiter::free_top_overcommit_query(
- MemInfo::process_minor_gc_size() - freed_mem, pre_vm_rss, pre_sys_mem_available,
- toq_profile);
- if (freed_mem > MemInfo::process_minor_gc_size()) {
- return true;
- }
- }
- return false;
-}
-
-// step1: free all cache
-// step2: free resource groups memory that enable overcommit
-// step3: free global top memory query
-// step4: free top overcommit load, load retries are more expensive, So cancel at the end.
-// step5: free top memory load
-bool MemInfo::process_full_gc() {
- MonotonicStopWatch watch;
- watch.start();
- int64_t freed_mem = 0;
- std::unique_ptr<RuntimeProfile> profile = std::make_unique<RuntimeProfile>("");
- std::string pre_vm_rss = PerfCounters::get_vm_rss_str();
- std::string pre_sys_mem_available = MemInfo::sys_mem_available_str();
-
- Defer defer {[&]() {
- MemInfo::notify_je_purge_dirty_pages();
- std::stringstream ss;
- profile->pretty_print(&ss);
- LOG(INFO) << fmt::format(
- "[MemoryGC] end full GC, free Memory {}. cost(us): {}, details: {}",
- PrettyPrinter::print(freed_mem, TUnit::BYTES), watch.elapsed_time() / 1000,
- ss.str());
- }};
-
- freed_mem += CacheManager::instance()->for_each_cache_prune_all(profile.get());
- MemInfo::notify_je_purge_dirty_pages();
- if (freed_mem > MemInfo::process_full_gc_size()) {
- return true;
- }
-
- if (config::enable_workload_group_memory_gc) {
- RuntimeProfile* tg_profile = profile->create_child("WorkloadGroup", true, true);
- freed_mem += tg_enable_overcommit_group_gc(MemInfo::process_full_gc_size() - freed_mem,
- tg_profile, false);
- if (freed_mem > MemInfo::process_full_gc_size()) {
- return true;
- }
- }
-
- VLOG_NOTICE << MemTrackerLimiter::type_detail_usage(
- "[MemoryGC] before free top memory query in full GC", MemTrackerLimiter::Type::QUERY);
- RuntimeProfile* tmq_profile = profile->create_child("FreeTopMemoryQuery", true, true);
- freed_mem += MemTrackerLimiter::free_top_memory_query(
- MemInfo::process_full_gc_size() - freed_mem, pre_vm_rss, pre_sys_mem_available,
- tmq_profile);
- if (freed_mem > MemInfo::process_full_gc_size()) {
- return true;
- }
-
- if (config::enable_query_memory_overcommit) {
- VLOG_NOTICE << MemTrackerLimiter::type_detail_usage(
- "[MemoryGC] before free top memory overcommit load in full GC",
- MemTrackerLimiter::Type::LOAD);
- RuntimeProfile* tol_profile =
- profile->create_child("FreeTopMemoryOvercommitLoad", true, true);
- freed_mem += MemTrackerLimiter::free_top_overcommit_load(
- MemInfo::process_full_gc_size() - freed_mem, pre_vm_rss, pre_sys_mem_available,
- tol_profile);
- if (freed_mem > MemInfo::process_full_gc_size()) {
- return true;
- }
- }
-
- VLOG_NOTICE << MemTrackerLimiter::type_detail_usage(
- "[MemoryGC] before free top memory load in full GC", MemTrackerLimiter::Type::LOAD);
- RuntimeProfile* tml_profile = profile->create_child("FreeTopMemoryLoad", true, true);
- freed_mem +=
- MemTrackerLimiter::free_top_memory_load(MemInfo::process_full_gc_size() - freed_mem,
- pre_vm_rss, pre_sys_mem_available, tml_profile);
- return freed_mem > MemInfo::process_full_gc_size();
-}
-
-int64_t MemInfo::tg_disable_overcommit_group_gc() {
- MonotonicStopWatch watch;
- watch.start();
- std::vector<WorkloadGroupPtr> task_groups;
- std::unique_ptr<RuntimeProfile> tg_profile = std::make_unique<RuntimeProfile>("WorkloadGroup");
- int64_t total_free_memory = 0;
-
- ExecEnv::GetInstance()->workload_group_mgr()->get_related_workload_groups(
- [](const WorkloadGroupPtr& workload_group) {
- return workload_group->is_mem_limit_valid() &&
- !workload_group->enable_memory_overcommit();
- },
- &task_groups);
- if (task_groups.empty()) {
- return 0;
- }
-
- std::vector<WorkloadGroupPtr> task_groups_overcommit;
- for (const auto& workload_group : task_groups) {
- if (workload_group->memory_used() > workload_group->memory_limit()) {
- task_groups_overcommit.push_back(workload_group);
- }
- }
- if (task_groups_overcommit.empty()) {
- return 0;
- }
-
- LOG(INFO) << fmt::format(
- "[MemoryGC] start GC work load group that not enable overcommit, number of overcommit "
- "group: {}, "
- "if it exceeds the limit, try free size = (group used - group limit).",
- task_groups_overcommit.size());
-
- Defer defer {[&]() {
- if (total_free_memory > 0) {
- std::stringstream ss;
- tg_profile->pretty_print(&ss);
- LOG(INFO) << fmt::format(
- "[MemoryGC] end GC work load group that not enable overcommit, number of "
- "overcommit group: {}, free memory {}. cost(us): {}, details: {}",
- task_groups_overcommit.size(),
- PrettyPrinter::print(total_free_memory, TUnit::BYTES),
- watch.elapsed_time() / 1000, ss.str());
- }
- }};
-
- for (const auto& workload_group : task_groups_overcommit) {
- auto used = workload_group->memory_used();
- total_free_memory += workload_group->gc_memory(used - workload_group->memory_limit(),
- tg_profile.get(), false);
- }
- return total_free_memory;
-}
-
-int64_t MemInfo::tg_enable_overcommit_group_gc(int64_t request_free_memory, RuntimeProfile* profile,
- bool is_minor_gc) {
- MonotonicStopWatch watch;
- watch.start();
- std::vector<WorkloadGroupPtr> task_groups;
- ExecEnv::GetInstance()->workload_group_mgr()->get_related_workload_groups(
- [](const WorkloadGroupPtr& workload_group) {
- return workload_group->is_mem_limit_valid() &&
- workload_group->enable_memory_overcommit();
- },
- &task_groups);
- if (task_groups.empty()) {
- return 0;
- }
-
- int64_t total_exceeded_memory = 0;
- std::vector<int64_t> used_memorys;
- std::vector<int64_t> exceeded_memorys;
- for (const auto& workload_group : task_groups) {
- int64_t used_memory = workload_group->memory_used();
- int64_t exceeded = used_memory - workload_group->memory_limit();
- int64_t exceeded_memory = exceeded > 0 ? exceeded : 0;
- total_exceeded_memory += exceeded_memory;
- used_memorys.emplace_back(used_memory);
- exceeded_memorys.emplace_back(exceeded_memory);
- }
-
- int64_t total_free_memory = 0;
- bool gc_all_exceeded = request_free_memory >= total_exceeded_memory;
- std::string log_prefix = fmt::format(
- "work load group that enable overcommit, number of group: {}, request_free_memory:{}, "
- "total_exceeded_memory:{}",
- task_groups.size(), request_free_memory, total_exceeded_memory);
- if (gc_all_exceeded) {
- LOG(INFO) << fmt::format(
- "[MemoryGC] start GC {}, request more than exceeded, try free size = (group used - "
- "group limit).",
- log_prefix);
- } else {
- LOG(INFO) << fmt::format(
- "[MemoryGC] start GC {}, request less than exceeded, try free size = ((group used "
- "- group limit) / all group total_exceeded_memory) * request_free_memory.",
- log_prefix);
- }
-
- Defer defer {[&]() {
- if (total_free_memory > 0) {
- std::stringstream ss;
- profile->pretty_print(&ss);
- LOG(INFO) << fmt::format(
- "[MemoryGC] end GC {}, free memory {}. cost(us): {}, details: {}", log_prefix,
- PrettyPrinter::print(total_free_memory, TUnit::BYTES),
- watch.elapsed_time() / 1000, ss.str());
- }
- }};
-
- for (int i = 0; i < task_groups.size(); ++i) {
- if (exceeded_memorys[i] == 0) {
- continue;
- }
-
- // todo: GC according to resource group priority
- auto tg_need_free_memory = int64_t(
- gc_all_exceeded ? exceeded_memorys[i]
- : static_cast(exceeded_memorys[i]) / total_exceeded_memory *
- request_free_memory); // exceeded memory as a weight
- auto workload_group = task_groups[i];
- total_free_memory += workload_group->gc_memory(tg_need_free_memory, profile, is_minor_gc);
- }
- return total_free_memory;
-}
-
#ifndef __APPLE__
void MemInfo::refresh_proc_meminfo() {
std::ifstream meminfo("/proc/meminfo", std::ios::in);
@@ -546,13 +277,15 @@ void MemInfo::init() {
getline(vminfo, line);
boost::algorithm::trim(line);
StringParser::ParseResult result;
- int64_t mem_value = StringParser::string_to_int<int64_t>(line.data(), line.size(), &result);
+ auto mem_value = StringParser::string_to_int<int64_t>(line.data(), line.size(), &result);
if (result == StringParser::PARSE_SUCCESS) {
_s_vm_min_free_kbytes = mem_value * 1024L;
}
}
- if (vminfo.is_open()) vminfo.close();
+ if (vminfo.is_open()) {
+ vminfo.close();
+ }
// Redhat 4.x OS, `/proc/meminfo` has no `MemAvailable`.
if (_mem_info_bytes.find("MemAvailable") != _mem_info_bytes.end()) {
@@ -576,7 +309,9 @@ void MemInfo::init() {
std::string hugepage_enable;
// If file not exist, getline returns an empty string.
getline(sys_transparent_hugepage, hugepage_enable);
- if (sys_transparent_hugepage.is_open()) sys_transparent_hugepage.close();
+ if (sys_transparent_hugepage.is_open()) {
+ sys_transparent_hugepage.close();
+ }
if (hugepage_enable == "[always] madvise never") {
std::cout << "[WARNING!] /sys/kernel/mm/transparent_hugepage/enabled: " << hugepage_enable
<< ", Doris not recommend turning on THP, which may cause the BE process to use "
@@ -591,7 +326,9 @@ void MemInfo::init() {
std::ifstream sys_vm("/proc/sys/vm/overcommit_memory", std::ios::in);
std::string vm_overcommit;
getline(sys_vm, vm_overcommit);
- if (sys_vm.is_open()) sys_vm.close();
+ if (sys_vm.is_open()) {
+ sys_vm.close();
+ }
if (!vm_overcommit.empty() && std::stoi(vm_overcommit) == 2) {
std::cout << "[WARNING!] /proc/sys/vm/overcommit_memory: " << vm_overcommit
<< ", expect is 1, memory limit check is handed over to Doris Allocator, "
@@ -632,12 +369,11 @@ void MemInfo::init() {
std::string MemInfo::debug_string() {
DCHECK(_s_initialized);
- CGroupUtil util;
std::stringstream stream;
stream << "Physical Memory: " << PrettyPrinter::print(_s_physical_mem, TUnit::BYTES)
<< std::endl;
stream << "Memory Limt: " << PrettyPrinter::print(_s_mem_limit, TUnit::BYTES) << std::endl;
- stream << "CGroup Info: " << util.debug_string() << std::endl;
+ stream << "CGroup Info: " << doris::CGroupUtil::debug_string() << std::endl;
return stream.str();
}
diff --git a/be/src/util/mem_info.h b/be/src/util/mem_info.h
index dc4b0e0d29..1b92d0eb9f 100644
--- a/be/src/util/mem_info.h
+++ b/be/src/util/mem_info.h
@@ -73,19 +73,6 @@ public:
static void refresh_proc_meminfo();
- static inline int64_t sys_mem_available() {
- return _s_sys_mem_available.load(std::memory_order_relaxed) -
- refresh_interval_memory_growth;
- }
- static inline std::string sys_mem_available_str() {
-#ifdef ADDRESS_SANITIZER
- return "[ASAN]" + PrettyPrinter::print(_s_sys_mem_available.load(std::memory_order_relaxed),
- TUnit::BYTES);
-#else
- return PrettyPrinter::print(_s_sys_mem_available.load(std::memory_order_relaxed),
- TUnit::BYTES);
-#endif
- }
static inline int64_t sys_mem_available_low_water_mark() {
return _s_sys_mem_available_low_water_mark;
}
@@ -157,7 +144,6 @@ public:
static inline size_t allocator_cache_mem() {
return _s_allocator_cache_mem.load(std::memory_order_relaxed);
}
- static inline std::string allocator_cache_mem_str() { return _s_allocator_cache_mem_str; }
// Tcmalloc property `generic.total_physical_bytes` records the total length of the virtual memory
// obtained by the process malloc, not the physical memory actually used by the process in the OS.
@@ -183,25 +169,15 @@ public:
static std::string debug_string();
- static bool process_minor_gc();
- static bool process_full_gc();
-
- static int64_t tg_disable_overcommit_group_gc();
- static int64_t tg_enable_overcommit_group_gc(int64_t request_free_memory,
- RuntimeProfile* profile, bool is_minor_gc);
-
- // It is only used after the memory limit is exceeded. When multiple threads are waiting for the available memory of the process,
- // avoid multiple threads starting at the same time and causing OOM.
- static std::atomic refresh_interval_memory_growth;
-
private:
+ friend class GlobalMemoryArbitrator;
+
static bool _s_initialized;
static std::atomic<int64_t> _s_physical_mem;
static std::atomic<int64_t> _s_mem_limit;
static std::atomic<int64_t> _s_soft_mem_limit;
static std::atomic<int64_t> _s_allocator_cache_mem;
- static std::string _s_allocator_cache_mem_str;
static std::atomic<int64_t> _s_virtual_memory_used;
static int64_t _s_cgroup_mem_limit;
diff --git a/be/src/vec/common/allocator.cpp b/be/src/vec/common/allocator.cpp
index 88c92e4bd8..39abfa3926 100644
--- a/be/src/vec/common/allocator.cpp
+++ b/be/src/vec/common/allocator.cpp
@@ -86,7 +86,7 @@ void Allocator::sys_memory_check(size_t
while (wait_milliseconds < doris::config::thread_wait_gc_max_milliseconds) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
if (!doris::GlobalMemoryArbitrator::is_exceed_hard_mem_limit(size)) {
- doris::MemInfo::refresh_interval_memory_growth += size;
+ doris::GlobalMemoryArbitrator::refresh_interval_memory_growth += size;
break;
}
if (doris::thread_context()->thread_mem_tracker_mgr->is_query_cancelled()) {
@@ -189,13 +189,9 @@ template
void Allocator::throw_bad_alloc(
const std::string& err) const {
LOG(WARNING) << err
- << fmt::format(
- " os physical memory {}. process memory used {}, sys available memory "
- "{}, Stacktrace: {}",
- doris::PrettyPrinter::print(doris::MemInfo::physical_mem(),
- doris::TUnit::BYTES),
- doris::PerfCounters::get_vm_rss_str(),
- doris::MemInfo::sys_mem_available_str(), doris::get_stack_trace());
+ << fmt::format("{}, Stacktrace: {}",
+ doris::GlobalMemoryArbitrator::process_mem_log_str(),
+ doris::get_stack_trace());
doris::MemTrackerLimiter::print_log_process_usage();
throw doris::Exception(doris::ErrorCode::MEM_ALLOC_FAILED, err);
}
diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp b/be/src/vec/sink/writer/vtablet_writer.cpp
index 5a061cc51d..60bd4eafa8 100644
--- a/be/src/vec/sink/writer/vtablet_writer.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer.cpp
@@ -62,6 +62,7 @@
#include "exec/tablet_info.h"
#include "runtime/descriptors.h"
#include "runtime/exec_env.h"
+#include "runtime/memory/memory_arbitrator.h"
#include "runtime/runtime_state.h"
#include "runtime/thread_context.h"
#include "service/backend_options.h"
@@ -554,7 +555,7 @@ Status VNodeChannel::add_block(vectorized::Block* block, const Payload* payload)
int VNodeChannel::try_send_and_fetch_status(RuntimeState* state,
std::unique_ptr& thread_pool_token) {
DBUG_EXECUTE_IF("VNodeChannel.try_send_and_fetch_status_full_gc",
- { MemInfo::process_full_gc(); });
+ { MemoryArbitrator::process_full_gc(); });
if (_cancelled || _send_finished) { // not run
return 0;
@@ -875,7 +876,7 @@ void VNodeChannel::cancel(const std::string& cancel_msg) {
}
Status VNodeChannel::close_wait(RuntimeState* state) {
- DBUG_EXECUTE_IF("VNodeChannel.close_wait_full_gc", { MemInfo::process_full_gc(); });
+ DBUG_EXECUTE_IF("VNodeChannel.close_wait_full_gc", { MemoryArbitrator::process_full_gc(); });
SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get());
// set _is_closed to true finally
Defer set_closed {[&]() {