[fix](compaction) fix mismatch between segment key and value column rows during compaction (#37960)(#38251)(#38356) (#38835)
pick master #37960 #38251 #38356
This commit is contained in:
@ -96,36 +96,32 @@ Status VerticalBetaRowsetWriter::add_columns(const vectorized::Block* block,
|
||||
RETURN_IF_ERROR(_segment_writers[_cur_writer_idx]->append_block(block, 0, num_rows));
|
||||
} else {
|
||||
// value columns
|
||||
uint32_t num_rows_written = _segment_writers[_cur_writer_idx]->num_rows_written();
|
||||
VLOG_NOTICE << "num_rows_written: " << num_rows_written
|
||||
<< ", _cur_writer_idx: " << _cur_writer_idx;
|
||||
uint32_t num_rows_key_group = _segment_writers[_cur_writer_idx]->row_count();
|
||||
// init if it's first value column write in current segment
|
||||
if (_cur_writer_idx == 0 && num_rows_written == 0) {
|
||||
VLOG_NOTICE << "init first value column segment writer";
|
||||
RETURN_IF_ERROR(_segment_writers[_cur_writer_idx]->init(col_ids, is_key));
|
||||
}
|
||||
// when splitting segment, need to make rows align between key columns and value columns
|
||||
size_t start_offset = 0;
|
||||
size_t limit = num_rows;
|
||||
if (num_rows_written + num_rows >= num_rows_key_group &&
|
||||
_cur_writer_idx < _segment_writers.size() - 1) {
|
||||
RETURN_IF_ERROR(_segment_writers[_cur_writer_idx]->append_block(
|
||||
block, 0, num_rows_key_group - num_rows_written));
|
||||
RETURN_IF_ERROR(_flush_columns(&_segment_writers[_cur_writer_idx]));
|
||||
start_offset = num_rows_key_group - num_rows_written;
|
||||
limit = num_rows - start_offset;
|
||||
++_cur_writer_idx;
|
||||
// switch to next writer
|
||||
RETURN_IF_ERROR(_segment_writers[_cur_writer_idx]->init(col_ids, is_key));
|
||||
num_rows_written = 0;
|
||||
num_rows_key_group = _segment_writers[_cur_writer_idx]->row_count();
|
||||
}
|
||||
if (limit > 0) {
|
||||
RETURN_IF_ERROR(
|
||||
_segment_writers[_cur_writer_idx]->append_block(block, start_offset, limit));
|
||||
DCHECK(_segment_writers[_cur_writer_idx]->num_rows_written() <=
|
||||
_segment_writers[_cur_writer_idx]->row_count());
|
||||
int64_t left = num_rows;
|
||||
while (left > 0) {
|
||||
uint32_t num_rows_written = _segment_writers[_cur_writer_idx]->num_rows_written();
|
||||
VLOG_NOTICE << "num_rows_written: " << num_rows_written
|
||||
<< ", _cur_writer_idx: " << _cur_writer_idx;
|
||||
uint32_t num_rows_key_group = _segment_writers[_cur_writer_idx]->row_count();
|
||||
CHECK_LT(num_rows_written, num_rows_key_group);
|
||||
// init if it's first value column write in current segment
|
||||
if (num_rows_written == 0) {
|
||||
VLOG_NOTICE << "init first value column segment writer";
|
||||
RETURN_IF_ERROR(_segment_writers[_cur_writer_idx]->init(col_ids, is_key));
|
||||
}
|
||||
|
||||
int64_t to_write = num_rows_written + left >= num_rows_key_group
|
||||
? num_rows_key_group - num_rows_written
|
||||
: left;
|
||||
RETURN_IF_ERROR(_segment_writers[_cur_writer_idx]->append_block(block, num_rows - left,
|
||||
to_write));
|
||||
left -= to_write;
|
||||
CHECK_GE(left, 0);
|
||||
|
||||
if (num_rows_key_group == num_rows_written + to_write &&
|
||||
_cur_writer_idx < _segment_writers.size() - 1) {
|
||||
RETURN_IF_ERROR(_flush_columns(&_segment_writers[_cur_writer_idx]));
|
||||
++_cur_writer_idx;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (is_key) {
|
||||
|
||||
Reference in New Issue
Block a user