diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.cpp b/be/src/pipeline/exec/hashjoin_probe_operator.cpp index cfe9feadeb..dd9d2451de 100644 --- a/be/src/pipeline/exec/hashjoin_probe_operator.cpp +++ b/be/src/pipeline/exec/hashjoin_probe_operator.cpp @@ -186,8 +186,8 @@ Status HashJoinProbeOperatorX::get_block(RuntimeState* state, vectorized::Block* local_state.init_for_probe(state); if (need_more_input_data(state)) { local_state._child_block->clear_column_data(); - RETURN_IF_ERROR(_child_x->get_block(state, local_state._child_block.get(), - local_state._child_source_state)); + RETURN_IF_ERROR(_child_x->get_next_after_projects(state, local_state._child_block.get(), + local_state._child_source_state)); source_state = local_state._child_source_state; if (local_state._child_block->rows() == 0 && local_state._child_source_state != SourceState::FINISHED) { diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h b/be/src/pipeline/exec/hashjoin_probe_operator.h index ce60dd1545..dc1402dce6 100644 --- a/be/src/pipeline/exec/hashjoin_probe_operator.h +++ b/be/src/pipeline/exec/hashjoin_probe_operator.h @@ -90,7 +90,8 @@ private: vectorized::ColumnUInt8::MutablePtr _null_map_column; // for cases when a probe row matches more than batch size build rows. bool _is_any_probe_match_row_output = false; - std::unique_ptr _process_hashtable_ctx_variants = nullptr; + std::unique_ptr _process_hashtable_ctx_variants = + std::make_unique(); RuntimeProfile::Counter* _probe_expr_call_timer; RuntimeProfile::Counter* _probe_next_timer; diff --git a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp index 888d39dde3..b590bc3eb3 100644 --- a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp +++ b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp @@ -61,7 +61,7 @@ Status NestedLoopJoinProbeLocalState::init(RuntimeState* state, LocalStateInfo& } Status NestedLoopJoinProbeLocalState::close(RuntimeState* state) { - _left_block.clear(); + _child_block->clear(); vectorized::Blocks tmp_build_blocks; _shared_state->build_blocks.swap(tmp_build_blocks); @@ -137,14 +137,14 @@ Status NestedLoopJoinProbeLocalState::generate_join_block_data(RuntimeState* sta // We should try to join rows if there still are some rows from probe side. while (_join_block.rows() < state->batch_size()) { while (_current_build_pos == _shared_state->build_blocks.size() || - _left_block_pos == _left_block.rows()) { + _left_block_pos == _child_block->rows()) { // if left block is empty(), do not need disprocess the left block rows - if (_left_block.rows() > _left_block_pos) { + if (_child_block->rows() > _left_block_pos) { _left_side_process_count++; } _reset_with_next_probe_row(); - if (_left_block_pos < _left_block.rows()) { + if (_left_block_pos < _child_block->rows()) { if constexpr (set_probe_side_flag) { _probe_offset_stack.push(mutable_join_block.rows()); } @@ -305,14 +305,14 @@ void NestedLoopJoinProbeLocalState::_finalize_current_phase(vectorized::MutableB } else { if (!p._is_mark_join) { auto new_size = column_size; - DCHECK_LE(_left_block_start_pos + _left_side_process_count, _left_block.rows()); + DCHECK_LE(_left_block_start_pos + _left_side_process_count, _child_block->rows()); for (int j = _left_block_start_pos; j < _left_block_start_pos + _left_side_process_count; ++j) { if (_cur_probe_row_visited_flags[j] == IsSemi) { new_size++; for (size_t i = 0; i < p._num_probe_side_columns; ++i) { const vectorized::ColumnWithTypeAndName src_column = - _left_block.get_by_position(i); + _child_block->get_by_position(i); if (!src_column.column->is_nullable() && dst_columns[i]->is_nullable()) { DCHECK(p._join_op == TJoinOp::FULL_OUTER_JOIN); assert_cast(dst_columns[i].get()) @@ -341,13 +341,13 @@ void NestedLoopJoinProbeLocalState::_finalize_current_phase(vectorized::MutableB *dst_columns[dst_columns.size() - 1]) .get_data(); mark_data.reserve(mark_data.size() + _left_side_process_count); - DCHECK_LT(_left_block_pos, _left_block.rows()); + DCHECK_LT(_left_block_pos, _child_block->rows()); for (int j = _left_block_start_pos; j < _left_block_start_pos + _left_side_process_count; ++j) { mark_data.emplace_back(IsSemi != _cur_probe_row_visited_flags[j]); for (size_t i = 0; i < p._num_probe_side_columns; ++i) { const vectorized::ColumnWithTypeAndName src_column = - _left_block.get_by_position(i); + _child_block->get_by_position(i); DCHECK(p._join_op != TJoinOp::FULL_OUTER_JOIN); dst_columns[i]->insert_from(*src_column.column, j); } @@ -362,7 +362,7 @@ void NestedLoopJoinProbeLocalState::_append_left_data_with_null( auto& dst_columns = mutable_block.mutable_columns(); DCHECK(p._is_mark_join); for (size_t i = 0; i < p._num_probe_side_columns; ++i) { - const vectorized::ColumnWithTypeAndName& src_column = _left_block.get_by_position(i); + const vectorized::ColumnWithTypeAndName& src_column = _child_block->get_by_position(i); if (!src_column.column->is_nullable() && dst_columns[i]->is_nullable()) { auto origin_sz = dst_columns[i]->size(); DCHECK(p._join_op == TJoinOp::RIGHT_OUTER_JOIN || @@ -397,7 +397,7 @@ void NestedLoopJoinProbeLocalState::_process_left_child_block( auto& dst_columns = mutable_block.mutable_columns(); const int max_added_rows = now_process_build_block.rows(); for (size_t i = 0; i < p._num_probe_side_columns; ++i) { - const vectorized::ColumnWithTypeAndName& src_column = _left_block.get_by_position(i); + const vectorized::ColumnWithTypeAndName& src_column = _child_block->get_by_position(i); if (!src_column.column->is_nullable() && dst_columns[i]->is_nullable()) { auto origin_sz = dst_columns[i]->size(); DCHECK(p._join_op == TJoinOp::RIGHT_OUTER_JOIN || @@ -484,7 +484,7 @@ bool NestedLoopJoinProbeOperatorX::need_more_input_data(RuntimeState* state) con } Status NestedLoopJoinProbeOperatorX::push(doris::RuntimeState* state, vectorized::Block* block, - SourceState source_state) { + SourceState source_state) const { auto& local_state = state->get_local_state(id())->cast(); COUNTER_UPDATE(local_state._probe_rows_counter, block->rows()); local_state._cur_probe_row_visited_flags.resize(block->rows()); @@ -514,8 +514,8 @@ Status NestedLoopJoinProbeOperatorX::get_block(RuntimeState* state, vectorized:: auto& local_state = state->get_local_state(id())->cast(); if (need_more_input_data(state)) { local_state._child_block->clear_column_data(); - RETURN_IF_ERROR(_child_x->get_block(state, local_state._child_block.get(), - local_state._child_source_state)); + RETURN_IF_ERROR(_child_x->get_next_after_projects(state, local_state._child_block.get(), + local_state._child_source_state)); source_state = local_state._child_source_state; if (local_state._child_block->rows() == 0 && local_state._child_source_state != SourceState::FINISHED) { @@ -538,10 +538,10 @@ Status NestedLoopJoinProbeOperatorX::get_block(RuntimeState* state, vectorized:: } Status NestedLoopJoinProbeOperatorX::pull(RuntimeState* state, vectorized::Block* block, - SourceState& source_state) { + SourceState& source_state) const { auto& local_state = state->get_local_state(id())->cast(); if (_is_output_left_side_only) { - RETURN_IF_ERROR(local_state._build_output_block(&local_state._left_block, block)); + RETURN_IF_ERROR(local_state._build_output_block(local_state._child_block.get(), block)); source_state = local_state._shared_state->left_side_eos ? SourceState::FINISHED : source_state; local_state._need_more_input_data = !local_state._shared_state->left_side_eos; diff --git a/be/src/pipeline/exec/nested_loop_join_probe_operator.h b/be/src/pipeline/exec/nested_loop_join_probe_operator.h index 65f8f73593..c43e30e726 100644 --- a/be/src/pipeline/exec/nested_loop_join_probe_operator.h +++ b/be/src/pipeline/exec/nested_loop_join_probe_operator.h @@ -103,8 +103,8 @@ private: } if constexpr (SetProbeSideFlag) { int end = filter.size(); - for (int i = _left_block_pos == _left_block.rows() ? _left_block_pos - 1 - : _left_block_pos; + for (int i = _left_block_pos == _child_block->rows() ? _left_block_pos - 1 + : _left_block_pos; i >= _left_block_start_pos; i--) { int offset = 0; if (!_probe_offset_stack.empty()) { @@ -193,10 +193,6 @@ private: // Visited flags for current row in probe side. std::vector _cur_probe_row_visited_flags; size_t _current_build_pos = 0; - // _left_block must be cleared before calling get_next(). The child node - // does not initialize all tuple ptrs in the row, only the ones that it - // is responsible for. - vectorized::Block _left_block; vectorized::MutableColumns _dst_columns; std::stack _build_offset_stack; std::stack _probe_offset_stack; @@ -217,9 +213,10 @@ public: Status get_block(RuntimeState* state, vectorized::Block* block, SourceState& source_state) override; - Status push(RuntimeState* state, vectorized::Block* input_block, SourceState source_state); + Status push(RuntimeState* state, vectorized::Block* input_block, + SourceState source_state) const; Status pull(doris::RuntimeState* state, vectorized::Block* output_block, - SourceState& source_state); + SourceState& source_state) const; const RowDescriptor& intermediate_row_desc() const override { return _old_version_flag ? _row_descriptor : *_intermediate_row_desc; } diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h index 6fc8544899..d3350d7814 100644 --- a/be/src/pipeline/pipeline_x/dependency.h +++ b/be/src/pipeline/pipeline_x/dependency.h @@ -220,7 +220,8 @@ struct HashJoinSharedState : public JoinSharedState { vectorized::Sizes probe_key_sz; const std::vector build_side_child_desc; size_t build_exprs_size = 0; - std::shared_ptr> build_blocks; + std::shared_ptr> build_blocks = + std::make_shared>(); }; class HashJoinDependency final : public Dependency { diff --git a/be/src/pipeline/pipeline_x/operator.cpp b/be/src/pipeline/pipeline_x/operator.cpp index eb333df533..a8d786496f 100644 --- a/be/src/pipeline/pipeline_x/operator.cpp +++ b/be/src/pipeline/pipeline_x/operator.cpp @@ -112,7 +112,7 @@ void PipelineXLocalStateBase::clear_origin_block() { } Status OperatorXBase::do_projections(RuntimeState* state, vectorized::Block* origin_block, - vectorized::Block* output_block) { + vectorized::Block* output_block) const { auto local_state = state->get_local_state(id()); SCOPED_TIMER(local_state->_projection_timer); using namespace vectorized; diff --git a/be/src/pipeline/pipeline_x/operator.h b/be/src/pipeline/pipeline_x/operator.h index e855ad2ec0..3a66042813 100644 --- a/be/src/pipeline/pipeline_x/operator.h +++ b/be/src/pipeline/pipeline_x/operator.h @@ -225,7 +225,7 @@ public: /// Only use in vectorized exec engine try to do projections to trans _row_desc -> _output_row_desc Status do_projections(RuntimeState* state, vectorized::Block* origin_block, - vectorized::Block* output_block); + vectorized::Block* output_block) const; protected: template