diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp index 82c7309766..38017bbca5 100644 --- a/be/src/common/daemon.cpp +++ b/be/src/common/daemon.cpp @@ -197,8 +197,8 @@ void Daemon::memory_maintenance_thread() { doris::MemInfo::refresh_proc_meminfo(); doris::MemInfo::refresh_proc_mem_no_allocator_cache(); - // Update and print memory stat when the memory changes by 100M. - if (abs(last_print_proc_mem - PerfCounters::get_vm_rss()) > 104857600) { + // Update and print memory stat when the memory changes by 256M. + if (abs(last_print_proc_mem - PerfCounters::get_vm_rss()) > 268435456) { last_print_proc_mem = PerfCounters::get_vm_rss(); doris::MemTrackerLimiter::enable_print_log_process_usage(); @@ -213,7 +213,7 @@ void Daemon::memory_maintenance_thread() { } #endif LOG(INFO) << MemTrackerLimiter:: - process_mem_log_str(); // print mem log when memory state by 100M + process_mem_log_str(); // print mem log when memory state by 256M } } } diff --git a/be/src/runtime/thread_context.cpp b/be/src/runtime/thread_context.cpp index 2e8845b206..15efa37b88 100644 --- a/be/src/runtime/thread_context.cpp +++ b/be/src/runtime/thread_context.cpp @@ -20,7 +20,6 @@ #include "common/signal_handler.h" #include "runtime/runtime_state.h" #include "util/doris_metrics.h" // IWYU pragma: keep -#include "util/uid_util.h" namespace doris { class MemTracker; @@ -33,15 +32,14 @@ ThreadContextPtr::ThreadContextPtr() { } AttachTask::AttachTask(const std::shared_ptr& mem_tracker, - const std::string& task_id, const TUniqueId& fragment_instance_id) { + const TUniqueId& task_id, const TUniqueId& fragment_instance_id) { thread_context()->attach_task(task_id, fragment_instance_id, mem_tracker); } AttachTask::AttachTask(RuntimeState* runtime_state) { doris::signal::query_id_hi = runtime_state->query_id().hi; doris::signal::query_id_lo = runtime_state->query_id().lo; - thread_context()->attach_task(print_id(runtime_state->query_id()), - runtime_state->fragment_instance_id(), + thread_context()->attach_task(runtime_state->query_id(), runtime_state->fragment_instance_id(), runtime_state->query_mem_tracker()); } diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h index c6f6a46c4e..4fbfd9155d 100644 --- a/be/src/runtime/thread_context.h +++ b/be/src/runtime/thread_context.h @@ -149,7 +149,7 @@ public: ~ThreadContext() { thread_context_ptr.init = false; } - void attach_task(const std::string& task_id, const TUniqueId& fragment_instance_id, + void attach_task(const TUniqueId& task_id, const TUniqueId& fragment_instance_id, const std::shared_ptr& mem_tracker) { #ifndef BE_TEST // will only attach_task at the beginning of the thread function, there should be no duplicate attach_task. @@ -164,11 +164,12 @@ public: } void detach_task() { - _task_id = ""; + _task_id = TUniqueId(); _fragment_instance_id = TUniqueId(); thread_mem_tracker_mgr->detach_limiter_tracker(); } + const TUniqueId& task_id() const { return _task_id; } const TUniqueId& fragment_instance_id() const { return _fragment_instance_id; } std::string get_thread_id() { @@ -189,7 +190,7 @@ public: } private: - std::string _task_id = ""; + TUniqueId _task_id; TUniqueId _fragment_instance_id; }; @@ -252,7 +253,7 @@ private: class AttachTask { public: explicit AttachTask(const std::shared_ptr& mem_tracker, - const std::string& task_id = "", + const TUniqueId& task_id = TUniqueId(), const TUniqueId& fragment_instance_id = TUniqueId()); explicit AttachTask(RuntimeState* runtime_state); diff --git a/be/src/vec/common/allocator.cpp b/be/src/vec/common/allocator.cpp index e41454a5c3..b74ed398d4 100644 --- a/be/src/vec/common/allocator.cpp +++ b/be/src/vec/common/allocator.cpp @@ -27,10 +27,12 @@ #include // Allocator is used by too many files. For compilation speed, put dependencies in `.cpp` as much as possible. +#include "runtime/fragment_mgr.h" #include "runtime/memory/mem_tracker_limiter.h" #include "runtime/memory/thread_mem_tracker_mgr.h" #include "runtime/thread_context.h" #include "util/mem_info.h" +#include "util/uid_util.h" template void Allocator::sys_memory_check(size_t size) const { @@ -39,41 +41,50 @@ void Allocator::sys_memory_check(size_t // Only thread attach query, and has not completely waited for thread_wait_gc_max_milliseconds, // will wait for gc, asynchronous cancel or throw bad::alloc. // Otherwise, if the external catch, directly throw bad::alloc. + auto err_msg = fmt::format( + "Allocator sys memory check failed: Cannot alloc:{}, consuming " + "tracker:<{}>, exec node:<{}>, {}.", + size, doris::thread_context()->thread_mem_tracker()->label(), + doris::thread_context()->thread_mem_tracker_mgr->last_consumer_tracker(), + doris::MemTrackerLimiter::process_limit_exceeded_errmsg_str()); if (doris::thread_context()->thread_mem_tracker_mgr->is_attach_query() && doris::thread_context()->thread_mem_tracker_mgr->wait_gc()) { int64_t wait_milliseconds = doris::config::thread_wait_gc_max_milliseconds; + LOG(INFO) << fmt::format("Query:{} waiting for enough memory, maximum 5s, {}.", + print_id(doris::thread_context()->task_id()), err_msg); while (wait_milliseconds > 0) { std::this_thread::sleep_for(std::chrono::milliseconds(100)); if (!doris::MemTrackerLimiter::sys_mem_exceed_limit_check(size)) { doris::MemInfo::refresh_interval_memory_growth += size; break; } + if (doris::ExecEnv::GetInstance()->fragment_mgr()->query_is_canceled( + doris::thread_context()->task_id())) { + wait_milliseconds = 0; + break; + } wait_milliseconds -= 100; } if (wait_milliseconds <= 0) { // Make sure to completely wait thread_wait_gc_max_milliseconds only once. doris::thread_context()->thread_mem_tracker_mgr->disable_wait_gc(); - auto err_msg = fmt::format( - "Allocator sys memory check failed: Cannot alloc:{}, consuming " - "tracker:<{}>, exec node:<{}>, {}.", - size, doris::thread_context()->thread_mem_tracker()->label(), - doris::thread_context()->thread_mem_tracker_mgr->last_consumer_tracker(), - doris::MemTrackerLimiter::process_limit_exceeded_errmsg_str()); doris::MemTrackerLimiter::print_log_process_usage(err_msg); // If the external catch, throw bad::alloc first, let the query actively cancel. Otherwise asynchronous cancel. if (!doris::enable_thread_catch_bad_alloc) { + LOG(INFO) << fmt::format( + "Query:{} canceled asyn, after waiting for memory 5s, {}.", + print_id(doris::thread_context()->task_id()), err_msg); doris::thread_context()->thread_mem_tracker_mgr->cancel_fragment(err_msg); } else { + LOG(INFO) << fmt::format( + "Query:{} throw exception, after waiting for memory 5s, {}.", + print_id(doris::thread_context()->task_id()), err_msg); throw doris::Exception(doris::ErrorCode::MEM_ALLOC_FAILED, err_msg); } } + // else, enough memory is available, the query continues execute. } else if (doris::enable_thread_catch_bad_alloc) { - auto err_msg = fmt::format( - "Allocator sys memory check failed: Cannot alloc:{}, consuming tracker:<{}>, " - "exec node:<{}>, {}.", - size, doris::thread_context()->thread_mem_tracker()->label(), - doris::thread_context()->thread_mem_tracker_mgr->last_consumer_tracker(), - doris::MemTrackerLimiter::process_limit_exceeded_errmsg_str()); + LOG(INFO) << fmt::format("throw exception, {}.", err_msg); doris::MemTrackerLimiter::print_log_process_usage(err_msg); throw doris::Exception(doris::ErrorCode::MEM_ALLOC_FAILED, err_msg); } @@ -85,7 +96,6 @@ void Allocator::memory_tracker_check(siz if (doris::skip_memory_check) return; auto st = doris::thread_context()->thread_mem_tracker()->check_limit(size); if (!st) { - doris::thread_context()->thread_mem_tracker_mgr->disable_wait_gc(); auto err_msg = doris::thread_context()->thread_mem_tracker()->query_tracker_limit_exceeded_str( st.to_string(), @@ -93,9 +103,19 @@ void Allocator::memory_tracker_check(siz "Allocator mem tracker check failed"); doris::thread_context()->thread_mem_tracker()->print_log_usage(err_msg); // If the external catch, throw bad::alloc first, let the query actively cancel. Otherwise asynchronous cancel. - if (!doris::enable_thread_catch_bad_alloc) { - doris::thread_context()->thread_mem_tracker_mgr->cancel_fragment(err_msg); + if (doris::thread_context()->thread_mem_tracker_mgr->is_attach_query()) { + doris::thread_context()->thread_mem_tracker_mgr->disable_wait_gc(); + if (!doris::enable_thread_catch_bad_alloc) { + LOG(INFO) << fmt::format("Query:{} canceled asyn, {}.", + print_id(doris::thread_context()->task_id()), err_msg); + doris::thread_context()->thread_mem_tracker_mgr->cancel_fragment(err_msg); + } else { + LOG(INFO) << fmt::format("Query:{} throw exception, {}.", + print_id(doris::thread_context()->task_id()), err_msg); + throw doris::Exception(doris::ErrorCode::MEM_ALLOC_FAILED, err_msg); + } } else { + LOG(INFO) << fmt::format("throw exception, {}.", err_msg); throw doris::Exception(doris::ErrorCode::MEM_ALLOC_FAILED, err_msg); } } diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp index 1c965a9d3d..ebee26783a 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.cpp +++ b/be/src/vec/runtime/vdata_stream_recvr.cpp @@ -360,7 +360,7 @@ Status VDataStreamRecvr::create_merger(const std::vector& orderin void VDataStreamRecvr::add_block(const PBlock& pblock, int sender_id, int be_number, int64_t packet_seq, ::google::protobuf::Closure** done) { - SCOPED_ATTACH_TASK(_query_mem_tracker, print_id(_query_id), _fragment_instance_id); + SCOPED_ATTACH_TASK(_query_mem_tracker, _query_id, _fragment_instance_id); int use_sender_id = _is_merging ? sender_id : 0; _sender_queues[use_sender_id]->add_block(pblock, be_number, packet_seq, done); }