From d4249e4f2dddd13ca6db472d772a7b2c5146be40 Mon Sep 17 00:00:00 2001 From: Pxl <952130278@qq.com> Date: Wed, 27 Oct 2021 09:54:52 +0800 Subject: [PATCH] [Bug] fix Runtime filter can't find fragment-id when apply_filter called early (#6923) #6921 --- be/src/runtime/fragment_mgr.cpp | 14 +++++++++++--- be/src/runtime/fragment_mgr.h | 2 ++ 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index f980e7b384..a8ff209dc8 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -610,6 +610,7 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi { std::lock_guard lock(_lock); _fragment_map.insert(std::make_pair(params.params.fragment_instance_id, exec_state)); + _cv.notify_all(); } auto st = _thread_pool->submit_func( @@ -747,7 +748,7 @@ Status FragmentMgr::exec_external_plan_fragment(const TScanOpenParams& params, } VLOG_QUERY << "BackendService execute open() TQueryPlanInfo: " - << apache::thrift::ThriftDebugString(t_query_plan_info); + << apache::thrift::ThriftDebugString(t_query_plan_info); // assign the param used to execute PlanFragment TExecPlanFragmentParams exec_fragment_params; exec_fragment_params.protocol_version = (PaloInternalServiceVersion::type)0; @@ -811,14 +812,21 @@ Status FragmentMgr::apply_filter(const PPublishFilterRequest* request, const cha std::shared_ptr fragment_state; { - std::lock_guard lock(_lock); + std::unique_lock lock(_lock); + if (!_fragment_map.count(tfragment_instance_id)) { + VLOG_NOTICE << "wait for fragment start execute, fragment-id:" << fragment_instance_id; + _cv.wait_for(lock, std::chrono::milliseconds(1000), + [&] { return _fragment_map.count(tfragment_instance_id); }); + } + auto iter = _fragment_map.find(tfragment_instance_id); if (iter == _fragment_map.end()) { - LOG(WARNING) << "unknown.... fragment-id:" << fragment_instance_id; + VLOG_CRITICAL << "unknown.... fragment-id:" << fragment_instance_id; return Status::InvalidArgument("fragment-id"); } fragment_state = iter->second; } + DCHECK(fragment_state != nullptr); RuntimeFilterMgr* runtime_filter_mgr = fragment_state->executor()->runtime_state()->runtime_filter_mgr(); diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h index 70233e1e44..ba562164a2 100644 --- a/be/src/runtime/fragment_mgr.h +++ b/be/src/runtime/fragment_mgr.h @@ -99,6 +99,8 @@ private: std::mutex _lock; + std::condition_variable _cv; + // Make sure that remove this before no data reference FragmentExecState std::unordered_map> _fragment_map; // query id -> QueryFragmentsCtx