Use block layer to write files (#3064)

This is the second patch following 58b8e3f574614433ea9e0c427961f2efb3476c2a,

This patch use block-layer to write files.
This commit is contained in:
LingBin
2020-03-10 23:11:25 -05:00
committed by GitHub
parent cf219ddf18
commit 608917c04d
28 changed files with 281 additions and 158 deletions

View File

@ -18,6 +18,7 @@
#include "olap/fs/fs_util.h"
#include "common/status.h"
#include "env/env.h"
#include "olap/fs/file_block_manager.h"
namespace doris {
@ -28,6 +29,12 @@ BlockManager* file_block_mgr(Env* env, BlockManagerOptions opts) {
return new FileBlockManager(env, std::move(opts));
}
BlockManager* block_mgr_for_ut() {
fs::BlockManagerOptions bm_opts;
bm_opts.read_only = false;
return file_block_mgr(Env::Default(), bm_opts);
}
} // namespace fs_util
} // namespace fs
} // namespace doris

View File

@ -28,6 +28,9 @@ namespace fs_util {
// method for each type(instead of a factory method which require same params)
BlockManager* file_block_mgr(Env* env, BlockManagerOptions opts);
// For UnitTest.
BlockManager* block_mgr_for_ut();
} // namespace fs_util
} // namespace fs
} // namespace doris

View File

@ -17,13 +17,13 @@
#include "olap/rowset/beta_rowset_writer.h"
#include <cmath> // lround
#include <cstdio> // remove
#include <cstring> // strerror_r
#include <ctime> // time
#include "common/config.h"
#include "common/logging.h"
#include "env/env.h"
#include "gutil/strings/substitute.h"
#include "olap/fs/fs_util.h"
#include "olap/olap_define.h"
#include "olap/rowset/beta_rowset.h"
#include "olap/rowset/rowset_factory.h"
@ -33,28 +33,31 @@
namespace doris {
BetaRowsetWriter::BetaRowsetWriter()
: _rowset_meta(nullptr),
_num_segment(0),
_segment_writer(nullptr),
_num_rows_written(0),
_total_data_size(0),
_total_index_size(0) {
auto size = static_cast<double>(OLAP_MAX_COLUMN_SEGMENT_FILE_SIZE);
size *= OLAP_COLUMN_FILE_SEGMENT_SIZE_SCALE;
_max_segment_size = static_cast<uint32_t>(lround(size));
}
// TODO(lingbin): Should be a conf that can be dynamically adjusted, or a member in the context
const uint32_t MAX_SEGMENT_SIZE = static_cast<uint32_t>(
OLAP_MAX_COLUMN_SEGMENT_FILE_SIZE * OLAP_COLUMN_FILE_SEGMENT_SIZE_SCALE);
BetaRowsetWriter::BetaRowsetWriter() :
_rowset_meta(nullptr),
_num_segment(0),
_segment_writer(nullptr),
_num_rows_written(0),
_total_data_size(0),
_total_index_size(0) {}
BetaRowsetWriter::~BetaRowsetWriter() {
if (!_rowset_build) { // abnormal exit, remove all files generated
_segment_writer.reset(nullptr); // ensure all files are closed
// TODO(lingbin): Should wrapper exception logic, no need to know file ops directly.
if (!_already_built) { // abnormal exit, remove all files generated
_segment_writer.reset(); // ensure all files are closed
Status st;
for (int i = 0; i < _num_segment; ++i) {
auto path = BetaRowset::segment_file_path(_context.rowset_path_prefix, _context.rowset_id, i);
if (::remove(path.c_str()) != 0) {
char errmsg[64];
LOG(WARNING) << "failed to delete file. err=" << strerror_r(errno, errmsg, 64)
<< ", path=" << path;
}
auto path = BetaRowset::segment_file_path(
_context.rowset_path_prefix, _context.rowset_id, i);
// Even if an error is encountered, these files that have not been cleaned up
// will be cleaned up by the GC background. So here we only print the error
// message when we encounter an error.
WARN_IF_ERROR(Env::Default()->delete_file(path),
strings::Substitute("Failed to delete file=$0", path));
}
}
}
@ -93,11 +96,11 @@ OLAPStatus BetaRowsetWriter::_add_row(const RowType& row) {
LOG(WARNING) << "failed to append row: " << s.to_string();
return OLAP_ERR_WRITER_DATA_WRITE_ERROR;
}
if (PREDICT_FALSE(_segment_writer->estimate_segment_size() >= _max_segment_size ||
_segment_writer->num_rows_written() >= _context.max_rows_per_segment)) {
if (PREDICT_FALSE(_segment_writer->estimate_segment_size() >= MAX_SEGMENT_SIZE
|| _segment_writer->num_rows_written() >= _context.max_rows_per_segment)) {
RETURN_NOT_OK(_flush_segment_writer());
}
_num_rows_written++;
++_num_rows_written;
return OLAP_SUCCESS;
}
@ -132,6 +135,13 @@ OLAPStatus BetaRowsetWriter::flush() {
}
RowsetSharedPtr BetaRowsetWriter::build() {
// TODO(lingbin): move to more better place, or in a CreateBlockBatch?
for (auto& wblock : _wblocks) {
wblock->close();
}
// When building a rowset, we must ensure that the current _segment_writer has been
// flushed, that is, the current _segment_wirter is nullptr
DCHECK(_segment_writer == nullptr) << "segment must be null when build rowset";
_rowset_meta->set_num_rows(_num_rows_written);
_rowset_meta->set_total_disk_size(_total_data_size);
_rowset_meta->set_data_disk_size(_total_data_size);
@ -158,15 +168,35 @@ RowsetSharedPtr BetaRowsetWriter::build() {
LOG(WARNING) << "rowset init failed when build new rowset, res=" << status;
return nullptr;
}
_rowset_build = true;
_already_built = true;
return rowset;
}
OLAPStatus BetaRowsetWriter::_create_segment_writer() {
auto path = BetaRowset::segment_file_path(_context.rowset_path_prefix, _context.rowset_id, _num_segment);
auto path = BetaRowset::segment_file_path(_context.rowset_path_prefix,
_context.rowset_id,
_num_segment);
// TODO(lingbin): should use a more general way to get BlockManager object
// and tablets with the same type should share one BlockManager object;
fs::BlockManagerOptions bm_opts;
bm_opts.read_only = false;
fs::BlockManager* block_mgr = fs::fs_util::file_block_mgr(Env::Default(), bm_opts);
std::unique_ptr<fs::WritableBlock> wblock;
fs::CreateBlockOptions opts({path});
DCHECK(block_mgr != nullptr);
Status st = block_mgr->create_block(opts, &wblock);
if (!st.ok()) {
LOG(WARNING) << "failed to create writable block. path=" << path;
return OLAP_ERR_INIT_FAILED;
}
DCHECK(wblock != nullptr);
segment_v2::SegmentWriterOptions writer_options;
writer_options.whether_to_filter_value = _context.version.first == 0;
_segment_writer.reset(new segment_v2::SegmentWriter(path, _num_segment, _context.tablet_schema, writer_options));
_segment_writer.reset(new segment_v2::SegmentWriter(
wblock.get(), _num_segment, _context.tablet_schema, writer_options));
_wblocks.push_back(std::move(wblock));
// TODO set write_mbytes_per_sec based on writer type (load/base compaction/cumulative compaction)
auto s = _segment_writer->init(config::push_write_mbytes_per_sec);
if (!s.ok()) {
@ -174,21 +204,21 @@ OLAPStatus BetaRowsetWriter::_create_segment_writer() {
_segment_writer.reset(nullptr);
return OLAP_ERR_INIT_FAILED;
}
_num_segment++;
++_num_segment;
return OLAP_SUCCESS;
}
OLAPStatus BetaRowsetWriter::_flush_segment_writer() {
uint64_t segment_size;
uint64_t index_size;
auto s = _segment_writer->finalize(&segment_size, &index_size);
Status s = _segment_writer->finalize(&segment_size, &index_size);
if (!s.ok()) {
LOG(WARNING) << "failed to finalize segment: " << s.to_string();
return OLAP_ERR_WRITER_DATA_WRITE_ERROR;
}
_total_data_size += segment_size;
_total_index_size += index_size;
_segment_writer.reset(nullptr);
_segment_writer.reset();
return OLAP_SUCCESS;
}

View File

@ -18,10 +18,16 @@
#ifndef DORIS_BE_SRC_OLAP_ROWSET_BETA_ROWSET_WRITER_H
#define DORIS_BE_SRC_OLAP_ROWSET_BETA_ROWSET_WRITER_H
#include "vector"
#include "olap/rowset/rowset_writer.h"
namespace doris {
namespace fs {
class WritableBlock;
}
namespace segment_v2 {
class SegmentWriter;
} // namespace segment_v2
@ -37,6 +43,7 @@ public:
OLAPStatus add_row(const RowCursor& row) override {
return _add_row(row);
}
// For Memtable::flush()
OLAPStatus add_row(const ContiguousRow& row) override {
return _add_row(row);
}
@ -45,7 +52,7 @@ public:
OLAPStatus add_rowset(RowsetSharedPtr rowset) override;
OLAPStatus add_rowset_for_linked_schema_change(
RowsetSharedPtr rowset, const SchemaMapping& schema_mapping) override;
RowsetSharedPtr rowset, const SchemaMapping& schema_mapping) override;
OLAPStatus flush() override;
@ -70,8 +77,9 @@ private:
std::shared_ptr<RowsetMeta> _rowset_meta;
int _num_segment;
uint32_t _max_segment_size;
std::unique_ptr<segment_v2::SegmentWriter> _segment_writer;
// TODO(lingbin): it is better to wrapper in a Batch?
std::vector<std::unique_ptr<fs::WritableBlock>> _wblocks;
// counters and statistics maintained during data write
int64_t _num_rows_written;
@ -80,7 +88,7 @@ private:
// TODO rowset's Zonemap
bool _is_pending = false;
bool _rowset_build = false;
bool _already_built = false;
};
} // namespace doris

View File

@ -125,4 +125,4 @@ private:
};
} // namespace segment_v2
} // namespace doris
} // namespace doris

