[enhancement](memory) Support query memroy overcommit #14948
Add conf enable_query_memroy_overcommit If true, when the process does not exceed the soft mem limit, the query memory will not be limited; when the process memory exceeds the soft mem limit, the query with the largest ratio between the currently used memory and the exec_mem_limit will be canceled. If false, cancel query when the memory used exceeds exec_mem_limit, same as before.
This commit is contained in:
@ -72,6 +72,12 @@ CONF_Int64(max_sys_mem_available_low_water_mark_bytes, "1717986918");
|
||||
CONF_mString(process_minor_gc_size, "10%");
|
||||
CONF_mString(process_full_gc_size, "20%");
|
||||
|
||||
// If true, when the process does not exceed the soft mem limit, the query memory will not be limited;
|
||||
// when the process memory exceeds the soft mem limit, the query with the largest ratio between the currently
|
||||
// used memory and the exec_mem_limit will be canceled.
|
||||
// If false, cancel query when the memory used exceeds exec_mem_limit, same as before.
|
||||
CONF_mBool(enable_query_memroy_overcommit, "true");
|
||||
|
||||
// The maximum time a thread waits for a full GC. Currently only query will wait for full gc.
|
||||
CONF_mInt32(thread_wait_gc_max_milliseconds, "1000");
|
||||
|
||||
|
||||
@ -28,7 +28,6 @@
|
||||
#include "runtime/thread_context.h"
|
||||
#include "util/pretty_printer.h"
|
||||
#include "util/stack_util.h"
|
||||
#include "util/string_util.h"
|
||||
|
||||
namespace doris {
|
||||
|
||||
@ -242,7 +241,7 @@ Status MemTrackerLimiter::fragment_mem_limit_exceeded(RuntimeState* state, const
|
||||
return Status::MemoryLimitExceeded(failed_msg);
|
||||
}
|
||||
|
||||
int64_t MemTrackerLimiter::free_top_query(int64_t min_free_mem) {
|
||||
int64_t MemTrackerLimiter::free_top_memory_query(int64_t min_free_mem) {
|
||||
std::priority_queue<std::pair<int64_t, std::string>,
|
||||
std::vector<std::pair<int64_t, std::string>>,
|
||||
std::greater<std::pair<int64_t, std::string>>>
|
||||
@ -250,16 +249,8 @@ int64_t MemTrackerLimiter::free_top_query(int64_t min_free_mem) {
|
||||
// After greater than min_free_mem, will not be modified.
|
||||
int64_t prepare_free_mem = 0;
|
||||
|
||||
auto label_to_queryid = [&](const std::string& label) -> TUniqueId {
|
||||
auto queryid = split(label, "#Id=")[1];
|
||||
TUniqueId querytid;
|
||||
parse_id(queryid, &querytid);
|
||||
return querytid;
|
||||
};
|
||||
|
||||
auto cancel_top_query = [&](auto min_pq, auto label_to_queryid) -> int64_t {
|
||||
auto cancel_top_query = [&](auto min_pq) -> int64_t {
|
||||
std::vector<std::string> usage_strings;
|
||||
bool had_cancel = false;
|
||||
int64_t freed_mem = 0;
|
||||
while (!min_pq.empty()) {
|
||||
TUniqueId cancelled_queryid = label_to_queryid(min_pq.top().second);
|
||||
@ -278,10 +269,9 @@ int64_t MemTrackerLimiter::free_top_query(int64_t min_free_mem) {
|
||||
freed_mem += min_pq.top().first;
|
||||
usage_strings.push_back(fmt::format("{} memory usage {} Bytes", min_pq.top().second,
|
||||
min_pq.top().first));
|
||||
had_cancel = true;
|
||||
min_pq.pop();
|
||||
}
|
||||
if (had_cancel) {
|
||||
if (!usage_strings.empty()) {
|
||||
LOG(INFO) << "Process GC Free Top Memory Usage Query: " << join(usage_strings, ",");
|
||||
}
|
||||
return freed_mem;
|
||||
@ -299,7 +289,7 @@ int64_t MemTrackerLimiter::free_top_query(int64_t min_free_mem) {
|
||||
std::swap(min_pq, min_pq_null);
|
||||
min_pq.push(
|
||||
pair<int64_t, std::string>(tracker->consumption(), tracker->label()));
|
||||
return cancel_top_query(min_pq, label_to_queryid);
|
||||
return cancel_top_query(min_pq);
|
||||
} else if (tracker->consumption() + prepare_free_mem < min_free_mem) {
|
||||
min_pq.push(
|
||||
pair<int64_t, std::string>(tracker->consumption(), tracker->label()));
|
||||
@ -313,7 +303,67 @@ int64_t MemTrackerLimiter::free_top_query(int64_t min_free_mem) {
|
||||
}
|
||||
}
|
||||
}
|
||||
return cancel_top_query(min_pq, label_to_queryid);
|
||||
return cancel_top_query(min_pq);
|
||||
}
|
||||
|
||||
int64_t MemTrackerLimiter::free_top_overcommit_query(int64_t min_free_mem) {
|
||||
std::priority_queue<std::pair<int64_t, std::string>,
|
||||
std::vector<std::pair<int64_t, std::string>>,
|
||||
std::greater<std::pair<int64_t, std::string>>>
|
||||
min_pq;
|
||||
std::unordered_map<std::string, int64_t> query_consumption;
|
||||
|
||||
for (unsigned i = 1; i < mem_tracker_limiter_pool.size(); ++i) {
|
||||
std::lock_guard<std::mutex> l(mem_tracker_limiter_pool[i].group_lock);
|
||||
for (auto tracker : mem_tracker_limiter_pool[i].trackers) {
|
||||
if (tracker->type() == Type::QUERY) {
|
||||
int64_t overcommit_ratio =
|
||||
(static_cast<double>(tracker->consumption()) / tracker->limit()) * 10000;
|
||||
if (overcommit_ratio == 0) { // Small query does not cancel
|
||||
continue;
|
||||
}
|
||||
min_pq.push(pair<int64_t, std::string>(overcommit_ratio, tracker->label()));
|
||||
query_consumption[tracker->label()] = tracker->consumption();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
std::priority_queue<std::pair<int64_t, std::string>> max_pq;
|
||||
// Min-heap to Max-heap.
|
||||
while (!min_pq.empty()) {
|
||||
max_pq.push(min_pq.top());
|
||||
min_pq.pop();
|
||||
}
|
||||
|
||||
std::vector<std::string> usage_strings;
|
||||
int64_t freed_mem = 0;
|
||||
while (!max_pq.empty()) {
|
||||
TUniqueId cancelled_queryid = label_to_queryid(max_pq.top().second);
|
||||
int64_t query_mem = query_consumption[max_pq.top().second];
|
||||
ExecEnv::GetInstance()->fragment_mgr()->cancel_query(
|
||||
cancelled_queryid, PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED,
|
||||
fmt::format("Process has no memory available, cancel top memory usage query: "
|
||||
"query memory tracker <{}> consumption {}, backend {} "
|
||||
"process memory used {} exceed limit {} or sys mem available {} "
|
||||
"less than low water mark {}. Execute again after enough memory, "
|
||||
"details see be.INFO.",
|
||||
max_pq.top().second, print_bytes(query_mem),
|
||||
BackendOptions::get_localhost(), PerfCounters::get_vm_rss_str(),
|
||||
MemInfo::mem_limit_str(), MemInfo::sys_mem_available_str(),
|
||||
print_bytes(MemInfo::sys_mem_available_low_water_mark())));
|
||||
|
||||
usage_strings.push_back(fmt::format("{} memory usage {} Bytes, overcommit ratio: {}",
|
||||
max_pq.top().second, query_mem, max_pq.top().first));
|
||||
freed_mem += query_mem;
|
||||
if (freed_mem > min_free_mem) {
|
||||
break;
|
||||
}
|
||||
max_pq.pop();
|
||||
}
|
||||
if (!usage_strings.empty()) {
|
||||
LOG(INFO) << "Process GC Free Top Memory Overcommit Query: " << join(usage_strings, ",");
|
||||
}
|
||||
return freed_mem;
|
||||
}
|
||||
|
||||
} // namespace doris
|
||||
|
||||
@ -25,6 +25,7 @@
|
||||
#include "service/backend_options.h"
|
||||
#include "util/mem_info.h"
|
||||
#include "util/perf_counters.h"
|
||||
#include "util/string_util.h"
|
||||
|
||||
namespace doris {
|
||||
|
||||
@ -145,8 +146,18 @@ public:
|
||||
Status fragment_mem_limit_exceeded(RuntimeState* state, const std::string& msg,
|
||||
int64_t failed_allocation_size = 0);
|
||||
|
||||
// Start canceling from the query with the largest memory usage until the memory of min_free_mem size is released.
|
||||
static int64_t free_top_query(int64_t min_free_mem);
|
||||
// Start canceling from the query with the largest memory usage until the memory of min_free_mem size is freed.
|
||||
static int64_t free_top_memory_query(int64_t min_free_mem);
|
||||
// Start canceling from the query with the largest memory overcommit ratio until the memory
|
||||
// of min_free_mem size is freed.
|
||||
static int64_t free_top_overcommit_query(int64_t min_free_mem);
|
||||
// only for Type::QUERY or Type::LOAD.
|
||||
static TUniqueId label_to_queryid(const std::string& label) {
|
||||
auto queryid = split(label, "#Id=")[1];
|
||||
TUniqueId querytid;
|
||||
parse_id(queryid, &querytid);
|
||||
return querytid;
|
||||
};
|
||||
|
||||
static std::string process_mem_log_str() {
|
||||
return fmt::format(
|
||||
@ -255,7 +266,7 @@ inline bool MemTrackerLimiter::try_consume(int64_t bytes, std::string& failed_ms
|
||||
return false;
|
||||
}
|
||||
|
||||
if (_limit < 0) {
|
||||
if (_limit < 0 || (_type == Type::QUERY && config::enable_query_memroy_overcommit)) {
|
||||
_consumption->add(bytes); // No limit at this tracker.
|
||||
} else {
|
||||
if (!_consumption->try_add(bytes, _limit)) {
|
||||
@ -272,7 +283,9 @@ inline Status MemTrackerLimiter::check_limit(int64_t bytes) {
|
||||
if (sys_mem_exceed_limit_check(bytes)) {
|
||||
return Status::MemoryLimitExceeded(process_limit_exceeded_errmsg_str(bytes));
|
||||
}
|
||||
if (bytes <= 0) return Status::OK();
|
||||
if (bytes <= 0 || (_type == Type::QUERY && config::enable_query_memroy_overcommit)) {
|
||||
return Status::OK();
|
||||
}
|
||||
if (_limit > 0 && _consumption->current_value() + bytes > _limit) {
|
||||
return Status::MemoryLimitExceeded(tracker_limit_exceeded_errmsg_str(bytes, this));
|
||||
}
|
||||
|
||||
@ -104,6 +104,10 @@ void MemInfo::process_minor_gc() {
|
||||
freed_mem +=
|
||||
StoragePageCache::instance()->get_page_cache_mem_consumption(segment_v2::DATA_PAGE);
|
||||
StoragePageCache::instance()->prune(segment_v2::DATA_PAGE);
|
||||
if (config::enable_query_memroy_overcommit) {
|
||||
freed_mem +=
|
||||
MemTrackerLimiter::free_top_overcommit_query(_s_process_full_gc_size - freed_mem);
|
||||
}
|
||||
}
|
||||
|
||||
void MemInfo::process_full_gc() {
|
||||
@ -122,7 +126,7 @@ void MemInfo::process_full_gc() {
|
||||
if (freed_mem > _s_process_full_gc_size) {
|
||||
return;
|
||||
}
|
||||
freed_mem += MemTrackerLimiter::free_top_query(_s_process_full_gc_size - freed_mem);
|
||||
freed_mem += MemTrackerLimiter::free_top_memory_query(_s_process_full_gc_size - freed_mem);
|
||||
}
|
||||
|
||||
#ifndef __APPLE__
|
||||
|
||||
Reference in New Issue
Block a user