From 8bbccc59ef21d4483534da40b78fbfad78aa47a7 Mon Sep 17 00:00:00 2001 From: Kaijie Chen Date: Fri, 4 Aug 2023 19:48:56 +0800 Subject: [PATCH] [refactor](load) split segment flush out of beta rowset writer (#21725) --- be/src/olap/rowset/beta_rowset_writer.cpp | 180 ++----------- be/src/olap/rowset/beta_rowset_writer.h | 45 +--- be/src/olap/rowset/rowset_writer_context.h | 4 + be/src/olap/rowset/segment_creator.cpp | 242 ++++++++++++++++++ be/src/olap/rowset/segment_creator.h | 190 ++++++++++++++ .../rowset/vertical_beta_rowset_writer.cpp | 5 - 6 files changed, 474 insertions(+), 192 deletions(-) create mode 100644 be/src/olap/rowset/segment_creator.cpp create mode 100644 be/src/olap/rowset/segment_creator.h diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp index 35fb2c8677..870fc55241 100644 --- a/be/src/olap/rowset/beta_rowset_writer.cpp +++ b/be/src/olap/rowset/beta_rowset_writer.cpp @@ -61,17 +61,13 @@ using namespace ErrorCode; BetaRowsetWriter::BetaRowsetWriter() : _rowset_meta(nullptr), - _next_segment_id(0), _num_segment(0), _segment_start_id(0), _segcompacted_point(0), _num_segcompacted(0), - _segment_writer(nullptr), _num_rows_written(0), _total_data_size(0), _total_index_size(0), - _raw_num_rows_written(0), - _num_rows_filtered(0), _segcompaction_worker(this), _is_doing_segcompaction(false) { _segcompaction_status.store(OK); @@ -85,14 +81,14 @@ BetaRowsetWriter::~BetaRowsetWriter() { wait_flying_segcompaction(); // TODO(lingbin): Should wrapper exception logic, no need to know file ops directly. - if (!_already_built) { // abnormal exit, remove all files generated - _segment_writer.reset(); // ensure all files are closed + if (!_already_built) { // abnormal exit, remove all files generated + _segment_creator.close(); // ensure all files are closed auto fs = _rowset_meta->fs(); if (!fs) { return; } - DCHECK_LE(_segment_start_id + _num_segment, _next_segment_id); - for (int i = _segment_start_id; i < _next_segment_id; ++i) { + DCHECK_LE(_segment_start_id + _num_segment, _segment_creator.next_segment_id()); + for (int i = _segment_start_id; i < _segment_creator.next_segment_id(); ++i) { std::string seg_path = BetaRowset::segment_file_path(_context.rowset_dir, _context.rowset_id, i); // Even if an error is encountered, these files that have not been cleaned up @@ -127,18 +123,14 @@ Status BetaRowsetWriter::init(const RowsetWriterContext& rowset_writer_context) _rowset_meta->set_tablet_schema(_context.tablet_schema); _context.schema_change_recorder = std::make_shared(); - + _context.segment_collector = std::make_shared>(this); + _context.file_writer_creator = std::make_shared>(this); + _segment_creator.init(_context); return Status::OK(); } Status BetaRowsetWriter::add_block(const vectorized::Block* block) { - if (block->rows() == 0) { - return Status::OK(); - } - if (UNLIKELY(_segment_writer == nullptr)) { - RETURN_IF_ERROR(_create_segment_writer(_segment_writer, allocate_segment_id())); - } - return _add_block(block, _segment_writer); + return _segment_creator.add_block(block); } Status BetaRowsetWriter::_generate_delete_bitmap(int32_t segment_id) { @@ -414,41 +406,6 @@ Status BetaRowsetWriter::_segcompaction_rename_last_segments() { return Status::OK(); } -Status BetaRowsetWriter::_add_rows(const vectorized::Block* block, - std::unique_ptr& segment_writer, - size_t row_offset, size_t input_row_num) { - auto s = segment_writer->append_block(block, row_offset, input_row_num); - if (UNLIKELY(!s.ok())) { - return Status::Error("failed to append block: {}", s.to_string()); - } - _raw_num_rows_written += input_row_num; - return Status::OK(); -} - -Status BetaRowsetWriter::_add_block(const vectorized::Block* block, - std::unique_ptr& segment_writer) { - size_t block_size_in_bytes = block->bytes(); - size_t block_row_num = block->rows(); - size_t row_avg_size_in_bytes = std::max((size_t)1, block_size_in_bytes / block_row_num); - size_t row_offset = 0; - - do { - auto max_row_add = segment_writer->max_row_to_add(row_avg_size_in_bytes); - if (UNLIKELY(max_row_add < 1)) { - // no space for another single row, need flush now - RETURN_IF_ERROR(_flush_segment_writer(segment_writer)); - RETURN_IF_ERROR(_create_segment_writer(segment_writer, allocate_segment_id())); - max_row_add = segment_writer->max_row_to_add(row_avg_size_in_bytes); - DCHECK(max_row_add > 0); - } - size_t input_row_num = std::min(block_row_num - row_offset, size_t(max_row_add)); - RETURN_IF_ERROR(_add_rows(block, segment_writer, row_offset, input_row_num)); - row_offset += input_row_num; - } while (row_offset < block_row_num); - - return Status::OK(); -} - Status BetaRowsetWriter::add_rowset(RowsetSharedPtr rowset) { assert(rowset->rowset_meta()->rowset_type() == BETA_ROWSET); RETURN_IF_ERROR(rowset->link_files_to(_context.rowset_dir, _context.rowset_id)); @@ -456,9 +413,6 @@ Status BetaRowsetWriter::add_rowset(RowsetSharedPtr rowset) { _total_data_size += rowset->rowset_meta()->data_disk_size(); _total_index_size += rowset->rowset_meta()->index_disk_size(); _num_segment += rowset->num_segments(); - // _next_segment_id is not used in this code path, - // just to make sure it matches with _num_segment - _next_segment_id = _num_segment.load(); // append key_bounds to current rowset rowset->get_segments_key_bounds(&_segments_encoded_key_bounds); // TODO update zonemap @@ -474,10 +428,7 @@ Status BetaRowsetWriter::add_rowset_for_linked_schema_change(RowsetSharedPtr row } Status BetaRowsetWriter::flush() { - if (_segment_writer != nullptr) { - RETURN_IF_ERROR(_flush_segment_writer(_segment_writer)); - } - return Status::OK(); + return _segment_creator.flush(); } Status BetaRowsetWriter::flush_memtable(vectorized::Block* block, int32_t segment_id, @@ -493,7 +444,8 @@ Status BetaRowsetWriter::flush_memtable(vectorized::Block* block, int32_t segmen } { SCOPED_RAW_TIMER(&_segment_writer_ns); - RETURN_IF_ERROR(_flush_single_block(block, segment_id, flush_size, flush_schema)); + RETURN_IF_ERROR( + _segment_creator.flush_single_block(block, segment_id, flush_size, flush_schema)); } RETURN_IF_ERROR(_generate_delete_bitmap(segment_id)); RETURN_IF_ERROR(_segcompaction_if_necessary()); @@ -501,20 +453,7 @@ Status BetaRowsetWriter::flush_memtable(vectorized::Block* block, int32_t segmen } Status BetaRowsetWriter::flush_single_block(const vectorized::Block* block) { - if (block->rows() == 0) { - return Status::OK(); - } - return _flush_single_block(block, allocate_segment_id()); -} - -Status BetaRowsetWriter::_flush_single_block(const vectorized::Block* block, int32_t segment_id, - int64_t* flush_size, TabletSchemaSPtr flush_schema) { - std::unique_ptr writer; - bool no_compression = block->bytes() <= config::segment_compression_threshold_kb * 1024; - RETURN_IF_ERROR(_create_segment_writer(writer, segment_id, no_compression, flush_schema)); - RETURN_IF_ERROR(_add_rows(block, writer, 0, block->rows())); - RETURN_IF_ERROR(_flush_segment_writer(writer, flush_size)); - return Status::OK(); + return _segment_creator.flush_single_block(block); } Status BetaRowsetWriter::wait_flying_segcompaction() { @@ -552,8 +491,6 @@ RowsetSharedPtr BetaRowsetWriter::manual_build(const RowsetMetaSharedPtr& spec_r } RowsetSharedPtr BetaRowsetWriter::build() { - // make sure all segments are flushed - DCHECK_EQ(_segment_start_id + _num_segment, _next_segment_id); // TODO(lingbin): move to more better place, or in a CreateBlockBatch? for (auto& file_writer : _file_writers) { Status status = file_writer->close(); @@ -564,6 +501,11 @@ RowsetSharedPtr BetaRowsetWriter::build() { } } Status status; + status = _segment_creator.close(); + if (!status.ok()) { + LOG(WARNING) << "failed to close segment creator when build new rowset, res=" << status; + return nullptr; + } // if _segment_start_id is not zero, that means it's a transient rowset writer for // MoW partial update, don't need to do segment compaction. if (_segment_start_id == 0) { @@ -583,9 +525,11 @@ RowsetSharedPtr BetaRowsetWriter::build() { _segcompaction_worker.get_file_writer()->close(); } } - // When building a rowset, we must ensure that the current _segment_writer has been - // flushed, that is, the current _segment_writer is nullptr - DCHECK(_segment_writer == nullptr) << "segment must be null when build rowset"; + status = _check_segment_number_limit(); + if (!status.ok()) { + LOG(WARNING) << "build rowset failed, res=" << status; + return nullptr; + } _build_rowset_meta(_rowset_meta); if (_rowset_meta->newest_write_timestamp() == -1) { @@ -758,38 +702,6 @@ Status BetaRowsetWriter::_create_segment_writer_for_segcompaction( return Status::OK(); } -Status BetaRowsetWriter::_create_segment_writer(std::unique_ptr& writer, - int32_t segment_id, bool no_compression, - TabletSchemaSPtr flush_schema) { - RETURN_IF_ERROR(_check_segment_number_limit()); - io::FileWriterPtr file_writer; - RETURN_IF_ERROR(create_file_writer(segment_id, file_writer)); - - segment_v2::SegmentWriterOptions writer_options; - writer_options.enable_unique_key_merge_on_write = _context.enable_unique_key_merge_on_write; - writer_options.rowset_ctx = &_context; - writer_options.write_type = _context.write_type; - if (no_compression) { - writer_options.compression_type = NO_COMPRESSION; - } - - const auto& tablet_schema = flush_schema ? flush_schema : _context.tablet_schema; - writer.reset(new segment_v2::SegmentWriter( - file_writer.get(), segment_id, tablet_schema, _context.tablet, _context.data_dir, - _context.max_rows_per_segment, writer_options, _context.mow_context)); - { - std::lock_guard l(_lock); - _file_writers.push_back(std::move(file_writer)); - } - auto s = writer->init(); - if (!s.ok()) { - LOG(WARNING) << "failed to init segment writer: " << s.to_string(); - writer.reset(); - return s; - } - return Status::OK(); -} - Status BetaRowsetWriter::_check_segment_number_limit() { size_t total_segment_num = _num_segment - _segcompacted_point + 1 + _num_segcompacted; if (UNLIKELY(total_segment_num > config::max_segment_num_per_rowset)) { @@ -803,54 +715,15 @@ Status BetaRowsetWriter::_check_segment_number_limit() { return Status::OK(); } -Status BetaRowsetWriter::_flush_segment_writer(std::unique_ptr& writer, - int64_t* flush_size) { - uint32_t segid = writer->get_segment_id(); - uint32_t row_num = writer->num_rows_written(); - - if (writer->num_rows_written() == 0) { - return Status::OK(); - } - uint64_t segment_size; - uint64_t index_size; - Status s = writer->finalize(&segment_size, &index_size); - if (!s.ok()) { - return Status::Error(s.code(), "failed to finalize segment: {}", s.to_string()); - } - VLOG_DEBUG << "tablet_id:" << _context.tablet_id - << " flushing filename: " << writer->get_data_dir()->path() - << " rowset_id:" << _context.rowset_id << " segment num:" << _num_segment; - - KeyBoundsPB key_bounds; - Slice min_key = writer->min_encoded_key(); - Slice max_key = writer->max_encoded_key(); - DCHECK_LE(min_key.compare(max_key), 0); - key_bounds.set_min_key(min_key.to_string()); - key_bounds.set_max_key(max_key.to_string()); - - SegmentStatistics segstat; - segstat.row_num = row_num; - segstat.data_size = segment_size + writer->get_inverted_index_file_size(); - segstat.index_size = index_size + writer->get_inverted_index_file_size(); - segstat.key_bounds = key_bounds; - - _num_rows_filtered += writer->num_rows_filtered(); - writer.reset(); - if (flush_size) { - *flush_size = segment_size + index_size; - } - - add_segment(segid, segstat); - return Status::OK(); -} - -void BetaRowsetWriter::add_segment(uint32_t segid, SegmentStatistics& segstat) { +Status BetaRowsetWriter::add_segment(uint32_t segid, SegmentStatistics& segstat) { uint32_t segid_offset = segid - _segment_start_id; { std::lock_guard lock(_segid_statistics_map_mutex); CHECK_EQ(_segid_statistics_map.find(segid) == _segid_statistics_map.end(), true); _segid_statistics_map.emplace(segid, segstat); - _segment_num_rows.resize(_next_segment_id); + if (segid >= _segment_num_rows.size()) { + _segment_num_rows.resize(segid + 1); + } _segment_num_rows[segid_offset] = segstat.row_num; } VLOG_DEBUG << "_segid_statistics_map add new record. segid:" << segid @@ -864,6 +737,7 @@ void BetaRowsetWriter::add_segment(uint32_t segid, SegmentStatistics& segstat) { _num_segment++; } } + return Status::OK(); } Status BetaRowsetWriter::flush_segment_writer_for_segcompaction( diff --git a/be/src/olap/rowset/beta_rowset_writer.h b/be/src/olap/rowset/beta_rowset_writer.h index 22bed4feff..3ab1e176e4 100644 --- a/be/src/olap/rowset/beta_rowset_writer.h +++ b/be/src/olap/rowset/beta_rowset_writer.h @@ -42,6 +42,7 @@ #include "olap/rowset/rowset_meta.h" #include "olap/rowset/rowset_writer.h" #include "olap/rowset/rowset_writer_context.h" +#include "olap/rowset/segment_creator.h" #include "segcompaction.h" #include "segment_v2/segment.h" #include "util/spinlock.h" @@ -87,7 +88,7 @@ public: Status create_file_writer(uint32_t segment_id, io::FileWriterPtr& writer); - void add_segment(uint32_t segid, SegmentStatistics& segstat); + Status add_segment(uint32_t segid, SegmentStatistics& segstat); Status flush() override; @@ -104,9 +105,9 @@ public: Version version() override { return _context.version; } - int64_t num_rows() const override { return _raw_num_rows_written; } + int64_t num_rows() const override { return _segment_creator.num_rows_written(); } - int64_t num_rows_filtered() const override { return _num_rows_filtered; } + int64_t num_rows_filtered() const override { return _segment_creator.num_rows_filtered(); } RowsetId rowset_id() override { return _context.rowset_id; } @@ -118,7 +119,7 @@ public: return Status::OK(); } - int32_t allocate_segment_id() override { return _next_segment_id.fetch_add(1); }; + int32_t allocate_segment_id() override { return _segment_creator.allocate_segment_id(); }; Status flush_segment_writer_for_segcompaction( std::unique_ptr* writer, uint64_t index_size, @@ -129,8 +130,8 @@ public: Status wait_flying_segcompaction() override; void set_segment_start_id(int32_t start_id) override { + _segment_creator.set_segment_start_id(start_id); _segment_start_id = start_id; - _next_segment_id = start_id; } int64_t delete_bitmap_ns() override { return _delete_bitmap_ns; } @@ -138,22 +139,8 @@ public: int64_t segment_writer_ns() override { return _segment_writer_ns; } private: - Status _add_rows(const vectorized::Block* block, - std::unique_ptr& segment_writer, size_t row_offset, - size_t input_row_num); - Status _add_block(const vectorized::Block* block, - std::unique_ptr& writer); - Status _create_file_writer(std::string path, io::FileWriterPtr& file_writer); Status _check_segment_number_limit(); - Status _create_segment_writer(std::unique_ptr& writer, - int32_t segment_id, bool no_compression = false, - TabletSchemaSPtr flush_schema = nullptr); - Status _flush_segment_writer(std::unique_ptr& writer, - int64_t* flush_size = nullptr); - Status _flush_single_block(const vectorized::Block* block, int32_t segment_id, - int64_t* flush_size = nullptr, - TabletSchemaSPtr flush_schema = nullptr); Status _generate_delete_bitmap(int32_t segment_id); void _build_rowset_meta(std::shared_ptr rowset_meta); @@ -191,19 +178,13 @@ protected: RowsetWriterContext _context; std::shared_ptr _rowset_meta; - std::atomic _next_segment_id; // the next available segment_id (offset), - // also the numer of allocated segments - std::atomic _num_segment; // number of consecutive flushed segments - roaring::Roaring _segment_set; // bitmap set to record flushed segment id - std::mutex _segment_set_mutex; // mutex for _segment_set - int32_t _segment_start_id; //basic write start from 0, partial update may be different + std::atomic _num_segment; // number of consecutive flushed segments + roaring::Roaring _segment_set; // bitmap set to record flushed segment id + std::mutex _segment_set_mutex; // mutex for _segment_set + int32_t _segment_start_id; // basic write start from 0, partial update may be different std::atomic _segcompacted_point; // segemnts before this point have // already been segment compacted std::atomic _num_segcompacted; // index for segment compaction - /// When flushing the memtable in the load process, we do not use this writer but an independent writer. - /// Because we want to flush memtables in parallel. - /// In other processes, such as merger or schema change, we will use this unified writer for data writing. - std::unique_ptr _segment_writer; mutable SpinLock _lock; // protect following vectors. // record rows number of every segment already written, using for rowid @@ -219,17 +200,13 @@ protected: std::atomic _total_index_size; // TODO rowset Zonemap - // written rows by add_block/add_row (not effected by segcompaction) - std::atomic _raw_num_rows_written; - - std::atomic _num_rows_filtered; - std::map _segid_statistics_map; std::mutex _segid_statistics_map_mutex; bool _is_pending = false; bool _already_built = false; + SegmentCreator _segment_creator; SegcompactionWorker _segcompaction_worker; // ensure only one inflight segcompaction task for each rowset diff --git a/be/src/olap/rowset/rowset_writer_context.h b/be/src/olap/rowset/rowset_writer_context.h index 904966d8e3..e26ab15dfa 100644 --- a/be/src/olap/rowset/rowset_writer_context.h +++ b/be/src/olap/rowset/rowset_writer_context.h @@ -30,6 +30,8 @@ class RowsetWriterContextBuilder; using RowsetWriterContextBuilderSharedPtr = std::shared_ptr; class DataDir; class Tablet; +class FileWriterCreator; +class SegmentCollector; namespace vectorized::schema_util { class LocalSchemaChangeRecorder; } @@ -90,6 +92,8 @@ struct RowsetWriterContext { nullptr; std::shared_ptr mow_context; + std::shared_ptr file_writer_creator; + std::shared_ptr segment_collector; }; } // namespace doris diff --git a/be/src/olap/rowset/segment_creator.cpp b/be/src/olap/rowset/segment_creator.cpp new file mode 100644 index 0000000000..458e31992e --- /dev/null +++ b/be/src/olap/rowset/segment_creator.cpp @@ -0,0 +1,242 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/rowset/segment_creator.h" + +// IWYU pragma: no_include +#include // IWYU pragma: keep + +#include +#include +#include + +// IWYU pragma: no_include +#include "common/compiler_util.h" // IWYU pragma: keep +#include "common/config.h" +#include "common/logging.h" +#include "io/fs/file_writer.h" +#include "olap/rowset/beta_rowset_writer.h" // SegmentStatistics +#include "olap/rowset/segment_v2/segment_writer.h" +#include "vec/core/block.h" + +namespace doris { +using namespace ErrorCode; + +SegmentFlusher::SegmentFlusher() = default; + +SegmentFlusher::~SegmentFlusher() = default; + +Status SegmentFlusher::init(const RowsetWriterContext& rowset_writer_context) { + _context = rowset_writer_context; + return Status::OK(); +} + +Status SegmentFlusher::flush_single_block(const vectorized::Block* block, int32_t segment_id, + int64_t* flush_size, TabletSchemaSPtr flush_schema) { + if (block->rows() == 0) { + return Status::OK(); + } + std::unique_ptr writer; + bool no_compression = block->bytes() <= config::segment_compression_threshold_kb * 1024; + RETURN_IF_ERROR(_create_segment_writer(writer, segment_id, no_compression, flush_schema)); + RETURN_IF_ERROR(_add_rows(writer, block, 0, block->rows())); + RETURN_IF_ERROR(_flush_segment_writer(writer, flush_size)); + return Status::OK(); +} + +Status SegmentFlusher::close() { + std::lock_guard l(_lock); + for (auto& file_writer : _file_writers) { + Status status = file_writer->close(); + if (!status.ok()) { + LOG(WARNING) << "failed to close file writer, path=" << file_writer->path() + << " res=" << status; + return status; + } + } + return Status::OK(); +} + +Status SegmentFlusher::_add_rows(std::unique_ptr& segment_writer, + const vectorized::Block* block, size_t row_offset, + size_t row_num) { + auto s = segment_writer->append_block(block, row_offset, row_num); + if (UNLIKELY(!s.ok())) { + return Status::Error("failed to append block: {}", s.to_string()); + } + _num_rows_written += row_num; + return Status::OK(); +} + +Status SegmentFlusher::_create_segment_writer(std::unique_ptr& writer, + int32_t segment_id, bool no_compression, + TabletSchemaSPtr flush_schema) { + io::FileWriterPtr file_writer; + RETURN_IF_ERROR(_context.file_writer_creator->create(segment_id, file_writer)); + + segment_v2::SegmentWriterOptions writer_options; + writer_options.enable_unique_key_merge_on_write = _context.enable_unique_key_merge_on_write; + writer_options.rowset_ctx = &_context; + writer_options.write_type = _context.write_type; + if (no_compression) { + writer_options.compression_type = NO_COMPRESSION; + } + + const auto& tablet_schema = flush_schema ? flush_schema : _context.tablet_schema; + writer.reset(new segment_v2::SegmentWriter( + file_writer.get(), segment_id, tablet_schema, _context.tablet, _context.data_dir, + _context.max_rows_per_segment, writer_options, _context.mow_context)); + { + std::lock_guard l(_lock); + _file_writers.push_back(std::move(file_writer)); + } + auto s = writer->init(); + if (!s.ok()) { + LOG(WARNING) << "failed to init segment writer: " << s.to_string(); + writer.reset(); + return s; + } + return Status::OK(); +} + +Status SegmentFlusher::_flush_segment_writer(std::unique_ptr& writer, + int64_t* flush_size) { + uint32_t row_num = writer->num_rows_written(); + _num_rows_filtered += writer->num_rows_filtered(); + + if (row_num == 0) { + return Status::OK(); + } + uint64_t segment_size; + uint64_t index_size; + Status s = writer->finalize(&segment_size, &index_size); + if (!s.ok()) { + return Status::Error(s.code(), "failed to finalize segment: {}", s.to_string()); + } + VLOG_DEBUG << "tablet_id:" << _context.tablet_id + << " flushing filename: " << writer->get_data_dir()->path() + << " rowset_id:" << _context.rowset_id; + + KeyBoundsPB key_bounds; + Slice min_key = writer->min_encoded_key(); + Slice max_key = writer->max_encoded_key(); + DCHECK_LE(min_key.compare(max_key), 0); + key_bounds.set_min_key(min_key.to_string()); + key_bounds.set_max_key(max_key.to_string()); + + uint32_t segment_id = writer->get_segment_id(); + SegmentStatistics segstat; + segstat.row_num = row_num; + segstat.data_size = segment_size + writer->get_inverted_index_file_size(); + segstat.index_size = index_size + writer->get_inverted_index_file_size(); + segstat.key_bounds = key_bounds; + + writer.reset(); + + RETURN_IF_ERROR(_context.segment_collector->add(segment_id, segstat)); + + if (flush_size) { + *flush_size = segment_size + index_size; + } + return Status::OK(); +} + +Status SegmentFlusher::create_writer(std::unique_ptr& writer, + uint32_t segment_id) { + std::unique_ptr segment_writer; + RETURN_IF_ERROR(_create_segment_writer(segment_writer, segment_id)); + DCHECK(segment_writer != nullptr); + writer.reset(new SegmentFlusher::Writer(this, segment_writer)); + return Status::OK(); +} + +SegmentFlusher::Writer::Writer(SegmentFlusher* flusher, + std::unique_ptr& segment_writer) + : _flusher(flusher), _writer(std::move(segment_writer)) {}; + +SegmentFlusher::Writer::~Writer() = default; + +Status SegmentFlusher::Writer::flush() { + return _flusher->_flush_segment_writer(_writer); +} + +int64_t SegmentFlusher::Writer::max_row_to_add(size_t row_avg_size_in_bytes) { + return _writer->max_row_to_add(row_avg_size_in_bytes); +} + +Status SegmentCreator::init(const RowsetWriterContext& rowset_writer_context) { + _segment_flusher.init(rowset_writer_context); + return Status::OK(); +} + +Status SegmentCreator::add_block(const vectorized::Block* block) { + if (block->rows() == 0) { + return Status::OK(); + } + + size_t block_size_in_bytes = block->bytes(); + size_t block_row_num = block->rows(); + size_t row_avg_size_in_bytes = std::max((size_t)1, block_size_in_bytes / block_row_num); + size_t row_offset = 0; + + if (_flush_writer == nullptr) { + RETURN_IF_ERROR(_segment_flusher.create_writer(_flush_writer, allocate_segment_id())); + } + + do { + auto max_row_add = _flush_writer->max_row_to_add(row_avg_size_in_bytes); + if (UNLIKELY(max_row_add < 1)) { + // no space for another single row, need flush now + RETURN_IF_ERROR(flush()); + RETURN_IF_ERROR(_segment_flusher.create_writer(_flush_writer, allocate_segment_id())); + max_row_add = _flush_writer->max_row_to_add(row_avg_size_in_bytes); + DCHECK(max_row_add > 0); + } + size_t input_row_num = std::min(block_row_num - row_offset, size_t(max_row_add)); + RETURN_IF_ERROR(_flush_writer->add_rows(block, row_offset, input_row_num)); + row_offset += input_row_num; + } while (row_offset < block_row_num); + + return Status::OK(); +} + +Status SegmentCreator::flush() { + if (_flush_writer == nullptr) { + return Status::OK(); + } + RETURN_IF_ERROR(_flush_writer->flush()); + _flush_writer.reset(); + return Status::OK(); +} + +Status SegmentCreator::flush_single_block(const vectorized::Block* block, int32_t segment_id, + int64_t* flush_size, TabletSchemaSPtr flush_schema) { + if (block->rows() == 0) { + return Status::OK(); + } + RETURN_IF_ERROR( + _segment_flusher.flush_single_block(block, segment_id, flush_size, flush_schema)); + return Status::OK(); +} + +Status SegmentCreator::close() { + RETURN_IF_ERROR(flush()); + RETURN_IF_ERROR(_segment_flusher.close()); + return Status::OK(); +} + +} // namespace doris diff --git a/be/src/olap/rowset/segment_creator.h b/be/src/olap/rowset/segment_creator.h new file mode 100644 index 0000000000..750aa14872 --- /dev/null +++ b/be/src/olap/rowset/segment_creator.h @@ -0,0 +1,190 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include + +#include +#include + +#include "common/status.h" +#include "io/fs/file_reader_writer_fwd.h" +#include "olap/olap_common.h" +#include "olap/rowset/rowset_writer_context.h" +#include "util/spinlock.h" + +namespace doris { +namespace vectorized { +class Block; +} // namespace vectorized + +namespace segment_v2 { +class SegmentWriter; +} // namespace segment_v2 + +struct SegmentStatistics; +class BetaRowsetWriter; + +class FileWriterCreator { +public: + virtual ~FileWriterCreator() = default; + + virtual Status create(uint32_t segment_id, io::FileWriterPtr& file_writer) = 0; +}; + +template +class FileWriterCreatorT : public FileWriterCreator { +public: + explicit FileWriterCreatorT(T* t) : _t(t) {} + + Status create(uint32_t segment_id, io::FileWriterPtr& file_writer) override { + return _t->create_file_writer(segment_id, file_writer); + } + +private: + T* _t; +}; + +class SegmentCollector { +public: + virtual ~SegmentCollector() = default; + + virtual Status add(uint32_t segment_id, SegmentStatistics& segstat) = 0; +}; + +template +class SegmentCollectorT : public SegmentCollector { +public: + explicit SegmentCollectorT(T* t) : _t(t) {} + + Status add(uint32_t segment_id, SegmentStatistics& segstat) override { + return _t->add_segment(segment_id, segstat); + } + +private: + T* _t; +}; + +class SegmentFlusher { +public: + SegmentFlusher(); + + ~SegmentFlusher(); + + Status init(const RowsetWriterContext& rowset_writer_context); + + // Return the file size flushed to disk in "flush_size" + // This method is thread-safe. + Status flush_single_block(const vectorized::Block* block, int32_t segment_id, + int64_t* flush_size = nullptr, + TabletSchemaSPtr flush_schema = nullptr); + + int64_t num_rows_written() const { return _num_rows_written; } + + int64_t num_rows_filtered() const { return _num_rows_filtered; } + + Status close(); + +public: + class Writer { + friend class SegmentFlusher; + + public: + ~Writer(); + + Status add_rows(const vectorized::Block* block, size_t row_offset, size_t input_row_num) { + return _flusher->_add_rows(_writer, block, row_offset, input_row_num); + } + + Status flush(); + + int64_t max_row_to_add(size_t row_avg_size_in_bytes); + + private: + Writer(SegmentFlusher* flusher, std::unique_ptr& segment_writer); + + SegmentFlusher* _flusher; + std::unique_ptr _writer; + }; + + Status create_writer(std::unique_ptr& writer, uint32_t segment_id); + +private: + Status _add_rows(std::unique_ptr& segment_writer, + const vectorized::Block* block, size_t row_offset, size_t row_num); + Status _create_segment_writer(std::unique_ptr& writer, + int32_t segment_id, bool no_compression = false, + TabletSchemaSPtr flush_schema = nullptr); + Status _flush_segment_writer(std::unique_ptr& writer, + int64_t* flush_size = nullptr); + +private: + RowsetWriterContext _context; + + mutable SpinLock _lock; // protect following vectors. + std::vector _file_writers; + + // written rows by add_block/add_row + std::atomic _num_rows_written = 0; + std::atomic _num_rows_filtered = 0; +}; + +class SegmentCreator { +public: + SegmentCreator() = default; + + ~SegmentCreator() = default; + + Status init(const RowsetWriterContext& rowset_writer_context); + + void set_segment_start_id(uint32_t start_id) { _next_segment_id = start_id; } + + Status add_block(const vectorized::Block* block); + + Status flush(); + + int32_t allocate_segment_id() { return _next_segment_id.fetch_add(1); } + + int32_t next_segment_id() const { return _next_segment_id.load(); } + + int64_t num_rows_written() const { return _segment_flusher.num_rows_written(); } + + int64_t num_rows_filtered() const { return _segment_flusher.num_rows_filtered(); } + + // Flush a block into a single segment, with pre-allocated segment_id. + // Return the file size flushed to disk in "flush_size" + // This method is thread-safe. + Status flush_single_block(const vectorized::Block* block, int32_t segment_id, + int64_t* flush_size = nullptr, + TabletSchemaSPtr flush_schema = nullptr); + + // Flush a block into a single segment, without pre-allocated segment_id. + // This method is thread-safe. + Status flush_single_block(const vectorized::Block* block) { + return flush_single_block(block, allocate_segment_id()); + } + + Status close(); + +private: + std::atomic _next_segment_id = 0; + SegmentFlusher _segment_flusher; + std::unique_ptr _flush_writer; +}; + +} // namespace doris diff --git a/be/src/olap/rowset/vertical_beta_rowset_writer.cpp b/be/src/olap/rowset/vertical_beta_rowset_writer.cpp index 4ae2ee017f..b527cab78b 100644 --- a/be/src/olap/rowset/vertical_beta_rowset_writer.cpp +++ b/be/src/olap/rowset/vertical_beta_rowset_writer.cpp @@ -171,11 +171,6 @@ Status VerticalBetaRowsetWriter::flush_columns(bool is_key) { Status VerticalBetaRowsetWriter::_create_segment_writer( const std::vector& column_ids, bool is_key, std::unique_ptr* writer) { - // TODO: just for pass DCHECK now, we should align the meaning - // of _num_segment and _next_segment_id with BetaRowsetWriter. - // i.e. _next_segment_id means next available segment id, - // and _num_segment means num of flushed segments. - allocate_segment_id(); auto path = BetaRowset::segment_file_path(_context.rowset_dir, _context.rowset_id, _num_segment++); auto fs = _rowset_meta->fs();