[Bug](runtime-filter) fix unknown filter on nested loop join sink #32851

This commit is contained in:
Pxl
2024-03-26 19:10:31 +08:00
committed by GitHub
parent efe684572e
commit bda35f9ff0
3 changed files with 9 additions and 6 deletions

View File

@ -42,8 +42,8 @@ Status NestedLoopJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkSta
}
_runtime_filters.resize(p._runtime_filter_descs.size());
for (size_t i = 0; i < p._runtime_filter_descs.size(); i++) {
RETURN_IF_ERROR(state->register_producer_runtime_filter(p._runtime_filter_descs[i], false,
&_runtime_filters[i], false));
RETURN_IF_ERROR(state->register_producer_runtime_filter(
p._runtime_filter_descs[i], p._need_local_merge, &_runtime_filters[i], false));
}
return Status::OK();
}
@ -51,9 +51,11 @@ Status NestedLoopJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkSta
NestedLoopJoinBuildSinkOperatorX::NestedLoopJoinBuildSinkOperatorX(ObjectPool* pool,
int operator_id,
const TPlanNode& tnode,
const DescriptorTbl& descs)
const DescriptorTbl& descs,
bool need_local_merge)
: JoinBuildSinkOperatorX<NestedLoopJoinBuildSinkLocalState>(pool, operator_id, tnode,
descs),
_need_local_merge(need_local_merge),
_is_output_left_side_only(tnode.nested_loop_join_node.__isset.is_output_left_side_only &&
tnode.nested_loop_join_node.is_output_left_side_only),
_row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples) {}

View File

@ -78,7 +78,7 @@ class NestedLoopJoinBuildSinkOperatorX final
: public JoinBuildSinkOperatorX<NestedLoopJoinBuildSinkLocalState> {
public:
NestedLoopJoinBuildSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode,
const DescriptorTbl& descs);
const DescriptorTbl& descs, bool need_local_merge);
Status init(const TDataSink& tsink) override {
return Status::InternalError(
"{} should not init with TDataSink",
@ -105,6 +105,7 @@ private:
vectorized::VExprContextSPtrs _filter_src_expr_ctxs;
bool _need_local_merge;
const bool _is_output_left_side_only;
RowDescriptor _row_descriptor;
};

View File

@ -1087,8 +1087,8 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
_dag[downstream_pipeline_id].push_back(build_side_pipe->id());
DataSinkOperatorXPtr sink;
sink.reset(
new NestedLoopJoinBuildSinkOperatorX(pool, next_sink_operator_id(), tnode, descs));
sink.reset(new NestedLoopJoinBuildSinkOperatorX(pool, next_sink_operator_id(), tnode, descs,
_need_local_merge));
sink->set_dests_id({op->operator_id()});
RETURN_IF_ERROR(build_side_pipe->set_sink(sink));
RETURN_IF_ERROR(build_side_pipe->sink_x()->init(tnode, _runtime_state.get()));