[refactor](load) split segment flush out of beta rowset writer (#21725)

This commit is contained in:
Kaijie Chen
2023-08-04 19:48:56 +08:00
committed by GitHub
parent ea674aa540
commit 8bbccc59ef
6 changed files with 474 additions and 192 deletions

View File

@ -61,17 +61,13 @@ using namespace ErrorCode;
BetaRowsetWriter::BetaRowsetWriter()
: _rowset_meta(nullptr),
_next_segment_id(0),
_num_segment(0),
_segment_start_id(0),
_segcompacted_point(0),
_num_segcompacted(0),
_segment_writer(nullptr),
_num_rows_written(0),
_total_data_size(0),
_total_index_size(0),
_raw_num_rows_written(0),
_num_rows_filtered(0),
_segcompaction_worker(this),
_is_doing_segcompaction(false) {
_segcompaction_status.store(OK);
@ -85,14 +81,14 @@ BetaRowsetWriter::~BetaRowsetWriter() {
wait_flying_segcompaction();
// TODO(lingbin): Should wrapper exception logic, no need to know file ops directly.
if (!_already_built) { // abnormal exit, remove all files generated
_segment_writer.reset(); // ensure all files are closed
if (!_already_built) { // abnormal exit, remove all files generated
_segment_creator.close(); // ensure all files are closed
auto fs = _rowset_meta->fs();
if (!fs) {
return;
}
DCHECK_LE(_segment_start_id + _num_segment, _next_segment_id);
for (int i = _segment_start_id; i < _next_segment_id; ++i) {
DCHECK_LE(_segment_start_id + _num_segment, _segment_creator.next_segment_id());
for (int i = _segment_start_id; i < _segment_creator.next_segment_id(); ++i) {
std::string seg_path =
BetaRowset::segment_file_path(_context.rowset_dir, _context.rowset_id, i);
// Even if an error is encountered, these files that have not been cleaned up
@ -127,18 +123,14 @@ Status BetaRowsetWriter::init(const RowsetWriterContext& rowset_writer_context)
_rowset_meta->set_tablet_schema(_context.tablet_schema);
_context.schema_change_recorder =
std::make_shared<vectorized::schema_util::LocalSchemaChangeRecorder>();
_context.segment_collector = std::make_shared<SegmentCollectorT<BetaRowsetWriter>>(this);
_context.file_writer_creator = std::make_shared<FileWriterCreatorT<BetaRowsetWriter>>(this);
_segment_creator.init(_context);
return Status::OK();
}
Status BetaRowsetWriter::add_block(const vectorized::Block* block) {
if (block->rows() == 0) {
return Status::OK();
}
if (UNLIKELY(_segment_writer == nullptr)) {
RETURN_IF_ERROR(_create_segment_writer(_segment_writer, allocate_segment_id()));
}
return _add_block(block, _segment_writer);
return _segment_creator.add_block(block);
}
Status BetaRowsetWriter::_generate_delete_bitmap(int32_t segment_id) {
@ -414,41 +406,6 @@ Status BetaRowsetWriter::_segcompaction_rename_last_segments() {
return Status::OK();
}
Status BetaRowsetWriter::_add_rows(const vectorized::Block* block,
std::unique_ptr<segment_v2::SegmentWriter>& segment_writer,
size_t row_offset, size_t input_row_num) {
auto s = segment_writer->append_block(block, row_offset, input_row_num);
if (UNLIKELY(!s.ok())) {
return Status::Error<WRITER_DATA_WRITE_ERROR>("failed to append block: {}", s.to_string());
}
_raw_num_rows_written += input_row_num;
return Status::OK();
}
Status BetaRowsetWriter::_add_block(const vectorized::Block* block,
std::unique_ptr<segment_v2::SegmentWriter>& segment_writer) {
size_t block_size_in_bytes = block->bytes();
size_t block_row_num = block->rows();
size_t row_avg_size_in_bytes = std::max((size_t)1, block_size_in_bytes / block_row_num);
size_t row_offset = 0;
do {
auto max_row_add = segment_writer->max_row_to_add(row_avg_size_in_bytes);
if (UNLIKELY(max_row_add < 1)) {
// no space for another single row, need flush now
RETURN_IF_ERROR(_flush_segment_writer(segment_writer));
RETURN_IF_ERROR(_create_segment_writer(segment_writer, allocate_segment_id()));
max_row_add = segment_writer->max_row_to_add(row_avg_size_in_bytes);
DCHECK(max_row_add > 0);
}
size_t input_row_num = std::min(block_row_num - row_offset, size_t(max_row_add));
RETURN_IF_ERROR(_add_rows(block, segment_writer, row_offset, input_row_num));
row_offset += input_row_num;
} while (row_offset < block_row_num);
return Status::OK();
}
Status BetaRowsetWriter::add_rowset(RowsetSharedPtr rowset) {
assert(rowset->rowset_meta()->rowset_type() == BETA_ROWSET);
RETURN_IF_ERROR(rowset->link_files_to(_context.rowset_dir, _context.rowset_id));
@ -456,9 +413,6 @@ Status BetaRowsetWriter::add_rowset(RowsetSharedPtr rowset) {
_total_data_size += rowset->rowset_meta()->data_disk_size();
_total_index_size += rowset->rowset_meta()->index_disk_size();
_num_segment += rowset->num_segments();
// _next_segment_id is not used in this code path,
// just to make sure it matches with _num_segment
_next_segment_id = _num_segment.load();
// append key_bounds to current rowset
rowset->get_segments_key_bounds(&_segments_encoded_key_bounds);
// TODO update zonemap
@ -474,10 +428,7 @@ Status BetaRowsetWriter::add_rowset_for_linked_schema_change(RowsetSharedPtr row
}
Status BetaRowsetWriter::flush() {
if (_segment_writer != nullptr) {
RETURN_IF_ERROR(_flush_segment_writer(_segment_writer));
}
return Status::OK();
return _segment_creator.flush();
}
Status BetaRowsetWriter::flush_memtable(vectorized::Block* block, int32_t segment_id,
@ -493,7 +444,8 @@ Status BetaRowsetWriter::flush_memtable(vectorized::Block* block, int32_t segmen
}
{
SCOPED_RAW_TIMER(&_segment_writer_ns);
RETURN_IF_ERROR(_flush_single_block(block, segment_id, flush_size, flush_schema));
RETURN_IF_ERROR(
_segment_creator.flush_single_block(block, segment_id, flush_size, flush_schema));
}
RETURN_IF_ERROR(_generate_delete_bitmap(segment_id));
RETURN_IF_ERROR(_segcompaction_if_necessary());
@ -501,20 +453,7 @@ Status BetaRowsetWriter::flush_memtable(vectorized::Block* block, int32_t segmen
}
Status BetaRowsetWriter::flush_single_block(const vectorized::Block* block) {
if (block->rows() == 0) {
return Status::OK();
}
return _flush_single_block(block, allocate_segment_id());
}
Status BetaRowsetWriter::_flush_single_block(const vectorized::Block* block, int32_t segment_id,
int64_t* flush_size, TabletSchemaSPtr flush_schema) {
std::unique_ptr<segment_v2::SegmentWriter> writer;
bool no_compression = block->bytes() <= config::segment_compression_threshold_kb * 1024;
RETURN_IF_ERROR(_create_segment_writer(writer, segment_id, no_compression, flush_schema));
RETURN_IF_ERROR(_add_rows(block, writer, 0, block->rows()));
RETURN_IF_ERROR(_flush_segment_writer(writer, flush_size));
return Status::OK();
return _segment_creator.flush_single_block(block);
}
Status BetaRowsetWriter::wait_flying_segcompaction() {
@ -552,8 +491,6 @@ RowsetSharedPtr BetaRowsetWriter::manual_build(const RowsetMetaSharedPtr& spec_r
}
RowsetSharedPtr BetaRowsetWriter::build() {
// make sure all segments are flushed
DCHECK_EQ(_segment_start_id + _num_segment, _next_segment_id);
// TODO(lingbin): move to more better place, or in a CreateBlockBatch?
for (auto& file_writer : _file_writers) {
Status status = file_writer->close();
@ -564,6 +501,11 @@ RowsetSharedPtr BetaRowsetWriter::build() {
}
}
Status status;
status = _segment_creator.close();
if (!status.ok()) {
LOG(WARNING) << "failed to close segment creator when build new rowset, res=" << status;
return nullptr;
}
// if _segment_start_id is not zero, that means it's a transient rowset writer for
// MoW partial update, don't need to do segment compaction.
if (_segment_start_id == 0) {
@ -583,9 +525,11 @@ RowsetSharedPtr BetaRowsetWriter::build() {
_segcompaction_worker.get_file_writer()->close();
}
}
// When building a rowset, we must ensure that the current _segment_writer has been
// flushed, that is, the current _segment_writer is nullptr
DCHECK(_segment_writer == nullptr) << "segment must be null when build rowset";
status = _check_segment_number_limit();
if (!status.ok()) {
LOG(WARNING) << "build rowset failed, res=" << status;
return nullptr;
}
_build_rowset_meta(_rowset_meta);
if (_rowset_meta->newest_write_timestamp() == -1) {
@ -758,38 +702,6 @@ Status BetaRowsetWriter::_create_segment_writer_for_segcompaction(
return Status::OK();
}
Status BetaRowsetWriter::_create_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>& writer,
int32_t segment_id, bool no_compression,
TabletSchemaSPtr flush_schema) {
RETURN_IF_ERROR(_check_segment_number_limit());
io::FileWriterPtr file_writer;
RETURN_IF_ERROR(create_file_writer(segment_id, file_writer));
segment_v2::SegmentWriterOptions writer_options;
writer_options.enable_unique_key_merge_on_write = _context.enable_unique_key_merge_on_write;
writer_options.rowset_ctx = &_context;
writer_options.write_type = _context.write_type;
if (no_compression) {
writer_options.compression_type = NO_COMPRESSION;
}
const auto& tablet_schema = flush_schema ? flush_schema : _context.tablet_schema;
writer.reset(new segment_v2::SegmentWriter(
file_writer.get(), segment_id, tablet_schema, _context.tablet, _context.data_dir,
_context.max_rows_per_segment, writer_options, _context.mow_context));
{
std::lock_guard<SpinLock> l(_lock);
_file_writers.push_back(std::move(file_writer));
}
auto s = writer->init();
if (!s.ok()) {
LOG(WARNING) << "failed to init segment writer: " << s.to_string();
writer.reset();
return s;
}
return Status::OK();
}
Status BetaRowsetWriter::_check_segment_number_limit() {
size_t total_segment_num = _num_segment - _segcompacted_point + 1 + _num_segcompacted;
if (UNLIKELY(total_segment_num > config::max_segment_num_per_rowset)) {
@ -803,54 +715,15 @@ Status BetaRowsetWriter::_check_segment_number_limit() {
return Status::OK();
}
Status BetaRowsetWriter::_flush_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>& writer,
int64_t* flush_size) {
uint32_t segid = writer->get_segment_id();
uint32_t row_num = writer->num_rows_written();
if (writer->num_rows_written() == 0) {
return Status::OK();
}
uint64_t segment_size;
uint64_t index_size;
Status s = writer->finalize(&segment_size, &index_size);
if (!s.ok()) {
return Status::Error(s.code(), "failed to finalize segment: {}", s.to_string());
}
VLOG_DEBUG << "tablet_id:" << _context.tablet_id
<< " flushing filename: " << writer->get_data_dir()->path()
<< " rowset_id:" << _context.rowset_id << " segment num:" << _num_segment;
KeyBoundsPB key_bounds;
Slice min_key = writer->min_encoded_key();
Slice max_key = writer->max_encoded_key();
DCHECK_LE(min_key.compare(max_key), 0);
key_bounds.set_min_key(min_key.to_string());
key_bounds.set_max_key(max_key.to_string());
SegmentStatistics segstat;
segstat.row_num = row_num;
segstat.data_size = segment_size + writer->get_inverted_index_file_size();
segstat.index_size = index_size + writer->get_inverted_index_file_size();
segstat.key_bounds = key_bounds;
_num_rows_filtered += writer->num_rows_filtered();
writer.reset();
if (flush_size) {
*flush_size = segment_size + index_size;
}
add_segment(segid, segstat);
return Status::OK();
}
void BetaRowsetWriter::add_segment(uint32_t segid, SegmentStatistics& segstat) {
Status BetaRowsetWriter::add_segment(uint32_t segid, SegmentStatistics& segstat) {
uint32_t segid_offset = segid - _segment_start_id;
{
std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex);
CHECK_EQ(_segid_statistics_map.find(segid) == _segid_statistics_map.end(), true);
_segid_statistics_map.emplace(segid, segstat);
_segment_num_rows.resize(_next_segment_id);
if (segid >= _segment_num_rows.size()) {
_segment_num_rows.resize(segid + 1);
}
_segment_num_rows[segid_offset] = segstat.row_num;
}
VLOG_DEBUG << "_segid_statistics_map add new record. segid:" << segid
@ -864,6 +737,7 @@ void BetaRowsetWriter::add_segment(uint32_t segid, SegmentStatistics& segstat) {
_num_segment++;
}
}
return Status::OK();
}
Status BetaRowsetWriter::flush_segment_writer_for_segcompaction(

View File

@ -42,6 +42,7 @@
#include "olap/rowset/rowset_meta.h"
#include "olap/rowset/rowset_writer.h"
#include "olap/rowset/rowset_writer_context.h"
#include "olap/rowset/segment_creator.h"
#include "segcompaction.h"
#include "segment_v2/segment.h"
#include "util/spinlock.h"
@ -87,7 +88,7 @@ public:
Status create_file_writer(uint32_t segment_id, io::FileWriterPtr& writer);
void add_segment(uint32_t segid, SegmentStatistics& segstat);
Status add_segment(uint32_t segid, SegmentStatistics& segstat);
Status flush() override;
@ -104,9 +105,9 @@ public:
Version version() override { return _context.version; }
int64_t num_rows() const override { return _raw_num_rows_written; }
int64_t num_rows() const override { return _segment_creator.num_rows_written(); }
int64_t num_rows_filtered() const override { return _num_rows_filtered; }
int64_t num_rows_filtered() const override { return _segment_creator.num_rows_filtered(); }
RowsetId rowset_id() override { return _context.rowset_id; }
@ -118,7 +119,7 @@ public:
return Status::OK();
}
int32_t allocate_segment_id() override { return _next_segment_id.fetch_add(1); };
int32_t allocate_segment_id() override { return _segment_creator.allocate_segment_id(); };
Status flush_segment_writer_for_segcompaction(
std::unique_ptr<segment_v2::SegmentWriter>* writer, uint64_t index_size,
@ -129,8 +130,8 @@ public:
Status wait_flying_segcompaction() override;
void set_segment_start_id(int32_t start_id) override {
_segment_creator.set_segment_start_id(start_id);
_segment_start_id = start_id;
_next_segment_id = start_id;
}
int64_t delete_bitmap_ns() override { return _delete_bitmap_ns; }
@ -138,22 +139,8 @@ public:
int64_t segment_writer_ns() override { return _segment_writer_ns; }
private:
Status _add_rows(const vectorized::Block* block,
std::unique_ptr<segment_v2::SegmentWriter>& segment_writer, size_t row_offset,
size_t input_row_num);
Status _add_block(const vectorized::Block* block,
std::unique_ptr<segment_v2::SegmentWriter>& writer);
Status _create_file_writer(std::string path, io::FileWriterPtr& file_writer);
Status _check_segment_number_limit();
Status _create_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>& writer,
int32_t segment_id, bool no_compression = false,
TabletSchemaSPtr flush_schema = nullptr);
Status _flush_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>& writer,
int64_t* flush_size = nullptr);
Status _flush_single_block(const vectorized::Block* block, int32_t segment_id,
int64_t* flush_size = nullptr,
TabletSchemaSPtr flush_schema = nullptr);
Status _generate_delete_bitmap(int32_t segment_id);
void _build_rowset_meta(std::shared_ptr<RowsetMeta> rowset_meta);
@ -191,19 +178,13 @@ protected:
RowsetWriterContext _context;
std::shared_ptr<RowsetMeta> _rowset_meta;
std::atomic<int32_t> _next_segment_id; // the next available segment_id (offset),
// also the numer of allocated segments
std::atomic<int32_t> _num_segment; // number of consecutive flushed segments
roaring::Roaring _segment_set; // bitmap set to record flushed segment id
std::mutex _segment_set_mutex; // mutex for _segment_set
int32_t _segment_start_id; //basic write start from 0, partial update may be different
std::atomic<int32_t> _num_segment; // number of consecutive flushed segments
roaring::Roaring _segment_set; // bitmap set to record flushed segment id
std::mutex _segment_set_mutex; // mutex for _segment_set
int32_t _segment_start_id; // basic write start from 0, partial update may be different
std::atomic<int32_t> _segcompacted_point; // segemnts before this point have
// already been segment compacted
std::atomic<int32_t> _num_segcompacted; // index for segment compaction
/// When flushing the memtable in the load process, we do not use this writer but an independent writer.
/// Because we want to flush memtables in parallel.
/// In other processes, such as merger or schema change, we will use this unified writer for data writing.
std::unique_ptr<segment_v2::SegmentWriter> _segment_writer;
mutable SpinLock _lock; // protect following vectors.
// record rows number of every segment already written, using for rowid
@ -219,17 +200,13 @@ protected:
std::atomic<int64_t> _total_index_size;
// TODO rowset Zonemap
// written rows by add_block/add_row (not effected by segcompaction)
std::atomic<int64_t> _raw_num_rows_written;
std::atomic<int64_t> _num_rows_filtered;
std::map<uint32_t, SegmentStatistics> _segid_statistics_map;
std::mutex _segid_statistics_map_mutex;
bool _is_pending = false;
bool _already_built = false;
SegmentCreator _segment_creator;
SegcompactionWorker _segcompaction_worker;
// ensure only one inflight segcompaction task for each rowset

View File

@ -30,6 +30,8 @@ class RowsetWriterContextBuilder;
using RowsetWriterContextBuilderSharedPtr = std::shared_ptr<RowsetWriterContextBuilder>;
class DataDir;
class Tablet;
class FileWriterCreator;
class SegmentCollector;
namespace vectorized::schema_util {
class LocalSchemaChangeRecorder;
}
@ -90,6 +92,8 @@ struct RowsetWriterContext {
nullptr;
std::shared_ptr<MowContext> mow_context;
std::shared_ptr<FileWriterCreator> file_writer_creator;
std::shared_ptr<SegmentCollector> segment_collector;
};
} // namespace doris

View File

@ -0,0 +1,242 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "olap/rowset/segment_creator.h"
// IWYU pragma: no_include <bthread/errno.h>
#include <errno.h> // IWYU pragma: keep
#include <filesystem>
#include <sstream>
#include <utility>
// IWYU pragma: no_include <opentelemetry/common/threadlocal.h>
#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/config.h"
#include "common/logging.h"
#include "io/fs/file_writer.h"
#include "olap/rowset/beta_rowset_writer.h" // SegmentStatistics
#include "olap/rowset/segment_v2/segment_writer.h"
#include "vec/core/block.h"
namespace doris {
using namespace ErrorCode;
SegmentFlusher::SegmentFlusher() = default;
SegmentFlusher::~SegmentFlusher() = default;
// Initialize the flusher with the rowset writer context. The context is
// copied by value; the file_writer_creator/segment_collector shared_ptr
// callbacks inside it remain shared with the caller.
Status SegmentFlusher::init(const RowsetWriterContext& rowset_writer_context) {
_context = rowset_writer_context;
return Status::OK();
}
// Flush one block into a single on-disk segment with the given pre-allocated
// segment_id. Empty blocks are a no-op. If `flush_size` is non-null it
// receives the flushed size (segment + index). A non-null `flush_schema`
// overrides the tablet schema for this segment.
Status SegmentFlusher::flush_single_block(const vectorized::Block* block, int32_t segment_id,
int64_t* flush_size, TabletSchemaSPtr flush_schema) {
if (block->rows() == 0) {
return Status::OK();
}
std::unique_ptr<segment_v2::SegmentWriter> writer;
// skip compression for small blocks to save CPU
bool no_compression = block->bytes() <= config::segment_compression_threshold_kb * 1024;
RETURN_IF_ERROR(_create_segment_writer(writer, segment_id, no_compression, flush_schema));
RETURN_IF_ERROR(_add_rows(writer, block, 0, block->rows()));
RETURN_IF_ERROR(_flush_segment_writer(writer, flush_size));
return Status::OK();
}
// Close every file writer created so far, stopping at the first failure.
// Holds _lock for the whole walk so no writer is added concurrently.
Status SegmentFlusher::close() {
    std::lock_guard<SpinLock> guard(_lock);
    for (auto& fw : _file_writers) {
        Status st = fw->close();
        if (st.ok()) {
            continue;
        }
        LOG(WARNING) << "failed to close file writer, path=" << fw->path()
                     << " res=" << st;
        return st;
    }
    return Status::OK();
}
// Append `row_num` rows of `block` starting at `row_offset` to the given
// segment writer, updating the written-row counter on success.
Status SegmentFlusher::_add_rows(std::unique_ptr<segment_v2::SegmentWriter>& segment_writer,
const vectorized::Block* block, size_t row_offset,
size_t row_num) {
auto s = segment_writer->append_block(block, row_offset, row_num);
if (UNLIKELY(!s.ok())) {
// wrap the writer error so callers see a uniform error code
return Status::Error<WRITER_DATA_WRITE_ERROR>("failed to append block: {}", s.to_string());
}
_num_rows_written += row_num;
return Status::OK();
}
// Create and initialize a SegmentWriter for `segment_id`. The underlying
// file writer is obtained through the injected _context.file_writer_creator
// callback and retained in _file_writers (under _lock) so close() can close
// it later. On init failure the half-built writer is discarded.
Status SegmentFlusher::_create_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>& writer,
int32_t segment_id, bool no_compression,
TabletSchemaSPtr flush_schema) {
io::FileWriterPtr file_writer;
RETURN_IF_ERROR(_context.file_writer_creator->create(segment_id, file_writer));
segment_v2::SegmentWriterOptions writer_options;
writer_options.enable_unique_key_merge_on_write = _context.enable_unique_key_merge_on_write;
writer_options.rowset_ctx = &_context;
writer_options.write_type = _context.write_type;
if (no_compression) {
// caller judged the block too small to be worth compressing
writer_options.compression_type = NO_COMPRESSION;
}
// flush_schema (if any) overrides the tablet schema for this segment
const auto& tablet_schema = flush_schema ? flush_schema : _context.tablet_schema;
writer.reset(new segment_v2::SegmentWriter(
file_writer.get(), segment_id, tablet_schema, _context.tablet, _context.data_dir,
_context.max_rows_per_segment, writer_options, _context.mow_context));
{
// keep the raw file writer alive; the SegmentWriter holds only a raw pointer
std::lock_guard<SpinLock> l(_lock);
_file_writers.push_back(std::move(file_writer));
}
auto s = writer->init();
if (!s.ok()) {
LOG(WARNING) << "failed to init segment writer: " << s.to_string();
writer.reset();
return s;
}
return Status::OK();
}
// Finalize `writer` into an on-disk segment, publish its statistics through
// _context.segment_collector, and destroy the writer. If `flush_size` is
// non-null it receives segment_size + index_size. A writer with zero rows
// is dropped without producing a segment.
Status SegmentFlusher::_flush_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>& writer,
int64_t* flush_size) {
uint32_t row_num = writer->num_rows_written();
// filtered rows are accounted even when nothing was written
_num_rows_filtered += writer->num_rows_filtered();
if (row_num == 0) {
return Status::OK();
}
uint64_t segment_size;
uint64_t index_size;
Status s = writer->finalize(&segment_size, &index_size);
if (!s.ok()) {
return Status::Error(s.code(), "failed to finalize segment: {}", s.to_string());
}
VLOG_DEBUG << "tablet_id:" << _context.tablet_id
<< " flushing filename: " << writer->get_data_dir()->path()
<< " rowset_id:" << _context.rowset_id;
// record the encoded key range covered by this segment
KeyBoundsPB key_bounds;
Slice min_key = writer->min_encoded_key();
Slice max_key = writer->max_encoded_key();
DCHECK_LE(min_key.compare(max_key), 0);
key_bounds.set_min_key(min_key.to_string());
key_bounds.set_max_key(max_key.to_string());
uint32_t segment_id = writer->get_segment_id();
SegmentStatistics segstat;
segstat.row_num = row_num;
// NOTE(review): the inverted index size is added to both data_size and
// index_size — confirm this double accounting is intended
segstat.data_size = segment_size + writer->get_inverted_index_file_size();
segstat.index_size = index_size + writer->get_inverted_index_file_size();
segstat.key_bounds = key_bounds;
// release the writer before publishing stats; segment_id was captured above
writer.reset();
RETURN_IF_ERROR(_context.segment_collector->add(segment_id, segstat));
if (flush_size) {
*flush_size = segment_size + index_size;
}
return Status::OK();
}
// Create a SegmentWriter for `segment_id` and hand it to the caller wrapped
// in a SegmentFlusher::Writer handle.
Status SegmentFlusher::create_writer(std::unique_ptr<SegmentFlusher::Writer>& writer,
uint32_t segment_id) {
std::unique_ptr<segment_v2::SegmentWriter> segment_writer;
RETURN_IF_ERROR(_create_segment_writer(segment_writer, segment_id));
DCHECK(segment_writer != nullptr);
// raw new: Writer's constructor is private and only SegmentFlusher is a
// friend, so make_unique cannot be used here
writer.reset(new SegmentFlusher::Writer(this, segment_writer));
return Status::OK();
}
// Takes ownership of `segment_writer` (moved out of the caller's pointer);
// `flusher` is borrowed and must outlive this Writer.
// Note: removed the stray ';' after the constructor body (-Wextra-semi).
SegmentFlusher::Writer::Writer(SegmentFlusher* flusher,
                               std::unique_ptr<segment_v2::SegmentWriter>& segment_writer)
        : _flusher(flusher), _writer(std::move(segment_writer)) {}

SegmentFlusher::Writer::~Writer() = default;
// Flush this writer's segment through the owning flusher; _writer is
// consumed (reset) by _flush_segment_writer on success.
Status SegmentFlusher::Writer::flush() {
return _flusher->_flush_segment_writer(_writer);
}
// Max rows that still fit in the current segment, given an average row size.
int64_t SegmentFlusher::Writer::max_row_to_add(size_t row_avg_size_in_bytes) {
return _writer->max_row_to_add(row_avg_size_in_bytes);
}
// Initialize the creator by initializing its flusher.
// Fix: the Status returned by SegmentFlusher::init() was silently dropped;
// propagate it so a future failing init is not ignored.
Status SegmentCreator::init(const RowsetWriterContext& rowset_writer_context) {
    RETURN_IF_ERROR(_segment_flusher.init(rowset_writer_context));
    return Status::OK();
}
// Append a block, splitting it across segments when the current segment
// writer runs out of space. Rows are fed in chunks sized using this block's
// average row size.
Status SegmentCreator::add_block(const vectorized::Block* block) {
if (block->rows() == 0) {
return Status::OK();
}
size_t block_size_in_bytes = block->bytes();
size_t block_row_num = block->rows();
// block_row_num > 0 here (checked above); clamp the average to >= 1 byte
size_t row_avg_size_in_bytes = std::max((size_t)1, block_size_in_bytes / block_row_num);
size_t row_offset = 0;
// lazily open the first segment writer
if (_flush_writer == nullptr) {
RETURN_IF_ERROR(_segment_flusher.create_writer(_flush_writer, allocate_segment_id()));
}
do {
auto max_row_add = _flush_writer->max_row_to_add(row_avg_size_in_bytes);
if (UNLIKELY(max_row_add < 1)) {
// no space for another single row, need flush now
RETURN_IF_ERROR(flush());
RETURN_IF_ERROR(_segment_flusher.create_writer(_flush_writer, allocate_segment_id()));
max_row_add = _flush_writer->max_row_to_add(row_avg_size_in_bytes);
DCHECK(max_row_add > 0);
}
size_t input_row_num = std::min(block_row_num - row_offset, size_t(max_row_add));
RETURN_IF_ERROR(_flush_writer->add_rows(block, row_offset, input_row_num));
row_offset += input_row_num;
} while (row_offset < block_row_num);
return Status::OK();
}
// Flush and release the currently open segment writer, if any.
Status SegmentCreator::flush() {
    if (_flush_writer != nullptr) {
        RETURN_IF_ERROR(_flush_writer->flush());
        _flush_writer.reset();
    }
    return Status::OK();
}
// Flush a block into its own segment with a pre-allocated id; empty blocks
// produce no segment. Delegates directly to the flusher.
Status SegmentCreator::flush_single_block(const vectorized::Block* block, int32_t segment_id,
                                          int64_t* flush_size, TabletSchemaSPtr flush_schema) {
    if (block->rows() == 0) {
        return Status::OK();
    }
    return _segment_flusher.flush_single_block(block, segment_id, flush_size, flush_schema);
}
// Flush any partially filled segment, then close all underlying files.
Status SegmentCreator::close() {
    RETURN_IF_ERROR(flush());
    return _segment_flusher.close();
}
} // namespace doris

View File

@ -0,0 +1,190 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <gen_cpp/olap_file.pb.h>
#include <string>
#include <vector>
#include "common/status.h"
#include "io/fs/file_reader_writer_fwd.h"
#include "olap/olap_common.h"
#include "olap/rowset/rowset_writer_context.h"
#include "util/spinlock.h"
namespace doris {
namespace vectorized {
class Block;
} // namespace vectorized
namespace segment_v2 {
class SegmentWriter;
} // namespace segment_v2
struct SegmentStatistics;
class BetaRowsetWriter;
// Callback interface used by SegmentFlusher to obtain a file writer for a
// new segment, decoupling it from the concrete rowset writer.
class FileWriterCreator {
public:
virtual ~FileWriterCreator() = default;
virtual Status create(uint32_t segment_id, io::FileWriterPtr& file_writer) = 0;
};
// Adapter forwarding file writer creation to T::create_file_writer
// (e.g. BetaRowsetWriter). Holds a non-owning pointer; T must outlive it.
template <class T>
class FileWriterCreatorT : public FileWriterCreator {
public:
explicit FileWriterCreatorT(T* t) : _t(t) {}
Status create(uint32_t segment_id, io::FileWriterPtr& file_writer) override {
return _t->create_file_writer(segment_id, file_writer);
}
private:
T* _t; // not owned
};
// Callback interface through which SegmentFlusher reports the statistics of
// each flushed segment back to the owning rowset writer.
class SegmentCollector {
public:
virtual ~SegmentCollector() = default;
virtual Status add(uint32_t segment_id, SegmentStatistics& segstat) = 0;
};
// Adapter forwarding segment statistics to T::add_segment
// (e.g. BetaRowsetWriter). Holds a non-owning pointer; T must outlive it.
template <class T>
class SegmentCollectorT : public SegmentCollector {
public:
explicit SegmentCollectorT(T* t) : _t(t) {}
Status add(uint32_t segment_id, SegmentStatistics& segstat) override {
return _t->add_segment(segment_id, segstat);
}
private:
T* _t; // not owned
};
// Writes blocks into on-disk segment files. Owns the file writers it creates
// and reports per-segment statistics via the context's segment_collector.
// Counters and the writer list are guarded for concurrent flushes.
class SegmentFlusher {
public:
SegmentFlusher();
~SegmentFlusher();
// Copies the rowset writer context used for all subsequent segments.
Status init(const RowsetWriterContext& rowset_writer_context);
// Return the file size flushed to disk in "flush_size"
// This method is thread-safe.
Status flush_single_block(const vectorized::Block* block, int32_t segment_id,
int64_t* flush_size = nullptr,
TabletSchemaSPtr flush_schema = nullptr);
int64_t num_rows_written() const { return _num_rows_written; }
int64_t num_rows_filtered() const { return _num_rows_filtered; }
// Close all file writers created so far.
Status close();
public:
// Handle over one open segment; created via create_writer(), flushed once.
class Writer {
friend class SegmentFlusher;
public:
~Writer();
// Append rows of `block` to the open segment.
Status add_rows(const vectorized::Block* block, size_t row_offset, size_t input_row_num) {
return _flusher->_add_rows(_writer, block, row_offset, input_row_num);
}
// Finalize the segment; consumes the underlying segment writer.
Status flush();
// Rows that still fit, given an average row size in bytes.
int64_t max_row_to_add(size_t row_avg_size_in_bytes);
private:
Writer(SegmentFlusher* flusher, std::unique_ptr<segment_v2::SegmentWriter>& segment_writer);
SegmentFlusher* _flusher; // not owned; must outlive this Writer
std::unique_ptr<segment_v2::SegmentWriter> _writer;
};
Status create_writer(std::unique_ptr<SegmentFlusher::Writer>& writer, uint32_t segment_id);
private:
Status _add_rows(std::unique_ptr<segment_v2::SegmentWriter>& segment_writer,
const vectorized::Block* block, size_t row_offset, size_t row_num);
Status _create_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>& writer,
int32_t segment_id, bool no_compression = false,
TabletSchemaSPtr flush_schema = nullptr);
Status _flush_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>& writer,
int64_t* flush_size = nullptr);
private:
RowsetWriterContext _context;
mutable SpinLock _lock; // protect following vectors.
std::vector<io::FileWriterPtr> _file_writers;
// written rows by add_block/add_row
std::atomic<int64_t> _num_rows_written = 0;
std::atomic<int64_t> _num_rows_filtered = 0;
};
// Accumulates blocks into segments: allocates segment ids, feeds rows into a
// SegmentFlusher::Writer, and rolls over to a new segment when one fills up.
class SegmentCreator {
public:
SegmentCreator() = default;
~SegmentCreator() = default;
Status init(const RowsetWriterContext& rowset_writer_context);
// Segment ids handed out after this call start from start_id.
void set_segment_start_id(uint32_t start_id) { _next_segment_id = start_id; }
// Append a block, splitting it across segments as needed.
Status add_block(const vectorized::Block* block);
// Flush the currently open segment, if any.
Status flush();
int32_t allocate_segment_id() { return _next_segment_id.fetch_add(1); }
int32_t next_segment_id() const { return _next_segment_id.load(); }
int64_t num_rows_written() const { return _segment_flusher.num_rows_written(); }
int64_t num_rows_filtered() const { return _segment_flusher.num_rows_filtered(); }
// Flush a block into a single segment, with pre-allocated segment_id.
// Return the file size flushed to disk in "flush_size"
// This method is thread-safe.
Status flush_single_block(const vectorized::Block* block, int32_t segment_id,
int64_t* flush_size = nullptr,
TabletSchemaSPtr flush_schema = nullptr);
// Flush a block into a single segment, without pre-allocated segment_id.
// This method is thread-safe.
Status flush_single_block(const vectorized::Block* block) {
return flush_single_block(block, allocate_segment_id());
}
// Flush the open segment and close all underlying file writers.
Status close();
private:
std::atomic<int32_t> _next_segment_id = 0;
SegmentFlusher _segment_flusher;
// currently open segment writer; null when no segment is open
std::unique_ptr<SegmentFlusher::Writer> _flush_writer;
};
} // namespace doris

View File

@ -171,11 +171,6 @@ Status VerticalBetaRowsetWriter::flush_columns(bool is_key) {
Status VerticalBetaRowsetWriter::_create_segment_writer(
const std::vector<uint32_t>& column_ids, bool is_key,
std::unique_ptr<segment_v2::SegmentWriter>* writer) {
// TODO: just for pass DCHECK now, we should align the meaning
// of _num_segment and _next_segment_id with BetaRowsetWriter.
// i.e. _next_segment_id means next available segment id,
// and _num_segment means num of flushed segments.
allocate_segment_id();
auto path =
BetaRowset::segment_file_path(_context.rowset_dir, _context.rowset_id, _num_segment++);
auto fs = _rowset_meta->fs();