diff --git a/be/src/io/CMakeLists.txt b/be/src/io/CMakeLists.txt index a8ea5ff232..900026002f 100644 --- a/be/src/io/CMakeLists.txt +++ b/be/src/io/CMakeLists.txt @@ -33,6 +33,7 @@ set(IO_FILES local_file_writer.cpp s3_reader.cpp s3_writer.cpp + fs/file_reader_options.cpp fs/file_system_map.cpp fs/local_file_reader.cpp fs/local_file_system.cpp @@ -44,6 +45,7 @@ set(IO_FILES fs/hdfs_file_reader.cpp fs/broker_file_system.cpp fs/broker_file_reader.cpp + fs/remote_file_system.cpp fs/stream_load_pipe.cpp cache/dummy_file_cache.cpp cache/file_cache.cpp diff --git a/be/src/io/cache/file_cache_manager.cpp b/be/src/io/cache/file_cache_manager.cpp index a49dfcd240..59a12b0ea0 100644 --- a/be/src/io/cache/file_cache_manager.cpp +++ b/be/src/io/cache/file_cache_manager.cpp @@ -216,12 +216,13 @@ void FileCacheManager::gc_file_caches() { FileCachePtr FileCacheManager::new_file_cache(const std::string& cache_dir, int64_t alive_time_sec, io::FileReaderSPtr remote_file_reader, - const std::string& file_cache_type) { - if (file_cache_type == "whole_file_cache") { + io::FileCacheType cache_type) { + switch (cache_type) { + case io::FileCacheType::SUB_FILE_CACHE: return std::make_unique(cache_dir, alive_time_sec, remote_file_reader); - } else if (file_cache_type == "sub_file_cache") { + case io::FileCacheType::WHOLE_FILE_CACHE: return std::make_unique(cache_dir, alive_time_sec, remote_file_reader); - } else { + default: return nullptr; } } diff --git a/be/src/io/cache/file_cache_manager.h b/be/src/io/cache/file_cache_manager.h index 9332b324fc..b8200d964e 100644 --- a/be/src/io/cache/file_cache_manager.h +++ b/be/src/io/cache/file_cache_manager.h @@ -24,6 +24,7 @@ #include "common/config.h" #include "common/status.h" #include "io/cache/file_cache.h" +#include "io/fs/file_reader_options.h" namespace doris { namespace io { @@ -59,7 +60,7 @@ public: FileCachePtr new_file_cache(const std::string& cache_dir, int64_t alive_time_sec, io::FileReaderSPtr remote_file_reader, - const std::string& file_cache_type); + io::FileCacheType cache_type); bool exist(const std::string& cache_path); diff --git a/be/src/io/fs/file_reader_options.cpp b/be/src/io/fs/file_reader_options.cpp new file mode 100644 index 0000000000..00534d8c4e --- /dev/null +++ b/be/src/io/fs/file_reader_options.cpp @@ -0,0 +1,36 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "io/fs/file_reader_options.h" + +namespace doris { +namespace io { + +FileCacheType cache_type_from_string(const std::string& type) { + if (type == "sub_file_cache") { + return FileCacheType::SUB_FILE_CACHE; + } else if (type == "whole_file_cache") { + return FileCacheType::WHOLE_FILE_CACHE; + } else if (type == "file_block_cache") { + return FileCacheType::FILE_BLOCK_CACHE; + } else { + return FileCacheType::NO_CACHE; + } +} + +} // namespace io +} // namespace doris diff --git a/be/src/io/fs/file_reader_options.h b/be/src/io/fs/file_reader_options.h new file mode 100644 index 0000000000..c4c0061704 --- /dev/null +++ b/be/src/io/fs/file_reader_options.h @@ -0,0 +1,72 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include + +#include "common/status.h" + +namespace doris { +namespace io { + +enum class FileCacheType : uint8_t { + NO_CACHE, + SUB_FILE_CACHE, + WHOLE_FILE_CACHE, + FILE_BLOCK_CACHE, +}; + +FileCacheType cache_type_from_string(const std::string& type); + +// CachePathPolicy it to define which cache path should be used +// for the local cache of the given file(path). +// The dervied class should implement get_cache_path() method +class CachePathPolicy { +public: + // path: the path of file which will be cached + // return value: the cache path of the given file. + virtual std::string get_cache_path(const std::string& path) const { return ""; } +}; + +class NoCachePathPolicy : public CachePathPolicy { +public: + NoCachePathPolicy() = default; + std::string get_cache_path(const std::string& path) const override { return path; } +}; + +class SegmentCachePathPolicy : public CachePathPolicy { +public: + SegmentCachePathPolicy() = default; + std::string get_cache_path(const std::string& path) const override { + // the segment file path is {rowset_dir}/{schema_hash}/{rowset_id}_{seg_num}.dat + // cache path is: {rowset_dir}/{schema_hash}/{rowset_id}_{seg_num}/ + return path.substr(0, path.size() - 4) + "/"; + } +}; + +class FileReaderOptions { +public: + FileReaderOptions(FileCacheType cache_type_, const CachePathPolicy& path_policy_) + : cache_type(cache_type_), path_policy(path_policy_) {} + + FileCacheType cache_type; + CachePathPolicy path_policy; +}; + +} // namespace io +} // namespace doris diff --git a/be/src/io/fs/file_system.h b/be/src/io/fs/file_system.h index 2dde2a64e7..735e257930 100644 --- a/be/src/io/fs/file_system.h +++ b/be/src/io/fs/file_system.h @@ -22,6 +22,7 @@ #include "common/status.h" #include "gutil/macros.h" #include "io/fs/file_reader.h" +#include "io/fs/file_reader_options.h" #include "io/fs/file_writer.h" #include "io/fs/path.h" @@ -52,6 +53,9 @@ public: virtual Status create_file(const Path& path, FileWriterPtr* writer) = 0; + virtual Status open_file(const Path& path, const FileReaderOptions& reader_options, + FileReaderSPtr* reader) = 0; + virtual Status open_file(const Path& path, FileReaderSPtr* reader) = 0; virtual Status delete_file(const Path& path) = 0; diff --git a/be/src/io/fs/local_file_system.h b/be/src/io/fs/local_file_system.h index 1477d0aa99..92c6944ab8 100644 --- a/be/src/io/fs/local_file_system.h +++ b/be/src/io/fs/local_file_system.h @@ -30,6 +30,11 @@ public: Status create_file(const Path& path, FileWriterPtr* writer) override; + Status open_file(const Path& path, const FileReaderOptions& reader_options, + FileReaderSPtr* reader) override { + return open_file(path, reader); + } + Status open_file(const Path& path, FileReaderSPtr* reader) override; Status delete_file(const Path& path) override; diff --git a/be/src/io/fs/remote_file_system.cpp b/be/src/io/fs/remote_file_system.cpp new file mode 100644 index 0000000000..871c325782 --- /dev/null +++ b/be/src/io/fs/remote_file_system.cpp @@ -0,0 +1,59 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "io/fs/remote_file_system.h" + +#include "gutil/strings/stringpiece.h" +#include "io/cache/file_cache_manager.h" +#include "io/fs/file_reader_options.h" + +namespace doris { +namespace io { + +Status RemoteFileSystem::open_file(const Path& path, const FileReaderOptions& reader_options, + FileReaderSPtr* reader) { + FileReaderSPtr raw_reader; + RETURN_IF_ERROR(open_file(path, &raw_reader)); + switch (reader_options.cache_type) { + case io::FileCacheType::NO_CACHE: { + *reader = raw_reader; + break; + } + case io::FileCacheType::SUB_FILE_CACHE: + case io::FileCacheType::WHOLE_FILE_CACHE: { + StringPiece str(path.native()); + std::string cache_path = reader_options.path_policy.get_cache_path(str.as_string()); + io::FileCachePtr cache_reader = FileCacheManager::instance()->new_file_cache( + cache_path, config::file_cache_alive_time_sec, raw_reader, + reader_options.cache_type); + FileCacheManager::instance()->add_file_cache(cache_path, cache_reader); + *reader = cache_reader; + break; + } + case io::FileCacheType::FILE_BLOCK_CACHE: { + return Status::NotSupported("add file block cache reader"); + } + default: { + // TODO: add file block cache reader + return Status::InternalError("Unknown cache type: {}", reader_options.cache_type); + } + } + return Status::OK(); +} + +} // namespace io +} // namespace doris diff --git a/be/src/io/fs/remote_file_system.h b/be/src/io/fs/remote_file_system.h index 390da5ca0e..2218ce40c3 100644 --- a/be/src/io/fs/remote_file_system.h +++ b/be/src/io/fs/remote_file_system.h @@ -35,6 +35,13 @@ public: const std::vector& dest_paths) = 0; virtual Status connect() = 0; + + Status open_file(const Path& path, const FileReaderOptions& reader_options, + FileReaderSPtr* reader) override; + + Status open_file(const Path& path, FileReaderSPtr* reader) override { + return Status::NotSupported("implemented in derived classes"); + } }; } // namespace io diff --git a/be/src/olap/rowset/beta_rowset.cpp b/be/src/olap/rowset/beta_rowset.cpp index a6a0d29870..c9e304d383 100644 --- a/be/src/olap/rowset/beta_rowset.cpp +++ b/be/src/olap/rowset/beta_rowset.cpp @@ -47,12 +47,6 @@ std::string BetaRowset::segment_file_path(int segment_id) { return segment_file_path(_rowset_dir, rowset_id(), segment_id); } -std::string BetaRowset::segment_cache_path(const std::string& rowset_dir, const RowsetId& rowset_id, - int segment_id) { - // {root_path}/data/{shard_id}/{tablet_id}/{schema_hash}/{rowset_id}_{seg_num} - return fmt::format("{}/{}_{}", rowset_dir, rowset_id.to_string(), segment_id); -} - std::string BetaRowset::segment_cache_path(int segment_id) { // {root_path}/data/{shard_id}/{tablet_id}/{schema_hash}/{rowset_id}_{seg_num} return fmt::format("{}/{}_{}", _tablet_path, rowset_id().to_string(), segment_id); @@ -139,10 +133,8 @@ Status BetaRowset::load_segments(std::vector* segm } for (int seg_id = 0; seg_id < num_segments(); ++seg_id) { auto seg_path = segment_file_path(seg_id); - auto cache_path = segment_cache_path(seg_id); std::shared_ptr segment; - auto s = segment_v2::Segment::open(fs, seg_path, cache_path, seg_id, rowset_id(), _schema, - &segment); + auto s = segment_v2::Segment::open(fs, seg_path, seg_id, rowset_id(), _schema, &segment); if (!s.ok()) { LOG(WARNING) << "failed to open segment. " << seg_path << " under rowset " << unique_id() << " : " << s.to_string(); @@ -163,10 +155,8 @@ Status BetaRowset::load_segments(int64_t seg_id_begin, int64_t seg_id_end, return Status::Error(); } auto seg_path = segment_file_path(seg_id); - auto cache_path = segment_cache_path(seg_id); std::shared_ptr segment; - auto s = segment_v2::Segment::open(fs, seg_path, cache_path, seg_id, rowset_id(), _schema, - &segment); + auto s = segment_v2::Segment::open(fs, seg_path, seg_id, rowset_id(), _schema, &segment); if (!s.ok()) { LOG(WARNING) << "failed to open segment. " << seg_path << " under rowset " << unique_id() << " : " << s.to_string(); @@ -335,10 +325,8 @@ bool BetaRowset::check_current_rowset_segment() { } for (int seg_id = 0; seg_id < num_segments(); ++seg_id) { auto seg_path = segment_file_path(seg_id); - auto cache_path = segment_cache_path(seg_id); std::shared_ptr segment; - auto s = segment_v2::Segment::open(fs, seg_path, cache_path, seg_id, rowset_id(), _schema, - &segment); + auto s = segment_v2::Segment::open(fs, seg_path, seg_id, rowset_id(), _schema, &segment); if (!s.ok()) { LOG(WARNING) << "segment can not be opened. file=" << seg_path; return false; diff --git a/be/src/olap/rowset/beta_rowset.h b/be/src/olap/rowset/beta_rowset.h index a1167d2c2c..5ac860eaa1 100644 --- a/be/src/olap/rowset/beta_rowset.h +++ b/be/src/olap/rowset/beta_rowset.h @@ -46,9 +46,6 @@ public: std::string segment_cache_path(int segment_id); - static std::string segment_cache_path(const std::string& rowset_dir, const RowsetId& rowset_id, - int segment_id); - static bool is_segment_cache_dir(const std::string& cache_dir); static std::string segment_file_path(const std::string& rowset_dir, const RowsetId& rowset_id, diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp index e6ee1833b4..4544dc7bb8 100644 --- a/be/src/olap/rowset/beta_rowset_writer.cpp +++ b/be/src/olap/rowset/beta_rowset_writer.cpp @@ -391,10 +391,8 @@ Status BetaRowsetWriter::_load_noncompacted_segments( for (int seg_id = _segcompacted_point; seg_id < num; ++seg_id) { auto seg_path = BetaRowset::segment_file_path(_context.rowset_dir, _context.rowset_id, seg_id); - auto cache_path = - BetaRowset::segment_cache_path(_context.rowset_dir, _context.rowset_id, seg_id); std::shared_ptr segment; - auto s = segment_v2::Segment::open(fs, seg_path, cache_path, seg_id, rowset_id(), + auto s = segment_v2::Segment::open(fs, seg_path, seg_id, rowset_id(), _context.tablet_schema, &segment); if (!s.ok()) { LOG(WARNING) << "failed to open segment. " << seg_path << ":" << s.to_string(); diff --git a/be/src/olap/rowset/segment_v2/segment.cpp b/be/src/olap/rowset/segment_v2/segment.cpp index 6651e18799..cc489ef85c 100644 --- a/be/src/olap/rowset/segment_v2/segment.cpp +++ b/be/src/olap/rowset/segment_v2/segment.cpp @@ -25,6 +25,7 @@ #include "common/config.h" #include "common/logging.h" // LOG #include "io/cache/file_cache_manager.h" +#include "io/fs/file_reader_options.h" #include "io/fs/file_system.h" #include "olap/iterators.h" #include "olap/rowset/segment_v2/empty_segment_iterator.h" @@ -42,30 +43,26 @@ namespace segment_v2 { using io::FileCacheManager; -Status Segment::open(io::FileSystemSPtr fs, const std::string& path, const std::string& cache_path, - uint32_t segment_id, RowsetId rowset_id, TabletSchemaSPtr tablet_schema, +Status Segment::open(io::FileSystemSPtr fs, const std::string& path, uint32_t segment_id, + RowsetId rowset_id, TabletSchemaSPtr tablet_schema, std::shared_ptr* output) { - std::shared_ptr segment(new Segment(segment_id, rowset_id, tablet_schema)); + io::FileReaderOptions reader_options(io::cache_type_from_string(config::file_cache_type), + io::SegmentCachePathPolicy()); io::FileReaderSPtr file_reader; #ifndef BE_TEST - RETURN_IF_ERROR(fs->open_file(path, &file_reader)); + RETURN_IF_ERROR(fs->open_file(path, reader_options, &file_reader)); #else // be ut use local file reader instead of remote file reader while use remote cache if (!config::file_cache_type.empty()) { - RETURN_IF_ERROR(io::global_local_filesystem()->open_file(path, &file_reader)); + RETURN_IF_ERROR( + io::global_local_filesystem()->open_file(path, reader_options, &file_reader)); } else { - RETURN_IF_ERROR(fs->open_file(path, &file_reader)); + RETURN_IF_ERROR(fs->open_file(path, reader_options, &file_reader)); } #endif - if (fs->type() != io::FileSystemType::LOCAL && !config::file_cache_type.empty()) { - io::FileCachePtr cache_reader = FileCacheManager::instance()->new_file_cache( - cache_path, config::file_cache_alive_time_sec, file_reader, - config::file_cache_type); - segment->_file_reader = cache_reader; - FileCacheManager::instance()->add_file_cache(cache_path, cache_reader); - } else { - segment->_file_reader = std::move(file_reader); - } + + std::shared_ptr segment(new Segment(segment_id, rowset_id, tablet_schema)); + segment->_file_reader = std::move(file_reader); RETURN_IF_ERROR(segment->_open()); *output = std::move(segment); return Status::OK(); diff --git a/be/src/olap/rowset/segment_v2/segment.h b/be/src/olap/rowset/segment_v2/segment.h index b5e54dfa4f..f2fa40bb43 100644 --- a/be/src/olap/rowset/segment_v2/segment.h +++ b/be/src/olap/rowset/segment_v2/segment.h @@ -61,9 +61,9 @@ using SegmentSharedPtr = std::shared_ptr; // change finished, client should disable all cached Segment for old TabletSchema. class Segment : public std::enable_shared_from_this { public: - static Status open(io::FileSystemSPtr fs, const std::string& path, - const std::string& cache_path, uint32_t segment_id, RowsetId rowset_id, - TabletSchemaSPtr tablet_schema, std::shared_ptr* output); + static Status open(io::FileSystemSPtr fs, const std::string& path, uint32_t segment_id, + RowsetId rowset_id, TabletSchemaSPtr tablet_schema, + std::shared_ptr* output); ~Segment(); diff --git a/be/test/io/cache/remote_file_cache_test.cpp b/be/test/io/cache/remote_file_cache_test.cpp index 6c917a5f24..596ff88725 100644 --- a/be/test/io/cache/remote_file_cache_test.cpp +++ b/be/test/io/cache/remote_file_cache_test.cpp @@ -141,7 +141,7 @@ protected: EXPECT_NE("", writer.min_encoded_key().to_string()); EXPECT_NE("", writer.max_encoded_key().to_string()); - st = segment_v2::Segment::open(fs, path, "", 0, {}, query_schema, res); + st = segment_v2::Segment::open(fs, path, 0, {}, query_schema, res); EXPECT_TRUE(st.ok()); EXPECT_EQ(nrows, (*res)->num_rows()); } @@ -172,7 +172,7 @@ protected: std::vector segments; Status st = rowset.load_segments(&segments); - ASSERT_TRUE(st.ok()); + ASSERT_TRUE(st.ok()) << st; } };