diff --git a/be/src/olap/page_cache.h b/be/src/olap/page_cache.h index cc52f4a252..7e5ca4de6e 100644 --- a/be/src/olap/page_cache.h +++ b/be/src/olap/page_cache.h @@ -42,7 +42,6 @@ public: PageBase(size_t b) : _size(b), _capacity(b) { _data = reinterpret_cast(TAllocator::alloc(_capacity, ALLOCATOR_ALIGNMENT_16)); - ExecEnv::GetInstance()->page_no_cache_mem_tracker()->consume(_capacity); } PageBase(const PageBase&) = delete; @@ -52,7 +51,6 @@ public: if (_data != nullptr) { DCHECK(_capacity != 0 && _size != 0); TAllocator::free(_data, _capacity); - ExecEnv::GetInstance()->page_no_cache_mem_tracker()->release(_capacity); } } diff --git a/be/src/olap/rowset/segment_v2/page_handle.h b/be/src/olap/rowset/segment_v2/page_handle.h index 7e4f766524..6494048774 100644 --- a/be/src/olap/rowset/segment_v2/page_handle.h +++ b/be/src/olap/rowset/segment_v2/page_handle.h @@ -36,7 +36,9 @@ public: // This class will take the ownership of input data's memory. It will // free it when deconstructs. - PageHandle(DataPage* data) : _is_data_owner(true), _data(data) {} + PageHandle(DataPage* data) : _is_data_owner(true), _data(data) { + ExecEnv::GetInstance()->page_no_cache_mem_tracker()->consume(_data->capacity()); + } // This class will take the content of cache data, and will make input // cache_data to a invalid cache handle. @@ -59,6 +61,7 @@ public: ~PageHandle() { if (_is_data_owner) { + ExecEnv::GetInstance()->page_no_cache_mem_tracker()->release(_data->capacity()); delete _data; } else { DCHECK(_data == nullptr); diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index eea8d34b08..32207ad216 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -143,7 +143,14 @@ PipelineFragmentContext::PipelineFragmentContext( } PipelineFragmentContext::~PipelineFragmentContext() { - _call_back(_runtime_state.get(), &_exec_status); + if (_runtime_state != nullptr) { + // The memory released by the query end is recorded in the query mem tracker, main memory in _runtime_state. + SCOPED_ATTACH_TASK(_runtime_state.get()); + _call_back(_runtime_state.get(), &_exec_status); + _runtime_state.reset(); + } else { + _call_back(_runtime_state.get(), &_exec_status); + } DCHECK(!_report_thread_active); } diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp index bd99a9a2e3..079719927b 100644 --- a/be/src/runtime/plan_fragment_executor.cpp +++ b/be/src/runtime/plan_fragment_executor.cpp @@ -88,7 +88,14 @@ PlanFragmentExecutor::PlanFragmentExecutor(ExecEnv* exec_env, } PlanFragmentExecutor::~PlanFragmentExecutor() { - close(); + if (_runtime_state != nullptr) { + // The memory released by the query end is recorded in the query mem tracker, main memory in _runtime_state. + SCOPED_ATTACH_TASK(_runtime_state.get()); + close(); + _runtime_state.reset(); + } else { + close(); + } // at this point, the report thread should have been stopped DCHECK(!_report_thread_active); } @@ -274,7 +281,6 @@ Status PlanFragmentExecutor::open() { if (_cancel_reason == PPlanFragmentCancelReason::CALL_RPC_ERROR) { status = Status::RuntimeError(_cancel_msg); } else if (_cancel_reason == PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED) { - // status = Status::MemoryAllocFailed(_cancel_msg); status = Status::MemoryLimitExceeded(_cancel_msg); } }