[Improvement](compaction) copy row in batch in VCollectIterator&VGenericIterator (#12214)

In VCollectIterator & VGenericIterator, use insert_range_from to copy rows
that are contiguous within a block, which saves CPU cost.

If the rows across rowsets and segments do not overlap, this improves
compaction throughput by about 30%. If the rows overlap completely, for example
when the same file is loaded twice, throughput stays nearly the same as before.
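
As a rough illustration of the change (a minimal standalone sketch, not the Doris IColumn API; the Column type and helper below are hypothetical stand-ins), a contiguous run of rows is copied with one ranged call instead of one call per row:

#include <cstddef>
#include <vector>

// Hypothetical stand-in for a column; only the two copy methods matter here.
struct Column {
    std::vector<int> data;
    // copy one row from src (the per-row path used before this change)
    void insert_from(const Column& src, size_t row) { data.push_back(src.data[row]); }
    // copy `length` contiguous rows from src starting at `start` (the batched path)
    void insert_range_from(const Column& src, size_t start, size_t length) {
        data.insert(data.end(), src.data.begin() + start,
                    src.data.begin() + start + length);
    }
};

void copy_run(Column& dst, const Column& src, size_t start, size_t n) {
    // before: one call and one append per row
    // for (size_t i = 0; i < n; ++i) dst.insert_from(src, start + i);
    // after: a single ranged copy for the whole contiguous run
    dst.insert_range_from(src, start, n);
}

When the inputs do not overlap, most of a source block is consumed as one such run, which is where the throughput gain comes from.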

Co-authored-by: yixiutt <yixiu@selectdb.com>
Author: yixiutt (committed by GitHub)
Date: 2022-09-01 10:20:17 +08:00
Parent: 90fb3b7783
Commit: 60a2fa7dea
3 changed files with 90 additions and 15 deletions


@@ -231,6 +231,8 @@ Status Compaction::do_compaction_impl(int64_t permits) {
<< ". tablet=" << _tablet->full_name() << ", output_version=" << _output_version
<< ", current_max_version=" << current_max_version
<< ", disk=" << _tablet->data_dir()->path() << ", segments=" << segments_num
<< ", input_row_num=" << _input_row_num
<< ", output_row_num=" << _output_rowset->num_rows()
<< ". elapsed time=" << watch.get_elapse_second()
<< "s. cumulative_compaction_policy="
<< _tablet->cumulative_compaction_policy()->name() << ".";


@@ -444,20 +444,29 @@ Status VCollectIterator::Level1Iterator::_merge_next(Block* block) {
auto target_columns = block->mutate_columns();
size_t column_count = block->columns();
IteratorRowRef cur_row = _ref;
IteratorRowRef pre_row_ref = _ref;
if (UNLIKELY(_reader->_reader_context.record_rowids)) {
_block_row_locations.resize(_batch_size);
}
int continuous_row_in_block = 0;
do {
const auto& src_block = cur_row.block;
assert(src_block->columns() == column_count);
for (size_t i = 0; i < column_count; ++i) {
target_columns[i]->insert_from(*(src_block->get_by_position(i).column),
cur_row.row_pos);
}
if (UNLIKELY(_reader->_reader_context.record_rowids)) {
_block_row_locations[target_block_row] = _cur_child->current_row_location();
}
++target_block_row;
++continuous_row_in_block;
// The current block is finished; copy its rows before calling merge_next,
// because merge_next will clear the block's column data.
if (pre_row_ref.row_pos + continuous_row_in_block == pre_row_ref.block->rows()) {
const auto& src_block = pre_row_ref.block;
for (size_t i = 0; i < column_count; ++i) {
target_columns[i]->insert_range_from(*(src_block->get_by_position(i).column),
pre_row_ref.row_pos, continuous_row_in_block);
}
continuous_row_in_block = 0;
pre_row_ref.block = nullptr;
}
auto res = _merge_next(&cur_row);
if (UNLIKELY(res.precise_code() == OLAP_ERR_DATA_EOF)) {
if (UNLIKELY(_reader->_reader_context.record_rowids)) {
@@ -470,7 +479,32 @@ Status VCollectIterator::Level1Iterator::_merge_next(Block* block) {
LOG(WARNING) << "next failed: " << res;
return res;
}
} while (target_block_row < _batch_size);
if (target_block_row >= _batch_size) {
if (continuous_row_in_block > 0) {
const auto& src_block = pre_row_ref.block;
for (size_t i = 0; i < column_count; ++i) {
target_columns[i]->insert_range_from(*(src_block->get_by_position(i).column),
pre_row_ref.row_pos,
continuous_row_in_block);
}
}
return Status::OK();
}
if (continuous_row_in_block == 0) {
pre_row_ref = _ref;
continue;
}
// copy the pending rows when the merge moves on to a new source block
if (cur_row.block != pre_row_ref.block) {
const auto& src_block = pre_row_ref.block;
for (size_t i = 0; i < column_count; ++i) {
target_columns[i]->insert_range_from(*(src_block->get_by_position(i).column),
pre_row_ref.row_pos, continuous_row_in_block);
}
continuous_row_in_block = 0;
pre_row_ref = cur_row;
}
} while (true);
return Status::OK();
}
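
The control flow above is easier to see outside the diff. A simplified, self-contained sketch of the same run-accumulation idea, with plain std::vector standing in for blocks and columns (RowRef and copy_merged_rows here are illustrative names, not Doris symbols):

#include <cstddef>
#include <vector>

// Illustrative stand-in for an IteratorRowRef: a source "block" plus a row index.
struct RowRef {
    const std::vector<int>* block = nullptr;
    size_t row_pos = 0;
};

// Append the merged row sequence to `out`, copying each maximal contiguous run
// from a single source block with one ranged insert instead of one insert per row.
void copy_merged_rows(std::vector<int>& out, const std::vector<RowRef>& merged) {
    const std::vector<int>* run_block = nullptr;
    size_t run_start = 0;
    size_t run_len = 0;
    auto flush = [&] {
        if (run_len == 0) return;
        out.insert(out.end(), run_block->begin() + run_start,
                   run_block->begin() + run_start + run_len);
        run_len = 0;
    };
    for (const RowRef& cur : merged) {
        const bool extends_run = run_len > 0 && cur.block == run_block &&
                                 cur.row_pos == run_start + run_len;
        if (!extends_run) {
            flush();                 // run broken: new source block (or a gap)
            run_block = cur.block;
            run_start = cur.row_pos;
        }
        ++run_len;
    }
    flush();                         // flush the trailing run
}

In _merge_next the flush happens at the same three points: when the pending run reaches the end of its source block, when the output batch is full, and when the next merged row comes from a different block.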


@@ -187,9 +187,13 @@ public:
return result;
}
void copy_row(vectorized::Block* block) {
// `advanced = false` means the current block just finished and the iterator
// has not yet moved past the last batched row
void copy_rows(vectorized::Block* block, bool advanced = true) {
vectorized::Block& src = _block;
vectorized::Block& dst = *block;
if (_cur_batch_num == 0 || _index_in_block - _cur_batch_num < 0) {
return;
}
for (size_t i = 0; i < _num_columns; ++i) {
auto& s_col = src.get_by_position(i);
@@ -199,8 +203,14 @@ public:
vectorized::ColumnPtr& d_cp = d_col.column;
//copy a row to dst block column by column
((vectorized::IColumn&)(*d_cp)).insert_from(*s_cp, _index_in_block);
size_t start = _index_in_block - _cur_batch_num + 1;
if (advanced) {
start--;
}
DCHECK(start >= 0);
((vectorized::IColumn&)(*d_cp)).insert_range_from(*s_cp, start, _cur_batch_num);
}
_cur_batch_num = 0;
}
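// A worked reading of the start index above (the numbers are illustrative only):
// suppose the batch holds _cur_batch_num = 3 rows that end at source row 9.
//   advanced = false (block just finished, iterator still on row 9):
//     _index_in_block = 9   ->  start = 9 - 3 + 1       = 7  ->  copies rows 7, 8, 9
//   advanced = true  (iterator already moved one row past the batch):
//     _index_in_block = 10  ->  start = 10 - 3 + 1 - 1  = 7  ->  copies rows 7, 8, 9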
RowLocation current_row_location() {
@@ -224,6 +234,17 @@ public:
void set_skip(bool skip) const { _skip = skip; }
void add_cur_batch() { _cur_batch_num++; }
void reset_cur_batch() { _cur_batch_num = 0; }
bool is_cur_block_finished() {
if (_index_in_block == _block.rows() - 1) {
return true;
}
return false;
}
private:
// Load next block into _block
Status _load_next_block();
@@ -246,6 +267,7 @@ private:
std::vector<uint32_t>* _compare_columns;
std::vector<RowLocation> _block_row_locations;
bool _record_rowids = false;
size_t _cur_batch_num = 0;
};
Status VMergeIteratorContext::init(const StorageReadOptions& opts) {
@@ -344,7 +366,7 @@ private:
VMergeHeap _merge_heap;
int block_row_max = 0;
int _block_row_max = 0;
int _sequence_id_idx = -1;
bool _is_unique = false;
bool _is_reverse = false;
@@ -372,31 +394,48 @@ Status VMergeIterator::init(const StorageReadOptions& opts) {
_origin_iters.clear();
block_row_max = opts.block_row_max;
_block_row_max = opts.block_row_max;
return Status::OK();
}
Status VMergeIterator::next_batch(vectorized::Block* block) {
if (UNLIKELY(_record_rowids)) {
_block_row_locations.resize(block_row_max);
_block_row_locations.resize(_block_row_max);
}
size_t row_idx = 0;
while (block->rows() < block_row_max) {
VMergeIteratorContext* pre_ctx = nullptr;
while (block->rows() < _block_row_max) {
if (_merge_heap.empty()) break;
auto ctx = _merge_heap.top();
_merge_heap.pop();
if (!ctx->need_skip()) {
// copy current row to block
ctx->copy_row(block);
ctx->add_cur_batch();
if (pre_ctx != ctx) {
if (pre_ctx) {
pre_ctx->copy_rows(block);
}
pre_ctx = ctx;
}
if (UNLIKELY(_record_rowids)) {
_block_row_locations[row_idx] = ctx->current_row_location();
}
row_idx++;
if (ctx->is_cur_block_finished() || row_idx >= _block_row_max) {
// The current block is finished (or the output block is full) and ctx has not
// advanced, so the copy starts at _index_in_block - _cur_batch_num + 1.
ctx->copy_rows(block, false);
pre_ctx = nullptr;
}
} else if (_merged_rows != nullptr) {
(*_merged_rows)++;
// the current row must be skipped, so flush the rows pending in pre_ctx first
if (pre_ctx) {
pre_ctx->copy_rows(block);
pre_ctx = nullptr;
}
}
RETURN_IF_ERROR(ctx->advance());
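// A condensed, paraphrased summary (not code from the commit) of where
// next_batch flushes the rows batched by add_cur_batch:
//   1. The heap pops a different context than last time: the previous context's
//      pending rows are flushed with copy_rows(block); advanced = true because
//      that context has already moved past its last batched row.
//   2. The popped context's block is finished, or row_idx reached _block_row_max:
//      its pending rows are flushed with copy_rows(block, false).
//   3. The popped row must be skipped as a merged row: the rows pending in
//      pre_ctx are flushed first, so the skipped row is never copied.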