[fix](mem_tracker] attach mem tracker in FragmentMgr::apply_filter (#35128)

This commit is contained in:
Jerry Hu
2024-05-21 19:39:39 +08:00
committed by yiguolei
parent e8fb47bec1
commit ced0093d74

View File

@ -1313,6 +1313,7 @@ Status FragmentMgr::apply_filter(const PPublishFilterRequest* request,
std::shared_ptr<PlanFragmentExecutor> fragment_executor;
std::shared_ptr<pipeline::PipelineFragmentContext> pip_context;
QueryThreadContext query_thread_context;
RuntimeFilterMgr* runtime_filter_mgr = nullptr;
if (is_pipeline) {
@ -1326,6 +1327,8 @@ Status FragmentMgr::apply_filter(const PPublishFilterRequest* request,
DCHECK(pip_context != nullptr);
runtime_filter_mgr = pip_context->get_query_ctx()->runtime_filter_mgr();
query_thread_context = {pip_context->get_query_ctx()->query_id(),
pip_context->get_query_ctx()->query_mem_tracker};
} else {
std::unique_lock<std::mutex> lock(_lock);
auto iter = _fragment_instance_map.find(tfragment_instance_id);
@ -1338,8 +1341,11 @@ Status FragmentMgr::apply_filter(const PPublishFilterRequest* request,
DCHECK(fragment_executor != nullptr);
runtime_filter_mgr =
fragment_executor->runtime_state()->get_query_ctx()->runtime_filter_mgr();
query_thread_context = {fragment_executor->get_query_ctx()->query_id(),
fragment_executor->get_query_ctx()->query_mem_tracker};
}
SCOPED_ATTACH_TASK(query_thread_context);
return runtime_filter_mgr->update_filter(request, attach_data);
}