[fix](memory) Fix Allocator blocking for 100ms when the memory limit is exceeded and the query has already been canceled (#20959)
This commit is contained in:
@ -437,7 +437,9 @@ int64_t MemTrackerLimiter::free_top_overcommit_query(
|
||||
std::lock_guard<std::mutex> l(tracker_groups[i].group_lock);
|
||||
for (auto tracker : tracker_groups[i].trackers) {
|
||||
if (tracker->type() == type) {
|
||||
if (tracker->consumption() <= 33554432) { // 32M small query does not cancel
|
||||
// 32M small query does not cancel
|
||||
if (tracker->consumption() <= 33554432 ||
|
||||
tracker->consumption() < tracker->limit()) {
|
||||
continue;
|
||||
}
|
||||
if (tracker->is_query_cancelled()) {
|
||||
|
||||
@ -31,6 +31,7 @@
|
||||
#include "runtime/memory/mem_tracker_limiter.h"
|
||||
#include "runtime/memory/thread_mem_tracker_mgr.h"
|
||||
#include "runtime/thread_context.h"
|
||||
#include "util/defer_op.h"
|
||||
#include "util/mem_info.h"
|
||||
#include "util/uid_util.h"
|
||||
|
||||
@ -47,12 +48,22 @@ void Allocator<clear_memory_, mmap_populate, use_mmap>::sys_memory_check(size_t
|
||||
size, doris::thread_context()->thread_mem_tracker()->label(),
|
||||
doris::thread_context()->thread_mem_tracker_mgr->last_consumer_tracker(),
|
||||
doris::MemTrackerLimiter::process_limit_exceeded_errmsg_str());
|
||||
|
||||
// TODO: Save the query context in the thread context, instead of looking up in fragment_mgr whether the query id has been canceled.
|
||||
if (doris::ExecEnv::GetInstance()->fragment_mgr()->query_is_canceled(
|
||||
doris::thread_context()->task_id())) {
|
||||
if (doris::enable_thread_catch_bad_alloc) {
|
||||
throw doris::Exception(doris::ErrorCode::MEM_ALLOC_FAILED, err_msg);
|
||||
}
|
||||
return;
|
||||
}
|
||||
if (doris::thread_context()->thread_mem_tracker_mgr->is_attach_query() &&
|
||||
doris::thread_context()->thread_mem_tracker_mgr->wait_gc()) {
|
||||
int64_t wait_milliseconds = doris::config::thread_wait_gc_max_milliseconds;
|
||||
LOG(INFO) << fmt::format("Query:{} waiting for enough memory, maximum 5s, {}.",
|
||||
print_id(doris::thread_context()->task_id()), err_msg);
|
||||
while (wait_milliseconds > 0) {
|
||||
int64_t wait_milliseconds = 0;
|
||||
LOG(INFO) << fmt::format("Query:{} waiting for enough memory, maximum {}ms, {}.",
|
||||
print_id(doris::thread_context()->task_id()),
|
||||
doris::config::thread_wait_gc_max_milliseconds, err_msg);
|
||||
while (wait_milliseconds < doris::config::thread_wait_gc_max_milliseconds) {
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
||||
if (!doris::MemTrackerLimiter::sys_mem_exceed_limit_check(size)) {
|
||||
doris::MemInfo::refresh_interval_memory_growth += size;
|
||||
@ -60,25 +71,29 @@ void Allocator<clear_memory_, mmap_populate, use_mmap>::sys_memory_check(size_t
|
||||
}
|
||||
if (doris::ExecEnv::GetInstance()->fragment_mgr()->query_is_canceled(
|
||||
doris::thread_context()->task_id())) {
|
||||
wait_milliseconds = 0;
|
||||
break;
|
||||
if (doris::enable_thread_catch_bad_alloc) {
|
||||
throw doris::Exception(doris::ErrorCode::MEM_ALLOC_FAILED, err_msg);
|
||||
}
|
||||
return;
|
||||
}
|
||||
wait_milliseconds -= 100;
|
||||
wait_milliseconds += 100;
|
||||
}
|
||||
if (wait_milliseconds <= 0) {
|
||||
if (wait_milliseconds >= doris::config::thread_wait_gc_max_milliseconds) {
|
||||
// Make sure the full thread_wait_gc_max_milliseconds wait happens only once.
|
||||
doris::thread_context()->thread_mem_tracker_mgr->disable_wait_gc();
|
||||
doris::MemTrackerLimiter::print_log_process_usage(err_msg);
|
||||
// If the exception is caught externally, throw it first (bad_alloc style) and let the query cancel itself actively. Otherwise, cancel the query asynchronously.
|
||||
if (!doris::enable_thread_catch_bad_alloc) {
|
||||
LOG(INFO) << fmt::format(
|
||||
"Query:{} canceled asyn, after waiting for memory 5s, {}.",
|
||||
print_id(doris::thread_context()->task_id()), err_msg);
|
||||
"Query:{} canceled asyn, after waiting for memory {}ms, {}.",
|
||||
print_id(doris::thread_context()->task_id()), wait_milliseconds,
|
||||
err_msg);
|
||||
doris::thread_context()->thread_mem_tracker_mgr->cancel_fragment(err_msg);
|
||||
} else {
|
||||
LOG(INFO) << fmt::format(
|
||||
"Query:{} throw exception, after waiting for memory 5s, {}.",
|
||||
print_id(doris::thread_context()->task_id()), err_msg);
|
||||
"Query:{} throw exception, after waiting for memory {}ms, {}.",
|
||||
print_id(doris::thread_context()->task_id()), wait_milliseconds,
|
||||
err_msg);
|
||||
throw doris::Exception(doris::ErrorCode::MEM_ALLOC_FAILED, err_msg);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user