From 60a2fa7deac959ff34c3aefec1f560b4cfc18775 Mon Sep 17 00:00:00 2001 From: yixiutt <102007456+yixiutt@users.noreply.github.com> Date: Thu, 1 Sep 2022 10:20:17 +0800 Subject: [PATCH] [Improvement](compaction) copy row in batch in VCollectIterator&VGenericIterator (#12214) In VCollectIterator&VGenericIterator, use insert_range_from to copy rows in a block which is continuous to save cpu cost. If rows in rowset and segment are non overlapping, this whill improve 30% throughput of compaction.If rows are completely overlapping such as load two same files, the throughput goes nearly same as before. Co-authored-by: yixiutt --- be/src/olap/compaction.cpp | 2 + be/src/vec/olap/vcollect_iterator.cpp | 48 ++++++++++++++++++---- be/src/vec/olap/vgeneric_iterators.cpp | 55 ++++++++++++++++++++++---- 3 files changed, 90 insertions(+), 15 deletions(-) diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index a71b441bfd..ec83adbed2 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -231,6 +231,8 @@ Status Compaction::do_compaction_impl(int64_t permits) { << ". tablet=" << _tablet->full_name() << ", output_version=" << _output_version << ", current_max_version=" << current_max_version << ", disk=" << _tablet->data_dir()->path() << ", segments=" << segments_num + << ", input_row_num=" << _input_row_num + << ", output_row_num=" << _output_rowset->num_rows() << ". elapsed time=" << watch.get_elapse_second() << "s. cumulative_compaction_policy=" << _tablet->cumulative_compaction_policy()->name() << "."; diff --git a/be/src/vec/olap/vcollect_iterator.cpp b/be/src/vec/olap/vcollect_iterator.cpp index 27e39ced95..159e59be6b 100644 --- a/be/src/vec/olap/vcollect_iterator.cpp +++ b/be/src/vec/olap/vcollect_iterator.cpp @@ -444,20 +444,29 @@ Status VCollectIterator::Level1Iterator::_merge_next(Block* block) { auto target_columns = block->mutate_columns(); size_t column_count = block->columns(); IteratorRowRef cur_row = _ref; + IteratorRowRef pre_row_ref = _ref; + if (UNLIKELY(_reader->_reader_context.record_rowids)) { _block_row_locations.resize(_batch_size); } + int continuous_row_in_block = 0; do { - const auto& src_block = cur_row.block; - assert(src_block->columns() == column_count); - for (size_t i = 0; i < column_count; ++i) { - target_columns[i]->insert_from(*(src_block->get_by_position(i).column), - cur_row.row_pos); - } if (UNLIKELY(_reader->_reader_context.record_rowids)) { _block_row_locations[target_block_row] = _cur_child->current_row_location(); } ++target_block_row; + ++continuous_row_in_block; + // cur block finished, copy before merge_next cause merge_next will + // clear block column data + if (pre_row_ref.row_pos + continuous_row_in_block == pre_row_ref.block->rows()) { + const auto& src_block = pre_row_ref.block; + for (size_t i = 0; i < column_count; ++i) { + target_columns[i]->insert_range_from(*(src_block->get_by_position(i).column), + pre_row_ref.row_pos, continuous_row_in_block); + } + continuous_row_in_block = 0; + pre_row_ref.block = nullptr; + } auto res = _merge_next(&cur_row); if (UNLIKELY(res.precise_code() == OLAP_ERR_DATA_EOF)) { if (UNLIKELY(_reader->_reader_context.record_rowids)) { @@ -470,7 +479,32 @@ Status VCollectIterator::Level1Iterator::_merge_next(Block* block) { LOG(WARNING) << "next failed: " << res; return res; } - } while (target_block_row < _batch_size); + if (target_block_row >= _batch_size) { + if (continuous_row_in_block > 0) { + const auto& src_block = pre_row_ref.block; + for (size_t i = 0; i < column_count; ++i) { + target_columns[i]->insert_range_from(*(src_block->get_by_position(i).column), + pre_row_ref.row_pos, + continuous_row_in_block); + } + } + return Status::OK(); + } + if (continuous_row_in_block == 0) { + pre_row_ref = _ref; + continue; + } + // copy row if meet a new block + if (cur_row.block != pre_row_ref.block) { + const auto& src_block = pre_row_ref.block; + for (size_t i = 0; i < column_count; ++i) { + target_columns[i]->insert_range_from(*(src_block->get_by_position(i).column), + pre_row_ref.row_pos, continuous_row_in_block); + } + continuous_row_in_block = 0; + pre_row_ref = cur_row; + } + } while (true); return Status::OK(); } diff --git a/be/src/vec/olap/vgeneric_iterators.cpp b/be/src/vec/olap/vgeneric_iterators.cpp index 83fdcbb105..dd9fd28963 100644 --- a/be/src/vec/olap/vgeneric_iterators.cpp +++ b/be/src/vec/olap/vgeneric_iterators.cpp @@ -187,9 +187,13 @@ public: return result; } - void copy_row(vectorized::Block* block) { + // `advanced = false` when current block finished + void copy_rows(vectorized::Block* block, bool advanced = true) { vectorized::Block& src = _block; vectorized::Block& dst = *block; + if (_cur_batch_num == 0 || _index_in_block - _cur_batch_num < 0) { + return; + } for (size_t i = 0; i < _num_columns; ++i) { auto& s_col = src.get_by_position(i); @@ -199,8 +203,14 @@ public: vectorized::ColumnPtr& d_cp = d_col.column; //copy a row to dst block column by column - ((vectorized::IColumn&)(*d_cp)).insert_from(*s_cp, _index_in_block); + size_t start = _index_in_block - _cur_batch_num + 1; + if (advanced) { + start--; + } + DCHECK(start >= 0); + ((vectorized::IColumn&)(*d_cp)).insert_range_from(*s_cp, start, _cur_batch_num); } + _cur_batch_num = 0; } RowLocation current_row_location() { @@ -224,6 +234,17 @@ public: void set_skip(bool skip) const { _skip = skip; } + void add_cur_batch() { _cur_batch_num++; } + + void reset_cur_batch() { _cur_batch_num = 0; } + + bool is_cur_block_finished() { + if (_index_in_block == _block.rows() - 1) { + return true; + } + return false; + } + private: // Load next block into _block Status _load_next_block(); @@ -246,6 +267,7 @@ private: std::vector* _compare_columns; std::vector _block_row_locations; bool _record_rowids = false; + size_t _cur_batch_num = 0; }; Status VMergeIteratorContext::init(const StorageReadOptions& opts) { @@ -344,7 +366,7 @@ private: VMergeHeap _merge_heap; - int block_row_max = 0; + int _block_row_max = 0; int _sequence_id_idx = -1; bool _is_unique = false; bool _is_reverse = false; @@ -372,31 +394,48 @@ Status VMergeIterator::init(const StorageReadOptions& opts) { _origin_iters.clear(); - block_row_max = opts.block_row_max; + _block_row_max = opts.block_row_max; return Status::OK(); } Status VMergeIterator::next_batch(vectorized::Block* block) { if (UNLIKELY(_record_rowids)) { - _block_row_locations.resize(block_row_max); + _block_row_locations.resize(_block_row_max); } size_t row_idx = 0; - while (block->rows() < block_row_max) { + VMergeIteratorContext* pre_ctx = nullptr; + while (block->rows() < _block_row_max) { if (_merge_heap.empty()) break; auto ctx = _merge_heap.top(); _merge_heap.pop(); if (!ctx->need_skip()) { - // copy current row to block - ctx->copy_row(block); + ctx->add_cur_batch(); + if (pre_ctx != ctx) { + if (pre_ctx) { + pre_ctx->copy_rows(block); + } + pre_ctx = ctx; + } if (UNLIKELY(_record_rowids)) { _block_row_locations[row_idx] = ctx->current_row_location(); } row_idx++; + if (ctx->is_cur_block_finished() || row_idx >= _block_row_max) { + // current block finished, ctx not advance + // so copy start_idx = (_index_in_block - _cur_batch_num + 1) + ctx->copy_rows(block, false); + pre_ctx = nullptr; + } } else if (_merged_rows != nullptr) { (*_merged_rows)++; + // need skip cur row, so flush rows in pre_ctx + if (pre_ctx) { + pre_ctx->copy_rows(block); + pre_ctx = nullptr; + } } RETURN_IF_ERROR(ctx->advance());