[fix](compaction) fix bug for vectorized compaction (#9344)
1. Add a BE config to enable or disable vectorized compaction. 2. Fix a vectorized compaction bug where the row statistics were incorrect.
This commit is contained in:
@ -240,6 +240,8 @@ CONF_Bool(enable_low_cardinality_optimize, "false");
|
||||
// be policy
|
||||
// whether to disable the automatic compaction task
|
||||
CONF_mBool(disable_auto_compaction, "false");
|
||||
// whether to enable vectorized compaction
|
||||
CONF_Bool(enable_vectorized_compaction, "false");
|
||||
// interval, in seconds, at which the auto compaction configuration is re-checked while auto compaction is disabled
|
||||
CONF_mInt32(check_auto_compaction_interval_seconds, "5");
|
||||
|
||||
|
||||
@ -86,10 +86,17 @@ Status Compaction::do_compaction_impl(int64_t permits) {
|
||||
// 2. write merged rows to output rowset
|
||||
// Test results show that the merger has a low memory footprint, so there is no need to track its mem pool
|
||||
Merger::Statistics stats;
|
||||
auto res = Merger::vmerge_rowsets(_tablet, compaction_type(), _input_rs_readers,
|
||||
_output_rs_writer.get(), &stats);
|
||||
Status res;
|
||||
if (config::enable_vectorized_compaction) {
|
||||
res = Merger::vmerge_rowsets(_tablet, compaction_type(), _input_rs_readers,
|
||||
_output_rs_writer.get(), &stats);
|
||||
} else {
|
||||
res = Merger::merge_rowsets(_tablet, compaction_type(), _input_rs_readers,
|
||||
_output_rs_writer.get(), &stats);
|
||||
}
|
||||
string merge_type = config::enable_vectorized_compaction ? "v" : "";
|
||||
if (!res.ok()) {
|
||||
LOG(WARNING) << "fail to do " << compaction_name() << ". res=" << res
|
||||
LOG(WARNING) << "fail to do " << merge_type << compaction_name() << ". res=" << res
|
||||
<< ", tablet=" << _tablet->full_name()
|
||||
<< ", output_version=" << _output_version;
|
||||
return res;
|
||||
@ -132,8 +139,8 @@ Status Compaction::do_compaction_impl(int64_t permits) {
|
||||
current_max_version = _tablet->rowset_with_max_version()->end_version();
|
||||
}
|
||||
|
||||
LOG(INFO) << "succeed to do " << compaction_name() << ". tablet=" << _tablet->full_name()
|
||||
<< ", output_version=" << _output_version
|
||||
LOG(INFO) << "succeed to do " << merge_type << compaction_name()
|
||||
<< ". tablet=" << _tablet->full_name() << ", output_version=" << _output_version
|
||||
<< ", current_max_version=" << current_max_version
|
||||
<< ", disk=" << _tablet->data_dir()->path() << ", segments=" << segments_num
|
||||
<< ". elapsed time=" << watch.get_elapse_second()
|
||||
|
||||
@ -107,15 +107,12 @@ Status Merger::vmerge_rowsets(TabletSharedPtr tablet, ReaderType reader_type,
|
||||
|
||||
vectorized::Block block = schema.create_block(reader_params.return_columns);
|
||||
size_t output_rows = 0;
|
||||
while (true) {
|
||||
bool eof = false;
|
||||
bool eof = false;
|
||||
while (!eof) {
|
||||
// Read one block from block reader
|
||||
RETURN_NOT_OK_LOG(
|
||||
reader.next_block_with_aggregation(&block, nullptr, nullptr, &eof),
|
||||
"failed to read next block when merging rowsets of tablet " + tablet->full_name());
|
||||
if (eof) {
|
||||
break;
|
||||
}
|
||||
RETURN_NOT_OK_LOG(
|
||||
dst_rowset_writer->add_block(&block),
|
||||
"failed to write block when merging rowsets of tablet " + tablet->full_name());
|
||||
|
||||
@ -323,6 +323,9 @@ Status BetaRowsetWriter::_create_segment_writer(
|
||||
}
|
||||
|
||||
Status BetaRowsetWriter::_flush_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>* writer) {
|
||||
if ((*writer)->num_rows_written() == 0) {
|
||||
return Status::OK();
|
||||
}
|
||||
uint64_t segment_size;
|
||||
uint64_t index_size;
|
||||
Status s = (*writer)->finalize(&segment_size, &index_size);
|
||||
|
||||
@ -192,6 +192,7 @@ Status BlockReader::_agg_key_next_block(Block* block, MemPool* mem_pool, ObjectP
|
||||
}
|
||||
|
||||
auto target_block_row = 0;
|
||||
auto merged_row = 0;
|
||||
auto target_columns = block->mutate_columns();
|
||||
|
||||
_insert_data_normal(target_columns);
|
||||
@ -218,6 +219,8 @@ Status BlockReader::_agg_key_next_block(Block* block, MemPool* mem_pool, ObjectP
|
||||
|
||||
_insert_data_normal(target_columns);
|
||||
target_block_row++;
|
||||
} else {
|
||||
merged_row++;
|
||||
}
|
||||
|
||||
_append_agg_data(target_columns);
|
||||
@ -227,7 +230,7 @@ Status BlockReader::_agg_key_next_block(Block* block, MemPool* mem_pool, ObjectP
|
||||
_last_agg_data_counter = 0;
|
||||
_update_agg_data(target_columns);
|
||||
|
||||
_merged_rows += target_block_row;
|
||||
_merged_rows += merged_row;
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
@ -260,7 +263,6 @@ Status BlockReader::_unique_key_next_block(Block* block, MemPool* mem_pool, Obje
|
||||
}
|
||||
} while (target_block_row < _batch_size);
|
||||
|
||||
_merged_rows += target_block_row;
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user