From bda35f9ff097864689229e0423bcbd03f1add711 Mon Sep 17 00:00:00 2001 From: Pxl Date: Tue, 26 Mar 2024 19:10:31 +0800 Subject: [PATCH] [Bug](runtime-filter) fix unknown filter on nested loop join sink #32851 --- be/src/pipeline/exec/nested_loop_join_build_operator.cpp | 8 +++++--- be/src/pipeline/exec/nested_loop_join_build_operator.h | 3 ++- .../pipeline/pipeline_x/pipeline_x_fragment_context.cpp | 4 ++-- 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp index fec2edc71b..f074afce37 100644 --- a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp +++ b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp @@ -42,8 +42,8 @@ Status NestedLoopJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkSta } _runtime_filters.resize(p._runtime_filter_descs.size()); for (size_t i = 0; i < p._runtime_filter_descs.size(); i++) { - RETURN_IF_ERROR(state->register_producer_runtime_filter(p._runtime_filter_descs[i], false, - &_runtime_filters[i], false)); + RETURN_IF_ERROR(state->register_producer_runtime_filter( + p._runtime_filter_descs[i], p._need_local_merge, &_runtime_filters[i], false)); } return Status::OK(); } @@ -51,9 +51,11 @@ Status NestedLoopJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkSta NestedLoopJoinBuildSinkOperatorX::NestedLoopJoinBuildSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode, - const DescriptorTbl& descs) + const DescriptorTbl& descs, + bool need_local_merge) : JoinBuildSinkOperatorX(pool, operator_id, tnode, descs), + _need_local_merge(need_local_merge), _is_output_left_side_only(tnode.nested_loop_join_node.__isset.is_output_left_side_only && tnode.nested_loop_join_node.is_output_left_side_only), _row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples) {} diff --git a/be/src/pipeline/exec/nested_loop_join_build_operator.h b/be/src/pipeline/exec/nested_loop_join_build_operator.h index 801d4ff88e..52f723b13a 100644 --- a/be/src/pipeline/exec/nested_loop_join_build_operator.h +++ b/be/src/pipeline/exec/nested_loop_join_build_operator.h @@ -78,7 +78,7 @@ class NestedLoopJoinBuildSinkOperatorX final : public JoinBuildSinkOperatorX { public: NestedLoopJoinBuildSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode, - const DescriptorTbl& descs); + const DescriptorTbl& descs, bool need_local_merge); Status init(const TDataSink& tsink) override { return Status::InternalError( "{} should not init with TDataSink", @@ -105,6 +105,7 @@ private: vectorized::VExprContextSPtrs _filter_src_expr_ctxs; + bool _need_local_merge; const bool _is_output_left_side_only; RowDescriptor _row_descriptor; }; diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index 2ee60091a7..fa53e6f4b1 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -1087,8 +1087,8 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN _dag[downstream_pipeline_id].push_back(build_side_pipe->id()); DataSinkOperatorXPtr sink; - sink.reset( - new NestedLoopJoinBuildSinkOperatorX(pool, next_sink_operator_id(), tnode, descs)); + sink.reset(new NestedLoopJoinBuildSinkOperatorX(pool, next_sink_operator_id(), tnode, descs, + _need_local_merge)); sink->set_dests_id({op->operator_id()}); RETURN_IF_ERROR(build_side_pipe->set_sink(sink)); RETURN_IF_ERROR(build_side_pipe->sink_x()->init(tnode, _runtime_state.get()));