diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp index 7a4ff271f4..24a79bc1b6 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.cpp +++ b/be/src/runtime/memory/mem_tracker_limiter.cpp @@ -381,8 +381,7 @@ int64_t MemTrackerLimiter::free_top_memory_query( std::lock_guard l(tracker_groups[i].group_lock); for (auto tracker : tracker_groups[i].trackers) { if (tracker->type() == type) { - if (ExecEnv::GetInstance()->fragment_mgr()->query_is_canceled( - label_to_queryid(tracker->label()))) { + if (tracker->is_query_cancelled()) { continue; } if (tracker->consumption() > min_free_mem) { @@ -441,8 +440,7 @@ int64_t MemTrackerLimiter::free_top_overcommit_query( if (tracker->consumption() <= 33554432) { // 32M small query does not cancel continue; } - if (ExecEnv::GetInstance()->fragment_mgr()->query_is_canceled( - label_to_queryid(tracker->label()))) { + if (tracker->is_query_cancelled()) { continue; } int64_t overcommit_ratio = diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h index ffeceae352..cb939518bc 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.h +++ b/be/src/runtime/memory/mem_tracker_limiter.h @@ -138,6 +138,10 @@ public: // this tracker limiter. int64_t spare_capacity() const { return _limit - consumption(); } + bool is_query_cancelled() { return _is_query_cancelled; } + + void set_is_query_cancelled(bool is_cancelled) { _is_query_cancelled.store(is_cancelled); } + static void disable_oom_avoidance() { _oom_avoidance = false; } public: @@ -254,6 +258,9 @@ private: // to avoid frequent calls to consume/release of MemTracker. std::atomic _untracked_mem = 0; + // query or load + std::atomic _is_query_cancelled = false; + // Avoid frequent printing. bool _enable_print_log_usage = false; static std::atomic _enable_print_log_process_usage; diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h index e1516cc391..8ed0150db5 100644 --- a/be/src/runtime/query_context.h +++ b/be/src/runtime/query_context.h @@ -109,6 +109,9 @@ public: _is_cancelled = is_cancelled; _ready_to_execute = true; } + if (query_mem_tracker && is_cancelled) { + query_mem_tracker->set_is_query_cancelled(is_cancelled); + } _start_cond.notify_all(); } void set_ready_to_execute_only() { diff --git a/be/src/runtime/task_group/task_group.cpp b/be/src/runtime/task_group/task_group.cpp index ea4fe90632..dad31cae1e 100644 --- a/be/src/runtime/task_group/task_group.cpp +++ b/be/src/runtime/task_group/task_group.cpp @@ -128,7 +128,7 @@ int64_t TaskGroup::memory_used() { for (auto& mem_tracker_group : _mem_tracker_limiter_pool) { std::lock_guard l(mem_tracker_group.group_lock); for (const auto& tracker : mem_tracker_group.trackers) { - used_memory += tracker->consumption(); + used_memory += tracker->is_query_cancelled() ? 0 : tracker->consumption(); } } return used_memory;