From 09bcedb116639e64e905dcd45cf536b1064ff4bf Mon Sep 17 00:00:00 2001 From: plat1ko Date: Wed, 6 Sep 2023 08:07:05 +0800 Subject: [PATCH] [feature](merge-cloud) Remove deprecated old cache (#23881) * Remove deprecated old cache --- be/src/common/config.cpp | 10 +- be/src/common/config.h | 5 - .../cache/block/cached_remote_file_reader.cpp | 38 +- .../cache/block/cached_remote_file_reader.h | 7 +- be/src/io/cache/dummy_file_cache.cpp | 104 ------ be/src/io/cache/dummy_file_cache.h | 102 ------ be/src/io/cache/file_cache.cpp | 210 ----------- be/src/io/cache/file_cache.h | 106 ------ be/src/io/cache/file_cache_manager.cpp | 256 ------------- be/src/io/cache/file_cache_manager.h | 84 ----- be/src/io/cache/sub_file_cache.cpp | 342 ------------------ be/src/io/cache/sub_file_cache.h | 117 ------ be/src/io/cache/whole_file_cache.cpp | 184 ---------- be/src/io/cache/whole_file_cache.h | 92 ----- be/src/io/file_factory.cpp | 16 +- be/src/io/file_factory.h | 4 +- be/src/io/fs/file_reader.h | 24 ++ be/src/io/fs/file_reader_options.cpp | 39 -- be/src/io/fs/file_reader_options.h | 88 ----- be/src/io/fs/file_reader_writer_fwd.h | 2 + be/src/io/fs/file_system.cpp | 11 + be/src/io/fs/file_system.h | 13 +- be/src/io/fs/file_writer.h | 11 +- be/src/io/fs/local_file_system.cpp | 1 - be/src/io/fs/local_file_system.h | 1 - be/src/io/fs/remote_file_system.cpp | 26 +- be/src/io/fs/remote_file_system.h | 1 - be/src/olap/compaction.cpp | 2 +- be/src/olap/olap_server.cpp | 26 -- be/src/olap/rowset/beta_rowset.cpp | 60 +-- be/src/olap/rowset/beta_rowset.h | 11 +- be/src/olap/rowset/beta_rowset_writer.cpp | 14 +- be/src/olap/rowset/beta_rowset_writer_v2.cpp | 1 - be/src/olap/rowset/rowset.cpp | 5 +- be/src/olap/rowset/rowset.h | 14 +- be/src/olap/rowset/segment_v2/segment.cpp | 16 - be/src/olap/rowset/segment_v2/segment.h | 4 - be/src/olap/storage_engine.cpp | 3 +- be/src/olap/storage_engine.h | 2 - be/src/olap/tablet.cpp | 3 +- be/src/service/backend_service.cpp | 5 +- be/src/service/point_query_executor.cpp | 2 +- be/src/service/point_query_executor.h | 2 +- be/src/vec/core/block_spill_reader.cpp | 4 +- be/test/io/cache/remote_file_cache_test.cpp | 192 ---------- .../olap/delete_bitmap_calculator_test.cpp | 5 +- be/test/olap/rowset/rowset_tree_test.cpp | 2 +- be/test/olap/tablet_cooldown_test.cpp | 45 +-- be/test/olap/tablet_meta_test.cpp | 2 +- be/test/runtime/load_stream_test.cpp | 3 +- be/test/testutil/mock_rowset.h | 12 +- 51 files changed, 131 insertions(+), 2198 deletions(-) delete mode 100644 be/src/io/cache/dummy_file_cache.cpp delete mode 100644 be/src/io/cache/dummy_file_cache.h delete mode 100644 be/src/io/cache/file_cache.cpp delete mode 100644 be/src/io/cache/file_cache.h delete mode 100644 be/src/io/cache/file_cache_manager.cpp delete mode 100644 be/src/io/cache/file_cache_manager.h delete mode 100644 be/src/io/cache/sub_file_cache.cpp delete mode 100644 be/src/io/cache/sub_file_cache.h delete mode 100644 be/src/io/cache/whole_file_cache.cpp delete mode 100644 be/src/io/cache/whole_file_cache.h delete mode 100644 be/src/io/fs/file_reader_options.cpp delete mode 100644 be/src/io/fs/file_reader_options.h delete mode 100644 be/test/io/cache/remote_file_cache_test.cpp diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 0bac22880d..2719046a03 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -882,20 +882,14 @@ DEFINE_mInt32(remove_unused_remote_files_interval_sec, "21600"); // 6h DEFINE_mInt32(confirm_unused_remote_files_interval_sec, "60"); DEFINE_Int32(cold_data_compaction_thread_num, "2"); DEFINE_mInt32(cold_data_compaction_interval_sec, "1800"); -DEFINE_mInt64(generate_cache_cleaner_task_interval_sec, "43200"); // 12 h DEFINE_Int32(concurrency_per_dir, "2"); -DEFINE_mInt64(cooldown_lag_time_sec, "10800"); // 3h -DEFINE_mInt64(max_sub_cache_file_size, "104857600"); // 100MB -DEFINE_mInt64(file_cache_alive_time_sec, "604800"); // 1 week // file_cache_type is used to set the type of file cache for remote files. // "": no cache, "sub_file_cache": split sub files from remote file. // "whole_file_cache": the whole file. DEFINE_mString(file_cache_type, "file_block_cache"); -DEFINE_Validator(file_cache_type, [](const std::string config) -> bool { - return config == "sub_file_cache" || config == "whole_file_cache" || config == "" || - config == "file_block_cache"; +DEFINE_Validator(file_cache_type, [](std::string_view config) -> bool { + return config == "" || config == "file_block_cache"; }); -DEFINE_mInt64(file_cache_max_size_per_disk, "0"); // zero for no limit DEFINE_Int32(s3_transfer_executor_pool_size, "2"); diff --git a/be/src/common/config.h b/be/src/common/config.h index d83e012719..95c4d219e0 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -933,16 +933,11 @@ DECLARE_mInt32(remove_unused_remote_files_interval_sec); // 6h DECLARE_mInt32(confirm_unused_remote_files_interval_sec); DECLARE_Int32(cold_data_compaction_thread_num); DECLARE_mInt32(cold_data_compaction_interval_sec); -DECLARE_mInt64(generate_cache_cleaner_task_interval_sec); // 12 h DECLARE_Int32(concurrency_per_dir); -DECLARE_mInt64(cooldown_lag_time_sec); // 3h -DECLARE_mInt64(max_sub_cache_file_size); // 100MB -DECLARE_mInt64(file_cache_alive_time_sec); // 1 week // file_cache_type is used to set the type of file cache for remote files. // "": no cache, "sub_file_cache": split sub files from remote file. // "whole_file_cache": the whole file. DECLARE_mString(file_cache_type); -DECLARE_mInt64(file_cache_max_size_per_disk); // zero for no limit DECLARE_Int32(s3_transfer_executor_pool_size); diff --git a/be/src/io/cache/block/cached_remote_file_reader.cpp b/be/src/io/cache/block/cached_remote_file_reader.cpp index 4ee5b7970d..e2a629fd4c 100644 --- a/be/src/io/cache/block/cached_remote_file_reader.cpp +++ b/be/src/io/cache/block/cached_remote_file_reader.cpp @@ -42,27 +42,27 @@ namespace doris { namespace io { CachedRemoteFileReader::CachedRemoteFileReader(FileReaderSPtr remote_file_reader, - const std::string& cache_path, - const long modification_time) + const FileReaderOptions* opts) : _remote_file_reader(std::move(remote_file_reader)) { - // Use path and modification time to build cache key - std::string unique_path = fmt::format("{}:{}", cache_path, modification_time); - _cache_key = IFileCache::hash(unique_path); - _cache = FileCacheFactory::instance().get_by_path(_cache_key); -} - -CachedRemoteFileReader::CachedRemoteFileReader(FileReaderSPtr remote_file_reader, - const std::string& cache_base_path, - const std::string& cache_path, - const long modification_time) - : _remote_file_reader(std::move(remote_file_reader)) { - std::string unique_path = fmt::format("{}:{}", cache_path, modification_time); - _cache_key = IFileCache::hash(unique_path); - _cache = FileCacheFactory::instance().get_by_path(cache_base_path); - if (_cache == nullptr) { - LOG(WARNING) << "Can't get cache from base path: " << cache_base_path - << ", using random instead."; + DCHECK(opts) << remote_file_reader->path().native(); + _is_doris_table = opts->is_doris_table; + if (_is_doris_table) { + _cache_key = IFileCache::hash(path().filename().native()); _cache = FileCacheFactory::instance().get_by_path(_cache_key); + } else { + // Use path and modification time to build cache key + std::string unique_path = fmt::format("{}:{}", path().native(), opts->modification_time); + _cache_key = IFileCache::hash(unique_path); + if (!opts->cache_base_path.empty()) { + // from query session variable: file_cache_base_path + _cache = FileCacheFactory::instance().get_by_path(opts->cache_base_path); + if (_cache == nullptr) { + LOG(WARNING) << "Can't get cache from base path: " << opts->cache_base_path + << ", using random instead."; + _cache = FileCacheFactory::instance().get_by_path(_cache_key); + } + } + _cache = FileCacheFactory::instance().get_by_path(path().native()); } } diff --git a/be/src/io/cache/block/cached_remote_file_reader.h b/be/src/io/cache/block/cached_remote_file_reader.h index 5e66f9970a..51e9e562a2 100644 --- a/be/src/io/cache/block/cached_remote_file_reader.h +++ b/be/src/io/cache/block/cached_remote_file_reader.h @@ -39,11 +39,7 @@ struct FileCacheStatistics; class CachedRemoteFileReader final : public FileReader { public: - CachedRemoteFileReader(FileReaderSPtr remote_file_reader, const std::string& cache_path, - const long modification_time); - - CachedRemoteFileReader(FileReaderSPtr remote_file_reader, const std::string& cache_base_path, - const std::string& cache_path, const long modification_time); + CachedRemoteFileReader(FileReaderSPtr remote_file_reader, const FileReaderOptions* opts); ~CachedRemoteFileReader() override; @@ -69,6 +65,7 @@ private: FileReaderSPtr _remote_file_reader; IFileCache::Key _cache_key; CloudFileCachePtr _cache; + bool _is_doris_table; struct ReadStatistics { bool hit_cache = true; diff --git a/be/src/io/cache/dummy_file_cache.cpp b/be/src/io/cache/dummy_file_cache.cpp deleted file mode 100644 index 14dda6898a..0000000000 --- a/be/src/io/cache/dummy_file_cache.cpp +++ /dev/null @@ -1,104 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "io/cache/dummy_file_cache.h" - -#include - -#include -#include - -#include "io/fs/local_file_system.h" - -namespace doris { -namespace io { - -DummyFileCache::DummyFileCache(const Path& cache_dir, int64_t alive_time_sec) - : _cache_dir(cache_dir), _alive_time_sec(alive_time_sec) {} - -DummyFileCache::~DummyFileCache() = default; - -void DummyFileCache::_add_file_cache(const Path& data_file) { - Path cache_file = _cache_dir / data_file; - int64_t file_size = -1; - time_t m_time = 0; - if (io::global_local_filesystem()->file_size(cache_file, &file_size).ok() && - io::global_local_filesystem()->mtime(cache_file, &m_time).ok()) { - _gc_lru_queue.push({cache_file, m_time}); - _cache_file_size += file_size; - } else { - _unfinished_files.push_back(cache_file); - } -} - -void DummyFileCache::_load() { - std::vector cache_names; - if (!_get_dir_files_and_remove_unfinished(_cache_dir, cache_names).ok()) { - return; - } - - for (const auto& file : cache_names) { - _add_file_cache(file); - } -} - -Status DummyFileCache::load_and_clean() { - _load(); - RETURN_IF_ERROR(_clean_unfinished_files(_unfinished_files)); - return _check_and_delete_empty_dir(_cache_dir); -} - -Status DummyFileCache::clean_timeout_cache() { - while (!_gc_lru_queue.empty() && - time(nullptr) - _gc_lru_queue.top().last_match_time > _alive_time_sec) { - size_t cleaned_size = 0; - RETURN_IF_ERROR(_clean_cache_internal(_gc_lru_queue.top().file, &cleaned_size)); - _cache_file_size -= cleaned_size; - _gc_lru_queue.pop(); - } - return Status::OK(); -} - -Status DummyFileCache::clean_all_cache() { - while (!_gc_lru_queue.empty()) { - RETURN_IF_ERROR(_clean_cache_internal(_gc_lru_queue.top().file, nullptr)); - _gc_lru_queue.pop(); - } - _cache_file_size = 0; - return _check_and_delete_empty_dir(_cache_dir); -} - -Status DummyFileCache::clean_one_cache(size_t* cleaned_size) { - if (!_gc_lru_queue.empty()) { - const auto& cache = _gc_lru_queue.top(); - RETURN_IF_ERROR(_clean_cache_internal(cache.file, cleaned_size)); - _cache_file_size -= *cleaned_size; - _gc_lru_queue.pop(); - } - if (_gc_lru_queue.empty()) { - RETURN_IF_ERROR(_check_and_delete_empty_dir(_cache_dir)); - } - return Status::OK(); -} - -Status DummyFileCache::_clean_cache_internal(const Path& cache_file_path, size_t* cleaned_size) { - Path done_file_path = cache_file_path.native() + CACHE_DONE_FILE_SUFFIX; - return _remove_cache_and_done(cache_file_path, done_file_path, cleaned_size); -} - -} // namespace io -} // namespace doris diff --git a/be/src/io/cache/dummy_file_cache.h b/be/src/io/cache/dummy_file_cache.h deleted file mode 100644 index 46d21d99a2..0000000000 --- a/be/src/io/cache/dummy_file_cache.h +++ /dev/null @@ -1,102 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -#include -#include - -#include -#include -#include - -#include "common/status.h" -#include "io/cache/file_cache.h" -#include "io/fs/file_reader_writer_fwd.h" -#include "io/fs/file_system.h" -#include "io/fs/path.h" -#include "util/slice.h" - -namespace doris { -namespace io { -class IOContext; - -// Only used for GC -class DummyFileCache final : public FileCache { -public: - DummyFileCache(const Path& cache_dir, int64_t alive_time_sec); - - ~DummyFileCache() override; - - Status close() override { return Status::OK(); } - - const Path& path() const override { return _cache_dir; } - - size_t size() const override { return 0; } - - bool closed() const override { return true; } - - const Path& cache_dir() const override { return _cache_dir; } - - io::FileReaderSPtr remote_file_reader() const override { return nullptr; } - - Status clean_timeout_cache() override; - - Status clean_all_cache() override; - - Status clean_one_cache(size_t* cleaned_size) override; - - Status load_and_clean(); - - bool is_dummy_file_cache() override { return true; } - - int64_t get_oldest_match_time() const override { - return _gc_lru_queue.empty() ? 0 : _gc_lru_queue.top().last_match_time; - } - - bool is_gc_finish() const override { return _gc_lru_queue.empty(); } - - FileSystemSPtr fs() const override { return nullptr; } - -protected: - Status read_at_impl(size_t offset, Slice result, size_t* bytes_read, - const IOContext* io_ctx) override { - return Status::NotSupported("dummy file cache only used for GC"); - } - -private: - void _add_file_cache(const Path& data_file); - void _load(); - Status _clean_cache_internal(const Path&, size_t*); - -private: - struct DummyFileInfo { - Path file; - int64_t last_match_time; - }; - using DummyGcQueue = std::priority_queue, - SubFileLRUComparator>; - DummyGcQueue _gc_lru_queue; - - Path _cache_dir; - int64_t _alive_time_sec; - - std::vector _unfinished_files; -}; - -} // namespace io -} // namespace doris diff --git a/be/src/io/cache/file_cache.cpp b/be/src/io/cache/file_cache.cpp deleted file mode 100644 index 38ea672c82..0000000000 --- a/be/src/io/cache/file_cache.cpp +++ /dev/null @@ -1,210 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "io/cache/file_cache.h" - -#include -#include -#include - -#include -#include -#include -#include -#include -#include - -#include "common/config.h" -#include "common/status.h" -#include "gutil/strings/util.h" -#include "io/fs/file_system.h" -#include "io/fs/file_writer.h" -#include "io/fs/local_file_system.h" -#include "runtime/exec_env.h" -#include "util/string_util.h" - -namespace doris { -using namespace ErrorCode; -namespace io { - -Status FileCache::download_cache_to_local(const Path& cache_file, const Path& cache_done_file, - io::FileReaderSPtr remote_file_reader, size_t req_size, - size_t offset) { - LOG(INFO) << "Download cache file from remote file: " << remote_file_reader->path().native() - << " -> " << cache_file.native() << ". offset: " << offset - << ", request size: " << req_size; - io::FileWriterPtr file_writer; - RETURN_NOT_OK_STATUS_WITH_WARN( - io::global_local_filesystem()->create_file(cache_file, &file_writer), - fmt::format("Create local cache file failed: {}", cache_file.native())); - auto func = [cache_file, cache_done_file, remote_file_reader, req_size, - offset](io::FileWriter* file_writer) { - char* file_buf = ExecEnv::GetInstance()->get_download_cache_buf( - ExecEnv::GetInstance()->get_serial_download_cache_thread_token()); - size_t count_bytes_read = 0; - size_t need_req_size = config::download_cache_buffer_size; - while (count_bytes_read < req_size) { - memset(file_buf, 0, need_req_size); - if (req_size - count_bytes_read < config::download_cache_buffer_size) { - need_req_size = req_size - count_bytes_read; - } - Slice file_slice(file_buf, need_req_size); - size_t bytes_read = 0; - RETURN_NOT_OK_STATUS_WITH_WARN( - remote_file_reader->read_at(offset + count_bytes_read, file_slice, &bytes_read), - fmt::format("read remote file failed. {}. offset: {}, request size: {}", - remote_file_reader->path().native(), offset + count_bytes_read, - need_req_size)); - if (bytes_read != need_req_size) { - return Status::Error( - "read remote file failed: {}, bytes read: {} vs need read size: {}", - remote_file_reader->path().native(), bytes_read, need_req_size); - } - count_bytes_read += bytes_read; - RETURN_NOT_OK_STATUS_WITH_WARN( - file_writer->append(file_slice), - fmt::format("Write local cache file failed: {}", cache_file.native())); - } - return Status::OK(); - }; - auto st = func(file_writer.get()); - if (!st.ok()) { - WARN_IF_ERROR(file_writer->close(), - fmt::format("Close local cache file failed: {}", cache_file.native())); - return st; - } - RETURN_NOT_OK_STATUS_WITH_WARN( - file_writer->close(), - fmt::format("Close local cache file failed: {}", cache_file.native())); - io::FileWriterPtr done_file_writer; - RETURN_NOT_OK_STATUS_WITH_WARN( - io::global_local_filesystem()->create_file(cache_done_file, &done_file_writer), - fmt::format("Create local done file failed: {}", cache_done_file.native())); - RETURN_NOT_OK_STATUS_WITH_WARN( - done_file_writer->close(), - fmt::format("Close local done file failed: {}", cache_done_file.native())); - return Status::OK(); -} - -Status FileCache::_remove_file(const Path& file, size_t* cleaned_size) { - bool cache_file_exist = false; - RETURN_NOT_OK_STATUS_WITH_WARN(io::global_local_filesystem()->exists(file, &cache_file_exist), - "Check local cache file exist failed."); - int64_t file_size = -1; - if (cache_file_exist) { - RETURN_NOT_OK_STATUS_WITH_WARN( - io::global_local_filesystem()->file_size(file, &file_size), - fmt::format("get local cache file size failed: {}", file.native())); - - RETURN_NOT_OK_STATUS_WITH_WARN( - io::global_local_filesystem()->delete_file(file), - fmt::format("Delete local cache file failed: {}", file.native())); - LOG(INFO) << "Delete local cache file successfully: " << file.native() - << ", file size: " << file_size; - } - if (cleaned_size) { - *cleaned_size = file_size; - } - return Status::OK(); -} - -Status FileCache::_remove_cache_and_done(const Path& cache_file, const Path& cache_done_file, - size_t* cleaned_size) { - RETURN_IF_ERROR(_remove_file(cache_done_file, nullptr)); - RETURN_IF_ERROR(_remove_file(cache_file, cleaned_size)); - return Status::OK(); -} - -Status FileCache::_get_dir_files_and_remove_unfinished(const Path& cache_dir, - std::vector& cache_names) { - bool cache_dir_exist = true; - RETURN_NOT_OK_STATUS_WITH_WARN( - io::global_local_filesystem()->exists(cache_dir, &cache_dir_exist), - fmt::format("Check local cache dir exist failed. {}", cache_dir.native())); - if (!cache_dir_exist) { - return Status::OK(); - } - - // list all files - std::vector cache_files; - bool exists = true; - RETURN_NOT_OK_STATUS_WITH_WARN( - io::global_local_filesystem()->list(cache_dir, true, &cache_files, &exists), - fmt::format("List dir failed: {}", cache_dir.native())) - - // separate DATA file and DONE file - std::set cache_names_temp; - std::list done_names_temp; - for (auto& cache_file : cache_files) { - if (ends_with(cache_file.file_name, CACHE_DONE_FILE_SUFFIX)) { - done_names_temp.push_back(cache_file.file_name); - } else { - cache_names_temp.insert(cache_file.file_name); - } - } - - // match DONE file with DATA file - for (auto done_file : done_names_temp) { - Path cache_filename = StringReplace(done_file.native(), CACHE_DONE_FILE_SUFFIX, "", true); - if (auto cache_iter = cache_names_temp.find(cache_filename); - cache_iter != cache_names_temp.end()) { - cache_names_temp.erase(cache_iter); - cache_names.push_back(std::move(cache_filename)); - } else { - // not data file, but with DONE file - RETURN_IF_ERROR(_remove_file(done_file, nullptr)); - } - } - // data file without DONE file - for (auto& file : cache_names_temp) { - RETURN_IF_ERROR(_remove_file(file, nullptr)); - } - return Status::OK(); -} - -Status FileCache::_clean_unfinished_files(const std::vector& unfinished_files) { - // remove cache file without done file - for (auto file : unfinished_files) { - RETURN_IF_ERROR(_remove_file(file, nullptr)); - } - return Status::OK(); -} - -Status FileCache::_check_and_delete_empty_dir(const Path& cache_dir) { - bool cache_dir_exist = true; - RETURN_NOT_OK_STATUS_WITH_WARN( - io::global_local_filesystem()->exists(cache_dir, &cache_dir_exist), - fmt::format("Check local cache dir exist failed. {}", cache_dir.native())); - if (!cache_dir_exist) { - return Status::OK(); - } - - std::vector cache_files; - bool exists = true; - RETURN_NOT_OK_STATUS_WITH_WARN( - io::global_local_filesystem()->list(cache_dir, true, &cache_files, &exists), - fmt::format("List dir failed: {}", cache_dir.native())); - if (cache_files.empty()) { - RETURN_NOT_OK_STATUS_WITH_WARN(io::global_local_filesystem()->delete_directory(cache_dir), - fmt::format("Delete dir failed: {}", cache_dir.native())); - LOG(INFO) << "Delete empty dir: " << cache_dir.native(); - } - return Status::OK(); -} - -} // namespace io -} // namespace doris diff --git a/be/src/io/cache/file_cache.h b/be/src/io/cache/file_cache.h deleted file mode 100644 index 2b2156ea46..0000000000 --- a/be/src/io/cache/file_cache.h +++ /dev/null @@ -1,106 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -#include -#include -#include - -#include -#include -#include - -#include "common/status.h" -#include "io/fs/file_reader.h" -#include "io/fs/file_reader_writer_fwd.h" -#include "io/fs/path.h" -#include "util/slice.h" - -namespace doris { -namespace io { -class IOContext; - -const std::string CACHE_DONE_FILE_SUFFIX = "_DONE"; - -class FileCache : public FileReader { -public: - FileCache() : _cache_file_size(0) {} - ~FileCache() override = default; - - DISALLOW_COPY_AND_ASSIGN(FileCache); - - virtual const Path& cache_dir() const = 0; - - size_t cache_file_size() const { return _cache_file_size; } - - virtual io::FileReaderSPtr remote_file_reader() const = 0; - - virtual Status clean_timeout_cache() = 0; - - virtual Status clean_all_cache() = 0; - - virtual Status clean_one_cache(size_t* cleaned_size) = 0; - - virtual bool is_gc_finish() const = 0; - - virtual bool is_dummy_file_cache() { return false; } - - Status download_cache_to_local(const Path& cache_file, const Path& cache_done_file, - io::FileReaderSPtr remote_file_reader, size_t req_size, - size_t offset = 0); - - virtual int64_t get_oldest_match_time() const = 0; - -protected: - Status read_at_impl(size_t offset, Slice result, size_t* bytes_read, - const IOContext* io_ctx) override { - return Status::NotSupported("dummy file cache only used for GC"); - } - - Status _remove_file(const Path& file, size_t* cleaned_size); - - Status _remove_cache_and_done(const Path& cache_file, const Path& cache_done_file, - size_t* cleaned_size); - - Status _get_dir_files_and_remove_unfinished(const Path& cache_dir, - std::vector& cache_names); - - Status _clean_unfinished_files(const std::vector& unfinished_files); - - Status _check_and_delete_empty_dir(const Path& cache_dir); - - template - struct SubFileLRUComparator { - bool operator()(const T& lhs, const T& rhs) const { - return lhs.last_match_time > rhs.last_match_time; - } - }; - - size_t _cache_file_size; -}; - -using FileCachePtr = std::shared_ptr; - -struct FileCacheLRUComparator { - bool operator()(const FileCachePtr& lhs, const FileCachePtr& rhs) const { - return lhs->get_oldest_match_time() > rhs->get_oldest_match_time(); - } -}; - -} // namespace io -} // namespace doris diff --git a/be/src/io/cache/file_cache_manager.cpp b/be/src/io/cache/file_cache_manager.cpp deleted file mode 100644 index f581b7d651..0000000000 --- a/be/src/io/cache/file_cache_manager.cpp +++ /dev/null @@ -1,256 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "io/cache/file_cache_manager.h" - -#include -#include -#include - -#include -#include -#include -#include - -#include "common/config.h" -#include "io/cache/dummy_file_cache.h" -#include "io/cache/sub_file_cache.h" -#include "io/cache/whole_file_cache.h" -#include "io/fs/file_reader_options.h" -#include "io/fs/file_system.h" -#include "io/fs/local_file_system.h" -#include "olap/data_dir.h" -#include "olap/rowset/beta_rowset.h" -#include "olap/storage_engine.h" -#include "olap/tablet.h" -#include "olap/tablet_manager.h" - -namespace doris { -namespace io { - -void GCContextPerDisk::init(const std::string& path, int64_t max_size) { - _disk_path = path; - _conf_max_size = max_size; - _used_size = 0; -} - -bool GCContextPerDisk::try_add_file_cache(FileCachePtr cache, int64_t file_size) { - if (cache->cache_dir().string().substr(0, _disk_path.size()) == _disk_path) { - _lru_queue.push(cache); - _used_size += file_size; - return true; - } - return false; -} - -FileCachePtr GCContextPerDisk::top() { - if (!_lru_queue.empty() && _used_size > _conf_max_size) { - return _lru_queue.top(); - } - return nullptr; -} - -void GCContextPerDisk::pop() { - if (!_lru_queue.empty()) { - _lru_queue.pop(); - } -} - -Status GCContextPerDisk::gc_top() { - if (!_lru_queue.empty() && _used_size > _conf_max_size) { - auto file_cache = _lru_queue.top(); - size_t cleaned_size = 0; - RETURN_IF_ERROR(file_cache->clean_one_cache(&cleaned_size)); - _used_size -= cleaned_size; - _lru_queue.pop(); - if (!file_cache->is_gc_finish()) { - _lru_queue.push(file_cache); - } - } - return Status::OK(); -} - -void FileCacheManager::add_file_cache(const std::string& cache_path, FileCachePtr file_cache) { - std::lock_guard wrlock(_cache_map_lock); - _file_cache_map.emplace(cache_path, file_cache); -} - -void FileCacheManager::remove_file_cache(const std::string& cache_path) { - bool cache_path_exist = false; - { - std::shared_lock rdlock(_cache_map_lock); - if (_file_cache_map.find(cache_path) == _file_cache_map.end()) { - bool cache_dir_exist = false; - if (global_local_filesystem()->exists(cache_path, &cache_dir_exist).ok() && - cache_dir_exist) { - Status st = global_local_filesystem()->delete_directory(cache_path); - if (!st.ok()) { - LOG(WARNING) << st.to_string(); - } - } - } else { - cache_path_exist = true; - _file_cache_map.find(cache_path)->second->clean_all_cache(); - } - } - if (cache_path_exist) { - std::lock_guard wrlock(_cache_map_lock); - _file_cache_map.erase(cache_path); - } -} - -void FileCacheManager::_add_file_cache_for_gc_by_disk(std::vector& contexts, - FileCachePtr file_cache) { - // sort file cache by last match time - if (config::file_cache_max_size_per_disk > 0) { - auto file_size = file_cache->cache_file_size(); - if (file_size <= 0) { - return; - } - for (size_t i = 0; i < contexts.size(); ++i) { - if (contexts[i].try_add_file_cache(file_cache, file_size)) { - break; - } - } - } -} - -void FileCacheManager::_gc_unused_file_caches(std::list& result) { - std::vector tablets = - StorageEngine::instance()->tablet_manager()->get_all_tablet(); - bool exists = true; - for (const auto& tablet : tablets) { - std::vector seg_files; - if (io::global_local_filesystem() - ->list(tablet->tablet_path(), true, &seg_files, &exists) - .ok()) { - for (auto& seg_file : seg_files) { - std::string seg_filename = seg_file.file_name; - // check if it is a dir name - if (!BetaRowset::is_segment_cache_dir(seg_filename)) { - continue; - } - // skip file cache already in memory - std::stringstream ss; - ss << tablet->tablet_path() << "/" << seg_filename; - std::string cache_path = ss.str(); - - std::shared_lock rdlock(_cache_map_lock); - if (_file_cache_map.find(cache_path) != _file_cache_map.end()) { - continue; - } - auto file_cache = std::make_shared( - cache_path, config::file_cache_alive_time_sec); - // load cache meta from disk and clean unfinished cache files - file_cache->load_and_clean(); - // policy1: GC file cache by timeout - file_cache->clean_timeout_cache(); - - result.push_back(file_cache); - } - } - } -} - -void FileCacheManager::gc_file_caches() { - int64_t gc_conf_size = config::file_cache_max_size_per_disk; - std::vector contexts; - // init for GC by disk size - if (gc_conf_size > 0) { - std::vector data_dirs = doris::StorageEngine::instance()->get_stores(); - contexts.resize(data_dirs.size()); - for (size_t i = 0; i < contexts.size(); ++i) { - contexts[i].init(data_dirs[i]->path(), gc_conf_size); - } - } - - // process unused file caches - std::list dummy_file_list; - _gc_unused_file_caches(dummy_file_list); - - { - std::shared_lock rdlock(_cache_map_lock); - for (auto item : dummy_file_list) { - // check again after _cache_map_lock hold - if (_file_cache_map.find(item->cache_dir().native()) != _file_cache_map.end()) { - continue; - } - // sort file cache by last match time - _add_file_cache_for_gc_by_disk(contexts, item); - } - - // process file caches in memory - for (std::map::const_iterator iter = _file_cache_map.cbegin(); - iter != _file_cache_map.cend(); ++iter) { - if (iter->second == nullptr) { - continue; - } - // policy1: GC file cache by timeout - iter->second->clean_timeout_cache(); - // sort file cache by last match time - _add_file_cache_for_gc_by_disk(contexts, iter->second); - } - } - - // policy2: GC file cache by disk size - if (gc_conf_size > 0) { - for (size_t i = 0; i < contexts.size(); ++i) { - auto& context = contexts[i]; - FileCachePtr file_cache; - while ((file_cache = context.top()) != nullptr) { - { - std::shared_lock rdlock(_cache_map_lock); - // for dummy file cache, check already used or not again - if (file_cache->is_dummy_file_cache() && - _file_cache_map.find(file_cache->cache_dir().native()) != - _file_cache_map.end()) { - context.pop(); - continue; - } - } - WARN_IF_ERROR(context.gc_top(), - fmt::format("gc {} error", file_cache->cache_dir().native())); - } - } - } -} - -FileCachePtr FileCacheManager::new_file_cache(const std::string& cache_dir, int64_t alive_time_sec, - io::FileReaderSPtr remote_file_reader, - io::FileCachePolicy cache_type) { - switch (cache_type) { - case io::FileCachePolicy::WHOLE_FILE_CACHE: - return std::make_unique(cache_dir, alive_time_sec, remote_file_reader); - case io::FileCachePolicy::SUB_FILE_CACHE: - return std::make_unique(cache_dir, alive_time_sec, remote_file_reader); - default: - return nullptr; - } -} - -bool FileCacheManager::exist(const std::string& cache_path) { - std::shared_lock rdlock(_cache_map_lock); - return _file_cache_map.find(cache_path) != _file_cache_map.end(); -} - -FileCacheManager* FileCacheManager::instance() { - static FileCacheManager cache_manager; - return &cache_manager; -} - -} // namespace io -} // namespace doris diff --git a/be/src/io/cache/file_cache_manager.h b/be/src/io/cache/file_cache_manager.h deleted file mode 100644 index 7ac5e652d4..0000000000 --- a/be/src/io/cache/file_cache_manager.h +++ /dev/null @@ -1,84 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -#include - -#include -#include -#include -#include -#include -#include - -#include "common/status.h" -#include "io/cache/file_cache.h" -#include "io/fs/file_reader_writer_fwd.h" - -namespace doris { -namespace io { -enum class FileCachePolicy : uint8_t; - -class GCContextPerDisk { -public: - GCContextPerDisk() : _conf_max_size(0), _used_size(0) {} - void init(const std::string& path, int64_t max_size); - bool try_add_file_cache(FileCachePtr cache, int64_t file_size); - FileCachePtr top(); - Status gc_top(); - void pop(); - -private: - std::string _disk_path; - int64_t _conf_max_size; - int64_t _used_size; - std::priority_queue, FileCacheLRUComparator> _lru_queue; -}; - -class FileCacheManager { -public: - FileCacheManager() = default; - ~FileCacheManager() = default; - - static FileCacheManager* instance(); - - void add_file_cache(const std::string& cache_path, FileCachePtr file_cache); - - void remove_file_cache(const std::string& cache_path); - - void gc_file_caches(); - - FileCachePtr new_file_cache(const std::string& cache_dir, int64_t alive_time_sec, - io::FileReaderSPtr remote_file_reader, - io::FileCachePolicy cache_type); - - bool exist(const std::string& cache_path); - -private: - void _gc_unused_file_caches(std::list& result); - void _add_file_cache_for_gc_by_disk(std::vector& contexts, - FileCachePtr file_cache); - -private: - std::shared_mutex _cache_map_lock; - // cache_path -> FileCache - std::map _file_cache_map; -}; - -} // namespace io -} // namespace doris diff --git a/be/src/io/cache/sub_file_cache.cpp b/be/src/io/cache/sub_file_cache.cpp deleted file mode 100644 index 1d64c64d4d..0000000000 --- a/be/src/io/cache/sub_file_cache.cpp +++ /dev/null @@ -1,342 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "io/cache/sub_file_cache.h" - -#include -#include -#include - -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include "common/config.h" -#include "common/status.h" -#include "io/fs/local_file_system.h" -#include "io/io_common.h" -#include "runtime/exec_env.h" -#include "util/string_util.h" -#include "util/threadpool.h" - -namespace doris { -using namespace ErrorCode; -namespace io { - -using std::vector; - -const static std::string SUB_FILE_CACHE_PREFIX = "SUB_CACHE"; - -SubFileCache::SubFileCache(const Path& cache_dir, int64_t alive_time_sec, - io::FileReaderSPtr remote_file_reader) - : _cache_dir(cache_dir), - _alive_time_sec(alive_time_sec), - _remote_file_reader(remote_file_reader) {} - -SubFileCache::~SubFileCache() = default; - -Status SubFileCache::read_at_impl(size_t offset, Slice result, size_t* bytes_read, - const IOContext* io_ctx) { - RETURN_IF_ERROR(_init()); - if (io_ctx != nullptr && io_ctx->reader_type != ReaderType::READER_QUERY) { - return _remote_file_reader->read_at(offset, result, bytes_read, io_ctx); - } - std::vector need_cache_offsets; - RETURN_IF_ERROR(_get_need_cache_offsets(offset, result.size, &need_cache_offsets)); - bool need_download = false; - { - std::shared_lock rlock(_cache_map_lock); - for (vector::const_iterator iter = need_cache_offsets.cbegin(); - iter != need_cache_offsets.cend(); ++iter) { - if (_cache_file_readers.find(*iter) == _cache_file_readers.end() || - _cache_file_readers[*iter] == nullptr) { - need_download = true; - break; - } - } - } - if (need_download) { - std::unique_lock wrlock(_cache_map_lock); - bool cache_dir_exist = false; - RETURN_NOT_OK_STATUS_WITH_WARN( - io::global_local_filesystem()->exists(_cache_dir, &cache_dir_exist), - fmt::format("Check local cache dir exist failed. {}", _cache_dir.native())); - if (!cache_dir_exist) { - RETURN_NOT_OK_STATUS_WITH_WARN( - io::global_local_filesystem()->create_directory(_cache_dir), - fmt::format("Create local cache dir failed. {}", _cache_dir.native())); - } - for (vector::const_iterator iter = need_cache_offsets.cbegin(); - iter != need_cache_offsets.cend(); ++iter) { - if (_cache_file_readers.find(*iter) == _cache_file_readers.end() || - _cache_file_readers[*iter] == nullptr) { - size_t offset_begin = *iter; - size_t req_size = config::max_sub_cache_file_size; - if (offset_begin + req_size > _remote_file_reader->size()) { - req_size = _remote_file_reader->size() - offset_begin; - } - RETURN_IF_ERROR(_generate_cache_reader(offset_begin, req_size)); - } - } - } - { - std::shared_lock rlock(_cache_map_lock); - *bytes_read = 0; - for (vector::const_iterator iter = need_cache_offsets.cbegin(); - iter != need_cache_offsets.cend(); ++iter) { - size_t offset_begin = *iter; - if (_cache_file_readers.find(*iter) == _cache_file_readers.end()) { - return Status::Error("Local cache file reader can't be found: {}", - offset_begin); - } - if (offset_begin < offset) { - offset_begin = offset; - } - size_t req_size = *iter + config::max_sub_cache_file_size - offset_begin; - if (offset + result.size < *iter + config::max_sub_cache_file_size) { - req_size = offset + result.size - offset_begin; - } - Slice read_slice(result.mutable_data() + offset_begin - offset, req_size); - size_t sub_bytes_read = -1; - RETURN_NOT_OK_STATUS_WITH_WARN( - _cache_file_readers[*iter]->read_at(offset_begin - *iter, read_slice, - &sub_bytes_read), - fmt::format("Read local cache file failed: {}", - _cache_file_readers[*iter]->path().native())); - if (sub_bytes_read != read_slice.size) { - return Status::Error( - "read local cache file failed: {} , bytes read: {} vs req size: {}", - _cache_file_readers[*iter]->path().native(), sub_bytes_read, req_size); - } - *bytes_read += sub_bytes_read; - _last_match_times[*iter] = time(nullptr); - } - } - return Status::OK(); -} - -std::pair SubFileCache::_cache_path(size_t offset) { - return {_cache_dir / fmt::format("{}_{}", SUB_FILE_CACHE_PREFIX, offset), - _cache_dir / - fmt::format("{}_{}{}", SUB_FILE_CACHE_PREFIX, offset, CACHE_DONE_FILE_SUFFIX)}; -} - -Status SubFileCache::_generate_cache_reader(size_t offset, size_t req_size) { - auto [cache_file, cache_done_file] = _cache_path(offset); - bool done_file_exist = false; - RETURN_NOT_OK_STATUS_WITH_WARN( - io::global_local_filesystem()->exists(cache_done_file, &done_file_exist), - fmt::format("Check local cache done file exist failed. {}", cache_done_file.native())); - - std::promise download_st; - std::future future = download_st.get_future(); - if (!done_file_exist) { - ThreadPoolToken* thread_token = - ExecEnv::GetInstance()->get_serial_download_cache_thread_token(); - if (thread_token != nullptr) { - auto st = thread_token->submit_func([this, &download_st, - cache_done_file = cache_done_file, - cache_file = cache_file, offset, req_size] { - auto func = [this, cache_done_file, cache_file, offset, req_size] { - bool done_file_exist = false; - // Judge again whether cache_done_file exists, it is possible that the cache - // is downloaded while waiting in the thread pool - RETURN_NOT_OK_STATUS_WITH_WARN(io::global_local_filesystem()->exists( - cache_done_file, &done_file_exist), - "Check local cache done file exist failed."); - bool cache_file_exist = false; - RETURN_NOT_OK_STATUS_WITH_WARN( - io::global_local_filesystem()->exists(cache_file, &cache_file_exist), - fmt::format("Check local cache file exist failed. {}", - cache_file.native())); - if (done_file_exist && cache_file_exist) { - return Status::OK(); - } else if (!done_file_exist && cache_file_exist) { - RETURN_NOT_OK_STATUS_WITH_WARN( - io::global_local_filesystem()->delete_file(cache_file), - fmt::format("Check local cache file exist failed. {}", - cache_file.native())); - } - RETURN_NOT_OK_STATUS_WITH_WARN( - download_cache_to_local(cache_file, cache_done_file, - _remote_file_reader, req_size, offset), - "Download cache from remote to local failed."); - return Status::OK(); - }; - download_st.set_value(func()); - }); - if (!st.ok()) { - LOG(FATAL) << "Failed to submit download cache task to thread pool! " << st; - } - } else { - return Status::InternalError("Failed to get download cache thread token"); - } - auto st = future.get(); - if (!st.ok()) { - return st; - } - } - io::FileReaderSPtr cache_reader; - RETURN_IF_ERROR(io::global_local_filesystem()->open_file(cache_file, &cache_reader)); - _cache_file_readers.emplace(offset, cache_reader); - _last_match_times.emplace(offset, time(nullptr)); - LOG(INFO) << "Create cache file from remote file successfully: " - << _remote_file_reader->path().native() << "(" << offset << ", " << req_size - << ") -> " << cache_file.native(); - return Status::OK(); -} - -Status SubFileCache::_get_need_cache_offsets(size_t offset, size_t req_size, - std::vector* cache_offsets) { - size_t first_offset_begin = - offset / config::max_sub_cache_file_size * config::max_sub_cache_file_size; - for (size_t begin = first_offset_begin; begin < offset + req_size; - begin += config::max_sub_cache_file_size) { - cache_offsets->push_back(begin); - } - return Status::OK(); -} - -Status SubFileCache::clean_timeout_cache() { - RETURN_IF_ERROR(_init()); - SubGcQueue gc_queue; - _gc_lru_queue.swap(gc_queue); - std::vector timeout_keys; - { - std::shared_lock rlock(_cache_map_lock); - for (std::map::const_iterator iter = _last_match_times.cbegin(); - iter != _last_match_times.cend(); ++iter) { - if (time(nullptr) - iter->second > _alive_time_sec) { - timeout_keys.emplace_back(iter->first); - } else { - _gc_lru_queue.push({iter->first, iter->second}); - } - } - } - - std::unique_lock wrlock(_cache_map_lock); - if (timeout_keys.size() > 0) { - for (std::vector::const_iterator iter = timeout_keys.cbegin(); - iter != timeout_keys.cend(); ++iter) { - size_t cleaned_size = 0; - RETURN_IF_ERROR(_clean_cache_internal(*iter, &cleaned_size)); - _cache_file_size -= cleaned_size; - } - } - return _check_and_delete_empty_dir(_cache_dir); -} - -Status SubFileCache::clean_all_cache() { - std::unique_lock wrlock(_cache_map_lock); - for (std::map::const_iterator iter = _last_match_times.cbegin(); - iter != _last_match_times.cend(); ++iter) { - RETURN_IF_ERROR(_clean_cache_internal(iter->first, nullptr)); - } - _cache_file_size = 0; - return _check_and_delete_empty_dir(_cache_dir); -} - -Status SubFileCache::clean_one_cache(size_t* cleaned_size) { - if (!_gc_lru_queue.empty()) { - const auto& cache = _gc_lru_queue.top(); - { - std::unique_lock wrlock(_cache_map_lock); - if (auto it = _last_match_times.find(cache.offset); - it != _last_match_times.end() && it->second == cache.last_match_time) { - RETURN_IF_ERROR(_clean_cache_internal(cache.offset, cleaned_size)); - _cache_file_size -= *cleaned_size; - _gc_lru_queue.pop(); - } - } - decltype(_last_match_times.begin()) it; - while (!_gc_lru_queue.empty() && - (it = _last_match_times.find(_gc_lru_queue.top().offset)) != - _last_match_times.end() && - it->second != _gc_lru_queue.top().last_match_time) { - _gc_lru_queue.pop(); - } - } - if (_gc_lru_queue.empty()) { - std::unique_lock wrlock(_cache_map_lock); - RETURN_IF_ERROR(_check_and_delete_empty_dir(_cache_dir)); - } - return Status::OK(); -} - -Status SubFileCache::_clean_cache_internal(size_t offset, size_t* cleaned_size) { - if (_cache_file_readers.find(offset) != _cache_file_readers.end()) { - _cache_file_readers.erase(offset); - } - if (_last_match_times.find(offset) != _last_match_times.end()) { - _last_match_times.erase(offset); - } - auto [cache_file, done_file] = _cache_path(offset); - return _remove_cache_and_done(cache_file, done_file, cleaned_size); -} - -Status SubFileCache::_init() { - if (_is_inited) { - return Status::OK(); - } - std::vector cache_names; - - std::unique_lock wrlock(_cache_map_lock); - size_t cache_file_size = 0; - RETURN_IF_ERROR(_get_dir_files_and_remove_unfinished(_cache_dir, cache_names)); - std::map expect_file_size_map; - RETURN_IF_ERROR(_get_all_sub_file_size(&expect_file_size_map)); - for (const auto& file : cache_names) { - auto str_vec = split(file.native(), "_"); - size_t offset = std::strtoul(str_vec[str_vec.size() - 1].c_str(), nullptr, 10); - - int64_t file_size = -1; - auto path = _cache_dir / file; - RETURN_IF_ERROR(io::global_local_filesystem()->file_size(path, &file_size)); - if (expect_file_size_map.find(offset) == expect_file_size_map.end() || - expect_file_size_map[offset] != file_size) { - LOG(INFO) << "Delete invalid cache file: " << path.native() << ", offset: " << offset - << ", size: " << file_size; - _clean_cache_internal(offset, nullptr); - continue; - } - _last_match_times[offset] = time(nullptr); - cache_file_size += file_size; - } - _cache_file_size = cache_file_size; - _is_inited = true; - return Status::OK(); -} - -Status SubFileCache::_get_all_sub_file_size(std::map* expect_file_size_map) { - std::vector cache_offsets; - RETURN_IF_ERROR(_get_need_cache_offsets(0, _remote_file_reader->size(), &cache_offsets)); - for (int i = 0; i < cache_offsets.size() - 1; ++i) { - expect_file_size_map->emplace(cache_offsets[i], config::max_sub_cache_file_size); - } - expect_file_size_map->emplace(cache_offsets[cache_offsets.size() - 1], - _remote_file_reader->size() % config::max_sub_cache_file_size); - return Status::OK(); -} - -} // namespace io -} // namespace doris diff --git a/be/src/io/cache/sub_file_cache.h b/be/src/io/cache/sub_file_cache.h deleted file mode 100644 index 96dc873a88..0000000000 --- a/be/src/io/cache/sub_file_cache.h +++ /dev/null @@ -1,117 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -#include -#include - -#include -#include -#include -#include -#include -#include -#include - -#include "common/status.h" -#include "io/cache/file_cache.h" -#include "io/fs/file_reader.h" -#include "io/fs/file_reader_writer_fwd.h" -#include "io/fs/file_system.h" -#include "io/fs/path.h" -#include "util/slice.h" - -namespace doris { -namespace io { -class IOContext; - -class SubFileCache final : public FileCache { -public: - SubFileCache(const Path& cache_dir, int64_t alive_time_sec, - io::FileReaderSPtr remote_file_reader); - ~SubFileCache() override; - - Status close() override { return _remote_file_reader->close(); } - - const Path& path() const override { return _remote_file_reader->path(); } - - size_t size() const override { return _remote_file_reader->size(); } - - bool closed() const override { return _remote_file_reader->closed(); } - - const Path& cache_dir() const override { return _cache_dir; } - - io::FileReaderSPtr remote_file_reader() const override { return _remote_file_reader; } - - Status clean_timeout_cache() override; - - Status clean_all_cache() override; - - Status clean_one_cache(size_t* cleaned_size) override; - - int64_t get_oldest_match_time() const override { - return _gc_lru_queue.empty() ? 0 : _gc_lru_queue.top().last_match_time; - } - - bool is_gc_finish() const override { return _gc_lru_queue.empty(); } - - FileSystemSPtr fs() const override { return _remote_file_reader->fs(); } - -protected: - Status read_at_impl(size_t offset, Slice result, size_t* bytes_read, - const IOContext* io_ctx) override; - -private: - Status _generate_cache_reader(size_t offset, size_t req_size); - - Status _clean_cache_internal(size_t offset, size_t* cleaned_size); - - Status _get_need_cache_offsets(size_t offset, size_t req_size, - std::vector* cache_offsets); - - std::pair _cache_path(size_t offset); - - Status _init(); - - Status _get_all_sub_file_size(std::map* expect_file_size_map); - -private: - struct SubFileInfo { - size_t offset; - int64_t last_match_time; - }; - using SubGcQueue = std::priority_queue, - SubFileLRUComparator>; - // used by gc thread, and currently has no lock protection - SubGcQueue _gc_lru_queue; - - Path _cache_dir; - int64_t _alive_time_sec; - io::FileReaderSPtr _remote_file_reader; - - std::shared_mutex _cache_map_lock; - // offset_begin -> last_match_time - std::map _last_match_times; - // offset_begin -> local file reader - std::map _cache_file_readers; - - bool _is_inited = false; -}; - -} // namespace io -} // namespace doris diff --git a/be/src/io/cache/whole_file_cache.cpp b/be/src/io/cache/whole_file_cache.cpp deleted file mode 100644 index c24d64f2a9..0000000000 --- a/be/src/io/cache/whole_file_cache.cpp +++ /dev/null @@ -1,184 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "io/cache/whole_file_cache.h" - -#include -#include - -#include -#include -#include -#include -#include - -#include "io/fs/local_file_system.h" -#include "io/io_common.h" -#include "runtime/exec_env.h" -#include "util/threadpool.h" - -namespace doris { -using namespace ErrorCode; -namespace io { - -const static std::string WHOLE_FILE_CACHE_NAME = "WHOLE_FILE_CACHE"; - -WholeFileCache::WholeFileCache(const Path& cache_dir, int64_t alive_time_sec, - io::FileReaderSPtr remote_file_reader) - : _cache_dir(cache_dir), - _alive_time_sec(alive_time_sec), - _remote_file_reader(remote_file_reader), - _cache_file_reader(nullptr) {} - -WholeFileCache::~WholeFileCache() = default; - -Status WholeFileCache::read_at_impl(size_t offset, Slice result, size_t* bytes_read, - const IOContext* io_ctx) { - if (io_ctx != nullptr && io_ctx->reader_type != ReaderType::READER_QUERY) { - return _remote_file_reader->read_at(offset, result, bytes_read, io_ctx); - } - if (_cache_file_reader == nullptr) { - RETURN_IF_ERROR(_generate_cache_reader(offset, result.size)); - } - std::shared_lock rlock(_cache_lock); - RETURN_NOT_OK_STATUS_WITH_WARN( - _cache_file_reader->read_at(offset, result, bytes_read, io_ctx), - fmt::format("Read local cache file failed: {}", _cache_file_reader->path().native())); - if (*bytes_read != result.size) { - return Status::Error( - "read cache file failed: {}, bytes read: {} vs required size: {}", - _cache_file_reader->path().native(), *bytes_read, result.size); - } - update_last_match_time(); - return Status::OK(); -} - -Status WholeFileCache::_generate_cache_reader(size_t offset, size_t req_size) { - std::unique_lock wrlock(_cache_lock); - Path cache_file = _cache_dir / WHOLE_FILE_CACHE_NAME; - Path cache_done_file = - _cache_dir / fmt::format("{}{}", WHOLE_FILE_CACHE_NAME, CACHE_DONE_FILE_SUFFIX); - bool done_file_exist = false; - RETURN_NOT_OK_STATUS_WITH_WARN( - io::global_local_filesystem()->exists(cache_done_file, &done_file_exist), - "Check local cache done file exist failed."); - - std::promise download_st; - std::future future = download_st.get_future(); - if (!done_file_exist) { - ThreadPoolToken* thread_token = - ExecEnv::GetInstance()->get_serial_download_cache_thread_token(); - if (thread_token != nullptr) { - auto st = thread_token->submit_func([this, &download_st, cache_done_file, cache_file] { - auto func = [this, cache_done_file, cache_file] { - bool done_file_exist = false; - bool cache_dir_exist = false; - RETURN_NOT_OK_STATUS_WITH_WARN( - io::global_local_filesystem()->exists(_cache_dir, &cache_dir_exist), - fmt::format("Check local cache dir exist failed. {}", - _cache_dir.native())); - if (!cache_dir_exist) { - RETURN_NOT_OK_STATUS_WITH_WARN( - io::global_local_filesystem()->create_directory(_cache_dir), - fmt::format("Create local cache dir failed. {}", - _cache_dir.native())); - } else { - // Judge again whether cache_done_file exists, it is possible that the cache - // is downloaded while waiting in the thread pool - RETURN_NOT_OK_STATUS_WITH_WARN(io::global_local_filesystem()->exists( - cache_done_file, &done_file_exist), - "Check local cache done file exist failed."); - } - bool cache_file_exist = false; - RETURN_NOT_OK_STATUS_WITH_WARN( - io::global_local_filesystem()->exists(cache_file, &cache_file_exist), - "Check local cache file exist failed."); - if (done_file_exist && cache_file_exist) { - return Status::OK(); - } else if (!done_file_exist && cache_file_exist) { - RETURN_NOT_OK_STATUS_WITH_WARN( - io::global_local_filesystem()->delete_file(cache_file), - fmt::format("Check local cache file exist failed. {}", - cache_file.native())); - } - size_t req_size = _remote_file_reader->size(); - RETURN_NOT_OK_STATUS_WITH_WARN( - download_cache_to_local(cache_file, cache_done_file, - _remote_file_reader, req_size), - "Download cache from remote to local failed."); - return Status::OK(); - }; - download_st.set_value(func()); - }); - if (!st.ok()) { - LOG(FATAL) << "Failed to submit download cache task to thread pool! " << st; - return st; - } - } else { - return Status::InternalError("Failed to get download cache thread token"); - } - auto st = future.get(); - if (!st.ok()) { - return st; - } - } - RETURN_IF_ERROR(io::global_local_filesystem()->open_file(cache_file, &_cache_file_reader)); - _cache_file_size = _cache_file_reader->size(); - LOG(INFO) << "Create cache file from remote file successfully: " - << _remote_file_reader->path().native() << " -> " << cache_file.native(); - return Status::OK(); -} - -Status WholeFileCache::clean_timeout_cache() { - std::unique_lock wrlock(_cache_lock); - _gc_match_time = _last_match_time; - if (time(nullptr) - _last_match_time > _alive_time_sec) { - _clean_cache_internal(nullptr); - } - return Status::OK(); -} - -Status WholeFileCache::clean_all_cache() { - std::unique_lock wrlock(_cache_lock); - return _clean_cache_internal(nullptr); -} - -Status WholeFileCache::clean_one_cache(size_t* cleaned_size) { - std::unique_lock wrlock(_cache_lock); - if (_gc_match_time == _last_match_time) { - return _clean_cache_internal(cleaned_size); - } - return Status::OK(); -} - -Status WholeFileCache::_clean_cache_internal(size_t* cleaned_size) { - _cache_file_reader.reset(); - _cache_file_size = 0; - Path cache_file = _cache_dir / WHOLE_FILE_CACHE_NAME; - Path done_file = - _cache_dir / fmt::format("{}{}", WHOLE_FILE_CACHE_NAME, CACHE_DONE_FILE_SUFFIX); - RETURN_IF_ERROR(_remove_cache_and_done(cache_file, done_file, cleaned_size)); - return _check_and_delete_empty_dir(_cache_dir); -} - -bool WholeFileCache::is_gc_finish() const { - std::shared_lock rlock(_cache_lock); - return _cache_file_reader == nullptr; -} - -} // namespace io -} // namespace doris diff --git a/be/src/io/cache/whole_file_cache.h b/be/src/io/cache/whole_file_cache.h deleted file mode 100644 index 866e6e142e..0000000000 --- a/be/src/io/cache/whole_file_cache.h +++ /dev/null @@ -1,92 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -#include -#include - -#include -#include - -#include "common/status.h" -#include "io/cache/file_cache.h" -#include "io/fs/file_reader.h" -#include "io/fs/file_reader_writer_fwd.h" -#include "io/fs/file_system.h" -#include "io/fs/path.h" -#include "util/slice.h" - -namespace doris { -namespace io { -class IOContext; - -class WholeFileCache final : public FileCache { -public: - WholeFileCache(const Path& cache_dir, int64_t alive_time_sec, - io::FileReaderSPtr remote_file_reader); - ~WholeFileCache() override; - - Status close() override { return _remote_file_reader->close(); } - - const Path& path() const override { return _remote_file_reader->path(); } - - size_t size() const override { return _remote_file_reader->size(); } - - bool closed() const override { return _remote_file_reader->closed(); } - - const Path& cache_dir() const override { return _cache_dir; } - - io::FileReaderSPtr remote_file_reader() const override { return _remote_file_reader; } - - Status clean_timeout_cache() override; - - Status clean_all_cache() override; - - Status clean_one_cache(size_t* cleaned_size) override; - - int64_t get_oldest_match_time() const override { return _gc_match_time; } - - bool is_gc_finish() const override; - - FileSystemSPtr fs() const override { return _remote_file_reader->fs(); } - -protected: - Status read_at_impl(size_t offset, Slice result, size_t* bytes_read, - const IOContext* io_ctx) override; - -private: - Status _generate_cache_reader(size_t offset, size_t req_size); - - Status _clean_cache_internal(size_t* cleaned_size); - - void update_last_match_time() { _last_match_time = time(nullptr); } - -private: - Path _cache_dir; - int64_t _alive_time_sec; - io::FileReaderSPtr _remote_file_reader; - - int64_t _gc_match_time {0}; - int64_t _last_match_time {0}; - - mutable std::shared_mutex _cache_lock; - io::FileReaderSPtr _cache_file_reader; -}; - -} // namespace io -} // namespace doris diff --git a/be/src/io/file_factory.cpp b/be/src/io/file_factory.cpp index 9c49f0dd8b..02650c9419 100644 --- a/be/src/io/file_factory.cpp +++ b/be/src/io/file_factory.cpp @@ -27,7 +27,6 @@ #include "common/config.h" #include "common/status.h" #include "io/fs/broker_file_system.h" -#include "io/fs/file_reader_options.h" #include "io/fs/hdfs_file_system.h" #include "io/fs/local_file_system.h" #include "io/fs/multi_table_pipe.h" @@ -47,27 +46,22 @@ namespace io { class FileWriter; } // namespace io -static io::FileBlockCachePathPolicy BLOCK_CACHE_POLICY; -static std::string RANDOM_CACHE_BASE_PATH = "random"; +constexpr std::string_view RANDOM_CACHE_BASE_PATH = "random"; io::FileReaderOptions FileFactory::get_reader_options(RuntimeState* state) { - io::FileCachePolicy cache_policy = io::FileCachePolicy::NO_CACHE; + io::FileReaderOptions opts; if (config::enable_file_cache && state != nullptr && state->query_options().__isset.enable_file_cache && state->query_options().enable_file_cache) { - cache_policy = io::FileCachePolicy::FILE_BLOCK_CACHE; + opts.cache_type = io::FileCachePolicy::FILE_BLOCK_CACHE; } - io::FileReaderOptions reader_options(cache_policy, BLOCK_CACHE_POLICY); if (state != nullptr && state->query_options().__isset.file_cache_base_path && state->query_options().file_cache_base_path != RANDOM_CACHE_BASE_PATH) { - reader_options.specify_cache_path(state->query_options().file_cache_base_path); + opts.cache_base_path = state->query_options().file_cache_base_path; } - return reader_options; + return opts; } -io::FileReaderOptions FileFactory::NO_CACHE_READER_OPTIONS = - FileFactory::get_reader_options(nullptr); - Status FileFactory::create_file_writer(TFileType::type type, ExecEnv* env, const std::vector& broker_addresses, const std::map& properties, diff --git a/be/src/io/file_factory.h b/be/src/io/file_factory.h index a311b8d58b..ef3d8b9fc3 100644 --- a/be/src/io/file_factory.h +++ b/be/src/io/file_factory.h @@ -29,8 +29,7 @@ #include "common/factory_creator.h" #include "common/status.h" -#include "io/fs/file_reader_options.h" -#include "io/fs/file_reader_writer_fwd.h" +#include "io/fs/file_reader.h" #include "io/fs/fs_utils.h" namespace doris { @@ -47,7 +46,6 @@ class FileFactory { public: static io::FileReaderOptions get_reader_options(RuntimeState* state); - static io::FileReaderOptions NO_CACHE_READER_OPTIONS; /// Create FileWriter static Status create_file_writer(TFileType::type type, ExecEnv* env, diff --git a/be/src/io/fs/file_reader.h b/be/src/io/fs/file_reader.h index d4f7c5d530..7b7d09a5f3 100644 --- a/be/src/io/fs/file_reader.h +++ b/be/src/io/fs/file_reader.h @@ -33,6 +33,28 @@ namespace io { class FileSystem; class IOContext; +enum class FileCachePolicy : uint8_t { + NO_CACHE, + FILE_BLOCK_CACHE, +}; + +inline FileCachePolicy cache_type_from_string(std::string_view type) { + if (type == "file_block_cache") { + return FileCachePolicy::FILE_BLOCK_CACHE; + } else { + return FileCachePolicy::NO_CACHE; + } +} + +// Only affects remote file readers +struct FileReaderOptions { + FileCachePolicy cache_type {FileCachePolicy::NO_CACHE}; + bool is_doris_table = false; + std::string cache_base_path; + // Use modification time to determine whether the file is changed + int64_t modification_time = 0; +}; + class FileReader { public: FileReader() = default; @@ -60,5 +82,7 @@ protected: const IOContext* io_ctx) = 0; }; +using FileReaderSPtr = std::shared_ptr; + } // namespace io } // namespace doris diff --git a/be/src/io/fs/file_reader_options.cpp b/be/src/io/fs/file_reader_options.cpp deleted file mode 100644 index f388f51322..0000000000 --- a/be/src/io/fs/file_reader_options.cpp +++ /dev/null @@ -1,39 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "io/fs/file_reader_options.h" - -namespace doris { -namespace io { - -FileReaderOptions FileReaderOptions::DEFAULT = - FileReaderOptions(FileCachePolicy::NO_CACHE, NoCachePathPolicy()); - -FileCachePolicy cache_type_from_string(const std::string& type) { - if (type == "sub_file_cache") { - return FileCachePolicy::SUB_FILE_CACHE; - } else if (type == "whole_file_cache") { - return FileCachePolicy::WHOLE_FILE_CACHE; - } else if (type == "file_block_cache") { - return FileCachePolicy::FILE_BLOCK_CACHE; - } else { - return FileCachePolicy::NO_CACHE; - } -} - -} // namespace io -} // namespace doris diff --git a/be/src/io/fs/file_reader_options.h b/be/src/io/fs/file_reader_options.h deleted file mode 100644 index 7477816f8f..0000000000 --- a/be/src/io/fs/file_reader_options.h +++ /dev/null @@ -1,88 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -#include - -#include - -namespace doris { -namespace io { - -enum class FileCachePolicy : uint8_t { - NO_CACHE, - SUB_FILE_CACHE, - WHOLE_FILE_CACHE, - FILE_BLOCK_CACHE, -}; - -FileCachePolicy cache_type_from_string(const std::string& type); - -// CachePathPolicy it to define which cache path should be used -// for the local cache of the given file(path). -// The dervied class should implement get_cache_path() method -class CachePathPolicy { -public: - virtual ~CachePathPolicy() = default; - // path: the path of file which will be cached - // return value: the cache path of the given file. - virtual std::string get_cache_path(const std::string& path) const = 0; -}; - -class NoCachePathPolicy : public CachePathPolicy { -public: - std::string get_cache_path(const std::string& path) const override { return ""; } -}; - -class SegmentCachePathPolicy : public CachePathPolicy { -public: - void set_cache_path(const std::string& cache_path) { _cache_path = cache_path; } - - std::string get_cache_path(const std::string& path) const override { return _cache_path; } - -private: - std::string _cache_path; -}; - -class FileBlockCachePathPolicy : public CachePathPolicy { -public: - std::string get_cache_path(const std::string& path) const override { return path; } -}; - -class FileReaderOptions { -public: - FileReaderOptions(FileCachePolicy cache_type_, const CachePathPolicy& path_policy_) - : cache_type(cache_type_), path_policy(path_policy_) {} - - FileCachePolicy cache_type; - const CachePathPolicy& path_policy; - bool has_cache_base_path = false; - std::string cache_base_path; - // Use modification time to determine whether the file is changed - int64_t modification_time = 0; - - void specify_cache_path(const std::string& base_path) { - has_cache_base_path = true; - cache_base_path = base_path; - } - - static FileReaderOptions DEFAULT; -}; - -} // namespace io -} // namespace doris diff --git a/be/src/io/fs/file_reader_writer_fwd.h b/be/src/io/fs/file_reader_writer_fwd.h index e63c5395de..e4c90460e8 100644 --- a/be/src/io/fs/file_reader_writer_fwd.h +++ b/be/src/io/fs/file_reader_writer_fwd.h @@ -31,5 +31,7 @@ class FileWriter; using FileReaderSPtr = std::shared_ptr; using FileWriterPtr = std::unique_ptr; +struct FileReaderOptions; + } // namespace io } // namespace doris diff --git a/be/src/io/fs/file_system.cpp b/be/src/io/fs/file_system.cpp index 989a68884a..fb051fb005 100644 --- a/be/src/io/fs/file_system.cpp +++ b/be/src/io/fs/file_system.cpp @@ -17,6 +17,7 @@ #include "io/fs/file_system.h" +#include "io/fs/file_reader.h" #include "util/async_io.h" // IWYU pragma: keep namespace doris { @@ -27,6 +28,16 @@ Status FileSystem::create_file(const Path& file, FileWriterPtr* writer) { FILESYSTEM_M(create_file_impl(path, writer)); } +Status FileSystem::open_file(const Path& file, FileReaderSPtr* reader) { + FileDescription fd; + fd.path = file.native(); + return open_file(fd, FileReaderOptions {}, reader); +} + +Status FileSystem::open_file(const FileDescription& fd, FileReaderSPtr* reader) { + return open_file(fd, FileReaderOptions {}, reader); +} + Status FileSystem::open_file(const FileDescription& fd, const FileReaderOptions& reader_options, FileReaderSPtr* reader) { auto path = absolute_path(fd.path); diff --git a/be/src/io/fs/file_system.h b/be/src/io/fs/file_system.h index 04e2615fb0..748de46eb2 100644 --- a/be/src/io/fs/file_system.h +++ b/be/src/io/fs/file_system.h @@ -30,14 +30,12 @@ #include #include "common/status.h" -#include "io/fs/file_reader_options.h" #include "io/fs/file_reader_writer_fwd.h" #include "io/fs/fs_utils.h" #include "io/fs/path.h" namespace doris { namespace io { -class FileSystem; #ifndef FILESYSTEM_M #define FILESYSTEM_M(stmt) \ @@ -75,14 +73,9 @@ public: // The following are public interface. // And derived classes should implement all xxx_impl methods. Status create_file(const Path& file, FileWriterPtr* writer); - Status open_file(const Path& file, FileReaderSPtr* reader) { - FileDescription fd; - fd.path = file.native(); - return open_file(fd, FileReaderOptions::DEFAULT, reader); - } - Status open_file(const FileDescription& fd, FileReaderSPtr* reader) { - return open_file(fd, FileReaderOptions::DEFAULT, reader); - } + // FIXME(plat1ko): Use `Status open_file(const Path&, FileReaderSPtr*, const FileReaderOptions*)` + Status open_file(const Path& file, FileReaderSPtr* reader); + Status open_file(const FileDescription& fd, FileReaderSPtr* reader); Status open_file(const FileDescription& fd, const FileReaderOptions& reader_options, FileReaderSPtr* reader); Status create_directory(const Path& dir, bool failed_if_exists = false); diff --git a/be/src/io/fs/file_writer.h b/be/src/io/fs/file_writer.h index 012f0a7430..49a28a1a9b 100644 --- a/be/src/io/fs/file_writer.h +++ b/be/src/io/fs/file_writer.h @@ -21,17 +21,16 @@ #include "common/status.h" #include "gutil/macros.h" -#include "io/fs/file_reader_writer_fwd.h" -#include "io/fs/file_system.h" #include "io/fs/path.h" #include "util/slice.h" namespace doris { namespace io { +class FileSystem; class FileWriter { public: - FileWriter(Path&& path, FileSystemSPtr fs) : _path(std::move(path)), _fs(fs) {} + FileWriter(Path&& path, std::shared_ptr fs) : _path(std::move(path)), _fs(fs) {} FileWriter() = default; virtual ~FileWriter() = default; @@ -57,17 +56,19 @@ public: size_t bytes_appended() const { return _bytes_appended; } - FileSystemSPtr fs() const { return _fs; } + std::shared_ptr fs() const { return _fs; } bool is_closed() { return _closed; } protected: Path _path; size_t _bytes_appended = 0; - FileSystemSPtr _fs; + std::shared_ptr _fs; bool _closed = false; bool _opened = false; }; +using FileWriterPtr = std::unique_ptr; + } // namespace io } // namespace doris diff --git a/be/src/io/fs/local_file_system.cpp b/be/src/io/fs/local_file_system.cpp index 48c1981202..2a3dff4071 100644 --- a/be/src/io/fs/local_file_system.cpp +++ b/be/src/io/fs/local_file_system.cpp @@ -44,7 +44,6 @@ namespace doris { namespace io { -class FileReaderOptions; std::shared_ptr LocalFileSystem::create(Path path, std::string id) { return std::shared_ptr(new LocalFileSystem(std::move(path), std::move(id))); diff --git a/be/src/io/fs/local_file_system.h b/be/src/io/fs/local_file_system.h index 1f8d35c096..eeee0e2488 100644 --- a/be/src/io/fs/local_file_system.h +++ b/be/src/io/fs/local_file_system.h @@ -32,7 +32,6 @@ namespace doris { namespace io { -class FileReaderOptions; class LocalFileSystem final : public FileSystem { public: diff --git a/be/src/io/fs/remote_file_system.cpp b/be/src/io/fs/remote_file_system.cpp index c169777653..660a532440 100644 --- a/be/src/io/fs/remote_file_system.cpp +++ b/be/src/io/fs/remote_file_system.cpp @@ -22,12 +22,8 @@ #include #include "common/config.h" -#include "gutil/strings/stringpiece.h" #include "io/cache/block/cached_remote_file_reader.h" -#include "io/cache/file_cache.h" -#include "io/cache/file_cache_manager.h" #include "io/fs/file_reader.h" -#include "io/fs/file_reader_options.h" #include "util/async_io.h" // IWYU pragma: keep namespace doris { @@ -82,28 +78,8 @@ Status RemoteFileSystem::open_file_impl(const FileDescription& fd, const Path& a *reader = raw_reader; break; } - case io::FileCachePolicy::SUB_FILE_CACHE: - case io::FileCachePolicy::WHOLE_FILE_CACHE: { - std::string cache_path = reader_options.path_policy.get_cache_path(abs_path.native()); - io::FileCachePtr cache_reader = FileCacheManager::instance()->new_file_cache( - cache_path, config::file_cache_alive_time_sec, raw_reader, - reader_options.cache_type); - FileCacheManager::instance()->add_file_cache(cache_path, cache_reader); - *reader = cache_reader; - break; - } case io::FileCachePolicy::FILE_BLOCK_CACHE: { - StringPiece str(raw_reader->path().native()); - std::string cache_path = reader_options.path_policy.get_cache_path(abs_path.native()); - if (reader_options.has_cache_base_path) { - // from query session variable: file_cache_base_path - *reader = std::make_shared( - std::move(raw_reader), reader_options.cache_base_path, cache_path, - reader_options.modification_time); - } else { - *reader = std::make_shared(std::move(raw_reader), cache_path, - fd.mtime); - } + *reader = std::make_shared(std::move(raw_reader), &reader_options); break; } default: { diff --git a/be/src/io/fs/remote_file_system.h b/be/src/io/fs/remote_file_system.h index 559890d5ee..b27a4bd523 100644 --- a/be/src/io/fs/remote_file_system.h +++ b/be/src/io/fs/remote_file_system.h @@ -31,7 +31,6 @@ namespace doris { namespace io { -class FileReaderOptions; class RemoteFileSystem : public FileSystem { public: diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index b0940c45f1..08feb6b048 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -435,7 +435,7 @@ Status Compaction::do_compaction_impl(int64_t permits) { // create index_writer to compaction indexes auto& fs = _output_rowset->rowset_meta()->fs(); - auto tablet_path = _output_rowset->tablet_path(); + auto& tablet_path = _tablet->tablet_path(); DCHECK(dest_index_files.size() > 0); // we choose the first destination segment name as the temporary index writer path diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp index 6ac80b18da..e9219d84ac 100644 --- a/be/src/olap/olap_server.cpp +++ b/be/src/olap/olap_server.cpp @@ -46,7 +46,6 @@ #include "gen_cpp/Types_constants.h" #include "gen_cpp/internal_service.pb.h" #include "gutil/ref_counted.h" -#include "io/cache/file_cache_manager.h" #include "io/fs/file_writer.h" // IWYU pragma: keep #include "io/fs/path.h" #include "olap/cold_data_compaction.h" @@ -84,7 +83,6 @@ using std::string; namespace doris { -using io::FileCacheManager; using io::Path; // number of running SCHEMA-CHANGE threads @@ -227,12 +225,6 @@ Status StorageEngine::start_bg_threads() { &_cold_data_compaction_producer_thread)); LOG(INFO) << "cold data compaction producer thread started"; - RETURN_IF_ERROR(Thread::create( - "StorageEngine", "cache_file_cleaner_tasks_producer_thread", - [this]() { this->_cache_file_cleaner_tasks_producer_callback(); }, - &_cache_file_cleaner_tasks_producer_thread)); - LOG(INFO) << "cache file cleaner tasks producer thread started"; - // add tablet publish version thread pool ThreadPoolBuilder("TabletPublishTxnThreadPool") .set_min_threads(config::tablet_publish_txn_max_thread) @@ -1206,24 +1198,6 @@ void StorageEngine::_cold_data_compaction_producer_callback() { } } -void StorageEngine::_cache_file_cleaner_tasks_producer_callback() { - while (true) { - int64_t interval = config::generate_cache_cleaner_task_interval_sec; - if (interval <= 0) { - interval = 10; - } - bool stop = _stop_background_threads_latch.wait_for(std::chrono::seconds(interval)); - if (stop) { - break; - } - if (config::generate_cache_cleaner_task_interval_sec <= 0) { - continue; - } - LOG(INFO) << "Begin to Clean cache files"; - FileCacheManager::instance()->gc_file_caches(); - } -} - void StorageEngine::add_async_publish_task(int64_t partition_id, int64_t tablet_id, int64_t publish_version, int64_t transaction_id, bool is_recovery) { diff --git a/be/src/olap/rowset/beta_rowset.cpp b/be/src/olap/rowset/beta_rowset.cpp index e9a2e65bdc..0b9e46bc05 100644 --- a/be/src/olap/rowset/beta_rowset.cpp +++ b/be/src/olap/rowset/beta_rowset.cpp @@ -29,8 +29,7 @@ #include "common/config.h" #include "common/logging.h" #include "common/status.h" -#include "io/cache/file_cache_manager.h" -#include "io/fs/file_reader_options.h" +#include "io/fs/file_reader.h" #include "io/fs/file_system.h" #include "io/fs/local_file_system.h" #include "io/fs/path.h" @@ -47,34 +46,13 @@ namespace doris { using namespace ErrorCode; -using io::FileCacheManager; - std::string BetaRowset::segment_file_path(int segment_id) { -#ifdef BE_TEST - if (!config::file_cache_type.empty()) { - return segment_file_path(_tablet_path, rowset_id(), segment_id); - } -#endif return segment_file_path(_rowset_dir, rowset_id(), segment_id); } -std::string BetaRowset::segment_cache_path(int segment_id) { - // {root_path}/data/{shard_id}/{tablet_id}/{schema_hash}/{rowset_id}_{seg_num} - return fmt::format("{}/{}_{}", _tablet_path, rowset_id().to_string(), segment_id); -} - -// just check that the format is xxx_segmentid and segmentid is numeric -bool BetaRowset::is_segment_cache_dir(const std::string& cache_dir) { - auto segment_id_pos = cache_dir.find_last_of('_') + 1; - if (segment_id_pos >= cache_dir.size() || segment_id_pos == 0) { - return false; - } - return std::all_of(cache_dir.cbegin() + segment_id_pos, cache_dir.cend(), ::isdigit); -} - std::string BetaRowset::segment_file_path(const std::string& rowset_dir, const RowsetId& rowset_id, int segment_id) { - // {rowset_dir}/{schema_hash}/{rowset_id}_{seg_num}.dat + // {rowset_dir}/{rowset_id}_{seg_num}.dat return fmt::format("{}/{}_{}.dat", rowset_dir, rowset_id.to_string(), segment_id); } @@ -99,13 +77,7 @@ std::string BetaRowset::local_segment_path_segcompacted(const std::string& table BetaRowset::BetaRowset(const TabletSchemaSPtr& schema, const std::string& tablet_path, const RowsetMetaSharedPtr& rowset_meta) - : Rowset(schema, tablet_path, rowset_meta) { - if (_rowset_meta->is_local()) { - _rowset_dir = tablet_path; - } else { - _rowset_dir = remote_tablet_path(_rowset_meta->tablet_id()); - } -} + : Rowset(schema, rowset_meta), _rowset_dir(tablet_path) {} BetaRowset::~BetaRowset() = default; @@ -155,14 +127,14 @@ Status BetaRowset::load_segment(int64_t seg_id, segment_v2::SegmentSharedPtr* se } DCHECK(seg_id >= 0); auto seg_path = segment_file_path(seg_id); - io::SegmentCachePathPolicy cache_policy; - cache_policy.set_cache_path(segment_cache_path(seg_id)); - auto type = config::enable_file_cache ? config::file_cache_type : ""; - io::FileReaderOptions reader_options(io::cache_type_from_string(type), cache_policy); + io::FileReaderOptions reader_options { + .cache_type = config::enable_file_cache ? io::FileCachePolicy::FILE_BLOCK_CACHE + : io::FileCachePolicy::NO_CACHE, + .is_doris_table = true}; auto s = segment_v2::Segment::open(fs, seg_path, seg_id, rowset_id(), _schema, reader_options, segment); if (!s.ok()) { - LOG(WARNING) << "failed to open segment. " << seg_path << " under rowset " << unique_id() + LOG(WARNING) << "failed to open segment. " << seg_path << " under rowset " << rowset_id() << " : " << s.to_string(); return s; } @@ -177,7 +149,7 @@ Status BetaRowset::create_reader(RowsetReaderSharedPtr* result) { Status BetaRowset::remove() { // TODO should we close and remove all segment reader first? - VLOG_NOTICE << "begin to remove files in rowset " << unique_id() + VLOG_NOTICE << "begin to remove files in rowset " << rowset_id() << ", version:" << start_version() << "-" << end_version() << ", tabletid:" << _rowset_meta->tablet_id(); // If the rowset was removed, it need to remove the fds in segment cache directly @@ -211,14 +183,10 @@ Status BetaRowset::remove() { } } } - if (fs->type() != io::FileSystemType::LOCAL) { - auto cache_path = segment_cache_path(i); - FileCacheManager::instance()->remove_file_cache(cache_path); - } } if (!success) { return Status::Error("failed to remove files in rowset {}", - unique_id()); + rowset_id().to_string()); } return Status::OK(); } @@ -397,10 +365,10 @@ bool BetaRowset::check_current_rowset_segment() { for (int seg_id = 0; seg_id < num_segments(); ++seg_id) { auto seg_path = segment_file_path(seg_id); std::shared_ptr segment; - io::SegmentCachePathPolicy cache_policy; - cache_policy.set_cache_path(segment_cache_path(seg_id)); - auto type = config::enable_file_cache ? config::file_cache_type : ""; - io::FileReaderOptions reader_options(io::cache_type_from_string(type), cache_policy); + io::FileReaderOptions reader_options { + .cache_type = config::enable_file_cache ? io::FileCachePolicy::FILE_BLOCK_CACHE + : io::FileCachePolicy::NO_CACHE, + .is_doris_table = true}; auto s = segment_v2::Segment::open(fs, seg_path, seg_id, rowset_id(), _schema, reader_options, &segment); if (!s.ok()) { diff --git a/be/src/olap/rowset/beta_rowset.h b/be/src/olap/rowset/beta_rowset.h index fb7369b57d..372431cb4b 100644 --- a/be/src/olap/rowset/beta_rowset.h +++ b/be/src/olap/rowset/beta_rowset.h @@ -51,20 +51,19 @@ public: std::string segment_file_path(int segment_id); - std::string segment_cache_path(int segment_id); - - static bool is_segment_cache_dir(const std::string& cache_dir); - static std::string segment_file_path(const std::string& rowset_dir, const RowsetId& rowset_id, int segment_id); + // Return the absolute path of local segcompacted segment file static std::string local_segment_path_segcompacted(const std::string& tablet_path, const RowsetId& rowset_id, int64_t begin, int64_t end); + // Return the relative path of remote segment file static std::string remote_segment_path(int64_t tablet_id, const RowsetId& rowset_id, int segment_id); + // Return the relative path of remote segment file static std::string remote_segment_path(int64_t tablet_id, const std::string& rowset_id, int segment_id); @@ -114,6 +113,10 @@ protected: private: friend class RowsetFactory; friend class BetaRowsetReader; + + // Remote format: {remote_fs_root}/data/{tablet_id} + // Local format: {local_storage_root}/data/{shard_id}/{tablet_id}/{schema_hash} + std::string _rowset_dir; }; } // namespace doris diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp index b404138c05..ae525eed35 100644 --- a/be/src/olap/rowset/beta_rowset_writer.cpp +++ b/be/src/olap/rowset/beta_rowset_writer.cpp @@ -33,7 +33,6 @@ #include "common/logging.h" #include "gutil/strings/substitute.h" #include "io/fs/file_reader.h" -#include "io/fs/file_reader_options.h" #include "io/fs/file_system.h" #include "io/fs/file_writer.h" #include "olap/olap_define.h" @@ -84,7 +83,7 @@ BetaRowsetWriter::~BetaRowsetWriter() { if (!_already_built) { // abnormal exit, remove all files generated _segment_creator.close(); // ensure all files are closed auto fs = _rowset_meta->fs(); - if (!fs) { + if (fs->type() != io::FileSystemType::LOCAL) { // Remote fs will delete them asynchronously return; } for (int i = _segment_start_id; i < _segment_creator.next_segment_id(); ++i) { @@ -164,15 +163,17 @@ Status BetaRowsetWriter::_generate_delete_bitmap(int32_t segment_id) { Status BetaRowsetWriter::_load_noncompacted_segment(segment_v2::SegmentSharedPtr& segment, int32_t segment_id) { + DCHECK(_rowset_meta->is_local()); auto fs = _rowset_meta->fs(); if (!fs) { return Status::Error( "BetaRowsetWriter::_load_noncompacted_segment _rowset_meta->fs get failed"); } auto path = BetaRowset::segment_file_path(_context.rowset_dir, _context.rowset_id, segment_id); - auto type = config::enable_file_cache ? config::file_cache_type : ""; - io::FileReaderOptions reader_options(io::cache_type_from_string(type), - io::SegmentCachePathPolicy()); + io::FileReaderOptions reader_options { + .cache_type = config::enable_file_cache ? io::FileCachePolicy::FILE_BLOCK_CACHE + : io::FileCachePolicy::NO_CACHE, + .is_doris_table = true}; auto s = segment_v2::Segment::open(fs, path, segment_id, rowset_id(), _context.tablet_schema, reader_options, &segment); if (!s.ok()) { @@ -282,7 +283,6 @@ Status BetaRowsetWriter::_rename_compacted_segment_plain(uint64_t seg_id) { return Status::OK(); } - int ret; auto src_seg_path = BetaRowset::segment_file_path(_context.rowset_dir, _context.rowset_id, seg_id); auto dst_seg_path = BetaRowset::segment_file_path(_context.rowset_dir, _context.rowset_id, @@ -298,7 +298,7 @@ Status BetaRowsetWriter::_rename_compacted_segment_plain(uint64_t seg_id) { _segid_statistics_map.emplace(_num_segcompacted, org); _clear_statistics_for_deleting_segments_unsafe(seg_id, seg_id); } - ret = rename(src_seg_path.c_str(), dst_seg_path.c_str()); + int ret = rename(src_seg_path.c_str(), dst_seg_path.c_str()); if (ret) { return Status::Error( "failed to rename {} to {}. ret:{}, errno:{}", src_seg_path, dst_seg_path, ret, diff --git a/be/src/olap/rowset/beta_rowset_writer_v2.cpp b/be/src/olap/rowset/beta_rowset_writer_v2.cpp index a4bc32e32b..3b601eb09b 100644 --- a/be/src/olap/rowset/beta_rowset_writer_v2.cpp +++ b/be/src/olap/rowset/beta_rowset_writer_v2.cpp @@ -34,7 +34,6 @@ #include "common/logging.h" #include "gutil/integral_types.h" #include "gutil/strings/substitute.h" -#include "io/fs/file_reader_options.h" #include "io/fs/file_system.h" #include "io/fs/file_writer.h" #include "io/fs/stream_sink_file_writer.h" diff --git a/be/src/olap/rowset/rowset.cpp b/be/src/olap/rowset/rowset.cpp index 3cf6d92a8f..b3e6b1ca9e 100644 --- a/be/src/olap/rowset/rowset.cpp +++ b/be/src/olap/rowset/rowset.cpp @@ -25,9 +25,8 @@ namespace doris { -Rowset::Rowset(const TabletSchemaSPtr& schema, const std::string& tablet_path, - const RowsetMetaSharedPtr& rowset_meta) - : _tablet_path(tablet_path), _rowset_meta(rowset_meta), _refs_by_reader(0) { +Rowset::Rowset(const TabletSchemaSPtr& schema, const RowsetMetaSharedPtr& rowset_meta) + : _rowset_meta(rowset_meta), _refs_by_reader(0) { _is_pending = !_rowset_meta->has_version(); if (_is_pending) { _is_cumulative = false; diff --git a/be/src/olap/rowset/rowset.h b/be/src/olap/rowset/rowset.h index cac574f2da..541bbe3cb0 100644 --- a/be/src/olap/rowset/rowset.h +++ b/be/src/olap/rowset/rowset.h @@ -222,11 +222,6 @@ public: virtual bool check_file_exist() = 0; - // return an unique identifier string for this rowset - std::string unique_id() const { - return fmt::format("{}/{}", _tablet_path, rowset_id().to_string()); - } - bool need_delete_file() const { return _need_delete_file; } void set_need_delete_file() { _need_delete_file = true; } @@ -235,10 +230,6 @@ public: return rowset_meta()->version().contains(version); } - const std::string& tablet_path() const { return _tablet_path; } - - virtual std::string rowset_dir() { return _rowset_dir; } - static bool comparator(const RowsetSharedPtr& left, const RowsetSharedPtr& right) { return left->end_version() < right->end_version(); } @@ -315,8 +306,7 @@ protected: DISALLOW_COPY_AND_ASSIGN(Rowset); // this is non-public because all clients should use RowsetFactory to obtain pointer to initialized Rowset - Rowset(const TabletSchemaSPtr& schema, const std::string& tablet_path, - const RowsetMetaSharedPtr& rowset_meta); + Rowset(const TabletSchemaSPtr& schema, const RowsetMetaSharedPtr& rowset_meta); // this is non-public because all clients should use RowsetFactory to obtain pointer to initialized Rowset virtual Status init() = 0; @@ -331,8 +321,6 @@ protected: TabletSchemaSPtr _schema; - std::string _tablet_path; - std::string _rowset_dir; RowsetMetaSharedPtr _rowset_meta; // init in constructor bool _is_pending; // rowset is pending iff it's not in visible state diff --git a/be/src/olap/rowset/segment_v2/segment.cpp b/be/src/olap/rowset/segment_v2/segment.cpp index a04c74ce1a..21ceea7a93 100644 --- a/be/src/olap/rowset/segment_v2/segment.cpp +++ b/be/src/olap/rowset/segment_v2/segment.cpp @@ -63,16 +63,10 @@ #include "vec/olap/vgeneric_iterators.h" namespace doris { -namespace io { -class FileCacheManager; -class FileReaderOptions; -} // namespace io namespace segment_v2 { class InvertedIndexIterator; -using io::FileCacheManager; - Status Segment::open(io::FileSystemSPtr fs, const std::string& path, uint32_t segment_id, RowsetId rowset_id, TabletSchemaSPtr tablet_schema, const io::FileReaderOptions& reader_options, @@ -80,17 +74,7 @@ Status Segment::open(io::FileSystemSPtr fs, const std::string& path, uint32_t se io::FileReaderSPtr file_reader; io::FileDescription fd; fd.path = path; -#ifndef BE_TEST RETURN_IF_ERROR(fs->open_file(fd, reader_options, &file_reader)); -#else - // be ut use local file reader instead of remote file reader while use remote cache - if (!config::file_cache_type.empty()) { - RETURN_IF_ERROR(io::global_local_filesystem()->open_file(fd, reader_options, &file_reader)); - } else { - RETURN_IF_ERROR(fs->open_file(fd, reader_options, &file_reader)); - } -#endif - std::shared_ptr segment(new Segment(segment_id, rowset_id, tablet_schema)); segment->_file_reader = std::move(file_reader); RETURN_IF_ERROR(segment->_open()); diff --git a/be/src/olap/rowset/segment_v2/segment.h b/be/src/olap/rowset/segment_v2/segment.h index 8507d105e6..65f3245e6b 100644 --- a/be/src/olap/rowset/segment_v2/segment.h +++ b/be/src/olap/rowset/segment_v2/segment.h @@ -48,10 +48,6 @@ class StorageReadOptions; class MemTracker; class PrimaryKeyIndexReader; class RowwiseIterator; - -namespace io { -class FileReaderOptions; -} // namespace io struct RowLocation; namespace segment_v2 { diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp index b4e093fa2d..07d980fd02 100644 --- a/be/src/olap/storage_engine.cpp +++ b/be/src/olap/storage_engine.cpp @@ -1013,8 +1013,7 @@ void StorageEngine::add_unused_rowset(RowsetSharedPtr rowset) { } VLOG_NOTICE << "add unused rowset, rowset id:" << rowset->rowset_id() - << ", version:" << rowset->version().first << "-" << rowset->version().second - << ", unique id:" << rowset->unique_id(); + << ", version:" << rowset->version(); auto rowset_id = rowset->rowset_id().to_string(); diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h index 116ed61bf5..f738da02f4 100644 --- a/be/src/olap/storage_engine.h +++ b/be/src/olap/storage_engine.h @@ -325,8 +325,6 @@ private: void _remove_unused_remote_files_callback(); void _cold_data_compaction_producer_callback(); - void _cache_file_cleaner_tasks_producer_callback(); - Status _handle_seg_compaction(SegcompactionWorker* worker, SegCompactionCandidatesSharedPtr segments); diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 68423945e1..3daf025bd6 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -2106,7 +2106,8 @@ Status Tablet::_cooldown_data() { new_rowset_meta->set_creation_time(time(nullptr)); UniqueId cooldown_meta_id = UniqueId::gen_uid(); RowsetSharedPtr new_rowset; - RowsetFactory::create_rowset(_schema, _tablet_path, new_rowset_meta, &new_rowset); + RowsetFactory::create_rowset(_schema, remote_tablet_path(tablet_id()), new_rowset_meta, + &new_rowset); { std::unique_lock meta_wlock(_meta_lock); diff --git a/be/src/service/backend_service.cpp b/be/src/service/backend_service.cpp index f2b1279265..ae8e3fa97e 100644 --- a/be/src/service/backend_service.cpp +++ b/be/src/service/backend_service.cpp @@ -596,9 +596,8 @@ void BackendService::ingest_binlog(TIngestBinlogResult& result, estimate_timeout = config::download_low_speed_time; } - std::string local_segment_path = - fmt::format("{}/{}_{}.dat", local_tablet->tablet_path(), - rowset_meta->rowset_id().to_string(), segment_index); + auto local_segment_path = BetaRowset::segment_file_path( + local_tablet->tablet_path(), rowset_meta->rowset_id(), segment_index); LOG(INFO) << fmt::format("download segment file from {} to {}", get_segment_file_url, local_segment_path); auto get_segment_file_cb = [&get_segment_file_url, &local_segment_path, segment_file_size, diff --git a/be/src/service/point_query_executor.cpp b/be/src/service/point_query_executor.cpp index c7d1b52362..df762226a2 100644 --- a/be/src/service/point_query_executor.cpp +++ b/be/src/service/point_query_executor.cpp @@ -294,7 +294,7 @@ Status PointQueryExecutor::_lookup_row_key() { _row_read_ctxs[i]._row_location = location; // acquire and wrap this rowset (*rowset_ptr)->acquire(); - VLOG_DEBUG << "aquire rowset " << (*rowset_ptr)->unique_id(); + VLOG_DEBUG << "aquire rowset " << (*rowset_ptr)->rowset_id(); _row_read_ctxs[i]._rowset_ptr = std::unique_ptr( rowset_ptr.release(), &release_rowset); } diff --git a/be/src/service/point_query_executor.h b/be/src/service/point_query_executor.h index a49bfb442f..180af54fdd 100644 --- a/be/src/service/point_query_executor.h +++ b/be/src/service/point_query_executor.h @@ -282,7 +282,7 @@ private: static void release_rowset(RowsetSharedPtr* r) { if (r && *r) { - VLOG_DEBUG << "release rowset " << (*r)->unique_id(); + VLOG_DEBUG << "release rowset " << (*r)->rowset_id(); (*r)->release(); } delete r; diff --git a/be/src/vec/core/block_spill_reader.cpp b/be/src/vec/core/block_spill_reader.cpp index d0cebd3043..eaa6fe4f73 100644 --- a/be/src/vec/core/block_spill_reader.cpp +++ b/be/src/vec/core/block_spill_reader.cpp @@ -52,9 +52,9 @@ Status BlockSpillReader::open() { io::FileDescription file_description; file_description.path = file_path_; - io::FileReaderOptions reader_options = io::FileReaderOptions::DEFAULT; RETURN_IF_ERROR(FileFactory::create_file_reader(system_properties, file_description, - reader_options, &file_system, &file_reader_)); + io::FileReaderOptions {}, &file_system, + &file_reader_)); size_t file_size = file_reader_->size(); diff --git a/be/test/io/cache/remote_file_cache_test.cpp b/be/test/io/cache/remote_file_cache_test.cpp deleted file mode 100644 index 18fcb6de14..0000000000 --- a/be/test/io/cache/remote_file_cache_test.cpp +++ /dev/null @@ -1,192 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include -#include -#include -#include -#include - -#include -#include -#include -#include -#include - -#include "common/config.h" -#include "common/status.h" -#include "gen_cpp/olap_file.pb.h" -#include "gtest/gtest_pred_impl.h" -#include "io/fs/file_reader_options.h" -#include "io/fs/file_reader_writer_fwd.h" -#include "io/fs/file_writer.h" -#include "io/fs/local_file_system.h" -#include "io/fs/s3_file_system.h" -#include "olap/data_dir.h" -#include "olap/olap_common.h" -#include "olap/options.h" -#include "olap/row_cursor.h" -#include "olap/row_cursor_cell.h" -#include "olap/rowset/beta_rowset.h" -#include "olap/rowset/rowset_meta.h" -#include "olap/rowset/segment_v2/segment.h" -#include "olap/rowset/segment_v2/segment_writer.h" -#include "olap/storage_engine.h" -#include "olap/tablet_schema.h" -#include "olap/tablet_schema_helper.h" -#include "runtime/exec_env.h" -#include "util/s3_util.h" -#include "util/slice.h" - -namespace doris { - -using ValueGenerator = std::function; -// 0, 1, 2, 3 -// 10, 11, 12, 13 -// 20, 21, 22, 23 -static void DefaultIntGenerator(size_t rid, int cid, int block_id, RowCursorCell& cell) { - cell.set_not_null(); - *(int*)cell.mutable_cell_ptr() = rid * 10 + cid; -} - -static StorageEngine* k_engine = nullptr; -static std::string kSegmentDir = "./ut_dir/remote_file_cache_test"; -static int64_t tablet_id = 0; -static RowsetId rowset_id; -static std::string resource_id = "10000"; - -class RemoteFileCacheTest : public ::testing::Test { -protected: - static void SetUpTestSuite() { - EXPECT_TRUE(io::global_local_filesystem()->delete_and_create_directory(kSegmentDir).ok()); - - doris::ExecEnv::GetInstance()->init_download_cache_required_components(); - - doris::EngineOptions options; - k_engine = new StorageEngine(options); - StorageEngine::_s_instance = k_engine; - } - - static void TearDownTestSuite() { - EXPECT_TRUE(io::global_local_filesystem()->delete_directory(kSegmentDir).ok()); - if (k_engine != nullptr) { - k_engine->stop(); - delete k_engine; - k_engine = nullptr; - } - config::file_cache_type = ""; - } - - TabletSchemaSPtr create_schema(const std::vector& columns, - KeysType keys_type = DUP_KEYS, int num_custom_key_columns = -1) { - TabletSchemaSPtr res = std::make_shared(); - - for (auto& col : columns) { - res->append_column(col); - } - res->_num_short_key_columns = - num_custom_key_columns != -1 ? num_custom_key_columns : res->num_key_columns(); - res->_keys_type = keys_type; - return res; - } - - void build_segment(SegmentWriterOptions opts, TabletSchemaSPtr build_schema, - TabletSchemaSPtr query_schema, size_t nrows, const ValueGenerator& generator, - std::shared_ptr* res) { - std::string filename = fmt::format("{}_0.dat", rowset_id.to_string()); - std::string path = fmt::format("{}/{}", kSegmentDir, filename); - auto fs = io::global_local_filesystem(); - - io::FileWriterPtr file_writer; - Status st = fs->create_file(path, &file_writer); - EXPECT_TRUE(st.ok()); - DataDir data_dir(kSegmentDir); - data_dir.init(); - SegmentWriter writer(file_writer.get(), 0, build_schema, nullptr, &data_dir, INT32_MAX, - opts, nullptr); - st = writer.init(); - EXPECT_TRUE(st.ok()); - - RowCursor row; - auto olap_st = row.init(build_schema); - EXPECT_EQ(Status::OK(), olap_st); - - for (size_t rid = 0; rid < nrows; ++rid) { - for (int cid = 0; cid < build_schema->num_columns(); ++cid) { - int row_block_id = rid / opts.num_rows_per_block; - RowCursorCell cell = row.cell(cid); - generator(rid, cid, row_block_id, cell); - } - EXPECT_TRUE(writer.append_row(row).ok()); - } - - uint64_t file_size, index_size; - st = writer.finalize(&file_size, &index_size); - EXPECT_TRUE(st.ok()); - EXPECT_TRUE(file_writer->close().ok()); - - EXPECT_NE("", writer.min_encoded_key().to_string()); - EXPECT_NE("", writer.max_encoded_key().to_string()); - - io::FileReaderOptions reader_options(io::FileCachePolicy::NO_CACHE, - io::SegmentCachePathPolicy()); - st = segment_v2::Segment::open(fs, path, 0, {}, query_schema, reader_options, res); - EXPECT_TRUE(st.ok()); - EXPECT_EQ(nrows, (*res)->num_rows()); - } - - void test_remote_file_cache(std::string file_cache_type, int max_sub_cache_file_size) { - TabletSchemaSPtr tablet_schema = create_schema( - {create_int_key(1), create_int_key(2), create_int_value(3), create_int_value(4)}); - SegmentWriterOptions opts; - opts.num_rows_per_block = 10; - - std::shared_ptr segment; - build_segment(opts, tablet_schema, tablet_schema, 4096, DefaultIntGenerator, &segment); - - config::file_cache_type = file_cache_type; - config::max_sub_cache_file_size = max_sub_cache_file_size; - RowsetMetaSharedPtr rowset_meta = std::make_shared(); - BetaRowset rowset(tablet_schema, kSegmentDir, rowset_meta); - - // just use to create s3 filesystem, otherwise won't use cache - S3Conf s3_conf; - std::shared_ptr fs; - Status st = io::S3FileSystem::create(std::move(s3_conf), resource_id, &fs); - // io::S3FileSystem::create will call connect, which will fail because s3_conf is empty. - // but it does affect the following unit test - ASSERT_FALSE(st.ok()) << st; - rowset.rowset_meta()->set_num_segments(1); - rowset.rowset_meta()->set_fs(fs); - rowset.rowset_meta()->set_tablet_id(tablet_id); - rowset.rowset_meta()->set_rowset_id(rowset_id); - - std::vector segments; - st = rowset.load_segments(&segments); - ASSERT_TRUE(st.ok()) << st; - } -}; - -TEST_F(RemoteFileCacheTest, wholefilecache) { - test_remote_file_cache("whole_file_cache", 0); -} - -TEST_F(RemoteFileCacheTest, subfilecache) { - test_remote_file_cache("sub_file_cache", 1024); -} - -} // namespace doris diff --git a/be/test/olap/delete_bitmap_calculator_test.cpp b/be/test/olap/delete_bitmap_calculator_test.cpp index e182af0f9d..6e842edc36 100644 --- a/be/test/olap/delete_bitmap_calculator_test.cpp +++ b/be/test/olap/delete_bitmap_calculator_test.cpp @@ -29,6 +29,7 @@ #include #include "gtest/gtest_pred_impl.h" +#include "io/fs/file_reader.h" #include "io/fs/file_writer.h" #include "io/fs/local_file_system.h" #include "olap/primary_key_index.h" @@ -132,10 +133,8 @@ public: EXPECT_NE("", writer.min_encoded_key().to_string()); EXPECT_NE("", writer.max_encoded_key().to_string()); - io::FileReaderOptions reader_options(io::FileCachePolicy::NO_CACHE, - io::SegmentCachePathPolicy()); st = segment_v2::Segment::open(fs, path, segment_id, rowset_id, query_schema, - reader_options, res); + io::FileReaderOptions {}, res); EXPECT_TRUE(st.ok()); EXPECT_EQ(nrows, (*res)->num_rows()); } diff --git a/be/test/olap/rowset/rowset_tree_test.cpp b/be/test/olap/rowset/rowset_tree_test.cpp index 0e86eec6b9..0864be576a 100644 --- a/be/test/olap/rowset/rowset_tree_test.cpp +++ b/be/test/olap/rowset/rowset_tree_test.cpp @@ -94,7 +94,7 @@ public: RowsetMetaSharedPtr meta_ptr = make_shared(); meta_ptr->init_from_pb(rs_meta_pb); RowsetSharedPtr res_ptr; - MockRowset::create_rowset(schema_, rowset_path_, meta_ptr, &res_ptr, is_mem_rowset); + MockRowset::create_rowset(schema_, meta_ptr, &res_ptr, is_mem_rowset); return res_ptr; } diff --git a/be/test/olap/tablet_cooldown_test.cpp b/be/test/olap/tablet_cooldown_test.cpp index ff24826763..027807d2db 100644 --- a/be/test/olap/tablet_cooldown_test.cpp +++ b/be/test/olap/tablet_cooldown_test.cpp @@ -38,8 +38,7 @@ #include "exec/tablet_info.h" #include "gen_cpp/internal_service.pb.h" #include "gtest/gtest_pred_impl.h" -#include "io/fs/file_reader_options.h" -#include "io/fs/file_reader_writer_fwd.h" +#include "io/fs/file_reader.h" #include "io/fs/file_system.h" #include "io/fs/file_writer.h" #include "io/fs/local_file_system.h" @@ -77,16 +76,11 @@ static const std::string kTestDir = "ut_dir/tablet_cooldown_test"; static constexpr int64_t kResourceId = 10000; static constexpr int64_t kStoragePolicyId = 10002; static constexpr int64_t kTabletId = 10005; -static constexpr int64_t kTabletId2 = 10006; static constexpr int64_t kReplicaId = 10009; static constexpr int32_t kSchemaHash = 270068377; -static constexpr int64_t kReplicaId2 = 10010; -static constexpr int32_t kSchemaHash2 = 270068381; static constexpr int32_t kTxnId = 20003; static constexpr int32_t kPartitionId = 30003; -static constexpr int32_t kTxnId2 = 40003; -static constexpr int32_t kPartitionId2 = 50003; using io::Path; @@ -178,7 +172,7 @@ protected: } Status upload_impl(const Path& local_path, const Path& dest_path) override { - return _local_fs->link_file(local_path.string(), get_remote_path(dest_path)); + return _local_fs->link_file(local_path, get_remote_path(dest_path)); } Status batch_upload_impl(const std::vector& local_paths, @@ -210,7 +204,7 @@ protected: io::FileReaderSPtr* reader) override { io::FileDescription tmp_fd; tmp_fd.path = get_remote_path(abs_path); - return _local_fs->open_file(tmp_fd, io::FileReaderOptions::DEFAULT, reader); + return _local_fs->open_file(tmp_fd, io::FileReaderOptions {}, reader); } Status connect_impl() override { return Status::OK(); } @@ -250,10 +244,7 @@ public: ->delete_and_create_directory(config::storage_root_path) .ok()); EXPECT_TRUE(io::global_local_filesystem() - ->create_directory(get_remote_path(fmt::format("data/{}", kTabletId))) - .ok()); - EXPECT_TRUE(io::global_local_filesystem() - ->create_directory(get_remote_path(fmt::format("data/{}", kTabletId2))) + ->create_directory(get_remote_path(remote_tablet_path(kTabletId))) .ok()); std::vector paths {{config::storage_root_path, -1}}; @@ -435,7 +426,6 @@ TEST_F(TabletCooldownTest, normal) { TabletSharedPtr tablet1; TabletSharedPtr tablet2; createTablet(&tablet1, kReplicaId, kSchemaHash, kTabletId, kTxnId, kPartitionId); - createTablet(&tablet2, kReplicaId2, kSchemaHash2, kTabletId2, kTxnId2, kPartitionId2); // test cooldown tablet1->set_storage_policy_id(kStoragePolicyId); Status st = tablet1->cooldown(); // rowset [0-1] @@ -446,7 +436,6 @@ TEST_F(TabletCooldownTest, normal) { ASSERT_EQ(Status::OK(), st); st = tablet1->cooldown(); // rowset [2-2] ASSERT_EQ(Status::OK(), st); - sleep(30); auto rs = tablet1->get_rowset_by_version({2, 2}); ASSERT_FALSE(rs->is_local()); @@ -456,32 +445,6 @@ TEST_F(TabletCooldownTest, normal) { st = std::static_pointer_cast(rs)->load_segments(&segments); ASSERT_EQ(Status::OK(), st); ASSERT_EQ(segments.size(), 1); - - st = io::global_local_filesystem()->link_file( - get_remote_path(fmt::format("data/{}/{}.{}.meta", kTabletId, kReplicaId, 1)), - get_remote_path(fmt::format("data/{}/{}.{}.meta", kTabletId2, kReplicaId, 2))); - ASSERT_EQ(Status::OK(), st); - // follow cooldown - tablet2->set_storage_policy_id(kStoragePolicyId); - tablet2->update_cooldown_conf(1, 111111111); - st = tablet2->cooldown(); // rowset [0-1] - ASSERT_NE(Status::OK(), st); - tablet2->update_cooldown_conf(1, kReplicaId); - st = tablet2->cooldown(); // rowset [0-1] - ASSERT_NE(Status::OK(), st); - tablet2->update_cooldown_conf(2, kReplicaId); - st = tablet2->cooldown(); // rowset [0-1] - ASSERT_EQ(Status::OK(), st); - sleep(30); - auto rs2 = tablet2->get_rowset_by_version({2, 2}); - ASSERT_FALSE(rs2->is_local()); - - // test read tablet2 - ASSERT_EQ(Status::OK(), st); - std::vector segments2; - st = std::static_pointer_cast(rs2)->load_segments(&segments2); - ASSERT_EQ(Status::OK(), st); - ASSERT_EQ(segments2.size(), 1); } } // namespace doris diff --git a/be/test/olap/tablet_meta_test.cpp b/be/test/olap/tablet_meta_test.cpp index 993fbf2d49..7d57765ca5 100644 --- a/be/test/olap/tablet_meta_test.cpp +++ b/be/test/olap/tablet_meta_test.cpp @@ -67,7 +67,7 @@ TEST(TabletMetaTest, TestReviseMeta) { meta_ptr->init_from_pb(rs_meta_pb); RowsetSharedPtr rowset_ptr; TabletSchemaSPtr schema = std::make_shared(); - MockRowset::create_rowset(schema, "", meta_ptr, &rowset_ptr, false); + MockRowset::create_rowset(schema, meta_ptr, &rowset_ptr, false); src_rowsets.push_back(rowset_ptr); tablet_meta.add_rs_meta(rowset_ptr->rowset_meta()); } diff --git a/be/test/runtime/load_stream_test.cpp b/be/test/runtime/load_stream_test.cpp index 0911337f03..dae8b93e19 100644 --- a/be/test/runtime/load_stream_test.cpp +++ b/be/test/runtime/load_stream_test.cpp @@ -628,8 +628,7 @@ public: if (tablet.tablet_id != tablet_id || rowset == nullptr) { continue; } - auto path = - BetaRowset::segment_file_path(rowset->rowset_dir(), rowset->rowset_id(), segid); + auto path = static_cast(rowset.get())->segment_file_path(segid); LOG(INFO) << "read data from " << path; std::ifstream inputFile(path, std::ios::binary); inputFile.seekg(0, std::ios::end); diff --git a/be/test/testutil/mock_rowset.h b/be/test/testutil/mock_rowset.h index 93b76d9c67..50065ebe6b 100644 --- a/be/test/testutil/mock_rowset.h +++ b/be/test/testutil/mock_rowset.h @@ -58,18 +58,16 @@ class MockRowset : public Rowset { return Rowset::get_segments_key_bounds(segments_key_bounds); } - static Status create_rowset(TabletSchemaSPtr schema, const std::string& rowset_path, - RowsetMetaSharedPtr rowset_meta, RowsetSharedPtr* rowset, - bool is_mem_rowset = false) { - rowset->reset(new MockRowset(schema, rowset_path, rowset_meta)); + static Status create_rowset(TabletSchemaSPtr schema, RowsetMetaSharedPtr rowset_meta, + RowsetSharedPtr* rowset, bool is_mem_rowset = false) { + rowset->reset(new MockRowset(schema, rowset_meta)); ((MockRowset*)rowset->get())->is_mem_rowset_ = is_mem_rowset; return Status::OK(); } protected: - MockRowset(TabletSchemaSPtr schema, const std::string& rowset_path, - RowsetMetaSharedPtr rowset_meta) - : Rowset(schema, rowset_path, rowset_meta) {} + MockRowset(TabletSchemaSPtr schema, RowsetMetaSharedPtr rowset_meta) + : Rowset(schema, rowset_meta) {} Status init() override { return Status::NotSupported("MockRowset not support this method."); }