From 24b1b4d96bfb90ea23687d6224e95a4dbe615a22 Mon Sep 17 00:00:00 2001 From: Mryange <59914473+Mryange@users.noreply.github.com> Date: Mon, 25 Dec 2023 10:35:22 +0800 Subject: [PATCH] [fix](pipelineX) fix use global rf when there no shared_scans (#28869) --- be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp | 4 +++- be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h | 2 ++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index 7efe476c6d..86440051a9 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -212,6 +212,8 @@ Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& r _runtime_state->set_total_load_streams(request.total_load_streams); _runtime_state->set_num_local_sink(request.num_local_sink); + _use_global_rf = request.__isset.parallel_instances && (request.__isset.per_node_shared_scans && + !request.per_node_shared_scans.empty()); // 2. Build pipelines with operators in this fragment. auto root_pipeline = add_pipeline(); RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_build_pipelines( @@ -985,7 +987,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN DataSinkOperatorXPtr sink; sink.reset(new HashJoinBuildSinkOperatorX(pool, next_sink_operator_id(), tnode, descs, - request.__isset.parallel_instances)); + _use_global_rf)); sink->set_dests_id({op->operator_id()}); RETURN_IF_ERROR(build_side_pipe->set_sink(sink)); RETURN_IF_ERROR(build_side_pipe->sink_x()->init(tnode, _runtime_state.get())); diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h index be0e26461d..6fe3488096 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h @@ -168,6 +168,8 @@ private: // this is a [n * m] matrix. n is parallelism of pipeline engine and m is the number of pipelines. std::vector>> _tasks; + bool _use_global_rf = false; + // It is used to manage the lifecycle of RuntimeFilterMergeController std::vector> _merge_controller_handlers;