[refactor](memory) Query waits in Allocator for memory to be freed after the memory limit is exceeded. (#18075)
Previously, when memory exceeded the limit, a query waited for memory to be freed inside the mem hook. This change moves the wait into the Allocator, which is more controllable and safer.
This commit is contained in:
@ -189,6 +189,16 @@ public:
|
||||
MemInfo::refresh_interval_memory_growth);
|
||||
}
|
||||
|
||||
static std::string process_limit_exceeded_errmsg_str(int64_t bytes) {
|
||||
return fmt::format(
|
||||
"process memory used {} exceed limit {} or sys mem available {} less than low "
|
||||
"water mark {}, failed alloc size {}",
|
||||
PerfCounters::get_vm_rss_str(), MemInfo::mem_limit_str(),
|
||||
MemInfo::sys_mem_available_str(),
|
||||
PrettyPrinter::print(MemInfo::sys_mem_available_low_water_mark(), TUnit::BYTES),
|
||||
print_bytes(bytes));
|
||||
}
|
||||
|
||||
std::string debug_string() {
|
||||
std::stringstream msg;
|
||||
msg << "limit: " << _limit << "; "
|
||||
@ -221,16 +231,6 @@ private:
|
||||
print_bytes(exceed_tracker->_consumption->current_value()));
|
||||
}
|
||||
|
||||
static std::string process_limit_exceeded_errmsg_str(int64_t bytes) {
|
||||
return fmt::format(
|
||||
"process memory used {} exceed limit {} or sys mem available {} less than low "
|
||||
"water mark {}, failed alloc size {}",
|
||||
PerfCounters::get_vm_rss_str(), MemInfo::mem_limit_str(),
|
||||
MemInfo::sys_mem_available_str(),
|
||||
PrettyPrinter::print(MemInfo::sys_mem_available_low_water_mark(), TUnit::BYTES),
|
||||
print_bytes(bytes));
|
||||
}
|
||||
|
||||
private:
|
||||
Type _type;
|
||||
|
||||
|
||||
@ -45,10 +45,11 @@ void ThreadMemTrackerMgr::detach_limiter_tracker(
|
||||
_limiter_tracker_raw = old_mem_tracker.get();
|
||||
}
|
||||
|
||||
void ThreadMemTrackerMgr::cancel_fragment() {
|
||||
ExecEnv::GetInstance()->fragment_mgr()->cancel(_fragment_instance_id,
|
||||
PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED,
|
||||
_exceed_mem_limit_msg);
|
||||
void ThreadMemTrackerMgr::cancel_fragment(const std::string& exceed_msg) {
|
||||
if (_check_limit) {
|
||||
ExecEnv::GetInstance()->fragment_mgr()->cancel(
|
||||
_fragment_instance_id, PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED, exceed_msg);
|
||||
}
|
||||
_check_limit = false; // Make sure it will only be canceled once
|
||||
}
|
||||
|
||||
@ -59,18 +60,7 @@ void ThreadMemTrackerMgr::exceeded(int64_t size) {
|
||||
_limiter_tracker_raw->print_log_usage(_exceed_mem_limit_msg);
|
||||
|
||||
if (is_attach_query()) {
|
||||
if (_is_process_exceed && _wait_gc) {
|
||||
int64_t wait_milliseconds = config::thread_wait_gc_max_milliseconds;
|
||||
while (wait_milliseconds > 0) {
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Check every 100 ms.
|
||||
if (!MemTrackerLimiter::sys_mem_exceed_limit_check(size)) {
|
||||
MemInfo::refresh_interval_memory_growth += size;
|
||||
return; // Process memory is sufficient, no cancel query.
|
||||
}
|
||||
wait_milliseconds -= 100;
|
||||
}
|
||||
}
|
||||
cancel_fragment();
|
||||
cancel_fragment(_exceed_mem_limit_msg);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -97,6 +97,8 @@ public:
|
||||
// Returns a copy of the stored memory-limit-exceeded message.
std::string exceed_mem_limit_msg() { return _exceed_mem_limit_msg; }
|
||||
// Resets the stored memory-limit-exceeded message to the empty string.
void clear_exceed_mem_limit_msg() { _exceed_mem_limit_msg = ""; }
|
||||
// Clears the `_wait_gc` flag; wait_gc() returns false afterwards.
void disable_wait_gc() { _wait_gc = false; }
|
||||
// Returns the current value of the `_wait_gc` flag.
bool wait_gc() { return _wait_gc; }
|
||||
// Cancels the fragment with reason MEMORY_LIMIT_EXCEED, using `exceed_msg` as the
// cancel message. Defined out of line.
void cancel_fragment(const std::string& exceed_msg);
|
||||
|
||||
std::string print_debug_string() {
|
||||
fmt::memory_buffer consumer_tracker_buf;
|
||||
@ -111,7 +113,6 @@ public:
|
||||
}
|
||||
|
||||
private:
|
||||
void cancel_fragment();
|
||||
void exceeded(int64_t size);
|
||||
|
||||
void save_exceed_mem_limit_msg() {
|
||||
|
||||
@ -131,8 +131,45 @@ static constexpr size_t MALLOC_MIN_ALIGNMENT = 8;
|
||||
template <bool clear_memory_, bool mmap_populate>
|
||||
class Allocator {
|
||||
public:
|
||||
void sys_memory_check(size_t size) {
|
||||
if (doris::MemTrackerLimiter::sys_mem_exceed_limit_check(size)) {
|
||||
if (doris::thread_context()->thread_mem_tracker_mgr->is_attach_query() &&
|
||||
doris::thread_context()->thread_mem_tracker_mgr->wait_gc()) {
|
||||
int64_t wait_milliseconds = doris::config::thread_wait_gc_max_milliseconds;
|
||||
while (wait_milliseconds > 0) {
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
||||
if (!doris::MemTrackerLimiter::sys_mem_exceed_limit_check(size)) {
|
||||
doris::MemInfo::refresh_interval_memory_growth += size;
|
||||
break;
|
||||
}
|
||||
wait_milliseconds -= 100;
|
||||
}
|
||||
if (wait_milliseconds <= 0) {
|
||||
auto err_msg = fmt::format(
|
||||
"Allocator Sys Memory Check Failed In Query/Load: Cannot alloc {}, "
|
||||
"{}.",
|
||||
size,
|
||||
doris::MemTrackerLimiter::process_limit_exceeded_errmsg_str(size));
|
||||
doris::thread_context()->thread_mem_tracker_mgr->disable_wait_gc();
|
||||
if (!doris::enable_thread_catch_bad_alloc) {
|
||||
doris::thread_context()->thread_mem_tracker_mgr->cancel_fragment(err_msg);
|
||||
} else {
|
||||
LOG(WARNING) << err_msg;
|
||||
throw std::bad_alloc {};
|
||||
}
|
||||
}
|
||||
} else if (doris::enable_thread_catch_bad_alloc) {
|
||||
LOG(WARNING) << fmt::format(
|
||||
"Allocator Sys Memory Check Failed: Cannot alloc {}, {}.", size,
|
||||
doris::MemTrackerLimiter::process_limit_exceeded_errmsg_str(size));
|
||||
throw std::bad_alloc {};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Allocate memory range.
|
||||
void* alloc(size_t size, size_t alignment = 0) {
|
||||
sys_memory_check(size);
|
||||
void* buf;
|
||||
|
||||
if (size >= MMAP_THRESHOLD) {
|
||||
@ -219,6 +256,7 @@ public:
|
||||
/// BTW, it's not possible to change alignment while doing realloc.
|
||||
} else if (old_size < CHUNK_THRESHOLD && new_size < CHUNK_THRESHOLD &&
|
||||
alignment <= MALLOC_MIN_ALIGNMENT) {
|
||||
sys_memory_check(new_size);
|
||||
/// Resize malloc'd memory region with no special alignment requirement.
|
||||
void* new_buf = ::realloc(buf, new_size);
|
||||
if (nullptr == new_buf) {
|
||||
@ -231,6 +269,7 @@ public:
|
||||
if (new_size > old_size)
|
||||
memset(reinterpret_cast<char*>(buf) + old_size, 0, new_size - old_size);
|
||||
} else if (old_size >= MMAP_THRESHOLD && new_size >= MMAP_THRESHOLD) {
|
||||
sys_memory_check(new_size);
|
||||
/// Resize mmap'd memory region.
|
||||
if (!TRY_CONSUME_THREAD_MEM_TRACKER(new_size - old_size)) {
|
||||
RETURN_BAD_ALLOC_IF_PRE_CATCH(fmt::format(
|
||||
@ -257,6 +296,7 @@ public:
|
||||
memset(reinterpret_cast<char*>(buf) + old_size, 0, new_size - old_size);
|
||||
}
|
||||
} else {
|
||||
sys_memory_check(new_size);
|
||||
// CHUNK_THRESHOLD <= old_size <= MMAP_THRESHOLD use system realloc is slow, use ChunkAllocator.
|
||||
// Big allocs that requires a copy.
|
||||
void* new_buf = alloc(new_size, alignment);
|
||||
|
||||
Reference in New Issue
Block a user