diff --git a/be/src/olap/fs/fs_util.cpp b/be/src/olap/fs/fs_util.cpp index aec73f23fd..9658e7cda0 100644 --- a/be/src/olap/fs/fs_util.cpp +++ b/be/src/olap/fs/fs_util.cpp @@ -18,6 +18,7 @@ #include "olap/fs/fs_util.h" #include "common/status.h" +#include "env/env.h" #include "olap/fs/file_block_manager.h" namespace doris { @@ -28,6 +29,12 @@ BlockManager* file_block_mgr(Env* env, BlockManagerOptions opts) { return new FileBlockManager(env, std::move(opts)); } +BlockManager* block_mgr_for_ut() { + fs::BlockManagerOptions bm_opts; + bm_opts.read_only = false; + return file_block_mgr(Env::Default(), bm_opts); +} + } // namespace fs_util } // namespace fs } // namespace doris diff --git a/be/src/olap/fs/fs_util.h b/be/src/olap/fs/fs_util.h index fc70797e0b..060409ffd2 100644 --- a/be/src/olap/fs/fs_util.h +++ b/be/src/olap/fs/fs_util.h @@ -28,6 +28,9 @@ namespace fs_util { // method for each type(instead of a factory method which require same params) BlockManager* file_block_mgr(Env* env, BlockManagerOptions opts); +// For UnitTest. +BlockManager* block_mgr_for_ut(); + } // namespace fs_util } // namespace fs } // namespace doris diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp index 9a8b0f0031..eb550ffafd 100644 --- a/be/src/olap/rowset/beta_rowset_writer.cpp +++ b/be/src/olap/rowset/beta_rowset_writer.cpp @@ -17,13 +17,13 @@ #include "olap/rowset/beta_rowset_writer.h" -#include // lround -#include // remove -#include // strerror_r #include // time #include "common/config.h" #include "common/logging.h" +#include "env/env.h" +#include "gutil/strings/substitute.h" +#include "olap/fs/fs_util.h" #include "olap/olap_define.h" #include "olap/rowset/beta_rowset.h" #include "olap/rowset/rowset_factory.h" @@ -33,28 +33,31 @@ namespace doris { -BetaRowsetWriter::BetaRowsetWriter() - : _rowset_meta(nullptr), - _num_segment(0), - _segment_writer(nullptr), - _num_rows_written(0), - _total_data_size(0), - _total_index_size(0) { - auto size = static_cast(OLAP_MAX_COLUMN_SEGMENT_FILE_SIZE); - size *= OLAP_COLUMN_FILE_SEGMENT_SIZE_SCALE; - _max_segment_size = static_cast(lround(size)); -} +// TODO(lingbin): Should be a conf that can be dynamically adjusted, or a member in the context +const uint32_t MAX_SEGMENT_SIZE = static_cast( + OLAP_MAX_COLUMN_SEGMENT_FILE_SIZE * OLAP_COLUMN_FILE_SEGMENT_SIZE_SCALE); + +BetaRowsetWriter::BetaRowsetWriter() : + _rowset_meta(nullptr), + _num_segment(0), + _segment_writer(nullptr), + _num_rows_written(0), + _total_data_size(0), + _total_index_size(0) {} BetaRowsetWriter::~BetaRowsetWriter() { - if (!_rowset_build) { // abnormal exit, remove all files generated - _segment_writer.reset(nullptr); // ensure all files are closed + // TODO(lingbin): Should wrapper exception logic, no need to know file ops directly. + if (!_already_built) { // abnormal exit, remove all files generated + _segment_writer.reset(); // ensure all files are closed + Status st; for (int i = 0; i < _num_segment; ++i) { - auto path = BetaRowset::segment_file_path(_context.rowset_path_prefix, _context.rowset_id, i); - if (::remove(path.c_str()) != 0) { - char errmsg[64]; - LOG(WARNING) << "failed to delete file. err=" << strerror_r(errno, errmsg, 64) - << ", path=" << path; - } + auto path = BetaRowset::segment_file_path( + _context.rowset_path_prefix, _context.rowset_id, i); + // Even if an error is encountered, these files that have not been cleaned up + // will be cleaned up by the GC background. So here we only print the error + // message when we encounter an error. + WARN_IF_ERROR(Env::Default()->delete_file(path), + strings::Substitute("Failed to delete file=$0", path)); } } } @@ -93,11 +96,11 @@ OLAPStatus BetaRowsetWriter::_add_row(const RowType& row) { LOG(WARNING) << "failed to append row: " << s.to_string(); return OLAP_ERR_WRITER_DATA_WRITE_ERROR; } - if (PREDICT_FALSE(_segment_writer->estimate_segment_size() >= _max_segment_size || - _segment_writer->num_rows_written() >= _context.max_rows_per_segment)) { + if (PREDICT_FALSE(_segment_writer->estimate_segment_size() >= MAX_SEGMENT_SIZE + || _segment_writer->num_rows_written() >= _context.max_rows_per_segment)) { RETURN_NOT_OK(_flush_segment_writer()); } - _num_rows_written++; + ++_num_rows_written; return OLAP_SUCCESS; } @@ -132,6 +135,13 @@ OLAPStatus BetaRowsetWriter::flush() { } RowsetSharedPtr BetaRowsetWriter::build() { + // TODO(lingbin): move to more better place, or in a CreateBlockBatch? + for (auto& wblock : _wblocks) { + wblock->close(); + } + // When building a rowset, we must ensure that the current _segment_writer has been + // flushed, that is, the current _segment_wirter is nullptr + DCHECK(_segment_writer == nullptr) << "segment must be null when build rowset"; _rowset_meta->set_num_rows(_num_rows_written); _rowset_meta->set_total_disk_size(_total_data_size); _rowset_meta->set_data_disk_size(_total_data_size); @@ -158,15 +168,35 @@ RowsetSharedPtr BetaRowsetWriter::build() { LOG(WARNING) << "rowset init failed when build new rowset, res=" << status; return nullptr; } - _rowset_build = true; + _already_built = true; return rowset; } OLAPStatus BetaRowsetWriter::_create_segment_writer() { - auto path = BetaRowset::segment_file_path(_context.rowset_path_prefix, _context.rowset_id, _num_segment); + auto path = BetaRowset::segment_file_path(_context.rowset_path_prefix, + _context.rowset_id, + _num_segment); + // TODO(lingbin): should use a more general way to get BlockManager object + // and tablets with the same type should share one BlockManager object; + fs::BlockManagerOptions bm_opts; + bm_opts.read_only = false; + fs::BlockManager* block_mgr = fs::fs_util::file_block_mgr(Env::Default(), bm_opts); + + std::unique_ptr wblock; + fs::CreateBlockOptions opts({path}); + DCHECK(block_mgr != nullptr); + Status st = block_mgr->create_block(opts, &wblock); + if (!st.ok()) { + LOG(WARNING) << "failed to create writable block. path=" << path; + return OLAP_ERR_INIT_FAILED; + } + + DCHECK(wblock != nullptr); segment_v2::SegmentWriterOptions writer_options; writer_options.whether_to_filter_value = _context.version.first == 0; - _segment_writer.reset(new segment_v2::SegmentWriter(path, _num_segment, _context.tablet_schema, writer_options)); + _segment_writer.reset(new segment_v2::SegmentWriter( + wblock.get(), _num_segment, _context.tablet_schema, writer_options)); + _wblocks.push_back(std::move(wblock)); // TODO set write_mbytes_per_sec based on writer type (load/base compaction/cumulative compaction) auto s = _segment_writer->init(config::push_write_mbytes_per_sec); if (!s.ok()) { @@ -174,21 +204,21 @@ OLAPStatus BetaRowsetWriter::_create_segment_writer() { _segment_writer.reset(nullptr); return OLAP_ERR_INIT_FAILED; } - _num_segment++; + ++_num_segment; return OLAP_SUCCESS; } OLAPStatus BetaRowsetWriter::_flush_segment_writer() { uint64_t segment_size; uint64_t index_size; - auto s = _segment_writer->finalize(&segment_size, &index_size); + Status s = _segment_writer->finalize(&segment_size, &index_size); if (!s.ok()) { LOG(WARNING) << "failed to finalize segment: " << s.to_string(); return OLAP_ERR_WRITER_DATA_WRITE_ERROR; } _total_data_size += segment_size; _total_index_size += index_size; - _segment_writer.reset(nullptr); + _segment_writer.reset(); return OLAP_SUCCESS; } diff --git a/be/src/olap/rowset/beta_rowset_writer.h b/be/src/olap/rowset/beta_rowset_writer.h index 0c127b34b4..e8edac242b 100644 --- a/be/src/olap/rowset/beta_rowset_writer.h +++ b/be/src/olap/rowset/beta_rowset_writer.h @@ -18,10 +18,16 @@ #ifndef DORIS_BE_SRC_OLAP_ROWSET_BETA_ROWSET_WRITER_H #define DORIS_BE_SRC_OLAP_ROWSET_BETA_ROWSET_WRITER_H +#include "vector" + #include "olap/rowset/rowset_writer.h" namespace doris { +namespace fs { +class WritableBlock; +} + namespace segment_v2 { class SegmentWriter; } // namespace segment_v2 @@ -37,6 +43,7 @@ public: OLAPStatus add_row(const RowCursor& row) override { return _add_row(row); } + // For Memtable::flush() OLAPStatus add_row(const ContiguousRow& row) override { return _add_row(row); } @@ -45,7 +52,7 @@ public: OLAPStatus add_rowset(RowsetSharedPtr rowset) override; OLAPStatus add_rowset_for_linked_schema_change( - RowsetSharedPtr rowset, const SchemaMapping& schema_mapping) override; + RowsetSharedPtr rowset, const SchemaMapping& schema_mapping) override; OLAPStatus flush() override; @@ -70,8 +77,9 @@ private: std::shared_ptr _rowset_meta; int _num_segment; - uint32_t _max_segment_size; std::unique_ptr _segment_writer; + // TODO(lingbin): it is better to wrapper in a Batch? + std::vector> _wblocks; // counters and statistics maintained during data write int64_t _num_rows_written; @@ -80,7 +88,7 @@ private: // TODO rowset's Zonemap bool _is_pending = false; - bool _rowset_build = false; + bool _already_built = false; }; } // namespace doris diff --git a/be/src/olap/rowset/segment_v2/bitmap_index_reader.h b/be/src/olap/rowset/segment_v2/bitmap_index_reader.h index c1dac83e20..78d05a46d2 100644 --- a/be/src/olap/rowset/segment_v2/bitmap_index_reader.h +++ b/be/src/olap/rowset/segment_v2/bitmap_index_reader.h @@ -125,4 +125,4 @@ private: }; } // namespace segment_v2 -} // namespace doris \ No newline at end of file +} // namespace doris diff --git a/be/src/olap/rowset/segment_v2/bitmap_index_writer.cpp b/be/src/olap/rowset/segment_v2/bitmap_index_writer.cpp index cf44c9928b..eb98767f72 100644 --- a/be/src/olap/rowset/segment_v2/bitmap_index_writer.cpp +++ b/be/src/olap/rowset/segment_v2/bitmap_index_writer.cpp @@ -100,7 +100,7 @@ public: _rid += count; } - Status finish(WritableFile* file, ColumnIndexMetaPB* index_meta) override { + Status finish(fs::WritableBlock* wblock, ColumnIndexMetaPB* index_meta) override { index_meta->set_type(BITMAP_INDEX); BitmapIndexPB* meta = index_meta->mutable_bitmap_index(); @@ -114,7 +114,7 @@ public: options.encoding = EncodingInfo::get_default_encoding(_typeinfo, true); options.compression = LZ4F; - IndexedColumnWriter dict_column_writer(options, _typeinfo, file); + IndexedColumnWriter dict_column_writer(options, _typeinfo, wblock); RETURN_IF_ERROR(dict_column_writer.init()); for (auto const& it : _mem_index) { RETURN_IF_ERROR(dict_column_writer.add(&(it.first))); @@ -150,7 +150,7 @@ public: // we already store compressed bitmap, use NO_COMPRESSION to save some cpu options.compression = NO_COMPRESSION; - IndexedColumnWriter bitmap_column_writer(options, bitmap_typeinfo, file); + IndexedColumnWriter bitmap_column_writer(options, bitmap_typeinfo, wblock); RETURN_IF_ERROR(bitmap_column_writer.init()); faststring buf; diff --git a/be/src/olap/rowset/segment_v2/bitmap_index_writer.h b/be/src/olap/rowset/segment_v2/bitmap_index_writer.h index 8b4dc44c36..db33568667 100644 --- a/be/src/olap/rowset/segment_v2/bitmap_index_writer.h +++ b/be/src/olap/rowset/segment_v2/bitmap_index_writer.h @@ -29,6 +29,10 @@ namespace doris { class TypeInfo; class WritableFile; +namespace fs { +class WritableBlock; +} + namespace segment_v2 { class BitmapIndexWriter { @@ -42,7 +46,7 @@ public: virtual void add_nulls(uint32_t count) = 0; - virtual Status finish(WritableFile* file, ColumnIndexMetaPB* index_meta) = 0; + virtual Status finish(fs::WritableBlock* file, ColumnIndexMetaPB* index_meta) = 0; virtual uint64_t size() const = 0; private: diff --git a/be/src/olap/rowset/segment_v2/bloom_filter_index_writer.cpp b/be/src/olap/rowset/segment_v2/bloom_filter_index_writer.cpp index 14132b29ce..9b0ae13038 100644 --- a/be/src/olap/rowset/segment_v2/bloom_filter_index_writer.cpp +++ b/be/src/olap/rowset/segment_v2/bloom_filter_index_writer.cpp @@ -21,6 +21,7 @@ #include #include "env/env.h" +#include "olap/fs/block_manager.h" #include "olap/rowset/segment_v2/common.h" #include "olap/rowset/segment_v2/encoding_info.h" #include "olap/rowset/segment_v2/indexed_column_writer.h" @@ -100,7 +101,7 @@ public: _has_null = true; } - Status flush() override { + Status flush() override { std::unique_ptr bf; RETURN_IF_ERROR(BloomFilter::create(BLOCK_BLOOM_FILTER, &bf)); RETURN_IF_ERROR(bf->init(_values.size(), _bf_options.fpp, _bf_options.strategy)); @@ -119,7 +120,7 @@ public: return Status::OK(); } - Status finish(WritableFile* file, ColumnIndexMetaPB* index_meta) override { + Status finish(fs::WritableBlock* wblock, ColumnIndexMetaPB* index_meta) override { if (_values.size() > 0) { RETURN_IF_ERROR(flush()); } @@ -134,7 +135,7 @@ public: options.write_ordinal_index = true; options.write_value_index = false; options.encoding = PLAIN_ENCODING; - IndexedColumnWriter bf_writer(options, bf_typeinfo, file); + IndexedColumnWriter bf_writer(options, bf_typeinfo, wblock); RETURN_IF_ERROR(bf_writer.init()); for (auto& bf : _bfs) { Slice data(bf->data(), bf->size()); diff --git a/be/src/olap/rowset/segment_v2/bloom_filter_index_writer.h b/be/src/olap/rowset/segment_v2/bloom_filter_index_writer.h index 04a877af43..6c5df729f9 100644 --- a/be/src/olap/rowset/segment_v2/bloom_filter_index_writer.h +++ b/be/src/olap/rowset/segment_v2/bloom_filter_index_writer.h @@ -29,6 +29,10 @@ namespace doris { class TypeInfo; class WritableFile; +namespace fs { +class WritableBlock; +} + namespace segment_v2 { class BloomFilterOptions; @@ -47,7 +51,7 @@ public: virtual Status flush() = 0; - virtual Status finish(WritableFile* file, ColumnIndexMetaPB* index_meta) = 0; + virtual Status finish(fs::WritableBlock* wblock, ColumnIndexMetaPB* index_meta) = 0; virtual uint64_t size() = 0; private: diff --git a/be/src/olap/rowset/segment_v2/column_writer.cpp b/be/src/olap/rowset/segment_v2/column_writer.cpp index bf3f1b051f..62170aa4d0 100644 --- a/be/src/olap/rowset/segment_v2/column_writer.cpp +++ b/be/src/olap/rowset/segment_v2/column_writer.cpp @@ -22,6 +22,7 @@ #include "common/logging.h" #include "env/env.h" #include "gutil/strings/substitute.h" +#include "olap/fs/block_manager.h" #include "olap/rowset/segment_v2/bitmap_index_writer.h" #include "olap/rowset/segment_v2/bloom_filter.h" #include "olap/rowset/segment_v2/bloom_filter_index_writer.h" @@ -78,10 +79,10 @@ private: ColumnWriter::ColumnWriter(const ColumnWriterOptions& opts, std::unique_ptr field, - WritableFile* output_file) : + fs::WritableBlock* wblock) : _opts(opts), _field(std::move(field)), - _output_file(output_file), + _wblock(wblock), _is_nullable(_opts.meta->is_nullable()), _data_size(0) { // these opts.meta fields should be set by client @@ -92,6 +93,7 @@ ColumnWriter::ColumnWriter(const ColumnWriterOptions& opts, DCHECK(opts.meta->has_encoding()); DCHECK(opts.meta->has_compression()); DCHECK(opts.meta->has_is_nullable()); + DCHECK(wblock != nullptr); } ColumnWriter::~ColumnWriter() { @@ -264,7 +266,7 @@ Status ColumnWriter::write_data() { PagePointer dict_pp; RETURN_IF_ERROR(PageIO::compress_and_write_page( - _compress_codec, _opts.compression_min_space_saving, _output_file, + _compress_codec, _opts.compression_min_space_saving, _wblock, { dict_body.slice() }, footer, &dict_pp)); dict_pp.to_proto(_opts.meta->mutable_dict_page()); } @@ -272,26 +274,26 @@ Status ColumnWriter::write_data() { } Status ColumnWriter::write_ordinal_index() { - return _ordinal_index_builder->finish(_output_file, _opts.meta->add_indexes()); + return _ordinal_index_builder->finish(_wblock, _opts.meta->add_indexes()); } Status ColumnWriter::write_zone_map() { if (_opts.need_zone_map) { - return _zone_map_index_builder->finish(_output_file, _opts.meta->add_indexes()); + return _zone_map_index_builder->finish(_wblock, _opts.meta->add_indexes()); } return Status::OK(); } Status ColumnWriter::write_bitmap_index() { if (_opts.need_bitmap_index) { - return _bitmap_index_builder->finish(_output_file, _opts.meta->add_indexes()); + return _bitmap_index_builder->finish(_wblock, _opts.meta->add_indexes()); } return Status::OK(); } Status ColumnWriter::write_bloom_filter_index() { if (_opts.need_bloom_filter) { - return _bloom_filter_index_builder->finish(_output_file, _opts.meta->add_indexes()); + return _bloom_filter_index_builder->finish(_wblock, _opts.meta->add_indexes()); } return Status::OK(); } @@ -303,7 +305,7 @@ Status ColumnWriter::_write_data_page(Page* page) { for (auto& data : page->data) { compressed_body.push_back(data.slice()); } - RETURN_IF_ERROR(PageIO::write_page(_output_file, compressed_body, page->footer, &pp)); + RETURN_IF_ERROR(PageIO::write_page(_wblock, compressed_body, page->footer, &pp)); _ordinal_index_builder->append_entry(page->footer.data_page_footer().first_ordinal(), pp); return Status::OK(); } diff --git a/be/src/olap/rowset/segment_v2/column_writer.h b/be/src/olap/rowset/segment_v2/column_writer.h index 43039e3edf..12bf54d27d 100644 --- a/be/src/olap/rowset/segment_v2/column_writer.h +++ b/be/src/olap/rowset/segment_v2/column_writer.h @@ -32,6 +32,10 @@ class TypeInfo; class WritableFile; class BlockCompressionCodec; +namespace fs { +class WritableBlock; +} + namespace segment_v2 { struct ColumnWriterOptions { @@ -64,7 +68,7 @@ class ColumnWriter { public: ColumnWriter(const ColumnWriterOptions& opts, std::unique_ptr field, - WritableFile* output_file); + fs::WritableBlock* output_file); ~ColumnWriter(); Status init(); @@ -145,7 +149,7 @@ private: private: ColumnWriterOptions _opts; std::unique_ptr _field; - WritableFile* _output_file; + fs::WritableBlock* _wblock = nullptr; bool _is_nullable; // total size of data page list uint64_t _data_size; diff --git a/be/src/olap/rowset/segment_v2/indexed_column_writer.cpp b/be/src/olap/rowset/segment_v2/indexed_column_writer.cpp index 0f5b3df85f..7082204ab4 100644 --- a/be/src/olap/rowset/segment_v2/indexed_column_writer.cpp +++ b/be/src/olap/rowset/segment_v2/indexed_column_writer.cpp @@ -21,6 +21,7 @@ #include "common/logging.h" #include "env/env.h" +#include "olap/fs/block_manager.h" #include "olap/rowset/segment_v2/encoding_info.h" #include "olap/rowset/segment_v2/index_page.h" #include "olap/rowset/segment_v2/options.h" @@ -37,10 +38,10 @@ namespace segment_v2 { IndexedColumnWriter::IndexedColumnWriter(const IndexedColumnWriterOptions& options, const TypeInfo* typeinfo, - WritableFile* output_file) + fs::WritableBlock* wblock) : _options(options), _typeinfo(typeinfo), - _file(output_file), + _wblock(wblock), _mem_tracker(-1), _mem_pool(&_mem_tracker), _num_values(0), @@ -111,7 +112,7 @@ Status IndexedColumnWriter::_finish_current_data_page() { footer.mutable_data_page_footer()->set_nullmap_size(0); RETURN_IF_ERROR(PageIO::compress_and_write_page( - _compress_codec, _options.compression_min_space_saving, _file, { page_body.slice() }, + _compress_codec, _options.compression_min_space_saving, _wblock, { page_body.slice() }, footer, &_last_data_page)); _num_data_pages++; @@ -159,7 +160,7 @@ Status IndexedColumnWriter::_flush_index(IndexPageBuilder* index_builder, BTreeM PagePointer pp; RETURN_IF_ERROR(PageIO::compress_and_write_page( - _compress_codec, _options.compression_min_space_saving, _file, + _compress_codec, _options.compression_min_space_saving, _wblock, { page_body.slice() }, page_footer, &pp)); meta->set_is_root_data_page(false); diff --git a/be/src/olap/rowset/segment_v2/indexed_column_writer.h b/be/src/olap/rowset/segment_v2/indexed_column_writer.h index 8c81476f23..2e2549bd19 100644 --- a/be/src/olap/rowset/segment_v2/indexed_column_writer.h +++ b/be/src/olap/rowset/segment_v2/indexed_column_writer.h @@ -37,6 +37,10 @@ class KeyCoder; class TypeInfo; class WritableFile; +namespace fs { +class WritableBlock; +} + namespace segment_v2 { class IndexPageBuilder; @@ -69,7 +73,7 @@ class IndexedColumnWriter { public: explicit IndexedColumnWriter(const IndexedColumnWriterOptions& options, const TypeInfo* typeinfo, - WritableFile* output_file); + fs::WritableBlock* wblock); ~IndexedColumnWriter(); @@ -87,7 +91,7 @@ private: IndexedColumnWriterOptions _options; const TypeInfo* _typeinfo; - WritableFile* _file; + fs::WritableBlock* _wblock; // only used for `_first_value` MemTracker _mem_tracker; MemPool _mem_pool; diff --git a/be/src/olap/rowset/segment_v2/ordinal_page_index.cpp b/be/src/olap/rowset/segment_v2/ordinal_page_index.cpp index 3050bc42c9..97b232bdba 100644 --- a/be/src/olap/rowset/segment_v2/ordinal_page_index.cpp +++ b/be/src/olap/rowset/segment_v2/ordinal_page_index.cpp @@ -19,6 +19,7 @@ #include "common/logging.h" #include "env/env.h" +#include "olap/fs/block_manager.h" #include "olap/key_coder.h" #include "olap/rowset/segment_v2/page_handle.h" #include "olap/rowset/segment_v2/page_io.h" @@ -34,8 +35,8 @@ void OrdinalIndexWriter::append_entry(ordinal_t ordinal, const PagePointer& data _last_pp = data_pp; } -Status OrdinalIndexWriter::finish(WritableFile* file, ColumnIndexMetaPB* meta) { - CHECK(_page_builder->count() > 0) << "no entry has been added, file=" << file->filename(); +Status OrdinalIndexWriter::finish(fs::WritableBlock* wblock, ColumnIndexMetaPB* meta) { + CHECK(_page_builder->count() > 0) << "no entry has been added, filepath=" << wblock->path(); meta->set_type(ORDINAL_INDEX); BTreeMetaPB* root_page_meta = meta->mutable_ordinal_index()->mutable_root_page(); @@ -50,7 +51,7 @@ Status OrdinalIndexWriter::finish(WritableFile* file, ColumnIndexMetaPB* meta) { // write index page (currently it's not compressed) PagePointer pp; - RETURN_IF_ERROR(PageIO::write_page(file, { page_body.slice() }, page_footer, &pp)); + RETURN_IF_ERROR(PageIO::write_page(wblock, { page_body.slice() }, page_footer, &pp)); root_page_meta->set_is_root_data_page(false); pp.to_proto(root_page_meta->mutable_root_page()); diff --git a/be/src/olap/rowset/segment_v2/ordinal_page_index.h b/be/src/olap/rowset/segment_v2/ordinal_page_index.h index c132684614..59e100b139 100644 --- a/be/src/olap/rowset/segment_v2/ordinal_page_index.h +++ b/be/src/olap/rowset/segment_v2/ordinal_page_index.h @@ -31,7 +31,9 @@ namespace doris { -class WritableFile; +namespace fs { +class WritableBlock; +} namespace segment_v2 { @@ -47,7 +49,7 @@ public: uint64_t size() { return _page_builder->size(); } - Status finish(WritableFile* file, ColumnIndexMetaPB* meta); + Status finish(fs::WritableBlock* wblock, ColumnIndexMetaPB* meta); private: DISALLOW_COPY_AND_ASSIGN(OrdinalIndexWriter); diff --git a/be/src/olap/rowset/segment_v2/page_io.cpp b/be/src/olap/rowset/segment_v2/page_io.cpp index 6363520349..fa3c78ee64 100644 --- a/be/src/olap/rowset/segment_v2/page_io.cpp +++ b/be/src/olap/rowset/segment_v2/page_io.cpp @@ -23,6 +23,7 @@ #include "common/logging.h" #include "env/env.h" #include "gutil/strings/substitute.h" +#include "olap/fs/block_manager.h" #include "olap/page_cache.h" #include "util/block_compression.h" #include "util/coding.h" @@ -61,7 +62,7 @@ Status PageIO::compress_page_body(const BlockCompressionCodec* codec, return Status::OK(); } -Status PageIO::write_page(WritableFile* file, +Status PageIO::write_page(fs::WritableBlock* wblock, const std::vector& body, const PageFooterPB& footer, PagePointer* result) { @@ -99,11 +100,11 @@ Status PageIO::write_page(WritableFile* file, encode_fixed32_le(checksum_buf, checksum); page.emplace_back(checksum_buf, sizeof(uint32_t)); - uint64_t offset = file->size(); - RETURN_IF_ERROR(file->appendv(&page[0], page.size())); + uint64_t offset = wblock->bytes_appended(); + RETURN_IF_ERROR(wblock->appendv(&page[0], page.size())); result->offset = offset; - result->size = file->size() - offset; + result->size = wblock->bytes_appended() - offset; return Status::OK(); } @@ -205,4 +206,4 @@ Status PageIO::read_and_decompress_page(const PageReadOptions& opts, } } // namespace segment_v2 -} // namespace doris \ No newline at end of file +} // namespace doris diff --git a/be/src/olap/rowset/segment_v2/page_io.h b/be/src/olap/rowset/segment_v2/page_io.h index c18bf5c441..0b25e210c0 100644 --- a/be/src/olap/rowset/segment_v2/page_io.h +++ b/be/src/olap/rowset/segment_v2/page_io.h @@ -31,7 +31,9 @@ namespace doris { class BlockCompressionCodec; struct OlapReaderStatistics; class RandomAccessFile; -class WritableFile; +namespace fs { +class WritableBlock; +} namespace segment_v2 { @@ -80,7 +82,7 @@ public: // Encode page from `body' and `footer' and write to `file'. // `body' could be either uncompressed or compressed. // On success, the file pointer to the written page is stored in `result'. - static Status write_page(WritableFile* file, + static Status write_page(fs::WritableBlock* wblock, const std::vector& body, const PageFooterPB& footer, PagePointer* result); @@ -88,7 +90,7 @@ public: // Convenient function to compress page body and write page in one go. static Status compress_and_write_page(const BlockCompressionCodec* codec, double min_space_saving, - WritableFile* file, + fs::WritableBlock* wblock, const std::vector& body, const PageFooterPB& footer, PagePointer* result) { @@ -96,9 +98,9 @@ public: OwnedSlice compressed_body; RETURN_IF_ERROR(compress_page_body(codec, min_space_saving, body, &compressed_body)); if (compressed_body.slice().empty()) { // uncompressed - return write_page(file, body, footer, result); + return write_page(wblock, body, footer, result); } - return write_page(file, { compressed_body.slice() }, footer, result); + return write_page(wblock, { compressed_body.slice() }, footer, result); } // Read and parse a page according to `opts'. diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp index 3fc26e432c..3d8742ac2c 100644 --- a/be/src/olap/rowset/segment_v2/segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp @@ -17,13 +17,17 @@ #include "olap/rowset/segment_v2/segment_writer.h" +#include "common/logging.h" // LOG #include "env/env.h" // Env +#include "olap/fs/block_manager.h" #include "olap/row.h" // ContiguousRow #include "olap/row_cursor.h" // RowCursor #include "olap/rowset/segment_v2/column_writer.h" // ColumnWriter #include "olap/rowset/segment_v2/page_io.h" #include "olap/short_key_index.h" +#include "olap/schema.h" #include "util/crc32c.h" +#include "util/faststring.h" namespace doris { namespace segment_v2 { @@ -31,21 +35,20 @@ namespace segment_v2 { const char* k_segment_magic = "D0R1"; const uint32_t k_segment_magic_length = 4; -SegmentWriter::SegmentWriter(std::string fname, uint32_t segment_id, +SegmentWriter::SegmentWriter(fs::WritableBlock* wblock, + uint32_t segment_id, const TabletSchema* tablet_schema, - const SegmentWriterOptions& opts) - : _fname(std::move(fname)), + const SegmentWriterOptions& opts) : _segment_id(segment_id), _tablet_schema(tablet_schema), - _opts(opts) { + _opts(opts), + _wblock(wblock) { + CHECK_NOTNULL(_wblock); } SegmentWriter::~SegmentWriter() = default; Status SegmentWriter::init(uint32_t write_mbytes_per_sec) { - // create for write - RETURN_IF_ERROR(Env::Default()->new_writable_file(_fname, &_output_file)); - uint32_t column_id = 0; for (auto& column : _tablet_schema->columns()) { std::unique_ptr field(FieldFactory::create(column)); @@ -90,7 +93,7 @@ Status SegmentWriter::init(uint32_t write_mbytes_per_sec) { } std::unique_ptr writer( - new ColumnWriter(opts, std::move(field), _output_file.get())); + new ColumnWriter(opts, std::move(field), _wblock)); RETURN_IF_ERROR(writer->init()); _column_writers.push_back(std::move(writer)); } @@ -105,20 +108,26 @@ Status SegmentWriter::append_row(const RowType& row) { RETURN_IF_ERROR(_column_writers[cid]->append(cell)); } + // At the begin of one block, so add a short key index entry if ((_row_count % _opts.num_rows_per_block) == 0) { std::string encoded_key; encode_key(&encoded_key, row, _tablet_schema->num_short_key_columns()); RETURN_IF_ERROR(_index_builder->add_item(encoded_key)); } - _row_count++; + ++_row_count; return Status::OK(); } template Status SegmentWriter::append_row(const RowCursor& row); template Status SegmentWriter::append_row(const ContiguousRow& row); +// TODO(lingbin): Currently this function does not include the size of various indexes, +// We should make this more precise. +// NOTE: This function will be called when any row of data is added, so we need to +// make this function efficient. uint64_t SegmentWriter::estimate_segment_size() { - uint64_t size = 8; //magic size + // magic size + uint64_t size = 12; for (auto& column_writer : _column_writers) { size += column_writer->estimate_buffer_size(); } @@ -131,16 +140,16 @@ Status SegmentWriter::finalize(uint64_t* segment_file_size, uint64_t* index_size RETURN_IF_ERROR(column_writer->finish()); } RETURN_IF_ERROR(_write_data()); - uint64_t index_offset = _output_file->size(); + uint64_t index_offset = _wblock->bytes_appended(); RETURN_IF_ERROR(_write_ordinal_index()); RETURN_IF_ERROR(_write_zone_map()); RETURN_IF_ERROR(_write_bitmap_index()); RETURN_IF_ERROR(_write_bloom_filter_index()); RETURN_IF_ERROR(_write_short_key_index()); - *index_size = _output_file->size() - index_offset; + *index_size = _wblock->bytes_appended() - index_offset; RETURN_IF_ERROR(_write_footer()); - RETURN_IF_ERROR(_output_file->sync()); - *segment_file_size = _output_file->size(); + RETURN_IF_ERROR(_wblock->finalize()); + *segment_file_size = _wblock->bytes_appended(); return Status::OK(); } @@ -187,7 +196,7 @@ Status SegmentWriter::_write_short_key_index() { RETURN_IF_ERROR(_index_builder->finalize(_row_count, &body, &footer)); PagePointer pp; // short key index page is not compressed right now - RETURN_IF_ERROR(PageIO::write_page(_output_file.get(), body, footer, &pp)); + RETURN_IF_ERROR(PageIO::write_page(_wblock, body, footer, &pp)); pp.to_proto(_footer.mutable_short_key_index_page()); return Status::OK(); } @@ -201,13 +210,14 @@ Status SegmentWriter::_write_footer() { return Status::InternalError("failed to serialize segment footer"); } - std::string fixed_buf; + faststring fixed_buf; // footer's size put_fixed32_le(&fixed_buf, footer_buf.size()); // footer's checksum uint32_t checksum = crc32c::Value(footer_buf.data(), footer_buf.size()); put_fixed32_le(&fixed_buf, checksum); - // magic number. we don't write magic number in the header because that requires an extra seek when reading + // Append magic number. we don't write magic number in the header because + // that will need an extra seek when reading fixed_buf.append(k_segment_magic, k_segment_magic_length); std::vector slices{footer_buf, fixed_buf}; @@ -215,7 +225,7 @@ Status SegmentWriter::_write_footer() { } Status SegmentWriter::_write_raw_data(const std::vector& slices) { - RETURN_IF_ERROR(_output_file->appendv(&slices[0], slices.size())); + RETURN_IF_ERROR(_wblock->appendv(&slices[0], slices.size())); return Status::OK(); } diff --git a/be/src/olap/rowset/segment_v2/segment_writer.h b/be/src/olap/rowset/segment_v2/segment_writer.h index 70d332b5aa..22b4f06cb3 100644 --- a/be/src/olap/rowset/segment_v2/segment_writer.h +++ b/be/src/olap/rowset/segment_v2/segment_writer.h @@ -22,18 +22,21 @@ #include #include -#include "common/logging.h" // LOG #include "common/status.h" // Status #include "gen_cpp/segment_v2.pb.h" #include "gutil/macros.h" -#include "olap/schema.h" namespace doris { -class WritableFile; class RowBlock; class RowCursor; +class TabletSchema; class ShortKeyIndexBuilder; +class WritableFile; + +namespace fs { +class WritableBlock; +} namespace segment_v2 { @@ -50,12 +53,12 @@ struct SegmentWriterOptions { class SegmentWriter { public: - explicit SegmentWriter(std::string file_name, + explicit SegmentWriter(fs::WritableBlock* block, uint32_t segment_id, const TabletSchema* tablet_schema, const SegmentWriterOptions& opts); - ~SegmentWriter(); + Status init(uint32_t write_mbytes_per_sec); template @@ -79,14 +82,15 @@ private: Status _write_raw_data(const std::vector& slices); private: - std::string _fname; uint32_t _segment_id; const TabletSchema* _tablet_schema; SegmentWriterOptions _opts; + // Not owned. owned by RowsetWriter + fs::WritableBlock* _wblock; + SegmentFooterPB _footer; std::unique_ptr _index_builder; - std::unique_ptr _output_file; std::vector> _column_writers; uint32_t _row_count = 0; }; diff --git a/be/src/olap/rowset/segment_v2/zone_map_index.cpp b/be/src/olap/rowset/segment_v2/zone_map_index.cpp index 8dc3a895f4..5206676782 100644 --- a/be/src/olap/rowset/segment_v2/zone_map_index.cpp +++ b/be/src/olap/rowset/segment_v2/zone_map_index.cpp @@ -18,6 +18,7 @@ #include "olap/rowset/segment_v2/zone_map_index.h" #include "olap/column_block.h" +#include "olap/fs/block_manager.h" #include "olap/olap_define.h" #include "olap/rowset/segment_v2/encoding_info.h" #include "olap/rowset/segment_v2/indexed_column_reader.h" @@ -84,7 +85,7 @@ Status ZoneMapIndexWriter::flush() { return Status::OK(); } -Status ZoneMapIndexWriter::finish(WritableFile* file, ColumnIndexMetaPB* index_meta) { +Status ZoneMapIndexWriter::finish(fs::WritableBlock* wblock, ColumnIndexMetaPB* index_meta) { index_meta->set_type(ZONE_MAP_INDEX); ZoneMapIndexPB* meta = index_meta->mutable_zone_map_index(); // store segment zone map @@ -98,7 +99,7 @@ Status ZoneMapIndexWriter::finish(WritableFile* file, ColumnIndexMetaPB* index_m options.encoding = EncodingInfo::get_default_encoding(typeinfo, false); options.compression = NO_COMPRESSION; // currently not compressed - IndexedColumnWriter writer(options, typeinfo, file); + IndexedColumnWriter writer(options, typeinfo, wblock); RETURN_IF_ERROR(writer.init()); for (auto& value : _values) { diff --git a/be/src/olap/rowset/segment_v2/zone_map_index.h b/be/src/olap/rowset/segment_v2/zone_map_index.h index 6bafa07fca..3070cccd6b 100644 --- a/be/src/olap/rowset/segment_v2/zone_map_index.h +++ b/be/src/olap/rowset/segment_v2/zone_map_index.h @@ -31,7 +31,9 @@ namespace doris { -class WritableFile; +namespace fs { +class WritableBlock; +} namespace segment_v2 { @@ -75,7 +77,7 @@ public: // mark the end of one data page so that we can finalize the corresponding zone map Status flush(); - Status finish(WritableFile* file, ColumnIndexMetaPB* index_meta); + Status finish(fs::WritableBlock* wblock, ColumnIndexMetaPB* index_meta); uint64_t size() { return _estimated_size; } diff --git a/be/test/olap/rowset/rowset_converter_test.cpp b/be/test/olap/rowset/rowset_converter_test.cpp index a65242bf49..10877d675c 100644 --- a/be/test/olap/rowset/rowset_converter_test.cpp +++ b/be/test/olap/rowset/rowset_converter_test.cpp @@ -112,7 +112,7 @@ void create_tablet_schema(KeysType keys_type, TabletSchema* tablet_schema) { column_2->set_is_key(true); column_2->set_is_nullable(false); column_2->set_is_bf_column(false); - + ColumnPB* column_3 = tablet_schema_pb.add_column(); column_3->set_unique_id(3); column_3->set_name("v1"); @@ -223,7 +223,7 @@ void RowsetConverterTest::process(RowsetTypePB src_type, RowsetTypePB dst_type) ASSERT_EQ(OLAP_SUCCESS, rowset_converter.convert_beta_to_alpha( src_rowset->rowset_meta(), _schema_hash_path, &dst_rowset_meta_pb)); } - + ASSERT_EQ(dst_type, dst_rowset_meta_pb.rowset_type()); ASSERT_EQ(12345, dst_rowset_meta_pb.tablet_id()); ASSERT_EQ(1024, dst_rowset_meta_pb.num_rows()); @@ -234,7 +234,7 @@ void RowsetConverterTest::process(RowsetTypePB src_type, RowsetTypePB dst_type) RowsetSharedPtr dst_rowset; ASSERT_EQ(OLAP_SUCCESS, RowsetFactory::create_rowset(&tablet_schema, _schema_hash_path, dst_rowset_meta, &dst_rowset)); - + RowsetReaderSharedPtr dst_rowset_reader; ASSERT_EQ(OLAP_SUCCESS, dst_rowset->create_reader(&dst_rowset_reader)); RowsetReaderContext rowset_reader_context; diff --git a/be/test/olap/rowset/segment_v2/bitmap_index_test.cpp b/be/test/olap/rowset/segment_v2/bitmap_index_test.cpp index 0067644fbb..af0a46ba52 100644 --- a/be/test/olap/rowset/segment_v2/bitmap_index_test.cpp +++ b/be/test/olap/rowset/segment_v2/bitmap_index_test.cpp @@ -24,11 +24,13 @@ #include "common/logging.h" #include "env/env.h" +#include "olap/fs/block_manager.h" +#include "olap/fs/fs_util.h" #include "olap/olap_common.h" #include "olap/types.h" -#include "util/file_utils.h" #include "runtime/mem_tracker.h" #include "runtime/mem_pool.h" +#include "util/file_utils.h" namespace doris { namespace segment_v2 { @@ -61,14 +63,17 @@ void write_index_file(std::string& filename, const void* values, ColumnIndexMetaPB* meta) { const TypeInfo* type_info = get_type_info(type); { - std::unique_ptr wfile; - ASSERT_TRUE(Env::Default()->new_writable_file(filename, &wfile).ok()); + std::unique_ptr wblock; + fs::CreateBlockOptions opts({ filename }); + ASSERT_TRUE(fs::fs_util::block_mgr_for_ut()->create_block(opts, &wblock).ok()); + std::unique_ptr writer; BitmapIndexWriter::create(type_info, &writer); writer->add_values(values, value_count); writer->add_nulls(null_count); - ASSERT_TRUE(writer->finish(wfile.get(), meta).ok()); + ASSERT_TRUE(writer->finish(wblock.get(), meta).ok()); ASSERT_EQ(BITMAP_INDEX, meta->type()); + ASSERT_TRUE(wblock->close().ok()); } } @@ -231,4 +236,4 @@ TEST_F(BitmapIndexTest, test_null) { int main(int argc, char** argv) { ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); -} \ No newline at end of file +} diff --git a/be/test/olap/rowset/segment_v2/bloom_filter_index_reader_writer_test.cpp b/be/test/olap/rowset/segment_v2/bloom_filter_index_reader_writer_test.cpp index 01110e919e..a802c0e4d4 100644 --- a/be/test/olap/rowset/segment_v2/bloom_filter_index_reader_writer_test.cpp +++ b/be/test/olap/rowset/segment_v2/bloom_filter_index_reader_writer_test.cpp @@ -24,6 +24,8 @@ #include "common/logging.h" #include "env/env.h" +#include "olap/fs/block_manager.h" +#include "olap/fs/fs_util.h" #include "olap/olap_common.h" #include "olap/types.h" #include "util/file_utils.h" @@ -42,16 +44,18 @@ const std::string dname = "./ut_dir/bloom_filter_index_reader_writer_test"; template void write_bloom_filter_index_file(const std::string& file_name, const void* values, - size_t value_count, size_t null_count, - ColumnIndexMetaPB* index_meta) { + size_t value_count, size_t null_count, + ColumnIndexMetaPB* index_meta) { const TypeInfo* type_info = get_type_info(type); using CppType = typename CppTypeTraits::CppType; FileUtils::create_dir(dname); std::string fname = dname + "/" + file_name; { - std::unique_ptr wfile; - auto st = Env::Default()->new_writable_file(fname, &wfile); + std::unique_ptr wblock; + fs::CreateBlockOptions opts({ fname }); + Status st = fs::fs_util::block_mgr_for_ut()->create_block(opts, &wblock); ASSERT_TRUE(st.ok()); + std::unique_ptr bloom_filter_index_writer; BloomFilterOptions bf_options; BloomFilterIndexWriter::create(bf_options, type_info, &bloom_filter_index_writer); @@ -67,8 +71,9 @@ void write_bloom_filter_index_file(const std::string& file_name, const void* val ASSERT_TRUE(st.ok()); i += 1024; } - st = bloom_filter_index_writer->finish(wfile.get(), index_meta); + st = bloom_filter_index_writer->finish(wblock.get(), index_meta); ASSERT_TRUE(st.ok()) << "writer finish status:" << st.to_string(); + ASSERT_TRUE(wblock->close().ok()); ASSERT_EQ(BLOOM_FILTER_INDEX, index_meta->type()); ASSERT_EQ(bf_options.strategy, index_meta->bloom_filter_index().hash_strategy()); } @@ -273,4 +278,5 @@ TEST_F(BloomFilterIndexReaderWriterTest, test_decimal) { int main(int argc, char** argv) { ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); -} \ No newline at end of file +} + diff --git a/be/test/olap/rowset/segment_v2/column_reader_writer_test.cpp b/be/test/olap/rowset/segment_v2/column_reader_writer_test.cpp index 48b9ddcd53..ea8bde6e0a 100644 --- a/be/test/olap/rowset/segment_v2/column_reader_writer_test.cpp +++ b/be/test/olap/rowset/segment_v2/column_reader_writer_test.cpp @@ -25,12 +25,14 @@ #include "common/logging.h" #include "env/env.h" +#include "olap/column_block.h" +#include "olap/fs/block_manager.h" +#include "olap/fs/fs_util.h" #include "olap/olap_common.h" #include "olap/types.h" -#include "olap/column_block.h" -#include "util/file_utils.h" #include "runtime/mem_tracker.h" #include "runtime/mem_pool.h" +#include "util/file_utils.h" using std::string; @@ -74,9 +76,10 @@ void test_nullable_data(uint8_t* src_data, uint8_t* src_is_null, int num_rows, s // write data string fname = TEST_DIR + "/" + test_name; { - std::unique_ptr wfile; - auto st = Env::Default()->new_writable_file(fname, &wfile); - ASSERT_TRUE(st.ok()); + std::unique_ptr wblock; + fs::CreateBlockOptions opts({ fname }); + Status st = fs::fs_util::block_mgr_for_ut()->create_block(opts, &wblock); + ASSERT_TRUE(st.ok()) << st.get_error_msg(); ColumnWriterOptions writer_opts; writer_opts.meta = &meta; @@ -100,7 +103,7 @@ void test_nullable_data(uint8_t* src_data, uint8_t* src_is_null, int num_rows, s column = create_char_key(1); } std::unique_ptr field(FieldFactory::create(column)); - ColumnWriter writer(writer_opts, std::move(field), wfile.get()); + ColumnWriter writer(writer_opts, std::move(field), wblock.get()); st = writer.init(); ASSERT_TRUE(st.ok()) << st.to_string(); @@ -109,18 +112,13 @@ void test_nullable_data(uint8_t* src_data, uint8_t* src_is_null, int num_rows, s ASSERT_TRUE(st.ok()); } - st = writer.finish(); - ASSERT_TRUE(st.ok()); - - st = writer.write_data(); - ASSERT_TRUE(st.ok()); - st = writer.write_ordinal_index(); - ASSERT_TRUE(st.ok()); - st = writer.write_zone_map(); - ASSERT_TRUE(st.ok()); + ASSERT_TRUE(writer.finish().ok()); + ASSERT_TRUE(writer.write_data().ok()); + ASSERT_TRUE(writer.write_ordinal_index().ok()); + ASSERT_TRUE(writer.write_zone_map().ok()); // close the file - wfile.reset(); + ASSERT_TRUE(wblock->close().ok()); } // read and check { diff --git a/be/test/olap/rowset/segment_v2/ordinal_page_index_test.cpp b/be/test/olap/rowset/segment_v2/ordinal_page_index_test.cpp index e8a101e8c4..6749dec769 100644 --- a/be/test/olap/rowset/segment_v2/ordinal_page_index_test.cpp +++ b/be/test/olap/rowset/segment_v2/ordinal_page_index_test.cpp @@ -24,6 +24,7 @@ #include "common/logging.h" #include "env/env.h" +#include "olap/fs/fs_util.h" #include "util/file_utils.h" namespace doris { @@ -58,11 +59,14 @@ TEST_F(OrdinalPageIndexTest, normal) { } ColumnIndexMetaPB index_meta; { - std::unique_ptr out_file; - ASSERT_TRUE(Env::Default()->new_writable_file(filename, &out_file).ok()); - ASSERT_TRUE(builder.finish(out_file.get(), &index_meta).ok()); + std::unique_ptr wblock; + fs::CreateBlockOptions opts({ filename }); + ASSERT_TRUE(fs::fs_util::block_mgr_for_ut()->create_block(opts, &wblock).ok()); + + ASSERT_TRUE(builder.finish(wblock.get(), &index_meta).ok()); ASSERT_EQ(ORDINAL_INDEX, index_meta.type()); ASSERT_FALSE(index_meta.ordinal_index().root_page().is_root_data_page()); + ASSERT_TRUE(wblock->close().ok()); LOG(INFO) << "index page size=" << index_meta.ordinal_index().root_page().root_page().size(); } diff --git a/be/test/olap/rowset/segment_v2/segment_test.cpp b/be/test/olap/rowset/segment_v2/segment_test.cpp index 1305aa73d4..5d40baf093 100644 --- a/be/test/olap/rowset/segment_v2/segment_test.cpp +++ b/be/test/olap/rowset/segment_v2/segment_test.cpp @@ -27,17 +27,20 @@ #include "common/logging.h" #include "gutil/strings/substitute.h" #include "olap/comparison_predicate.h" +#include "olap/fs/block_manager.h" +#include "olap/fs/fs_util.h" #include "olap/in_list_predicate.h" #include "olap/olap_common.h" #include "olap/row_cursor.h" -#include "olap/tablet_schema.h" #include "olap/row_block.h" #include "olap/row_block2.h" -#include "olap/types.h" +#include "olap/rowset/segment_v2/segment_writer.h" +#include "olap/tablet_schema.h" #include "olap/tablet_schema_helper.h" -#include "util/file_utils.h" +#include "olap/types.h" #include "runtime/mem_pool.h" #include "runtime/mem_tracker.h" +#include "util/file_utils.h" namespace doris { namespace segment_v2 { @@ -105,8 +108,12 @@ protected: // must use unique filename for each segment, otherwise page cache kicks in and produces // the wrong answer (it use (filename,offset) as cache key) string filename = strings::Substitute("$0/seg_$1.dat", kSegmentDir, seg_id++); - SegmentWriter writer(filename, 0, &build_schema, opts); - auto st = writer.init(10); + std::unique_ptr wblock; + fs::CreateBlockOptions block_opts({ filename }); + Status st = fs::fs_util::block_mgr_for_ut()->create_block(block_opts, &wblock); + ASSERT_TRUE(st.ok()); + SegmentWriter writer(wblock.get(), 0, &build_schema, opts); + st = writer.init(10); ASSERT_TRUE(st.ok()); RowCursor row; @@ -125,6 +132,7 @@ protected: uint64_t file_size, index_size; st = writer.finalize(&file_size, &index_size); ASSERT_TRUE(st.ok()); + ASSERT_TRUE(wblock->close().ok()); st = Segment::open(filename, 0, &query_schema, res); ASSERT_TRUE(st.ok()); @@ -599,8 +607,12 @@ TEST_F(SegmentReaderWriterTest, estimate_segment_size) { opts.num_rows_per_block = num_rows_per_block; std::string fname = dname + "/int_case"; - SegmentWriter writer(fname, 0, tablet_schema.get(), opts); - auto st = writer.init(10); + std::unique_ptr wblock; + fs::CreateBlockOptions wblock_opts({ fname }); + Status st = fs::fs_util::block_mgr_for_ut()->create_block(wblock_opts, &wblock); + ASSERT_TRUE(st.ok()); + SegmentWriter writer(wblock.get(), 0, tablet_schema.get(), opts); + st = writer.init(10); ASSERT_TRUE(st.ok()); RowCursor row; @@ -624,9 +636,8 @@ TEST_F(SegmentReaderWriterTest, estimate_segment_size) { uint64_t file_size = 0; uint64_t index_size; - st = writer.finalize(&file_size, &index_size); - - ASSERT_TRUE(st.ok()); + ASSERT_TRUE(writer.finalize(&file_size, &index_size).ok()); + ASSERT_TRUE(wblock->close().ok()); file_size = boost::filesystem::file_size(fname); LOG(INFO) << "segment file size is:" << file_size; @@ -762,9 +773,12 @@ TEST_F(SegmentReaderWriterTest, TestStringDict) { opts.num_rows_per_block = num_rows_per_block; std::string fname = dname + "/string_case"; - - SegmentWriter writer(fname, 0, tablet_schema.get(), opts); - auto st = writer.init(10); + std::unique_ptr wblock; + fs::CreateBlockOptions wblock_opts({ fname }); + Status st = fs::fs_util::block_mgr_for_ut()->create_block(wblock_opts, &wblock); + ASSERT_TRUE(st.ok()); + SegmentWriter writer(wblock.get(), 0, tablet_schema.get(), opts); + st = writer.init(10); ASSERT_TRUE(st.ok()); RowCursor row; @@ -787,8 +801,8 @@ TEST_F(SegmentReaderWriterTest, TestStringDict) { uint64_t file_size = 0; uint64_t index_size; - st = writer.finalize(&file_size, &index_size); - ASSERT_TRUE(st.ok()); + ASSERT_TRUE(writer.finalize(&file_size, &index_size).ok()); + ASSERT_TRUE(wblock->close().ok()); { std::shared_ptr segment; diff --git a/be/test/olap/rowset/segment_v2/zone_map_index_test.cpp b/be/test/olap/rowset/segment_v2/zone_map_index_test.cpp index f7dbfd2ba3..290142e89b 100644 --- a/be/test/olap/rowset/segment_v2/zone_map_index_test.cpp +++ b/be/test/olap/rowset/segment_v2/zone_map_index_test.cpp @@ -21,6 +21,8 @@ #include #include "env/env.h" +#include "olap/fs/block_manager.h" +#include "olap/fs/fs_util.h" #include "olap/rowset/segment_v2/zone_map_index.h" #include "olap/tablet_schema_helper.h" #include "util/file_utils.h" @@ -68,13 +70,14 @@ public: // write out zone map index ColumnIndexMetaPB index_meta; { - std::unique_ptr out_file; - ASSERT_TRUE(Env::Default()->new_writable_file(filename, &out_file).ok()); - ASSERT_TRUE(builder.finish(out_file.get(), &index_meta).ok()); + std::unique_ptr wblock; + fs::CreateBlockOptions opts({ filename }); + ASSERT_TRUE(fs::fs_util::block_mgr_for_ut()->create_block(opts, &wblock).ok()); + ASSERT_TRUE(builder.finish(wblock.get(), &index_meta).ok()); ASSERT_EQ(ZONE_MAP_INDEX, index_meta.type()); + ASSERT_TRUE(wblock->close().ok()); } - ZoneMapIndexReader column_zone_map(filename, &index_meta.zone_map_index()); Status status = column_zone_map.load(true, false); ASSERT_TRUE(status.ok()); @@ -120,10 +123,12 @@ TEST_F(ColumnZoneMapTest, NormalTestIntPage) { // write out zone map index ColumnIndexMetaPB index_meta; { - std::unique_ptr out_file; - ASSERT_TRUE(Env::Default()->new_writable_file(filename, &out_file).ok()); - ASSERT_TRUE(builder.finish(out_file.get(), &index_meta).ok()); + std::unique_ptr wblock; + fs::CreateBlockOptions opts({ filename }); + ASSERT_TRUE(fs::fs_util::block_mgr_for_ut()->create_block(opts, &wblock).ok()); + ASSERT_TRUE(builder.finish(wblock.get(), &index_meta).ok()); ASSERT_EQ(ZONE_MAP_INDEX, index_meta.type()); + ASSERT_TRUE(wblock->close().ok()); } ZoneMapIndexReader column_zone_map(filename, &index_meta.zone_map_index());