diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp index 86e919a6ed..b067604d87 100644 --- a/be/src/vec/exec/join/vhash_join_node.cpp +++ b/be/src/vec/exec/join/vhash_join_node.cpp @@ -1031,9 +1031,9 @@ Status HashJoinNode::prepare(RuntimeState* state) { // _vother_join_conjuncts are evaluated in the context of the rows produced by this node if (_vother_join_conjunct_ptr) { - RETURN_IF_ERROR((*_vother_join_conjunct_ptr)->prepare(state, _intermediate_row_desc)); + RETURN_IF_ERROR((*_vother_join_conjunct_ptr)->prepare(state, *_intermediate_row_desc)); } - RETURN_IF_ERROR(VExpr::prepare(_output_expr_ctxs, state, _intermediate_row_desc)); + RETURN_IF_ERROR(VExpr::prepare(_output_expr_ctxs, state, *_intermediate_row_desc)); // right table data types _right_table_data_types = VectorizedUtils::get_data_types(child(1)->row_desc()); diff --git a/be/src/vec/exec/join/vjoin_node_base.cpp b/be/src/vec/exec/join/vjoin_node_base.cpp index 2c59b66b04..9a73de0c5b 100644 --- a/be/src/vec/exec/join/vjoin_node_base.cpp +++ b/be/src/vec/exec/join/vjoin_node_base.cpp @@ -28,7 +28,9 @@ namespace doris::vectorized { VJoinNodeBase::VJoinNodeBase(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) : ExecNode(pool, tnode, descs), _join_op(tnode.__isset.hash_join_node ? tnode.hash_join_node.join_op - : tnode.nested_loop_join_node.join_op), + : (tnode.__isset.nested_loop_join_node + ? tnode.nested_loop_join_node.join_op + : TJoinOp::CROSS_JOIN)), _have_other_join_conjunct(tnode.__isset.hash_join_node ? tnode.hash_join_node.__isset.vother_join_conjunct : false), @@ -42,22 +44,24 @@ VJoinNodeBase::VJoinNodeBase(ObjectPool* pool, const TPlanNode& tnode, const Des _join_op == TJoinOp::LEFT_SEMI_JOIN)), _is_right_semi_anti(_join_op == TJoinOp::RIGHT_ANTI_JOIN || _join_op == TJoinOp::RIGHT_SEMI_JOIN), - _is_outer_join(_match_all_build || _match_all_probe), - _output_row_desc( - descs, - {tnode.__isset.hash_join_node ? tnode.hash_join_node.voutput_tuple_id - : tnode.nested_loop_join_node.voutput_tuple_id}, - {false}), - _intermediate_row_desc( - descs, - tnode.__isset.hash_join_node - ? tnode.hash_join_node.vintermediate_tuple_id_list - : tnode.nested_loop_join_node.vintermediate_tuple_id_list, - std::vector( - tnode.__isset.hash_join_node - ? tnode.hash_join_node.vintermediate_tuple_id_list.size() - : tnode.nested_loop_join_node.vintermediate_tuple_id_list - .size())) {} + _is_outer_join(_match_all_build || _match_all_probe) { + if (tnode.__isset.hash_join_node) { + _output_row_desc.reset( + new RowDescriptor(descs, {tnode.hash_join_node.voutput_tuple_id}, {false})); + _intermediate_row_desc.reset(new RowDescriptor( + descs, tnode.hash_join_node.vintermediate_tuple_id_list, + std::vector(tnode.hash_join_node.vintermediate_tuple_id_list.size()))); + } else if (tnode.__isset.nested_loop_join_node) { + _output_row_desc.reset( + new RowDescriptor(descs, {tnode.nested_loop_join_node.voutput_tuple_id}, {false})); + _intermediate_row_desc.reset(new RowDescriptor( + descs, tnode.nested_loop_join_node.vintermediate_tuple_id_list, + std::vector(tnode.nested_loop_join_node.vintermediate_tuple_id_list.size()))); + } else { + // Iff BE has been upgraded and FE has not yet, we should keep origin logics for CROSS JOIN. + DCHECK_EQ(_join_op, TJoinOp::CROSS_JOIN); + } +} Status VJoinNodeBase::close(RuntimeState* state) { START_AND_SCOPE_SPAN(state->get_tracer(), span, "VJoinNodeBase::close"); @@ -66,7 +70,7 @@ Status VJoinNodeBase::close(RuntimeState* state) { } void VJoinNodeBase::_construct_mutable_join_block() { - const auto& mutable_block_desc = _intermediate_row_desc; + const auto& mutable_block_desc = intermediate_row_desc(); for (const auto tuple_desc : mutable_block_desc.tuple_descriptors()) { for (const auto slot_desc : tuple_desc->slots()) { auto type_ptr = slot_desc->get_data_type_ptr(); @@ -78,9 +82,9 @@ void VJoinNodeBase::_construct_mutable_join_block() { Status VJoinNodeBase::_build_output_block(Block* origin_block, Block* output_block) { auto is_mem_reuse = output_block->mem_reuse(); MutableBlock mutable_block = - is_mem_reuse ? MutableBlock(output_block) - : MutableBlock(VectorizedUtils::create_empty_columnswithtypename( - _output_row_desc)); + is_mem_reuse + ? MutableBlock(output_block) + : MutableBlock(VectorizedUtils::create_empty_columnswithtypename(row_desc())); auto rows = origin_block->rows(); // TODO: After FE plan support same nullable of output expr and origin block and mutable column // we should replace `insert_column_datas` by `insert_range_from` @@ -97,13 +101,13 @@ Status VJoinNodeBase::_build_output_block(Block* origin_block, Block* output_blo if (rows != 0) { auto& mutable_columns = mutable_block.mutable_columns(); if (_output_expr_ctxs.empty()) { - DCHECK(mutable_columns.size() == _output_row_desc.num_materialized_slots()); + DCHECK(mutable_columns.size() == row_desc().num_materialized_slots()); for (int i = 0; i < mutable_columns.size(); ++i) { insert_column_datas(mutable_columns[i], *origin_block->get_by_position(i).column, rows); } } else { - DCHECK(mutable_columns.size() == _output_row_desc.num_materialized_slots()); + DCHECK(mutable_columns.size() == row_desc().num_materialized_slots()); for (int i = 0; i < mutable_columns.size(); ++i) { auto result_column_id = -1; RETURN_IF_ERROR(_output_expr_ctxs[i]->execute(origin_block, &result_column_id)); @@ -123,13 +127,15 @@ Status VJoinNodeBase::_build_output_block(Block* origin_block, Block* output_blo } Status VJoinNodeBase::init(const TPlanNode& tnode, RuntimeState* state) { - const auto& output_exprs = tnode.__isset.hash_join_node - ? tnode.hash_join_node.srcExprList - : tnode.nested_loop_join_node.srcExprList; - for (const auto& expr : output_exprs) { - VExprContext* ctx = nullptr; - RETURN_IF_ERROR(VExpr::create_expr_tree(_pool, expr, &ctx)); - _output_expr_ctxs.push_back(ctx); + if (tnode.__isset.hash_join_node || tnode.__isset.nested_loop_join_node) { + const auto& output_exprs = tnode.__isset.hash_join_node + ? tnode.hash_join_node.srcExprList + : tnode.nested_loop_join_node.srcExprList; + for (const auto& expr : output_exprs) { + VExprContext* ctx = nullptr; + RETURN_IF_ERROR(VExpr::create_expr_tree(_pool, expr, &ctx)); + _output_expr_ctxs.push_back(ctx); + } } return ExecNode::init(tnode, state); } diff --git a/be/src/vec/exec/join/vjoin_node_base.h b/be/src/vec/exec/join/vjoin_node_base.h index dd41f5ca1f..94e4902fac 100644 --- a/be/src/vec/exec/join/vjoin_node_base.h +++ b/be/src/vec/exec/join/vjoin_node_base.h @@ -47,9 +47,11 @@ public: virtual Status open(RuntimeState* state) override; - const RowDescriptor& row_desc() const override { return _output_row_desc; } + virtual const RowDescriptor& row_desc() const override { return *_output_row_desc; } - const RowDescriptor& intermediate_row_desc() const override { return _intermediate_row_desc; } + virtual const RowDescriptor& intermediate_row_desc() const override { + return *_intermediate_row_desc; + } virtual Status init(const TPlanNode& tnode, RuntimeState* state = nullptr) override; @@ -80,8 +82,8 @@ protected: const bool _is_right_semi_anti; const bool _is_outer_join; - RowDescriptor _output_row_desc; - RowDescriptor _intermediate_row_desc; + std::unique_ptr _output_row_desc; + std::unique_ptr _intermediate_row_desc; // output expr std::vector _output_expr_ctxs; diff --git a/be/src/vec/exec/join/vnested_loop_join_node.cpp b/be/src/vec/exec/join/vnested_loop_join_node.cpp index c966b9ab2c..5bea38a51f 100644 --- a/be/src/vec/exec/join/vnested_loop_join_node.cpp +++ b/be/src/vec/exec/join/vnested_loop_join_node.cpp @@ -39,7 +39,8 @@ VNestedLoopJoinNode::VNestedLoopJoinNode(ObjectPool* pool, const TPlanNode& tnod _cur_probe_row_visited_flags(false), _matched_rows_done(false), _left_block_pos(0), - _left_side_eos(false) {} + _left_side_eos(false), + _old_version_flag(!tnode.__isset.nested_loop_join_node) {} Status VNestedLoopJoinNode::prepare(RuntimeState* state) { DCHECK(_join_op == TJoinOp::CROSS_JOIN || _join_op == TJoinOp::INNER_JOIN || @@ -65,7 +66,7 @@ Status VNestedLoopJoinNode::prepare(RuntimeState* state) { _num_probe_side_columns = child(0)->row_desc().num_materialized_slots(); _num_build_side_columns = child(1)->row_desc().num_materialized_slots(); - RETURN_IF_ERROR(VExpr::prepare(_output_expr_ctxs, state, _intermediate_row_desc)); + RETURN_IF_ERROR(VExpr::prepare(_output_expr_ctxs, state, *_intermediate_row_desc)); _construct_mutable_join_block(); return Status::OK(); } diff --git a/be/src/vec/exec/join/vnested_loop_join_node.h b/be/src/vec/exec/join/vnested_loop_join_node.h index 313b33e626..0254c11cf2 100644 --- a/be/src/vec/exec/join/vnested_loop_join_node.h +++ b/be/src/vec/exec/join/vnested_loop_join_node.h @@ -48,6 +48,16 @@ public: void debug_string(int indentation_level, std::stringstream* out) const override; + const RowDescriptor& intermediate_row_desc() const override { + return _old_version_flag ? _row_descriptor : *_intermediate_row_desc; + } + + const RowDescriptor& row_desc() const override { + return _old_version_flag + ? (_output_row_descriptor ? *_output_row_descriptor : _row_descriptor) + : *_output_row_desc; + } + private: Status _materialize_build_side(RuntimeState* state) override; @@ -89,6 +99,8 @@ private: int _left_block_pos; // current scan pos in _left_block bool _left_side_eos; // if true, left child has no more rows to process + + bool _old_version_flag; }; } // namespace doris::vectorized