View File

@ -100,7 +100,7 @@ public:
_rid += count;
}
Status finish(WritableFile* file, ColumnIndexMetaPB* index_meta) override {
Status finish(fs::WritableBlock* wblock, ColumnIndexMetaPB* index_meta) override {
index_meta->set_type(BITMAP_INDEX);
BitmapIndexPB* meta = index_meta->mutable_bitmap_index();
@ -114,7 +114,7 @@ public:
options.encoding = EncodingInfo::get_default_encoding(_typeinfo, true);
options.compression = LZ4F;
IndexedColumnWriter dict_column_writer(options, _typeinfo, file);
IndexedColumnWriter dict_column_writer(options, _typeinfo, wblock);
RETURN_IF_ERROR(dict_column_writer.init());
for (auto const& it : _mem_index) {
RETURN_IF_ERROR(dict_column_writer.add(&(it.first)));
@ -150,7 +150,7 @@ public:
// we already store compressed bitmap, use NO_COMPRESSION to save some cpu
options.compression = NO_COMPRESSION;
IndexedColumnWriter bitmap_column_writer(options, bitmap_typeinfo, file);
IndexedColumnWriter bitmap_column_writer(options, bitmap_typeinfo, wblock);
RETURN_IF_ERROR(bitmap_column_writer.init());
faststring buf;

View File

@ -29,6 +29,10 @@ namespace doris {
class TypeInfo;
class WritableFile;
namespace fs {
class WritableBlock;
}
namespace segment_v2 {
class BitmapIndexWriter {
@ -42,7 +46,7 @@ public:
virtual void add_nulls(uint32_t count) = 0;
virtual Status finish(WritableFile* file, ColumnIndexMetaPB* index_meta) = 0;
virtual Status finish(fs::WritableBlock* file, ColumnIndexMetaPB* index_meta) = 0;
virtual uint64_t size() const = 0;
private:

View File

@ -21,6 +21,7 @@
#include <roaring/roaring.hh>
#include "env/env.h"
#include "olap/fs/block_manager.h"
#include "olap/rowset/segment_v2/common.h"
#include "olap/rowset/segment_v2/encoding_info.h"
#include "olap/rowset/segment_v2/indexed_column_writer.h"
@ -100,7 +101,7 @@ public:
_has_null = true;
}
Status flush() override {
Status flush() override {
std::unique_ptr<BloomFilter> bf;
RETURN_IF_ERROR(BloomFilter::create(BLOCK_BLOOM_FILTER, &bf));
RETURN_IF_ERROR(bf->init(_values.size(), _bf_options.fpp, _bf_options.strategy));
@ -119,7 +120,7 @@ public:
return Status::OK();
}
Status finish(WritableFile* file, ColumnIndexMetaPB* index_meta) override {
Status finish(fs::WritableBlock* wblock, ColumnIndexMetaPB* index_meta) override {
if (_values.size() > 0) {
RETURN_IF_ERROR(flush());
}
@ -134,7 +135,7 @@ public:
options.write_ordinal_index = true;
options.write_value_index = false;
options.encoding = PLAIN_ENCODING;
IndexedColumnWriter bf_writer(options, bf_typeinfo, file);
IndexedColumnWriter bf_writer(options, bf_typeinfo, wblock);
RETURN_IF_ERROR(bf_writer.init());
for (auto& bf : _bfs) {
Slice data(bf->data(), bf->size());

View File

@ -29,6 +29,10 @@ namespace doris {
class TypeInfo;
class WritableFile;
namespace fs {
class WritableBlock;
}
namespace segment_v2 {
class BloomFilterOptions;
@ -47,7 +51,7 @@ public:
virtual Status flush() = 0;
virtual Status finish(WritableFile* file, ColumnIndexMetaPB* index_meta) = 0;
virtual Status finish(fs::WritableBlock* wblock, ColumnIndexMetaPB* index_meta) = 0;
virtual uint64_t size() = 0;
private:

View File

@ -22,6 +22,7 @@
#include "common/logging.h"
#include "env/env.h"
#include "gutil/strings/substitute.h"
#include "olap/fs/block_manager.h"
#include "olap/rowset/segment_v2/bitmap_index_writer.h"
#include "olap/rowset/segment_v2/bloom_filter.h"
#include "olap/rowset/segment_v2/bloom_filter_index_writer.h"
@ -78,10 +79,10 @@ private:
ColumnWriter::ColumnWriter(const ColumnWriterOptions& opts,
std::unique_ptr<Field> field,
WritableFile* output_file) :
fs::WritableBlock* wblock) :
_opts(opts),
_field(std::move(field)),
_output_file(output_file),
_wblock(wblock),
_is_nullable(_opts.meta->is_nullable()),
_data_size(0) {
// these opts.meta fields should be set by client
@ -92,6 +93,7 @@ ColumnWriter::ColumnWriter(const ColumnWriterOptions& opts,
DCHECK(opts.meta->has_encoding());
DCHECK(opts.meta->has_compression());
DCHECK(opts.meta->has_is_nullable());
DCHECK(wblock != nullptr);
}
ColumnWriter::~ColumnWriter() {
@ -264,7 +266,7 @@ Status ColumnWriter::write_data() {
PagePointer dict_pp;
RETURN_IF_ERROR(PageIO::compress_and_write_page(
_compress_codec, _opts.compression_min_space_saving, _output_file,
_compress_codec, _opts.compression_min_space_saving, _wblock,
{ dict_body.slice() }, footer, &dict_pp));
dict_pp.to_proto(_opts.meta->mutable_dict_page());
}
@ -272,26 +274,26 @@ Status ColumnWriter::write_data() {
}
Status ColumnWriter::write_ordinal_index() {
return _ordinal_index_builder->finish(_output_file, _opts.meta->add_indexes());
return _ordinal_index_builder->finish(_wblock, _opts.meta->add_indexes());
}
Status ColumnWriter::write_zone_map() {
if (_opts.need_zone_map) {
return _zone_map_index_builder->finish(_output_file, _opts.meta->add_indexes());
return _zone_map_index_builder->finish(_wblock, _opts.meta->add_indexes());
}
return Status::OK();
}
Status ColumnWriter::write_bitmap_index() {
if (_opts.need_bitmap_index) {
return _bitmap_index_builder->finish(_output_file, _opts.meta->add_indexes());
return _bitmap_index_builder->finish(_wblock, _opts.meta->add_indexes());
}
return Status::OK();
}
Status ColumnWriter::write_bloom_filter_index() {
if (_opts.need_bloom_filter) {
return _bloom_filter_index_builder->finish(_output_file, _opts.meta->add_indexes());
return _bloom_filter_index_builder->finish(_wblock, _opts.meta->add_indexes());
}
return Status::OK();
}
@ -303,7 +305,7 @@ Status ColumnWriter::_write_data_page(Page* page) {
for (auto& data : page->data) {
compressed_body.push_back(data.slice());
}
RETURN_IF_ERROR(PageIO::write_page(_output_file, compressed_body, page->footer, &pp));
RETURN_IF_ERROR(PageIO::write_page(_wblock, compressed_body, page->footer, &pp));
_ordinal_index_builder->append_entry(page->footer.data_page_footer().first_ordinal(), pp);
return Status::OK();
}

View File

@ -32,6 +32,10 @@ class TypeInfo;
class WritableFile;
class BlockCompressionCodec;
namespace fs {
class WritableBlock;
}
namespace segment_v2 {
struct ColumnWriterOptions {
@ -64,7 +68,7 @@ class ColumnWriter {
public:
ColumnWriter(const ColumnWriterOptions& opts,
std::unique_ptr<Field> field,
WritableFile* output_file);
fs::WritableBlock* output_file);
~ColumnWriter();
Status init();
@ -145,7 +149,7 @@ private:
private:
ColumnWriterOptions _opts;
std::unique_ptr<Field> _field;
WritableFile* _output_file;
fs::WritableBlock* _wblock = nullptr;
bool _is_nullable;
// total size of data page list
uint64_t _data_size;

View File

@ -21,6 +21,7 @@
#include "common/logging.h"
#include "env/env.h"
#include "olap/fs/block_manager.h"
#include "olap/rowset/segment_v2/encoding_info.h"
#include "olap/rowset/segment_v2/index_page.h"
#include "olap/rowset/segment_v2/options.h"
@ -37,10 +38,10 @@ namespace segment_v2 {
IndexedColumnWriter::IndexedColumnWriter(const IndexedColumnWriterOptions& options,
const TypeInfo* typeinfo,
WritableFile* output_file)
fs::WritableBlock* wblock)
: _options(options),
_typeinfo(typeinfo),
_file(output_file),
_wblock(wblock),
_mem_tracker(-1),
_mem_pool(&_mem_tracker),
_num_values(0),
@ -111,7 +112,7 @@ Status IndexedColumnWriter::_finish_current_data_page() {
footer.mutable_data_page_footer()->set_nullmap_size(0);
RETURN_IF_ERROR(PageIO::compress_and_write_page(
_compress_codec, _options.compression_min_space_saving, _file, { page_body.slice() },
_compress_codec, _options.compression_min_space_saving, _wblock, { page_body.slice() },
footer, &_last_data_page));
_num_data_pages++;
@ -159,7 +160,7 @@ Status IndexedColumnWriter::_flush_index(IndexPageBuilder* index_builder, BTreeM
PagePointer pp;
RETURN_IF_ERROR(PageIO::compress_and_write_page(
_compress_codec, _options.compression_min_space_saving, _file,
_compress_codec, _options.compression_min_space_saving, _wblock,
{ page_body.slice() }, page_footer, &pp));
meta->set_is_root_data_page(false);

View File

@ -37,6 +37,10 @@ class KeyCoder;
class TypeInfo;
class WritableFile;
namespace fs {
class WritableBlock;
}
namespace segment_v2 {
class IndexPageBuilder;
@ -69,7 +73,7 @@ class IndexedColumnWriter {
public:
explicit IndexedColumnWriter(const IndexedColumnWriterOptions& options,
const TypeInfo* typeinfo,
WritableFile* output_file);
fs::WritableBlock* wblock);
~IndexedColumnWriter();
@ -87,7 +91,7 @@ private:
IndexedColumnWriterOptions _options;
const TypeInfo* _typeinfo;
WritableFile* _file;
fs::WritableBlock* _wblock;
// only used for `_first_value`
MemTracker _mem_tracker;
MemPool _mem_pool;

View File

@ -19,6 +19,7 @@
#include "common/logging.h"
#include "env/env.h"
#include "olap/fs/block_manager.h"
#include "olap/key_coder.h"
#include "olap/rowset/segment_v2/page_handle.h"
#include "olap/rowset/segment_v2/page_io.h"
@ -34,8 +35,8 @@ void OrdinalIndexWriter::append_entry(ordinal_t ordinal, const PagePointer& data
_last_pp = data_pp;
}
Status OrdinalIndexWriter::finish(WritableFile* file, ColumnIndexMetaPB* meta) {
CHECK(_page_builder->count() > 0) << "no entry has been added, file=" << file->filename();
Status OrdinalIndexWriter::finish(fs::WritableBlock* wblock, ColumnIndexMetaPB* meta) {
CHECK(_page_builder->count() > 0) << "no entry has been added, filepath=" << wblock->path();
meta->set_type(ORDINAL_INDEX);
BTreeMetaPB* root_page_meta = meta->mutable_ordinal_index()->mutable_root_page();
@ -50,7 +51,7 @@ Status OrdinalIndexWriter::finish(WritableFile* file, ColumnIndexMetaPB* meta) {
// write index page (currently it's not compressed)
PagePointer pp;
RETURN_IF_ERROR(PageIO::write_page(file, { page_body.slice() }, page_footer, &pp));
RETURN_IF_ERROR(PageIO::write_page(wblock, { page_body.slice() }, page_footer, &pp));
root_page_meta->set_is_root_data_page(false);
pp.to_proto(root_page_meta->mutable_root_page());

View File

@ -31,7 +31,9 @@
namespace doris {
class WritableFile;
namespace fs {
class WritableBlock;
}
namespace segment_v2 {
@ -47,7 +49,7 @@ public:
uint64_t size() { return _page_builder->size(); }
Status finish(WritableFile* file, ColumnIndexMetaPB* meta);
Status finish(fs::WritableBlock* wblock, ColumnIndexMetaPB* meta);
private:
DISALLOW_COPY_AND_ASSIGN(OrdinalIndexWriter);

View File

@ -23,6 +23,7 @@
#include "common/logging.h"
#include "env/env.h"
#include "gutil/strings/substitute.h"
#include "olap/fs/block_manager.h"
#include "olap/page_cache.h"
#include "util/block_compression.h"
#include "util/coding.h"
@ -61,7 +62,7 @@ Status PageIO::compress_page_body(const BlockCompressionCodec* codec,
return Status::OK();
}
Status PageIO::write_page(WritableFile* file,
Status PageIO::write_page(fs::WritableBlock* wblock,
const std::vector<Slice>& body,
const PageFooterPB& footer,
PagePointer* result) {
@ -99,11 +100,11 @@ Status PageIO::write_page(WritableFile* file,
encode_fixed32_le(checksum_buf, checksum);
page.emplace_back(checksum_buf, sizeof(uint32_t));
uint64_t offset = file->size();
RETURN_IF_ERROR(file->appendv(&page[0], page.size()));
uint64_t offset = wblock->bytes_appended();
RETURN_IF_ERROR(wblock->appendv(&page[0], page.size()));
result->offset = offset;
result->size = file->size() - offset;
result->size = wblock->bytes_appended() - offset;
return Status::OK();
}
@ -205,4 +206,4 @@ Status PageIO::read_and_decompress_page(const PageReadOptions& opts,
}
} // namespace segment_v2
} // namespace doris
} // namespace doris

View File

@ -31,7 +31,9 @@ namespace doris {
class BlockCompressionCodec;
struct OlapReaderStatistics;
class RandomAccessFile;
class WritableFile;
namespace fs {
class WritableBlock;
}
namespace segment_v2 {
@ -80,7 +82,7 @@ public:
// Encode page from `body' and `footer' and write to `file'.
// `body' could be either uncompressed or compressed.
// On success, the file pointer to the written page is stored in `result'.
static Status write_page(WritableFile* file,
static Status write_page(fs::WritableBlock* wblock,
const std::vector<Slice>& body,
const PageFooterPB& footer,
PagePointer* result);
@ -88,7 +90,7 @@ public:
// Convenient function to compress page body and write page in one go.
static Status compress_and_write_page(const BlockCompressionCodec* codec,
double min_space_saving,
WritableFile* file,
fs::WritableBlock* wblock,
const std::vector<Slice>& body,
const PageFooterPB& footer,
PagePointer* result) {
@ -96,9 +98,9 @@ public:
OwnedSlice compressed_body;
RETURN_IF_ERROR(compress_page_body(codec, min_space_saving, body, &compressed_body));
if (compressed_body.slice().empty()) { // uncompressed
return write_page(file, body, footer, result);
return write_page(wblock, body, footer, result);
}
return write_page(file, { compressed_body.slice() }, footer, result);
return write_page(wblock, { compressed_body.slice() }, footer, result);
}
// Read and parse a page according to `opts'.

View File

@ -17,13 +17,17 @@
#include "olap/rowset/segment_v2/segment_writer.h"
#include "common/logging.h" // LOG
#include "env/env.h" // Env
#include "olap/fs/block_manager.h"
#include "olap/row.h" // ContiguousRow
#include "olap/row_cursor.h" // RowCursor
#include "olap/rowset/segment_v2/column_writer.h" // ColumnWriter
#include "olap/rowset/segment_v2/page_io.h"
#include "olap/short_key_index.h"
#include "olap/schema.h"
#include "util/crc32c.h"
#include "util/faststring.h"
namespace doris {
namespace segment_v2 {
@ -31,21 +35,20 @@ namespace segment_v2 {
const char* k_segment_magic = "D0R1";
const uint32_t k_segment_magic_length = 4;
SegmentWriter::SegmentWriter(std::string fname, uint32_t segment_id,
SegmentWriter::SegmentWriter(fs::WritableBlock* wblock,
uint32_t segment_id,
const TabletSchema* tablet_schema,
const SegmentWriterOptions& opts)
: _fname(std::move(fname)),
const SegmentWriterOptions& opts) :
_segment_id(segment_id),
_tablet_schema(tablet_schema),
_opts(opts) {
_opts(opts),
_wblock(wblock) {
CHECK_NOTNULL(_wblock);
}
SegmentWriter::~SegmentWriter() = default;
Status SegmentWriter::init(uint32_t write_mbytes_per_sec) {
// create for write
RETURN_IF_ERROR(Env::Default()->new_writable_file(_fname, &_output_file));
uint32_t column_id = 0;
for (auto& column : _tablet_schema->columns()) {
std::unique_ptr<Field> field(FieldFactory::create(column));
@ -90,7 +93,7 @@ Status SegmentWriter::init(uint32_t write_mbytes_per_sec) {
}
std::unique_ptr<ColumnWriter> writer(
new ColumnWriter(opts, std::move(field), _output_file.get()));
new ColumnWriter(opts, std::move(field), _wblock));
RETURN_IF_ERROR(writer->init());
_column_writers.push_back(std::move(writer));
}
@ -105,20 +108,26 @@ Status SegmentWriter::append_row(const RowType& row) {
RETURN_IF_ERROR(_column_writers[cid]->append(cell));
}
// At the begin of one block, so add a short key index entry
if ((_row_count % _opts.num_rows_per_block) == 0) {
std::string encoded_key;
encode_key(&encoded_key, row, _tablet_schema->num_short_key_columns());
RETURN_IF_ERROR(_index_builder->add_item(encoded_key));
}
_row_count++;
++_row_count;
return Status::OK();
}
template Status SegmentWriter::append_row(const RowCursor& row);
template Status SegmentWriter::append_row(const ContiguousRow& row);
// TODO(lingbin): Currently this function does not include the size of various indexes,
// We should make this more precise.
// NOTE: This function will be called when any row of data is added, so we need to
// make this function efficient.
uint64_t SegmentWriter::estimate_segment_size() {
uint64_t size = 8; //magic size
// magic size
uint64_t size = 12;
for (auto& column_writer : _column_writers) {
size += column_writer->estimate_buffer_size();
}
@ -131,16 +140,16 @@ Status SegmentWriter::finalize(uint64_t* segment_file_size, uint64_t* index_size
RETURN_IF_ERROR(column_writer->finish());
}
RETURN_IF_ERROR(_write_data());
uint64_t index_offset = _output_file->size();
uint64_t index_offset = _wblock->bytes_appended();
RETURN_IF_ERROR(_write_ordinal_index());
RETURN_IF_ERROR(_write_zone_map());
RETURN_IF_ERROR(_write_bitmap_index());
RETURN_IF_ERROR(_write_bloom_filter_index());
RETURN_IF_ERROR(_write_short_key_index());
*index_size = _output_file->size() - index_offset;
*index_size = _wblock->bytes_appended() - index_offset;
RETURN_IF_ERROR(_write_footer());
RETURN_IF_ERROR(_output_file->sync());
*segment_file_size = _output_file->size();
RETURN_IF_ERROR(_wblock->finalize());
*segment_file_size = _wblock->bytes_appended();
return Status::OK();
}
@ -187,7 +196,7 @@ Status SegmentWriter::_write_short_key_index() {
RETURN_IF_ERROR(_index_builder->finalize(_row_count, &body, &footer));
PagePointer pp;
// short key index page is not compressed right now
RETURN_IF_ERROR(PageIO::write_page(_output_file.get(), body, footer, &pp));
RETURN_IF_ERROR(PageIO::write_page(_wblock, body, footer, &pp));
pp.to_proto(_footer.mutable_short_key_index_page());
return Status::OK();
}
@ -201,13 +210,14 @@ Status SegmentWriter::_write_footer() {
return Status::InternalError("failed to serialize segment footer");
}
std::string fixed_buf;
faststring fixed_buf;
// footer's size
put_fixed32_le(&fixed_buf, footer_buf.size());
// footer's checksum
uint32_t checksum = crc32c::Value(footer_buf.data(), footer_buf.size());
put_fixed32_le(&fixed_buf, checksum);
// magic number. we don't write magic number in the header because that requires an extra seek when reading
// Append magic number. we don't write magic number in the header because
// that will need an extra seek when reading
fixed_buf.append(k_segment_magic, k_segment_magic_length);
std::vector<Slice> slices{footer_buf, fixed_buf};
@ -215,7 +225,7 @@ Status SegmentWriter::_write_footer() {
}
Status SegmentWriter::_write_raw_data(const std::vector<Slice>& slices) {
RETURN_IF_ERROR(_output_file->appendv(&slices[0], slices.size()));
RETURN_IF_ERROR(_wblock->appendv(&slices[0], slices.size()));
return Status::OK();
}

View File

@ -22,18 +22,21 @@
#include <string>
#include <vector>
#include "common/logging.h" // LOG
#include "common/status.h" // Status
#include "gen_cpp/segment_v2.pb.h"
#include "gutil/macros.h"
#include "olap/schema.h"
namespace doris {
class WritableFile;
class RowBlock;
class RowCursor;
class TabletSchema;
class ShortKeyIndexBuilder;
class WritableFile;
namespace fs {
class WritableBlock;
}
namespace segment_v2 {
@ -50,12 +53,12 @@ struct SegmentWriterOptions {
class SegmentWriter {
public:
explicit SegmentWriter(std::string file_name,
explicit SegmentWriter(fs::WritableBlock* block,
uint32_t segment_id,
const TabletSchema* tablet_schema,
const SegmentWriterOptions& opts);
~SegmentWriter();
Status init(uint32_t write_mbytes_per_sec);
template<typename RowType>
@ -79,14 +82,15 @@ private:
Status _write_raw_data(const std::vector<Slice>& slices);
private:
std::string _fname;
uint32_t _segment_id;
const TabletSchema* _tablet_schema;
SegmentWriterOptions _opts;
// Not owned. owned by RowsetWriter
fs::WritableBlock* _wblock;
SegmentFooterPB _footer;
std::unique_ptr<ShortKeyIndexBuilder> _index_builder;
std::unique_ptr<WritableFile> _output_file;
std::vector<std::unique_ptr<ColumnWriter>> _column_writers;
uint32_t _row_count = 0;
};

View File

@ -18,6 +18,7 @@
#include "olap/rowset/segment_v2/zone_map_index.h"
#include "olap/column_block.h"
#include "olap/fs/block_manager.h"
#include "olap/olap_define.h"
#include "olap/rowset/segment_v2/encoding_info.h"
#include "olap/rowset/segment_v2/indexed_column_reader.h"
@ -84,7 +85,7 @@ Status ZoneMapIndexWriter::flush() {
return Status::OK();
}
Status ZoneMapIndexWriter::finish(WritableFile* file, ColumnIndexMetaPB* index_meta) {
Status ZoneMapIndexWriter::finish(fs::WritableBlock* wblock, ColumnIndexMetaPB* index_meta) {
index_meta->set_type(ZONE_MAP_INDEX);
ZoneMapIndexPB* meta = index_meta->mutable_zone_map_index();
// store segment zone map
@ -98,7 +99,7 @@ Status ZoneMapIndexWriter::finish(WritableFile* file, ColumnIndexMetaPB* index_m
options.encoding = EncodingInfo::get_default_encoding(typeinfo, false);
options.compression = NO_COMPRESSION; // currently not compressed
IndexedColumnWriter writer(options, typeinfo, file);
IndexedColumnWriter writer(options, typeinfo, wblock);
RETURN_IF_ERROR(writer.init());
for (auto& value : _values) {

View File

@ -31,7 +31,9 @@
namespace doris {
class WritableFile;
namespace fs {
class WritableBlock;
}
namespace segment_v2 {
@ -75,7 +77,7 @@ public:
// mark the end of one data page so that we can finalize the corresponding zone map
Status flush();
Status finish(WritableFile* file, ColumnIndexMetaPB* index_meta);
Status finish(fs::WritableBlock* wblock, ColumnIndexMetaPB* index_meta);
uint64_t size() { return _estimated_size; }

View File

@ -112,7 +112,7 @@ void create_tablet_schema(KeysType keys_type, TabletSchema* tablet_schema) {
column_2->set_is_key(true);
column_2->set_is_nullable(false);
column_2->set_is_bf_column(false);
ColumnPB* column_3 = tablet_schema_pb.add_column();
column_3->set_unique_id(3);
column_3->set_name("v1");
@ -223,7 +223,7 @@ void RowsetConverterTest::process(RowsetTypePB src_type, RowsetTypePB dst_type)
ASSERT_EQ(OLAP_SUCCESS, rowset_converter.convert_beta_to_alpha(
src_rowset->rowset_meta(), _schema_hash_path, &dst_rowset_meta_pb));
}
ASSERT_EQ(dst_type, dst_rowset_meta_pb.rowset_type());
ASSERT_EQ(12345, dst_rowset_meta_pb.tablet_id());
ASSERT_EQ(1024, dst_rowset_meta_pb.num_rows());
@ -234,7 +234,7 @@ void RowsetConverterTest::process(RowsetTypePB src_type, RowsetTypePB dst_type)
RowsetSharedPtr dst_rowset;
ASSERT_EQ(OLAP_SUCCESS, RowsetFactory::create_rowset(&tablet_schema,
_schema_hash_path, dst_rowset_meta, &dst_rowset));
RowsetReaderSharedPtr dst_rowset_reader;
ASSERT_EQ(OLAP_SUCCESS, dst_rowset->create_reader(&dst_rowset_reader));
RowsetReaderContext rowset_reader_context;

View File

@ -24,11 +24,13 @@
#include "common/logging.h"
#include "env/env.h"
#include "olap/fs/block_manager.h"
#include "olap/fs/fs_util.h"
#include "olap/olap_common.h"
#include "olap/types.h"
#include "util/file_utils.h"
#include "runtime/mem_tracker.h"
#include "runtime/mem_pool.h"
#include "util/file_utils.h"
namespace doris {
namespace segment_v2 {
@ -61,14 +63,17 @@ void write_index_file(std::string& filename, const void* values,
ColumnIndexMetaPB* meta) {
const TypeInfo* type_info = get_type_info(type);
{
std::unique_ptr<WritableFile> wfile;
ASSERT_TRUE(Env::Default()->new_writable_file(filename, &wfile).ok());
std::unique_ptr<fs::WritableBlock> wblock;
fs::CreateBlockOptions opts({ filename });
ASSERT_TRUE(fs::fs_util::block_mgr_for_ut()->create_block(opts, &wblock).ok());
std::unique_ptr<BitmapIndexWriter> writer;
BitmapIndexWriter::create(type_info, &writer);
writer->add_values(values, value_count);
writer->add_nulls(null_count);
ASSERT_TRUE(writer->finish(wfile.get(), meta).ok());
ASSERT_TRUE(writer->finish(wblock.get(), meta).ok());
ASSERT_EQ(BITMAP_INDEX, meta->type());
ASSERT_TRUE(wblock->close().ok());
}
}
@ -231,4 +236,4 @@ TEST_F(BitmapIndexTest, test_null) {
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
}

View File

@ -24,6 +24,8 @@
#include "common/logging.h"
#include "env/env.h"
#include "olap/fs/block_manager.h"
#include "olap/fs/fs_util.h"
#include "olap/olap_common.h"
#include "olap/types.h"
#include "util/file_utils.h"
@ -42,16 +44,18 @@ const std::string dname = "./ut_dir/bloom_filter_index_reader_writer_test";
template<FieldType type>
void write_bloom_filter_index_file(const std::string& file_name, const void* values,
size_t value_count, size_t null_count,
ColumnIndexMetaPB* index_meta) {
size_t value_count, size_t null_count,
ColumnIndexMetaPB* index_meta) {
const TypeInfo* type_info = get_type_info(type);
using CppType = typename CppTypeTraits<type>::CppType;
FileUtils::create_dir(dname);
std::string fname = dname + "/" + file_name;
{
std::unique_ptr<WritableFile> wfile;
auto st = Env::Default()->new_writable_file(fname, &wfile);
std::unique_ptr<fs::WritableBlock> wblock;
fs::CreateBlockOptions opts({ fname });
Status st = fs::fs_util::block_mgr_for_ut()->create_block(opts, &wblock);
ASSERT_TRUE(st.ok());
std::unique_ptr<BloomFilterIndexWriter> bloom_filter_index_writer;
BloomFilterOptions bf_options;
BloomFilterIndexWriter::create(bf_options, type_info, &bloom_filter_index_writer);
@ -67,8 +71,9 @@ void write_bloom_filter_index_file(const std::string& file_name, const void* val
ASSERT_TRUE(st.ok());
i += 1024;
}
st = bloom_filter_index_writer->finish(wfile.get(), index_meta);
st = bloom_filter_index_writer->finish(wblock.get(), index_meta);
ASSERT_TRUE(st.ok()) << "writer finish status:" << st.to_string();
ASSERT_TRUE(wblock->close().ok());
ASSERT_EQ(BLOOM_FILTER_INDEX, index_meta->type());
ASSERT_EQ(bf_options.strategy, index_meta->bloom_filter_index().hash_strategy());
}
@ -273,4 +278,5 @@ TEST_F(BloomFilterIndexReaderWriterTest, test_decimal) {
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
}

View File

@ -25,12 +25,14 @@
#include "common/logging.h"
#include "env/env.h"
#include "olap/column_block.h"
#include "olap/fs/block_manager.h"
#include "olap/fs/fs_util.h"
#include "olap/olap_common.h"
#include "olap/types.h"
#include "olap/column_block.h"
#include "util/file_utils.h"
#include "runtime/mem_tracker.h"
#include "runtime/mem_pool.h"
#include "util/file_utils.h"
using std::string;
@ -74,9 +76,10 @@ void test_nullable_data(uint8_t* src_data, uint8_t* src_is_null, int num_rows, s
// write data
string fname = TEST_DIR + "/" + test_name;
{
std::unique_ptr<WritableFile> wfile;
auto st = Env::Default()->new_writable_file(fname, &wfile);
ASSERT_TRUE(st.ok());
std::unique_ptr<fs::WritableBlock> wblock;
fs::CreateBlockOptions opts({ fname });
Status st = fs::fs_util::block_mgr_for_ut()->create_block(opts, &wblock);
ASSERT_TRUE(st.ok()) << st.get_error_msg();
ColumnWriterOptions writer_opts;
writer_opts.meta = &meta;
@ -100,7 +103,7 @@ void test_nullable_data(uint8_t* src_data, uint8_t* src_is_null, int num_rows, s
column = create_char_key(1);
}
std::unique_ptr<Field> field(FieldFactory::create(column));
ColumnWriter writer(writer_opts, std::move(field), wfile.get());
ColumnWriter writer(writer_opts, std::move(field), wblock.get());
st = writer.init();
ASSERT_TRUE(st.ok()) << st.to_string();
@ -109,18 +112,13 @@ void test_nullable_data(uint8_t* src_data, uint8_t* src_is_null, int num_rows, s
ASSERT_TRUE(st.ok());
}
st = writer.finish();
ASSERT_TRUE(st.ok());
st = writer.write_data();
ASSERT_TRUE(st.ok());
st = writer.write_ordinal_index();
ASSERT_TRUE(st.ok());
st = writer.write_zone_map();
ASSERT_TRUE(st.ok());
ASSERT_TRUE(writer.finish().ok());
ASSERT_TRUE(writer.write_data().ok());
ASSERT_TRUE(writer.write_ordinal_index().ok());
ASSERT_TRUE(writer.write_zone_map().ok());
// close the file
wfile.reset();
ASSERT_TRUE(wblock->close().ok());
}
// read and check
{

View File

@ -24,6 +24,7 @@
#include "common/logging.h"
#include "env/env.h"
#include "olap/fs/fs_util.h"
#include "util/file_utils.h"
namespace doris {
@ -58,11 +59,14 @@ TEST_F(OrdinalPageIndexTest, normal) {
}
ColumnIndexMetaPB index_meta;
{
std::unique_ptr<WritableFile> out_file;
ASSERT_TRUE(Env::Default()->new_writable_file(filename, &out_file).ok());
ASSERT_TRUE(builder.finish(out_file.get(), &index_meta).ok());
std::unique_ptr<fs::WritableBlock> wblock;
fs::CreateBlockOptions opts({ filename });
ASSERT_TRUE(fs::fs_util::block_mgr_for_ut()->create_block(opts, &wblock).ok());
ASSERT_TRUE(builder.finish(wblock.get(), &index_meta).ok());
ASSERT_EQ(ORDINAL_INDEX, index_meta.type());
ASSERT_FALSE(index_meta.ordinal_index().root_page().is_root_data_page());
ASSERT_TRUE(wblock->close().ok());
LOG(INFO) << "index page size="
<< index_meta.ordinal_index().root_page().root_page().size();
}

View File

@ -27,17 +27,20 @@
#include "common/logging.h"
#include "gutil/strings/substitute.h"
#include "olap/comparison_predicate.h"
#include "olap/fs/block_manager.h"
#include "olap/fs/fs_util.h"
#include "olap/in_list_predicate.h"
#include "olap/olap_common.h"
#include "olap/row_cursor.h"
#include "olap/tablet_schema.h"
#include "olap/row_block.h"
#include "olap/row_block2.h"
#include "olap/types.h"
#include "olap/rowset/segment_v2/segment_writer.h"
#include "olap/tablet_schema.h"
#include "olap/tablet_schema_helper.h"
#include "util/file_utils.h"
#include "olap/types.h"
#include "runtime/mem_pool.h"
#include "runtime/mem_tracker.h"
#include "util/file_utils.h"
namespace doris {
namespace segment_v2 {
@ -105,8 +108,12 @@ protected:
// must use unique filename for each segment, otherwise page cache kicks in and produces
// the wrong answer (it use (filename,offset) as cache key)
string filename = strings::Substitute("$0/seg_$1.dat", kSegmentDir, seg_id++);
SegmentWriter writer(filename, 0, &build_schema, opts);
auto st = writer.init(10);
std::unique_ptr<fs::WritableBlock> wblock;
fs::CreateBlockOptions block_opts({ filename });
Status st = fs::fs_util::block_mgr_for_ut()->create_block(block_opts, &wblock);
ASSERT_TRUE(st.ok());
SegmentWriter writer(wblock.get(), 0, &build_schema, opts);
st = writer.init(10);
ASSERT_TRUE(st.ok());
RowCursor row;
@ -125,6 +132,7 @@ protected:
uint64_t file_size, index_size;
st = writer.finalize(&file_size, &index_size);
ASSERT_TRUE(st.ok());
ASSERT_TRUE(wblock->close().ok());
st = Segment::open(filename, 0, &query_schema, res);
ASSERT_TRUE(st.ok());
@ -599,8 +607,12 @@ TEST_F(SegmentReaderWriterTest, estimate_segment_size) {
opts.num_rows_per_block = num_rows_per_block;
std::string fname = dname + "/int_case";
SegmentWriter writer(fname, 0, tablet_schema.get(), opts);
auto st = writer.init(10);
std::unique_ptr<fs::WritableBlock> wblock;
fs::CreateBlockOptions wblock_opts({ fname });
Status st = fs::fs_util::block_mgr_for_ut()->create_block(wblock_opts, &wblock);
ASSERT_TRUE(st.ok());
SegmentWriter writer(wblock.get(), 0, tablet_schema.get(), opts);
st = writer.init(10);
ASSERT_TRUE(st.ok());
RowCursor row;
@ -624,9 +636,8 @@ TEST_F(SegmentReaderWriterTest, estimate_segment_size) {
uint64_t file_size = 0;
uint64_t index_size;
st = writer.finalize(&file_size, &index_size);
ASSERT_TRUE(st.ok());
ASSERT_TRUE(writer.finalize(&file_size, &index_size).ok());
ASSERT_TRUE(wblock->close().ok());
file_size = boost::filesystem::file_size(fname);
LOG(INFO) << "segment file size is:" << file_size;
@ -762,9 +773,12 @@ TEST_F(SegmentReaderWriterTest, TestStringDict) {
opts.num_rows_per_block = num_rows_per_block;
std::string fname = dname + "/string_case";
SegmentWriter writer(fname, 0, tablet_schema.get(), opts);
auto st = writer.init(10);
std::unique_ptr<fs::WritableBlock> wblock;
fs::CreateBlockOptions wblock_opts({ fname });
Status st = fs::fs_util::block_mgr_for_ut()->create_block(wblock_opts, &wblock);
ASSERT_TRUE(st.ok());
SegmentWriter writer(wblock.get(), 0, tablet_schema.get(), opts);
st = writer.init(10);
ASSERT_TRUE(st.ok());
RowCursor row;
@ -787,8 +801,8 @@ TEST_F(SegmentReaderWriterTest, TestStringDict) {
uint64_t file_size = 0;
uint64_t index_size;
st = writer.finalize(&file_size, &index_size);
ASSERT_TRUE(st.ok());
ASSERT_TRUE(writer.finalize(&file_size, &index_size).ok());
ASSERT_TRUE(wblock->close().ok());
{
std::shared_ptr<Segment> segment;

View File

@ -21,6 +21,8 @@
#include <string>
#include "env/env.h"
#include "olap/fs/block_manager.h"
#include "olap/fs/fs_util.h"
#include "olap/rowset/segment_v2/zone_map_index.h"
#include "olap/tablet_schema_helper.h"
#include "util/file_utils.h"
@ -68,13 +70,14 @@ public:
// write out zone map index
ColumnIndexMetaPB index_meta;
{
std::unique_ptr<WritableFile> out_file;
ASSERT_TRUE(Env::Default()->new_writable_file(filename, &out_file).ok());
ASSERT_TRUE(builder.finish(out_file.get(), &index_meta).ok());
std::unique_ptr<fs::WritableBlock> wblock;
fs::CreateBlockOptions opts({ filename });
ASSERT_TRUE(fs::fs_util::block_mgr_for_ut()->create_block(opts, &wblock).ok());
ASSERT_TRUE(builder.finish(wblock.get(), &index_meta).ok());
ASSERT_EQ(ZONE_MAP_INDEX, index_meta.type());
ASSERT_TRUE(wblock->close().ok());
}
ZoneMapIndexReader column_zone_map(filename, &index_meta.zone_map_index());
Status status = column_zone_map.load(true, false);
ASSERT_TRUE(status.ok());
@ -120,10 +123,12 @@ TEST_F(ColumnZoneMapTest, NormalTestIntPage) {
// write out zone map index
ColumnIndexMetaPB index_meta;
{
std::unique_ptr<WritableFile> out_file;
ASSERT_TRUE(Env::Default()->new_writable_file(filename, &out_file).ok());
ASSERT_TRUE(builder.finish(out_file.get(), &index_meta).ok());
std::unique_ptr<fs::WritableBlock> wblock;
fs::CreateBlockOptions opts({ filename });
ASSERT_TRUE(fs::fs_util::block_mgr_for_ut()->create_block(opts, &wblock).ok());
ASSERT_TRUE(builder.finish(wblock.get(), &index_meta).ok());
ASSERT_EQ(ZONE_MAP_INDEX, index_meta.type());
ASSERT_TRUE(wblock->close().ok());
}
ZoneMapIndexReader column_zone_map(filename, &index_meta.zone_map_index());