From 554f566217af439b76d3e79e96a9f1de7dfefe16 Mon Sep 17 00:00:00 2001
From: zhengyu
Date: Fri, 4 Nov 2022 14:12:51 +0800
Subject: [PATCH] [enhancement](compaction) introduce segment compaction
 (#12609) (#12866)

## Design

### Trigger

Whenever a rowset writer has produced more than N (e.g. 10) segments, we
trigger segment compaction. Note that at most one segment compaction job runs
for a single rowset at a time, so jobs never recurse or pile up in the queue.

### Target Selection

We collect candidate segments on every trigger. We skip big segments whose
row num > M (e.g. 10000) because compacting them yields little benefit
compared with the effort. Hence, we only pick the "longest consecutive small"
segment group for actual compaction.

### Compaction Process

A new thread pool is introduced to do the job. We submit the above-mentioned
"longest consecutive small" segment group to the pool, and the worker thread
then:
- builds a MergeIterator from the target segments
- creates a new segment writer
- for each block read from the MergeIterator, appends it via the writer

### SegID handling

SegIDs must remain consecutive after segment compaction. If a rowset has
small segments named seg_0, seg_1, seg_2, seg_3 and a big segment seg_4:
- we create a segment named "seg_0-3" to hold the compacted data of seg_0,
  seg_1, seg_2 and seg_3
- delete seg_0, seg_1, seg_2 and seg_3
- rename seg_0-3 to seg_0
- rename seg_4 to seg_1

It is worth noting that we must wait for inflight segment compaction tasks to
finish before building the rowset meta and committing the txn. Minimal
sketches of the trigger policy, the target selection, and the segment-id
renaming follow below.
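The trigger policy can be sketched as follows. This is an illustration only,
assuming a hypothetical `SegcompactionTrigger` helper; in the patch itself the
check lives in `BetaRowsetWriter::_segcompaction_if_necessary()` and the
single-job guarantee in `_check_and_set_is_doing_segcompaction()`:

```cpp
#include <mutex>

// Hypothetical helper, not part of this patch. threshold corresponds to
// config::segcompaction_threshold_segment_num (the N above).
class SegcompactionTrigger {
public:
    explicit SegcompactionTrigger(int threshold) : _threshold(threshold) {}

    // Returns true, and claims the single inflight slot, if a segment
    // compaction task should be submitted for this rowset.
    bool try_trigger(int num_segment, int segcompacted_point) {
        std::lock_guard<std::mutex> l(_lock);
        if (_inflight) {
            return false; // only one job per rowset at a time
        }
        if (num_segment - segcompacted_point < _threshold) {
            return false; // not enough uncompacted segments yet
        }
        _inflight = true;
        return true;
    }

    // Called by the worker when the task finishes (or fails).
    void on_task_finished() {
        std::lock_guard<std::mutex> l(_lock);
        _inflight = false;
    }

private:
    const int _threshold;
    bool _inflight = false;
    std::mutex _lock;
};
```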
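Target selection can be approximated by the sketch below (illustration only;
`SegInfo` and `pick_consecutive_small_run` are made-up names). It is
simplified: the patch additionally requires a run that is *not* terminated by
a big segment to be longer than `segcompaction_threshold_segment_num / 2`, see
`_find_longest_consecutive_small_segment()`:

```cpp
#include <cstdint>
#include <utility>
#include <vector>

// Hypothetical stand-in for a segment; only the row count matters here.
struct SegInfo {
    uint32_t id;
    uint64_t num_rows;
};

// Returns the first run [begin, end) of consecutive small segments, or an
// empty run if nothing qualifies. small_threshold corresponds to
// config::segcompaction_small_threshold (the M above).
std::pair<size_t, size_t> pick_consecutive_small_run(const std::vector<SegInfo>& segs,
                                                     uint64_t small_threshold) {
    size_t begin = 0;
    while (begin < segs.size() && segs[begin].num_rows > small_threshold) {
        ++begin; // skip big segments: compacting them yields little benefit
    }
    size_t end = begin;
    while (end < segs.size() && segs[end].num_rows <= small_threshold) {
        ++end; // extend the run of consecutive small segments
    }
    if (end - begin < 2) {
        return {0, 0}; // a single small segment is not worth compacting
    }
    return {begin, end};
}
```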
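The SegID handling boils down to the rename plan below (illustration only;
`rename_plan` is a made-up helper — the patch performs the actual renames in
`_rename_compacted_segments()` and `_rename_compacted_segment_plain()`):

```cpp
#include <cstdint>
#include <map>
#include <string>

// Maps old file names to new ones after compacting segments [begin, end]
// of a rowset that currently has num_segments segments.
std::map<std::string, std::string> rename_plan(uint32_t num_segments, uint32_t begin,
                                               uint32_t end) {
    std::map<std::string, std::string> plan;
    uint32_t next_id = begin; // segments before the run keep their ids
    // the merged "seg_<begin>-<end>" file takes the next consecutive id
    plan["seg_" + std::to_string(begin) + "-" + std::to_string(end)] =
            "seg_" + std::to_string(next_id++);
    // every segment after the compacted run slides down to stay consecutive
    for (uint32_t old_id = end + 1; old_id < num_segments; ++old_id) {
        plan["seg_" + std::to_string(old_id)] = "seg_" + std::to_string(next_id++);
    }
    return plan;
}
```

For the example above (5 segments, small run seg_0..seg_3) this yields
seg_0-3 -> seg_0 and seg_4 -> seg_1.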
---
 be/src/common/config.h                        |  11 +
 be/src/common/status.h                        |   6 +-
 be/src/olap/compaction.cpp                    |   2 +-
 be/src/olap/delta_writer.cpp                  |  38 +-
 be/src/olap/delta_writer.h                    |   3 -
 be/src/olap/olap_server.cpp                   |  21 +
 be/src/olap/rowset/beta_rowset.cpp            |  13 +
 be/src/olap/rowset/beta_rowset.h              |   7 +
 be/src/olap/rowset/beta_rowset_writer.cpp     | 654 ++++++++++++++++--
 be/src/olap/rowset/beta_rowset_writer.h       |  72 +-
 .../olap/rowset/segment_v2/segment_writer.h   |   7 +-
 be/src/olap/schema_change.h                   |   1 +
 be/src/olap/storage_engine.cpp                |  11 +
 be/src/olap/storage_engine.h                  |  15 +
 be/src/util/doris_metrics.h                   |   1 +
 be/src/vec/olap/vgeneric_iterators.cpp        | 514 ++++----------
 be/src/vec/olap/vgeneric_iterators.h          | 261 +++++++
 be/test/CMakeLists.txt                        |   1 +
 be/test/olap/segcompaction_test.cpp           | 454 ++++++++++++
 docs/en/docs/admin-manual/config/be-config.md |  18 +
 .../maint-monitor/be-olap-error-code.md       |   6 +-
 .../docs/admin-manual/config/be-config.md     |  18 +
 .../maint-monitor/be-olap-error-code.md       |   8 +-
 23 files changed, 1696 insertions(+), 446 deletions(-)
 create mode 100644 be/test/olap/segcompaction_test.cpp

diff --git a/be/src/common/config.h b/be/src/common/config.h
index 2d627682bb..6843a88514 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -285,6 +285,9 @@ CONF_mInt32(quick_compaction_max_threads, "10");
 // Thread count to do tablet meta checkpoint, -1 means use the data directories count.
 CONF_Int32(max_meta_checkpoint_threads, "-1");
 
+// This config can be set to limit the number of threads in the segcompaction thread pool.
+CONF_mInt32(seg_compaction_max_threads, "10");
+
 // The upper limit of "permits" held by all compaction tasks. This config can be set to limit memory consumption for compaction.
 CONF_mInt64(total_permits_for_compaction_score, "10000");
 
@@ -841,6 +844,14 @@ CONF_String(be_node_role, "mix");
 // Hide the be config page for webserver.
 CONF_Bool(hide_webserver_config_page, "false");
 
+CONF_Bool(enable_segcompaction, "false"); // currently only support vectorized storage
+
+// Trigger segcompaction if the num of segments in a rowset exceeds this threshold.
+CONF_Int32(segcompaction_threshold_segment_num, "10");
+
+// Segments whose row number is below this threshold will be compacted during segcompaction
+CONF_Int32(segcompaction_small_threshold, "1048576");
+
 CONF_String(jvm_max_heap_size, "1024M");
 
 #ifdef BE_TEST

diff --git a/be/src/common/status.h b/be/src/common/status.h
index 6dc42e70fa..31344080ba 100644
--- a/be/src/common/status.h
+++ b/be/src/common/status.h
@@ -231,7 +231,11 @@ class PStatus;
     M(OLAP_ERR_ROWSET_INVALID_STATE_TRANSITION, -3112, "", true) \
     M(OLAP_ERR_STRING_OVERFLOW_IN_VEC_ENGINE, -3113, "", true) \
     M(OLAP_ERR_ROWSET_ADD_MIGRATION_V2, -3114, "", true) \
-    M(OLAP_ERR_PUBLISH_VERSION_NOT_CONTINUOUS, -3115, "", false)
+    M(OLAP_ERR_PUBLISH_VERSION_NOT_CONTINUOUS, -3115, "", false) \
+    M(OLAP_ERR_ROWSET_RENAME_FILE_FAILED, -3116, "", false) \
+    M(OLAP_ERR_SEGCOMPACTION_INIT_READER, -3117, "", false) \
+    M(OLAP_ERR_SEGCOMPACTION_INIT_WRITER, -3118, "", false) \
+    M(OLAP_ERR_SEGCOMPACTION_FAILED, -3119, "", false)
 
 enum ErrorCode {
 #define M(NAME, ERRORCODE, DESC, STACKTRACEENABLED) NAME = ERRORCODE,

diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index 41b2ebd902..801cbf2990 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -187,7 +187,7 @@ Status Compaction::do_compaction_impl(int64_t permits) {
     if (_output_rowset == nullptr) {
         LOG(WARNING) << "rowset writer build failed. writer version:"
                      << ", output_version=" << _output_version;
-        return Status::OLAPInternalError(OLAP_ERR_MALLOC_ERROR);
+        return Status::OLAPInternalError(OLAP_ERR_ROWSET_BUILDER_INIT);
     }
     TRACE_COUNTER_INCREMENT("output_rowset_data_size", _output_rowset->data_disk_size());
     TRACE_COUNTER_INCREMENT("output_row_num", _output_rowset->num_rows());

diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 39185cdbac..a748f566c1 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -56,8 +56,6 @@ DeltaWriter::~DeltaWriter() {
         _garbage_collection();
     }
 
-    _mem_table.reset();
-
     if (!_is_init) {
         return;
     }
@@ -77,6 +75,8 @@ DeltaWriter::~DeltaWriter() {
         _tablet->data_dir()->remove_pending_ids(ROWSET_ID_PREFIX +
                                                 _rowset_writer->rowset_id().to_string());
     }
+
+    _mem_table.reset();
 }
 
 void DeltaWriter::_garbage_collection() {
@@ -167,12 +167,12 @@ Status DeltaWriter::write(Tuple* tuple) {
     // if memtable is full, push it to the flush executor,
     // and create a new memtable for incoming data
     if (_mem_table->memory_usage() >= config::write_buffer_size) {
-        if (++_segment_counter > config::max_segment_num_per_rowset) {
-            return Status::OLAPInternalError(OLAP_ERR_TOO_MANY_SEGMENTS);
-        }
-        RETURN_NOT_OK(_flush_memtable_async());
+        auto s = _flush_memtable_async();
         // create a new memtable for new incoming data
         _reset_mem_table();
+        if (OLAP_UNLIKELY(!s.ok())) {
+            return s;
+        }
     }
     return Status::OK();
 }
@@ -192,8 +192,11 @@ Status DeltaWriter::write(const RowBatch* row_batch, const std::vector& row
     }
 
     if (_mem_table->memory_usage() >= config::write_buffer_size) {
-        RETURN_NOT_OK(_flush_memtable_async());
+        auto s = _flush_memtable_async();
         _reset_mem_table();
+        if (OLAP_UNLIKELY(!s.ok())) {
+            return s;
+        }
     }
 
     return Status::OK();
 }
@@ -217,8 +220,11 @@ Status DeltaWriter::write(const vectorized::Block* block, const std::vector
 
     if (_mem_table->need_to_agg()) {
_mem_table->shrink_memtable_by_agg(); if (_mem_table->is_flush()) { - RETURN_NOT_OK(_flush_memtable_async()); + auto s = _flush_memtable_async(); _reset_mem_table(); + if (OLAP_UNLIKELY(!s.ok())) { + return s; + } } } @@ -226,9 +232,6 @@ Status DeltaWriter::write(const vectorized::Block* block, const std::vector } Status DeltaWriter::_flush_memtable_async() { - if (++_segment_counter > config::max_segment_num_per_rowset) { - return Status::OLAPInternalError(OLAP_ERR_TOO_MANY_SEGMENTS); - } return _flush_token->submit(std::move(_mem_table)); } @@ -249,8 +252,11 @@ Status DeltaWriter::flush_memtable_and_wait(bool need_wait) { VLOG_NOTICE << "flush memtable to reduce mem consumption. memtable size: " << _mem_table->memory_usage() << ", tablet: " << _req.tablet_id << ", load id: " << print_id(_req.load_id); - RETURN_NOT_OK(_flush_memtable_async()); + auto s = _flush_memtable_async(); _reset_mem_table(); + if (OLAP_UNLIKELY(!s.ok())) { + return s; + } if (need_wait) { // wait all memtables in flush queue to be flushed. @@ -309,9 +315,13 @@ Status DeltaWriter::close() { return Status::OLAPInternalError(OLAP_ERR_ALREADY_CANCELLED); } - RETURN_NOT_OK(_flush_memtable_async()); + auto s = _flush_memtable_async(); _mem_table.reset(); - return Status::OK(); + if (OLAP_UNLIKELY(!s.ok())) { + return s; + } else { + return Status::OK(); + } } Status DeltaWriter::close_wait(const PSlaveTabletNodes& slave_tablet_nodes, diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h index 85133e456c..444533fecc 100644 --- a/be/src/olap/delta_writer.h +++ b/be/src/olap/delta_writer.h @@ -149,9 +149,6 @@ private: SpinLock _mem_table_tracker_lock; std::atomic _mem_table_num = 1; - // The counter of number of segment flushed already. - int64_t _segment_counter = 0; - std::mutex _lock; // use in vectorized load diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp index 845031c229..958e60137e 100644 --- a/be/src/olap/olap_server.cpp +++ b/be/src/olap/olap_server.cpp @@ -26,12 +26,14 @@ #include #include "agent/cgroups_mgr.h" +#include "common/config.h" #include "common/status.h" #include "gutil/strings/substitute.h" #include "io/cache/file_cache_manager.h" #include "olap/cumulative_compaction.h" #include "olap/olap_common.h" #include "olap/olap_define.h" +#include "olap/rowset/beta_rowset_writer.h" #include "olap/storage_engine.h" #include "util/file_utils.h" #include "util/time.h" @@ -83,6 +85,12 @@ Status StorageEngine::start_bg_threads() { .set_min_threads(config::quick_compaction_max_threads) .set_max_threads(config::quick_compaction_max_threads) .build(&_quick_compaction_thread_pool); + if (config::enable_segcompaction && config::enable_storage_vectorization) { + ThreadPoolBuilder("SegCompactionTaskThreadPool") + .set_min_threads(config::seg_compaction_max_threads) + .set_max_threads(config::seg_compaction_max_threads) + .build(&_seg_compaction_thread_pool); + } // compaction tasks producer thread RETURN_IF_ERROR(Thread::create( @@ -686,6 +694,19 @@ Status StorageEngine::submit_quick_compaction_task(TabletSharedPtr tablet) { return Status::OK(); } +Status StorageEngine::_handle_seg_compaction(BetaRowsetWriter* writer, + SegCompactionCandidatesSharedPtr segments) { + writer->compact_segments(segments); + // return OK here. 
error will be reported via BetaRowsetWriter::_segcompaction_status + return Status::OK(); +} + +Status StorageEngine::submit_seg_compaction_task(BetaRowsetWriter* writer, + SegCompactionCandidatesSharedPtr segments) { + return _seg_compaction_thread_pool->submit_func( + std::bind(&StorageEngine::_handle_seg_compaction, this, writer, segments)); +} + void StorageEngine::_cooldown_tasks_producer_callback() { int64_t interval = config::generate_cooldown_task_interval_sec; do { diff --git a/be/src/olap/rowset/beta_rowset.cpp b/be/src/olap/rowset/beta_rowset.cpp index 0d99676ecc..fdb7b90496 100644 --- a/be/src/olap/rowset/beta_rowset.cpp +++ b/be/src/olap/rowset/beta_rowset.cpp @@ -46,6 +46,12 @@ std::string BetaRowset::segment_file_path(int segment_id) { return segment_file_path(_rowset_dir, rowset_id(), segment_id); } +std::string BetaRowset::segment_cache_path(const std::string& rowset_dir, const RowsetId& rowset_id, + int segment_id) { + // {root_path}/data/{shard_id}/{tablet_id}/{schema_hash}/{rowset_id}_{seg_num} + return fmt::format("{}/{}_{}", rowset_dir, rowset_id.to_string(), segment_id); +} + std::string BetaRowset::segment_cache_path(int segment_id) { // {root_path}/data/{shard_id}/{tablet_id}/{schema_hash}/{rowset_id}_{seg_num} return fmt::format("{}/{}_{}", _tablet_path, rowset_id().to_string(), segment_id); @@ -74,6 +80,13 @@ std::string BetaRowset::remote_segment_path(int64_t tablet_id, const std::string return fmt::format("{}/{}_{}.dat", remote_tablet_path(tablet_id), rowset_id, segment_id); } +std::string BetaRowset::local_segment_path_segcompacted(const std::string& tablet_path, + const RowsetId& rowset_id, int64_t begin, + int64_t end) { + // {root_path}/data/{shard_id}/{tablet_id}/{schema_hash}/{rowset_id}_{begin_seg}-{end_seg}.dat + return fmt::format("{}/{}_{}-{}.dat", tablet_path, rowset_id.to_string(), begin, end); +} + BetaRowset::BetaRowset(TabletSchemaSPtr schema, const std::string& tablet_path, RowsetMetaSharedPtr rowset_meta) : Rowset(schema, tablet_path, std::move(rowset_meta)) { diff --git a/be/src/olap/rowset/beta_rowset.h b/be/src/olap/rowset/beta_rowset.h index 1d4153f701..7cd792bf69 100644 --- a/be/src/olap/rowset/beta_rowset.h +++ b/be/src/olap/rowset/beta_rowset.h @@ -46,9 +46,16 @@ public: std::string segment_cache_path(int segment_id); + static std::string segment_cache_path(const std::string& rowset_dir, const RowsetId& rowset_id, + int segment_id); + static std::string segment_file_path(const std::string& rowset_dir, const RowsetId& rowset_id, int segment_id); + static std::string local_segment_path_segcompacted(const std::string& tablet_path, + const RowsetId& rowset_id, int64_t begin, + int64_t end); + static std::string remote_segment_path(int64_t tablet_id, const RowsetId& rowset_id, int segment_id); diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp index 288d8b91f7..a44e72cf1d 100644 --- a/be/src/olap/rowset/beta_rowset_writer.cpp +++ b/be/src/olap/rowset/beta_rowset_writer.cpp @@ -18,6 +18,8 @@ #include "olap/rowset/beta_rowset_writer.h" #include // time +#include +#include #include "common/config.h" #include "common/logging.h" @@ -31,20 +33,31 @@ #include "olap/rowset/beta_rowset.h" #include "olap/rowset/rowset_factory.h" #include "olap/rowset/segment_v2/segment_writer.h" -#include "olap/storage_engine.h" +//#include "olap/storage_engine.h" #include "runtime/exec_env.h" +#include "runtime/memory/mem_tracker_limiter.h" namespace doris { +class StorageEngine; + BetaRowsetWriter::BetaRowsetWriter() : 
_rowset_meta(nullptr), _num_segment(0), + _segcompacted_point(0), + _num_segcompacted(0), _segment_writer(nullptr), _num_rows_written(0), _total_data_size(0), - _total_index_size(0) {} + _total_index_size(0), + _raw_num_rows_written(0), + _is_doing_segcompaction(false) { + _segcompaction_status.store(OLAP_SUCCESS); +} BetaRowsetWriter::~BetaRowsetWriter() { + OLAP_UNUSED_ARG(_wait_flying_segcompaction()); + // TODO(lingbin): Should wrapper exception logic, no need to know file ops directly. if (!_already_built) { // abnormal exit, remove all files generated _segment_writer.reset(); // ensure all files are closed @@ -107,6 +120,433 @@ Status BetaRowsetWriter::add_block(const vectorized::Block* block) { return _add_block(block, &_segment_writer); } +vectorized::VMergeIterator* BetaRowsetWriter::_get_segcompaction_reader( + SegCompactionCandidatesSharedPtr segments, std::shared_ptr schema, + OlapReaderStatistics* stat, uint64_t* merged_row_stat) { + StorageReadOptions read_options; + read_options.stats = stat; + read_options.use_page_cache = false; + read_options.tablet_schema = _context.tablet_schema; + std::vector> seg_iterators; + for (auto& seg_ptr : *segments) { + std::unique_ptr iter; + auto s = seg_ptr->new_iterator(*schema, read_options, &iter); + if (!s.ok()) { + LOG(WARNING) << "failed to create iterator[" << seg_ptr->id() << "]: " << s.to_string(); + return nullptr; + } + seg_iterators.push_back(std::move(iter)); + } + std::vector iterators; + for (auto& owned_it : seg_iterators) { + // transfer ownership + iterators.push_back(owned_it.release()); + } + bool is_unique = (_context.tablet_schema->keys_type() == UNIQUE_KEYS); + bool is_reverse = false; + auto merge_itr = + vectorized::new_merge_iterator(iterators, -1, is_unique, is_reverse, merged_row_stat); + DCHECK(merge_itr); + auto s = merge_itr->init(read_options); + if (!s.ok()) { + LOG(WARNING) << "failed to init iterator: " << s.to_string(); + for (auto& itr : iterators) { + delete itr; + } + return nullptr; + } + + return (vectorized::VMergeIterator*)merge_itr; +} + +std::unique_ptr BetaRowsetWriter::_create_segcompaction_writer( + uint64_t begin, uint64_t end) { + Status status; + std::unique_ptr writer = nullptr; + status = _create_segment_writer_for_segcompaction(&writer, begin, end); + if (status != Status::OK() || writer == nullptr) { + LOG(ERROR) << "failed to create segment writer for begin:" << begin << " end:" << end + << " path:" << writer->get_data_dir()->path() << " status:" << status; + return nullptr; + } else { + return writer; + } +} + +Status BetaRowsetWriter::_delete_original_segments(uint32_t begin, uint32_t end) { + auto fs = _rowset_meta->fs(); + if (!fs) { + return Status::OLAPInternalError(OLAP_ERR_INIT_FAILED); + } + for (uint32_t i = begin; i <= end; ++i) { + auto seg_path = BetaRowset::segment_file_path(_context.rowset_dir, _context.rowset_id, i); + // Even if an error is encountered, these files that have not been cleaned up + // will be cleaned up by the GC background. So here we only print the error + // message when we encounter an error. 
+ RETURN_NOT_OK_LOG(fs->delete_file(seg_path), + strings::Substitute("Failed to delete file=$0", seg_path)); + } + return Status::OK(); +} + +Status BetaRowsetWriter::_rename_compacted_segments(int64_t begin, int64_t end) { + int ret; + auto src_seg_path = BetaRowset::local_segment_path_segcompacted(_context.rowset_dir, + _context.rowset_id, begin, end); + auto dst_seg_path = BetaRowset::segment_file_path(_context.rowset_dir, _context.rowset_id, + _num_segcompacted++); + ret = rename(src_seg_path.c_str(), dst_seg_path.c_str()); + if (ret) { + LOG(WARNING) << "failed to rename " << src_seg_path << " to " << dst_seg_path + << ". ret:" << ret << " errno:" << errno; + return Status::OLAPInternalError(OLAP_ERR_ROWSET_RENAME_FILE_FAILED); + } + return Status::OK(); +} + +Status BetaRowsetWriter::_rename_compacted_segment_plain(uint64_t seg_id) { + if (seg_id == _num_segcompacted) { + return Status::OK(); + } + + int ret; + auto src_seg_path = + BetaRowset::segment_file_path(_context.rowset_dir, _context.rowset_id, seg_id); + auto dst_seg_path = BetaRowset::segment_file_path(_context.rowset_dir, _context.rowset_id, + _num_segcompacted); + VLOG_DEBUG << "segcompaction skip this segment. rename " << src_seg_path << " to " + << dst_seg_path; + { + std::lock_guard lock(_segid_statistics_map_mutex); + DCHECK_EQ(_segid_statistics_map.find(seg_id) == _segid_statistics_map.end(), false); + DCHECK_EQ(_segid_statistics_map.find(_num_segcompacted) == _segid_statistics_map.end(), + true); + Statistics org = _segid_statistics_map[seg_id]; + _segid_statistics_map.emplace(_num_segcompacted, org); + _clear_statistics_for_deleting_segments_unsafe(seg_id, seg_id); + } + ++_num_segcompacted; + ret = rename(src_seg_path.c_str(), dst_seg_path.c_str()); + if (ret) { + LOG(WARNING) << "failed to rename " << src_seg_path << " to " << dst_seg_path + << ". ret:" << ret << " errno:" << errno; + return Status::OLAPInternalError(OLAP_ERR_ROWSET_RENAME_FILE_FAILED); + } + return Status::OK(); +} + +void BetaRowsetWriter::_clear_statistics_for_deleting_segments_unsafe(uint64_t begin, + uint64_t end) { + VLOG_DEBUG << "_segid_statistics_map clear record segid range from:" << begin << " to:" << end; + for (int i = begin; i <= end; ++i) { + _segid_statistics_map.erase(i); + } +} + +Status BetaRowsetWriter::_check_correctness(std::unique_ptr stat, + uint64_t merged_row_stat, uint64_t row_count, + uint64_t begin, uint64_t end) { + uint64_t stat_read_row = stat->raw_rows_read; + uint64_t sum_target_row = 0; + + { + std::lock_guard lock(_segid_statistics_map_mutex); + for (int i = begin; i <= end; ++i) { + sum_target_row += _segid_statistics_map[i].row_num; + } + } + + if (sum_target_row != stat_read_row) { + LOG(WARNING) << "read row_num does not match. expect read row:" << sum_target_row + << " actual read row:" << stat_read_row; + return Status::OLAPInternalError(OLAP_ERR_CHECK_LINES_ERROR); + } + + uint64_t total_row = row_count + merged_row_stat; + if (stat_read_row != total_row) { + LOG(WARNING) << "total row_num does not match. expect total row:" << total_row + << " actual total row:" << stat_read_row; + return Status::OLAPInternalError(OLAP_ERR_CHECK_LINES_ERROR); + } + return Status::OK(); +} + +Status BetaRowsetWriter::_do_compact_segments(SegCompactionCandidatesSharedPtr segments) { + SCOPED_ATTACH_TASK(StorageEngine::instance()->segcompaction_mem_tracker(), + ThreadContext::TaskType::COMPACTION); + // throttle segcompaction task if memory depleted. 
+ if (MemTrackerLimiter::sys_mem_exceed_limit_check(GB_EXCHANGE_BYTE)) { + LOG(WARNING) << "skip segcompaction due to memory shortage"; + return Status::OLAPInternalError(OLAP_ERR_FETCH_MEMORY_EXCEEDED); + } + uint64_t begin = (*(segments->begin()))->id(); + uint64_t end = (*(segments->end() - 1))->id(); + LOG(INFO) << "BetaRowsetWriter:" << this << " do segcompaction at " << segments->size() + << " segments. Begin:" << begin << " End:" << end; + uint64_t begin_time = GetCurrentTimeMicros(); + + auto schema = std::make_shared(_context.tablet_schema->columns(), + _context.tablet_schema->columns().size()); + std::unique_ptr stat(new OlapReaderStatistics()); + uint64_t merged_row_stat = 0; + vectorized::VMergeIterator* reader = + _get_segcompaction_reader(segments, schema, stat.get(), &merged_row_stat); + if (UNLIKELY(reader == nullptr)) { + LOG(WARNING) << "failed to get segcompaction reader"; + return Status::OLAPInternalError(OLAP_ERR_SEGCOMPACTION_INIT_READER); + } + std::unique_ptr reader_ptr; + reader_ptr.reset(reader); + auto writer = _create_segcompaction_writer(begin, end); + if (UNLIKELY(writer == nullptr)) { + LOG(WARNING) << "failed to get segcompaction writer"; + return Status::OLAPInternalError(OLAP_ERR_SEGCOMPACTION_INIT_WRITER); + } + uint64_t row_count = 0; + vectorized::Block block = _context.tablet_schema->create_block(); + while (true) { + auto status = reader_ptr->next_batch(&block); + row_count += block.rows(); + if (status != Status::OK()) { + if (LIKELY(status.is_end_of_file())) { + RETURN_NOT_OK_LOG(_add_block_for_segcompaction(&block, &writer), + "write block failed"); + break; + } else { + LOG(WARNING) << "read block failed: " << status.to_string(); + return status; + } + } + RETURN_NOT_OK_LOG(_add_block_for_segcompaction(&block, &writer), "write block failed"); + block.clear_column_data(); + } + RETURN_NOT_OK_LOG(_check_correctness(std::move(stat), merged_row_stat, row_count, begin, end), + "check correctness failed"); + { + std::lock_guard lock(_segid_statistics_map_mutex); + _clear_statistics_for_deleting_segments_unsafe(begin, end); + } + RETURN_NOT_OK(_flush_segment_writer(&writer)); + + if (_segcompaction_file_writer != nullptr) { + _segcompaction_file_writer->close(); + } + + RETURN_NOT_OK(_delete_original_segments(begin, end)); + RETURN_NOT_OK(_rename_compacted_segments(begin, end)); + + if (VLOG_DEBUG_IS_ON) { + vlog_buffer.clear(); + for (const auto& entry : std::filesystem::directory_iterator(_context.rowset_dir)) { + fmt::format_to(vlog_buffer, "[{}]", string(entry.path())); + } + VLOG_DEBUG << "_segcompacted_point:" << _segcompacted_point + << " _num_segment:" << _num_segment << " _num_segcompacted:" << _num_segcompacted + << " list directory:" << fmt::to_string(vlog_buffer); + } + + _segcompacted_point += (end - begin + 1); + + uint64_t elapsed = GetCurrentTimeMicros() - begin_time; + LOG(INFO) << "BetaRowsetWriter:" << this << " segcompaction completed. elapsed time:" << elapsed + << "us. 
_segcompacted_point update:" << _segcompacted_point; + + return Status::OK(); +} + +void BetaRowsetWriter::compact_segments(SegCompactionCandidatesSharedPtr segments) { + Status status = _do_compact_segments(segments); + if (!status.ok()) { + int16_t errcode = status.precise_code(); + switch (errcode) { + case OLAP_ERR_FETCH_MEMORY_EXCEEDED: + case OLAP_ERR_SEGCOMPACTION_INIT_READER: + case OLAP_ERR_SEGCOMPACTION_INIT_WRITER: + LOG(WARNING) << "segcompaction failed, try next time:" << status; + return; + default: + LOG(WARNING) << "segcompaction fatal, terminating the write job:" << status; + // status will be checked by the next trigger of segcompaction or the final wait + _segcompaction_status.store(OLAP_ERR_OTHER_ERROR); + } + } + DCHECK_EQ(_is_doing_segcompaction, true); + { + std::lock_guard lk(_is_doing_segcompaction_lock); + _is_doing_segcompaction = false; + _segcompacting_cond.notify_all(); + } +} + +Status BetaRowsetWriter::_load_noncompacted_segments( + std::vector* segments) { + auto fs = _rowset_meta->fs(); + if (!fs) { + return Status::OLAPInternalError(OLAP_ERR_INIT_FAILED); + } + for (int seg_id = _segcompacted_point; seg_id < _num_segment; ++seg_id) { + auto seg_path = + BetaRowset::segment_file_path(_context.rowset_dir, _context.rowset_id, seg_id); + auto cache_path = + BetaRowset::segment_cache_path(_context.rowset_dir, _context.rowset_id, seg_id); + std::shared_ptr segment; + auto s = segment_v2::Segment::open(fs, seg_path, cache_path, seg_id, _context.tablet_schema, + &segment); + if (!s.ok()) { + LOG(WARNING) << "failed to open segment. " << seg_path << ":" << s.to_string(); + return Status::OLAPInternalError(OLAP_ERR_ROWSET_LOAD_FAILED); + } + segments->push_back(std::move(segment)); + } + return Status::OK(); +} + +/* policy of segcompaction target selection: + * 1. skip big segments + * 2. if the consecutive smalls end up with a big, compact the smalls, except + * single small + * 3. 
if the consecutive smalls end up with small, compact the smalls if the + * length is beyond (config::segcompaction_threshold_segment_num / 2) + */ +Status BetaRowsetWriter::_find_longest_consecutive_small_segment( + SegCompactionCandidatesSharedPtr segments) { + std::vector all_segments; + RETURN_NOT_OK(_load_noncompacted_segments(&all_segments)); + + if (VLOG_DEBUG_IS_ON) { + vlog_buffer.clear(); + for (auto& segment : all_segments) { + fmt::format_to(vlog_buffer, "[id:{} num_rows:{}]", segment->id(), segment->num_rows()); + } + VLOG_DEBUG << "all noncompacted segments num:" << all_segments.size() + << " list of segments:" << fmt::to_string(vlog_buffer); + } + + bool is_terminated_by_big = false; + bool let_big_terminate = false; + size_t small_threshold = config::segcompaction_small_threshold; + for (int64_t i = 0; i < all_segments.size(); ++i) { + segment_v2::SegmentSharedPtr seg = all_segments[i]; + if (seg->num_rows() > small_threshold) { + if (let_big_terminate) { + is_terminated_by_big = true; + break; + } else { + RETURN_NOT_OK(_rename_compacted_segment_plain(_segcompacted_point++)); + } + } else { + let_big_terminate = true; // break if find a big after small + segments->push_back(seg); + } + } + size_t s = segments->size(); + if (!is_terminated_by_big && s <= (config::segcompaction_threshold_segment_num / 2)) { + // start with big segments and end with small, better to do it in next + // round to compact more at once + segments->clear(); + return Status::OK(); + } + if (s == 1) { // poor bachelor, let it go + VLOG_DEBUG << "only one candidate segment"; + RETURN_NOT_OK(_rename_compacted_segment_plain(_segcompacted_point++)); + segments->clear(); + return Status::OK(); + } + if (VLOG_DEBUG_IS_ON) { + vlog_buffer.clear(); + for (auto& segment : (*segments.get())) { + fmt::format_to(vlog_buffer, "[id:{} num_rows:{}]", segment->id(), segment->num_rows()); + } + VLOG_DEBUG << "candidate segments num:" << s + << " list of candidates:" << fmt::to_string(vlog_buffer); + } + return Status::OK(); +} + +Status BetaRowsetWriter::_get_segcompaction_candidates(SegCompactionCandidatesSharedPtr& segments, + bool is_last) { + if (is_last) { + VLOG_DEBUG << "segcompaction last few segments"; + // currently we only rename remaining segments to reduce wait time + // so that transaction can be committed ASAP + RETURN_NOT_OK(_load_noncompacted_segments(segments.get())); + for (int i = 0; i < segments->size(); ++i) { + RETURN_NOT_OK(_rename_compacted_segment_plain(_segcompacted_point++)); + } + segments->clear(); + } else { + RETURN_NOT_OK(_find_longest_consecutive_small_segment(segments)); + } + return Status::OK(); +} + +bool BetaRowsetWriter::_check_and_set_is_doing_segcompaction() { + std::lock_guard l(_is_doing_segcompaction_lock); + if (!_is_doing_segcompaction) { + _is_doing_segcompaction = true; + return true; + } else { + return false; + } +} + +Status BetaRowsetWriter::_segcompaction_if_necessary() { + Status status = Status::OK(); + if (!config::enable_segcompaction || !config::enable_storage_vectorization || + !_check_and_set_is_doing_segcompaction()) { + return status; + } + if (_segcompaction_status.load() != OLAP_SUCCESS) { + status = Status::OLAPInternalError(OLAP_ERR_SEGCOMPACTION_FAILED); + } else if ((_num_segment - _segcompacted_point) >= + config::segcompaction_threshold_segment_num) { + SegCompactionCandidatesSharedPtr segments = std::make_shared(); + status = _get_segcompaction_candidates(segments, false); + if (LIKELY(status.ok()) && (segments->size() > 0)) { + LOG(INFO) << 
"submit segcompaction task, segment num:" << _num_segment + << ", segcompacted_point:" << _segcompacted_point; + status = StorageEngine::instance()->submit_seg_compaction_task(this, segments); + if (status.ok()) { + return status; + } + } + } + { + std::lock_guard lk(_is_doing_segcompaction_lock); + _is_doing_segcompaction = false; + _segcompacting_cond.notify_all(); + } + return status; +} + +Status BetaRowsetWriter::_segcompaction_ramaining_if_necessary() { + Status status = Status::OK(); + DCHECK_EQ(_is_doing_segcompaction, false); + if (!config::enable_segcompaction || !config::enable_storage_vectorization) { + return Status::OK(); + } + if (_segcompaction_status.load() != OLAP_SUCCESS) { + return Status::OLAPInternalError(OLAP_ERR_SEGCOMPACTION_FAILED); + } + if (!_is_segcompacted() || _segcompacted_point == _num_segment) { + // no need if never segcompact before or all segcompacted + return Status::OK(); + } + _is_doing_segcompaction = true; + SegCompactionCandidatesSharedPtr segments = std::make_shared(); + status = _get_segcompaction_candidates(segments, true); + if (LIKELY(status.ok()) && (segments->size() > 0)) { + LOG(INFO) << "submit segcompaction remaining task, segment num:" << _num_segment + << ", segcompacted_point:" << _segcompacted_point; + status = StorageEngine::instance()->submit_seg_compaction_task(this, segments); + if (status.ok()) { + return status; + } + } + _is_doing_segcompaction = false; + return status; +} + Status BetaRowsetWriter::_add_block(const vectorized::Block* block, std::unique_ptr* segment_writer) { size_t block_size_in_bytes = block->bytes(); @@ -133,7 +573,18 @@ Status BetaRowsetWriter::_add_block(const vectorized::Block* block, row_offset += input_row_num; } while (row_offset < block_row_num); - _num_rows_written += block_row_num; + _raw_num_rows_written += block_row_num; + return Status::OK(); +} + +Status BetaRowsetWriter::_add_block_for_segcompaction( + const vectorized::Block* block, + std::unique_ptr* segment_writer) { + auto s = (*segment_writer)->append_block(block, 0, block->rows()); + if (UNLIKELY(!s.ok())) { + LOG(WARNING) << "failed to append block: " << s.to_string(); + return Status::OLAPInternalError(OLAP_ERR_WRITER_DATA_WRITE_ERROR); + } return Status::OK(); } @@ -152,7 +603,7 @@ Status BetaRowsetWriter::_add_row(const RowType& row) { _segment_writer->num_rows_written() >= _context.max_rows_per_segment)) { RETURN_NOT_OK(_flush_segment_writer(&_segment_writer)); } - ++_num_rows_written; + ++_raw_num_rows_written; return Status::OK(); } @@ -183,12 +634,14 @@ Status BetaRowsetWriter::add_rowset_for_linked_schema_change(RowsetSharedPtr row Status BetaRowsetWriter::flush() { if (_segment_writer != nullptr) { RETURN_NOT_OK(_flush_segment_writer(&_segment_writer)); + RETURN_NOT_OK(_segcompaction_if_necessary()); } return Status::OK(); } Status BetaRowsetWriter::flush_single_memtable(MemTable* memtable, int64_t* flush_size) { - int64_t current_flush_size = _total_data_size + _total_index_size; + int64_t size = 0; + int64_t sum_size = 0; // Create segment writer for each memtable, so that // all memtables can be flushed in parallel. 
std::unique_ptr writer; @@ -196,6 +649,7 @@ Status BetaRowsetWriter::flush_single_memtable(MemTable* memtable, int64_t* flus MemTable::Iterator it(memtable); for (it.seek_to_first(); it.valid(); it.next()) { if (PREDICT_FALSE(writer == nullptr)) { + RETURN_NOT_OK(_segcompaction_if_necessary()); RETURN_NOT_OK(_create_segment_writer(&writer)); } ContiguousRow dst_row = it.get_current_row(); @@ -207,16 +661,24 @@ Status BetaRowsetWriter::flush_single_memtable(MemTable* memtable, int64_t* flus if (PREDICT_FALSE(writer->estimate_segment_size() >= MAX_SEGMENT_SIZE || writer->num_rows_written() >= _context.max_rows_per_segment)) { - RETURN_NOT_OK(_flush_segment_writer(&writer)); + auto s = _flush_segment_writer(&writer, &size); + sum_size += size; + if (OLAP_UNLIKELY(!s.ok())) { + *flush_size = sum_size; + return s; + } } - ++_num_rows_written; } if (writer != nullptr) { - RETURN_NOT_OK(_flush_segment_writer(&writer)); + auto s = _flush_segment_writer(&writer, &size); + sum_size += size; + *flush_size = sum_size; + if (OLAP_UNLIKELY(!s.ok())) { + return s; + } } - *flush_size = (_total_data_size + _total_index_size) - current_flush_size; return Status::OK(); } @@ -224,6 +686,7 @@ Status BetaRowsetWriter::flush_single_memtable(const vectorized::Block* block) { if (block->rows() == 0) { return Status::OK(); } + RETURN_NOT_OK(_segcompaction_if_necessary()); std::unique_ptr writer; RETURN_NOT_OK(_create_segment_writer(&writer)); RETURN_NOT_OK(_add_block(block, &writer)); @@ -231,6 +694,23 @@ Status BetaRowsetWriter::flush_single_memtable(const vectorized::Block* block) { return Status::OK(); } +Status BetaRowsetWriter::_wait_flying_segcompaction() { + std::unique_lock l(_is_doing_segcompaction_lock); + uint64_t begin_wait = GetCurrentTimeMicros(); + while (_is_doing_segcompaction) { + // change sync wait to async? + _segcompacting_cond.wait(l); + } + uint64_t elapsed = GetCurrentTimeMicros() - begin_wait; + if (elapsed >= MICROS_PER_SEC) { + LOG(INFO) << "wait flying segcompaction finish time:" << elapsed << "us"; + } + if (_segcompaction_status.load() != OLAP_SUCCESS) { + return Status::OLAPInternalError(OLAP_ERR_SEGCOMPACTION_FAILED); + } + return Status::OK(); +} + RowsetSharedPtr BetaRowsetWriter::build() { // TODO(lingbin): move to more better place, or in a CreateBlockBatch? 
for (auto& file_writer : _file_writers) { @@ -241,6 +721,26 @@ RowsetSharedPtr BetaRowsetWriter::build() { return nullptr; } } + Status status; + status = _wait_flying_segcompaction(); + if (!status.ok()) { + LOG(WARNING) << "segcompaction failed when build new rowset 1st wait, res=" << status; + return nullptr; + } + status = _segcompaction_ramaining_if_necessary(); + if (!status.ok()) { + LOG(WARNING) << "segcompaction failed when build new rowset, res=" << status; + return nullptr; + } + status = _wait_flying_segcompaction(); + if (!status.ok()) { + LOG(WARNING) << "segcompaction failed when build new rowset 2nd wait, res=" << status; + return nullptr; + } + + if (_segcompaction_file_writer) { + _segcompaction_file_writer->close(); + } // When building a rowset, we must ensure that the current _segment_writer has been // flushed, that is, the current _segment_writer is nullptr DCHECK(_segment_writer == nullptr) << "segment must be null when build rowset"; @@ -255,8 +755,8 @@ RowsetSharedPtr BetaRowsetWriter::build() { } RowsetSharedPtr rowset; - auto status = RowsetFactory::create_rowset(_context.tablet_schema, _context.rowset_dir, - _rowset_meta, &rowset); + status = RowsetFactory::create_rowset(_context.tablet_schema, _context.rowset_dir, _rowset_meta, + &rowset); if (!status.ok()) { LOG(WARNING) << "rowset init failed when build new rowset, res=" << status; return nullptr; @@ -266,23 +766,47 @@ RowsetSharedPtr BetaRowsetWriter::build() { } void BetaRowsetWriter::_build_rowset_meta(std::shared_ptr rowset_meta) { - rowset_meta->set_num_rows(_num_rows_written); - rowset_meta->set_total_disk_size(_total_data_size); - rowset_meta->set_data_disk_size(_total_data_size); - rowset_meta->set_index_disk_size(_total_index_size); - // TODO write zonemap to meta - rowset_meta->set_empty(_num_rows_written == 0); - rowset_meta->set_creation_time(time(nullptr)); - rowset_meta->set_num_segments(_num_segment); - if (_num_segment <= 1) { + int64_t num_seg = _is_segcompacted() ? 
_num_segcompacted : _num_segment; + int64_t num_rows_written = 0; + int64_t total_data_size = 0; + int64_t total_index_size = 0; + std::vector segment_num_rows; + std::vector segments_encoded_key_bounds; + { + std::lock_guard lock(_segid_statistics_map_mutex); + for (const auto& itr : _segid_statistics_map) { + num_rows_written += itr.second.row_num; + segment_num_rows.push_back(itr.second.row_num); + total_data_size += itr.second.data_size; + total_index_size += itr.second.index_size; + segments_encoded_key_bounds.push_back(itr.second.key_bounds); + } + } + rowset_meta->set_num_segments(num_seg); + if (num_seg <= 1) { rowset_meta->set_segments_overlap(NONOVERLAPPING); } + _segment_num_rows = segment_num_rows; + for (auto itr = _segments_encoded_key_bounds.begin(); itr != _segments_encoded_key_bounds.end(); + ++itr) { + segments_encoded_key_bounds.push_back(*itr); + } + + CHECK_EQ(segments_encoded_key_bounds.size(), num_seg); + rowset_meta->set_num_rows(num_rows_written + _num_rows_written); + rowset_meta->set_total_disk_size(total_data_size + _total_data_size); + rowset_meta->set_data_disk_size(total_data_size + _total_data_size); + rowset_meta->set_index_disk_size(total_index_size + _total_index_size); + rowset_meta->set_segments_key_bounds(segments_encoded_key_bounds); + // TODO write zonemap to meta + rowset_meta->set_empty((num_rows_written + _num_rows_written) == 0); + rowset_meta->set_creation_time(time(nullptr)); + if (_is_pending) { rowset_meta->set_rowset_state(COMMITTED); } else { rowset_meta->set_rowset_state(VISIBLE); } - rowset_meta->set_segments_key_bounds(_segments_encoded_key_bounds); } RowsetSharedPtr BetaRowsetWriter::build_tmp() { @@ -300,10 +824,19 @@ RowsetSharedPtr BetaRowsetWriter::build_tmp() { return rowset; } -Status BetaRowsetWriter::_create_segment_writer( - std::unique_ptr* writer) { - int32_t segment_id = _num_segment.fetch_add(1); - auto path = BetaRowset::segment_file_path(_context.rowset_dir, _context.rowset_id, segment_id); +Status BetaRowsetWriter::_do_create_segment_writer( + std::unique_ptr* writer, bool is_segcompaction, int64_t begin, + int64_t end) { + std::string path; + int32_t segment_id = 0; + if (is_segcompaction) { + DCHECK(begin >= 0 && end >= 0); + path = BetaRowset::local_segment_path_segcompacted(_context.rowset_dir, _context.rowset_id, + begin, end); + } else { + segment_id = _num_segment.fetch_add(1); + path = BetaRowset::segment_file_path(_context.rowset_dir, _context.rowset_id, segment_id); + } auto fs = _rowset_meta->fs(); if (!fs) { return Status::OLAPInternalError(OLAP_ERR_INIT_FAILED); @@ -319,12 +852,23 @@ Status BetaRowsetWriter::_create_segment_writer( DCHECK(file_writer != nullptr); segment_v2::SegmentWriterOptions writer_options; writer_options.enable_unique_key_merge_on_write = _context.enable_unique_key_merge_on_write; - writer->reset(new segment_v2::SegmentWriter(file_writer.get(), segment_id, - _context.tablet_schema, _context.data_dir, - _context.max_rows_per_segment, writer_options)); - { - std::lock_guard l(_lock); - _file_writers.push_back(std::move(file_writer)); + + if (is_segcompaction) { + writer->reset(new segment_v2::SegmentWriter(file_writer.get(), _num_segcompacted, + _context.tablet_schema, _context.data_dir, + _context.max_rows_per_segment, writer_options)); + if (_segcompaction_file_writer != nullptr) { + _segcompaction_file_writer->close(); + } + _segcompaction_file_writer.reset(file_writer.release()); + } else { + writer->reset(new segment_v2::SegmentWriter(file_writer.get(), segment_id, + 
_context.tablet_schema, _context.data_dir, + _context.max_rows_per_segment, writer_options)); + { + std::lock_guard l(_lock); + _file_writers.push_back(std::move(file_writer)); + } } auto s = (*writer)->init(); @@ -336,7 +880,31 @@ Status BetaRowsetWriter::_create_segment_writer( return Status::OK(); } -Status BetaRowsetWriter::_flush_segment_writer(std::unique_ptr* writer) { +Status BetaRowsetWriter::_create_segment_writer( + std::unique_ptr* writer) { + size_t total_segment_num = _num_segment - _segcompacted_point + 1 + _num_segcompacted; + if (UNLIKELY(total_segment_num > config::max_segment_num_per_rowset)) { + LOG(ERROR) << "too many segments in rowset." + << " max:" << config::max_segment_num_per_rowset + << " _num_segment:" << _num_segment + << " _segcompacted_point:" << _segcompacted_point + << " _num_segcompacted:" << _num_segcompacted; + return Status::OLAPInternalError(OLAP_ERR_TOO_MANY_SEGMENTS); + } else { + return _do_create_segment_writer(writer, false, -1, -1); + } +} + +Status BetaRowsetWriter::_create_segment_writer_for_segcompaction( + std::unique_ptr* writer, uint64_t begin, uint64_t end) { + return _do_create_segment_writer(writer, true, begin, end); +} + +Status BetaRowsetWriter::_flush_segment_writer(std::unique_ptr* writer, + int64_t* flush_size) { + uint32_t segid = (*writer)->get_segment_id(); + uint32_t row_num = (*writer)->num_rows_written(); + if ((*writer)->num_rows_written() == 0) { return Status::OK(); } @@ -347,22 +915,30 @@ Status BetaRowsetWriter::_flush_segment_writer(std::unique_ptrmin_encoded_key(); Slice max_key = (*writer)->max_encoded_key(); DCHECK_LE(min_key.compare(max_key), 0); key_bounds.set_min_key(min_key.to_string()); key_bounds.set_max_key(max_key.to_string()); + + Statistics segstat; + segstat.row_num = row_num; + segstat.data_size = segment_size; + segstat.index_size = index_size; + segstat.key_bounds = key_bounds; { - std::lock_guard l(_lock); - _segment_num_rows.resize(_num_segment); - _segments_encoded_key_bounds.resize(_num_segment); - _segment_num_rows[(*writer)->get_segment_id()] = (*writer)->num_rows_written(); - _segments_encoded_key_bounds[(*writer)->get_segment_id()] = key_bounds; + std::lock_guard lock(_segid_statistics_map_mutex); + CHECK_EQ(_segid_statistics_map.find(segid) == _segid_statistics_map.end(), true); + _segid_statistics_map.emplace(segid, segstat); } + VLOG_DEBUG << "_segid_statistics_map add new record. 
segid:" << segid << " row_num:" << row_num + << " data_size:" << segment_size << " index_size:" << index_size; + writer->reset(); + if (flush_size) { + *flush_size = segment_size + index_size; + } return Status::OK(); } diff --git a/be/src/olap/rowset/beta_rowset_writer.h b/be/src/olap/rowset/beta_rowset_writer.h index 5980fae81e..65662ecf1a 100644 --- a/be/src/olap/rowset/beta_rowset_writer.h +++ b/be/src/olap/rowset/beta_rowset_writer.h @@ -18,6 +18,7 @@ #pragma once #include "olap/rowset/rowset_writer.h" +#include "vec/olap/vgeneric_iterators.h" namespace doris { namespace segment_v2 { @@ -28,6 +29,9 @@ namespace io { class FileWriter; } // namespace io +using SegCompactionCandidates = std::vector; +using SegCompactionCandidatesSharedPtr = std::shared_ptr; + class BetaRowsetWriter : public RowsetWriter { public: BetaRowsetWriter(); @@ -62,7 +66,7 @@ public: Version version() override { return _context.version; } - int64_t num_rows() const override { return _num_rows_written; } + int64_t num_rows() const override { return _raw_num_rows_written; } RowsetId rowset_id() override { return _context.rowset_id; } @@ -74,42 +78,100 @@ public: return Status::OK(); } + void compact_segments(SegCompactionCandidatesSharedPtr segments); + private: template Status _add_row(const RowType& row); Status _add_block(const vectorized::Block* block, std::unique_ptr* writer); + Status _add_block_for_segcompaction(const vectorized::Block* block, + std::unique_ptr* writer); + Status _do_create_segment_writer(std::unique_ptr* writer, + bool is_segcompaction, int64_t begin, int64_t end); Status _create_segment_writer(std::unique_ptr* writer); + Status _create_segment_writer_for_segcompaction( + std::unique_ptr* writer, uint64_t begin, uint64_t end); - Status _flush_segment_writer(std::unique_ptr* writer); + Status _flush_segment_writer(std::unique_ptr* writer, + int64_t* flush_size = nullptr); void _build_rowset_meta(std::shared_ptr rowset_meta); + Status _segcompaction_if_necessary(); + Status _segcompaction_ramaining_if_necessary(); + vectorized::VMergeIterator* _get_segcompaction_reader(SegCompactionCandidatesSharedPtr segments, + std::shared_ptr schema, + OlapReaderStatistics* stat, + uint64_t* merged_row_stat); + std::unique_ptr _create_segcompaction_writer(uint64_t begin, + uint64_t end); + Status _delete_original_segments(uint32_t begin, uint32_t end); + Status _rename_compacted_segments(int64_t begin, int64_t end); + Status _rename_compacted_segment_plain(uint64_t seg_id); + Status _load_noncompacted_segments(std::vector* segments); + Status _find_longest_consecutive_small_segment(SegCompactionCandidatesSharedPtr segments); + Status _get_segcompaction_candidates(SegCompactionCandidatesSharedPtr& segments, bool is_last); + Status _wait_flying_segcompaction(); + bool _is_segcompacted() { return (_num_segcompacted > 0) ? 
true : false; } + void _clear_statistics_for_deleting_segments_unsafe(uint64_t begin, uint64_t end); + Status _check_correctness(std::unique_ptr stat, uint64_t merged_row_stat, + uint64_t row_count, uint64_t begin, uint64_t end); + bool _check_and_set_is_doing_segcompaction(); + + Status _do_compact_segments(SegCompactionCandidatesSharedPtr segments); private: RowsetWriterContext _context; std::shared_ptr _rowset_meta; std::atomic _num_segment; + std::atomic _segcompacted_point; // segemnts before this point have + // already been segment compacted + std::atomic _num_segcompacted; // index for segment compaction /// When flushing the memtable in the load process, we do not use this writer but an independent writer. /// Because we want to flush memtables in parallel. /// In other processes, such as merger or schema change, we will use this unified writer for data writing. std::unique_ptr _segment_writer; mutable SpinLock _lock; // protect following vectors. + // record rows number of every segment + std::vector _segment_num_rows; std::vector _file_writers; // for unique key table with merge-on-write std::vector _segments_encoded_key_bounds; - // record rows number of every segment - std::vector _segment_num_rows; - // counters and statistics maintained during data write + io::FileWriterPtr _segcompaction_file_writer; + + // counters and statistics maintained during add_rowset std::atomic _num_rows_written; std::atomic _total_data_size; std::atomic _total_index_size; // TODO rowset Zonemap + // written rows by add_block/add_row (not effected by segcompaction) + std::atomic _raw_num_rows_written; + + struct Statistics { + int64_t row_num; + int64_t data_size; + int64_t index_size; + KeyBoundsPB key_bounds; + }; + std::map _segid_statistics_map; + std::mutex _segid_statistics_map_mutex; + bool _is_pending = false; bool _already_built = false; + + // ensure only one inflight segcompaction task for each rowset + std::atomic _is_doing_segcompaction; + // enforce compare-and-swap on _is_doing_segcompaction + std::mutex _is_doing_segcompaction_lock; + std::condition_variable _segcompacting_cond; + + std::atomic _segcompaction_status; + + fmt::memory_buffer vlog_buffer; }; } // namespace doris diff --git a/be/src/olap/rowset/segment_v2/segment_writer.h b/be/src/olap/rowset/segment_v2/segment_writer.h index fd54dec72d..4d81dbb2a9 100644 --- a/be/src/olap/rowset/segment_v2/segment_writer.h +++ b/be/src/olap/rowset/segment_v2/segment_writer.h @@ -84,10 +84,15 @@ public: static void init_column_meta(ColumnMetaPB* meta, uint32_t* column_id, const TabletColumn& column, TabletSchemaSPtr tablet_schema); - uint32_t get_segment_id() { return _segment_id; } + + uint32_t get_segment_id() const { return _segment_id; } + Slice min_encoded_key(); Slice max_encoded_key(); + DataDir* get_data_dir() { return _data_dir; } + bool is_unique_key() { return _tablet_schema->keys_type() == UNIQUE_KEYS; } + private: DISALLOW_COPY_AND_ASSIGN(SegmentWriter); Status _write_data(); diff --git a/be/src/olap/schema_change.h b/be/src/olap/schema_change.h index 769eee02a7..00e2f79bad 100644 --- a/be/src/olap/schema_change.h +++ b/be/src/olap/schema_change.h @@ -135,6 +135,7 @@ protected: if (reader->rowset()->num_rows() != writer.num_rows() + _merged_rows + _filtered_rows) { LOG(WARNING) << "fail to check row num! 
" << "source_rows=" << reader->rowset()->num_rows() + << ", writer rows=" << writer.num_rows() << ", merged_rows=" << merged_rows() << ", filtered_rows=" << filtered_rows() << ", new_index_rows=" << writer.num_rows(); diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp index 183e4fa5ba..9da8c8e147 100644 --- a/be/src/olap/storage_engine.cpp +++ b/be/src/olap/storage_engine.cpp @@ -81,6 +81,8 @@ using strings::Substitute; namespace doris { DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(unused_rowsets_count, MetricUnit::ROWSETS); +DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(segcompaction_mem_consumption, MetricUnit::BYTES, "", + mem_consumption, Labels({{"type", "segcompaction"}})); DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(compaction_mem_consumption, MetricUnit::BYTES, "", mem_consumption, Labels({{"type", "compaction"}})); DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(schema_change_mem_consumption, MetricUnit::BYTES, "", @@ -110,6 +112,8 @@ StorageEngine::StorageEngine(const EngineOptions& options) _available_storage_medium_type_count(0), _effective_cluster_id(-1), _is_all_cluster_id_exist(true), + _segcompaction_mem_tracker( + std::make_shared(-1, "StorageEngine::SegCompaction")), _compaction_mem_tracker( std::make_shared(-1, "StorageEngine::AutoCompaction")), _segment_meta_mem_tracker(std::make_unique("StorageEngine::SegmentMeta")), @@ -134,6 +138,8 @@ StorageEngine::StorageEngine(const EngineOptions& options) // std::lock_guard lock(_gc_mutex); return _unused_rowsets.size(); }); + REGISTER_HOOK_METRIC(segcompaction_mem_consumption, + [this]() { return _segcompaction_mem_tracker->consumption(); }); REGISTER_HOOK_METRIC(compaction_mem_consumption, [this]() { return _compaction_mem_tracker->consumption(); }); REGISTER_HOOK_METRIC(schema_change_mem_consumption, @@ -142,6 +148,7 @@ StorageEngine::StorageEngine(const EngineOptions& options) StorageEngine::~StorageEngine() { DEREGISTER_HOOK_METRIC(unused_rowsets_count); + DEREGISTER_HOOK_METRIC(segcompaction_mem_consumption); DEREGISTER_HOOK_METRIC(compaction_mem_consumption); DEREGISTER_HOOK_METRIC(schema_change_mem_consumption); _clear(); @@ -157,6 +164,10 @@ StorageEngine::~StorageEngine() { _quick_compaction_thread_pool->shutdown(); } + if (_seg_compaction_thread_pool) { + _seg_compaction_thread_pool->shutdown(); + } + if (_tablet_meta_checkpoint_thread_pool) { _tablet_meta_checkpoint_thread_pool->shutdown(); } diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h index 7d0dad4f6e..93f1d0c438 100644 --- a/be/src/olap/storage_engine.h +++ b/be/src/olap/storage_engine.h @@ -58,6 +58,10 @@ class BlockManager; class MemTableFlushExecutor; class Tablet; class TaskWorkerPool; +class BetaRowsetWriter; + +using SegCompactionCandidates = std::vector; +using SegCompactionCandidatesSharedPtr = std::shared_ptr; // StorageEngine singleton to manage all Table pointers. // Providing add/drop/get operations. 
@@ -175,6 +179,9 @@ public: Status get_compaction_status_json(std::string* result); + std::shared_ptr segcompaction_mem_tracker() { + return _segcompaction_mem_tracker; + } std::shared_ptr compaction_mem_tracker() { return _compaction_mem_tracker; } MemTracker* segment_meta_mem_tracker() { return _segment_meta_mem_tracker.get(); } std::shared_ptr schema_change_mem_tracker() { @@ -191,6 +198,8 @@ public: Status submit_compaction_task(TabletSharedPtr tablet, CompactionType compaction_type); Status submit_quick_compaction_task(TabletSharedPtr tablet); + Status submit_seg_compaction_task(BetaRowsetWriter* writer, + SegCompactionCandidatesSharedPtr segments); std::unique_ptr& tablet_publish_txn_thread_pool() { return _tablet_publish_txn_thread_pool; @@ -277,6 +286,9 @@ private: void _cache_file_cleaner_tasks_producer_callback(); + Status _handle_seg_compaction(BetaRowsetWriter* writer, + SegCompactionCandidatesSharedPtr segments); + private: struct CompactionCandidate { CompactionCandidate(uint32_t nicumulative_compaction_, int64_t tablet_id_, uint32_t index_) @@ -322,6 +334,8 @@ private: // map, if we use RowsetId as the key, we need custom hash func std::unordered_map _unused_rowsets; + // Count the memory consumption of segment compaction tasks. + std::shared_ptr _segcompaction_mem_tracker; // Count the memory consumption of all Base and Cumulative tasks. std::shared_ptr _compaction_mem_tracker; // This mem tracker is only for tracking memory use by segment meta data such as footer or index page. @@ -377,6 +391,7 @@ private: std::unique_ptr _quick_compaction_thread_pool; std::unique_ptr _base_compaction_thread_pool; std::unique_ptr _cumu_compaction_thread_pool; + std::unique_ptr _seg_compaction_thread_pool; std::unique_ptr _tablet_publish_txn_thread_pool; diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h index fe0689f23c..2b5ca205bf 100644 --- a/be/src/util/doris_metrics.h +++ b/be/src/util/doris_metrics.h @@ -185,6 +185,7 @@ public: UIntGauge* brpc_function_endpoint_stub_count; UIntGauge* tablet_writer_count; + UIntGauge* segcompaction_mem_consumption; UIntGauge* compaction_mem_consumption; UIntGauge* load_mem_consumption; UIntGauge* load_channel_mem_consumption; diff --git a/be/src/vec/olap/vgeneric_iterators.cpp b/be/src/vec/olap/vgeneric_iterators.cpp index bc74ab0862..20b4479e2e 100644 --- a/be/src/vec/olap/vgeneric_iterators.cpp +++ b/be/src/vec/olap/vgeneric_iterators.cpp @@ -15,13 +15,14 @@ // specific language governing permissions and limitations // under the License. 
+#include + #include #include #include #include "common/status.h" #include "olap/iterators.h" -#include "olap/rowset/segment_v2/column_reader.h" #include "olap/rowset/segment_v2/segment.h" #include "olap/schema.h" #include "vec/core/block.h" @@ -29,6 +30,139 @@ namespace doris { namespace vectorized { +VStatisticsIterator::~VStatisticsIterator() { + for (auto& pair : _column_iterators_map) { + delete pair.second; + } +} + +Status VStatisticsIterator::init(const StorageReadOptions& opts) { + if (!_init) { + _push_down_agg_type_opt = opts.push_down_agg_type_opt; + + for (size_t i = 0; i < _schema.num_column_ids(); i++) { + auto cid = _schema.column_id(i); + auto unique_id = _schema.column(cid)->unique_id(); + if (_column_iterators_map.count(unique_id) < 1) { + RETURN_IF_ERROR(_segment->new_column_iterator(opts.tablet_schema->column(cid), + &_column_iterators_map[unique_id])); + } + _column_iterators.push_back(_column_iterators_map[unique_id]); + } + + _target_rows = _push_down_agg_type_opt == TPushAggOp::MINMAX ? 2 : _segment->num_rows(); + _init = true; + } + + return Status::OK(); +} + +Status VStatisticsIterator::next_batch(Block* block) { + DCHECK(block->columns() == _column_iterators.size()); + if (_output_rows < _target_rows) { + block->clear_column_data(); + auto columns = block->mutate_columns(); + + size_t size = _push_down_agg_type_opt == TPushAggOp::MINMAX + ? 2 + : std::min(_target_rows - _output_rows, MAX_ROW_SIZE_IN_COUNT); + if (_push_down_agg_type_opt == TPushAggOp::COUNT) { + size = std::min(_target_rows - _output_rows, MAX_ROW_SIZE_IN_COUNT); + for (int i = 0; i < block->columns(); ++i) { + columns[i]->resize(size); + } + } else { + for (int i = 0; i < block->columns(); ++i) { + _column_iterators[i]->next_batch_of_zone_map(&size, columns[i]); + } + } + _output_rows += size; + return Status::OK(); + } + return Status::EndOfFile("End of VStatisticsIterator"); +} + +Status VMergeIteratorContext::block_reset(const std::shared_ptr& block) { + if (!*block) { + const Schema& schema = _iter->schema(); + const auto& column_ids = schema.column_ids(); + for (size_t i = 0; i < schema.num_column_ids(); ++i) { + auto column_desc = schema.column(column_ids[i]); + auto data_type = Schema::get_data_type_ptr(*column_desc); + if (data_type == nullptr) { + return Status::RuntimeError("invalid data type"); + } + auto column = data_type->create_column(); + column->reserve(_block_row_max); + block->insert(ColumnWithTypeAndName(std::move(column), data_type, column_desc->name())); + } + } else { + block->clear_column_data(); + } + return Status::OK(); +} + +bool VMergeIteratorContext::compare(const VMergeIteratorContext& rhs) const { + int cmp_res = UNLIKELY(_compare_columns) + ? _block->compare_at(_index_in_block, rhs._index_in_block, + _compare_columns, *rhs._block, -1) + : _block->compare_at(_index_in_block, rhs._index_in_block, + _num_key_columns, *rhs._block, -1); + + if (cmp_res != 0) { + return UNLIKELY(_is_reverse) ? cmp_res < 0 : cmp_res > 0; + } + + auto col_cmp_res = 0; + if (_sequence_id_idx != -1) { + col_cmp_res = _block->compare_column_at(_index_in_block, rhs._index_in_block, + _sequence_id_idx, *rhs._block, -1); + } + auto result = col_cmp_res == 0 ? data_id() < rhs.data_id() : col_cmp_res < 0; + + if (_is_unique) { + result ? 
set_skip(true) : rhs.set_skip(true); + } + return result; +} + +// `advanced = false` when current block finished +void VMergeIteratorContext::copy_rows(Block* block, bool advanced) { + Block& src = *_block; + Block& dst = *block; + if (_cur_batch_num == 0) { + return; + } + + // copy a row to dst block column by column + size_t start = _index_in_block - _cur_batch_num + 1 - advanced; + DCHECK(start >= 0); + + for (size_t i = 0; i < _num_columns; ++i) { + auto& s_col = src.get_by_position(i); + auto& d_col = dst.get_by_position(i); + + ColumnPtr& s_cp = s_col.column; + ColumnPtr& d_cp = d_col.column; + + d_cp->assume_mutable()->insert_range_from(*s_cp, start, _cur_batch_num); + } + _cur_batch_num = 0; +} + +void VMergeIteratorContext::copy_rows(BlockView* view, bool advanced) { + if (_cur_batch_num == 0) { + return; + } + size_t start = _index_in_block - _cur_batch_num + 1 - advanced; + DCHECK(start >= 0); + + for (size_t i = 0; i < _cur_batch_num; ++i) { + view->push_back({_block, static_cast(start + i), false}); + } + + _cur_batch_num = 0; +} // This iterator will generate ordered data. For example for schema // (int, int) this iterator will generator data like @@ -113,251 +247,6 @@ Status VAutoIncrementIterator::init(const StorageReadOptions& opts) { return Status::OK(); } -class VStatisticsIterator : public RowwiseIterator { -public: - // Will generate num_rows rows in total - VStatisticsIterator(std::shared_ptr segment, const Schema& schema) - : _segment(std::move(segment)), _schema(schema) {} - - ~VStatisticsIterator() override { - for (auto& pair : _column_iterators_map) { - delete pair.second; - } - } - - Status init(const StorageReadOptions& opts) override { - if (!_init) { - _push_down_agg_type_opt = opts.push_down_agg_type_opt; - - for (size_t i = 0; i < _schema.num_column_ids(); i++) { - auto cid = _schema.column_id(i); - auto unique_id = _schema.column(cid)->unique_id(); - if (_column_iterators_map.count(unique_id) < 1) { - RETURN_IF_ERROR(_segment->new_column_iterator( - opts.tablet_schema->column(cid), &_column_iterators_map[unique_id])); - } - _column_iterators.push_back(_column_iterators_map[unique_id]); - } - - _target_rows = _push_down_agg_type_opt == TPushAggOp::MINMAX ? 2 : _segment->num_rows(); - _init = true; - } - - return Status::OK(); - } - - Status next_batch(Block* block) override { - DCHECK(block->columns() == _column_iterators.size()); - if (_output_rows < _target_rows) { - block->clear_column_data(); - auto columns = block->mutate_columns(); - - size_t size = _push_down_agg_type_opt == TPushAggOp::MINMAX - ? 2 - : std::min(_target_rows - _output_rows, MAX_ROW_SIZE_IN_COUNT); - if (_push_down_agg_type_opt == TPushAggOp::COUNT) { - size = std::min(_target_rows - _output_rows, MAX_ROW_SIZE_IN_COUNT); - for (int i = 0; i < block->columns(); ++i) { - columns[i]->resize(size); - } - } else { - for (int i = 0; i < block->columns(); ++i) { - _column_iterators[i]->next_batch_of_zone_map(&size, columns[i]); - } - } - _output_rows += size; - return Status::OK(); - } - return Status::EndOfFile("End of VStatisticsIterator"); - } - - const Schema& schema() const override { return _schema; } - -private: - std::shared_ptr _segment; - const Schema& _schema; - size_t _target_rows = 0; - size_t _output_rows = 0; - bool _init = false; - TPushAggOp::type _push_down_agg_type_opt; - std::map _column_iterators_map; - std::vector _column_iterators; - - static constexpr size_t MAX_ROW_SIZE_IN_COUNT = 65535; -}; - -// Used to store merge state for a VMergeIterator input. 
-// This class will iterate all data from internal iterator -// through client call advance(). -// Usage: -// VMergeIteratorContext ctx(iter); -// RETURN_IF_ERROR(ctx.init()); -// while (ctx.valid()) { -// visit(ctx.current_row()); -// RETURN_IF_ERROR(ctx.advance()); -// } -class VMergeIteratorContext { -public: - VMergeIteratorContext(RowwiseIterator* iter, int sequence_id_idx, bool is_unique, - bool is_reverse, std::vector* read_orderby_key_columns) - : _iter(iter), - _sequence_id_idx(sequence_id_idx), - _is_unique(is_unique), - _is_reverse(is_reverse), - _num_columns(iter->schema().num_column_ids()), - _num_key_columns(iter->schema().num_key_columns()), - _compare_columns(read_orderby_key_columns) {} - - VMergeIteratorContext(const VMergeIteratorContext&) = delete; - VMergeIteratorContext(VMergeIteratorContext&&) = delete; - VMergeIteratorContext& operator=(const VMergeIteratorContext&) = delete; - VMergeIteratorContext& operator=(VMergeIteratorContext&&) = delete; - - ~VMergeIteratorContext() { - delete _iter; - _iter = nullptr; - } - - Status block_reset(const std::shared_ptr& block) { - if (!*block) { - const Schema& schema = _iter->schema(); - const auto& column_ids = schema.column_ids(); - for (size_t i = 0; i < schema.num_column_ids(); ++i) { - auto column_desc = schema.column(column_ids[i]); - auto data_type = Schema::get_data_type_ptr(*column_desc); - if (data_type == nullptr) { - return Status::RuntimeError("invalid data type"); - } - auto column = data_type->create_column(); - column->reserve(_block_row_max); - block->insert( - ColumnWithTypeAndName(std::move(column), data_type, column_desc->name())); - } - } else { - block->clear_column_data(); - } - return Status::OK(); - } - - // Initialize this context and will prepare data for current_row() - Status init(const StorageReadOptions& opts); - - bool compare(const VMergeIteratorContext& rhs) const { - int cmp_res = UNLIKELY(_compare_columns) - ? _block->compare_at(_index_in_block, rhs._index_in_block, - _compare_columns, *rhs._block, -1) - : _block->compare_at(_index_in_block, rhs._index_in_block, - _num_key_columns, *rhs._block, -1); - - if (cmp_res != 0) { - return UNLIKELY(_is_reverse) ? cmp_res < 0 : cmp_res > 0; - } - - auto col_cmp_res = 0; - if (_sequence_id_idx != -1) { - col_cmp_res = _block->compare_column_at(_index_in_block, rhs._index_in_block, - _sequence_id_idx, *rhs._block, -1); - } - auto result = col_cmp_res == 0 ? data_id() < rhs.data_id() : col_cmp_res < 0; - - if (_is_unique) { - result ? 
set_skip(true) : rhs.set_skip(true); - } - return result; - } - - // `advanced = false` when current block finished - void copy_rows(Block* block, bool advanced = true) { - Block& src = *_block; - Block& dst = *block; - if (_cur_batch_num == 0) { - return; - } - - // copy a row to dst block column by column - size_t start = _index_in_block - _cur_batch_num + 1 - advanced; - DCHECK(start >= 0); - - for (size_t i = 0; i < _num_columns; ++i) { - auto& s_col = src.get_by_position(i); - auto& d_col = dst.get_by_position(i); - - ColumnPtr& s_cp = s_col.column; - ColumnPtr& d_cp = d_col.column; - - d_cp->assume_mutable()->insert_range_from(*s_cp, start, _cur_batch_num); - } - _cur_batch_num = 0; - } - - void copy_rows(BlockView* view, bool advanced = true) { - if (_cur_batch_num == 0) { - return; - } - size_t start = _index_in_block - _cur_batch_num + 1 - advanced; - DCHECK(start >= 0); - - for (size_t i = 0; i < _cur_batch_num; ++i) { - view->push_back({_block, static_cast(start + i), false}); - } - - _cur_batch_num = 0; - } - - RowLocation current_row_location() { - DCHECK(_record_rowids); - return _block_row_locations[_index_in_block]; - } - - // Advance internal row index to next valid row - // Return error if error happens - // Don't call this when valid() is false, action is undefined - Status advance(); - - // Return if it has remaining data in this context. - // Only when this function return true, current_row() - // will return a valid row - bool valid() const { return _valid; } - - uint64_t data_id() const { return _iter->data_id(); } - - bool need_skip() const { return _skip; } - - void set_skip(bool skip) const { _skip = skip; } - - void add_cur_batch() { _cur_batch_num++; } - - void reset_cur_batch() { _cur_batch_num = 0; } - - bool is_cur_block_finished() { return _index_in_block == _block->rows() - 1; } - -private: - // Load next block into _block - Status _load_next_block(); - - RowwiseIterator* _iter; - - int _sequence_id_idx = -1; - bool _is_unique = false; - bool _is_reverse = false; - bool _valid = false; - mutable bool _skip = false; - size_t _index_in_block = -1; - // 4096 minus 16 + 16 bytes padding that in padding pod array - int _block_row_max = 4064; - int _num_columns; - int _num_key_columns; - std::vector* _compare_columns; - std::vector _block_row_locations; - bool _record_rowids = false; - size_t _cur_batch_num = 0; - - // used to store data load from iterator->next_batch(Block*) - std::shared_ptr _block; - // used to store data still on block view - std::list> _block_list; -}; - Status VMergeIteratorContext::init(const StorageReadOptions& opts) { _block_row_max = opts.block_row_max; _record_rowids = opts.record_rowids; @@ -419,139 +308,6 @@ Status VMergeIteratorContext::_load_next_block() { return Status::OK(); } -class VMergeIterator : public RowwiseIterator { -public: - // VMergeIterator takes the ownership of input iterators - VMergeIterator(std::vector& iters, int sequence_id_idx, bool is_unique, - bool is_reverse, uint64_t* merged_rows) - : _origin_iters(iters), - _sequence_id_idx(sequence_id_idx), - _is_unique(is_unique), - _is_reverse(is_reverse), - _merged_rows(merged_rows) {} - - ~VMergeIterator() override { - while (!_merge_heap.empty()) { - auto ctx = _merge_heap.top(); - _merge_heap.pop(); - delete ctx; - } - } - - Status init(const StorageReadOptions& opts) override; - - Status next_batch(Block* block) override { return _next_batch(block); } - Status next_block_view(BlockView* block_view) override { return _next_batch(block_view); } - - bool 
support_return_data_by_ref() override { return true; } - - const Schema& schema() const override { return *_schema; } - - Status current_block_row_locations(std::vector* block_row_locations) override { - DCHECK(_record_rowids); - *block_row_locations = _block_row_locations; - return Status::OK(); - } - - bool update_profile(RuntimeProfile* profile) override { - if (!_origin_iters.empty()) { - return (*_origin_iters.begin())->update_profile(profile); - } - return false; - } - -private: - int _get_size(Block* block) { return block->rows(); } - int _get_size(BlockView* block_view) { return block_view->size(); } - - template - Status _next_batch(T* block) { - if (UNLIKELY(_record_rowids)) { - _block_row_locations.resize(_block_row_max); - } - size_t row_idx = 0; - VMergeIteratorContext* pre_ctx = nullptr; - while (_get_size(block) < _block_row_max) { - if (_merge_heap.empty()) { - break; - } - - auto ctx = _merge_heap.top(); - _merge_heap.pop(); - - if (!ctx->need_skip()) { - ctx->add_cur_batch(); - if (pre_ctx != ctx) { - if (pre_ctx) { - pre_ctx->copy_rows(block); - } - pre_ctx = ctx; - } - if (UNLIKELY(_record_rowids)) { - _block_row_locations[row_idx] = ctx->current_row_location(); - } - row_idx++; - if (ctx->is_cur_block_finished() || row_idx >= _block_row_max) { - // current block finished, ctx not advance - // so copy start_idx = (_index_in_block - _cur_batch_num + 1) - ctx->copy_rows(block, false); - pre_ctx = nullptr; - } - } else if (_merged_rows != nullptr) { - (*_merged_rows)++; - // need skip cur row, so flush rows in pre_ctx - if (pre_ctx) { - pre_ctx->copy_rows(block); - pre_ctx = nullptr; - } - } - - RETURN_IF_ERROR(ctx->advance()); - if (ctx->valid()) { - _merge_heap.push(ctx); - } else { - // Release ctx earlier to reduce resource consumed - delete ctx; - } - } - if (!_merge_heap.empty()) { - return Status::OK(); - } - // Still last batch needs to be processed - - if (UNLIKELY(_record_rowids)) { - _block_row_locations.resize(row_idx); - } - - return Status::EndOfFile("no more data in segment"); - } - - // It will be released after '_merge_heap' has been built. - std::vector _origin_iters; - - const Schema* _schema = nullptr; - - struct VMergeContextComparator { - bool operator()(const VMergeIteratorContext* lhs, const VMergeIteratorContext* rhs) const { - return lhs->compare(*rhs); - } - }; - - using VMergeHeap = - std::priority_queue, - VMergeContextComparator>; - - VMergeHeap _merge_heap; - - int _block_row_max = 0; - int _sequence_id_idx = -1; - bool _is_unique = false; - bool _is_reverse = false; - uint64_t* _merged_rows = nullptr; - bool _record_rowids = false; - std::vector _block_row_locations; -}; - Status VMergeIterator::init(const StorageReadOptions& opts) { if (_origin_iters.empty()) { return Status::OK(); diff --git a/be/src/vec/olap/vgeneric_iterators.h b/be/src/vec/olap/vgeneric_iterators.h index 1342589355..210f47022a 100644 --- a/be/src/vec/olap/vgeneric_iterators.h +++ b/be/src/vec/olap/vgeneric_iterators.h @@ -16,6 +16,9 @@ // under the License. 
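// ---------------------------------------------------------------------------
// Annotation (illustrative usage sketch, not part of the applied diff):
// segment compaction drives the iterators declared in this header roughly as
// follows: one RowwiseIterator per candidate segment is merged into a single
// ordered stream whose blocks are handed to a fresh segment writer. The
// helper name merge_segments_sketch and the consume callback are assumptions
// for illustration; new_merge_iterator() is the factory declared at the end
// of this header, and Status::is_end_of_file() is assumed available.
//
// Status merge_segments_sketch(std::vector<RowwiseIterator*>& inputs,
//                              const StorageReadOptions& opts, Block* block,
//                              const std::function<Status(Block*)>& consume) {
//     uint64_t merged_rows = 0;
//     // new_merge_iterator() takes ownership of every input iterator.
//     std::unique_ptr<RowwiseIterator> merger(new_merge_iterator(
//             inputs, -1 /*sequence_id_idx*/, false /*is_unique*/,
//             false /*is_reverse*/, &merged_rows));
//     RETURN_IF_ERROR(merger->init(opts));
//     Status st;
//     // `block` must be pre-built with one column per schema column; the
//     // merge iterator appends rows into the existing columns.
//     while ((st = merger->next_batch(block)).ok()) {
//         RETURN_IF_ERROR(consume(block)); // e.g. append to the new segment writer
//         block->clear_column_data();
//     }
//     return st.is_end_of_file() ? Status::OK() : st;
// }
// ---------------------------------------------------------------------------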
#include "olap/iterators.h" +#include "olap/row.h" +#include "olap/row_block2.h" +#include "olap/rowset/segment_v2/column_reader.h" namespace doris { @@ -25,6 +28,264 @@ class Segment; namespace vectorized { +class VStatisticsIterator : public RowwiseIterator { +public: + // Will generate num_rows rows in total + VStatisticsIterator(std::shared_ptr segment, const Schema& schema) + : _segment(std::move(segment)), _schema(schema) {} + + ~VStatisticsIterator() override; + + Status init(const StorageReadOptions& opts) override; + + Status next_batch(Block* block) override; + + const Schema& schema() const override { return _schema; } + +private: + std::shared_ptr _segment; + const Schema& _schema; + size_t _target_rows = 0; + size_t _output_rows = 0; + bool _init = false; + TPushAggOp::type _push_down_agg_type_opt; + std::map _column_iterators_map; + std::vector _column_iterators; + + static constexpr size_t MAX_ROW_SIZE_IN_COUNT = 65535; +}; + +// Used to store merge state for a VMergeIterator input. +// This class will iterate all data from internal iterator +// through client call advance(). +// Usage: +// VMergeIteratorContext ctx(iter); +// RETURN_IF_ERROR(ctx.init()); +// while (ctx.valid()) { +// visit(ctx.current_row()); +// RETURN_IF_ERROR(ctx.advance()); +// } +class VMergeIteratorContext { +public: + VMergeIteratorContext(RowwiseIterator* iter, int sequence_id_idx, bool is_unique, + bool is_reverse, std::vector* read_orderby_key_columns) + : _iter(iter), + _sequence_id_idx(sequence_id_idx), + _is_unique(is_unique), + _is_reverse(is_reverse), + _num_columns(iter->schema().num_column_ids()), + _num_key_columns(iter->schema().num_key_columns()), + _compare_columns(read_orderby_key_columns) {} + + VMergeIteratorContext(const VMergeIteratorContext&) = delete; + VMergeIteratorContext(VMergeIteratorContext&&) = delete; + VMergeIteratorContext& operator=(const VMergeIteratorContext&) = delete; + VMergeIteratorContext& operator=(VMergeIteratorContext&&) = delete; + + ~VMergeIteratorContext() { + delete _iter; + _iter = nullptr; + } + + Status block_reset(const std::shared_ptr& block); + + // Initialize this context and will prepare data for current_row() + Status init(const StorageReadOptions& opts); + + bool compare(const VMergeIteratorContext& rhs) const; + + // `advanced = false` when current block finished + void copy_rows(Block* block, bool advanced = true); + + void copy_rows(BlockView* view, bool advanced = true); + + RowLocation current_row_location() { + DCHECK(_record_rowids); + return _block_row_locations[_index_in_block]; + } + + // Advance internal row index to next valid row + // Return error if error happens + // Don't call this when valid() is false, action is undefined + Status advance(); + + // Return if it has remaining data in this context. 
+ // Only when this function return true, current_row() + // will return a valid row + bool valid() const { return _valid; } + + uint64_t data_id() const { return _iter->data_id(); } + + bool need_skip() const { return _skip; } + + void set_skip(bool skip) const { _skip = skip; } + + void add_cur_batch() { _cur_batch_num++; } + + void reset_cur_batch() { _cur_batch_num = 0; } + + bool is_cur_block_finished() { return _index_in_block == _block->rows() - 1; } + +private: + // Load next block into _block + Status _load_next_block(); + + RowwiseIterator* _iter; + + int _sequence_id_idx = -1; + bool _is_unique = false; + bool _is_reverse = false; + bool _valid = false; + mutable bool _skip = false; + size_t _index_in_block = -1; + // 4096 minus 16 + 16 bytes padding that in padding pod array + int _block_row_max = 4064; + int _num_columns; + int _num_key_columns; + std::vector* _compare_columns; + std::vector _block_row_locations; + bool _record_rowids = false; + size_t _cur_batch_num = 0; + + // used to store data load from iterator->next_batch(Block*) + std::shared_ptr _block; + // used to store data still on block view + std::list> _block_list; +}; + +class VMergeIterator : public RowwiseIterator { +public: + // VMergeIterator takes the ownership of input iterators + VMergeIterator(std::vector& iters, int sequence_id_idx, bool is_unique, + bool is_reverse, uint64_t* merged_rows) + : _origin_iters(iters), + _sequence_id_idx(sequence_id_idx), + _is_unique(is_unique), + _is_reverse(is_reverse), + _merged_rows(merged_rows) {} + + ~VMergeIterator() override { + while (!_merge_heap.empty()) { + auto ctx = _merge_heap.top(); + _merge_heap.pop(); + delete ctx; + } + } + + Status init(const StorageReadOptions& opts) override; + + Status next_batch(Block* block) override { return _next_batch(block); } + Status next_block_view(BlockView* block_view) override { return _next_batch(block_view); } + + bool support_return_data_by_ref() override { return true; } + + const Schema& schema() const override { return *_schema; } + + Status current_block_row_locations(std::vector* block_row_locations) override { + DCHECK(_record_rowids); + *block_row_locations = _block_row_locations; + return Status::OK(); + } + + bool update_profile(RuntimeProfile* profile) override { + if (!_origin_iters.empty()) { + return (*_origin_iters.begin())->update_profile(profile); + } + return false; + } + +private: + int _get_size(Block* block) { return block->rows(); } + int _get_size(BlockView* block_view) { return block_view->size(); } + + template + Status _next_batch(T* block) { + if (UNLIKELY(_record_rowids)) { + _block_row_locations.resize(_block_row_max); + } + size_t row_idx = 0; + VMergeIteratorContext* pre_ctx = nullptr; + while (_get_size(block) < _block_row_max) { + if (_merge_heap.empty()) { + break; + } + + auto ctx = _merge_heap.top(); + _merge_heap.pop(); + + if (!ctx->need_skip()) { + ctx->add_cur_batch(); + if (pre_ctx != ctx) { + if (pre_ctx) { + pre_ctx->copy_rows(block); + } + pre_ctx = ctx; + } + if (UNLIKELY(_record_rowids)) { + _block_row_locations[row_idx] = ctx->current_row_location(); + } + row_idx++; + if (ctx->is_cur_block_finished() || row_idx >= _block_row_max) { + // current block finished, ctx not advance + // so copy start_idx = (_index_in_block - _cur_batch_num + 1) + ctx->copy_rows(block, false); + pre_ctx = nullptr; + } + } else if (_merged_rows != nullptr) { + (*_merged_rows)++; + // need skip cur row, so flush rows in pre_ctx + if (pre_ctx) { + pre_ctx->copy_rows(block); + pre_ctx = nullptr; + } 
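+                // Note that the skipped duplicate itself is never copied:
+                // the ctx->advance() call below simply moves past it, so only
+                // _merged_rows records that a row was merged away.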
+ } + + RETURN_IF_ERROR(ctx->advance()); + if (ctx->valid()) { + _merge_heap.push(ctx); + } else { + // Release ctx earlier to reduce resource consumed + delete ctx; + } + } + if (!_merge_heap.empty()) { + return Status::OK(); + } + // Still last batch needs to be processed + + if (UNLIKELY(_record_rowids)) { + _block_row_locations.resize(row_idx); + } + + return Status::EndOfFile("no more data in segment"); + } + + // It will be released after '_merge_heap' has been built. + std::vector _origin_iters; + + const Schema* _schema = nullptr; + + struct VMergeContextComparator { + bool operator()(const VMergeIteratorContext* lhs, const VMergeIteratorContext* rhs) const { + return lhs->compare(*rhs); + } + }; + + using VMergeHeap = + std::priority_queue, + VMergeContextComparator>; + + VMergeHeap _merge_heap; + + int _block_row_max = 0; + int _sequence_id_idx = -1; + bool _is_unique = false; + bool _is_reverse = false; + uint64_t* _merged_rows = nullptr; + bool _record_rowids = false; + std::vector _block_row_locations; +}; + // Create a merge iterator for input iterators. Merge iterator will merge // ordered input iterator to one ordered iterator. So client should ensure // that every input iterator is ordered, otherwise result is undefined. diff --git a/be/test/CMakeLists.txt b/be/test/CMakeLists.txt index 4c6c951b88..de353b0c6d 100644 --- a/be/test/CMakeLists.txt +++ b/be/test/CMakeLists.txt @@ -203,6 +203,7 @@ set(OLAP_TEST_FILES olap/tablet_cooldown_test.cpp olap/rowid_conversion_test.cpp olap/remote_rowset_gc_test.cpp + olap/segcompaction_test.cpp ) set(RUNTIME_TEST_FILES diff --git a/be/test/olap/segcompaction_test.cpp b/be/test/olap/segcompaction_test.cpp new file mode 100644 index 0000000000..ca2dc28f04 --- /dev/null +++ b/be/test/olap/segcompaction_test.cpp @@ -0,0 +1,454 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
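// ---------------------------------------------------------------------------
// Annotation (not part of the applied diff): in the test names below, each
// 'o' denotes a small segment (row count under segcompaction_small_threshold)
// and each 'O' a big one that segment compaction must leave alone. For
// "ooooOOoOooooooooO" the four leading small segments and the later run of
// eight small segments are each expected to collapse into a single file,
// while the big segments (and the lone small one between them) are only
// renamed so that segment ids stay consecutive.
// ---------------------------------------------------------------------------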
+
+#include <gtest/gtest.h>
+
+#include <algorithm>
+#include <filesystem>
+#include <string>
+#include <vector>
+
+#include "common/config.h"
+#include "env/env_posix.h"
+#include "gen_cpp/olap_file.pb.h"
+#include "olap/data_dir.h"
+#include "olap/row_block.h"
+#include "olap/row_cursor.h"
+#include "olap/rowset/beta_rowset_reader.h"
+#include "olap/rowset/beta_rowset_writer.h"
+#include "olap/rowset/rowset_factory.h"
+#include "olap/rowset/rowset_reader_context.h"
+#include "olap/rowset/rowset_writer.h"
+#include "olap/rowset/rowset_writer_context.h"
+#include "olap/storage_engine.h"
+#include "olap/tablet_schema.h"
+#include "olap/utils.h"
+#include "runtime/exec_env.h"
+#include "runtime/mem_pool.h"
+#include "runtime/memory/mem_tracker.h"
+#include "util/file_utils.h"
+#include "util/slice.h"
+
+namespace doris {
+
+static const uint32_t MAX_PATH_LEN = 1024;
+StorageEngine* l_engine = nullptr;
+static const std::string lTestDir = "./data_test/data/segcompaction_test";
+
+class SegCompactionTest : public testing::Test {
+public:
+    SegCompactionTest() : _data_dir(std::make_unique<DataDir>(lTestDir)) {
+        _data_dir->update_capacity();
+    }
+
+    void SetUp() {
+        config::enable_segcompaction = true;
+        config::enable_storage_vectorization = true;
+        config::tablet_map_shard_size = 1;
+        config::txn_map_shard_size = 1;
+        config::txn_shard_size = 1;
+
+        char buffer[MAX_PATH_LEN];
+        EXPECT_NE(getcwd(buffer, MAX_PATH_LEN), nullptr);
+        config::storage_root_path = std::string(buffer) + "/data_test";
+
+        EXPECT_TRUE(FileUtils::remove_all(config::storage_root_path).ok());
+        EXPECT_TRUE(FileUtils::create_dir(config::storage_root_path).ok());
+
+        std::vector<StorePath> paths;
+        paths.emplace_back(config::storage_root_path, -1);
+
+        doris::EngineOptions options;
+        options.store_paths = paths;
+        Status s = doris::StorageEngine::open(options, &l_engine);
+        EXPECT_TRUE(s.ok()) << s.to_string();
+
+        ExecEnv* exec_env = doris::ExecEnv::GetInstance();
+        exec_env->set_storage_engine(l_engine);
+
+        EXPECT_TRUE(FileUtils::create_dir(lTestDir).ok());
+
+        l_engine->start_bg_threads();
+    }
+
+    void TearDown() {
+        if (l_engine != nullptr) {
+            l_engine->stop();
+            delete l_engine;
+            l_engine = nullptr;
+        }
+    }
+
+protected:
+    OlapReaderStatistics _stats;
+
+    bool check_dir(std::vector<std::string>& vec) {
+        std::vector<std::string> result;
+        for (const auto& entry : std::filesystem::directory_iterator(lTestDir)) {
+            result.push_back(std::filesystem::path(entry.path()).filename());
+        }
+
+        LOG(INFO) << "expected ls:" << std::endl;
+        for (auto& i : vec) {
+            LOG(INFO) << i;
+        }
+        LOG(INFO) << "actual ls:" << std::endl;
+        for (auto& i : result) {
+            LOG(INFO) << i;
+        }
+
+        if (result.size() != vec.size()) {
+            return false;
+        } else {
+            for (auto& i : vec) {
+                if (std::find(result.begin(), result.end(), i) == result.end()) {
+                    return false;
+                }
+            }
+        }
+        return true;
+    }
+
+    // (k1 int, k2 varchar(20), k3 int) duplicated key (k1, k2)
+    void create_tablet_schema(TabletSchemaSPtr tablet_schema) {
+        TabletSchemaPB tablet_schema_pb;
+        tablet_schema_pb.set_keys_type(DUP_KEYS);
+        tablet_schema_pb.set_num_short_key_columns(2);
+        tablet_schema_pb.set_num_rows_per_row_block(1024);
+        tablet_schema_pb.set_compress_kind(COMPRESS_NONE);
+        tablet_schema_pb.set_next_column_unique_id(4);
+
+        ColumnPB* column_1 = tablet_schema_pb.add_column();
+        column_1->set_unique_id(1);
+        column_1->set_name("k1");
+        column_1->set_type("INT");
+        column_1->set_is_key(true);
+        column_1->set_length(4);
+        column_1->set_index_length(4);
+        column_1->set_is_nullable(true);
+        column_1->set_is_bf_column(false);
+
+        ColumnPB* column_2 = tablet_schema_pb.add_column();
+
column_2->set_unique_id(2); + column_2->set_name("k2"); + column_2->set_type( + "INT"); // TODO change to varchar(20) when dict encoding for string is supported + column_2->set_length(4); + column_2->set_index_length(4); + column_2->set_is_nullable(true); + column_2->set_is_key(true); + column_2->set_is_nullable(true); + column_2->set_is_bf_column(false); + + ColumnPB* column_3 = tablet_schema_pb.add_column(); + column_3->set_unique_id(3); + column_3->set_name("v1"); + column_3->set_type("INT"); + column_3->set_length(4); + column_3->set_is_key(false); + column_3->set_is_nullable(false); + column_3->set_is_bf_column(false); + column_3->set_aggregation("SUM"); + + tablet_schema->init_from_pb(tablet_schema_pb); + } + + void create_rowset_writer_context(int64_t id, TabletSchemaSPtr tablet_schema, + RowsetWriterContext* rowset_writer_context) { + RowsetId rowset_id; + rowset_id.init(id); + // rowset_writer_context->data_dir = _data_dir.get(); + rowset_writer_context->rowset_id = rowset_id; + rowset_writer_context->tablet_id = 12345; + rowset_writer_context->tablet_schema_hash = 1111; + rowset_writer_context->partition_id = 10; + rowset_writer_context->rowset_type = BETA_ROWSET; + rowset_writer_context->rowset_dir = lTestDir; + rowset_writer_context->rowset_state = VISIBLE; + rowset_writer_context->tablet_schema = tablet_schema; + rowset_writer_context->version.first = 10; + rowset_writer_context->version.second = 10; + } + + void create_and_init_rowset_reader(Rowset* rowset, RowsetReaderContext& context, + RowsetReaderSharedPtr* result) { + auto s = rowset->create_reader(result); + EXPECT_EQ(Status::OK(), s); + EXPECT_TRUE(*result != nullptr); + + s = (*result)->init(&context); + EXPECT_EQ(Status::OK(), s); + } + +private: + std::unique_ptr _data_dir; +}; + +TEST_F(SegCompactionTest, SegCompactionThenRead) { + config::enable_segcompaction = true; + config::enable_storage_vectorization = true; + Status s; + TabletSchemaSPtr tablet_schema = std::make_shared(); + create_tablet_schema(tablet_schema); + + RowsetSharedPtr rowset; + const int num_segments = 15; + const uint32_t rows_per_segment = 4096; + config::segcompaction_small_threshold = 6000; // set threshold above + // rows_per_segment + std::vector segment_num_rows; + { // write `num_segments * rows_per_segment` rows to rowset + RowsetWriterContext writer_context; + create_rowset_writer_context(10047, tablet_schema, &writer_context); + + std::unique_ptr rowset_writer; + s = RowsetFactory::create_rowset_writer(writer_context, &rowset_writer); + EXPECT_EQ(Status::OK(), s); + + RowCursor input_row; + input_row.init(tablet_schema); + + // for segment "i", row "rid" + // k1 := rid*10 + i + // k2 := k1 * 10 + // k3 := rid + for (int i = 0; i < num_segments; ++i) { + MemPool mem_pool; + for (int rid = 0; rid < rows_per_segment; ++rid) { + uint32_t k1 = rid * 100 + i; + uint32_t k2 = i; + uint32_t k3 = rid; + input_row.set_field_content(0, reinterpret_cast(&k1), &mem_pool); + input_row.set_field_content(1, reinterpret_cast(&k2), &mem_pool); + input_row.set_field_content(2, reinterpret_cast(&k3), &mem_pool); + s = rowset_writer->add_row(input_row); + EXPECT_EQ(Status::OK(), s); + } + s = rowset_writer->flush(); + EXPECT_EQ(Status::OK(), s); + } + + rowset = rowset_writer->build(); + std::vector ls; + ls.push_back("10047_0.dat"); + ls.push_back("10047_1.dat"); + ls.push_back("10047_2.dat"); + ls.push_back("10047_3.dat"); + ls.push_back("10047_4.dat"); + ls.push_back("10047_5.dat"); + EXPECT_TRUE(check_dir(ls)); + } + + { // read + RowsetReaderContext 
reader_context; + reader_context.tablet_schema = tablet_schema; + // use this type to avoid cache from other ut + reader_context.reader_type = READER_CUMULATIVE_COMPACTION; + reader_context.need_ordered_result = true; + std::vector return_columns = {0, 1, 2}; + reader_context.return_columns = &return_columns; + reader_context.stats = &_stats; + + // without predicates + { + RowsetReaderSharedPtr rowset_reader; + create_and_init_rowset_reader(rowset.get(), reader_context, &rowset_reader); + RowBlock* output_block; + uint32_t num_rows_read = 0; + while ((s = rowset_reader->next_block(&output_block)) == Status::OK()) { + EXPECT_TRUE(output_block != nullptr); + EXPECT_GT(output_block->row_num(), 0); + EXPECT_EQ(0, output_block->pos()); + EXPECT_EQ(output_block->row_num(), output_block->limit()); + EXPECT_EQ(return_columns, output_block->row_block_info().column_ids); + // after sort merge segments, k1 will be 0, 1, 2, 10, 11, 12, 20, 21, 22, ..., 40950, 40951, 40952 + for (int i = 0; i < output_block->row_num(); ++i) { + char* field1 = output_block->field_ptr(i, 0); + char* field2 = output_block->field_ptr(i, 1); + char* field3 = output_block->field_ptr(i, 2); + // test null bit + EXPECT_FALSE(*reinterpret_cast(field1)); + EXPECT_FALSE(*reinterpret_cast(field2)); + EXPECT_FALSE(*reinterpret_cast(field3)); + uint32_t k1 = *reinterpret_cast(field1 + 1); + uint32_t k2 = *reinterpret_cast(field2 + 1); + uint32_t k3 = *reinterpret_cast(field3 + 1); + EXPECT_EQ(100 * k3 + k2, k1); + + num_rows_read++; + } + } + EXPECT_EQ(Status::OLAPInternalError(OLAP_ERR_DATA_EOF), s); + EXPECT_TRUE(output_block == nullptr); + EXPECT_EQ(rowset->rowset_meta()->num_rows(), num_rows_read); + EXPECT_TRUE(rowset_reader->get_segment_num_rows(&segment_num_rows).ok()); + size_t total_num_rows = 0; + //EXPECT_EQ(segment_num_rows.size(), num_segments); + for (const auto& i : segment_num_rows) { + total_num_rows += i; + } + EXPECT_EQ(total_num_rows, num_rows_read); + } + } +} + +TEST_F(SegCompactionTest, SegCompactionInterleaveWithBig_ooooOOoOooooooooO) { + config::enable_segcompaction = true; + config::enable_storage_vectorization = true; + Status s; + TabletSchemaSPtr tablet_schema = std::make_shared(); + create_tablet_schema(tablet_schema); + + RowsetSharedPtr rowset; + config::segcompaction_small_threshold = 6000; // set threshold above + // rows_per_segment + std::vector segment_num_rows; + { // write `num_segments * rows_per_segment` rows to rowset + RowsetWriterContext writer_context; + create_rowset_writer_context(10048, tablet_schema, &writer_context); + + std::unique_ptr rowset_writer; + s = RowsetFactory::create_rowset_writer(writer_context, &rowset_writer); + EXPECT_EQ(Status::OK(), s); + + RowCursor input_row; + input_row.init(tablet_schema); + + // for segment "i", row "rid" + // k1 := rid*10 + i + // k2 := k1 * 10 + // k3 := 4096 * i + rid + int num_segments = 4; + uint32_t rows_per_segment = 4096; + for (int i = 0; i < num_segments; ++i) { + MemPool mem_pool; + for (int rid = 0; rid < rows_per_segment; ++rid) { + uint32_t k1 = rid * 100 + i; + uint32_t k2 = i; + uint32_t k3 = rid; + input_row.set_field_content(0, reinterpret_cast(&k1), &mem_pool); + input_row.set_field_content(1, reinterpret_cast(&k2), &mem_pool); + input_row.set_field_content(2, reinterpret_cast(&k3), &mem_pool); + s = rowset_writer->add_row(input_row); + EXPECT_EQ(Status::OK(), s); + } + s = rowset_writer->flush(); + EXPECT_EQ(Status::OK(), s); + } + num_segments = 2; + rows_per_segment = 6400; + for (int i = 0; i < num_segments; ++i) { + 
MemPool mem_pool; + for (int rid = 0; rid < rows_per_segment; ++rid) { + uint32_t k1 = rid * 100 + i; + uint32_t k2 = i; + uint32_t k3 = rid; + input_row.set_field_content(0, reinterpret_cast(&k1), &mem_pool); + input_row.set_field_content(1, reinterpret_cast(&k2), &mem_pool); + input_row.set_field_content(2, reinterpret_cast(&k3), &mem_pool); + s = rowset_writer->add_row(input_row); + EXPECT_EQ(Status::OK(), s); + } + s = rowset_writer->flush(); + EXPECT_EQ(Status::OK(), s); + } + num_segments = 1; + rows_per_segment = 4096; + for (int i = 0; i < num_segments; ++i) { + MemPool mem_pool; + for (int rid = 0; rid < rows_per_segment; ++rid) { + uint32_t k1 = rid * 100 + i; + uint32_t k2 = i; + uint32_t k3 = rid; + input_row.set_field_content(0, reinterpret_cast(&k1), &mem_pool); + input_row.set_field_content(1, reinterpret_cast(&k2), &mem_pool); + input_row.set_field_content(2, reinterpret_cast(&k3), &mem_pool); + s = rowset_writer->add_row(input_row); + EXPECT_EQ(Status::OK(), s); + } + s = rowset_writer->flush(); + EXPECT_EQ(Status::OK(), s); + } + num_segments = 1; + rows_per_segment = 6400; + for (int i = 0; i < num_segments; ++i) { + MemPool mem_pool; + for (int rid = 0; rid < rows_per_segment; ++rid) { + uint32_t k1 = rid * 100 + i; + uint32_t k2 = i; + uint32_t k3 = rid; + input_row.set_field_content(0, reinterpret_cast(&k1), &mem_pool); + input_row.set_field_content(1, reinterpret_cast(&k2), &mem_pool); + input_row.set_field_content(2, reinterpret_cast(&k3), &mem_pool); + s = rowset_writer->add_row(input_row); + EXPECT_EQ(Status::OK(), s); + } + s = rowset_writer->flush(); + EXPECT_EQ(Status::OK(), s); + } + num_segments = 8; + rows_per_segment = 4096; + for (int i = 0; i < num_segments; ++i) { + MemPool mem_pool; + for (int rid = 0; rid < rows_per_segment; ++rid) { + uint32_t k1 = rid * 100 + i; + uint32_t k2 = i; + uint32_t k3 = rid; + input_row.set_field_content(0, reinterpret_cast(&k1), &mem_pool); + input_row.set_field_content(1, reinterpret_cast(&k2), &mem_pool); + input_row.set_field_content(2, reinterpret_cast(&k3), &mem_pool); + s = rowset_writer->add_row(input_row); + EXPECT_EQ(Status::OK(), s); + } + s = rowset_writer->flush(); + EXPECT_EQ(Status::OK(), s); + } + num_segments = 1; + rows_per_segment = 6400; + for (int i = 0; i < num_segments; ++i) { + MemPool mem_pool; + for (int rid = 0; rid < rows_per_segment; ++rid) { + uint32_t k1 = rid * 100 + i; + uint32_t k2 = i; + uint32_t k3 = rid; + input_row.set_field_content(0, reinterpret_cast(&k1), &mem_pool); + input_row.set_field_content(1, reinterpret_cast(&k2), &mem_pool); + input_row.set_field_content(2, reinterpret_cast(&k3), &mem_pool); + s = rowset_writer->add_row(input_row); + EXPECT_EQ(Status::OK(), s); + } + s = rowset_writer->flush(); + EXPECT_EQ(Status::OK(), s); + } + + rowset = rowset_writer->build(); + std::vector ls; + // ooooOOoOooooooooO + ls.push_back("10048_0.dat"); // oooo + ls.push_back("10048_1.dat"); // O + ls.push_back("10048_2.dat"); // O + ls.push_back("10048_3.dat"); // o + ls.push_back("10048_4.dat"); // O + ls.push_back("10048_5.dat"); // oooooooo + ls.push_back("10048_6.dat"); // O + EXPECT_TRUE(check_dir(ls)); + } +} + +} // namespace doris + +// @brief Test Stub diff --git a/docs/en/docs/admin-manual/config/be-config.md b/docs/en/docs/admin-manual/config/be-config.md index aa11cf1057..d4236e6870 100644 --- a/docs/en/docs/admin-manual/config/be-config.md +++ b/docs/en/docs/admin-manual/config/be-config.md @@ -1590,3 +1590,21 @@ Translated with www.DeepL.com/Translator (free version) * 
Type: int64
* Description: Save time of cache file, in seconds
* Default: 604800 (1 week)
+
+### `enable_segcompaction`
+
+* Type: bool
+* Description: Enable segment compaction during load, which merges freshly written small segments in the background
+* Default value: false
+
+### `segcompaction_threshold_segment_num`
+
+* Type: int32
+* Description: Trigger segment compaction when the number of segments produced by a rowset writer exceeds this threshold
+* Default value: 10
+
+### `segcompaction_small_threshold`
+
+* Type: int32
+* Description: Only segments whose row count is below this threshold are treated as "small" and picked for segment compaction; bigger segments are skipped
+* Default value: 1048576
diff --git a/docs/en/docs/admin-manual/maint-monitor/be-olap-error-code.md b/docs/en/docs/admin-manual/maint-monitor/be-olap-error-code.md
index dd4bae44fe..f71d7ed65f 100644
--- a/docs/en/docs/admin-manual/maint-monitor/be-olap-error-code.md
+++ b/docs/en/docs/admin-manual/maint-monitor/be-olap-error-code.md
@@ -6,7 +6,7 @@

---

-
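
As a usage note for the knobs documented above (the thresholds shown are the
shipped defaults): none of the three is declared runtime-mutable in
`config.h` (`CONF_Bool`/`CONF_Int32`, not `CONF_m*`), so they belong in
`be.conf` and take effect after a BE restart.

```
# be.conf
enable_segcompaction = true
segcompaction_threshold_segment_num = 10   # trigger after 10 flushed segments
segcompaction_small_threshold = 1048576    # only smaller segments get compacted
```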