diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp index 4681218dca..7d38ee5e65 100644 --- a/be/src/exec/exec_node.cpp +++ b/be/src/exec/exec_node.cpp @@ -561,6 +561,7 @@ Status ExecNode::do_projections(vectorized::Block* origin_block, vectorized::Blo } } DCHECK(mutable_block.rows() == rows); + output_block->set_columns(std::move(mutable_columns)); } return Status::OK(); diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp index 713448e1aa..a36b83f23b 100644 --- a/be/src/olap/rowset/segment_v2/segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp @@ -519,6 +519,7 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block* auto mutable_full_columns = full_block.mutate_columns(); RETURN_IF_ERROR(fill_missing_columns(mutable_full_columns, use_default_or_null_flag, has_default_or_nullable, segment_start_pos)); + full_block.set_columns(std::move(mutable_full_columns)); // row column should be filled here if (_tablet_schema->store_row_column()) { // convert block to row store format @@ -586,7 +587,6 @@ Status SegmentWriter::fill_missing_columns(vectorized::MutableColumns& mutable_f const auto& cids_missing = _opts.rowset_ctx->partial_update_info->missing_cids; auto old_value_block = _tablet_schema->create_block_by_cids(cids_missing); CHECK(cids_missing.size() == old_value_block.columns()); - auto mutable_old_columns = old_value_block.mutate_columns(); bool has_row_column = _tablet_schema->store_row_column(); // record real pos, key is input line num, value is old_block line num std::map read_index; @@ -609,6 +609,7 @@ Status SegmentWriter::fill_missing_columns(vectorized::MutableColumns& mutable_f } continue; } + auto mutable_old_columns = old_value_block.mutate_columns(); for (size_t cid = 0; cid < mutable_old_columns.size(); ++cid) { TabletColumn tablet_column = _tablet_schema->column(cids_missing[cid]); auto st = tablet->fetch_value_by_rowids(rowset, seg_it.first, rids, tablet_column, @@ -619,6 +620,7 @@ Status SegmentWriter::fill_missing_columns(vectorized::MutableColumns& mutable_f return st; } } + old_value_block.set_columns(std::move(mutable_old_columns)); } } // build default value columns diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index a5f34633d7..5f915ad1de 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -3199,6 +3199,7 @@ Status Tablet::generate_new_block_for_partial_update( read_index_update[idx]); } } + output_block->set_columns(std::move(full_mutable_columns)); VLOG_DEBUG << "full block when publish: " << output_block->dump_data(); return Status::OK(); } @@ -3244,6 +3245,7 @@ Status Tablet::read_columns_by_plan(TabletSchemaSPtr tablet_schema, } } } + block.set_columns(std::move(mutable_columns)); return Status::OK(); } diff --git a/be/src/pipeline/exec/join_probe_operator.cpp b/be/src/pipeline/exec/join_probe_operator.cpp index 6b6796767e..4e94b4a206 100644 --- a/be/src/pipeline/exec/join_probe_operator.cpp +++ b/be/src/pipeline/exec/join_probe_operator.cpp @@ -134,9 +134,8 @@ Status JoinProbeLocalState::_build_output_block( } } - if (!is_mem_reuse || !keep_origin) { - output_block->swap(mutable_block.to_block()); - } + output_block->swap(mutable_block.to_block()); + DCHECK(output_block->rows() == rows); } diff --git a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp index b7477d0b4f..1e3de36e92 100644 --- a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp +++ b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp @@ -130,7 +130,6 @@ Status NestedLoopJoinProbeLocalState::generate_join_block_data(RuntimeState* sta _left_side_process_count = 0; DCHECK(!_need_more_input_data || !_matched_rows_done); - vectorized::MutableBlock mutable_join_block(&_join_block); if (!_matched_rows_done && !_need_more_input_data) { // We should try to join rows if there still are some rows from probe side. while (_join_block.rows() < state->batch_size()) { @@ -144,7 +143,7 @@ Status NestedLoopJoinProbeLocalState::generate_join_block_data(RuntimeState* sta _reset_with_next_probe_row(); if (_left_block_pos < _child_block->rows()) { if constexpr (set_probe_side_flag) { - _probe_offset_stack.push(mutable_join_block.rows()); + _probe_offset_stack.push(_join_block.rows()); } } else { if (_shared_state->left_side_eos) { @@ -163,9 +162,9 @@ Status NestedLoopJoinProbeLocalState::generate_join_block_data(RuntimeState* sta const auto& now_process_build_block = _shared_state->build_blocks[_current_build_pos++]; if constexpr (set_build_side_flag) { - _build_offset_stack.push(mutable_join_block.rows()); + _build_offset_stack.push(_join_block.rows()); } - _process_left_child_block(mutable_join_block, now_process_build_block); + _process_left_child_block(_join_block, now_process_build_block); } if constexpr (set_probe_side_flag) { @@ -178,21 +177,20 @@ Status NestedLoopJoinProbeLocalState::generate_join_block_data(RuntimeState* sta if (!status.ok()) { return status; } - mutable_join_block = vectorized::MutableBlock(&_join_block); // If this join operation is left outer join or full outer join, when // `_left_side_process_count`, means all rows from build // side have been joined with _left_side_process_count, we should output current // probe row with null from build side. if (_left_side_process_count) { _finalize_current_phase( - mutable_join_block, state->batch_size()); + _join_block, state->batch_size()); } } if (_left_side_process_count) { if (p._is_mark_join && _shared_state->build_blocks.empty()) { DCHECK_EQ(JoinOpType::value, TJoinOp::CROSS_JOIN); - _append_left_data_with_null(mutable_join_block); + _append_left_data_with_null(_join_block); } } } @@ -204,7 +202,6 @@ Status NestedLoopJoinProbeLocalState::generate_join_block_data(RuntimeState* sta set_probe_side_flag, ignore_null>( &_join_block, !p._is_right_semi_anti))); _update_additional_flags(&_join_block); - mutable_join_block = vectorized::MutableBlock(&_join_block); if (!status.ok()) { return status; } @@ -214,7 +211,7 @@ Status NestedLoopJoinProbeLocalState::generate_join_block_data(RuntimeState* sta if (_matched_rows_done && _output_null_idx_build_side < _shared_state->build_blocks.size()) { _finalize_current_phase( - mutable_join_block, state->batch_size()); + _join_block, state->batch_size()); } } return Status::OK(); @@ -235,10 +232,10 @@ void NestedLoopJoinProbeLocalState::_resize_fill_tuple_is_null_column(size_t new } template -void NestedLoopJoinProbeLocalState::_finalize_current_phase(vectorized::MutableBlock& mutable_block, +void NestedLoopJoinProbeLocalState::_finalize_current_phase(vectorized::Block& block, size_t batch_size) { auto& p = _parent->cast(); - auto& dst_columns = mutable_block.mutable_columns(); + auto dst_columns = block.mutate_columns(); DCHECK_GT(dst_columns.size(), 0); auto column_size = dst_columns[0]->size(); if constexpr (BuildSide) { @@ -355,12 +352,12 @@ void NestedLoopJoinProbeLocalState::_finalize_current_phase(vectorized::MutableB _resize_fill_tuple_is_null_column(_left_side_process_count, 0, 1); } } + block.set_columns(std::move(dst_columns)); } -void NestedLoopJoinProbeLocalState::_append_left_data_with_null( - vectorized::MutableBlock& mutable_block) const { +void NestedLoopJoinProbeLocalState::_append_left_data_with_null(vectorized::Block& block) const { auto& p = _parent->cast(); - auto& dst_columns = mutable_block.mutable_columns(); + auto dst_columns = block.mutate_columns(); DCHECK(p._is_mark_join); for (size_t i = 0; i < p._num_probe_side_columns; ++i) { const vectorized::ColumnWithTypeAndName& src_column = _child_block->get_by_position(i); @@ -387,13 +384,13 @@ void NestedLoopJoinProbeLocalState::_append_left_data_with_null( auto& mark_column = *dst_columns[dst_columns.size() - 1]; vectorized::ColumnFilterHelper(mark_column) .resize_fill(mark_column.size() + _left_side_process_count, 0); + block.set_columns(std::move(dst_columns)); } void NestedLoopJoinProbeLocalState::_process_left_child_block( - vectorized::MutableBlock& mutable_block, - const vectorized::Block& now_process_build_block) const { + vectorized::Block& block, const vectorized::Block& now_process_build_block) const { auto& p = _parent->cast(); - auto& dst_columns = mutable_block.mutable_columns(); + auto dst_columns = block.mutate_columns(); const int max_added_rows = now_process_build_block.rows(); for (size_t i = 0; i < p._num_probe_side_columns; ++i) { const vectorized::ColumnWithTypeAndName& src_column = _child_block->get_by_position(i); @@ -434,6 +431,7 @@ void NestedLoopJoinProbeLocalState::_process_left_child_block( 0, max_added_rows); } } + block.set_columns(std::move(dst_columns)); } NestedLoopJoinProbeOperatorX::NestedLoopJoinProbeOperatorX(ObjectPool* pool, const TPlanNode& tnode, diff --git a/be/src/pipeline/exec/nested_loop_join_probe_operator.h b/be/src/pipeline/exec/nested_loop_join_probe_operator.h index bc8913f5d0..525640beb4 100644 --- a/be/src/pipeline/exec/nested_loop_join_probe_operator.h +++ b/be/src/pipeline/exec/nested_loop_join_probe_operator.h @@ -82,11 +82,11 @@ private: friend class NestedLoopJoinProbeOperatorX; void _update_additional_flags(vectorized::Block* block); template - void _finalize_current_phase(vectorized::MutableBlock& mutable_block, size_t batch_size); + void _finalize_current_phase(vectorized::Block& block, size_t batch_size); void _resize_fill_tuple_is_null_column(size_t new_size, int left_flag, int right_flag); void _reset_with_next_probe_row(); - void _append_left_data_with_null(vectorized::MutableBlock& mutable_block) const; - void _process_left_child_block(vectorized::MutableBlock& mutable_block, + void _append_left_data_with_null(vectorized::Block& block) const; + void _process_left_child_block(vectorized::Block& block, const vectorized::Block& now_process_build_block) const; template void _do_filtering_and_update_visited_flags_impl(vectorized::Block* block, int column_to_keep, diff --git a/be/src/pipeline/pipeline_x/operator.cpp b/be/src/pipeline/pipeline_x/operator.cpp index 39598dadbd..b90c955142 100644 --- a/be/src/pipeline/pipeline_x/operator.cpp +++ b/be/src/pipeline/pipeline_x/operator.cpp @@ -187,6 +187,7 @@ Status OperatorXBase::do_projections(RuntimeState* state, vectorized::Block* ori } } DCHECK(mutable_block.rows() == rows); + output_block->set_columns(std::move(mutable_columns)); } return Status::OK(); diff --git a/be/src/vec/common/sort/sorter.cpp b/be/src/vec/common/sort/sorter.cpp index 40e51ce20c..bd095e1afc 100644 --- a/be/src/vec/common/sort/sorter.cpp +++ b/be/src/vec/common/sort/sorter.cpp @@ -186,6 +186,7 @@ Status MergeSorterState::_merge_sort_read_not_spilled(int batch_size, if (merged_rows == batch_size) break; } + block->set_columns(std::move(merged_columns)); if (merged_rows == 0) { *eos = true; diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp index 544a558afc..c2594a2e35 100644 --- a/be/src/vec/core/block.cpp +++ b/be/src/vec/core/block.cpp @@ -577,7 +577,7 @@ MutableColumns Block::mutate_columns() { size_t num_columns = data.size(); MutableColumns columns(num_columns); for (size_t i = 0; i < num_columns; ++i) { - columns[i] = data[i].column ? (*std::move(data[i].column)).assume_mutable() + columns[i] = data[i].column ? (*std::move(data[i].column)).mutate() : data[i].type->create_column(); } return columns; @@ -716,7 +716,7 @@ void Block::swap(Block& other) noexcept { void Block::swap(Block&& other) noexcept { clear(); data = std::move(other.data); - initialize_index_by_name(); + index_by_name = std::move(other.index_by_name); row_same_bit = std::move(other.row_same_bit); } @@ -936,7 +936,7 @@ void MutableBlock::swap(MutableBlock& another) noexcept { _columns.swap(another._columns); _data_types.swap(another._data_types); _names.swap(another._names); - initialize_index_by_name(); + index_by_name.swap(another.index_by_name); } void MutableBlock::swap(MutableBlock&& another) noexcept { @@ -944,7 +944,7 @@ void MutableBlock::swap(MutableBlock&& another) noexcept { _columns = std::move(another._columns); _data_types = std::move(another._data_types); _names = std::move(another._names); - initialize_index_by_name(); + index_by_name = std::move(another.index_by_name); } void MutableBlock::add_row(const Block* block, int row) { @@ -1027,6 +1027,7 @@ Block MutableBlock::to_block(int start_column) { Block MutableBlock::to_block(int start_column, int end_column) { ColumnsWithTypeAndName columns_with_schema; + columns_with_schema.reserve(end_column - start_column); for (size_t i = start_column; i < end_column; ++i) { columns_with_schema.emplace_back(std::move(_columns[i]), _data_types[i], _names[i]); } diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp b/be/src/vec/exec/format/csv/csv_reader.cpp index 0d93fcd648..9f8c01b10d 100644 --- a/be/src/vec/exec/format/csv/csv_reader.cpp +++ b/be/src/vec/exec/format/csv/csv_reader.cpp @@ -479,10 +479,11 @@ Status CsvReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { RETURN_IF_ERROR(_validate_line(Slice(ptr, size), &success)); ++rows; } - for (auto& col : block->mutate_columns()) { + auto mutate_columns = block->mutate_columns(); + for (auto& col : mutate_columns) { col->resize(rows); } - + block->set_columns(std::move(mutate_columns)); } else { auto columns = block->mutate_columns(); while (rows < batch_size && !_line_reader_eof) { @@ -504,6 +505,7 @@ Status CsvReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { } RETURN_IF_ERROR(_fill_dest_columns(Slice(ptr, size), block, columns, &rows)); } + block->set_columns(std::move(columns)); } *eof = (rows == 0); diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp b/be/src/vec/exec/format/orc/vorc_reader.cpp index c57d380762..12a889ec23 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.cpp +++ b/be/src/vec/exec/format/orc/vorc_reader.cpp @@ -1412,11 +1412,11 @@ Status OrcReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { auto rows = std::min(get_remaining_rows(), (int64_t)_batch_size); set_remaining_rows(get_remaining_rows() - rows); - - for (auto& col : block->mutate_columns()) { + auto mutate_columns = block->mutate_columns(); + for (auto& col : mutate_columns) { col->resize(rows); } - + block->set_columns(std::move(mutate_columns)); *read_rows = rows; if (get_remaining_rows() == 0) { *eof = true; diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp index 87555f6ee8..6ae4ea2f5b 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp @@ -528,10 +528,11 @@ Status ParquetReader::get_next_block(Block* block, size_t* read_rows, bool* eof) _current_group_reader->set_remaining_rows(_current_group_reader->get_remaining_rows() - rows); - - for (auto& col : block->mutate_columns()) { + auto mutate_columns = block->mutate_columns(); + for (auto& col : mutate_columns) { col->resize(rows); } + block->set_columns(std::move(mutate_columns)); *read_rows = rows; if (_current_group_reader->get_remaining_rows() == 0) { diff --git a/be/src/vec/exec/format/table/iceberg_reader.cpp b/be/src/vec/exec/format/table/iceberg_reader.cpp index 5ad1bdd315..8c05b8c2a0 100644 --- a/be/src/vec/exec/format/table/iceberg_reader.cpp +++ b/be/src/vec/exec/format/table/iceberg_reader.cpp @@ -143,9 +143,11 @@ Status IcebergTableReader::get_next_block(Block* block, size_t* read_rows, bool* auto rows = std::min(_remaining_push_down_count, (int64_t)_state->query_options().batch_size); _remaining_push_down_count -= rows; - for (auto& col : block->mutate_columns()) { + auto mutate_columns = block->mutate_columns(); + for (auto& col : mutate_columns) { col->resize(rows); } + block->set_columns(std::move(mutate_columns)); *read_rows = rows; if (_remaining_push_down_count == 0) { *eof = true; diff --git a/be/src/vec/exec/join/vjoin_node_base.cpp b/be/src/vec/exec/join/vjoin_node_base.cpp index 0077fe2a7b..656810ba7b 100644 --- a/be/src/vec/exec/join/vjoin_node_base.cpp +++ b/be/src/vec/exec/join/vjoin_node_base.cpp @@ -177,6 +177,7 @@ Status VJoinNodeBase::_build_output_block(Block* origin_block, Block* output_blo } } }; + if (rows != 0) { auto& mutable_columns = mutable_block.mutable_columns(); if (_output_expr_ctxs.empty()) { @@ -207,9 +208,7 @@ Status VJoinNodeBase::_build_output_block(Block* origin_block, Block* output_blo } } - if (!is_mem_reuse || !keep_origin) { - output_block->swap(mutable_block.to_block()); - } + output_block->swap(mutable_block.to_block()); DCHECK(output_block->rows() == rows); } diff --git a/be/src/vec/exec/join/vnested_loop_join_node.cpp b/be/src/vec/exec/join/vnested_loop_join_node.cpp index 7d8100aa6c..08d4c7d448 100644 --- a/be/src/vec/exec/join/vnested_loop_join_node.cpp +++ b/be/src/vec/exec/join/vnested_loop_join_node.cpp @@ -278,8 +278,8 @@ Status VNestedLoopJoinNode::get_next(RuntimeState* state, Block* block, bool* eo return pull(state, block, eos); } -void VNestedLoopJoinNode::_append_left_data_with_null(MutableBlock& mutable_block) const { - auto& dst_columns = mutable_block.mutable_columns(); +void VNestedLoopJoinNode::_append_left_data_with_null(Block& block) const { + auto dst_columns = block.mutate_columns(); DCHECK(_is_mark_join); for (size_t i = 0; i < _num_probe_side_columns; ++i) { const ColumnWithTypeAndName& src_column = _left_block->get_by_position(i); @@ -305,11 +305,12 @@ void VNestedLoopJoinNode::_append_left_data_with_null(MutableBlock& mutable_bloc auto& mark_column = *dst_columns[dst_columns.size() - 1]; ColumnFilterHelper(mark_column).resize_fill(mark_column.size() + _left_side_process_count, 0); + block.set_columns(std::move(dst_columns)); } -void VNestedLoopJoinNode::_process_left_child_block(MutableBlock& mutable_block, +void VNestedLoopJoinNode::_process_left_child_block(Block& block, const Block& now_process_build_block) const { - auto& dst_columns = mutable_block.mutable_columns(); + auto dst_columns = block.mutate_columns(); const int max_added_rows = now_process_build_block.rows(); for (size_t i = 0; i < _num_probe_side_columns; ++i) { const ColumnWithTypeAndName& src_column = _left_block->get_by_position(i); @@ -345,6 +346,7 @@ void VNestedLoopJoinNode::_process_left_child_block(MutableBlock& mutable_block, max_added_rows); } } + block.set_columns(std::move(dst_columns)); } void VNestedLoopJoinNode::_update_additional_flags(Block* block) { @@ -395,8 +397,8 @@ void VNestedLoopJoinNode::_add_tuple_is_null_column(Block* block) { } template -void VNestedLoopJoinNode::_finalize_current_phase(MutableBlock& mutable_block, size_t batch_size) { - auto& dst_columns = mutable_block.mutable_columns(); +void VNestedLoopJoinNode::_finalize_current_phase(Block& block, size_t batch_size) { + auto dst_columns = block.mutate_columns(); DCHECK_GT(dst_columns.size(), 0); auto column_size = dst_columns[0]->size(); if constexpr (BuildSide) { @@ -508,6 +510,7 @@ void VNestedLoopJoinNode::_finalize_current_phase(MutableBlock& mutable_block, s _resize_fill_tuple_is_null_column(_left_side_process_count, 0, 1); } } + block.set_columns(std::move(dst_columns)); } void VNestedLoopJoinNode::_reset_with_next_probe_row() { diff --git a/be/src/vec/exec/join/vnested_loop_join_node.h b/be/src/vec/exec/join/vnested_loop_join_node.h index 810bf57e7f..7326485927 100644 --- a/be/src/vec/exec/join/vnested_loop_join_node.h +++ b/be/src/vec/exec/join/vnested_loop_join_node.h @@ -115,7 +115,6 @@ private: _left_side_process_count = 0; DCHECK(!_need_more_input_data || !_matched_rows_done); - MutableBlock mutable_join_block(&_join_block); if (!_matched_rows_done && !_need_more_input_data) { // We should try to join rows if there still are some rows from probe side. while (_join_block.rows() < state->batch_size()) { @@ -129,7 +128,7 @@ private: _reset_with_next_probe_row(); if (_left_block_pos < _left_block->rows()) { if constexpr (set_probe_side_flag) { - _probe_offset_stack.push(mutable_join_block.rows()); + _probe_offset_stack.push(_join_block.rows()); } } else { if (_left_side_eos) { @@ -148,9 +147,9 @@ private: const auto& now_process_build_block = _build_blocks[_current_build_pos++]; if constexpr (set_build_side_flag) { - _build_offset_stack.push(mutable_join_block.rows()); + _build_offset_stack.push(_join_block.rows()); } - _process_left_child_block(mutable_join_block, now_process_build_block); + _process_left_child_block(_join_block, now_process_build_block); } if constexpr (set_probe_side_flag) { @@ -163,21 +162,20 @@ private: if (!status.ok()) { return status; } - mutable_join_block = MutableBlock(&_join_block); // If this join operation is left outer join or full outer join, when // `_left_side_process_count`, means all rows from build // side have been joined with _left_side_process_count, we should output current // probe row with null from build side. if (_left_side_process_count) { _finalize_current_phase( - mutable_join_block, state->batch_size()); + _join_block, state->batch_size()); } } if (_left_side_process_count) { if (_is_mark_join && _build_blocks.empty()) { DCHECK_EQ(JoinOpType::value, TJoinOp::CROSS_JOIN); - _append_left_data_with_null(mutable_join_block); + _append_left_data_with_null(_join_block); } } } @@ -189,7 +187,6 @@ private: set_build_side_flag, set_probe_side_flag, ignore_null>( &_join_block, !_is_right_semi_anti))); _update_additional_flags(&_join_block); - mutable_join_block = MutableBlock(&_join_block); if (!status.ok()) { return status; } @@ -198,7 +195,7 @@ private: if constexpr (set_build_side_flag) { if (_matched_rows_done && _output_null_idx_build_side < _build_blocks.size()) { _finalize_current_phase( - mutable_join_block, state->batch_size()); + _join_block, state->batch_size()); } } return Status::OK(); @@ -208,8 +205,7 @@ private: // Processes a block from the left child. // dst_columns: left_child_row and now_process_build_block to construct a bundle column of new block // now_process_build_block: right child block now to process - void _process_left_child_block(MutableBlock& mutable_block, - const Block& now_process_build_block) const; + void _process_left_child_block(Block& block, const Block& now_process_build_block) const; template Status _do_filtering_and_update_visited_flags(Block* block, bool materialize); @@ -221,7 +217,7 @@ private: bool materialize, Filter& filter); template - void _finalize_current_phase(MutableBlock& mutable_block, size_t batch_size); + void _finalize_current_phase(Block& block, size_t batch_size); void _reset_with_next_probe_row(); @@ -238,7 +234,7 @@ private: // For mark join, if the relation from right side is empty, we should construct intermediate // block with data from left side and filled with null for right side - void _append_left_data_with_null(MutableBlock& mutable_block) const; + void _append_left_data_with_null(Block& block) const; // List of build blocks, constructed in prepare() Blocks _build_blocks; diff --git a/be/src/vec/exec/scan/pip_scanner_context.h b/be/src/vec/exec/scan/pip_scanner_context.h index 309aed96a8..42726cb17f 100644 --- a/be/src/vec/exec/scan/pip_scanner_context.h +++ b/be/src/vec/exec/scan/pip_scanner_context.h @@ -106,6 +106,7 @@ public: static_cast(m.merge(*merge_block)); return_free_block(std::move(merge_block)); } + (*block)->set_columns(std::move(m.mutable_columns())); } return Status::OK(); diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp index 5ad2dbec5b..4d3820d763 100644 --- a/be/src/vec/exec/scan/scanner_context.cpp +++ b/be/src/vec/exec/scan/scanner_context.cpp @@ -339,6 +339,7 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo static_cast(m.merge(*merge_block)); return_free_block(std::move(merge_block)); } + (*block)->set_columns(std::move(m.mutable_columns())); } return Status::OK(); diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp index e8d7f8a713..42285f1d22 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.cpp +++ b/be/src/vec/exec/scan/scanner_scheduler.cpp @@ -374,7 +374,9 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, ctx->return_free_block(std::move(block)); } else { if (!blocks.empty() && blocks.back()->rows() + block->rows() <= state->batch_size()) { - static_cast(vectorized::MutableBlock(blocks.back().get()).merge(*block)); + vectorized::MutableBlock mutable_block(blocks.back().get()); + static_cast(mutable_block.merge(*block)); + blocks.back().get()->set_columns(std::move(mutable_block.mutable_columns())); ctx->return_free_block(std::move(block)); } else { blocks.push_back(std::move(block)); diff --git a/be/src/vec/exec/vpartition_sort_node.h b/be/src/vec/exec/vpartition_sort_node.h index acf18bad7b..d369846df5 100644 --- a/be/src/vec/exec/vpartition_sort_node.h +++ b/be/src/vec/exec/vpartition_sort_node.h @@ -58,6 +58,7 @@ public: for (int i = 0; i < mutable_columns.size(); ++i) { columns[i]->append_data_by_selector(mutable_columns[i], selector); } + blocks.back()->set_columns(std::move(mutable_columns)); init_rows = init_rows - selector.size(); total_rows = total_rows + selector.size(); selector.clear(); diff --git a/be/src/vec/exec/vrepeat_node.cpp b/be/src/vec/exec/vrepeat_node.cpp index 1765c1dcf7..717c0c28d0 100644 --- a/be/src/vec/exec/vrepeat_node.cpp +++ b/be/src/vec/exec/vrepeat_node.cpp @@ -167,7 +167,7 @@ Status VRepeatNode::get_repeated_block(Block* child_block, int repeat_id_idx, Bl } cur_col++; } - + output_block->set_columns(std::move(columns)); DCHECK_EQ(cur_col, column_size); return Status::OK(); diff --git a/be/src/vec/exec/vtable_function_node.cpp b/be/src/vec/exec/vtable_function_node.cpp index b7302aedcd..8affd4cbe7 100644 --- a/be/src/vec/exec/vtable_function_node.cpp +++ b/be/src/vec/exec/vtable_function_node.cpp @@ -218,7 +218,7 @@ Status VTableFunctionNode::_get_expanded_block(RuntimeState* state, Block* outpu for (auto index : _useless_slot_indexs) { columns[index]->insert_many_defaults(row_size - columns[index]->size()); } - + output_block->set_columns(std::move(columns)); // 3. eval conjuncts RETURN_IF_ERROR(VExprContext::filter_block(_conjuncts, output_block, output_block->columns())); diff --git a/be/src/vec/exec/vunion_node.cpp b/be/src/vec/exec/vunion_node.cpp index 884b66347b..8ee258ca05 100644 --- a/be/src/vec/exec/vunion_node.cpp +++ b/be/src/vec/exec/vunion_node.cpp @@ -205,6 +205,7 @@ Status VUnionNode::get_next_materialized(RuntimeState* state, Block* block) { ++_child_idx; } } + block->set_columns(std::move(mblock.mutable_columns())); DCHECK_LE(_child_idx, _children.size()); return Status::OK(); @@ -233,6 +234,7 @@ Status VUnionNode::get_next_const(RuntimeState* state, Block* block) { tmp_block.clear(); } } + block->set_columns(std::move(mblock.mutable_columns())); // some insert query like "insert into string_test select 1, repeat('a', 1024 * 1024);" // the const expr will be in output expr cause the union node return a empty block. so here we @@ -256,6 +258,8 @@ Status VUnionNode::materialize_child_block(RuntimeState* state, int child_id, Block res; RETURN_IF_ERROR(materialize_block(input_block, child_id, &res)); RETURN_IF_ERROR(mblock.merge(res)); + + output_block->set_columns(std::move(mblock.mutable_columns())); } return Status::OK(); } diff --git a/be/src/vec/olap/block_reader.cpp b/be/src/vec/olap/block_reader.cpp index 9c15302af6..82604ac408 100644 --- a/be/src/vec/olap/block_reader.cpp +++ b/be/src/vec/olap/block_reader.cpp @@ -337,6 +337,7 @@ Status BlockReader::_agg_key_next_block(Block* block, bool* eof) { _agg_data_counters.push_back(_last_agg_data_counter); _last_agg_data_counter = 0; _update_agg_data(target_columns); + block->set_columns(std::move(target_columns)); _merged_rows += merged_row; return Status::OK(); @@ -410,17 +411,20 @@ Status BlockReader::_unique_key_next_block(Block* block, bool* eof) { } } } - + auto target_columns_size = target_columns.size(); ColumnWithTypeAndName column_with_type_and_name {_delete_filter_column, std::make_shared(), "__DORIS_COMPACTION_FILTER__"}; + block->set_columns(std::move(target_columns)); block->insert(column_with_type_and_name); - RETURN_IF_ERROR(Block::filter_block(block, target_columns.size(), target_columns.size())); + RETURN_IF_ERROR(Block::filter_block(block, target_columns_size, target_columns_size)); _stats.rows_del_filtered += target_block_row - block->rows(); DCHECK(block->try_get_by_name("__DORIS_COMPACTION_FILTER__") == nullptr); if (UNLIKELY(_reader_context.record_rowids)) { DCHECK_EQ(_block_row_locations.size(), block->rows() + delete_count); } + } else { + block->set_columns(std::move(target_columns)); } return Status::OK(); } diff --git a/be/src/vec/olap/vcollect_iterator.cpp b/be/src/vec/olap/vcollect_iterator.cpp index 921e89c3c8..ca9e2b7296 100644 --- a/be/src/vec/olap/vcollect_iterator.cpp +++ b/be/src/vec/olap/vcollect_iterator.cpp @@ -770,7 +770,7 @@ Status VCollectIterator::Level1Iterator::_normal_next(IteratorRowRef* ref) { Status VCollectIterator::Level1Iterator::_merge_next(Block* block) { int target_block_row = 0; auto target_columns = block->mutate_columns(); - size_t column_count = block->columns(); + size_t column_count = target_columns.size(); IteratorRowRef cur_row = _ref; IteratorRowRef pre_row_ref = _ref; @@ -809,6 +809,7 @@ Status VCollectIterator::Level1Iterator::_merge_next(Block* block) { if (UNLIKELY(_reader->_reader_context.record_rowids)) { _block_row_locations.resize(target_block_row); } + block->set_columns(std::move(target_columns)); return res; } @@ -825,6 +826,7 @@ Status VCollectIterator::Level1Iterator::_merge_next(Block* block) { continuous_row_in_block); } } + block->set_columns(std::move(target_columns)); return Status::OK(); } if (continuous_row_in_block == 0) { diff --git a/be/src/vec/olap/vertical_block_reader.cpp b/be/src/vec/olap/vertical_block_reader.cpp index 3f4a0cd3a1..5f2d856fe7 100644 --- a/be/src/vec/olap/vertical_block_reader.cpp +++ b/be/src/vec/olap/vertical_block_reader.cpp @@ -405,6 +405,7 @@ Status VerticalBlockReader::_agg_key_next_block(Block* block, bool* eof) { _agg_data_counters.push_back(_last_agg_data_counter); _last_agg_data_counter = 0; _update_agg_data(target_columns); + block->set_columns(std::move(target_columns)); return Status::OK(); } @@ -545,6 +546,7 @@ Status VerticalBlockReader::_unique_key_next_block(Block* block, bool* eof) { }); ++target_block_row; } while (target_block_row < _reader_context.batch_size); + block->set_columns(std::move(target_columns)); return Status::OK(); } diff --git a/be/src/vec/olap/vgeneric_iterators.cpp b/be/src/vec/olap/vgeneric_iterators.cpp index 65dc8e3e0d..155bb4ee07 100644 --- a/be/src/vec/olap/vgeneric_iterators.cpp +++ b/be/src/vec/olap/vgeneric_iterators.cpp @@ -75,14 +75,15 @@ Status VStatisticsIterator::next_batch(Block* block) { : std::min(_target_rows - _output_rows, MAX_ROW_SIZE_IN_COUNT); if (_push_down_agg_type_opt == TPushAggOp::COUNT) { size = std::min(_target_rows - _output_rows, MAX_ROW_SIZE_IN_COUNT); - for (int i = 0; i < block->columns(); ++i) { + for (int i = 0; i < columns.size(); ++i) { columns[i]->resize(size); } } else { - for (int i = 0; i < block->columns(); ++i) { + for (int i = 0; i < columns.size(); ++i) { RETURN_IF_ERROR(_column_iterators[i]->next_batch_of_zone_map(&size, columns[i])); } } + block->set_columns(std::move(columns)); _output_rows += size; return Status::OK(); } diff --git a/be/src/vec/runtime/vsorted_run_merger.cpp b/be/src/vec/runtime/vsorted_run_merger.cpp index 1bdd82ba8e..6d2f68db62 100644 --- a/be/src/vec/runtime/vsorted_run_merger.cpp +++ b/be/src/vec/runtime/vsorted_run_merger.cpp @@ -194,6 +194,7 @@ Status VSortedRunMerger::get_next(Block* output_block, bool* eos) { break; } } + output_block->set_columns(std::move(merged_columns)); if (merged_rows == 0) { *eos = true; diff --git a/be/test/vec/core/block_test.cpp b/be/test/vec/core/block_test.cpp index 020c4f2e92..7e8a26ce43 100644 --- a/be/test/vec/core/block_test.cpp +++ b/be/test/vec/core/block_test.cpp @@ -390,4 +390,57 @@ TEST(BlockTest, dump_data) { vectorized::Block::filter_block_internal(&block1, filter, block1.columns()); EXPECT_EQ(size, block1.rows()); } + +TEST(BlockTest, merge_with_shared_columns) { + auto vec = vectorized::ColumnVector::create(); + auto& int32_data = vec->get_data(); + for (int i = 0; i < 1024; ++i) { + int32_data.push_back(i); + } + vectorized::DataTypePtr int32_type(std::make_shared()); + vectorized::ColumnWithTypeAndName test_k1(vec->get_ptr(), int32_type, "k1"); + + auto strcol = vectorized::ColumnString::create(); + for (int i = 0; i < 1024; ++i) { + std::string is = std::to_string(i); + strcol->insert_data(is.c_str(), is.size()); + } + vectorized::DataTypePtr string_type(std::make_shared()); + vectorized::ColumnWithTypeAndName test_v1(strcol->get_ptr(), string_type, "v1"); + + vectorized::ColumnWithTypeAndName test_v2(strcol->get_ptr(), string_type, "v2"); + + vectorized::Block src_block({test_k1, test_v1, test_v2}); + + auto vec_temp = vectorized::ColumnVector::create(); + auto& int32_data_temp = vec_temp->get_data(); + for (int i = 0; i < 10; ++i) { + int32_data_temp.push_back(i); + } + + vectorized::ColumnWithTypeAndName test_k1_temp(vec_temp->get_ptr(), int32_type, "k1"); + + auto strcol_temp = vectorized::ColumnString::create(); + for (int i = 0; i < 10; ++i) { + std::string is = std::to_string(i); + strcol_temp->insert_data(is.c_str(), is.size()); + } + + vectorized::ColumnWithTypeAndName test_v1_temp(strcol_temp->get_ptr(), string_type, "v1"); + vectorized::ColumnWithTypeAndName test_v2_temp(strcol_temp->get_ptr(), string_type, "v2"); + + vectorized::Block temp_block({test_k1_temp, test_v1_temp, test_v2_temp}); + + vectorized::MutableBlock mutable_block(&src_block); + auto status = mutable_block.merge(temp_block); + ASSERT_TRUE(status.ok()); + + src_block.set_columns(std::move(mutable_block.mutable_columns())); + + for (auto& column : src_block.get_columns()) { + EXPECT_EQ(1034, column->size()); + } + EXPECT_EQ(1034, src_block.rows()); +} + } // namespace doris