[Improvement](executor) Refactor workload group memory GC (#33797)
* During minor GC, only cancel the workload group's memory-overcommit queries
* Add process memory usage to the query-cancel messages
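In rough terms, the patch threads a single flag through the workload-group GC path. A simplified sketch of the resulting call chain, using names from the patch (not a literal excerpt):

    // MemInfo::process_minor_gc()
    //   -> MemInfo::tg_enable_overcommit_group_gc(bytes_to_free, profile, /*is_minor_gc=*/true)
    //        -> WorkloadGroup::gc_memory(need_free_mem, profile, /*is_minor_gc=*/true)
    //           cancels only the group's top memory-overcommit queries, then returns
    //
    // MemInfo::process_full_gc()
    //   -> MemInfo::tg_enable_overcommit_group_gc(bytes_to_free, profile, /*is_minor_gc=*/false)
    //        -> WorkloadGroup::gc_memory(need_free_mem, profile, /*is_minor_gc=*/false)
    //           may go on to cancel the group's top memory users if that was not enough

Both entry points now run group-level GC only when the new config enable_workload_group_memory_gc is true.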
@@ -1124,6 +1124,8 @@ DEFINE_Bool(enable_flush_file_cache_async, "true");
 DEFINE_mString(doris_cgroup_cpu_path, "");
 DEFINE_mBool(enable_cgroup_cpu_soft_limit, "true");
 
+DEFINE_mBool(enable_workload_group_memory_gc, "true");
+
 DEFINE_Bool(ignore_always_true_predicate_for_segment, "true");
 
 // Dir of default timezone files
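The new switch defaults to true. For operators who want to turn group-level memory GC off on a backend, a minimal sketch, assuming the usual be.conf mechanism (the mutable DEFINE_mBool form means the value should also be changeable at runtime through the BE's config-update interface):

    # be.conf (sketch) -- disable workload-group memory GC
    enable_workload_group_memory_gc = false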
@@ -1196,6 +1196,8 @@ DECLARE_mBool(exit_on_exception);
 DECLARE_mString(doris_cgroup_cpu_path);
 DECLARE_mBool(enable_cgroup_cpu_soft_limit);
 
+DECLARE_mBool(enable_workload_group_memory_gc);
+
 // This config controls whether the s3 file writer would flush cache asynchronously
 DECLARE_Bool(enable_flush_file_cache_async);
 
@@ -142,30 +142,47 @@ void WorkloadGroup::remove_mem_tracker_limiter(std::shared_ptr<MemTrackerLimiter
     }
 }
 
-int64_t WorkloadGroup::gc_memory(int64_t need_free_mem, RuntimeProfile* profile) {
+int64_t WorkloadGroup::gc_memory(int64_t need_free_mem, RuntimeProfile* profile, bool is_minor_gc) {
     if (need_free_mem <= 0) {
         return 0;
     }
     int64_t used_memory = memory_used();
     int64_t freed_mem = 0;
 
-    std::string cancel_str = fmt::format(
-            "work load group memory exceeded limit, group id:{}, name:{}, used:{}, limit:{}, "
-            "backend:{}.",
-            _id, _name, MemTracker::print_bytes(used_memory),
-            MemTracker::print_bytes(_memory_limit), BackendOptions::get_localhost());
-    auto cancel_top_overcommit_str = [cancel_str](int64_t mem_consumption,
-                                                  const std::string& label) {
+    std::string cancel_str = "";
+    if (is_minor_gc) {
+        cancel_str = fmt::format(
+                "MinorGC kill overcommit query, wg id:{}, name:{}, used:{}, limit:{}, "
+                "backend:{}.",
+                _id, _name, MemTracker::print_bytes(used_memory),
+                MemTracker::print_bytes(_memory_limit), BackendOptions::get_localhost());
+    } else {
+        if (_enable_memory_overcommit) {
+            cancel_str = fmt::format(
+                    "FullGC release wg overcommit mem, wg id:{}, name:{}, "
+                    "used:{},limit:{},backend:{}.",
+                    _id, _name, MemTracker::print_bytes(used_memory),
+                    MemTracker::print_bytes(_memory_limit), BackendOptions::get_localhost());
+        } else {
+            cancel_str = fmt::format(
+                    "GC wg for hard limit, wg id:{}, name:{}, used:{}, limit:{}, "
+                    "backend:{}.",
+                    _id, _name, MemTracker::print_bytes(used_memory),
+                    MemTracker::print_bytes(_memory_limit), BackendOptions::get_localhost());
+        }
+    }
+    std::string process_mem_usage_str = MemTrackerLimiter::process_mem_log_str();
+    auto cancel_top_overcommit_str = [cancel_str, process_mem_usage_str](int64_t mem_consumption,
+                                                                         const std::string& label) {
         return fmt::format(
-                "{} cancel top memory overcommit tracker <{}> consumption {}. execute again after "
-                "enough memory, details see be.INFO.",
-                cancel_str, label, MemTracker::print_bytes(mem_consumption));
+                "{} cancel top memory overcommit tracker <{}> consumption {}. details:{}",
+                cancel_str, label, MemTracker::print_bytes(mem_consumption), process_mem_usage_str);
     };
-    auto cancel_top_usage_str = [cancel_str](int64_t mem_consumption, const std::string& label) {
-        return fmt::format(
-                "{} cancel top memory used tracker <{}> consumption {}. execute again after "
-                "enough memory, details see be.INFO.",
-                cancel_str, label, MemTracker::print_bytes(mem_consumption));
+    auto cancel_top_usage_str = [cancel_str, process_mem_usage_str](int64_t mem_consumption,
+                                                                    const std::string& label) {
+        return fmt::format("{} cancel top memory used tracker <{}> consumption {}. details:{}",
+                           cancel_str, label, MemTracker::print_bytes(mem_consumption),
+                           process_mem_usage_str);
     };
 
     LOG(INFO) << fmt::format(
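Put together, a query cancelled during minor GC should now carry a message shaped roughly like the following; the id, name, sizes, host, tracker label, and trailing process summary are illustrative placeholders composed from the format strings above, not captured output:

    MinorGC kill overcommit query, wg id:10001, name:normal, used:9.2 GB, limit:8 GB, backend:10.0.0.1. cancel top memory overcommit tracker <query label> consumption 1.5 GB. details:<output of MemTrackerLimiter::process_mem_log_str()>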
@@ -188,7 +205,8 @@ int64_t WorkloadGroup::gc_memory(int64_t need_free_mem, RuntimeProfile* profile)
                 _mem_tracker_limiter_pool, cancel_top_overcommit_str, tmq_profile,
                 MemTrackerLimiter::GCType::WORK_LOAD_GROUP);
     }
-    if (freed_mem >= need_free_mem) {
+    // To be compatible with the non-group's gc logic, minorGC just gc overcommit query
+    if (is_minor_gc || freed_mem >= need_free_mem) {
         return freed_mem;
     }
 
@@ -146,7 +146,7 @@ public:
         return _query_ctxs.size();
     }
 
-    int64_t gc_memory(int64_t need_free_mem, RuntimeProfile* profile);
+    int64_t gc_memory(int64_t need_free_mem, RuntimeProfile* profile, bool is_minor_gc);
 
     void upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* exec_env);
 
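A hedged usage sketch of the new signature (the caller below is illustrative; only gc_memory itself comes from the patch). The third argument decides how far a pass goes:

    // Illustrative caller, not part of the patch; wg is assumed to be a WorkloadGroupPtr.
    // Minor GC: cancel only the group's memory-overcommit queries, then stop.
    int64_t freed = wg->gc_memory(need_free_mem, profile, /*is_minor_gc=*/true);
    // Full GC: if cancelling overcommit queries does not free need_free_mem,
    // gc_memory continues and cancels the group's top memory users as well.
    freed = wg->gc_memory(need_free_mem, profile, /*is_minor_gc=*/false);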
@@ -147,10 +147,13 @@ bool MemInfo::process_minor_gc() {
         return true;
     }
 
-    RuntimeProfile* tg_profile = profile->create_child("WorkloadGroup", true, true);
-    freed_mem += tg_enable_overcommit_group_gc(_s_process_minor_gc_size - freed_mem, tg_profile);
-    if (freed_mem > _s_process_minor_gc_size) {
-        return true;
+    if (config::enable_workload_group_memory_gc) {
+        RuntimeProfile* tg_profile = profile->create_child("WorkloadGroup", true, true);
+        freed_mem += tg_enable_overcommit_group_gc(_s_process_minor_gc_size - freed_mem, tg_profile,
+                                                   true);
+        if (freed_mem > _s_process_minor_gc_size) {
+            return true;
+        }
     }
 
     if (config::enable_query_memory_overcommit) {
@@ -198,10 +201,13 @@ bool MemInfo::process_full_gc() {
         return true;
     }
 
-    RuntimeProfile* tg_profile = profile->create_child("WorkloadGroup", true, true);
-    freed_mem += tg_enable_overcommit_group_gc(_s_process_full_gc_size - freed_mem, tg_profile);
-    if (freed_mem > _s_process_full_gc_size) {
-        return true;
+    if (config::enable_workload_group_memory_gc) {
+        RuntimeProfile* tg_profile = profile->create_child("WorkloadGroup", true, true);
+        freed_mem += tg_enable_overcommit_group_gc(_s_process_full_gc_size - freed_mem, tg_profile,
+                                                   false);
+        if (freed_mem > _s_process_full_gc_size) {
+            return true;
+        }
     }
 
     VLOG_NOTICE << MemTrackerLimiter::type_detail_usage(
@@ -286,14 +292,14 @@ int64_t MemInfo::tg_not_enable_overcommit_group_gc() {
 
     for (const auto& workload_group : task_groups_overcommit) {
         auto used = workload_group->memory_used();
-        total_free_memory +=
-                workload_group->gc_memory(used - workload_group->memory_limit(), tg_profile.get());
+        total_free_memory += workload_group->gc_memory(used - workload_group->memory_limit(),
+                                                       tg_profile.get(), false);
     }
     return total_free_memory;
 }
 
-int64_t MemInfo::tg_enable_overcommit_group_gc(int64_t request_free_memory,
-                                               RuntimeProfile* profile) {
+int64_t MemInfo::tg_enable_overcommit_group_gc(int64_t request_free_memory, RuntimeProfile* profile,
+                                               bool is_minor_gc) {
     MonotonicStopWatch watch;
     watch.start();
     std::vector<WorkloadGroupPtr> task_groups;
@@ -359,7 +365,7 @@ int64_t MemInfo::tg_enable_overcommit_group_gc(int64_t request_free_memory,
                         : static_cast<double>(exceeded_memorys[i]) / total_exceeded_memory *
                                   request_free_memory); // exceeded memory as a weight
         auto workload_group = task_groups[i];
-        total_free_memory += workload_group->gc_memory(tg_need_free_memory, profile);
+        total_free_memory += workload_group->gc_memory(tg_need_free_memory, profile, is_minor_gc);
     }
     return total_free_memory;
 }
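The loop above splits the requested amount across groups by how much each one has exceeded its limit. A small worked example with made-up numbers (it covers only the proportional branch visible in this hunk; the other branch of the ternary is not shown here):

    // Suppose request_free_memory = 6 GB and two groups exceed their limits by 4 GB and 2 GB,
    // so total_exceeded_memory = 6 GB. Each group's share is
    //   tg_need_free_memory = exceeded_memorys[i] / total_exceeded_memory * request_free_memory
    //   group A: 4 / 6 * 6 GB = 4 GB      group B: 2 / 6 * 6 GB = 2 GB
    // gc_memory(tg_need_free_memory, profile, is_minor_gc) is then called once per group.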
@@ -195,7 +195,7 @@ public:
 
     static int64_t tg_not_enable_overcommit_group_gc();
     static int64_t tg_enable_overcommit_group_gc(int64_t request_free_memory,
-                                                 RuntimeProfile* profile);
+                                                 RuntimeProfile* profile, bool is_minor_gc);
 
     // It is only used after the memory limit is exceeded. When multiple threads are waiting for the available memory of the process,
     // avoid multiple threads starting at the same time and causing OOM.