[fix](pipelineX) fix use global rf when there no shared_scans (#28869)

This commit is contained in:
Mryange
2023-12-25 10:35:22 +08:00
committed by GitHub
parent e326ebb63e
commit 24b1b4d96b
2 changed files with 5 additions and 1 deletions

View File

@ -212,6 +212,8 @@ Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& r
_runtime_state->set_total_load_streams(request.total_load_streams);
_runtime_state->set_num_local_sink(request.num_local_sink);
_use_global_rf = request.__isset.parallel_instances && (request.__isset.per_node_shared_scans &&
!request.per_node_shared_scans.empty());
// 2. Build pipelines with operators in this fragment.
auto root_pipeline = add_pipeline();
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_build_pipelines(
@ -985,7 +987,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
DataSinkOperatorXPtr sink;
sink.reset(new HashJoinBuildSinkOperatorX(pool, next_sink_operator_id(), tnode, descs,
request.__isset.parallel_instances));
_use_global_rf));
sink->set_dests_id({op->operator_id()});
RETURN_IF_ERROR(build_side_pipe->set_sink(sink));
RETURN_IF_ERROR(build_side_pipe->sink_x()->init(tnode, _runtime_state.get()));

View File

@ -168,6 +168,8 @@ private:
// this is a [n * m] matrix. n is parallelism of pipeline engine and m is the number of pipelines.
std::vector<std::vector<std::unique_ptr<PipelineXTask>>> _tasks;
bool _use_global_rf = false;
// It is used to manage the lifecycle of RuntimeFilterMergeController
std::vector<std::shared_ptr<RuntimeFilterMergeControllerEntity>> _merge_controller_handlers;