// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. #include "vec/common/allocator.h" #include #include // IWYU pragma: no_include #include // IWYU pragma: keep #include #include #include // Allocator is used by too many files. For compilation speed, put dependencies in `.cpp` as much as possible. #include "runtime/fragment_mgr.h" #include "runtime/memory/mem_tracker_limiter.h" #include "runtime/memory/thread_mem_tracker_mgr.h" #include "runtime/thread_context.h" #include "util/defer_op.h" #include "util/mem_info.h" #include "util/stack_util.h" #include "util/uid_util.h" template void Allocator::sys_memory_check(size_t size) const { if (doris::is_thread_context_init() && doris::thread_context()->skip_memory_check != 0) { return; } if (doris::MemTrackerLimiter::sys_mem_exceed_limit_check(size)) { // Only thread attach query, and has not completely waited for thread_wait_gc_max_milliseconds, // will wait for gc, asynchronous cancel or throw bad::alloc. // Otherwise, if the external catch, directly throw bad::alloc. std::string err_msg; if (doris::is_thread_context_init()) { err_msg += fmt::format( "Allocator sys memory check failed: Cannot alloc:{}, consuming " "tracker:<{}>, peak used {}, current used {}, exec node:<{}>, {}.", size, doris::thread_context()->thread_mem_tracker()->label(), doris::thread_context()->thread_mem_tracker()->peak_consumption(), doris::thread_context()->thread_mem_tracker()->consumption(), doris::thread_context()->thread_mem_tracker_mgr->last_consumer_tracker(), doris::MemTrackerLimiter::process_limit_exceeded_errmsg_str()); } else { err_msg += fmt::format( "Allocator sys memory check failed: Cannot alloc:{}, consuming " "tracker:<{}>, {}.", size, "Orphan", doris::MemTrackerLimiter::process_limit_exceeded_errmsg_str()); } if (size > 1024l * 1024 * 1024 && !doris::enable_thread_catch_bad_alloc && !doris::config::disable_memory_gc) { // 1G err_msg += "\nAlloc Stacktrace:\n" + doris::get_stack_trace(); } // TODO, Save the query context in the thread context, instead of finding whether the query id is canceled in fragment_mgr. if (doris::is_thread_context_init() && doris::ExecEnv::GetInstance()->fragment_mgr()->query_is_canceled( doris::thread_context()->task_id())) { if (doris::enable_thread_catch_bad_alloc) { throw doris::Exception(doris::ErrorCode::MEM_ALLOC_FAILED, err_msg); } return; } if (doris::is_thread_context_init() && !doris::config::disable_memory_gc && doris::thread_context()->thread_mem_tracker_mgr->is_attach_query() && doris::thread_context()->thread_mem_tracker_mgr->wait_gc()) { int64_t wait_milliseconds = 0; LOG(INFO) << fmt::format( "Query:{} waiting for enough memory in thread id:{}, maximum {}ms, {}.", print_id(doris::thread_context()->task_id()), doris::thread_context()->get_thread_id(), doris::config::thread_wait_gc_max_milliseconds, err_msg); while (wait_milliseconds < doris::config::thread_wait_gc_max_milliseconds) { std::this_thread::sleep_for(std::chrono::milliseconds(100)); if (!doris::MemTrackerLimiter::sys_mem_exceed_limit_check(size)) { doris::MemInfo::refresh_interval_memory_growth += size; break; } if (doris::ExecEnv::GetInstance()->fragment_mgr()->query_is_canceled( doris::thread_context()->task_id())) { if (doris::enable_thread_catch_bad_alloc) { throw doris::Exception(doris::ErrorCode::MEM_ALLOC_FAILED, err_msg); } return; } wait_milliseconds += 100; } if (wait_milliseconds >= doris::config::thread_wait_gc_max_milliseconds) { // Make sure to completely wait thread_wait_gc_max_milliseconds only once. doris::thread_context()->thread_mem_tracker_mgr->disable_wait_gc(); doris::MemTrackerLimiter::print_log_process_usage(); // If the external catch, throw bad::alloc first, let the query actively cancel. Otherwise asynchronous cancel. if (!doris::enable_thread_catch_bad_alloc) { LOG(INFO) << fmt::format( "Query:{} canceled asyn, after waiting for memory {}ms, {}.", print_id(doris::thread_context()->task_id()), wait_milliseconds, err_msg); doris::thread_context()->thread_mem_tracker_mgr->cancel_instance(err_msg); } else { LOG(INFO) << fmt::format( "Query:{} throw exception, after waiting for memory {}ms, {}.", print_id(doris::thread_context()->task_id()), wait_milliseconds, err_msg); throw doris::Exception(doris::ErrorCode::MEM_ALLOC_FAILED, err_msg); } } // else, enough memory is available, the query continues execute. } else if (doris::enable_thread_catch_bad_alloc) { LOG(INFO) << fmt::format("sys memory check failed, throw exception, {}.", err_msg); doris::MemTrackerLimiter::print_log_process_usage(); throw doris::Exception(doris::ErrorCode::MEM_ALLOC_FAILED, err_msg); } else { LOG(INFO) << fmt::format("sys memory check failed, no throw exception, {}.", err_msg); } } } template void Allocator::memory_tracker_check(size_t size) const { if (doris::is_thread_context_init() && doris::thread_context()->skip_memory_check != 0) { return; } if (!doris::is_thread_context_init()) { return; } auto st = doris::thread_context()->thread_mem_tracker()->check_limit(size); if (!st) { auto err_msg = fmt::format("Allocator mem tracker check failed, {}", st.to_string()); doris::thread_context()->thread_mem_tracker()->print_log_usage(err_msg); // If the external catch, throw bad::alloc first, let the query actively cancel. Otherwise asynchronous cancel. if (doris::thread_context()->thread_mem_tracker_mgr->is_attach_query()) { doris::thread_context()->thread_mem_tracker_mgr->disable_wait_gc(); if (!doris::enable_thread_catch_bad_alloc) { LOG(INFO) << fmt::format("query/load:{} canceled asyn, {}.", print_id(doris::thread_context()->task_id()), err_msg); doris::thread_context()->thread_mem_tracker_mgr->cancel_instance(err_msg); } else { LOG(INFO) << fmt::format("query/load:{} throw exception, {}.", print_id(doris::thread_context()->task_id()), err_msg); throw doris::Exception(doris::ErrorCode::MEM_ALLOC_FAILED, err_msg); } } else if (doris::enable_thread_catch_bad_alloc) { LOG(INFO) << fmt::format("memory tracker check failed, throw exception, {}.", err_msg); throw doris::Exception(doris::ErrorCode::MEM_ALLOC_FAILED, err_msg); } else { LOG(INFO) << fmt::format("memory tracker check failed, no throw exception, {}.", err_msg); } } } template void Allocator::memory_check(size_t size) const { sys_memory_check(size); memory_tracker_check(size); } template void Allocator::consume_memory(size_t size) const { CONSUME_THREAD_MEM_TRACKER(size); } template void Allocator::release_memory(size_t size) const { RELEASE_THREAD_MEM_TRACKER(size); } template void Allocator::throw_bad_alloc( const std::string& err) const { LOG(WARNING) << err << fmt::format( " os physical memory {}. process memory used {}, sys available memory " "{}, Stacktrace: {}", doris::PrettyPrinter::print(doris::MemInfo::physical_mem(), doris::TUnit::BYTES), doris::PerfCounters::get_vm_rss_str(), doris::MemInfo::sys_mem_available_str(), doris::get_stack_trace()); doris::MemTrackerLimiter::print_log_process_usage(); throw doris::Exception(doris::ErrorCode::MEM_ALLOC_FAILED, err); } template void* Allocator::alloc(size_t size, size_t alignment) { return alloc_impl(size, alignment); } template void* Allocator::realloc(void* buf, size_t old_size, size_t new_size, size_t alignment) { return realloc_impl(buf, old_size, new_size, alignment); } template class Allocator; template class Allocator; template class Allocator; template class Allocator; template class Allocator; template class Allocator; template class Allocator; template class Allocator;