From eec142ae90670e290f2aa4b3c0588c6b618cd3a9 Mon Sep 17 00:00:00 2001 From: plat1ko Date: Sun, 17 Jul 2022 07:54:58 +0800 Subject: [PATCH] [Enhancement] Use shared file reader when read a segment (#10896) * readers under a segment use a shared FileReader * no need to cache fd in LocalFileReader --- be/src/io/fs/file_reader.h | 6 +++ be/src/io/fs/file_system.h | 6 ++- be/src/io/fs/file_writer.h | 4 ++ be/src/io/fs/local_file_reader.cpp | 26 +++++----- be/src/io/fs/local_file_reader.h | 14 ++--- be/src/io/fs/local_file_system.cpp | 39 ++++---------- be/src/io/fs/local_file_system.h | 6 +-- be/src/io/fs/s3_file_reader.cpp | 11 +++- be/src/io/fs/s3_file_reader.h | 3 ++ be/src/io/fs/s3_file_system.cpp | 4 +- be/src/io/fs/s3_file_system.h | 4 +- be/src/olap/primary_key_index.cpp | 7 +-- be/src/olap/primary_key_index.h | 3 +- be/src/olap/rowset/beta_rowset_writer.cpp | 2 +- be/src/olap/rowset/beta_rowset_writer.h | 3 +- .../rowset/segment_v2/bitmap_index_reader.cpp | 4 +- .../rowset/segment_v2/bitmap_index_reader.h | 10 ++-- .../segment_v2/bloom_filter_index_reader.cpp | 2 +- .../segment_v2/bloom_filter_index_reader.h | 10 ++-- .../olap/rowset/segment_v2/column_reader.cpp | 39 +++++++------- be/src/olap/rowset/segment_v2/column_reader.h | 9 ++-- .../segment_v2/indexed_column_reader.cpp | 26 ++++------ .../rowset/segment_v2/indexed_column_reader.h | 26 +++------- .../rowset/segment_v2/ordinal_page_index.cpp | 5 +- .../rowset/segment_v2/ordinal_page_index.h | 13 ++--- be/src/olap/rowset/segment_v2/page_io.h | 2 +- be/src/olap/rowset/segment_v2/segment.cpp | 52 +++++++++---------- be/src/olap/rowset/segment_v2/segment.h | 9 +--- .../rowset/segment_v2/segment_iterator.cpp | 3 +- .../olap/rowset/segment_v2/segment_iterator.h | 4 +- .../olap/rowset/segment_v2/zone_map_index.cpp | 4 +- .../olap/rowset/segment_v2/zone_map_index.h | 10 ++-- be/test/olap/primary_key_index_test.cpp | 6 ++- .../rowset/segment_v2/bitmap_index_test.cpp | 8 +-- .../bloom_filter_index_reader_writer_test.cpp | 11 ++-- .../segment_v2/column_reader_writer_test.cpp | 28 ++++------ .../segment_v2/ordinal_page_index_test.cpp | 10 ++-- .../olap/rowset/segment_v2/segment_test.cpp | 6 +-- .../rowset/segment_v2/zone_map_index_test.cpp | 18 ++++--- be/test/runtime/array_test.cpp | 8 +-- be/test/tools/benchmark_tool.cpp | 2 +- 41 files changed, 215 insertions(+), 248 deletions(-) diff --git a/be/src/io/fs/file_reader.h b/be/src/io/fs/file_reader.h index d5c07cc137..d0c568d0aa 100644 --- a/be/src/io/fs/file_reader.h +++ b/be/src/io/fs/file_reader.h @@ -17,6 +17,8 @@ #pragma once +#include + #include "common/status.h" #include "gutil/macros.h" #include "io/fs/path.h" @@ -38,7 +40,11 @@ public: virtual const Path& path() const = 0; virtual size_t size() const = 0; + + virtual bool closed() const = 0; }; +using FileReaderSPtr = std::shared_ptr; + } // namespace io } // namespace doris diff --git a/be/src/io/fs/file_system.h b/be/src/io/fs/file_system.h index db72e93a2b..e3c4a19018 100644 --- a/be/src/io/fs/file_system.h +++ b/be/src/io/fs/file_system.h @@ -21,6 +21,8 @@ #include "common/status.h" #include "gutil/macros.h" +#include "io/fs/file_reader.h" +#include "io/fs/file_writer.h" #include "io/fs/path.h" namespace doris { @@ -46,9 +48,9 @@ public: DISALLOW_COPY_AND_ASSIGN(FileSystem); - virtual Status create_file(const Path& path, std::unique_ptr* writer) = 0; + virtual Status create_file(const Path& path, FileWriterPtr* writer) = 0; - virtual Status open_file(const Path& path, std::unique_ptr* reader) = 0; + virtual Status open_file(const Path& path, FileReaderSPtr* reader) = 0; virtual Status delete_file(const Path& path) = 0; diff --git a/be/src/io/fs/file_writer.h b/be/src/io/fs/file_writer.h index 44a7614cc1..bbd65e4cbd 100644 --- a/be/src/io/fs/file_writer.h +++ b/be/src/io/fs/file_writer.h @@ -17,6 +17,8 @@ #pragma once +#include + #include "common/status.h" #include "gutil/macros.h" #include "io/fs/path.h" @@ -55,5 +57,7 @@ protected: Path _path; }; +using FileWriterPtr = std::unique_ptr; + } // namespace io } // namespace doris diff --git a/be/src/io/fs/local_file_reader.cpp b/be/src/io/fs/local_file_reader.cpp index a29b4cd4d2..984306bf17 100644 --- a/be/src/io/fs/local_file_reader.cpp +++ b/be/src/io/fs/local_file_reader.cpp @@ -17,19 +17,16 @@ #include "io/fs/local_file_reader.h" +#include + #include "util/doris_metrics.h" #include "util/errno.h" namespace doris { namespace io { -LocalFileReader::LocalFileReader(Path path, size_t file_size, - std::shared_ptr> file_handle) - : _file_handle(std::move(file_handle)), - _path(std::move(path)), - _file_size(file_size), - _closed(false) { - _fd = *_file_handle->file(); +LocalFileReader::LocalFileReader(Path path, size_t file_size, int fd) + : _fd(fd), _path(std::move(path)), _file_size(file_size) { DorisMetrics::instance()->local_file_open_reading->increment(1); DorisMetrics::instance()->local_file_reader_total->increment(1); } @@ -40,15 +37,18 @@ LocalFileReader::~LocalFileReader() { Status LocalFileReader::close() { bool expected = false; - if (_closed.compare_exchange_strong(expected, true)) { - _file_handle.reset(); - DorisMetrics::instance()->local_file_open_reading->increment(-1); + if (_closed.compare_exchange_strong(expected, true, std::memory_order_acq_rel)) { + auto res = ::close(_fd); + if (-1 == res) { + return Status::IOError("failed to close {}: {}", _path.native(), std::strerror(errno)); + } + _fd = -1; } return Status::OK(); } Status LocalFileReader::read_at(size_t offset, Slice result, size_t* bytes_read) { - DCHECK(!_closed.load()); + DCHECK(!closed()); if (offset > _file_size) { return Status::IOError( fmt::format("offset exceeds file size(offset: {), file size: {}, path: {})", offset, @@ -61,11 +61,11 @@ Status LocalFileReader::read_at(size_t offset, Slice result, size_t* bytes_read) while (bytes_req != 0) { auto res = ::pread(_fd, to, bytes_req, offset); - if (-1 == res && errno != EINTR) { + if (UNLIKELY(-1 == res && errno != EINTR)) { return Status::IOError( fmt::format("cannot read from {}: {}", _path.native(), std::strerror(errno))); } - if (res == 0) { + if (UNLIKELY(res == 0)) { return Status::IOError( fmt::format("cannot read from {}: unexpected EOF", _path.native())); } diff --git a/be/src/io/fs/local_file_reader.h b/be/src/io/fs/local_file_reader.h index f6fc9513c3..686c354e83 100644 --- a/be/src/io/fs/local_file_reader.h +++ b/be/src/io/fs/local_file_reader.h @@ -17,17 +17,17 @@ #pragma once +#include + #include "io/fs/file_reader.h" #include "io/fs/path.h" -#include "util/file_cache.h" namespace doris { namespace io { class LocalFileReader final : public FileReader { public: - LocalFileReader(Path path, size_t file_size, - std::shared_ptr> file_handle); + LocalFileReader(Path path, size_t file_size, int fd); ~LocalFileReader() override; @@ -39,13 +39,13 @@ public: size_t size() const override { return _file_size; } + bool closed() const override { return _closed.load(std::memory_order_acquire); } + private: - std::shared_ptr> _file_handle; - int _fd; // ref + int _fd = -1; // owned Path _path; size_t _file_size; - - std::atomic_bool _closed; + std::atomic _closed = false; }; } // namespace io diff --git a/be/src/io/fs/local_file_system.cpp b/be/src/io/fs/local_file_system.cpp index f9fced3ca5..3ba889c94d 100644 --- a/be/src/io/fs/local_file_system.cpp +++ b/be/src/io/fs/local_file_system.cpp @@ -20,21 +20,12 @@ #include "io/fs/file_system.h" #include "io/fs/local_file_reader.h" #include "io/fs/local_file_writer.h" -#include "olap/storage_engine.h" namespace doris { namespace io { LocalFileSystem::LocalFileSystem(Path root_path, ResourceId resource_id) - : FileSystem(std::move(root_path), std::move(resource_id), FileSystemType::LOCAL) { -#ifdef BE_TEST - _file_cache.reset( - new FileCache("Readable_file_cache", config::file_descriptor_cache_capacity)); -#else - _file_cache.reset(new FileCache("Readable_file_cache", - doris::StorageEngine::instance()->file_cache())); -#endif -} + : FileSystem(std::move(root_path), std::move(resource_id), FileSystemType::LOCAL) {} LocalFileSystem::~LocalFileSystem() = default; @@ -45,7 +36,7 @@ Path LocalFileSystem::absolute_path(const Path& path) const { return _root_path / path; } -Status LocalFileSystem::create_file(const Path& path, std::unique_ptr* writer) { +Status LocalFileSystem::create_file(const Path& path, FileWriterPtr* writer) { auto fs_path = absolute_path(path); int fd = ::open(fs_path.c_str(), O_TRUNC | O_WRONLY | O_CREAT | O_CLOEXEC, 0666); if (-1 == fd) { @@ -56,28 +47,16 @@ Status LocalFileSystem::create_file(const Path& path, std::unique_ptr* reader) { +Status LocalFileSystem::open_file(const Path& path, FileReaderSPtr* reader) { auto fs_path = absolute_path(path); - std::shared_ptr> file_handle(new OpenedFileHandle()); - bool found = _file_cache->lookup(fs_path.native(), file_handle.get()); - if (!found) { - int fd = -1; - RETRY_ON_EINTR(fd, open(fs_path.c_str(), O_RDONLY)); - if (fd < 0) { - return Status::IOError( - fmt::format("cannot open {}: {}", fs_path.native(), std::strerror(errno))); - } - int* p_fd = new int(fd); - _file_cache->insert(fs_path.native(), p_fd, file_handle.get(), - [](const CacheKey& key, void* value) { - auto fd = reinterpret_cast(value); - ::close(*fd); - delete fd; - }); - } size_t fsize = 0; RETURN_IF_ERROR(file_size(fs_path, &fsize)); - *reader = std::make_unique(std::move(fs_path), fsize, std::move(file_handle)); + int fd = -1; + RETRY_ON_EINTR(fd, open(fs_path.c_str(), O_RDONLY)); + if (fd < 0) { + return Status::IOError("cannot open {}: {}", fs_path.native(), std::strerror(errno)); + } + *reader = std::make_shared(std::move(fs_path), fsize, fd); return Status::OK(); } diff --git a/be/src/io/fs/local_file_system.h b/be/src/io/fs/local_file_system.h index d3d5938640..2363edc77f 100644 --- a/be/src/io/fs/local_file_system.h +++ b/be/src/io/fs/local_file_system.h @@ -28,9 +28,9 @@ public: LocalFileSystem(Path root_path, ResourceId resource_id = ResourceId()); ~LocalFileSystem() override; - Status create_file(const Path& path, std::unique_ptr* writer) override; + Status create_file(const Path& path, FileWriterPtr* writer) override; - Status open_file(const Path& path, std::unique_ptr* reader) override; + Status open_file(const Path& path, FileReaderSPtr* reader) override; Status delete_file(const Path& path) override; @@ -48,8 +48,6 @@ public: private: Path absolute_path(const Path& path) const; - - std::unique_ptr> _file_cache; }; LocalFileSystem* global_local_filesystem(); diff --git a/be/src/io/fs/s3_file_reader.cpp b/be/src/io/fs/s3_file_reader.cpp index bda3dafd75..fa5d36e38c 100644 --- a/be/src/io/fs/s3_file_reader.cpp +++ b/be/src/io/fs/s3_file_reader.cpp @@ -38,14 +38,19 @@ S3FileReader::S3FileReader(Path path, size_t file_size, std::string key, std::st } S3FileReader::~S3FileReader() { - DorisMetrics::instance()->s3_file_open_reading->increment(-1); + close(); } Status S3FileReader::close() { + bool expected = false; + if (_closed.compare_exchange_strong(expected, true, std::memory_order_acq_rel)) { + DorisMetrics::instance()->s3_file_open_reading->increment(-1); + } return Status::OK(); } Status S3FileReader::read_at(size_t offset, Slice result, size_t* bytes_read) { + DCHECK(!closed()); if (offset > _file_size) { return Status::IOError( fmt::format("offset exceeds file size(offset: {), file size: {}, path: {})", offset, @@ -54,6 +59,10 @@ Status S3FileReader::read_at(size_t offset, Slice result, size_t* bytes_read) { size_t bytes_req = result.size; char* to = result.data; bytes_req = std::min(bytes_req, _file_size - offset); + if (UNLIKELY(bytes_req == 0)) { + *bytes_read = 0; + return Status::OK(); + } Aws::S3::Model::GetObjectRequest request; request.WithBucket(_bucket).WithKey(_key); diff --git a/be/src/io/fs/s3_file_reader.h b/be/src/io/fs/s3_file_reader.h index c69f4df48e..88d2f42490 100644 --- a/be/src/io/fs/s3_file_reader.h +++ b/be/src/io/fs/s3_file_reader.h @@ -40,6 +40,8 @@ public: size_t size() const override { return _file_size; } + bool closed() const override { return _closed.load(std::memory_order_acquire); } + private: Path _path; size_t _file_size; @@ -47,6 +49,7 @@ private: std::string _bucket; std::string _key; + std::atomic _closed = false; }; } // namespace io diff --git a/be/src/io/fs/s3_file_system.cpp b/be/src/io/fs/s3_file_system.cpp index d484644c2d..53be081e3f 100644 --- a/be/src/io/fs/s3_file_system.cpp +++ b/be/src/io/fs/s3_file_system.cpp @@ -121,11 +121,11 @@ Status S3FileSystem::batch_upload(const std::vector& local_paths, return Status::OK(); } -Status S3FileSystem::create_file(const Path& path, std::unique_ptr* writer) { +Status S3FileSystem::create_file(const Path& path, FileWriterPtr* writer) { return Status::NotSupported("not support"); } -Status S3FileSystem::open_file(const Path& path, std::unique_ptr* reader) { +Status S3FileSystem::open_file(const Path& path, FileReaderSPtr* reader) { size_t fsize = 0; RETURN_IF_ERROR(file_size(path, &fsize)); auto key = get_key(path); diff --git a/be/src/io/fs/s3_file_system.h b/be/src/io/fs/s3_file_system.h index a0d724c7b4..fdfbe2f8ba 100644 --- a/be/src/io/fs/s3_file_system.h +++ b/be/src/io/fs/s3_file_system.h @@ -39,9 +39,9 @@ public: std::string prefix, ResourceId resource_id); ~S3FileSystem() override; - Status create_file(const Path& path, std::unique_ptr* writer) override; + Status create_file(const Path& path, FileWriterPtr* writer) override; - Status open_file(const Path& path, std::unique_ptr* reader) override; + Status open_file(const Path& path, FileReaderSPtr* reader) override; Status delete_file(const Path& path) override; diff --git a/be/src/olap/primary_key_index.cpp b/be/src/olap/primary_key_index.cpp index a8ef37a7fa..92911b3ed1 100644 --- a/be/src/olap/primary_key_index.cpp +++ b/be/src/olap/primary_key_index.cpp @@ -17,6 +17,7 @@ #include "olap/primary_key_index.h" +#include "io/fs/file_reader.h" #include "olap/rowset/segment_v2/encoding_info.h" namespace doris { @@ -62,15 +63,15 @@ Status PrimaryKeyIndexBuilder::finalize(segment_v2::PrimaryKeyIndexMetaPB* meta) return _bloom_filter_index_builder->finish(_file_writer, meta->mutable_bloom_filter_index()); } -Status PrimaryKeyIndexReader::parse(io::FileSystem* fs, const std::string& path, +Status PrimaryKeyIndexReader::parse(io::FileReaderSPtr file_reader, const segment_v2::PrimaryKeyIndexMetaPB& meta) { // parse primary key index - _index_reader.reset(new segment_v2::IndexedColumnReader(fs, path, meta.primary_key_index())); + _index_reader.reset(new segment_v2::IndexedColumnReader(file_reader, meta.primary_key_index())); RETURN_IF_ERROR(_index_reader->load(_use_page_cache, _kept_in_memory)); // parse bloom filter segment_v2::ColumnIndexMetaPB column_index_meta = meta.bloom_filter_index(); - segment_v2::BloomFilterIndexReader bf_index_reader(fs, path, + segment_v2::BloomFilterIndexReader bf_index_reader(std::move(file_reader), &column_index_meta.bloom_filter_index()); RETURN_IF_ERROR(bf_index_reader.load(_use_page_cache, _kept_in_memory)); std::unique_ptr bf_iter; diff --git a/be/src/olap/primary_key_index.h b/be/src/olap/primary_key_index.h index faa1a6cf80..2362a1f784 100644 --- a/be/src/olap/primary_key_index.h +++ b/be/src/olap/primary_key_index.h @@ -67,8 +67,7 @@ class PrimaryKeyIndexReader { public: PrimaryKeyIndexReader() : _parsed(false) {} - Status parse(io::FileSystem* fs, const std::string& path, - const segment_v2::PrimaryKeyIndexMetaPB& meta); + Status parse(io::FileReaderSPtr file_reader, const segment_v2::PrimaryKeyIndexMetaPB& meta); Status new_iterator(std::unique_ptr* index_iterator) const { DCHECK(_parsed); diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp index 87a4ba8b4c..d9ea4f16eb 100644 --- a/be/src/olap/rowset/beta_rowset_writer.cpp +++ b/be/src/olap/rowset/beta_rowset_writer.cpp @@ -278,7 +278,7 @@ Status BetaRowsetWriter::_create_segment_writer( if (!fs) { return Status::OLAPInternalError(OLAP_ERR_INIT_FAILED); } - std::unique_ptr file_writer; + io::FileWriterPtr file_writer; Status st = fs->create_file(path, &file_writer); if (!st.ok()) { LOG(WARNING) << "failed to create writable file. path=" << path diff --git a/be/src/olap/rowset/beta_rowset_writer.h b/be/src/olap/rowset/beta_rowset_writer.h index bc0b91fdb6..a66981aef7 100644 --- a/be/src/olap/rowset/beta_rowset_writer.h +++ b/be/src/olap/rowset/beta_rowset_writer.h @@ -85,8 +85,7 @@ private: /// In other processes, such as merger or schema change, we will use this unified writer for data writing. std::unique_ptr _segment_writer; mutable SpinLock _lock; // lock to protect _wblocks. - // TODO(lingbin): it is better to wrapper in a Batch? - std::vector> _file_writers; + std::vector _file_writers; // counters and statistics maintained during data write std::atomic _num_rows_written; diff --git a/be/src/olap/rowset/segment_v2/bitmap_index_reader.cpp b/be/src/olap/rowset/segment_v2/bitmap_index_reader.cpp index cbaecedd78..eb108f8e8d 100644 --- a/be/src/olap/rowset/segment_v2/bitmap_index_reader.cpp +++ b/be/src/olap/rowset/segment_v2/bitmap_index_reader.cpp @@ -27,8 +27,8 @@ Status BitmapIndexReader::load(bool use_page_cache, bool kept_in_memory) { const IndexedColumnMetaPB& bitmap_meta = _bitmap_index_meta->bitmap_column(); _has_null = _bitmap_index_meta->has_null(); - _dict_column_reader.reset(new IndexedColumnReader(_fs, _path, dict_meta)); - _bitmap_column_reader.reset(new IndexedColumnReader(_fs, _path, bitmap_meta)); + _dict_column_reader.reset(new IndexedColumnReader(_file_reader, dict_meta)); + _bitmap_column_reader.reset(new IndexedColumnReader(_file_reader, bitmap_meta)); RETURN_IF_ERROR(_dict_column_reader->load(use_page_cache, kept_in_memory)); RETURN_IF_ERROR(_bitmap_column_reader->load(use_page_cache, kept_in_memory)); return Status::OK(); diff --git a/be/src/olap/rowset/segment_v2/bitmap_index_reader.h b/be/src/olap/rowset/segment_v2/bitmap_index_reader.h index 77231bc049..ecce8da235 100644 --- a/be/src/olap/rowset/segment_v2/bitmap_index_reader.h +++ b/be/src/olap/rowset/segment_v2/bitmap_index_reader.h @@ -21,7 +21,7 @@ #include "common/status.h" #include "gen_cpp/segment_v2.pb.h" -#include "io/fs/file_system.h" +#include "io/fs/file_reader.h" #include "olap/column_block.h" #include "olap/rowset/segment_v2/common.h" #include "olap/rowset/segment_v2/indexed_column_reader.h" @@ -39,10 +39,9 @@ class IndexedColumnIterator; class BitmapIndexReader { public: - explicit BitmapIndexReader(io::FileSystem* fs, const std::string& path, + explicit BitmapIndexReader(io::FileReaderSPtr file_reader, const BitmapIndexPB* bitmap_index_meta) - : _fs(fs), - _path(path), + : _file_reader(std::move(file_reader)), _type_info(get_scalar_type_info()), _bitmap_index_meta(bitmap_index_meta) {} @@ -58,8 +57,7 @@ public: private: friend class BitmapIndexIterator; - io::FileSystem* _fs; - std::string _path; + io::FileReaderSPtr _file_reader; const TypeInfo* _type_info; const BitmapIndexPB* _bitmap_index_meta; bool _has_null = false; diff --git a/be/src/olap/rowset/segment_v2/bloom_filter_index_reader.cpp b/be/src/olap/rowset/segment_v2/bloom_filter_index_reader.cpp index d7ad5bc2ab..f3779d032c 100644 --- a/be/src/olap/rowset/segment_v2/bloom_filter_index_reader.cpp +++ b/be/src/olap/rowset/segment_v2/bloom_filter_index_reader.cpp @@ -26,7 +26,7 @@ namespace segment_v2 { Status BloomFilterIndexReader::load(bool use_page_cache, bool kept_in_memory) { const IndexedColumnMetaPB& bf_index_meta = _bloom_filter_index_meta->bloom_filter(); - _bloom_filter_reader.reset(new IndexedColumnReader(_fs, _path, bf_index_meta)); + _bloom_filter_reader.reset(new IndexedColumnReader(_file_reader, bf_index_meta)); RETURN_IF_ERROR(_bloom_filter_reader->load(use_page_cache, kept_in_memory)); return Status::OK(); } diff --git a/be/src/olap/rowset/segment_v2/bloom_filter_index_reader.h b/be/src/olap/rowset/segment_v2/bloom_filter_index_reader.h index f01fd070ab..68b96a6044 100644 --- a/be/src/olap/rowset/segment_v2/bloom_filter_index_reader.h +++ b/be/src/olap/rowset/segment_v2/bloom_filter_index_reader.h @@ -22,7 +22,7 @@ #include "common/status.h" #include "gen_cpp/segment_v2.pb.h" -#include "io/fs/file_system.h" +#include "io/fs/file_reader.h" #include "olap/column_block.h" #include "olap/rowset/segment_v2/common.h" #include "olap/rowset/segment_v2/indexed_column_reader.h" @@ -42,10 +42,9 @@ class BloomFilter; class BloomFilterIndexReader { public: - explicit BloomFilterIndexReader(io::FileSystem* fs, const std::string& path, + explicit BloomFilterIndexReader(io::FileReaderSPtr file_reader, const BloomFilterIndexPB* bloom_filter_index_meta) - : _fs(fs), - _path(path), + : _file_reader(std::move(file_reader)), _type_info(get_scalar_type_info()), _bloom_filter_index_meta(bloom_filter_index_meta) {} @@ -59,8 +58,7 @@ public: private: friend class BloomFilterIndexIterator; - io::FileSystem* _fs; - std::string _path; + io::FileReaderSPtr _file_reader; const TypeInfo* _type_info; const BloomFilterIndexPB* _bloom_filter_index_meta; std::unique_ptr _bloom_filter_reader; diff --git a/be/src/olap/rowset/segment_v2/column_reader.cpp b/be/src/olap/rowset/segment_v2/column_reader.cpp index 57da4e3e41..71bd41f09f 100644 --- a/be/src/olap/rowset/segment_v2/column_reader.cpp +++ b/be/src/olap/rowset/segment_v2/column_reader.cpp @@ -17,7 +17,7 @@ #include "olap/rowset/segment_v2/column_reader.h" -#include "gutil/strings/substitute.h" // for Substitute +#include "io/fs/file_reader.h" #include "olap/column_block.h" // for ColumnBlockView #include "olap/rowset/segment_v2/binary_dict_page.h" // for BinaryDictPageDecoder #include "olap/rowset/segment_v2/bloom_filter_index_reader.h" @@ -36,14 +36,12 @@ namespace doris { namespace segment_v2 { -using strings::Substitute; - Status ColumnReader::create(const ColumnReaderOptions& opts, const ColumnMetaPB& meta, - uint64_t num_rows, io::FileSystem* fs, const std::string& path, + uint64_t num_rows, const io::FileReaderSPtr& file_reader, std::unique_ptr* reader) { if (is_scalar_type((FieldType)meta.type())) { std::unique_ptr reader_local( - new ColumnReader(opts, meta, num_rows, fs, path)); + new ColumnReader(opts, meta, num_rows, file_reader)); RETURN_IF_ERROR(reader_local->init()); *reader = std::move(reader_local); return Status::OK(); @@ -55,25 +53,25 @@ Status ColumnReader::create(const ColumnReaderOptions& opts, const ColumnMetaPB& std::unique_ptr item_reader; RETURN_IF_ERROR(ColumnReader::create(opts, meta.children_columns(0), - meta.children_columns(0).num_rows(), fs, path, + meta.children_columns(0).num_rows(), file_reader, &item_reader)); std::unique_ptr offset_reader; RETURN_IF_ERROR(ColumnReader::create(opts, meta.children_columns(1), - meta.children_columns(1).num_rows(), fs, path, + meta.children_columns(1).num_rows(), file_reader, &offset_reader)); std::unique_ptr null_reader; if (meta.is_nullable()) { RETURN_IF_ERROR(ColumnReader::create(opts, meta.children_columns(2), - meta.children_columns(2).num_rows(), fs, path, - &null_reader)); + meta.children_columns(2).num_rows(), + file_reader, &null_reader)); } // The num rows of the array reader equals to the num rows of the length reader. num_rows = meta.children_columns(1).num_rows(); std::unique_ptr array_reader( - new ColumnReader(opts, meta, num_rows, fs, path)); + new ColumnReader(opts, meta, num_rows, file_reader)); // array reader do not need to init array_reader->_sub_readers.resize(meta.children_columns_size()); array_reader->_sub_readers[0] = std::move(item_reader); @@ -92,12 +90,11 @@ Status ColumnReader::create(const ColumnReaderOptions& opts, const ColumnMetaPB& } ColumnReader::ColumnReader(const ColumnReaderOptions& opts, const ColumnMetaPB& meta, - uint64_t num_rows, io::FileSystem* fs, const std::string& path) + uint64_t num_rows, io::FileReaderSPtr file_reader) : _meta(meta), _opts(opts), _num_rows(num_rows), - _fs(fs), - _path(path), + _file_reader(std::move(file_reader)), _dict_encoding_type(UNKNOWN_DICT_ENCODING) {} ColumnReader::~ColumnReader() = default; @@ -125,15 +122,15 @@ Status ColumnReader::init() { _bf_index_meta = &index_meta.bloom_filter_index(); break; default: - return Status::Corruption("Bad file {}: invalid column index type {}", _path, - index_meta.type()); + return Status::Corruption("Bad file {}: invalid column index type {}", + _file_reader->path().native(), index_meta.type()); } } // ArrayColumnWriter writes a single empty array and flushes. In this scenario, // the item writer doesn't write any data and the corresponding ordinal index is empty. if (_ordinal_index_meta == nullptr && !is_empty()) { - return Status::Corruption("Bad file {}: missing ordinal index for column {}", _path, - _meta.column_id()); + return Status::Corruption("Bad file {}: missing ordinal index for column {}", + _file_reader->path().native(), _meta.column_id()); } return Status::OK(); } @@ -298,13 +295,13 @@ Status ColumnReader::get_row_ranges_by_bloom_filter(CondColumn* cond_column, Status ColumnReader::_load_ordinal_index(bool use_page_cache, bool kept_in_memory) { DCHECK(_ordinal_index_meta != nullptr); - _ordinal_index.reset(new OrdinalIndexReader(_fs, _path, _ordinal_index_meta, _num_rows)); + _ordinal_index.reset(new OrdinalIndexReader(_file_reader, _ordinal_index_meta, _num_rows)); return _ordinal_index->load(use_page_cache, kept_in_memory); } Status ColumnReader::_load_zone_map_index(bool use_page_cache, bool kept_in_memory) { if (_zone_map_index_meta != nullptr) { - _zone_map_index.reset(new ZoneMapIndexReader(_fs, _path, _zone_map_index_meta)); + _zone_map_index.reset(new ZoneMapIndexReader(_file_reader, _zone_map_index_meta)); return _zone_map_index->load(use_page_cache, kept_in_memory); } return Status::OK(); @@ -312,7 +309,7 @@ Status ColumnReader::_load_zone_map_index(bool use_page_cache, bool kept_in_memo Status ColumnReader::_load_bitmap_index(bool use_page_cache, bool kept_in_memory) { if (_bitmap_index_meta != nullptr) { - _bitmap_index.reset(new BitmapIndexReader(_fs, _path, _bitmap_index_meta)); + _bitmap_index.reset(new BitmapIndexReader(_file_reader, _bitmap_index_meta)); return _bitmap_index->load(use_page_cache, kept_in_memory); } return Status::OK(); @@ -320,7 +317,7 @@ Status ColumnReader::_load_bitmap_index(bool use_page_cache, bool kept_in_memory Status ColumnReader::_load_bloom_filter_index(bool use_page_cache, bool kept_in_memory) { if (_bf_index_meta != nullptr) { - _bloom_filter_index.reset(new BloomFilterIndexReader(_fs, _path, _bf_index_meta)); + _bloom_filter_index.reset(new BloomFilterIndexReader(_file_reader, _bf_index_meta)); return _bloom_filter_index->load(use_page_cache, kept_in_memory); } return Status::OK(); diff --git a/be/src/olap/rowset/segment_v2/column_reader.h b/be/src/olap/rowset/segment_v2/column_reader.h index 83f1c699cf..efa742200e 100644 --- a/be/src/olap/rowset/segment_v2/column_reader.h +++ b/be/src/olap/rowset/segment_v2/column_reader.h @@ -24,7 +24,7 @@ #include "common/logging.h" #include "common/status.h" // for Status #include "gen_cpp/segment_v2.pb.h" // for ColumnMetaPB -#include "io/fs/file_system.h" +#include "io/fs/file_reader.h" #include "olap/olap_cond.h" // for CondColumn #include "olap/rowset/segment_v2/bitmap_index_reader.h" // for BitmapIndexReader #include "olap/rowset/segment_v2/common.h" @@ -88,7 +88,7 @@ public: // Create an initialized ColumnReader in *reader. // This should be a lightweight operation without I/O. static Status create(const ColumnReaderOptions& opts, const ColumnMetaPB& meta, - uint64_t num_rows, io::FileSystem* fs, const std::string& path, + uint64_t num_rows, const io::FileReaderSPtr& file_reader, std::unique_ptr* reader); enum DictEncodingType { UNKNOWN_DICT_ENCODING, PARTIAL_DICT_ENCODING, ALL_DICT_ENCODING }; @@ -147,7 +147,7 @@ public: private: ColumnReader(const ColumnReaderOptions& opts, const ColumnMetaPB& meta, uint64_t num_rows, - io::FileSystem* fs, const std::string& path); + io::FileReaderSPtr file_reader); Status init(); // Read and load necessary column indexes into memory if it hasn't been loaded. @@ -184,8 +184,7 @@ private: ColumnReaderOptions _opts; uint64_t _num_rows; - io::FileSystem* _fs; - std::string _path; + io::FileReaderSPtr _file_reader; DictEncodingType _dict_encoding_type; diff --git a/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp b/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp index 3f254fb17f..d1b39fb031 100644 --- a/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp +++ b/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp @@ -40,16 +40,12 @@ Status IndexedColumnReader::load(bool use_page_cache, bool kept_in_memory) { RETURN_IF_ERROR(EncodingInfo::get(_type_info, _meta.encoding(), &_encoding_info)); _value_key_coder = get_key_coder(_type_info->type()); - std::unique_ptr file_reader; - RETURN_IF_ERROR(_fs->open_file(_path, &file_reader)); - // read and parse ordinal index page when exists if (_meta.has_ordinal_index_meta()) { if (_meta.ordinal_index_meta().is_root_data_page()) { _sole_data_page = PagePointer(_meta.ordinal_index_meta().root_page()); } else { - RETURN_IF_ERROR(load_index_page(file_reader.get(), - _meta.ordinal_index_meta().root_page(), + RETURN_IF_ERROR(load_index_page(_meta.ordinal_index_meta().root_page(), &_ordinal_index_page_handle, &_ordinal_index_reader)); _has_index_page = true; } @@ -60,7 +56,7 @@ Status IndexedColumnReader::load(bool use_page_cache, bool kept_in_memory) { if (_meta.value_index_meta().is_root_data_page()) { _sole_data_page = PagePointer(_meta.value_index_meta().root_page()); } else { - RETURN_IF_ERROR(load_index_page(file_reader.get(), _meta.value_index_meta().root_page(), + RETURN_IF_ERROR(load_index_page(_meta.value_index_meta().root_page(), &_value_index_page_handle, &_value_index_reader)); _has_index_page = true; } @@ -69,23 +65,23 @@ Status IndexedColumnReader::load(bool use_page_cache, bool kept_in_memory) { return Status::OK(); } -Status IndexedColumnReader::load_index_page(io::FileReader* file_reader, const PagePointerPB& pp, - PageHandle* handle, IndexPageReader* reader) { +Status IndexedColumnReader::load_index_page(const PagePointerPB& pp, PageHandle* handle, + IndexPageReader* reader) { Slice body; PageFooterPB footer; std::unique_ptr local_compress_codec; RETURN_IF_ERROR(get_block_compression_codec(_meta.compression(), local_compress_codec)); - RETURN_IF_ERROR(read_page(file_reader, PagePointer(pp), handle, &body, &footer, INDEX_PAGE, + RETURN_IF_ERROR(read_page(PagePointer(pp), handle, &body, &footer, INDEX_PAGE, local_compress_codec.get())); RETURN_IF_ERROR(reader->parse(body, footer.index_page_footer())); return Status::OK(); } -Status IndexedColumnReader::read_page(io::FileReader* file_reader, const PagePointer& pp, - PageHandle* handle, Slice* body, PageFooterPB* footer, - PageTypePB type, BlockCompressionCodec* codec) const { +Status IndexedColumnReader::read_page(const PagePointer& pp, PageHandle* handle, Slice* body, + PageFooterPB* footer, PageTypePB type, + BlockCompressionCodec* codec) const { PageReadOptions opts; - opts.file_reader = file_reader; + opts.file_reader = _file_reader.get(); opts.page_pointer = pp; opts.codec = codec; OlapReaderStatistics tmp_stats; @@ -109,8 +105,8 @@ Status IndexedColumnIterator::_read_data_page(const PagePointer& pp) { PageHandle handle; Slice body; PageFooterPB footer; - RETURN_IF_ERROR(_reader->read_page(_file_reader.get(), pp, &handle, &body, &footer, DATA_PAGE, - _compress_codec.get())); + RETURN_IF_ERROR( + _reader->read_page(pp, &handle, &body, &footer, DATA_PAGE, _compress_codec.get())); // parse data page // note that page_index is not used in IndexedColumnIterator, so we pass 0 PageDecoderOptions opts; diff --git a/be/src/olap/rowset/segment_v2/indexed_column_reader.h b/be/src/olap/rowset/segment_v2/indexed_column_reader.h index 8544cf970b..1346aecbdf 100644 --- a/be/src/olap/rowset/segment_v2/indexed_column_reader.h +++ b/be/src/olap/rowset/segment_v2/indexed_column_reader.h @@ -48,16 +48,14 @@ class IndexedColumnIterator; // thread-safe reader for IndexedColumn (see comments of `IndexedColumnWriter` to understand what IndexedColumn is) class IndexedColumnReader { public: - explicit IndexedColumnReader(io::FileSystem* fs, const std::string& path, - const IndexedColumnMetaPB& meta) - : _fs(fs), _path(path), _meta(meta) {}; + explicit IndexedColumnReader(io::FileReaderSPtr file_reader, const IndexedColumnMetaPB& meta) + : _file_reader(std::move(file_reader)), _meta(meta) {}; Status load(bool use_page_cache, bool kept_in_memory); // read a page specified by `pp' from `file' into `handle' - Status read_page(io::FileReader* file_reader, const PagePointer& pp, PageHandle* handle, - Slice* body, PageFooterPB* footer, PageTypePB type, - BlockCompressionCodec* codec) const; + Status read_page(const PagePointer& pp, PageHandle* handle, Slice* body, PageFooterPB* footer, + PageTypePB type, BlockCompressionCodec* codec) const; int64_t num_values() const { return _num_values; } const EncodingInfo* encoding_info() const { return _encoding_info; } @@ -68,13 +66,11 @@ public: CompressionTypePB get_compression() const { return _meta.compression(); } private: - Status load_index_page(io::FileReader* file_reader, const PagePointerPB& pp, PageHandle* handle, - IndexPageReader* reader); + Status load_index_page(const PagePointerPB& pp, PageHandle* handle, IndexPageReader* reader); friend class IndexedColumnIterator; - io::FileSystem* _fs; - std::string _path; + io::FileReaderSPtr _file_reader; IndexedColumnMetaPB _meta; bool _use_page_cache; @@ -100,13 +96,7 @@ public: explicit IndexedColumnIterator(const IndexedColumnReader* reader) : _reader(reader), _ordinal_iter(&reader->_ordinal_index_reader), - _value_iter(&reader->_value_index_reader) { - io::FileSystem* fs = _reader->_fs; - auto st = fs->open_file(_reader->_path, &_file_reader); - - DCHECK(st.ok()); - WARN_IF_ERROR(st, "open file failed:" + _reader->_path); - } + _value_iter(&reader->_value_index_reader) {} // Seek to the given ordinal entry. Entry 0 is the first entry. // Return NotFound if provided seek point is past the end. @@ -151,8 +141,6 @@ private: ParsedPage _data_page; // next_batch() will read from this position ordinal_t _current_ordinal = 0; - // open file handle - std::unique_ptr _file_reader; // iterator owned compress codec, should NOT be shared by threads, initialized before used std::unique_ptr _compress_codec; }; diff --git a/be/src/olap/rowset/segment_v2/ordinal_page_index.cpp b/be/src/olap/rowset/segment_v2/ordinal_page_index.cpp index ca3fe9cbf4..06f289f8e0 100644 --- a/be/src/olap/rowset/segment_v2/ordinal_page_index.cpp +++ b/be/src/olap/rowset/segment_v2/ordinal_page_index.cpp @@ -72,11 +72,8 @@ Status OrdinalIndexReader::load(bool use_page_cache, bool kept_in_memory) { return Status::OK(); } // need to read index page - std::unique_ptr file_reader; - RETURN_IF_ERROR(_fs->open_file(_path, &file_reader)); - PageReadOptions opts; - opts.file_reader = file_reader.get(); + opts.file_reader = _file_reader.get(); opts.page_pointer = PagePointer(_index_meta->root_page().root_page()); opts.codec = nullptr; // ordinal index page uses NO_COMPRESSION right now OlapReaderStatistics tmp_stats; diff --git a/be/src/olap/rowset/segment_v2/ordinal_page_index.h b/be/src/olap/rowset/segment_v2/ordinal_page_index.h index 81419a08d6..76df60fe2f 100644 --- a/be/src/olap/rowset/segment_v2/ordinal_page_index.h +++ b/be/src/olap/rowset/segment_v2/ordinal_page_index.h @@ -24,7 +24,7 @@ #include "common/status.h" #include "env/env.h" #include "gutil/macros.h" -#include "io/fs/file_system.h" +#include "io/fs/file_reader.h" #include "olap/rowset/segment_v2/common.h" #include "olap/rowset/segment_v2/index_page.h" #include "olap/rowset/segment_v2/page_pointer.h" @@ -63,9 +63,11 @@ class OrdinalPageIndexIterator; class OrdinalIndexReader { public: - explicit OrdinalIndexReader(io::FileSystem* fs, const std::string& path, - const OrdinalIndexPB* index_meta, ordinal_t num_values) - : _fs(fs), _path(path), _index_meta(index_meta), _num_values(num_values) {} + explicit OrdinalIndexReader(io::FileReaderSPtr file_reader, const OrdinalIndexPB* index_meta, + ordinal_t num_values) + : _file_reader(std::move(file_reader)), + _index_meta(index_meta), + _num_values(num_values) {} // load and parse the index page into memory Status load(bool use_page_cache, bool kept_in_memory); @@ -88,8 +90,7 @@ public: private: friend OrdinalPageIndexIterator; - io::FileSystem* _fs; - std::string _path; + io::FileReaderSPtr _file_reader; const OrdinalIndexPB* _index_meta; // total number of values (including NULLs) in the indexed column, // equals to 1 + 'last ordinal of last data pages' diff --git a/be/src/olap/rowset/segment_v2/page_io.h b/be/src/olap/rowset/segment_v2/page_io.h index 65f2c51388..7b8bee7358 100644 --- a/be/src/olap/rowset/segment_v2/page_io.h +++ b/be/src/olap/rowset/segment_v2/page_io.h @@ -46,7 +46,7 @@ namespace segment_v2 { struct PageReadOptions { // block to read page - doris::io::FileReader* file_reader = nullptr; + io::FileReader* file_reader = nullptr; // location of the page PagePointer page_pointer; // decompressor for page body (null means page body is not compressed) diff --git a/be/src/olap/rowset/segment_v2/segment.cpp b/be/src/olap/rowset/segment_v2/segment.cpp index c95c28e41d..5452d1c35e 100644 --- a/be/src/olap/rowset/segment_v2/segment.cpp +++ b/be/src/olap/rowset/segment_v2/segment.cpp @@ -17,10 +17,12 @@ #include "olap/rowset/segment_v2/segment.h" +#include #include #include "common/logging.h" // LOG #include "gutil/strings/substitute.h" +#include "io/fs/file_reader.h" #include "olap/fs/fs_util.h" #include "olap/rowset/segment_v2/column_reader.h" // ColumnReader #include "olap/rowset/segment_v2/empty_segment_iterator.h" @@ -39,15 +41,17 @@ using strings::Substitute; Status Segment::open(io::FileSystem* fs, const std::string& path, uint32_t segment_id, const TabletSchema* tablet_schema, std::shared_ptr* output) { - std::shared_ptr segment(new Segment(fs, path, segment_id, tablet_schema)); + std::shared_ptr segment(new Segment(segment_id, tablet_schema)); + io::FileReaderSPtr file_reader; + RETURN_IF_ERROR(fs->open_file(path, &file_reader)); + segment->_file_reader = std::move(file_reader); RETURN_IF_ERROR(segment->_open()); - output->swap(segment); + *output = std::move(segment); return Status::OK(); } -Segment::Segment(io::FileSystem* fs, const std::string& path, uint32_t segment_id, - const TabletSchema* tablet_schema) - : _fs(fs), _path(path), _segment_id(segment_id), _tablet_schema(*tablet_schema) { +Segment::Segment(uint32_t segment_id, const TabletSchema* tablet_schema) + : _segment_id(segment_id), _tablet_schema(*tablet_schema) { #ifndef BE_TEST _mem_tracker = StorageEngine::instance()->tablet_mem_tracker(); #else @@ -62,15 +66,11 @@ Segment::~Segment() { Status Segment::_open() { RETURN_IF_ERROR(_parse_footer()); RETURN_IF_ERROR(_create_column_readers()); - _is_open = true; return Status::OK(); } Status Segment::new_iterator(const Schema& schema, const StorageReadOptions& read_options, std::unique_ptr* iter) { - if (!_is_open) { - RETURN_IF_ERROR(_open()); - } read_options.stats->total_segment_number++; // trying to prune the current segment by segment-level zone map if (read_options.conditions != nullptr) { @@ -97,35 +97,34 @@ Status Segment::new_iterator(const Schema& schema, const StorageReadOptions& rea Status Segment::_parse_footer() { // Footer := SegmentFooterPB, FooterPBSize(4), FooterPBChecksum(4), MagicNumber(4) - std::unique_ptr file_reader; - RETURN_IF_ERROR(_fs->open_file(_path, &file_reader)); - - auto file_size = file_reader->size(); + auto file_size = _file_reader->size(); if (file_size < 12) { - return Status::Corruption("Bad segment file {}: file size {} < 12", _path, file_size); + return Status::Corruption("Bad segment file {}: file size {} < 12", + _file_reader->path().native(), file_size); } uint8_t fixed_buf[12]; size_t bytes_read = 0; - RETURN_IF_ERROR(file_reader->read_at(file_size - 12, Slice(fixed_buf, 12), &bytes_read)); + RETURN_IF_ERROR(_file_reader->read_at(file_size - 12, Slice(fixed_buf, 12), &bytes_read)); DCHECK_EQ(bytes_read, 12); // validate magic number if (memcmp(fixed_buf + 8, k_segment_magic, k_segment_magic_length) != 0) { - return Status::Corruption("Bad segment file {}: magic number not match", _path); + return Status::Corruption("Bad segment file {}: magic number not match", + _file_reader->path().native()); } // read footer PB uint32_t footer_length = decode_fixed32_le(fixed_buf); if (file_size < 12 + footer_length) { - return Status::Corruption("Bad segment file {}: file size {} < {}", _path, file_size, - 12 + footer_length); + return Status::Corruption("Bad segment file {}: file size {} < {}", + _file_reader->path().native(), file_size, 12 + footer_length); } _mem_tracker->consume(footer_length); std::string footer_buf; footer_buf.resize(footer_length); - RETURN_IF_ERROR(file_reader->read_at(file_size - 12 - footer_length, footer_buf, &bytes_read)); + RETURN_IF_ERROR(_file_reader->read_at(file_size - 12 - footer_length, footer_buf, &bytes_read)); DCHECK_EQ(bytes_read, footer_length); // validate footer PB's checksum @@ -133,13 +132,14 @@ Status Segment::_parse_footer() { uint32_t actual_checksum = crc32c::Value(footer_buf.data(), footer_buf.size()); if (actual_checksum != expect_checksum) { return Status::Corruption( - "Bad segment file {}: footer checksum not match, actual={} vs expect={}", _path, - actual_checksum, expect_checksum); + "Bad segment file {}: footer checksum not match, actual={} vs expect={}", + _file_reader->path().native(), actual_checksum, expect_checksum); } // deserialize footer PB if (!_footer.ParseFromString(footer_buf)) { - return Status::Corruption("Bad segment file {}: failed to parse SegmentFooterPB", _path); + return Status::Corruption("Bad segment file {}: failed to parse SegmentFooterPB", + _file_reader->path().native()); } return Status::OK(); } @@ -147,12 +147,8 @@ Status Segment::_parse_footer() { Status Segment::_load_index() { return _load_index_once.call([this] { // read and parse short key index page - - std::unique_ptr file_reader; - RETURN_IF_ERROR(_fs->open_file(_path, &file_reader)); - PageReadOptions opts; - opts.file_reader = file_reader.get(); + opts.file_reader = _file_reader.get(); opts.page_pointer = PagePointer(_footer.short_key_index_page()); opts.codec = nullptr; // short key index page uses NO_COMPRESSION for now OlapReaderStatistics tmp_stats; @@ -189,7 +185,7 @@ Status Segment::_create_column_readers() { opts.kept_in_memory = _tablet_schema.is_in_memory(); std::unique_ptr reader; RETURN_IF_ERROR(ColumnReader::create(opts, _footer.columns(iter->second), - _footer.num_rows(), _fs, _path, &reader)); + _footer.num_rows(), _file_reader, &reader)); _column_readers[ordinal] = std::move(reader); } return Status::OK(); diff --git a/be/src/olap/rowset/segment_v2/segment.h b/be/src/olap/rowset/segment_v2/segment.h index 64f8582936..9b8f2c8b7b 100644 --- a/be/src/olap/rowset/segment_v2/segment.h +++ b/be/src/olap/rowset/segment_v2/segment.h @@ -107,8 +107,7 @@ public: private: DISALLOW_COPY_AND_ASSIGN(Segment); - Segment(io::FileSystem* fs, const std::string& path, uint32_t segment_id, - const TabletSchema* tablet_schema); + Segment(uint32_t segment_id, const TabletSchema* tablet_schema); // open segment file and read the minimum amount of necessary information (footer) Status _open(); Status _parse_footer(); @@ -119,8 +118,7 @@ private: private: friend class SegmentIterator; - io::FileSystem* _fs; - std::string _path; + io::FileReaderSPtr _file_reader; uint32_t _segment_id; TabletSchema _tablet_schema; @@ -146,9 +144,6 @@ private: PageHandle _sk_index_handle; // short key index decoder std::unique_ptr _sk_index_decoder; - // segment footer need not to be read for remote storage, so _is_open is false. When remote file - // need to be read. footer will be read and _is_open will be set to true. - bool _is_open = false; }; } // namespace segment_v2 diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp b/be/src/olap/rowset/segment_v2/segment_iterator.cpp index 921c37e575..ed399926f9 100644 --- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp +++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp @@ -136,8 +136,7 @@ Status SegmentIterator::_init(bool is_vec) { SCOPED_RAW_TIMER(&_opts.stats->block_init_ns); DorisMetrics::instance()->segment_read_total->increment(1); // get file handle from file descriptor of segment - auto fs = _segment->_fs; - RETURN_IF_ERROR(fs->open_file(_segment->_path, &_file_reader)); + _file_reader = _segment->_file_reader; _row_bitmap.addRange(0, _segment->num_rows()); RETURN_IF_ERROR(_init_return_column_iterators()); diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.h b/be/src/olap/rowset/segment_v2/segment_iterator.h index 195307d9f5..85b0d26345 100644 --- a/be/src/olap/rowset/segment_v2/segment_iterator.h +++ b/be/src/olap/rowset/segment_v2/segment_iterator.h @@ -23,6 +23,7 @@ #include "common/status.h" #include "io/fs/file_reader.h" +#include "io/fs/file_system.h" #include "olap/olap_common.h" #include "olap/olap_cond.h" #include "olap/rowset/segment_v2/common.h" @@ -205,8 +206,7 @@ private: // only used in `_get_row_ranges_by_keys` std::unique_ptr _seek_block; - // block for file to read - std::unique_ptr _file_reader; + io::FileReaderSPtr _file_reader; // char_type columns cid std::vector _char_type_idx; diff --git a/be/src/olap/rowset/segment_v2/zone_map_index.cpp b/be/src/olap/rowset/segment_v2/zone_map_index.cpp index 1cb14988cf..7d59dc56e9 100644 --- a/be/src/olap/rowset/segment_v2/zone_map_index.cpp +++ b/be/src/olap/rowset/segment_v2/zone_map_index.cpp @@ -29,7 +29,7 @@ namespace doris { namespace segment_v2 { -ZoneMapIndexWriter::ZoneMapIndexWriter(Field* field) : _field(field), _pool() { +ZoneMapIndexWriter::ZoneMapIndexWriter(Field* field) : _field(field) { _page_zone_map.min_value = _field->allocate_zone_map_value(&_pool); _page_zone_map.max_value = _field->allocate_zone_map_value(&_pool); _reset_zone_map(&_page_zone_map); @@ -122,7 +122,7 @@ Status ZoneMapIndexWriter::finish(io::FileWriter* file_writer, ColumnIndexMetaPB } Status ZoneMapIndexReader::load(bool use_page_cache, bool kept_in_memory) { - IndexedColumnReader reader(_fs, _path, _index_meta->page_zone_maps()); + IndexedColumnReader reader(_file_reader, _index_meta->page_zone_maps()); RETURN_IF_ERROR(reader.load(use_page_cache, kept_in_memory)); IndexedColumnIterator iter(&reader); diff --git a/be/src/olap/rowset/segment_v2/zone_map_index.h b/be/src/olap/rowset/segment_v2/zone_map_index.h index 5de7e4804b..9c8750512c 100644 --- a/be/src/olap/rowset/segment_v2/zone_map_index.h +++ b/be/src/olap/rowset/segment_v2/zone_map_index.h @@ -24,7 +24,7 @@ #include "common/status.h" #include "env/env.h" #include "gen_cpp/segment_v2.pb.h" -#include "io/fs/file_system.h" +#include "io/fs/file_reader.h" #include "olap/field.h" #include "olap/rowset/segment_v2/binary_plain_page.h" #include "runtime/mem_pool.h" @@ -118,9 +118,8 @@ private: class ZoneMapIndexReader { public: - explicit ZoneMapIndexReader(io::FileSystem* fs, const std::string& path, - const ZoneMapIndexPB* index_meta) - : _fs(fs), _path(path), _index_meta(index_meta) {} + explicit ZoneMapIndexReader(io::FileReaderSPtr file_reader, const ZoneMapIndexPB* index_meta) + : _file_reader(std::move(file_reader)), _index_meta(index_meta) {} // load all page zone maps into memory Status load(bool use_page_cache, bool kept_in_memory); @@ -130,8 +129,7 @@ public: int32_t num_pages() const { return _page_zone_maps.size(); } private: - io::FileSystem* _fs; - std::string _path; + io::FileReaderSPtr _file_reader; const ZoneMapIndexPB* _index_meta; std::vector _page_zone_maps; diff --git a/be/test/olap/primary_key_index_test.cpp b/be/test/olap/primary_key_index_test.cpp index ae0a701ef4..8345e0c603 100644 --- a/be/test/olap/primary_key_index_test.cpp +++ b/be/test/olap/primary_key_index_test.cpp @@ -52,7 +52,7 @@ private: TEST_F(PrimaryKeyIndexTest, builder) { std::string filename = kTestDir + "/builder"; - std::unique_ptr file_writer; + io::FileWriterPtr file_writer; auto fs = io::global_local_filesystem(); EXPECT_TRUE(fs->create_file(filename, &file_writer).ok()); @@ -73,7 +73,9 @@ TEST_F(PrimaryKeyIndexTest, builder) { EXPECT_EQ(num_rows, builder.num_rows()); PrimaryKeyIndexReader index_reader; - EXPECT_TRUE(index_reader.parse(fs, filename, index_meta).ok()); + io::FileReaderSPtr file_reader; + EXPECT_TRUE(fs->open_file(filename, &file_reader).ok()); + EXPECT_TRUE(index_reader.parse(file_reader, index_meta).ok()); EXPECT_EQ(num_rows, index_reader.num_rows()); std::unique_ptr index_iterator; diff --git a/be/test/olap/rowset/segment_v2/bitmap_index_test.cpp b/be/test/olap/rowset/segment_v2/bitmap_index_test.cpp index e335034c91..3d2d23abed 100644 --- a/be/test/olap/rowset/segment_v2/bitmap_index_test.cpp +++ b/be/test/olap/rowset/segment_v2/bitmap_index_test.cpp @@ -22,6 +22,7 @@ #include "common/logging.h" #include "env/env.h" +#include "io/fs/file_reader.h" #include "io/fs/file_system.h" #include "io/fs/file_writer.h" #include "io/fs/local_file_system.h" @@ -59,7 +60,7 @@ void write_index_file(const std::string& filename, io::FileSystem* fs, const voi size_t value_count, size_t null_count, ColumnIndexMetaPB* meta) { const auto* type_info = get_scalar_type_info(); { - std::unique_ptr file_writer; + io::FileWriterPtr file_writer; EXPECT_TRUE(fs->create_file(filename, &file_writer).ok()); std::unique_ptr writer; @@ -75,7 +76,9 @@ void write_index_file(const std::string& filename, io::FileSystem* fs, const voi template void get_bitmap_reader_iter(const std::string& file_name, const ColumnIndexMetaPB& meta, BitmapIndexReader** reader, BitmapIndexIterator** iter) { - *reader = new BitmapIndexReader(io::global_local_filesystem(), file_name, &meta.bitmap_index()); + io::FileReaderSPtr file_reader; + ASSERT_EQ(io::global_local_filesystem()->open_file(file_name, &file_reader), Status::OK()); + *reader = new BitmapIndexReader(std::move(file_reader), &meta.bitmap_index()); auto st = (*reader)->load(true, false); EXPECT_TRUE(st.ok()); @@ -95,7 +98,6 @@ TEST_F(BitmapIndexTest, test_invert) { write_index_file(file_name, io::global_local_filesystem(), val, num_uint8_rows, 0, &meta); { - std::unique_ptr rfile; BitmapIndexReader* reader = nullptr; BitmapIndexIterator* iter = nullptr; get_bitmap_reader_iter(file_name, meta, &reader, &iter); diff --git a/be/test/olap/rowset/segment_v2/bloom_filter_index_reader_writer_test.cpp b/be/test/olap/rowset/segment_v2/bloom_filter_index_reader_writer_test.cpp index e1b8780d2f..fd5338fcd1 100644 --- a/be/test/olap/rowset/segment_v2/bloom_filter_index_reader_writer_test.cpp +++ b/be/test/olap/rowset/segment_v2/bloom_filter_index_reader_writer_test.cpp @@ -61,7 +61,7 @@ void write_bloom_filter_index_file(const std::string& file_name, const void* val std::string fname = dname + "/" + file_name; auto fs = io::global_local_filesystem(); { - std::unique_ptr file_writer; + io::FileWriterPtr file_writer; Status st = fs->create_file(fname, &file_writer); EXPECT_TRUE(st.ok()) << st.to_string(); @@ -89,12 +89,12 @@ void write_bloom_filter_index_file(const std::string& file_name, const void* val } void get_bloom_filter_reader_iter(const std::string& file_name, const ColumnIndexMetaPB& meta, - std::unique_ptr* rfile, BloomFilterIndexReader** reader, std::unique_ptr* iter) { std::string fname = dname + "/" + file_name; - auto fs = io::global_local_filesystem(); - *reader = new BloomFilterIndexReader(fs, fname, &meta.bloom_filter_index()); + io::FileReaderSPtr file_reader; + ASSERT_EQ(io::global_local_filesystem()->open_file(fname, &file_reader), Status::OK()); + *reader = new BloomFilterIndexReader(std::move(file_reader), &meta.bloom_filter_index()); auto st = (*reader)->load(true, false); EXPECT_TRUE(st.ok()); @@ -111,10 +111,9 @@ void test_bloom_filter_index_reader_writer_template( ColumnIndexMetaPB meta; write_bloom_filter_index_file(file_name, val, num, null_num, &meta); { - std::unique_ptr rfile; BloomFilterIndexReader* reader = nullptr; std::unique_ptr iter; - get_bloom_filter_reader_iter(file_name, meta, &rfile, &reader, &iter); + get_bloom_filter_reader_iter(file_name, meta, &reader, &iter); // page 0 std::unique_ptr bf; diff --git a/be/test/olap/rowset/segment_v2/column_reader_writer_test.cpp b/be/test/olap/rowset/segment_v2/column_reader_writer_test.cpp index 85b77325e4..dc492cc39c 100644 --- a/be/test/olap/rowset/segment_v2/column_reader_writer_test.cpp +++ b/be/test/olap/rowset/segment_v2/column_reader_writer_test.cpp @@ -87,7 +87,7 @@ void test_nullable_data(uint8_t* src_data, uint8_t* src_is_null, int num_rows, std::string fname = TEST_DIR + "/" + test_name; auto fs = io::global_local_filesystem(); { - std::unique_ptr file_writer; + io::FileWriterPtr file_writer; Status st = fs->create_file(fname, &file_writer); EXPECT_TRUE(st.ok()) << st.get_error_msg(); @@ -131,22 +131,21 @@ void test_nullable_data(uint8_t* src_data, uint8_t* src_is_null, int num_rows, EXPECT_TRUE(file_writer->close().ok()); } auto type_info = get_scalar_type_info(type); + io::FileReaderSPtr file_reader; + ASSERT_EQ(fs->open_file(fname, &file_reader), Status::OK()); // read and check { // sequence read { ColumnReaderOptions reader_opts; std::unique_ptr reader; - auto st = ColumnReader::create(reader_opts, meta, num_rows, fs, fname, &reader); + auto st = ColumnReader::create(reader_opts, meta, num_rows, file_reader, &reader); EXPECT_TRUE(st.ok()); ColumnIterator* iter = nullptr; st = reader->new_iterator(&iter); EXPECT_TRUE(st.ok()); - std::unique_ptr file_reader; - st = fs->open_file(fname, &file_reader); - EXPECT_TRUE(st.ok()); ColumnIteratorOptions iter_opts; OlapReaderStatistics stats; iter_opts.stats = &stats; @@ -194,21 +193,18 @@ void test_nullable_data(uint8_t* src_data, uint8_t* src_is_null, int num_rows, { ColumnReaderOptions reader_opts; std::unique_ptr reader; - auto st = ColumnReader::create(reader_opts, meta, num_rows, fs, fname, &reader); + auto st = ColumnReader::create(reader_opts, meta, num_rows, file_reader, &reader); EXPECT_TRUE(st.ok()); ColumnIterator* iter = nullptr; st = reader->new_iterator(&iter); EXPECT_TRUE(st.ok()); - std::unique_ptr rblock; - st = fs->open_file(fname, &rblock); - EXPECT_TRUE(st.ok()); ColumnIteratorOptions iter_opts; OlapReaderStatistics stats; iter_opts.stats = &stats; - iter_opts.file_reader = rblock.get(); + iter_opts.file_reader = file_reader.get(); st = iter->init(iter_opts); EXPECT_TRUE(st.ok()); @@ -266,7 +262,7 @@ void test_array_nullable_data(CollectionValue* src_data, uint8_t* src_is_null, i std::string fname = TEST_DIR + "/" + test_name; auto fs = io::global_local_filesystem(); { - std::unique_ptr file_writer; + io::FileWriterPtr file_writer; Status st = fs->create_file(fname, &file_writer); EXPECT_TRUE(st.ok()) << st.get_error_msg(); @@ -313,25 +309,23 @@ void test_array_nullable_data(CollectionValue* src_data, uint8_t* src_is_null, i EXPECT_TRUE(file_writer->close().ok()); } auto type_info = get_type_info(&meta); - + io::FileReaderSPtr file_reader; + ASSERT_EQ(fs->open_file(fname, &file_reader), Status::OK()); // read and check { ColumnReaderOptions reader_opts; std::unique_ptr reader; - auto st = ColumnReader::create(reader_opts, meta, num_rows, fs, fname, &reader); + auto st = ColumnReader::create(reader_opts, meta, num_rows, file_reader, &reader); EXPECT_TRUE(st.ok()); ColumnIterator* iter = nullptr; st = reader->new_iterator(&iter); EXPECT_TRUE(st.ok()); - std::unique_ptr rblock; - st = fs->open_file(fname, &rblock); - EXPECT_TRUE(st.ok()); ColumnIteratorOptions iter_opts; OlapReaderStatistics stats; iter_opts.stats = &stats; - iter_opts.file_reader = rblock.get(); + iter_opts.file_reader = file_reader.get(); st = iter->init(iter_opts); EXPECT_TRUE(st.ok()); // sequence read diff --git a/be/test/olap/rowset/segment_v2/ordinal_page_index_test.cpp b/be/test/olap/rowset/segment_v2/ordinal_page_index_test.cpp index 20f0b17d9a..0bda20536c 100644 --- a/be/test/olap/rowset/segment_v2/ordinal_page_index_test.cpp +++ b/be/test/olap/rowset/segment_v2/ordinal_page_index_test.cpp @@ -25,6 +25,7 @@ #include "common/logging.h" #include "env/env.h" +#include "io/fs/file_reader.h" #include "io/fs/file_system.h" #include "io/fs/file_writer.h" #include "io/fs/local_file_system.h" @@ -65,7 +66,7 @@ TEST_F(OrdinalPageIndexTest, normal) { } ColumnIndexMetaPB index_meta; { - std::unique_ptr file_writer; + io::FileWriterPtr file_writer; EXPECT_TRUE(fs->create_file(filename, &file_writer).ok()); EXPECT_TRUE(builder.finish(file_writer.get(), &index_meta).ok()); @@ -76,7 +77,9 @@ TEST_F(OrdinalPageIndexTest, normal) { << index_meta.ordinal_index().root_page().root_page().size(); } - OrdinalIndexReader index(fs, filename, &index_meta.ordinal_index(), 16 * 1024 * 4096 + 1); + io::FileReaderSPtr file_reader; + EXPECT_TRUE(fs->open_file(filename, &file_reader).ok()); + OrdinalIndexReader index(file_reader, &index_meta.ordinal_index(), 16 * 1024 * 4096 + 1); EXPECT_TRUE(index.load(true, false).ok()); EXPECT_EQ(16 * 1024, index.num_data_pages()); EXPECT_EQ(1, index.get_first_ordinal(0)); @@ -130,8 +133,7 @@ TEST_F(OrdinalPageIndexTest, one_data_page) { EXPECT_EQ(data_page_pointer, root_page_pointer); } - auto fs = io::global_local_filesystem(); - OrdinalIndexReader index(fs, "", &index_meta.ordinal_index(), num_values); + OrdinalIndexReader index(nullptr, &index_meta.ordinal_index(), num_values); EXPECT_TRUE(index.load(true, false).ok()); EXPECT_EQ(1, index.num_data_pages()); EXPECT_EQ(0, index.get_first_ordinal(0)); diff --git a/be/test/olap/rowset/segment_v2/segment_test.cpp b/be/test/olap/rowset/segment_v2/segment_test.cpp index e806988233..442aaecdac 100644 --- a/be/test/olap/rowset/segment_v2/segment_test.cpp +++ b/be/test/olap/rowset/segment_v2/segment_test.cpp @@ -116,7 +116,7 @@ protected: std::string path = fmt::format("{}/{}", kSegmentDir, filename); auto fs = io::global_local_filesystem(); - std::unique_ptr file_writer; + io::FileWriterPtr file_writer; Status st = fs->create_file(path, &file_writer); EXPECT_TRUE(st.ok()); DataDir data_dir(kSegmentDir); @@ -616,7 +616,7 @@ TEST_F(SegmentReaderWriterTest, estimate_segment_size) { std::string fname = kSegmentDir + "/int_case"; auto fs = io::global_local_filesystem(); - std::unique_ptr file_writer; + io::FileWriterPtr file_writer; Status st = fs->create_file(fname, &file_writer); EXPECT_TRUE(st.ok()) << st.to_string(); DataDir data_dir(kSegmentDir); @@ -783,7 +783,7 @@ TEST_F(SegmentReaderWriterTest, TestStringDict) { std::string fname = kSegmentDir + "/string_case"; auto fs = io::global_local_filesystem(); - std::unique_ptr file_writer; + io::FileWriterPtr file_writer; Status st = fs->create_file(fname, &file_writer); EXPECT_TRUE(st.ok()); DataDir data_dir(kSegmentDir); diff --git a/be/test/olap/rowset/segment_v2/zone_map_index_test.cpp b/be/test/olap/rowset/segment_v2/zone_map_index_test.cpp index 81e30bfad3..df5d4d6e24 100644 --- a/be/test/olap/rowset/segment_v2/zone_map_index_test.cpp +++ b/be/test/olap/rowset/segment_v2/zone_map_index_test.cpp @@ -77,14 +77,16 @@ public: // write out zone map index ColumnIndexMetaPB index_meta; { - std::unique_ptr file_writer; + io::FileWriterPtr file_writer; EXPECT_TRUE(fs->create_file(filename, &file_writer).ok()); EXPECT_TRUE(builder.finish(file_writer.get(), &index_meta).ok()); EXPECT_EQ(ZONE_MAP_INDEX, index_meta.type()); EXPECT_TRUE(file_writer->close().ok()); } - ZoneMapIndexReader column_zone_map(fs, filename, &index_meta.zone_map_index()); + io::FileReaderSPtr file_reader; + EXPECT_TRUE(fs->open_file(filename, &file_reader).ok()); + ZoneMapIndexReader column_zone_map(file_reader, &index_meta.zone_map_index()); Status status = column_zone_map.load(true, false); EXPECT_TRUE(status.ok()); EXPECT_EQ(3, column_zone_map.num_pages()); @@ -121,14 +123,16 @@ public: // write out zone map index ColumnIndexMetaPB index_meta; { - std::unique_ptr file_writer; + io::FileWriterPtr file_writer; EXPECT_TRUE(fs->create_file(filename, &file_writer).ok()); EXPECT_TRUE(builder.finish(file_writer.get(), &index_meta).ok()); EXPECT_EQ(ZONE_MAP_INDEX, index_meta.type()); EXPECT_TRUE(file_writer->close().ok()); } - ZoneMapIndexReader column_zone_map(fs, filename, &index_meta.zone_map_index()); + io::FileReaderSPtr file_reader; + EXPECT_TRUE(fs->open_file(filename, &file_reader).ok()); + ZoneMapIndexReader column_zone_map(file_reader, &index_meta.zone_map_index()); Status status = column_zone_map.load(true, false); EXPECT_TRUE(status.ok()); EXPECT_EQ(1, column_zone_map.num_pages()); @@ -171,14 +175,16 @@ TEST_F(ColumnZoneMapTest, NormalTestIntPage) { // write out zone map index ColumnIndexMetaPB index_meta; { - std::unique_ptr file_writer; + io::FileWriterPtr file_writer; EXPECT_TRUE(fs->create_file(filename, &file_writer).ok()); EXPECT_TRUE(builder.finish(file_writer.get(), &index_meta).ok()); EXPECT_EQ(ZONE_MAP_INDEX, index_meta.type()); EXPECT_TRUE(file_writer->close().ok()); } - ZoneMapIndexReader column_zone_map(fs, filename, &index_meta.zone_map_index()); + io::FileReaderSPtr file_reader; + EXPECT_TRUE(fs->open_file(filename, &file_reader).ok()); + ZoneMapIndexReader column_zone_map(file_reader, &index_meta.zone_map_index()); Status status = column_zone_map.load(true, false); EXPECT_TRUE(status.ok()); EXPECT_EQ(3, column_zone_map.num_pages()); diff --git a/be/test/runtime/array_test.cpp b/be/test/runtime/array_test.cpp index 2964b4baac..7adb6d3810 100644 --- a/be/test/runtime/array_test.cpp +++ b/be/test/runtime/array_test.cpp @@ -378,8 +378,8 @@ private: } } - std::unique_ptr creat_file_writer(const std::string& path) { - std::unique_ptr file_writer; + io::FileWriterPtr creat_file_writer(const std::string& path) { + io::FileWriterPtr file_writer; io::global_local_filesystem()->create_file(path, &file_writer); return file_writer; } @@ -409,8 +409,8 @@ private: return st.ok() ? std::move(reader) : nullptr; } - std::unique_ptr create_readable_block(const std::string& path) { - std::unique_ptr reader; + io::FileReaderSPtr create_readable_block(const std::string& path) { + io::FileReaderSPtr reader; auto st = io::global_local_filesystem()->open_file(path, &reader); return st.ok() ? std::move(reader) : nullptr; } diff --git a/be/test/tools/benchmark_tool.cpp b/be/test/tools/benchmark_tool.cpp index fb7eef57a3..daa50bc86a 100644 --- a/be/test/tools/benchmark_tool.cpp +++ b/be/test/tools/benchmark_tool.cpp @@ -347,7 +347,7 @@ public: std::string path = fmt::format("{}/{}", kSegmentDir, filename); auto fs = io::global_local_filesystem(); - std::unique_ptr file_writer; + io::FileWriterPtr file_writer; fs->create_file(path, &file_writer); SegmentWriterOptions opts; DataDir data_dir(kSegmentDir);