[enhancement](compaction) introduce segment compaction (#12609) (#12866)

## Design

### Trigger

Each time a rowset writer produces more than N (e.g. 10) segments, we trigger segment compaction. Note that only one segment compaction job runs for a single rowset at a time, so there is no recursive triggering or unbounded queuing.
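
Below is a minimal sketch of the trigger check, in the spirit of `_segcompaction_if_necessary` and `_check_and_set_is_doing_segcompaction` from the diff; the struct and member names are illustrative, not the actual implementation:

```cpp
#include <atomic>
#include <cstdint>

// Hypothetical, simplified trigger modeled on the counters this commit adds
// to BetaRowsetWriter; names are illustrative.
struct SegCompactionTrigger {
    std::atomic<int32_t> num_segment {0};        // segments produced so far
    std::atomic<int32_t> segcompacted_point {0}; // segments already compacted
    std::atomic<bool> is_doing_segcompaction {false};
    int32_t threshold_segment_num = 10;          // cf. segcompaction_threshold_segment_num

    // Returns true if a new segcompaction task should be submitted.
    bool should_trigger() {
        bool expected = false;
        // Compare-and-swap guarantees at most one in-flight task per rowset.
        if (!is_doing_segcompaction.compare_exchange_strong(expected, true)) {
            return false; // another task is already running for this rowset
        }
        if (num_segment - segcompacted_point >= threshold_segment_num) {
            return true;  // caller submits the task and clears the flag when done
        }
        is_doing_segcompaction = false;
        return false;
    }
};
```

The compare-and-swap on the flag is what enforces the one-in-flight-task-per-rowset invariant.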

### Target Selection

We collect candidate segments on every trigger. We skip big segments whose row count exceeds M (e.g. 10000), because compacting them yields little benefit relative to the effort. Hence, we only pick the "longest consecutive small" segment group for actual compaction.
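
A hedged sketch of this selection policy, with hypothetical names; the real logic lives in `_find_longest_consecutive_small_segment` in the diff below and additionally renames the skipped big segments:

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Returns indices of the first run of consecutive small segments worth
// compacting, or an empty vector if none qualifies. Simplified sketch.
std::vector<size_t> pick_consecutive_smalls(const std::vector<uint64_t>& seg_rows,
                                            uint64_t small_threshold,   // max rows of a "small" segment
                                            size_t trigger_threshold) { // e.g. 10
    std::vector<size_t> picked;
    bool terminated_by_big = false;
    for (size_t i = 0; i < seg_rows.size(); ++i) {
        if (seg_rows[i] > small_threshold) {
            if (!picked.empty()) {
                terminated_by_big = true; // the run of smalls ends at a big segment
                break;
            }
            // leading big segment: skip it and keep scanning
        } else {
            picked.push_back(i);
        }
    }
    // A tail run of smalls (not terminated by a big segment) is only worth
    // compacting if it is long enough; otherwise wait for the next trigger
    // to batch more segments at once.
    if (!terminated_by_big && picked.size() <= trigger_threshold / 2) {
        picked.clear();
    }
    if (picked.size() == 1) {
        picked.clear(); // a single small segment gains nothing from compaction
    }
    return picked;
}
```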

### Compaction Process

A new thread pool is introduced to do the job. We submit the above-mentioned "longest consecutive small" segment group to the pool. The worker thread then does the following (see the sketch after this list):

- build a MergeIterator from the target segments
- create a new segment writer
- for each block read from the MergeIterator, the writer appends it
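
A simplified sketch of that worker loop, using hypothetical stand-in types for Doris's `vectorized::Block`, `VMergeIterator`, and `segment_v2::SegmentWriter`:

```cpp
// Stand-in types; the real classes come from the Doris storage layer.
struct Block {
    void clear_column_data() {}
};

struct MergeIterator {
    // Returns false when the merged stream is exhausted.
    bool next_batch(Block* /*block*/) { return false; }
};

struct SegmentWriter {
    void append_block(const Block& /*block*/) {}
    void finalize() {}
};

// Read merged blocks from the target segments and append them all to one
// freshly created output segment.
void compact_segment_group(MergeIterator& reader, SegmentWriter& writer) {
    Block block;
    while (reader.next_batch(&block)) {
        writer.append_block(block); // rows arrive globally merge-sorted
        block.clear_column_data();  // reuse the block for the next batch
    }
    writer.finalize();              // flush footer and index of the new segment
}
```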

### SegID Handling

Segment IDs must remain consecutive after segment compaction.

If a rowset has small segments named seg_0, seg_1, seg_2, seg_3 and a big segment seg_4 (see the sketch after this list):

- we create a segment named "seg_0-3" to hold the compacted data of seg_0, seg_1, seg_2 and seg_3
- delete seg_0, seg_1, seg_2 and seg_3
- rename seg_0-3 to seg_0
- rename seg_4 to seg_1
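
An illustrative sketch of this rename sequence using plain file operations; the `{rowset_id}_{begin}-{end}.dat` naming follows `local_segment_path_segcompacted()` in the diff, while error handling and statistics bookkeeping are elided:

```cpp
#include <cstdio>   // std::remove, std::rename
#include <string>

// Hypothetical helper showing how segment ids are kept consecutive after
// compacting seg_0..seg_3 of a rowset that also has a big seg_4.
void rename_after_segcompaction(const std::string& dir, const std::string& rowset_id) {
    // the compacted output was written as {rowset_id}_0-3.dat
    std::string compacted = dir + "/" + rowset_id + "_0-3.dat";
    // delete the original small segments seg_0..seg_3
    for (int i = 0; i <= 3; ++i) {
        std::remove((dir + "/" + rowset_id + "_" + std::to_string(i) + ".dat").c_str());
    }
    // rename seg_0-3 to seg_0
    std::rename(compacted.c_str(), (dir + "/" + rowset_id + "_0.dat").c_str());
    // rename seg_4 to seg_1 so segment ids stay consecutive
    std::rename((dir + "/" + rowset_id + "_4.dat").c_str(),
                (dir + "/" + rowset_id + "_1.dat").c_str());
}
```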

It is worth noting that we must wait for in-flight segment compaction tasks to finish before building the rowset meta and committing this txn.
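
A minimal sketch of that wait, modeled on `_wait_flying_segcompaction` from the diff (hypothetical names, error propagation elided):

```cpp
#include <condition_variable>
#include <mutex>

// The committing thread blocks until the single in-flight segcompaction
// task, if any, clears the flag and notifies.
struct SegCompactionGate {
    std::mutex mtx;
    std::condition_variable cond;
    bool doing_segcompaction = false;

    // Called before building rowset meta and committing the txn.
    void wait_flying_task() {
        std::unique_lock<std::mutex> lock(mtx);
        cond.wait(lock, [this] { return !doing_segcompaction; });
    }

    // Called by the worker thread when its task finishes (or fails).
    void task_done() {
        std::lock_guard<std::mutex> lock(mtx);
        doing_segcompaction = false;
        cond.notify_all();
    }
};
```
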
Author: zhengyu
Committed: 2022-11-04 14:12:51 +08:00 (via GitHub)
Parent: 948e080b31 · Commit: 554f566217
23 changed files with 1696 additions and 446 deletions

View File

@ -285,6 +285,9 @@ CONF_mInt32(quick_compaction_max_threads, "10");
// Thread count to do tablet meta checkpoint, -1 means use the data directories count.
CONF_Int32(max_meta_checkpoint_threads, "-1");
// This config can be set to limit thread number in segcompaction thread pool.
CONF_mInt32(seg_compaction_max_threads, "10");
// The upper limit of "permits" held by all compaction tasks. This config can be set to limit memory consumption for compaction.
CONF_mInt64(total_permits_for_compaction_score, "10000");
@ -841,6 +844,14 @@ CONF_String(be_node_role, "mix");
// Hide the be config page for webserver.
CONF_Bool(hide_webserver_config_page, "false");
CONF_Bool(enable_segcompaction, "false"); // currently only support vectorized storage
// Trigger segcompaction if the num of segments in a rowset exceeds this threshold.
CONF_Int32(segcompaction_threshold_segment_num, "10");
// Segments whose row count is below this threshold are considered small and will be compacted during segcompaction
CONF_Int32(segcompaction_small_threshold, "1048576");
CONF_String(jvm_max_heap_size, "1024M");
#ifdef BE_TEST

View File

@ -231,7 +231,11 @@ class PStatus;
M(OLAP_ERR_ROWSET_INVALID_STATE_TRANSITION, -3112, "", true) \
M(OLAP_ERR_STRING_OVERFLOW_IN_VEC_ENGINE, -3113, "", true) \
M(OLAP_ERR_ROWSET_ADD_MIGRATION_V2, -3114, "", true) \
M(OLAP_ERR_PUBLISH_VERSION_NOT_CONTINUOUS, -3115, "", false)
M(OLAP_ERR_PUBLISH_VERSION_NOT_CONTINUOUS, -3115, "", false) \
M(OLAP_ERR_ROWSET_RENAME_FILE_FAILED, -3116, "", false) \
M(OLAP_ERR_SEGCOMPACTION_INIT_READER, -3117, "", false) \
M(OLAP_ERR_SEGCOMPACTION_INIT_WRITER, -3118, "", false) \
M(OLAP_ERR_SEGCOMPACTION_FAILED, -3119, "", false)
enum ErrorCode {
#define M(NAME, ERRORCODE, DESC, STACKTRACEENABLED) NAME = ERRORCODE,

View File

@ -187,7 +187,7 @@ Status Compaction::do_compaction_impl(int64_t permits) {
if (_output_rowset == nullptr) {
LOG(WARNING) << "rowset writer build failed. writer version:"
<< ", output_version=" << _output_version;
return Status::OLAPInternalError(OLAP_ERR_MALLOC_ERROR);
return Status::OLAPInternalError(OLAP_ERR_ROWSET_BUILDER_INIT);
}
TRACE_COUNTER_INCREMENT("output_rowset_data_size", _output_rowset->data_disk_size());
TRACE_COUNTER_INCREMENT("output_row_num", _output_rowset->num_rows());

View File

@ -56,8 +56,6 @@ DeltaWriter::~DeltaWriter() {
_garbage_collection();
}
_mem_table.reset();
if (!_is_init) {
return;
}
@ -77,6 +75,8 @@ DeltaWriter::~DeltaWriter() {
_tablet->data_dir()->remove_pending_ids(ROWSET_ID_PREFIX +
_rowset_writer->rowset_id().to_string());
}
_mem_table.reset();
}
void DeltaWriter::_garbage_collection() {
@ -167,12 +167,12 @@ Status DeltaWriter::write(Tuple* tuple) {
// if memtable is full, push it to the flush executor,
// and create a new memtable for incoming data
if (_mem_table->memory_usage() >= config::write_buffer_size) {
if (++_segment_counter > config::max_segment_num_per_rowset) {
return Status::OLAPInternalError(OLAP_ERR_TOO_MANY_SEGMENTS);
}
RETURN_NOT_OK(_flush_memtable_async());
auto s = _flush_memtable_async();
// create a new memtable for new incoming data
_reset_mem_table();
if (OLAP_UNLIKELY(!s.ok())) {
return s;
}
}
return Status::OK();
}
@ -192,8 +192,11 @@ Status DeltaWriter::write(const RowBatch* row_batch, const std::vector<int>& row
}
if (_mem_table->memory_usage() >= config::write_buffer_size) {
RETURN_NOT_OK(_flush_memtable_async());
auto s = _flush_memtable_async();
_reset_mem_table();
if (OLAP_UNLIKELY(!s.ok())) {
return s;
}
}
return Status::OK();
@ -217,8 +220,11 @@ Status DeltaWriter::write(const vectorized::Block* block, const std::vector<int>
if (_mem_table->need_to_agg()) {
_mem_table->shrink_memtable_by_agg();
if (_mem_table->is_flush()) {
RETURN_NOT_OK(_flush_memtable_async());
auto s = _flush_memtable_async();
_reset_mem_table();
if (OLAP_UNLIKELY(!s.ok())) {
return s;
}
}
}
@ -226,9 +232,6 @@ Status DeltaWriter::write(const vectorized::Block* block, const std::vector<int>
}
Status DeltaWriter::_flush_memtable_async() {
if (++_segment_counter > config::max_segment_num_per_rowset) {
return Status::OLAPInternalError(OLAP_ERR_TOO_MANY_SEGMENTS);
}
return _flush_token->submit(std::move(_mem_table));
}
@ -249,8 +252,11 @@ Status DeltaWriter::flush_memtable_and_wait(bool need_wait) {
VLOG_NOTICE << "flush memtable to reduce mem consumption. memtable size: "
<< _mem_table->memory_usage() << ", tablet: " << _req.tablet_id
<< ", load id: " << print_id(_req.load_id);
RETURN_NOT_OK(_flush_memtable_async());
auto s = _flush_memtable_async();
_reset_mem_table();
if (OLAP_UNLIKELY(!s.ok())) {
return s;
}
if (need_wait) {
// wait all memtables in flush queue to be flushed.
@ -309,9 +315,13 @@ Status DeltaWriter::close() {
return Status::OLAPInternalError(OLAP_ERR_ALREADY_CANCELLED);
}
RETURN_NOT_OK(_flush_memtable_async());
auto s = _flush_memtable_async();
_mem_table.reset();
return Status::OK();
if (OLAP_UNLIKELY(!s.ok())) {
return s;
} else {
return Status::OK();
}
}
Status DeltaWriter::close_wait(const PSlaveTabletNodes& slave_tablet_nodes,

View File

@ -149,9 +149,6 @@ private:
SpinLock _mem_table_tracker_lock;
std::atomic<uint32_t> _mem_table_num = 1;
// The counter of number of segment flushed already.
int64_t _segment_counter = 0;
std::mutex _lock;
// use in vectorized load

View File

@ -26,12 +26,14 @@
#include <string>
#include "agent/cgroups_mgr.h"
#include "common/config.h"
#include "common/status.h"
#include "gutil/strings/substitute.h"
#include "io/cache/file_cache_manager.h"
#include "olap/cumulative_compaction.h"
#include "olap/olap_common.h"
#include "olap/olap_define.h"
#include "olap/rowset/beta_rowset_writer.h"
#include "olap/storage_engine.h"
#include "util/file_utils.h"
#include "util/time.h"
@ -83,6 +85,12 @@ Status StorageEngine::start_bg_threads() {
.set_min_threads(config::quick_compaction_max_threads)
.set_max_threads(config::quick_compaction_max_threads)
.build(&_quick_compaction_thread_pool);
if (config::enable_segcompaction && config::enable_storage_vectorization) {
ThreadPoolBuilder("SegCompactionTaskThreadPool")
.set_min_threads(config::seg_compaction_max_threads)
.set_max_threads(config::seg_compaction_max_threads)
.build(&_seg_compaction_thread_pool);
}
// compaction tasks producer thread
RETURN_IF_ERROR(Thread::create(
@ -686,6 +694,19 @@ Status StorageEngine::submit_quick_compaction_task(TabletSharedPtr tablet) {
return Status::OK();
}
Status StorageEngine::_handle_seg_compaction(BetaRowsetWriter* writer,
SegCompactionCandidatesSharedPtr segments) {
writer->compact_segments(segments);
// return OK here. error will be reported via BetaRowsetWriter::_segcompaction_status
return Status::OK();
}
Status StorageEngine::submit_seg_compaction_task(BetaRowsetWriter* writer,
SegCompactionCandidatesSharedPtr segments) {
return _seg_compaction_thread_pool->submit_func(
std::bind<void>(&StorageEngine::_handle_seg_compaction, this, writer, segments));
}
void StorageEngine::_cooldown_tasks_producer_callback() {
int64_t interval = config::generate_cooldown_task_interval_sec;
do {

View File

@ -46,6 +46,12 @@ std::string BetaRowset::segment_file_path(int segment_id) {
return segment_file_path(_rowset_dir, rowset_id(), segment_id);
}
std::string BetaRowset::segment_cache_path(const std::string& rowset_dir, const RowsetId& rowset_id,
int segment_id) {
// {root_path}/data/{shard_id}/{tablet_id}/{schema_hash}/{rowset_id}_{seg_num}
return fmt::format("{}/{}_{}", rowset_dir, rowset_id.to_string(), segment_id);
}
std::string BetaRowset::segment_cache_path(int segment_id) {
// {root_path}/data/{shard_id}/{tablet_id}/{schema_hash}/{rowset_id}_{seg_num}
return fmt::format("{}/{}_{}", _tablet_path, rowset_id().to_string(), segment_id);
@ -74,6 +80,13 @@ std::string BetaRowset::remote_segment_path(int64_t tablet_id, const std::string
return fmt::format("{}/{}_{}.dat", remote_tablet_path(tablet_id), rowset_id, segment_id);
}
std::string BetaRowset::local_segment_path_segcompacted(const std::string& tablet_path,
const RowsetId& rowset_id, int64_t begin,
int64_t end) {
// {root_path}/data/{shard_id}/{tablet_id}/{schema_hash}/{rowset_id}_{begin_seg}-{end_seg}.dat
return fmt::format("{}/{}_{}-{}.dat", tablet_path, rowset_id.to_string(), begin, end);
}
BetaRowset::BetaRowset(TabletSchemaSPtr schema, const std::string& tablet_path,
RowsetMetaSharedPtr rowset_meta)
: Rowset(schema, tablet_path, std::move(rowset_meta)) {

View File

@ -46,9 +46,16 @@ public:
std::string segment_cache_path(int segment_id);
static std::string segment_cache_path(const std::string& rowset_dir, const RowsetId& rowset_id,
int segment_id);
static std::string segment_file_path(const std::string& rowset_dir, const RowsetId& rowset_id,
int segment_id);
static std::string local_segment_path_segcompacted(const std::string& tablet_path,
const RowsetId& rowset_id, int64_t begin,
int64_t end);
static std::string remote_segment_path(int64_t tablet_id, const RowsetId& rowset_id,
int segment_id);

View File

@ -18,6 +18,8 @@
#include "olap/rowset/beta_rowset_writer.h"
#include <ctime> // time
#include <memory>
#include <sstream>
#include "common/config.h"
#include "common/logging.h"
@ -31,20 +33,31 @@
#include "olap/rowset/beta_rowset.h"
#include "olap/rowset/rowset_factory.h"
#include "olap/rowset/segment_v2/segment_writer.h"
#include "olap/storage_engine.h"
//#include "olap/storage_engine.h"
#include "runtime/exec_env.h"
#include "runtime/memory/mem_tracker_limiter.h"
namespace doris {
class StorageEngine;
BetaRowsetWriter::BetaRowsetWriter()
: _rowset_meta(nullptr),
_num_segment(0),
_segcompacted_point(0),
_num_segcompacted(0),
_segment_writer(nullptr),
_num_rows_written(0),
_total_data_size(0),
_total_index_size(0) {}
_total_index_size(0),
_raw_num_rows_written(0),
_is_doing_segcompaction(false) {
_segcompaction_status.store(OLAP_SUCCESS);
}
BetaRowsetWriter::~BetaRowsetWriter() {
OLAP_UNUSED_ARG(_wait_flying_segcompaction());
// TODO(lingbin): Should wrapper exception logic, no need to know file ops directly.
if (!_already_built) { // abnormal exit, remove all files generated
_segment_writer.reset(); // ensure all files are closed
@ -107,6 +120,433 @@ Status BetaRowsetWriter::add_block(const vectorized::Block* block) {
return _add_block(block, &_segment_writer);
}
vectorized::VMergeIterator* BetaRowsetWriter::_get_segcompaction_reader(
SegCompactionCandidatesSharedPtr segments, std::shared_ptr<Schema> schema,
OlapReaderStatistics* stat, uint64_t* merged_row_stat) {
StorageReadOptions read_options;
read_options.stats = stat;
read_options.use_page_cache = false;
read_options.tablet_schema = _context.tablet_schema;
std::vector<std::unique_ptr<RowwiseIterator>> seg_iterators;
for (auto& seg_ptr : *segments) {
std::unique_ptr<RowwiseIterator> iter;
auto s = seg_ptr->new_iterator(*schema, read_options, &iter);
if (!s.ok()) {
LOG(WARNING) << "failed to create iterator[" << seg_ptr->id() << "]: " << s.to_string();
return nullptr;
}
seg_iterators.push_back(std::move(iter));
}
std::vector<RowwiseIterator*> iterators;
for (auto& owned_it : seg_iterators) {
// transfer ownership
iterators.push_back(owned_it.release());
}
bool is_unique = (_context.tablet_schema->keys_type() == UNIQUE_KEYS);
bool is_reverse = false;
auto merge_itr =
vectorized::new_merge_iterator(iterators, -1, is_unique, is_reverse, merged_row_stat);
DCHECK(merge_itr);
auto s = merge_itr->init(read_options);
if (!s.ok()) {
LOG(WARNING) << "failed to init iterator: " << s.to_string();
for (auto& itr : iterators) {
delete itr;
}
return nullptr;
}
return (vectorized::VMergeIterator*)merge_itr;
}
std::unique_ptr<segment_v2::SegmentWriter> BetaRowsetWriter::_create_segcompaction_writer(
uint64_t begin, uint64_t end) {
Status status;
std::unique_ptr<segment_v2::SegmentWriter> writer = nullptr;
status = _create_segment_writer_for_segcompaction(&writer, begin, end);
if (status != Status::OK() || writer == nullptr) {
LOG(ERROR) << "failed to create segment writer for begin:" << begin << " end:" << end
<< " path:" << writer->get_data_dir()->path() << " status:" << status;
return nullptr;
} else {
return writer;
}
}
Status BetaRowsetWriter::_delete_original_segments(uint32_t begin, uint32_t end) {
auto fs = _rowset_meta->fs();
if (!fs) {
return Status::OLAPInternalError(OLAP_ERR_INIT_FAILED);
}
for (uint32_t i = begin; i <= end; ++i) {
auto seg_path = BetaRowset::segment_file_path(_context.rowset_dir, _context.rowset_id, i);
// Even if an error is encountered, these files that have not been cleaned up
// will be cleaned up by the GC background. So here we only print the error
// message when we encounter an error.
RETURN_NOT_OK_LOG(fs->delete_file(seg_path),
strings::Substitute("Failed to delete file=$0", seg_path));
}
return Status::OK();
}
Status BetaRowsetWriter::_rename_compacted_segments(int64_t begin, int64_t end) {
int ret;
auto src_seg_path = BetaRowset::local_segment_path_segcompacted(_context.rowset_dir,
_context.rowset_id, begin, end);
auto dst_seg_path = BetaRowset::segment_file_path(_context.rowset_dir, _context.rowset_id,
_num_segcompacted++);
ret = rename(src_seg_path.c_str(), dst_seg_path.c_str());
if (ret) {
LOG(WARNING) << "failed to rename " << src_seg_path << " to " << dst_seg_path
<< ". ret:" << ret << " errno:" << errno;
return Status::OLAPInternalError(OLAP_ERR_ROWSET_RENAME_FILE_FAILED);
}
return Status::OK();
}
Status BetaRowsetWriter::_rename_compacted_segment_plain(uint64_t seg_id) {
if (seg_id == _num_segcompacted) {
return Status::OK();
}
int ret;
auto src_seg_path =
BetaRowset::segment_file_path(_context.rowset_dir, _context.rowset_id, seg_id);
auto dst_seg_path = BetaRowset::segment_file_path(_context.rowset_dir, _context.rowset_id,
_num_segcompacted);
VLOG_DEBUG << "segcompaction skip this segment. rename " << src_seg_path << " to "
<< dst_seg_path;
{
std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex);
DCHECK_EQ(_segid_statistics_map.find(seg_id) == _segid_statistics_map.end(), false);
DCHECK_EQ(_segid_statistics_map.find(_num_segcompacted) == _segid_statistics_map.end(),
true);
Statistics org = _segid_statistics_map[seg_id];
_segid_statistics_map.emplace(_num_segcompacted, org);
_clear_statistics_for_deleting_segments_unsafe(seg_id, seg_id);
}
++_num_segcompacted;
ret = rename(src_seg_path.c_str(), dst_seg_path.c_str());
if (ret) {
LOG(WARNING) << "failed to rename " << src_seg_path << " to " << dst_seg_path
<< ". ret:" << ret << " errno:" << errno;
return Status::OLAPInternalError(OLAP_ERR_ROWSET_RENAME_FILE_FAILED);
}
return Status::OK();
}
void BetaRowsetWriter::_clear_statistics_for_deleting_segments_unsafe(uint64_t begin,
uint64_t end) {
VLOG_DEBUG << "_segid_statistics_map clear record segid range from:" << begin << " to:" << end;
for (int i = begin; i <= end; ++i) {
_segid_statistics_map.erase(i);
}
}
Status BetaRowsetWriter::_check_correctness(std::unique_ptr<OlapReaderStatistics> stat,
uint64_t merged_row_stat, uint64_t row_count,
uint64_t begin, uint64_t end) {
uint64_t stat_read_row = stat->raw_rows_read;
uint64_t sum_target_row = 0;
{
std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex);
for (int i = begin; i <= end; ++i) {
sum_target_row += _segid_statistics_map[i].row_num;
}
}
if (sum_target_row != stat_read_row) {
LOG(WARNING) << "read row_num does not match. expect read row:" << sum_target_row
<< " actual read row:" << stat_read_row;
return Status::OLAPInternalError(OLAP_ERR_CHECK_LINES_ERROR);
}
uint64_t total_row = row_count + merged_row_stat;
if (stat_read_row != total_row) {
LOG(WARNING) << "total row_num does not match. expect total row:" << total_row
<< " actual total row:" << stat_read_row;
return Status::OLAPInternalError(OLAP_ERR_CHECK_LINES_ERROR);
}
return Status::OK();
}
Status BetaRowsetWriter::_do_compact_segments(SegCompactionCandidatesSharedPtr segments) {
SCOPED_ATTACH_TASK(StorageEngine::instance()->segcompaction_mem_tracker(),
ThreadContext::TaskType::COMPACTION);
// throttle segcompaction task if memory depleted.
if (MemTrackerLimiter::sys_mem_exceed_limit_check(GB_EXCHANGE_BYTE)) {
LOG(WARNING) << "skip segcompaction due to memory shortage";
return Status::OLAPInternalError(OLAP_ERR_FETCH_MEMORY_EXCEEDED);
}
uint64_t begin = (*(segments->begin()))->id();
uint64_t end = (*(segments->end() - 1))->id();
LOG(INFO) << "BetaRowsetWriter:" << this << " do segcompaction at " << segments->size()
<< " segments. Begin:" << begin << " End:" << end;
uint64_t begin_time = GetCurrentTimeMicros();
auto schema = std::make_shared<Schema>(_context.tablet_schema->columns(),
_context.tablet_schema->columns().size());
std::unique_ptr<OlapReaderStatistics> stat(new OlapReaderStatistics());
uint64_t merged_row_stat = 0;
vectorized::VMergeIterator* reader =
_get_segcompaction_reader(segments, schema, stat.get(), &merged_row_stat);
if (UNLIKELY(reader == nullptr)) {
LOG(WARNING) << "failed to get segcompaction reader";
return Status::OLAPInternalError(OLAP_ERR_SEGCOMPACTION_INIT_READER);
}
std::unique_ptr<vectorized::VMergeIterator> reader_ptr;
reader_ptr.reset(reader);
auto writer = _create_segcompaction_writer(begin, end);
if (UNLIKELY(writer == nullptr)) {
LOG(WARNING) << "failed to get segcompaction writer";
return Status::OLAPInternalError(OLAP_ERR_SEGCOMPACTION_INIT_WRITER);
}
uint64_t row_count = 0;
vectorized::Block block = _context.tablet_schema->create_block();
while (true) {
auto status = reader_ptr->next_batch(&block);
row_count += block.rows();
if (status != Status::OK()) {
if (LIKELY(status.is_end_of_file())) {
RETURN_NOT_OK_LOG(_add_block_for_segcompaction(&block, &writer),
"write block failed");
break;
} else {
LOG(WARNING) << "read block failed: " << status.to_string();
return status;
}
}
RETURN_NOT_OK_LOG(_add_block_for_segcompaction(&block, &writer), "write block failed");
block.clear_column_data();
}
RETURN_NOT_OK_LOG(_check_correctness(std::move(stat), merged_row_stat, row_count, begin, end),
"check correctness failed");
{
std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex);
_clear_statistics_for_deleting_segments_unsafe(begin, end);
}
RETURN_NOT_OK(_flush_segment_writer(&writer));
if (_segcompaction_file_writer != nullptr) {
_segcompaction_file_writer->close();
}
RETURN_NOT_OK(_delete_original_segments(begin, end));
RETURN_NOT_OK(_rename_compacted_segments(begin, end));
if (VLOG_DEBUG_IS_ON) {
vlog_buffer.clear();
for (const auto& entry : std::filesystem::directory_iterator(_context.rowset_dir)) {
fmt::format_to(vlog_buffer, "[{}]", string(entry.path()));
}
VLOG_DEBUG << "_segcompacted_point:" << _segcompacted_point
<< " _num_segment:" << _num_segment << " _num_segcompacted:" << _num_segcompacted
<< " list directory:" << fmt::to_string(vlog_buffer);
}
_segcompacted_point += (end - begin + 1);
uint64_t elapsed = GetCurrentTimeMicros() - begin_time;
LOG(INFO) << "BetaRowsetWriter:" << this << " segcompaction completed. elapsed time:" << elapsed
<< "us. _segcompacted_point update:" << _segcompacted_point;
return Status::OK();
}
void BetaRowsetWriter::compact_segments(SegCompactionCandidatesSharedPtr segments) {
Status status = _do_compact_segments(segments);
if (!status.ok()) {
int16_t errcode = status.precise_code();
switch (errcode) {
case OLAP_ERR_FETCH_MEMORY_EXCEEDED:
case OLAP_ERR_SEGCOMPACTION_INIT_READER:
case OLAP_ERR_SEGCOMPACTION_INIT_WRITER:
LOG(WARNING) << "segcompaction failed, try next time:" << status;
return;
default:
LOG(WARNING) << "segcompaction fatal, terminating the write job:" << status;
// status will be checked by the next trigger of segcompaction or the final wait
_segcompaction_status.store(OLAP_ERR_OTHER_ERROR);
}
}
DCHECK_EQ(_is_doing_segcompaction, true);
{
std::lock_guard lk(_is_doing_segcompaction_lock);
_is_doing_segcompaction = false;
_segcompacting_cond.notify_all();
}
}
Status BetaRowsetWriter::_load_noncompacted_segments(
std::vector<segment_v2::SegmentSharedPtr>* segments) {
auto fs = _rowset_meta->fs();
if (!fs) {
return Status::OLAPInternalError(OLAP_ERR_INIT_FAILED);
}
for (int seg_id = _segcompacted_point; seg_id < _num_segment; ++seg_id) {
auto seg_path =
BetaRowset::segment_file_path(_context.rowset_dir, _context.rowset_id, seg_id);
auto cache_path =
BetaRowset::segment_cache_path(_context.rowset_dir, _context.rowset_id, seg_id);
std::shared_ptr<segment_v2::Segment> segment;
auto s = segment_v2::Segment::open(fs, seg_path, cache_path, seg_id, _context.tablet_schema,
&segment);
if (!s.ok()) {
LOG(WARNING) << "failed to open segment. " << seg_path << ":" << s.to_string();
return Status::OLAPInternalError(OLAP_ERR_ROWSET_LOAD_FAILED);
}
segments->push_back(std::move(segment));
}
return Status::OK();
}
/* policy of segcompaction target selection:
* 1. skip big segments
* 2. if the consecutive smalls end up with a big, compact the smalls, except
* single small
* 3. if the consecutive smalls end up with small, compact the smalls if the
* length is beyond (config::segcompaction_threshold_segment_num / 2)
*/
Status BetaRowsetWriter::_find_longest_consecutive_small_segment(
SegCompactionCandidatesSharedPtr segments) {
std::vector<segment_v2::SegmentSharedPtr> all_segments;
RETURN_NOT_OK(_load_noncompacted_segments(&all_segments));
if (VLOG_DEBUG_IS_ON) {
vlog_buffer.clear();
for (auto& segment : all_segments) {
fmt::format_to(vlog_buffer, "[id:{} num_rows:{}]", segment->id(), segment->num_rows());
}
VLOG_DEBUG << "all noncompacted segments num:" << all_segments.size()
<< " list of segments:" << fmt::to_string(vlog_buffer);
}
bool is_terminated_by_big = false;
bool let_big_terminate = false;
size_t small_threshold = config::segcompaction_small_threshold;
for (int64_t i = 0; i < all_segments.size(); ++i) {
segment_v2::SegmentSharedPtr seg = all_segments[i];
if (seg->num_rows() > small_threshold) {
if (let_big_terminate) {
is_terminated_by_big = true;
break;
} else {
RETURN_NOT_OK(_rename_compacted_segment_plain(_segcompacted_point++));
}
} else {
let_big_terminate = true; // break if find a big after small
segments->push_back(seg);
}
}
size_t s = segments->size();
if (!is_terminated_by_big && s <= (config::segcompaction_threshold_segment_num / 2)) {
// start with big segments and end with small, better to do it in next
// round to compact more at once
segments->clear();
return Status::OK();
}
if (s == 1) { // poor bachelor, let it go
VLOG_DEBUG << "only one candidate segment";
RETURN_NOT_OK(_rename_compacted_segment_plain(_segcompacted_point++));
segments->clear();
return Status::OK();
}
if (VLOG_DEBUG_IS_ON) {
vlog_buffer.clear();
for (auto& segment : (*segments.get())) {
fmt::format_to(vlog_buffer, "[id:{} num_rows:{}]", segment->id(), segment->num_rows());
}
VLOG_DEBUG << "candidate segments num:" << s
<< " list of candidates:" << fmt::to_string(vlog_buffer);
}
return Status::OK();
}
Status BetaRowsetWriter::_get_segcompaction_candidates(SegCompactionCandidatesSharedPtr& segments,
bool is_last) {
if (is_last) {
VLOG_DEBUG << "segcompaction last few segments";
// currently we only rename remaining segments to reduce wait time
// so that transaction can be committed ASAP
RETURN_NOT_OK(_load_noncompacted_segments(segments.get()));
for (int i = 0; i < segments->size(); ++i) {
RETURN_NOT_OK(_rename_compacted_segment_plain(_segcompacted_point++));
}
segments->clear();
} else {
RETURN_NOT_OK(_find_longest_consecutive_small_segment(segments));
}
return Status::OK();
}
bool BetaRowsetWriter::_check_and_set_is_doing_segcompaction() {
std::lock_guard<std::mutex> l(_is_doing_segcompaction_lock);
if (!_is_doing_segcompaction) {
_is_doing_segcompaction = true;
return true;
} else {
return false;
}
}
Status BetaRowsetWriter::_segcompaction_if_necessary() {
Status status = Status::OK();
if (!config::enable_segcompaction || !config::enable_storage_vectorization ||
!_check_and_set_is_doing_segcompaction()) {
return status;
}
if (_segcompaction_status.load() != OLAP_SUCCESS) {
status = Status::OLAPInternalError(OLAP_ERR_SEGCOMPACTION_FAILED);
} else if ((_num_segment - _segcompacted_point) >=
config::segcompaction_threshold_segment_num) {
SegCompactionCandidatesSharedPtr segments = std::make_shared<SegCompactionCandidates>();
status = _get_segcompaction_candidates(segments, false);
if (LIKELY(status.ok()) && (segments->size() > 0)) {
LOG(INFO) << "submit segcompaction task, segment num:" << _num_segment
<< ", segcompacted_point:" << _segcompacted_point;
status = StorageEngine::instance()->submit_seg_compaction_task(this, segments);
if (status.ok()) {
return status;
}
}
}
{
std::lock_guard lk(_is_doing_segcompaction_lock);
_is_doing_segcompaction = false;
_segcompacting_cond.notify_all();
}
return status;
}
Status BetaRowsetWriter::_segcompaction_ramaining_if_necessary() {
Status status = Status::OK();
DCHECK_EQ(_is_doing_segcompaction, false);
if (!config::enable_segcompaction || !config::enable_storage_vectorization) {
return Status::OK();
}
if (_segcompaction_status.load() != OLAP_SUCCESS) {
return Status::OLAPInternalError(OLAP_ERR_SEGCOMPACTION_FAILED);
}
if (!_is_segcompacted() || _segcompacted_point == _num_segment) {
// no need if never segcompact before or all segcompacted
return Status::OK();
}
_is_doing_segcompaction = true;
SegCompactionCandidatesSharedPtr segments = std::make_shared<SegCompactionCandidates>();
status = _get_segcompaction_candidates(segments, true);
if (LIKELY(status.ok()) && (segments->size() > 0)) {
LOG(INFO) << "submit segcompaction remaining task, segment num:" << _num_segment
<< ", segcompacted_point:" << _segcompacted_point;
status = StorageEngine::instance()->submit_seg_compaction_task(this, segments);
if (status.ok()) {
return status;
}
}
_is_doing_segcompaction = false;
return status;
}
Status BetaRowsetWriter::_add_block(const vectorized::Block* block,
std::unique_ptr<segment_v2::SegmentWriter>* segment_writer) {
size_t block_size_in_bytes = block->bytes();
@ -133,7 +573,18 @@ Status BetaRowsetWriter::_add_block(const vectorized::Block* block,
row_offset += input_row_num;
} while (row_offset < block_row_num);
_num_rows_written += block_row_num;
_raw_num_rows_written += block_row_num;
return Status::OK();
}
Status BetaRowsetWriter::_add_block_for_segcompaction(
const vectorized::Block* block,
std::unique_ptr<segment_v2::SegmentWriter>* segment_writer) {
auto s = (*segment_writer)->append_block(block, 0, block->rows());
if (UNLIKELY(!s.ok())) {
LOG(WARNING) << "failed to append block: " << s.to_string();
return Status::OLAPInternalError(OLAP_ERR_WRITER_DATA_WRITE_ERROR);
}
return Status::OK();
}
@ -152,7 +603,7 @@ Status BetaRowsetWriter::_add_row(const RowType& row) {
_segment_writer->num_rows_written() >= _context.max_rows_per_segment)) {
RETURN_NOT_OK(_flush_segment_writer(&_segment_writer));
}
++_num_rows_written;
++_raw_num_rows_written;
return Status::OK();
}
@ -183,12 +634,14 @@ Status BetaRowsetWriter::add_rowset_for_linked_schema_change(RowsetSharedPtr row
Status BetaRowsetWriter::flush() {
if (_segment_writer != nullptr) {
RETURN_NOT_OK(_flush_segment_writer(&_segment_writer));
RETURN_NOT_OK(_segcompaction_if_necessary());
}
return Status::OK();
}
Status BetaRowsetWriter::flush_single_memtable(MemTable* memtable, int64_t* flush_size) {
int64_t current_flush_size = _total_data_size + _total_index_size;
int64_t size = 0;
int64_t sum_size = 0;
// Create segment writer for each memtable, so that
// all memtables can be flushed in parallel.
std::unique_ptr<segment_v2::SegmentWriter> writer;
@ -196,6 +649,7 @@ Status BetaRowsetWriter::flush_single_memtable(MemTable* memtable, int64_t* flus
MemTable::Iterator it(memtable);
for (it.seek_to_first(); it.valid(); it.next()) {
if (PREDICT_FALSE(writer == nullptr)) {
RETURN_NOT_OK(_segcompaction_if_necessary());
RETURN_NOT_OK(_create_segment_writer(&writer));
}
ContiguousRow dst_row = it.get_current_row();
@ -207,16 +661,24 @@ Status BetaRowsetWriter::flush_single_memtable(MemTable* memtable, int64_t* flus
if (PREDICT_FALSE(writer->estimate_segment_size() >= MAX_SEGMENT_SIZE ||
writer->num_rows_written() >= _context.max_rows_per_segment)) {
RETURN_NOT_OK(_flush_segment_writer(&writer));
auto s = _flush_segment_writer(&writer, &size);
sum_size += size;
if (OLAP_UNLIKELY(!s.ok())) {
*flush_size = sum_size;
return s;
}
}
++_num_rows_written;
}
if (writer != nullptr) {
RETURN_NOT_OK(_flush_segment_writer(&writer));
auto s = _flush_segment_writer(&writer, &size);
sum_size += size;
*flush_size = sum_size;
if (OLAP_UNLIKELY(!s.ok())) {
return s;
}
}
*flush_size = (_total_data_size + _total_index_size) - current_flush_size;
return Status::OK();
}
@ -224,6 +686,7 @@ Status BetaRowsetWriter::flush_single_memtable(const vectorized::Block* block) {
if (block->rows() == 0) {
return Status::OK();
}
RETURN_NOT_OK(_segcompaction_if_necessary());
std::unique_ptr<segment_v2::SegmentWriter> writer;
RETURN_NOT_OK(_create_segment_writer(&writer));
RETURN_NOT_OK(_add_block(block, &writer));
@ -231,6 +694,23 @@ Status BetaRowsetWriter::flush_single_memtable(const vectorized::Block* block) {
return Status::OK();
}
Status BetaRowsetWriter::_wait_flying_segcompaction() {
std::unique_lock<std::mutex> l(_is_doing_segcompaction_lock);
uint64_t begin_wait = GetCurrentTimeMicros();
while (_is_doing_segcompaction) {
// change sync wait to async?
_segcompacting_cond.wait(l);
}
uint64_t elapsed = GetCurrentTimeMicros() - begin_wait;
if (elapsed >= MICROS_PER_SEC) {
LOG(INFO) << "wait flying segcompaction finish time:" << elapsed << "us";
}
if (_segcompaction_status.load() != OLAP_SUCCESS) {
return Status::OLAPInternalError(OLAP_ERR_SEGCOMPACTION_FAILED);
}
return Status::OK();
}
RowsetSharedPtr BetaRowsetWriter::build() {
// TODO(lingbin): move to more better place, or in a CreateBlockBatch?
for (auto& file_writer : _file_writers) {
@ -241,6 +721,26 @@ RowsetSharedPtr BetaRowsetWriter::build() {
return nullptr;
}
}
Status status;
status = _wait_flying_segcompaction();
if (!status.ok()) {
LOG(WARNING) << "segcompaction failed when build new rowset 1st wait, res=" << status;
return nullptr;
}
status = _segcompaction_ramaining_if_necessary();
if (!status.ok()) {
LOG(WARNING) << "segcompaction failed when build new rowset, res=" << status;
return nullptr;
}
status = _wait_flying_segcompaction();
if (!status.ok()) {
LOG(WARNING) << "segcompaction failed when build new rowset 2nd wait, res=" << status;
return nullptr;
}
if (_segcompaction_file_writer) {
_segcompaction_file_writer->close();
}
// When building a rowset, we must ensure that the current _segment_writer has been
// flushed, that is, the current _segment_writer is nullptr
DCHECK(_segment_writer == nullptr) << "segment must be null when build rowset";
@ -255,8 +755,8 @@ RowsetSharedPtr BetaRowsetWriter::build() {
}
RowsetSharedPtr rowset;
auto status = RowsetFactory::create_rowset(_context.tablet_schema, _context.rowset_dir,
_rowset_meta, &rowset);
status = RowsetFactory::create_rowset(_context.tablet_schema, _context.rowset_dir, _rowset_meta,
&rowset);
if (!status.ok()) {
LOG(WARNING) << "rowset init failed when build new rowset, res=" << status;
return nullptr;
@ -266,23 +766,47 @@ RowsetSharedPtr BetaRowsetWriter::build() {
}
void BetaRowsetWriter::_build_rowset_meta(std::shared_ptr<RowsetMeta> rowset_meta) {
rowset_meta->set_num_rows(_num_rows_written);
rowset_meta->set_total_disk_size(_total_data_size);
rowset_meta->set_data_disk_size(_total_data_size);
rowset_meta->set_index_disk_size(_total_index_size);
// TODO write zonemap to meta
rowset_meta->set_empty(_num_rows_written == 0);
rowset_meta->set_creation_time(time(nullptr));
rowset_meta->set_num_segments(_num_segment);
if (_num_segment <= 1) {
int64_t num_seg = _is_segcompacted() ? _num_segcompacted : _num_segment;
int64_t num_rows_written = 0;
int64_t total_data_size = 0;
int64_t total_index_size = 0;
std::vector<uint32_t> segment_num_rows;
std::vector<KeyBoundsPB> segments_encoded_key_bounds;
{
std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex);
for (const auto& itr : _segid_statistics_map) {
num_rows_written += itr.second.row_num;
segment_num_rows.push_back(itr.second.row_num);
total_data_size += itr.second.data_size;
total_index_size += itr.second.index_size;
segments_encoded_key_bounds.push_back(itr.second.key_bounds);
}
}
rowset_meta->set_num_segments(num_seg);
if (num_seg <= 1) {
rowset_meta->set_segments_overlap(NONOVERLAPPING);
}
_segment_num_rows = segment_num_rows;
for (auto itr = _segments_encoded_key_bounds.begin(); itr != _segments_encoded_key_bounds.end();
++itr) {
segments_encoded_key_bounds.push_back(*itr);
}
CHECK_EQ(segments_encoded_key_bounds.size(), num_seg);
rowset_meta->set_num_rows(num_rows_written + _num_rows_written);
rowset_meta->set_total_disk_size(total_data_size + _total_data_size);
rowset_meta->set_data_disk_size(total_data_size + _total_data_size);
rowset_meta->set_index_disk_size(total_index_size + _total_index_size);
rowset_meta->set_segments_key_bounds(segments_encoded_key_bounds);
// TODO write zonemap to meta
rowset_meta->set_empty((num_rows_written + _num_rows_written) == 0);
rowset_meta->set_creation_time(time(nullptr));
if (_is_pending) {
rowset_meta->set_rowset_state(COMMITTED);
} else {
rowset_meta->set_rowset_state(VISIBLE);
}
rowset_meta->set_segments_key_bounds(_segments_encoded_key_bounds);
}
RowsetSharedPtr BetaRowsetWriter::build_tmp() {
@ -300,10 +824,19 @@ RowsetSharedPtr BetaRowsetWriter::build_tmp() {
return rowset;
}
Status BetaRowsetWriter::_create_segment_writer(
std::unique_ptr<segment_v2::SegmentWriter>* writer) {
int32_t segment_id = _num_segment.fetch_add(1);
auto path = BetaRowset::segment_file_path(_context.rowset_dir, _context.rowset_id, segment_id);
Status BetaRowsetWriter::_do_create_segment_writer(
std::unique_ptr<segment_v2::SegmentWriter>* writer, bool is_segcompaction, int64_t begin,
int64_t end) {
std::string path;
int32_t segment_id = 0;
if (is_segcompaction) {
DCHECK(begin >= 0 && end >= 0);
path = BetaRowset::local_segment_path_segcompacted(_context.rowset_dir, _context.rowset_id,
begin, end);
} else {
segment_id = _num_segment.fetch_add(1);
path = BetaRowset::segment_file_path(_context.rowset_dir, _context.rowset_id, segment_id);
}
auto fs = _rowset_meta->fs();
if (!fs) {
return Status::OLAPInternalError(OLAP_ERR_INIT_FAILED);
@ -319,12 +852,23 @@ Status BetaRowsetWriter::_create_segment_writer(
DCHECK(file_writer != nullptr);
segment_v2::SegmentWriterOptions writer_options;
writer_options.enable_unique_key_merge_on_write = _context.enable_unique_key_merge_on_write;
writer->reset(new segment_v2::SegmentWriter(file_writer.get(), segment_id,
_context.tablet_schema, _context.data_dir,
_context.max_rows_per_segment, writer_options));
{
std::lock_guard<SpinLock> l(_lock);
_file_writers.push_back(std::move(file_writer));
if (is_segcompaction) {
writer->reset(new segment_v2::SegmentWriter(file_writer.get(), _num_segcompacted,
_context.tablet_schema, _context.data_dir,
_context.max_rows_per_segment, writer_options));
if (_segcompaction_file_writer != nullptr) {
_segcompaction_file_writer->close();
}
_segcompaction_file_writer.reset(file_writer.release());
} else {
writer->reset(new segment_v2::SegmentWriter(file_writer.get(), segment_id,
_context.tablet_schema, _context.data_dir,
_context.max_rows_per_segment, writer_options));
{
std::lock_guard<SpinLock> l(_lock);
_file_writers.push_back(std::move(file_writer));
}
}
auto s = (*writer)->init();
@ -336,7 +880,31 @@ Status BetaRowsetWriter::_create_segment_writer(
return Status::OK();
}
Status BetaRowsetWriter::_flush_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>* writer) {
Status BetaRowsetWriter::_create_segment_writer(
std::unique_ptr<segment_v2::SegmentWriter>* writer) {
size_t total_segment_num = _num_segment - _segcompacted_point + 1 + _num_segcompacted;
if (UNLIKELY(total_segment_num > config::max_segment_num_per_rowset)) {
LOG(ERROR) << "too many segments in rowset."
<< " max:" << config::max_segment_num_per_rowset
<< " _num_segment:" << _num_segment
<< " _segcompacted_point:" << _segcompacted_point
<< " _num_segcompacted:" << _num_segcompacted;
return Status::OLAPInternalError(OLAP_ERR_TOO_MANY_SEGMENTS);
} else {
return _do_create_segment_writer(writer, false, -1, -1);
}
}
Status BetaRowsetWriter::_create_segment_writer_for_segcompaction(
std::unique_ptr<segment_v2::SegmentWriter>* writer, uint64_t begin, uint64_t end) {
return _do_create_segment_writer(writer, true, begin, end);
}
Status BetaRowsetWriter::_flush_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>* writer,
int64_t* flush_size) {
uint32_t segid = (*writer)->get_segment_id();
uint32_t row_num = (*writer)->num_rows_written();
if ((*writer)->num_rows_written() == 0) {
return Status::OK();
}
@ -347,22 +915,30 @@ Status BetaRowsetWriter::_flush_segment_writer(std::unique_ptr<segment_v2::Segme
LOG(WARNING) << "failed to finalize segment: " << s.to_string();
return Status::OLAPInternalError(OLAP_ERR_WRITER_DATA_WRITE_ERROR);
}
_total_data_size += segment_size;
_total_index_size += index_size;
KeyBoundsPB key_bounds;
Slice min_key = (*writer)->min_encoded_key();
Slice max_key = (*writer)->max_encoded_key();
DCHECK_LE(min_key.compare(max_key), 0);
key_bounds.set_min_key(min_key.to_string());
key_bounds.set_max_key(max_key.to_string());
Statistics segstat;
segstat.row_num = row_num;
segstat.data_size = segment_size;
segstat.index_size = index_size;
segstat.key_bounds = key_bounds;
{
std::lock_guard<SpinLock> l(_lock);
_segment_num_rows.resize(_num_segment);
_segments_encoded_key_bounds.resize(_num_segment);
_segment_num_rows[(*writer)->get_segment_id()] = (*writer)->num_rows_written();
_segments_encoded_key_bounds[(*writer)->get_segment_id()] = key_bounds;
std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex);
CHECK_EQ(_segid_statistics_map.find(segid) == _segid_statistics_map.end(), true);
_segid_statistics_map.emplace(segid, segstat);
}
VLOG_DEBUG << "_segid_statistics_map add new record. segid:" << segid << " row_num:" << row_num
<< " data_size:" << segment_size << " index_size:" << index_size;
writer->reset();
if (flush_size) {
*flush_size = segment_size + index_size;
}
return Status::OK();
}

View File

@ -18,6 +18,7 @@
#pragma once
#include "olap/rowset/rowset_writer.h"
#include "vec/olap/vgeneric_iterators.h"
namespace doris {
namespace segment_v2 {
@ -28,6 +29,9 @@ namespace io {
class FileWriter;
} // namespace io
using SegCompactionCandidates = std::vector<segment_v2::SegmentSharedPtr>;
using SegCompactionCandidatesSharedPtr = std::shared_ptr<SegCompactionCandidates>;
class BetaRowsetWriter : public RowsetWriter {
public:
BetaRowsetWriter();
@ -62,7 +66,7 @@ public:
Version version() override { return _context.version; }
int64_t num_rows() const override { return _num_rows_written; }
int64_t num_rows() const override { return _raw_num_rows_written; }
RowsetId rowset_id() override { return _context.rowset_id; }
@ -74,42 +78,100 @@ public:
return Status::OK();
}
void compact_segments(SegCompactionCandidatesSharedPtr segments);
private:
template <typename RowType>
Status _add_row(const RowType& row);
Status _add_block(const vectorized::Block* block,
std::unique_ptr<segment_v2::SegmentWriter>* writer);
Status _add_block_for_segcompaction(const vectorized::Block* block,
std::unique_ptr<segment_v2::SegmentWriter>* writer);
Status _do_create_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>* writer,
bool is_segcompaction, int64_t begin, int64_t end);
Status _create_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>* writer);
Status _create_segment_writer_for_segcompaction(
std::unique_ptr<segment_v2::SegmentWriter>* writer, uint64_t begin, uint64_t end);
Status _flush_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>* writer);
Status _flush_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>* writer,
int64_t* flush_size = nullptr);
void _build_rowset_meta(std::shared_ptr<RowsetMeta> rowset_meta);
Status _segcompaction_if_necessary();
Status _segcompaction_ramaining_if_necessary();
vectorized::VMergeIterator* _get_segcompaction_reader(SegCompactionCandidatesSharedPtr segments,
std::shared_ptr<Schema> schema,
OlapReaderStatistics* stat,
uint64_t* merged_row_stat);
std::unique_ptr<segment_v2::SegmentWriter> _create_segcompaction_writer(uint64_t begin,
uint64_t end);
Status _delete_original_segments(uint32_t begin, uint32_t end);
Status _rename_compacted_segments(int64_t begin, int64_t end);
Status _rename_compacted_segment_plain(uint64_t seg_id);
Status _load_noncompacted_segments(std::vector<segment_v2::SegmentSharedPtr>* segments);
Status _find_longest_consecutive_small_segment(SegCompactionCandidatesSharedPtr segments);
Status _get_segcompaction_candidates(SegCompactionCandidatesSharedPtr& segments, bool is_last);
Status _wait_flying_segcompaction();
bool _is_segcompacted() { return (_num_segcompacted > 0) ? true : false; }
void _clear_statistics_for_deleting_segments_unsafe(uint64_t begin, uint64_t end);
Status _check_correctness(std::unique_ptr<OlapReaderStatistics> stat, uint64_t merged_row_stat,
uint64_t row_count, uint64_t begin, uint64_t end);
bool _check_and_set_is_doing_segcompaction();
Status _do_compact_segments(SegCompactionCandidatesSharedPtr segments);
private:
RowsetWriterContext _context;
std::shared_ptr<RowsetMeta> _rowset_meta;
std::atomic<int32_t> _num_segment;
std::atomic<int32_t> _segcompacted_point; // segments before this point have
// already been segment compacted
std::atomic<int32_t> _num_segcompacted; // index for segment compaction
/// When flushing the memtable in the load process, we do not use this writer but an independent writer.
/// Because we want to flush memtables in parallel.
/// In other processes, such as merger or schema change, we will use this unified writer for data writing.
std::unique_ptr<segment_v2::SegmentWriter> _segment_writer;
mutable SpinLock _lock; // protect following vectors.
// record rows number of every segment
std::vector<uint32_t> _segment_num_rows;
std::vector<io::FileWriterPtr> _file_writers;
// for unique key table with merge-on-write
std::vector<KeyBoundsPB> _segments_encoded_key_bounds;
// record rows number of every segment
std::vector<uint32_t> _segment_num_rows;
// counters and statistics maintained during data write
io::FileWriterPtr _segcompaction_file_writer;
// counters and statistics maintained during add_rowset
std::atomic<int64_t> _num_rows_written;
std::atomic<int64_t> _total_data_size;
std::atomic<int64_t> _total_index_size;
// TODO rowset Zonemap
// written rows by add_block/add_row (not effected by segcompaction)
std::atomic<int64_t> _raw_num_rows_written;
struct Statistics {
int64_t row_num;
int64_t data_size;
int64_t index_size;
KeyBoundsPB key_bounds;
};
std::map<uint32_t, Statistics> _segid_statistics_map;
std::mutex _segid_statistics_map_mutex;
bool _is_pending = false;
bool _already_built = false;
// ensure only one inflight segcompaction task for each rowset
std::atomic<bool> _is_doing_segcompaction;
// enforce compare-and-swap on _is_doing_segcompaction
std::mutex _is_doing_segcompaction_lock;
std::condition_variable _segcompacting_cond;
std::atomic<ErrorCode> _segcompaction_status;
fmt::memory_buffer vlog_buffer;
};
} // namespace doris

View File

@ -84,10 +84,15 @@ public:
static void init_column_meta(ColumnMetaPB* meta, uint32_t* column_id,
const TabletColumn& column, TabletSchemaSPtr tablet_schema);
uint32_t get_segment_id() { return _segment_id; }
uint32_t get_segment_id() const { return _segment_id; }
Slice min_encoded_key();
Slice max_encoded_key();
DataDir* get_data_dir() { return _data_dir; }
bool is_unique_key() { return _tablet_schema->keys_type() == UNIQUE_KEYS; }
private:
DISALLOW_COPY_AND_ASSIGN(SegmentWriter);
Status _write_data();

View File

@ -135,6 +135,7 @@ protected:
if (reader->rowset()->num_rows() != writer.num_rows() + _merged_rows + _filtered_rows) {
LOG(WARNING) << "fail to check row num! "
<< "source_rows=" << reader->rowset()->num_rows()
<< ", writer rows=" << writer.num_rows()
<< ", merged_rows=" << merged_rows()
<< ", filtered_rows=" << filtered_rows()
<< ", new_index_rows=" << writer.num_rows();

View File

@ -81,6 +81,8 @@ using strings::Substitute;
namespace doris {
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(unused_rowsets_count, MetricUnit::ROWSETS);
DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(segcompaction_mem_consumption, MetricUnit::BYTES, "",
mem_consumption, Labels({{"type", "segcompaction"}}));
DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(compaction_mem_consumption, MetricUnit::BYTES, "",
mem_consumption, Labels({{"type", "compaction"}}));
DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(schema_change_mem_consumption, MetricUnit::BYTES, "",
@ -110,6 +112,8 @@ StorageEngine::StorageEngine(const EngineOptions& options)
_available_storage_medium_type_count(0),
_effective_cluster_id(-1),
_is_all_cluster_id_exist(true),
_segcompaction_mem_tracker(
std::make_shared<MemTrackerLimiter>(-1, "StorageEngine::SegCompaction")),
_compaction_mem_tracker(
std::make_shared<MemTrackerLimiter>(-1, "StorageEngine::AutoCompaction")),
_segment_meta_mem_tracker(std::make_unique<MemTracker>("StorageEngine::SegmentMeta")),
@ -134,6 +138,8 @@ StorageEngine::StorageEngine(const EngineOptions& options)
// std::lock_guard<std::mutex> lock(_gc_mutex);
return _unused_rowsets.size();
});
REGISTER_HOOK_METRIC(segcompaction_mem_consumption,
[this]() { return _segcompaction_mem_tracker->consumption(); });
REGISTER_HOOK_METRIC(compaction_mem_consumption,
[this]() { return _compaction_mem_tracker->consumption(); });
REGISTER_HOOK_METRIC(schema_change_mem_consumption,
@ -142,6 +148,7 @@ StorageEngine::StorageEngine(const EngineOptions& options)
StorageEngine::~StorageEngine() {
DEREGISTER_HOOK_METRIC(unused_rowsets_count);
DEREGISTER_HOOK_METRIC(segcompaction_mem_consumption);
DEREGISTER_HOOK_METRIC(compaction_mem_consumption);
DEREGISTER_HOOK_METRIC(schema_change_mem_consumption);
_clear();
@ -157,6 +164,10 @@ StorageEngine::~StorageEngine() {
_quick_compaction_thread_pool->shutdown();
}
if (_seg_compaction_thread_pool) {
_seg_compaction_thread_pool->shutdown();
}
if (_tablet_meta_checkpoint_thread_pool) {
_tablet_meta_checkpoint_thread_pool->shutdown();
}

View File

@ -58,6 +58,10 @@ class BlockManager;
class MemTableFlushExecutor;
class Tablet;
class TaskWorkerPool;
class BetaRowsetWriter;
using SegCompactionCandidates = std::vector<segment_v2::SegmentSharedPtr>;
using SegCompactionCandidatesSharedPtr = std::shared_ptr<SegCompactionCandidates>;
// StorageEngine singleton to manage all Table pointers.
// Providing add/drop/get operations.
@ -175,6 +179,9 @@ public:
Status get_compaction_status_json(std::string* result);
std::shared_ptr<MemTrackerLimiter> segcompaction_mem_tracker() {
return _segcompaction_mem_tracker;
}
std::shared_ptr<MemTrackerLimiter> compaction_mem_tracker() { return _compaction_mem_tracker; }
MemTracker* segment_meta_mem_tracker() { return _segment_meta_mem_tracker.get(); }
std::shared_ptr<MemTrackerLimiter> schema_change_mem_tracker() {
@ -191,6 +198,8 @@ public:
Status submit_compaction_task(TabletSharedPtr tablet, CompactionType compaction_type);
Status submit_quick_compaction_task(TabletSharedPtr tablet);
Status submit_seg_compaction_task(BetaRowsetWriter* writer,
SegCompactionCandidatesSharedPtr segments);
std::unique_ptr<ThreadPool>& tablet_publish_txn_thread_pool() {
return _tablet_publish_txn_thread_pool;
@ -277,6 +286,9 @@ private:
void _cache_file_cleaner_tasks_producer_callback();
Status _handle_seg_compaction(BetaRowsetWriter* writer,
SegCompactionCandidatesSharedPtr segments);
private:
struct CompactionCandidate {
CompactionCandidate(uint32_t nicumulative_compaction_, int64_t tablet_id_, uint32_t index_)
@ -322,6 +334,8 @@ private:
// map<rowset_id(str), RowsetSharedPtr>, if we use RowsetId as the key, we need custom hash func
std::unordered_map<std::string, RowsetSharedPtr> _unused_rowsets;
// Count the memory consumption of segment compaction tasks.
std::shared_ptr<MemTrackerLimiter> _segcompaction_mem_tracker;
// Count the memory consumption of all Base and Cumulative tasks.
std::shared_ptr<MemTrackerLimiter> _compaction_mem_tracker;
// This mem tracker is only for tracking memory use by segment meta data such as footer or index page.
@ -377,6 +391,7 @@ private:
std::unique_ptr<ThreadPool> _quick_compaction_thread_pool;
std::unique_ptr<ThreadPool> _base_compaction_thread_pool;
std::unique_ptr<ThreadPool> _cumu_compaction_thread_pool;
std::unique_ptr<ThreadPool> _seg_compaction_thread_pool;
std::unique_ptr<ThreadPool> _tablet_publish_txn_thread_pool;

View File

@ -185,6 +185,7 @@ public:
UIntGauge* brpc_function_endpoint_stub_count;
UIntGauge* tablet_writer_count;
UIntGauge* segcompaction_mem_consumption;
UIntGauge* compaction_mem_consumption;
UIntGauge* load_mem_consumption;
UIntGauge* load_channel_mem_consumption;

View File

@ -15,13 +15,14 @@
// specific language governing permissions and limitations
// under the License.
#include <vec/olap/vgeneric_iterators.h>
#include <memory>
#include <queue>
#include <utility>
#include "common/status.h"
#include "olap/iterators.h"
#include "olap/rowset/segment_v2/column_reader.h"
#include "olap/rowset/segment_v2/segment.h"
#include "olap/schema.h"
#include "vec/core/block.h"
@ -29,6 +30,139 @@
namespace doris {
namespace vectorized {
VStatisticsIterator::~VStatisticsIterator() {
for (auto& pair : _column_iterators_map) {
delete pair.second;
}
}
Status VStatisticsIterator::init(const StorageReadOptions& opts) {
if (!_init) {
_push_down_agg_type_opt = opts.push_down_agg_type_opt;
for (size_t i = 0; i < _schema.num_column_ids(); i++) {
auto cid = _schema.column_id(i);
auto unique_id = _schema.column(cid)->unique_id();
if (_column_iterators_map.count(unique_id) < 1) {
RETURN_IF_ERROR(_segment->new_column_iterator(opts.tablet_schema->column(cid),
&_column_iterators_map[unique_id]));
}
_column_iterators.push_back(_column_iterators_map[unique_id]);
}
_target_rows = _push_down_agg_type_opt == TPushAggOp::MINMAX ? 2 : _segment->num_rows();
_init = true;
}
return Status::OK();
}
Status VStatisticsIterator::next_batch(Block* block) {
DCHECK(block->columns() == _column_iterators.size());
if (_output_rows < _target_rows) {
block->clear_column_data();
auto columns = block->mutate_columns();
size_t size = _push_down_agg_type_opt == TPushAggOp::MINMAX
? 2
: std::min(_target_rows - _output_rows, MAX_ROW_SIZE_IN_COUNT);
if (_push_down_agg_type_opt == TPushAggOp::COUNT) {
size = std::min(_target_rows - _output_rows, MAX_ROW_SIZE_IN_COUNT);
for (int i = 0; i < block->columns(); ++i) {
columns[i]->resize(size);
}
} else {
for (int i = 0; i < block->columns(); ++i) {
_column_iterators[i]->next_batch_of_zone_map(&size, columns[i]);
}
}
_output_rows += size;
return Status::OK();
}
return Status::EndOfFile("End of VStatisticsIterator");
}
Status VMergeIteratorContext::block_reset(const std::shared_ptr<Block>& block) {
if (!*block) {
const Schema& schema = _iter->schema();
const auto& column_ids = schema.column_ids();
for (size_t i = 0; i < schema.num_column_ids(); ++i) {
auto column_desc = schema.column(column_ids[i]);
auto data_type = Schema::get_data_type_ptr(*column_desc);
if (data_type == nullptr) {
return Status::RuntimeError("invalid data type");
}
auto column = data_type->create_column();
column->reserve(_block_row_max);
block->insert(ColumnWithTypeAndName(std::move(column), data_type, column_desc->name()));
}
} else {
block->clear_column_data();
}
return Status::OK();
}
bool VMergeIteratorContext::compare(const VMergeIteratorContext& rhs) const {
int cmp_res = UNLIKELY(_compare_columns)
? _block->compare_at(_index_in_block, rhs._index_in_block,
_compare_columns, *rhs._block, -1)
: _block->compare_at(_index_in_block, rhs._index_in_block,
_num_key_columns, *rhs._block, -1);
if (cmp_res != 0) {
return UNLIKELY(_is_reverse) ? cmp_res < 0 : cmp_res > 0;
}
auto col_cmp_res = 0;
if (_sequence_id_idx != -1) {
col_cmp_res = _block->compare_column_at(_index_in_block, rhs._index_in_block,
_sequence_id_idx, *rhs._block, -1);
}
auto result = col_cmp_res == 0 ? data_id() < rhs.data_id() : col_cmp_res < 0;
if (_is_unique) {
result ? set_skip(true) : rhs.set_skip(true);
}
return result;
}
// `advanced = false` when current block finished
void VMergeIteratorContext::copy_rows(Block* block, bool advanced) {
Block& src = *_block;
Block& dst = *block;
if (_cur_batch_num == 0) {
return;
}
// copy a row to dst block column by column
size_t start = _index_in_block - _cur_batch_num + 1 - advanced;
DCHECK(start >= 0);
for (size_t i = 0; i < _num_columns; ++i) {
auto& s_col = src.get_by_position(i);
auto& d_col = dst.get_by_position(i);
ColumnPtr& s_cp = s_col.column;
ColumnPtr& d_cp = d_col.column;
d_cp->assume_mutable()->insert_range_from(*s_cp, start, _cur_batch_num);
}
_cur_batch_num = 0;
}
void VMergeIteratorContext::copy_rows(BlockView* view, bool advanced) {
if (_cur_batch_num == 0) {
return;
}
size_t start = _index_in_block - _cur_batch_num + 1 - advanced;
DCHECK(start >= 0);
for (size_t i = 0; i < _cur_batch_num; ++i) {
view->push_back({_block, static_cast<int>(start + i), false});
}
_cur_batch_num = 0;
}
// This iterator will generate ordered data. For example for schema
// (int, int) this iterator will generator data like
@ -113,251 +247,6 @@ Status VAutoIncrementIterator::init(const StorageReadOptions& opts) {
return Status::OK();
}
class VStatisticsIterator : public RowwiseIterator {
public:
// Will generate num_rows rows in total
VStatisticsIterator(std::shared_ptr<Segment> segment, const Schema& schema)
: _segment(std::move(segment)), _schema(schema) {}
~VStatisticsIterator() override {
for (auto& pair : _column_iterators_map) {
delete pair.second;
}
}
Status init(const StorageReadOptions& opts) override {
if (!_init) {
_push_down_agg_type_opt = opts.push_down_agg_type_opt;
for (size_t i = 0; i < _schema.num_column_ids(); i++) {
auto cid = _schema.column_id(i);
auto unique_id = _schema.column(cid)->unique_id();
if (_column_iterators_map.count(unique_id) < 1) {
RETURN_IF_ERROR(_segment->new_column_iterator(
opts.tablet_schema->column(cid), &_column_iterators_map[unique_id]));
}
_column_iterators.push_back(_column_iterators_map[unique_id]);
}
_target_rows = _push_down_agg_type_opt == TPushAggOp::MINMAX ? 2 : _segment->num_rows();
_init = true;
}
return Status::OK();
}
Status next_batch(Block* block) override {
DCHECK(block->columns() == _column_iterators.size());
if (_output_rows < _target_rows) {
block->clear_column_data();
auto columns = block->mutate_columns();
size_t size = _push_down_agg_type_opt == TPushAggOp::MINMAX
? 2
: std::min(_target_rows - _output_rows, MAX_ROW_SIZE_IN_COUNT);
if (_push_down_agg_type_opt == TPushAggOp::COUNT) {
size = std::min(_target_rows - _output_rows, MAX_ROW_SIZE_IN_COUNT);
for (int i = 0; i < block->columns(); ++i) {
columns[i]->resize(size);
}
} else {
for (int i = 0; i < block->columns(); ++i) {
_column_iterators[i]->next_batch_of_zone_map(&size, columns[i]);
}
}
_output_rows += size;
return Status::OK();
}
return Status::EndOfFile("End of VStatisticsIterator");
}
const Schema& schema() const override { return _schema; }
private:
std::shared_ptr<Segment> _segment;
const Schema& _schema;
size_t _target_rows = 0;
size_t _output_rows = 0;
bool _init = false;
TPushAggOp::type _push_down_agg_type_opt;
std::map<int32_t, ColumnIterator*> _column_iterators_map;
std::vector<ColumnIterator*> _column_iterators;
static constexpr size_t MAX_ROW_SIZE_IN_COUNT = 65535;
};
// Used to store merge state for a VMergeIterator input.
// This class iterates over all data from the internal iterator
// as the client repeatedly calls advance().
// Usage:
// VMergeIteratorContext ctx(iter);
// RETURN_IF_ERROR(ctx.init());
// while (ctx.valid()) {
// visit(ctx.current_row());
// RETURN_IF_ERROR(ctx.advance());
// }
class VMergeIteratorContext {
public:
VMergeIteratorContext(RowwiseIterator* iter, int sequence_id_idx, bool is_unique,
bool is_reverse, std::vector<uint32_t>* read_orderby_key_columns)
: _iter(iter),
_sequence_id_idx(sequence_id_idx),
_is_unique(is_unique),
_is_reverse(is_reverse),
_num_columns(iter->schema().num_column_ids()),
_num_key_columns(iter->schema().num_key_columns()),
_compare_columns(read_orderby_key_columns) {}
VMergeIteratorContext(const VMergeIteratorContext&) = delete;
VMergeIteratorContext(VMergeIteratorContext&&) = delete;
VMergeIteratorContext& operator=(const VMergeIteratorContext&) = delete;
VMergeIteratorContext& operator=(VMergeIteratorContext&&) = delete;
~VMergeIteratorContext() {
delete _iter;
_iter = nullptr;
}
Status block_reset(const std::shared_ptr<Block>& block) {
if (!*block) {
const Schema& schema = _iter->schema();
const auto& column_ids = schema.column_ids();
for (size_t i = 0; i < schema.num_column_ids(); ++i) {
auto column_desc = schema.column(column_ids[i]);
auto data_type = Schema::get_data_type_ptr(*column_desc);
if (data_type == nullptr) {
return Status::RuntimeError("invalid data type");
}
auto column = data_type->create_column();
column->reserve(_block_row_max);
block->insert(
ColumnWithTypeAndName(std::move(column), data_type, column_desc->name()));
}
} else {
block->clear_column_data();
}
return Status::OK();
}
// Initialize this context and prepare data for current_row()
Status init(const StorageReadOptions& opts);
bool compare(const VMergeIteratorContext& rhs) const {
int cmp_res = UNLIKELY(_compare_columns)
? _block->compare_at(_index_in_block, rhs._index_in_block,
_compare_columns, *rhs._block, -1)
: _block->compare_at(_index_in_block, rhs._index_in_block,
_num_key_columns, *rhs._block, -1);
if (cmp_res != 0) {
return UNLIKELY(_is_reverse) ? cmp_res < 0 : cmp_res > 0;
}
auto col_cmp_res = 0;
if (_sequence_id_idx != -1) {
col_cmp_res = _block->compare_column_at(_index_in_block, rhs._index_in_block,
_sequence_id_idx, *rhs._block, -1);
}
auto result = col_cmp_res == 0 ? data_id() < rhs.data_id() : col_cmp_res < 0;
if (_is_unique) {
result ? set_skip(true) : rhs.set_skip(true);
}
return result;
}
// Pass `advanced = false` when the current block is finished and the context has not advanced
void copy_rows(Block* block, bool advanced = true) {
Block& src = *_block;
Block& dst = *block;
if (_cur_batch_num == 0) {
return;
}
// copy the accumulated `_cur_batch_num` rows to the dst block, column by column
size_t start = _index_in_block - _cur_batch_num + 1 - advanced;
DCHECK(start >= 0);
for (size_t i = 0; i < _num_columns; ++i) {
auto& s_col = src.get_by_position(i);
auto& d_col = dst.get_by_position(i);
ColumnPtr& s_cp = s_col.column;
ColumnPtr& d_cp = d_col.column;
d_cp->assume_mutable()->insert_range_from(*s_cp, start, _cur_batch_num);
}
_cur_batch_num = 0;
}
void copy_rows(BlockView* view, bool advanced = true) {
if (_cur_batch_num == 0) {
return;
}
size_t start = _index_in_block - _cur_batch_num + 1 - advanced;
DCHECK(start >= 0);
for (size_t i = 0; i < _cur_batch_num; ++i) {
view->push_back({_block, static_cast<int>(start + i), false});
}
_cur_batch_num = 0;
}
RowLocation current_row_location() {
DCHECK(_record_rowids);
return _block_row_locations[_index_in_block];
}
// Advance the internal row index to the next valid row.
// Returns an error if one occurs.
// Do not call this when valid() is false; the behavior is undefined.
Status advance();
// Returns whether this context still has remaining data.
// current_row() returns a valid row only when this returns true.
bool valid() const { return _valid; }
uint64_t data_id() const { return _iter->data_id(); }
bool need_skip() const { return _skip; }
void set_skip(bool skip) const { _skip = skip; }
void add_cur_batch() { _cur_batch_num++; }
void reset_cur_batch() { _cur_batch_num = 0; }
bool is_cur_block_finished() { return _index_in_block == _block->rows() - 1; }
private:
// Load next block into _block
Status _load_next_block();
RowwiseIterator* _iter;
int _sequence_id_idx = -1;
bool _is_unique = false;
bool _is_reverse = false;
bool _valid = false;
mutable bool _skip = false;
size_t _index_in_block = -1;
// 4096 minus the 16 + 16 bytes of padding used by the padded PODArray
int _block_row_max = 4064;
int _num_columns;
int _num_key_columns;
std::vector<uint32_t>* _compare_columns;
std::vector<RowLocation> _block_row_locations;
bool _record_rowids = false;
size_t _cur_batch_num = 0;
// used to store data loaded from iterator->next_batch(Block*)
std::shared_ptr<Block> _block;
// used to keep blocks that are still referenced by a block view
std::list<std::shared_ptr<Block>> _block_list;
};
Status VMergeIteratorContext::init(const StorageReadOptions& opts) {
_block_row_max = opts.block_row_max;
_record_rowids = opts.record_rowids;
@ -419,139 +308,6 @@ Status VMergeIteratorContext::_load_next_block() {
return Status::OK();
}
class VMergeIterator : public RowwiseIterator {
public:
// VMergeIterator takes ownership of the input iterators
VMergeIterator(std::vector<RowwiseIterator*>& iters, int sequence_id_idx, bool is_unique,
bool is_reverse, uint64_t* merged_rows)
: _origin_iters(iters),
_sequence_id_idx(sequence_id_idx),
_is_unique(is_unique),
_is_reverse(is_reverse),
_merged_rows(merged_rows) {}
~VMergeIterator() override {
while (!_merge_heap.empty()) {
auto ctx = _merge_heap.top();
_merge_heap.pop();
delete ctx;
}
}
Status init(const StorageReadOptions& opts) override;
Status next_batch(Block* block) override { return _next_batch(block); }
Status next_block_view(BlockView* block_view) override { return _next_batch(block_view); }
bool support_return_data_by_ref() override { return true; }
const Schema& schema() const override { return *_schema; }
Status current_block_row_locations(std::vector<RowLocation>* block_row_locations) override {
DCHECK(_record_rowids);
*block_row_locations = _block_row_locations;
return Status::OK();
}
bool update_profile(RuntimeProfile* profile) override {
if (!_origin_iters.empty()) {
return (*_origin_iters.begin())->update_profile(profile);
}
return false;
}
private:
int _get_size(Block* block) { return block->rows(); }
int _get_size(BlockView* block_view) { return block_view->size(); }
template <typename T>
Status _next_batch(T* block) {
if (UNLIKELY(_record_rowids)) {
_block_row_locations.resize(_block_row_max);
}
size_t row_idx = 0;
VMergeIteratorContext* pre_ctx = nullptr;
while (_get_size(block) < _block_row_max) {
if (_merge_heap.empty()) {
break;
}
auto ctx = _merge_heap.top();
_merge_heap.pop();
if (!ctx->need_skip()) {
ctx->add_cur_batch();
if (pre_ctx != ctx) {
if (pre_ctx) {
pre_ctx->copy_rows(block);
}
pre_ctx = ctx;
}
if (UNLIKELY(_record_rowids)) {
_block_row_locations[row_idx] = ctx->current_row_location();
}
row_idx++;
if (ctx->is_cur_block_finished() || row_idx >= _block_row_max) {
// current block finished and ctx has not advanced,
// so the copy starts at (_index_in_block - _cur_batch_num + 1)
ctx->copy_rows(block, false);
pre_ctx = nullptr;
}
} else if (_merged_rows != nullptr) {
(*_merged_rows)++;
// the current row must be skipped, so flush the rows accumulated in pre_ctx
if (pre_ctx) {
pre_ctx->copy_rows(block);
pre_ctx = nullptr;
}
}
RETURN_IF_ERROR(ctx->advance());
if (ctx->valid()) {
_merge_heap.push(ctx);
} else {
// Release ctx early to reduce resource consumption
delete ctx;
}
}
if (!_merge_heap.empty()) {
return Status::OK();
}
// The last batch still needs to be processed
if (UNLIKELY(_record_rowids)) {
_block_row_locations.resize(row_idx);
}
return Status::EndOfFile("no more data in segment");
}
// It will be released after '_merge_heap' has been built.
std::vector<RowwiseIterator*> _origin_iters;
const Schema* _schema = nullptr;
struct VMergeContextComparator {
bool operator()(const VMergeIteratorContext* lhs, const VMergeIteratorContext* rhs) const {
return lhs->compare(*rhs);
}
};
using VMergeHeap =
std::priority_queue<VMergeIteratorContext*, std::vector<VMergeIteratorContext*>,
VMergeContextComparator>;
VMergeHeap _merge_heap;
int _block_row_max = 0;
int _sequence_id_idx = -1;
bool _is_unique = false;
bool _is_reverse = false;
uint64_t* _merged_rows = nullptr;
bool _record_rowids = false;
std::vector<RowLocation> _block_row_locations;
};
Status VMergeIterator::init(const StorageReadOptions& opts) {
if (_origin_iters.empty()) {
return Status::OK();

View File

@ -16,6 +16,9 @@
// under the License.
#include "olap/iterators.h"
#include "olap/row.h"
#include "olap/row_block2.h"
#include "olap/rowset/segment_v2/column_reader.h"
namespace doris {
@ -25,6 +28,264 @@ class Segment;
namespace vectorized {
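// Serves queries whose aggregates can be answered from segment statistics
// alone: for TPushAggOp::MINMAX it emits two rows per column taken from the
// zone map, and for TPushAggOp::COUNT it merely sizes the output columns to
// the segment row count without reading any column data.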
class VStatisticsIterator : public RowwiseIterator {
public:
// Will generate num_rows rows in total
VStatisticsIterator(std::shared_ptr<Segment> segment, const Schema& schema)
: _segment(std::move(segment)), _schema(schema) {}
~VStatisticsIterator() override;
Status init(const StorageReadOptions& opts) override;
Status next_batch(Block* block) override;
const Schema& schema() const override { return _schema; }
private:
std::shared_ptr<Segment> _segment;
const Schema& _schema;
size_t _target_rows = 0;
size_t _output_rows = 0;
bool _init = false;
TPushAggOp::type _push_down_agg_type_opt;
std::map<int32_t, ColumnIterator*> _column_iterators_map;
std::vector<ColumnIterator*> _column_iterators;
static constexpr size_t MAX_ROW_SIZE_IN_COUNT = 65535;
};
// Used to store merge state for a VMergeIterator input.
// This class iterates over all data from the internal iterator
// as the client repeatedly calls advance().
// Usage:
// VMergeIteratorContext ctx(iter);
// RETURN_IF_ERROR(ctx.init());
// while (ctx.valid()) {
// visit(ctx.current_row());
// RETURN_IF_ERROR(ctx.advance());
// }
class VMergeIteratorContext {
public:
VMergeIteratorContext(RowwiseIterator* iter, int sequence_id_idx, bool is_unique,
bool is_reverse, std::vector<uint32_t>* read_orderby_key_columns)
: _iter(iter),
_sequence_id_idx(sequence_id_idx),
_is_unique(is_unique),
_is_reverse(is_reverse),
_num_columns(iter->schema().num_column_ids()),
_num_key_columns(iter->schema().num_key_columns()),
_compare_columns(read_orderby_key_columns) {}
VMergeIteratorContext(const VMergeIteratorContext&) = delete;
VMergeIteratorContext(VMergeIteratorContext&&) = delete;
VMergeIteratorContext& operator=(const VMergeIteratorContext&) = delete;
VMergeIteratorContext& operator=(VMergeIteratorContext&&) = delete;
~VMergeIteratorContext() {
delete _iter;
_iter = nullptr;
}
Status block_reset(const std::shared_ptr<Block>& block);
// Initialize this context and prepare data for current_row()
Status init(const StorageReadOptions& opts);
bool compare(const VMergeIteratorContext& rhs) const;
// Pass `advanced = false` when the current block is finished and the context has not advanced
void copy_rows(Block* block, bool advanced = true);
void copy_rows(BlockView* view, bool advanced = true);
RowLocation current_row_location() {
DCHECK(_record_rowids);
return _block_row_locations[_index_in_block];
}
// Advance the internal row index to the next valid row.
// Returns an error if one occurs.
// Do not call this when valid() is false; the behavior is undefined.
Status advance();
// Returns whether this context still has remaining data.
// current_row() returns a valid row only when this returns true.
bool valid() const { return _valid; }
uint64_t data_id() const { return _iter->data_id(); }
bool need_skip() const { return _skip; }
void set_skip(bool skip) const { _skip = skip; }
void add_cur_batch() { _cur_batch_num++; }
void reset_cur_batch() { _cur_batch_num = 0; }
bool is_cur_block_finished() { return _index_in_block == _block->rows() - 1; }
private:
// Load next block into _block
Status _load_next_block();
RowwiseIterator* _iter;
int _sequence_id_idx = -1;
bool _is_unique = false;
bool _is_reverse = false;
bool _valid = false;
mutable bool _skip = false;
size_t _index_in_block = -1;
// 4096 minus the 16 + 16 bytes of padding used by the padded PODArray
int _block_row_max = 4064;
int _num_columns;
int _num_key_columns;
std::vector<uint32_t>* _compare_columns;
std::vector<RowLocation> _block_row_locations;
bool _record_rowids = false;
size_t _cur_batch_num = 0;
// used to store data loaded from iterator->next_batch(Block*)
std::shared_ptr<Block> _block;
// used to keep blocks that are still referenced by a block view
std::list<std::shared_ptr<Block>> _block_list;
};
class VMergeIterator : public RowwiseIterator {
public:
// VMergeIterator takes ownership of the input iterators
VMergeIterator(std::vector<RowwiseIterator*>& iters, int sequence_id_idx, bool is_unique,
bool is_reverse, uint64_t* merged_rows)
: _origin_iters(iters),
_sequence_id_idx(sequence_id_idx),
_is_unique(is_unique),
_is_reverse(is_reverse),
_merged_rows(merged_rows) {}
~VMergeIterator() override {
while (!_merge_heap.empty()) {
auto ctx = _merge_heap.top();
_merge_heap.pop();
delete ctx;
}
}
Status init(const StorageReadOptions& opts) override;
Status next_batch(Block* block) override { return _next_batch(block); }
Status next_block_view(BlockView* block_view) override { return _next_batch(block_view); }
bool support_return_data_by_ref() override { return true; }
const Schema& schema() const override { return *_schema; }
Status current_block_row_locations(std::vector<RowLocation>* block_row_locations) override {
DCHECK(_record_rowids);
*block_row_locations = _block_row_locations;
return Status::OK();
}
bool update_profile(RuntimeProfile* profile) override {
if (!_origin_iters.empty()) {
return (*_origin_iters.begin())->update_profile(profile);
}
return false;
}
private:
int _get_size(Block* block) { return block->rows(); }
int _get_size(BlockView* block_view) { return block_view->size(); }
template <typename T>
Status _next_batch(T* block) {
if (UNLIKELY(_record_rowids)) {
_block_row_locations.resize(_block_row_max);
}
size_t row_idx = 0;
VMergeIteratorContext* pre_ctx = nullptr;
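// pre_ctx tracks the context whose popped rows are currently being
// accumulated: consecutive pops from the same context only bump its batch
// counter and are later materialized by a single copy_rows() call (one
// insert_range_from per column) instead of being copied row by row.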
while (_get_size(block) < _block_row_max) {
if (_merge_heap.empty()) {
break;
}
auto ctx = _merge_heap.top();
_merge_heap.pop();
if (!ctx->need_skip()) {
ctx->add_cur_batch();
if (pre_ctx != ctx) {
if (pre_ctx) {
pre_ctx->copy_rows(block);
}
pre_ctx = ctx;
}
if (UNLIKELY(_record_rowids)) {
_block_row_locations[row_idx] = ctx->current_row_location();
}
row_idx++;
if (ctx->is_cur_block_finished() || row_idx >= _block_row_max) {
// current block finished and ctx has not advanced,
// so the copy starts at (_index_in_block - _cur_batch_num + 1)
ctx->copy_rows(block, false);
pre_ctx = nullptr;
}
} else if (_merged_rows != nullptr) {
(*_merged_rows)++;
// the current row must be skipped, so flush the rows accumulated in pre_ctx
if (pre_ctx) {
pre_ctx->copy_rows(block);
pre_ctx = nullptr;
}
}
RETURN_IF_ERROR(ctx->advance());
if (ctx->valid()) {
_merge_heap.push(ctx);
} else {
// Release ctx early to reduce resource consumption
delete ctx;
}
}
if (!_merge_heap.empty()) {
return Status::OK();
}
// The last batch still needs to be processed
if (UNLIKELY(_record_rowids)) {
_block_row_locations.resize(row_idx);
}
return Status::EndOfFile("no more data in segment");
}
// It will be released after '_merge_heap' has been built.
std::vector<RowwiseIterator*> _origin_iters;
const Schema* _schema = nullptr;
struct VMergeContextComparator {
bool operator()(const VMergeIteratorContext* lhs, const VMergeIteratorContext* rhs) const {
return lhs->compare(*rhs);
}
};
using VMergeHeap =
std::priority_queue<VMergeIteratorContext*, std::vector<VMergeIteratorContext*>,
VMergeContextComparator>;
VMergeHeap _merge_heap;
int _block_row_max = 0;
int _sequence_id_idx = -1;
bool _is_unique = false;
bool _is_reverse = false;
uint64_t* _merged_rows = nullptr;
bool _record_rowids = false;
std::vector<RowLocation> _block_row_locations;
};
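// Illustrative usage sketch (the `inputs` and `opts` names here are
// assumptions for the example, not part of this API):
//   std::vector<RowwiseIterator*> inputs = ...; // each already ordered
//   uint64_t merged_rows = 0;
//   auto* it = new VMergeIterator(inputs, /*sequence_id_idx=*/-1,
//                                 /*is_unique=*/false, /*is_reverse=*/false,
//                                 &merged_rows);
//   RETURN_IF_ERROR(it->init(opts));
//   vectorized::Block block;
//   while (it->next_batch(&block).ok()) { /* consume block */ }
//   delete it; // also frees the input iterators it owns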
// Create a merge iterator for the input iterators. The merge iterator merges
// the ordered inputs into one ordered iterator, so the client must ensure
// that every input iterator is ordered; otherwise the result is undefined.

View File

@ -203,6 +203,7 @@ set(OLAP_TEST_FILES
olap/tablet_cooldown_test.cpp
olap/rowid_conversion_test.cpp
olap/remote_rowset_gc_test.cpp
olap/segcompaction_test.cpp
)
set(RUNTIME_TEST_FILES

View File

@ -0,0 +1,454 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <gtest/gtest.h>
#include <memory>
#include <sstream>
#include <string>
#include <vector>
#include "common/config.h"
#include "env/env_posix.h"
#include "gen_cpp/olap_file.pb.h"
#include "olap/data_dir.h"
#include "olap/row_block.h"
#include "olap/row_cursor.h"
#include "olap/rowset/beta_rowset_reader.h"
#include "olap/rowset/beta_rowset_writer.h"
#include "olap/rowset/rowset_factory.h"
#include "olap/rowset/rowset_reader_context.h"
#include "olap/rowset/rowset_writer.h"
#include "olap/rowset/rowset_writer_context.h"
#include "olap/storage_engine.h"
#include "olap/tablet_schema.h"
#include "olap/utils.h"
#include "runtime/exec_env.h"
#include "runtime/mem_pool.h"
#include "runtime/memory/mem_tracker.h"
#include "util/file_utils.h"
#include "util/slice.h"
namespace doris {
static const uint32_t MAX_PATH_LEN = 1024;
StorageEngine* l_engine = nullptr;
static const std::string lTestDir = "./data_test/data/segcompaction_test";
class SegCompactionTest : public testing::Test {
public:
SegCompactionTest() : _data_dir(std::make_unique<DataDir>(lTestDir)) {
_data_dir->update_capacity();
}
void SetUp() {
config::enable_segcompaction = true;
config::enable_storage_vectorization = true;
config::tablet_map_shard_size = 1;
config::txn_map_shard_size = 1;
config::txn_shard_size = 1;
char buffer[MAX_PATH_LEN];
EXPECT_NE(getcwd(buffer, MAX_PATH_LEN), nullptr);
config::storage_root_path = std::string(buffer) + "/data_test";
EXPECT_TRUE(FileUtils::remove_all(config::storage_root_path).ok());
EXPECT_TRUE(FileUtils::create_dir(config::storage_root_path).ok());
std::vector<StorePath> paths;
paths.emplace_back(config::storage_root_path, -1);
doris::EngineOptions options;
options.store_paths = paths;
Status s = doris::StorageEngine::open(options, &l_engine);
EXPECT_TRUE(s.ok()) << s.to_string();
ExecEnv* exec_env = doris::ExecEnv::GetInstance();
exec_env->set_storage_engine(l_engine);
EXPECT_TRUE(FileUtils::create_dir(lTestDir).ok());
l_engine->start_bg_threads();
}
void TearDown() {
if (l_engine != nullptr) {
l_engine->stop();
delete l_engine;
l_engine = nullptr;
}
}
protected:
OlapReaderStatistics _stats;
bool check_dir(std::vector<std::string>& vec) {
std::vector<std::string> result;
for (const auto& entry : std::filesystem::directory_iterator(lTestDir)) {
result.push_back(std::filesystem::path(entry.path()).filename());
}
LOG(INFO) << "expected ls:" << std::endl;
for (auto& i : vec) {
LOG(INFO) << i;
}
LOG(INFO) << "acutal ls:" << std::endl;
for (auto& i : result) {
LOG(INFO) << i;
}
if (result.size() != vec.size()) {
return false;
} else {
for (auto& i : vec) {
if (std::find(result.begin(), result.end(), i) == result.end()) {
return false;
}
}
}
return true;
}
// (k1 int, k2 int, v1 int) duplicate key (k1, k2)
// (k2 is INT for now instead of varchar(20); see the TODO below)
void create_tablet_schema(TabletSchemaSPtr tablet_schema) {
TabletSchemaPB tablet_schema_pb;
tablet_schema_pb.set_keys_type(DUP_KEYS);
tablet_schema_pb.set_num_short_key_columns(2);
tablet_schema_pb.set_num_rows_per_row_block(1024);
tablet_schema_pb.set_compress_kind(COMPRESS_NONE);
tablet_schema_pb.set_next_column_unique_id(4);
ColumnPB* column_1 = tablet_schema_pb.add_column();
column_1->set_unique_id(1);
column_1->set_name("k1");
column_1->set_type("INT");
column_1->set_is_key(true);
column_1->set_length(4);
column_1->set_index_length(4);
column_1->set_is_nullable(true);
column_1->set_is_bf_column(false);
ColumnPB* column_2 = tablet_schema_pb.add_column();
column_2->set_unique_id(2);
column_2->set_name("k2");
column_2->set_type(
"INT"); // TODO change to varchar(20) when dict encoding for string is supported
column_2->set_length(4);
column_2->set_index_length(4);
column_2->set_is_nullable(true);
column_2->set_is_key(true);
column_2->set_is_bf_column(false);
ColumnPB* column_3 = tablet_schema_pb.add_column();
column_3->set_unique_id(3);
column_3->set_name("v1");
column_3->set_type("INT");
column_3->set_length(4);
column_3->set_is_key(false);
column_3->set_is_nullable(false);
column_3->set_is_bf_column(false);
column_3->set_aggregation("SUM");
tablet_schema->init_from_pb(tablet_schema_pb);
}
void create_rowset_writer_context(int64_t id, TabletSchemaSPtr tablet_schema,
RowsetWriterContext* rowset_writer_context) {
RowsetId rowset_id;
rowset_id.init(id);
// rowset_writer_context->data_dir = _data_dir.get();
rowset_writer_context->rowset_id = rowset_id;
rowset_writer_context->tablet_id = 12345;
rowset_writer_context->tablet_schema_hash = 1111;
rowset_writer_context->partition_id = 10;
rowset_writer_context->rowset_type = BETA_ROWSET;
rowset_writer_context->rowset_dir = lTestDir;
rowset_writer_context->rowset_state = VISIBLE;
rowset_writer_context->tablet_schema = tablet_schema;
rowset_writer_context->version.first = 10;
rowset_writer_context->version.second = 10;
}
void create_and_init_rowset_reader(Rowset* rowset, RowsetReaderContext& context,
RowsetReaderSharedPtr* result) {
auto s = rowset->create_reader(result);
EXPECT_EQ(Status::OK(), s);
EXPECT_TRUE(*result != nullptr);
s = (*result)->init(&context);
EXPECT_EQ(Status::OK(), s);
}
private:
std::unique_ptr<DataDir> _data_dir;
};
TEST_F(SegCompactionTest, SegCompactionThenRead) {
config::enable_segcompaction = true;
config::enable_storage_vectorization = true;
Status s;
TabletSchemaSPtr tablet_schema = std::make_shared<TabletSchema>();
create_tablet_schema(tablet_schema);
RowsetSharedPtr rowset;
const int num_segments = 15;
const uint32_t rows_per_segment = 4096;
config::segcompaction_small_threshold = 6000; // set threshold above
// rows_per_segment
std::vector<uint32_t> segment_num_rows;
{ // write `num_segments * rows_per_segment` rows to rowset
RowsetWriterContext writer_context;
create_rowset_writer_context(10047, tablet_schema, &writer_context);
std::unique_ptr<RowsetWriter> rowset_writer;
s = RowsetFactory::create_rowset_writer(writer_context, &rowset_writer);
EXPECT_EQ(Status::OK(), s);
RowCursor input_row;
input_row.init(tablet_schema);
// for segment "i", row "rid"
// k1 := rid * 100 + i
// k2 := i
// k3 := rid
for (int i = 0; i < num_segments; ++i) {
MemPool mem_pool;
for (int rid = 0; rid < rows_per_segment; ++rid) {
uint32_t k1 = rid * 100 + i;
uint32_t k2 = i;
uint32_t k3 = rid;
input_row.set_field_content(0, reinterpret_cast<char*>(&k1), &mem_pool);
input_row.set_field_content(1, reinterpret_cast<char*>(&k2), &mem_pool);
input_row.set_field_content(2, reinterpret_cast<char*>(&k3), &mem_pool);
s = rowset_writer->add_row(input_row);
EXPECT_EQ(Status::OK(), s);
}
s = rowset_writer->flush();
EXPECT_EQ(Status::OK(), s);
}
rowset = rowset_writer->build();
std::vector<std::string> ls;
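// 15 small segments are written while the default
// segcompaction_threshold_segment_num (10) is in effect, so the first
// batch of small segments is expected to be seg-compacted into
// 10047_0.dat and the remaining ones renamed to keep segment ids
// consecutive, leaving the six files below.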
ls.push_back("10047_0.dat");
ls.push_back("10047_1.dat");
ls.push_back("10047_2.dat");
ls.push_back("10047_3.dat");
ls.push_back("10047_4.dat");
ls.push_back("10047_5.dat");
EXPECT_TRUE(check_dir(ls));
}
{ // read
RowsetReaderContext reader_context;
reader_context.tablet_schema = tablet_schema;
// use this reader type to avoid hitting caches populated by other unit tests
reader_context.reader_type = READER_CUMULATIVE_COMPACTION;
reader_context.need_ordered_result = true;
std::vector<uint32_t> return_columns = {0, 1, 2};
reader_context.return_columns = &return_columns;
reader_context.stats = &_stats;
// without predicates
{
RowsetReaderSharedPtr rowset_reader;
create_and_init_rowset_reader(rowset.get(), reader_context, &rowset_reader);
RowBlock* output_block;
uint32_t num_rows_read = 0;
while ((s = rowset_reader->next_block(&output_block)) == Status::OK()) {
EXPECT_TRUE(output_block != nullptr);
EXPECT_GT(output_block->row_num(), 0);
EXPECT_EQ(0, output_block->pos());
EXPECT_EQ(output_block->row_num(), output_block->limit());
EXPECT_EQ(return_columns, output_block->row_block_info().column_ids);
// after the sorted segments are merged, k1 will be
// 0, 1, ..., 14, 100, 101, ..., 114, 200, ..., 409500, ..., 409514
for (int i = 0; i < output_block->row_num(); ++i) {
char* field1 = output_block->field_ptr(i, 0);
char* field2 = output_block->field_ptr(i, 1);
char* field3 = output_block->field_ptr(i, 2);
// test null bit
EXPECT_FALSE(*reinterpret_cast<bool*>(field1));
EXPECT_FALSE(*reinterpret_cast<bool*>(field2));
EXPECT_FALSE(*reinterpret_cast<bool*>(field3));
uint32_t k1 = *reinterpret_cast<uint32_t*>(field1 + 1);
uint32_t k2 = *reinterpret_cast<uint32_t*>(field2 + 1);
uint32_t k3 = *reinterpret_cast<uint32_t*>(field3 + 1);
EXPECT_EQ(100 * k3 + k2, k1);
num_rows_read++;
}
}
EXPECT_EQ(Status::OLAPInternalError(OLAP_ERR_DATA_EOF), s);
EXPECT_TRUE(output_block == nullptr);
EXPECT_EQ(rowset->rowset_meta()->num_rows(), num_rows_read);
EXPECT_TRUE(rowset_reader->get_segment_num_rows(&segment_num_rows).ok());
size_t total_num_rows = 0;
//EXPECT_EQ(segment_num_rows.size(), num_segments);
for (const auto& i : segment_num_rows) {
total_num_rows += i;
}
EXPECT_EQ(total_num_rows, num_rows_read);
}
}
}
TEST_F(SegCompactionTest, SegCompactionInterleaveWithBig_ooooOOoOooooooooO) {
config::enable_segcompaction = true;
config::enable_storage_vectorization = true;
Status s;
TabletSchemaSPtr tablet_schema = std::make_shared<TabletSchema>();
create_tablet_schema(tablet_schema);
RowsetSharedPtr rowset;
config::segcompaction_small_threshold = 6000; // set threshold above
// rows_per_segment
std::vector<uint32_t> segment_num_rows;
{ // write an interleaved mix of small and big segments to the rowset
RowsetWriterContext writer_context;
create_rowset_writer_context(10048, tablet_schema, &writer_context);
std::unique_ptr<RowsetWriter> rowset_writer;
s = RowsetFactory::create_rowset_writer(writer_context, &rowset_writer);
EXPECT_EQ(Status::OK(), s);
RowCursor input_row;
input_row.init(tablet_schema);
// for segment "i", row "rid"
// k1 := rid * 100 + i
// k2 := i
// k3 := rid
int num_segments = 4;
uint32_t rows_per_segment = 4096;
for (int i = 0; i < num_segments; ++i) {
MemPool mem_pool;
for (int rid = 0; rid < rows_per_segment; ++rid) {
uint32_t k1 = rid * 100 + i;
uint32_t k2 = i;
uint32_t k3 = rid;
input_row.set_field_content(0, reinterpret_cast<char*>(&k1), &mem_pool);
input_row.set_field_content(1, reinterpret_cast<char*>(&k2), &mem_pool);
input_row.set_field_content(2, reinterpret_cast<char*>(&k3), &mem_pool);
s = rowset_writer->add_row(input_row);
EXPECT_EQ(Status::OK(), s);
}
s = rowset_writer->flush();
EXPECT_EQ(Status::OK(), s);
}
num_segments = 2;
rows_per_segment = 6400;
for (int i = 0; i < num_segments; ++i) {
MemPool mem_pool;
for (int rid = 0; rid < rows_per_segment; ++rid) {
uint32_t k1 = rid * 100 + i;
uint32_t k2 = i;
uint32_t k3 = rid;
input_row.set_field_content(0, reinterpret_cast<char*>(&k1), &mem_pool);
input_row.set_field_content(1, reinterpret_cast<char*>(&k2), &mem_pool);
input_row.set_field_content(2, reinterpret_cast<char*>(&k3), &mem_pool);
s = rowset_writer->add_row(input_row);
EXPECT_EQ(Status::OK(), s);
}
s = rowset_writer->flush();
EXPECT_EQ(Status::OK(), s);
}
num_segments = 1;
rows_per_segment = 4096;
for (int i = 0; i < num_segments; ++i) {
MemPool mem_pool;
for (int rid = 0; rid < rows_per_segment; ++rid) {
uint32_t k1 = rid * 100 + i;
uint32_t k2 = i;
uint32_t k3 = rid;
input_row.set_field_content(0, reinterpret_cast<char*>(&k1), &mem_pool);
input_row.set_field_content(1, reinterpret_cast<char*>(&k2), &mem_pool);
input_row.set_field_content(2, reinterpret_cast<char*>(&k3), &mem_pool);
s = rowset_writer->add_row(input_row);
EXPECT_EQ(Status::OK(), s);
}
s = rowset_writer->flush();
EXPECT_EQ(Status::OK(), s);
}
num_segments = 1;
rows_per_segment = 6400;
for (int i = 0; i < num_segments; ++i) {
MemPool mem_pool;
for (int rid = 0; rid < rows_per_segment; ++rid) {
uint32_t k1 = rid * 100 + i;
uint32_t k2 = i;
uint32_t k3 = rid;
input_row.set_field_content(0, reinterpret_cast<char*>(&k1), &mem_pool);
input_row.set_field_content(1, reinterpret_cast<char*>(&k2), &mem_pool);
input_row.set_field_content(2, reinterpret_cast<char*>(&k3), &mem_pool);
s = rowset_writer->add_row(input_row);
EXPECT_EQ(Status::OK(), s);
}
s = rowset_writer->flush();
EXPECT_EQ(Status::OK(), s);
}
num_segments = 8;
rows_per_segment = 4096;
for (int i = 0; i < num_segments; ++i) {
MemPool mem_pool;
for (int rid = 0; rid < rows_per_segment; ++rid) {
uint32_t k1 = rid * 100 + i;
uint32_t k2 = i;
uint32_t k3 = rid;
input_row.set_field_content(0, reinterpret_cast<char*>(&k1), &mem_pool);
input_row.set_field_content(1, reinterpret_cast<char*>(&k2), &mem_pool);
input_row.set_field_content(2, reinterpret_cast<char*>(&k3), &mem_pool);
s = rowset_writer->add_row(input_row);
EXPECT_EQ(Status::OK(), s);
}
s = rowset_writer->flush();
EXPECT_EQ(Status::OK(), s);
}
num_segments = 1;
rows_per_segment = 6400;
for (int i = 0; i < num_segments; ++i) {
MemPool mem_pool;
for (int rid = 0; rid < rows_per_segment; ++rid) {
uint32_t k1 = rid * 100 + i;
uint32_t k2 = i;
uint32_t k3 = rid;
input_row.set_field_content(0, reinterpret_cast<char*>(&k1), &mem_pool);
input_row.set_field_content(1, reinterpret_cast<char*>(&k2), &mem_pool);
input_row.set_field_content(2, reinterpret_cast<char*>(&k3), &mem_pool);
s = rowset_writer->add_row(input_row);
EXPECT_EQ(Status::OK(), s);
}
s = rowset_writer->flush();
EXPECT_EQ(Status::OK(), s);
}
rowset = rowset_writer->build();
std::vector<std::string> ls;
// ooooOOoOooooooooO: 'o' = small segment (4096 rows, below
// segcompaction_small_threshold), 'O' = big segment (6400 rows, above it)
ls.push_back("10048_0.dat"); // oooo
ls.push_back("10048_1.dat"); // O
ls.push_back("10048_2.dat"); // O
ls.push_back("10048_3.dat"); // o
ls.push_back("10048_4.dat"); // O
ls.push_back("10048_5.dat"); // oooooooo
ls.push_back("10048_6.dat"); // O
EXPECT_TRUE(check_dir(ls));
}
}
} // namespace doris
// @brief Test Stub

View File

@ -1590,3 +1590,21 @@
* Type:int64
* Description:Save time of cache file, in seconds
* Default:604800(1 week)
### `enable_segcompaction`
* Type: bool
* Description: Enable segment compaction during load to reduce the number of segments in a rowset
* Default value: false
### `segcompaction_threshold_segment_num`
* Type: int32
* Description: Trigger segment compaction if the number of segments in a rowset exceeds this threshold
* Default value: 10
### `segcompaction_small_threshold`
* Type: int32
* Description: Segments whose row count is below this threshold are considered small and are compacted during segment compaction; larger segments are skipped
* Default value: 1048576
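A minimal sketch of how these switches could be combined in `be.conf` (illustrative values, matching the defaults above except for turning the feature on):

```
enable_segcompaction = true
segcompaction_threshold_segment_num = 10
segcompaction_small_threshold = 1048576
```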

View File

@ -6,7 +6,7 @@
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
@ -249,6 +249,10 @@ under the License.
| OLAP_ERR_ROWSET_READER_INIT | -3110 | Rowset read object initialization failed |
| OLAP_ERR_ROWSET_READ_FAILED | -3111 | Rowset read failure |
| OLAP_ERR_ROWSET_INVALID_STATE_TRANSITION | -3112 | Rowset invalid transaction state |
| OLAP_ERR_ROWSET_RENAME_FILE_FAILED | -3116 | Rowset failed to rename file |
| OLAP_ERR_SEGCOMPACTION_INIT_READER | -3117 | Segment Compaction failed to init reader |
| OLAP_ERR_SEGCOMPACTION_INIT_WRITER | -3118 | Segment Compaction failed to init writer |
| OLAP_ERR_SEGCOMPACTION_FAILED | -3119 | Segment Compaction failed |

View File

@ -1613,3 +1613,21 @@ Default number of webserver worker threads
* Type: int64
* Description: Retention time of cached files, in seconds
* Default value: 604800 (1 week)
### `enable_segcompaction`
* Type: bool
* Description: Perform segment compaction during load to reduce the number of segments
* Default value: false
### `segcompaction_threshold_segment_num`
* Type: int32
* Description: Trigger segment compaction when the number of segments exceeds this threshold
* Default value: 10
### `segcompaction_small_threshold`
* Type: int32
* Description: Segments whose row count is below this threshold will be compacted during segment compaction; larger segments are skipped
* Default value: 1048576

View File

@ -6,7 +6,7 @@
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
@ -257,4 +257,8 @@ under the License.
| OLAP_ERR_ROWSET_LOAD_FAILED | -3109 | Rowset load failed |
| OLAP_ERR_ROWSET_READER_INIT | -3110 | Rowset reader initialization failed |
| OLAP_ERR_ROWSET_READ_FAILED | -3111 | Rowset read failed |
| OLAP_ERR_ROWSET_INVALID_STATE_TRANSITION | -3112 | Rowset invalid transaction state |
| OLAP_ERR_ROWSET_RENAME_FILE_FAILED | -3116 | Rowset failed to rename file |
| OLAP_ERR_SEGCOMPACTION_INIT_READER | -3117 | Segment compaction failed to init reader |
| OLAP_ERR_SEGCOMPACTION_INIT_WRITER | -3118 | Segment compaction failed to init writer |
| OLAP_ERR_SEGCOMPACTION_FAILED | -3119 | Segment compaction failed |