[Bug] fix Runtime filter can't find fragment-id when apply_filter called early (#6923)

#6921
This commit is contained in:
Pxl
2021-10-27 09:54:52 +08:00
committed by GitHub
parent 77a954d02c
commit d4249e4f2d
2 changed files with 13 additions and 3 deletions

View File

@ -610,6 +610,7 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi
{
std::lock_guard<std::mutex> lock(_lock);
_fragment_map.insert(std::make_pair(params.params.fragment_instance_id, exec_state));
_cv.notify_all();
}
auto st = _thread_pool->submit_func(
@ -747,7 +748,7 @@ Status FragmentMgr::exec_external_plan_fragment(const TScanOpenParams& params,
}
VLOG_QUERY << "BackendService execute open() TQueryPlanInfo: "
<< apache::thrift::ThriftDebugString(t_query_plan_info);
<< apache::thrift::ThriftDebugString(t_query_plan_info);
// assign the param used to execute PlanFragment
TExecPlanFragmentParams exec_fragment_params;
exec_fragment_params.protocol_version = (PaloInternalServiceVersion::type)0;
@ -811,14 +812,21 @@ Status FragmentMgr::apply_filter(const PPublishFilterRequest* request, const cha
std::shared_ptr<FragmentExecState> fragment_state;
{
std::lock_guard<std::mutex> lock(_lock);
std::unique_lock<std::mutex> lock(_lock);
if (!_fragment_map.count(tfragment_instance_id)) {
VLOG_NOTICE << "wait for fragment start execute, fragment-id:" << fragment_instance_id;
_cv.wait_for(lock, std::chrono::milliseconds(1000),
[&] { return _fragment_map.count(tfragment_instance_id); });
}
auto iter = _fragment_map.find(tfragment_instance_id);
if (iter == _fragment_map.end()) {
LOG(WARNING) << "unknown.... fragment-id:" << fragment_instance_id;
VLOG_CRITICAL << "unknown.... fragment-id:" << fragment_instance_id;
return Status::InvalidArgument("fragment-id");
}
fragment_state = iter->second;
}
DCHECK(fragment_state != nullptr);
RuntimeFilterMgr* runtime_filter_mgr =
fragment_state->executor()->runtime_state()->runtime_filter_mgr();

View File

@ -99,6 +99,8 @@ private:
std::mutex _lock;
std::condition_variable _cv;
// Make sure that remove this before no data reference FragmentExecState
std::unordered_map<TUniqueId, std::shared_ptr<FragmentExecState>> _fragment_map;
// query id -> QueryFragmentsCtx