diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp index 368e94562a..ed032d0976 100644 --- a/be/src/exec/exec_node.cpp +++ b/be/src/exec/exec_node.cpp @@ -514,6 +514,24 @@ std::string ExecNode::get_name() { Status ExecNode::do_projections(vectorized::Block* origin_block, vectorized::Block* output_block) { SCOPED_TIMER(_exec_timer); SCOPED_TIMER(_projection_timer); + auto insert_column_datas = [&](auto& to, vectorized::ColumnPtr& from, size_t rows) { + if (to->is_nullable() && !from->is_nullable()) { + if (_keep_origin || !from->is_exclusive()) { + auto& null_column = reinterpret_cast(*to); + null_column.get_nested_column().insert_range_from(*from, 0, rows); + null_column.get_null_map_column().get_data().resize_fill(rows, 0); + } else { + to = make_nullable(from, false)->assume_mutable(); + } + } else { + if (_keep_origin || !from->is_exclusive()) { + to->insert_range_from(*from, 0, rows); + } else { + to = from->assume_mutable(); + } + } + }; + using namespace vectorized; MutableBlock mutable_block = VectorizedUtils::build_mutable_mem_reuse_block(output_block, *_output_row_descriptor); @@ -535,13 +553,7 @@ Status ExecNode::do_projections(vectorized::Block* origin_block, vectorized::Blo auto column_ptr = origin_block->get_by_position(result_column_id) .column->convert_to_full_column_if_const(); //TODO: this is a quick fix, we need a new function like "change_to_nullable" to do it - if (mutable_columns[i]->is_nullable() xor column_ptr->is_nullable()) { - DCHECK(mutable_columns[i]->is_nullable() && !column_ptr->is_nullable()); - reinterpret_cast(mutable_columns[i].get()) - ->insert_range_from_not_nullable(*column_ptr, 0, rows); - } else { - mutable_columns[i]->insert_range_from(*column_ptr, 0, rows); - } + insert_column_datas(mutable_columns[i], column_ptr, rows); } DCHECK(mutable_block.rows() == rows); output_block->set_columns(std::move(mutable_columns)); diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h index 1dd8979f5b..5d7b3a9165 100644 --- a/be/src/exec/exec_node.h +++ b/be/src/exec/exec_node.h @@ -325,6 +325,10 @@ protected: std::shared_ptr _query_statistics = nullptr; + //_keep_origin is used to avoid copying during projection, + // currently set to true only in the nestloop join. + bool _keep_origin = false; + private: static Status create_tree_helper(RuntimeState* state, ObjectPool* pool, const std::vector& tnodes, diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.cpp b/be/src/pipeline/exec/hashjoin_probe_operator.cpp index e6a605405a..878a813ce0 100644 --- a/be/src/pipeline/exec/hashjoin_probe_operator.cpp +++ b/be/src/pipeline/exec/hashjoin_probe_operator.cpp @@ -173,6 +173,9 @@ void HashJoinProbeLocalState::init_for_probe(RuntimeState* state) { void HashJoinProbeLocalState::add_tuple_is_null_column(vectorized::Block* block) { DCHECK(_parent->cast()._is_outer_join); + if (!_parent->cast()._use_specific_projections) { + return; + } auto p0 = _tuple_is_null_left_flag_column->assume_mutable(); auto p1 = _tuple_is_null_right_flag_column->assume_mutable(); auto& left_null_map = reinterpret_cast(*p0); diff --git a/be/src/pipeline/exec/join_probe_operator.cpp b/be/src/pipeline/exec/join_probe_operator.cpp index 03b20fdb4d..48607309ec 100644 --- a/be/src/pipeline/exec/join_probe_operator.cpp +++ b/be/src/pipeline/exec/join_probe_operator.cpp @@ -82,6 +82,13 @@ template Status JoinProbeLocalState::_build_output_block( vectorized::Block* origin_block, vectorized::Block* output_block, bool keep_origin) { auto& p = Base::_parent->template cast(); + if (!Base::_projections.empty()) { + // In previous versions, the join node had a separate set of project structures, + // and you could see a 'todo' in the Thrift definition. + // Here, we have refactored it, but considering upgrade compatibility, we still need to retain the old code. + *output_block = *origin_block; + return Status::OK(); + } SCOPED_TIMER(_build_output_block_timer); auto is_mem_reuse = output_block->mem_reuse(); vectorized::MutableBlock mutable_block = @@ -192,19 +199,33 @@ JoinProbeOperatorX::JoinProbeOperatorX(ObjectPool* pool, const T : tnode.hash_join_node.__isset.is_mark ? tnode.hash_join_node.is_mark : false), _short_circuit_for_null_in_build_side(_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN && - !_is_mark_join) { + !_is_mark_join), + _use_specific_projections( + tnode.__isset.hash_join_node + ? (tnode.hash_join_node.__isset.use_specific_projections + ? tnode.hash_join_node.use_specific_projections + : true) + : (tnode.nested_loop_join_node.__isset.use_specific_projections + ? tnode.nested_loop_join_node.use_specific_projections + : true) + + ) { if (tnode.__isset.hash_join_node) { _intermediate_row_desc.reset(new RowDescriptor( descs, tnode.hash_join_node.vintermediate_tuple_id_list, std::vector(tnode.hash_join_node.vintermediate_tuple_id_list.size()))); - _output_row_desc.reset( - new RowDescriptor(descs, {tnode.hash_join_node.voutput_tuple_id}, {false})); + if (!Base::_output_row_descriptor) { + _output_row_desc.reset( + new RowDescriptor(descs, {tnode.hash_join_node.voutput_tuple_id}, {false})); + } } else if (tnode.__isset.nested_loop_join_node) { _intermediate_row_desc.reset(new RowDescriptor( descs, tnode.nested_loop_join_node.vintermediate_tuple_id_list, std::vector(tnode.nested_loop_join_node.vintermediate_tuple_id_list.size()))); - _output_row_desc.reset( - new RowDescriptor(descs, {tnode.nested_loop_join_node.voutput_tuple_id}, {false})); + if (!Base::_output_row_descriptor) { + _output_row_desc.reset(new RowDescriptor( + descs, {tnode.nested_loop_join_node.voutput_tuple_id}, {false})); + } } else { // Iff BE has been upgraded and FE has not yet, we should keep origin logics for CROSS JOIN. DCHECK_EQ(_join_op, TJoinOp::CROSS_JOIN); diff --git a/be/src/pipeline/exec/join_probe_operator.h b/be/src/pipeline/exec/join_probe_operator.h index 9bb716ff36..4072baa72f 100644 --- a/be/src/pipeline/exec/join_probe_operator.h +++ b/be/src/pipeline/exec/join_probe_operator.h @@ -70,7 +70,12 @@ public: Status init(const TPlanNode& tnode, RuntimeState* state) override; Status open(doris::RuntimeState* state) override; - [[nodiscard]] const RowDescriptor& row_desc() const override { return *_output_row_desc; } + [[nodiscard]] const RowDescriptor& row_desc() const override { + if (Base::_output_row_descriptor) { + return *Base::_output_row_descriptor; + } + return *_output_row_desc; + } [[nodiscard]] const RowDescriptor& intermediate_row_desc() const override { return *_intermediate_row_desc; @@ -114,6 +119,11 @@ protected: vectorized::VExprContextSPtrs _output_expr_ctxs; OperatorXPtr _build_side_child = nullptr; const bool _short_circuit_for_null_in_build_side; + // In the Old planner, there is a plan for two columns of tuple is null, + // but in the Nereids planner, this logic does not exist. + // Therefore, we should not insert these two columns under the Nereids optimizer. + // use_specific_projections true, if output exprssions is denoted by srcExprList represents, o.w. PlanNode.projections + const bool _use_specific_projections; }; } // namespace pipeline diff --git a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp index 9272418ca0..271891709b 100644 --- a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp +++ b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp @@ -111,6 +111,9 @@ void NestedLoopJoinProbeLocalState::_reset_with_next_probe_row() { void NestedLoopJoinProbeLocalState::add_tuple_is_null_column(vectorized::Block* block) { auto& p = _parent->cast(); + if (!p._use_specific_projections) { + return; + } if (p._is_outer_join) { auto p0 = _tuple_is_null_left_flag_column->assume_mutable(); auto p1 = _tuple_is_null_right_flag_column->assume_mutable(); @@ -436,7 +439,9 @@ NestedLoopJoinProbeOperatorX::NestedLoopJoinProbeOperatorX(ObjectPool* pool, con : JoinProbeOperatorX(pool, tnode, operator_id, descs), _is_output_left_side_only(tnode.nested_loop_join_node.__isset.is_output_left_side_only && tnode.nested_loop_join_node.is_output_left_side_only), - _old_version_flag(!tnode.__isset.nested_loop_join_node) {} + _old_version_flag(!tnode.__isset.nested_loop_join_node) { + _keep_origin = _is_output_left_side_only; +} Status NestedLoopJoinProbeOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR(JoinProbeOperatorX::init(tnode, state)); diff --git a/be/src/pipeline/exec/nested_loop_join_probe_operator.h b/be/src/pipeline/exec/nested_loop_join_probe_operator.h index 770289f397..7a8be87d92 100644 --- a/be/src/pipeline/exec/nested_loop_join_probe_operator.h +++ b/be/src/pipeline/exec/nested_loop_join_probe_operator.h @@ -228,7 +228,7 @@ public: const RowDescriptor& row_desc() const override { return _old_version_flag ? (_output_row_descriptor ? *_output_row_descriptor : _row_descriptor) - : *_output_row_desc; + : (_output_row_descriptor ? *_output_row_descriptor : *_output_row_desc); } bool need_more_input_data(RuntimeState* state) const override; diff --git a/be/src/pipeline/pipeline_x/operator.cpp b/be/src/pipeline/pipeline_x/operator.cpp index 093bf17f46..6ee9ccb13c 100644 --- a/be/src/pipeline/pipeline_x/operator.cpp +++ b/be/src/pipeline/pipeline_x/operator.cpp @@ -172,9 +172,28 @@ void PipelineXLocalStateBase::clear_origin_block() { Status OperatorXBase::do_projections(RuntimeState* state, vectorized::Block* origin_block, vectorized::Block* output_block) const { - auto local_state = state->get_local_state(operator_id()); + auto* local_state = state->get_local_state(operator_id()); SCOPED_TIMER(local_state->exec_time_counter()); SCOPED_TIMER(local_state->_projection_timer); + + auto insert_column_datas = [&](auto& to, vectorized::ColumnPtr& from, size_t rows) { + if (to->is_nullable() && !from->is_nullable()) { + if (_keep_origin || !from->is_exclusive()) { + auto& null_column = reinterpret_cast(*to); + null_column.get_nested_column().insert_range_from(*from, 0, rows); + null_column.get_null_map_column().get_data().resize_fill(rows, 0); + } else { + to = make_nullable(from, false)->assume_mutable(); + } + } else { + if (_keep_origin || !from->is_exclusive()) { + to->insert_range_from(*from, 0, rows); + } else { + to = from->assume_mutable(); + } + } + }; + using namespace vectorized; vectorized::MutableBlock mutable_block = vectorized::VectorizedUtils::build_mutable_mem_reuse_block(output_block, @@ -189,14 +208,7 @@ Status OperatorXBase::do_projections(RuntimeState* state, vectorized::Block* ori RETURN_IF_ERROR(local_state->_projections[i]->execute(origin_block, &result_column_id)); auto column_ptr = origin_block->get_by_position(result_column_id) .column->convert_to_full_column_if_const(); - //TODO: this is a quick fix, we need a new function like "change_to_nullable" to do it - if (mutable_columns[i]->is_nullable() xor column_ptr->is_nullable()) { - DCHECK(mutable_columns[i]->is_nullable() && !column_ptr->is_nullable()); - reinterpret_cast(mutable_columns[i].get()) - ->insert_range_from_not_nullable(*column_ptr, 0, rows); - } else { - mutable_columns[i]->insert_range_from(*column_ptr, 0, rows); - } + insert_column_datas(mutable_columns[i], column_ptr, rows); } DCHECK(mutable_block.rows() == rows); output_block->set_columns(std::move(mutable_columns)); diff --git a/be/src/pipeline/pipeline_x/operator.h b/be/src/pipeline/pipeline_x/operator.h index 06ba93a36f..56991d4310 100644 --- a/be/src/pipeline/pipeline_x/operator.h +++ b/be/src/pipeline/pipeline_x/operator.h @@ -326,6 +326,10 @@ protected: std::string _op_name; bool _ignore_data_distribution = false; int _parallel_tasks = 0; + + //_keep_origin is used to avoid copying during projection, + // currently set to true only in the nestloop join. + bool _keep_origin = false; }; template diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp index ea4fd92418..7f820eafea 100644 --- a/be/src/vec/exec/join/vhash_join_node.cpp +++ b/be/src/vec/exec/join/vhash_join_node.cpp @@ -563,6 +563,9 @@ Status HashJoinNode::get_next(RuntimeState* state, Block* output_block, bool* eo void HashJoinNode::_add_tuple_is_null_column(Block* block) { DCHECK(_is_outer_join); + if (!_use_specific_projections) { + return; + } auto p0 = _tuple_is_null_left_flag_column->assume_mutable(); auto p1 = _tuple_is_null_right_flag_column->assume_mutable(); auto& left_null_map = reinterpret_cast(*p0); diff --git a/be/src/vec/exec/join/vjoin_node_base.cpp b/be/src/vec/exec/join/vjoin_node_base.cpp index 6fab6b8b91..4697be19b8 100644 --- a/be/src/vec/exec/join/vjoin_node_base.cpp +++ b/be/src/vec/exec/join/vjoin_node_base.cpp @@ -80,7 +80,17 @@ VJoinNodeBase::VJoinNodeBase(ObjectPool* pool, const TPlanNode& tnode, const Des tnode.hash_join_node.is_mark), _short_circuit_for_null_in_build_side(_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN && !_is_mark_join), - _runtime_filter_descs(tnode.runtime_filters) { + _runtime_filter_descs(tnode.runtime_filters), + _use_specific_projections( + tnode.__isset.hash_join_node + ? (tnode.hash_join_node.__isset.use_specific_projections + ? tnode.hash_join_node.use_specific_projections + : true) + : (tnode.nested_loop_join_node.__isset.use_specific_projections + ? tnode.nested_loop_join_node.use_specific_projections + : true) + + ) { _runtime_filters.resize(_runtime_filter_descs.size()); _init_join_op(); if (_is_mark_join) { @@ -95,14 +105,18 @@ VJoinNodeBase::VJoinNodeBase(ObjectPool* pool, const TPlanNode& tnode, const Des } if (tnode.__isset.hash_join_node) { - _output_row_desc.reset( - new RowDescriptor(descs, {tnode.hash_join_node.voutput_tuple_id}, {false})); + if (!_output_row_descriptor) { + _output_row_desc.reset( + new RowDescriptor(descs, {tnode.hash_join_node.voutput_tuple_id}, {false})); + } _intermediate_row_desc.reset(new RowDescriptor( descs, tnode.hash_join_node.vintermediate_tuple_id_list, std::vector(tnode.hash_join_node.vintermediate_tuple_id_list.size()))); } else if (tnode.__isset.nested_loop_join_node) { - _output_row_desc.reset( - new RowDescriptor(descs, {tnode.nested_loop_join_node.voutput_tuple_id}, {false})); + if (!_output_row_descriptor) { + _output_row_desc.reset(new RowDescriptor( + descs, {tnode.nested_loop_join_node.voutput_tuple_id}, {false})); + } _intermediate_row_desc.reset(new RowDescriptor( descs, tnode.nested_loop_join_node.vintermediate_tuple_id_list, std::vector(tnode.nested_loop_join_node.vintermediate_tuple_id_list.size()))); @@ -166,6 +180,13 @@ void VJoinNodeBase::_construct_mutable_join_block() { Status VJoinNodeBase::_build_output_block(Block* origin_block, Block* output_block, bool keep_origin) { SCOPED_TIMER(_build_output_block_timer); + if (!_projections.empty()) { + // In previous versions, the join node had a separate set of project structures, + // and you could see a 'todo' in the Thrift definition. + // Here, we have refactored it, but considering upgrade compatibility, we still need to retain the old code. + *output_block = *origin_block; + return Status::OK(); + } auto is_mem_reuse = output_block->mem_reuse(); MutableBlock mutable_block = is_mem_reuse diff --git a/be/src/vec/exec/join/vjoin_node_base.h b/be/src/vec/exec/join/vjoin_node_base.h index 0e6ac3c983..63a8bb20ba 100644 --- a/be/src/vec/exec/join/vjoin_node_base.h +++ b/be/src/vec/exec/join/vjoin_node_base.h @@ -63,7 +63,12 @@ public: Status open(RuntimeState* state) override; - const RowDescriptor& row_desc() const override { return *_output_row_desc; } + const RowDescriptor& row_desc() const override { + if (_output_row_descriptor) { + return *_output_row_descriptor; + } + return *_output_row_desc; + } const RowDescriptor& intermediate_row_desc() const override { return *_intermediate_row_desc; } @@ -152,6 +157,11 @@ protected: std::vector _runtime_filter_descs; std::vector _runtime_filters; + // In the Old planner, there is a plan for two columns of tuple is null, + // but in the Nereids planner, this logic does not exist. + // Therefore, we should not insert these two columns under the Nereids optimizer. + // use_specific_projections true, if output exprssions is denoted by srcExprList represents, o.w. PlanNode.projections + const bool _use_specific_projections = false; }; } // namespace doris::vectorized diff --git a/be/src/vec/exec/join/vnested_loop_join_node.cpp b/be/src/vec/exec/join/vnested_loop_join_node.cpp index 3548680bf4..d50171485a 100644 --- a/be/src/vec/exec/join/vnested_loop_join_node.cpp +++ b/be/src/vec/exec/join/vnested_loop_join_node.cpp @@ -102,6 +102,7 @@ Status VNestedLoopJoinNode::init(const TPlanNode& tnode, RuntimeState* state) { if (tnode.nested_loop_join_node.__isset.is_output_left_side_only) { _is_output_left_side_only = tnode.nested_loop_join_node.is_output_left_side_only; + _keep_origin = _is_output_left_side_only; } if (tnode.nested_loop_join_node.__isset.join_conjuncts && @@ -382,6 +383,9 @@ void VNestedLoopJoinNode::_resize_fill_tuple_is_null_column(size_t new_size, int } void VNestedLoopJoinNode::_add_tuple_is_null_column(Block* block) { + if (!_use_specific_projections) { + return; + } if (_is_outer_join) { auto p0 = _tuple_is_null_left_flag_column->assume_mutable(); auto p1 = _tuple_is_null_right_flag_column->assume_mutable(); diff --git a/be/src/vec/exec/join/vnested_loop_join_node.h b/be/src/vec/exec/join/vnested_loop_join_node.h index fd31b651bd..18bc901222 100644 --- a/be/src/vec/exec/join/vnested_loop_join_node.h +++ b/be/src/vec/exec/join/vnested_loop_join_node.h @@ -92,7 +92,7 @@ public: const RowDescriptor& row_desc() const override { return _old_version_flag ? (_output_row_descriptor ? *_output_row_descriptor : _row_descriptor) - : *_output_row_desc; + : (_output_row_descriptor ? *_output_row_descriptor : *_output_row_desc); } std::shared_ptr get_left_block() { return _left_block; } diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index da3643747f..7f3aab8e9a 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -777,6 +777,8 @@ struct THashJoinNode { 11: optional bool is_mark 12: optional TJoinDistributionType dist_type 13: optional list mark_join_conjuncts + // use_specific_projections true, if output exprssions is denoted by srcExprList represents, o.w. PlanNode.projections + 14: optional bool use_specific_projections } struct TNestedLoopJoinNode { @@ -798,6 +800,8 @@ struct TNestedLoopJoinNode { 8: optional list join_conjuncts 9: optional list mark_join_conjuncts + // use_specific_projections true, if output exprssions is denoted by srcExprList represents, o.w. PlanNode.projections + 10: optional bool use_specific_projections } struct TMergeJoinNode {