diff --git a/be/src/common/config.h b/be/src/common/config.h index cc900b5f53..021f5cd680 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -839,6 +839,7 @@ CONF_mString(file_cache_type, ""); CONF_Validator(file_cache_type, [](const std::string config) -> bool { return config == "sub_file_cache" || config == "whole_file_cache" || config == ""; }); +CONF_mInt64(file_cache_max_size_per_disk, "0"); // zero for no limit CONF_Int32(s3_transfer_executor_pool_size, "2"); diff --git a/be/src/io/CMakeLists.txt b/be/src/io/CMakeLists.txt index 5a9fdfafd3..0941505ba0 100644 --- a/be/src/io/CMakeLists.txt +++ b/be/src/io/CMakeLists.txt @@ -40,6 +40,7 @@ set(IO_FILES fs/local_file_writer.cpp fs/s3_file_reader.cpp fs/s3_file_system.cpp + cache/dummy_file_cache.cpp cache/file_cache.cpp cache/file_cache_manager.cpp cache/sub_file_cache.cpp diff --git a/be/src/io/cache/dummy_file_cache.cpp b/be/src/io/cache/dummy_file_cache.cpp new file mode 100644 index 0000000000..7e94bb388e --- /dev/null +++ b/be/src/io/cache/dummy_file_cache.cpp @@ -0,0 +1,144 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "io/cache/dummy_file_cache.h" + +#include "gutil/strings/util.h" +#include "io/fs/local_file_system.h" +#include "util/file_utils.h" +#include "util/string_util.h" + +namespace doris { +namespace io { + +DummyFileCache::DummyFileCache(const Path& cache_dir, int64_t alive_time_sec) + : _cache_dir(cache_dir), _alive_time_sec(alive_time_sec) {} + +DummyFileCache::~DummyFileCache() {} + +void DummyFileCache::_update_last_mtime(const Path& done_file) { + Path cache_done_file = _cache_dir / done_file; + time_t m_time; + if (FileUtils::mtime(cache_done_file.native(), &m_time).ok() && m_time > _last_match_time) { + _last_match_time = m_time; + } +} + +void DummyFileCache::_add_file_cache(const Path& data_file) { + Path cache_file = _cache_dir / data_file; + size_t file_size = 0; + if (io::global_local_filesystem()->file_size(cache_file, &file_size).ok()) { + _file_sizes[cache_file] = file_size; + _cache_file_size += file_size; + } else { + _unfinished_files.push_back(cache_file); + } +} + +void DummyFileCache::_load() { + // list all files + std::vector cache_file_names; + if (!io::global_local_filesystem()->list(_cache_dir, &cache_file_names).ok()) { + return; + } + + // separate DATA file and DONE file + std::set cache_names; + std::list done_names; + for (const auto& cache_file_name : cache_file_names) { + if (ends_with(cache_file_name.native(), CACHE_DONE_FILE_SUFFIX)) { + done_names.push_back(cache_file_name); + } else { + cache_names.insert(cache_file_name); + } + } + + // match DONE file with DATA file + for (auto iter = done_names.begin(); iter != done_names.end(); ++iter) { + Path cache_filename = StringReplace(iter->native(), CACHE_DONE_FILE_SUFFIX, "", true); + if (cache_names.find(cache_filename) != cache_names.end()) { + cache_names.erase(cache_filename); + _update_last_mtime(*iter); + _add_file_cache(cache_filename); + } else { + // not data file, but with DONE file + _unfinished_files.push_back(*iter); + } + } + // data file without DONE file + for (const auto& file : cache_names) { + _unfinished_files.push_back(file); + } +} + +Status DummyFileCache::_clean_unfinished_cache() { + // remove cache file without done file + for (auto iter = _unfinished_files.begin(); iter != _unfinished_files.end(); ++iter) { + Path cache_file_path = _cache_dir / *iter; + LOG(INFO) << "Delete unfinished cache file: " << cache_file_path.native(); + if (!io::global_local_filesystem()->delete_file(cache_file_path).ok()) { + LOG(ERROR) << "delete_file failed: " << cache_file_path.native(); + } + } + std::vector cache_file_names; + if (io::global_local_filesystem()->list(_cache_dir, &cache_file_names).ok() && + cache_file_names.size() == 0) { + if (global_local_filesystem()->delete_directory(_cache_dir).ok()) { + LOG(INFO) << "Delete empty dir: " << _cache_dir.native(); + } + } + return Status::OK(); +} + +Status DummyFileCache::load_and_clean() { + _load(); + return _clean_unfinished_cache(); +} + +Status DummyFileCache::clean_timeout_cache() { + if (time(nullptr) - _last_match_time > _alive_time_sec) { + return _clean_cache_internal(); + } + return Status::OK(); +} + +Status DummyFileCache::clean_all_cache() { + return _clean_cache_internal(); +} + +Status DummyFileCache::_clean_cache_internal() { + for (const auto& iter : _file_sizes) { + const auto cache_file_path = iter.first; + Path done_file_path = cache_file_path.native() + CACHE_DONE_FILE_SUFFIX; + LOG(INFO) << "Delete unused done_cache_path: " << done_file_path.native() + << ", cache_file_path: " << cache_file_path.native(); + if (!io::global_local_filesystem()->delete_file(done_file_path).ok()) { + LOG(ERROR) << "delete_file failed: " << done_file_path.native(); + continue; + } + if (!io::global_local_filesystem()->delete_file(cache_file_path).ok()) { + LOG(ERROR) << "delete_file failed: " << cache_file_path.native(); + continue; + } + } + _file_sizes.clear(); + _cache_file_size = 0; + return Status::OK(); +} + +} // namespace io +} // namespace doris diff --git a/be/src/io/cache/dummy_file_cache.h b/be/src/io/cache/dummy_file_cache.h new file mode 100644 index 0000000000..411ede5cc2 --- /dev/null +++ b/be/src/io/cache/dummy_file_cache.h @@ -0,0 +1,77 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include + +#include "common/status.h" +#include "io/cache/file_cache.h" +#include "io/fs/path.h" + +namespace doris { +namespace io { + +// Only used for GC +class DummyFileCache final : public FileCache { +public: + DummyFileCache(const Path& cache_dir, int64_t alive_time_sec); + + ~DummyFileCache() override; + + Status close() override { return Status::OK(); } + + Status read_at(size_t offset, Slice result, size_t* bytes_read) override { + return Status::NotSupported("dummy file cache only used for GC"); + } + + const Path& path() const override { return _cache_dir; } + + size_t size() const override { return 0; } + + bool closed() const override { return true; } + + const Path& cache_dir() const override { return _cache_dir; } + + io::FileReaderSPtr remote_file_reader() const override { return nullptr; } + + Status clean_timeout_cache() override; + + Status clean_all_cache() override; + + Status load_and_clean(); + + bool is_dummy_file_cache() override { return true; } + +private: + Status _clean_unfinished_cache(); + void _update_last_mtime(const Path& done_file); + void _add_file_cache(const Path& data_file); + void _load(); + Status _clean_cache_internal(); + +private: + Path _cache_dir; + int64_t _alive_time_sec; + + std::map _file_sizes; + std::list _unfinished_files; +}; + +} // namespace io +} // namespace doris diff --git a/be/src/io/cache/file_cache.h b/be/src/io/cache/file_cache.h index 979f3c145f..8e0cb0a679 100644 --- a/be/src/io/cache/file_cache.h +++ b/be/src/io/cache/file_cache.h @@ -31,14 +31,14 @@ const std::string CACHE_DONE_FILE_SUFFIX = "_DONE"; class FileCache : public FileReader { public: - FileCache() = default; + FileCache() : _last_match_time(time(nullptr)), _cache_file_size(0) {} virtual ~FileCache() = default; DISALLOW_COPY_AND_ASSIGN(FileCache); virtual const Path& cache_dir() const = 0; - virtual size_t cache_file_size() const = 0; + size_t cache_file_size() const { return _cache_file_size; } virtual io::FileReaderSPtr remote_file_reader() const = 0; @@ -46,12 +46,27 @@ public: virtual Status clean_all_cache() = 0; + virtual bool is_dummy_file_cache() { return false; } + Status download_cache_to_local(const Path& cache_file, const Path& cache_done_file, io::FileReaderSPtr remote_file_reader, size_t req_size, size_t offset = 0); + + void update_last_match_time() { _last_match_time = time(nullptr); } + int64_t get_last_match_time() const { return _last_match_time; } + +protected: + int64_t _last_match_time; + size_t _cache_file_size; }; using FileCachePtr = std::shared_ptr; +struct FileCacheLRUComparator { + bool operator()(const FileCachePtr& lhs, const FileCachePtr& rhs) const { + return lhs->get_last_match_time() > rhs->get_last_match_time(); + } +}; + } // namespace io } // namespace doris diff --git a/be/src/io/cache/file_cache_manager.cpp b/be/src/io/cache/file_cache_manager.cpp index c5ea420460..14161b8063 100644 --- a/be/src/io/cache/file_cache_manager.cpp +++ b/be/src/io/cache/file_cache_manager.cpp @@ -18,15 +18,41 @@ #include "io/cache/file_cache_manager.h" #include "gutil/strings/util.h" +#include "io/cache/dummy_file_cache.h" #include "io/cache/sub_file_cache.h" #include "io/cache/whole_file_cache.h" #include "io/fs/local_file_system.h" +#include "olap/storage_engine.h" #include "util/file_utils.h" #include "util/string_util.h" namespace doris { namespace io { +void GCContextPerDisk::init(const std::string& path, int64_t max_size) { + _disk_path = path; + _conf_max_size = max_size; + _used_size = 0; +} + +bool GCContextPerDisk::try_add_file_cache(FileCachePtr cache, int64_t file_size) { + if (cache->cache_dir().string().substr(0, _disk_path.size()) == _disk_path) { + _lru_queue.push(cache); + _used_size += file_size; + return true; + } + return false; +} + +void GCContextPerDisk::get_gc_file_caches(std::list& result) { + while (!_lru_queue.empty() && _used_size > _conf_max_size) { + auto file_cache = _lru_queue.top(); + _used_size -= file_cache->cache_file_size(); + result.push_back(file_cache); + _lru_queue.pop(); + } +} + void FileCacheManager::add_file_cache(const std::string& cache_path, FileCachePtr file_cache) { std::lock_guard wrlock(_cache_map_lock); _file_cache_map.emplace(cache_path, file_cache); @@ -56,87 +82,109 @@ void FileCacheManager::remove_file_cache(const std::string& cache_path) { } } -void FileCacheManager::clean_timeout_caches() { - std::shared_lock rdlock(_cache_map_lock); - for (std::map::const_iterator iter = _file_cache_map.cbegin(); - iter != _file_cache_map.cend(); ++iter) { - if (iter->second == nullptr) { - continue; +void FileCacheManager::_add_file_cache_for_gc_by_disk(std::vector& contexts, + FileCachePtr file_cache) { + // sort file cache by last match time + if (config::file_cache_max_size_per_disk > 0) { + auto file_size = file_cache->cache_file_size(); + if (file_size <= 0) { + return; + } + for (size_t i = 0; i < contexts.size(); ++i) { + if (contexts[i].try_add_file_cache(file_cache, file_size)) { + break; + } } - iter->second->clean_timeout_cache(); } } -void FileCacheManager::clean_timeout_file_not_in_mem(const std::string& cache_path) { - time_t now = time(nullptr); - std::shared_lock rdlock(_cache_map_lock); - // Deal with caches not in _file_cache_map - if (_file_cache_map.find(cache_path) == _file_cache_map.end()) { - std::vector cache_file_names; - if (io::global_local_filesystem()->list(cache_path, &cache_file_names).ok()) { - std::map cache_names; - std::list done_names; - for (Path cache_file_name : cache_file_names) { - std::string filename = cache_file_name.native(); - if (!ends_with(filename, CACHE_DONE_FILE_SUFFIX)) { - cache_names[filename] = true; +void FileCacheManager::_gc_unused_file_caches(std::list& result) { + std::vector tablets = + StorageEngine::instance()->tablet_manager()->get_all_tablet(); + for (const auto& tablet : tablets) { + std::vector seg_file_paths; + if (io::global_local_filesystem()->list(tablet->tablet_path(), &seg_file_paths).ok()) { + for (Path seg_file : seg_file_paths) { + std::string seg_filename = seg_file.native(); + // check if it is a dir name + if (ends_with(seg_filename, ".dat")) { continue; } - done_names.push_back(filename); - std::stringstream done_file_ss; - done_file_ss << cache_path << "/" << filename; - std::string done_file_path = done_file_ss.str(); - time_t m_time; - if (!FileUtils::mtime(done_file_path, &m_time).ok()) { - continue; - } - if (now - m_time < config::file_cache_alive_time_sec) { - continue; - } - std::string cache_file_path = - StringReplace(done_file_path, CACHE_DONE_FILE_SUFFIX, "", true); - LOG(INFO) << "Delete timeout done_cache_path: " << done_file_path - << ", cache_file_path: " << cache_file_path << ", m_time: " << m_time; - if (!io::global_local_filesystem()->delete_file(done_file_path).ok()) { - LOG(ERROR) << "delete_file failed: " << done_file_path; - continue; - } - if (!io::global_local_filesystem()->delete_file(cache_file_path).ok()) { - LOG(ERROR) << "delete_file failed: " << cache_file_path; + // skip file cache already in memory + std::stringstream ss; + ss << tablet->tablet_path() << "/" << seg_filename; + std::string cache_path = ss.str(); + + std::shared_lock rdlock(_cache_map_lock); + if (_file_cache_map.find(cache_path) != _file_cache_map.end()) { continue; } + auto file_cache = std::make_shared( + cache_path, config::file_cache_alive_time_sec); + // load cache meta from disk and clean unfinished cache files + file_cache->load_and_clean(); + // policy1: GC file cache by timeout + file_cache->clean_timeout_cache(); + + result.push_back(file_cache); } - // find cache file without done file. - for (std::list::iterator itr = done_names.begin(); itr != done_names.end(); - ++itr) { - std::string cache_filename = StringReplace(*itr, CACHE_DONE_FILE_SUFFIX, "", true); - if (cache_names.find(cache_filename) != cache_names.end()) { - cache_names.erase(cache_filename); - } + } + } +} + +void FileCacheManager::gc_file_caches() { + int64_t gc_conf_size = config::file_cache_max_size_per_disk; + std::vector contexts; + // init for GC by disk size + if (gc_conf_size > 0) { + std::vector data_dirs = doris::StorageEngine::instance()->get_stores(); + contexts.resize(data_dirs.size()); + for (size_t i = 0; i < contexts.size(); ++i) { + contexts[i].init(data_dirs[i]->path(), gc_conf_size); + } + } + + // process unused file caches + std::list dummy_file_list; + _gc_unused_file_caches(dummy_file_list); + + { + std::shared_lock rdlock(_cache_map_lock); + for (auto item : dummy_file_list) { + // check again after _cache_map_lock hold + if (_file_cache_map.find(item->cache_dir().native()) != _file_cache_map.end()) { + continue; } - // remove cache file without done file - for (std::map::iterator itr = cache_names.begin(); - itr != cache_names.end(); ++itr) { - std::stringstream cache_file_ss; - cache_file_ss << cache_path << "/" << itr->first; - std::string cache_file_path = cache_file_ss.str(); - time_t m_time; - if (!FileUtils::mtime(cache_file_path, &m_time).ok()) { + // sort file cache by last match time + _add_file_cache_for_gc_by_disk(contexts, item); + } + + // process file caches in memory + for (std::map::const_iterator iter = _file_cache_map.cbegin(); + iter != _file_cache_map.cend(); ++iter) { + if (iter->second == nullptr) { + continue; + } + // policy1: GC file cache by timeout + iter->second->clean_timeout_cache(); + // sort file cache by last match time + _add_file_cache_for_gc_by_disk(contexts, iter->second); + } + } + + // policy2: GC file cache by disk size + if (gc_conf_size > 0) { + for (size_t i = 0; i < contexts.size(); ++i) { + std::list gc_file_list; + contexts[i].get_gc_file_caches(gc_file_list); + for (auto item : gc_file_list) { + std::shared_lock rdlock(_cache_map_lock); + // for dummy file cache, check already used or not again + if (item->is_dummy_file_cache() && + _file_cache_map.find(item->cache_dir().native()) != _file_cache_map.end()) { continue; } - if (now - m_time < config::file_cache_alive_time_sec) { - continue; - } - LOG(INFO) << "Delete cache file without done file: " << cache_file_path; - if (!io::global_local_filesystem()->delete_file(cache_file_path).ok()) { - LOG(ERROR) << "delete_file failed: " << cache_file_path; - } - } - if (io::global_local_filesystem()->list(cache_path, &cache_file_names).ok() && - cache_file_names.size() == 0) { - if (global_local_filesystem()->delete_directory(cache_path).ok()) { - LOG(INFO) << "Delete empty dir: " << cache_path; - } + item->clean_all_cache(); } } } diff --git a/be/src/io/cache/file_cache_manager.h b/be/src/io/cache/file_cache_manager.h index c4195ab59b..c25d1cfb5c 100644 --- a/be/src/io/cache/file_cache_manager.h +++ b/be/src/io/cache/file_cache_manager.h @@ -17,7 +17,9 @@ #pragma once +#include #include +#include #include "common/config.h" #include "common/status.h" @@ -26,6 +28,20 @@ namespace doris { namespace io { +class GCContextPerDisk { +public: + GCContextPerDisk() : _conf_max_size(0), _used_size(0) {} + void init(const std::string& path, int64_t max_size); + bool try_add_file_cache(FileCachePtr cache, int64_t file_size); + void get_gc_file_caches(std::list&); + +private: + std::string _disk_path; + int64_t _conf_max_size; + int64_t _used_size; + std::priority_queue, FileCacheLRUComparator> _lru_queue; +}; + class FileCacheManager { public: FileCacheManager() = default; @@ -37,9 +53,7 @@ public: void remove_file_cache(const std::string& cache_path); - void clean_timeout_caches(); - - void clean_timeout_file_not_in_mem(const std::string& cache_path); + void gc_file_caches(); FileCachePtr new_file_cache(const std::string& cache_dir, int64_t alive_time_sec, io::FileReaderSPtr remote_file_reader, @@ -47,6 +61,11 @@ public: bool exist(const std::string& cache_path); +private: + void _gc_unused_file_caches(std::list& result); + void _add_file_cache_for_gc_by_disk(std::vector& contexts, + FileCachePtr file_cache); + private: std::shared_mutex _cache_map_lock; // cache_path -> FileCache diff --git a/be/src/io/cache/sub_file_cache.cpp b/be/src/io/cache/sub_file_cache.cpp index 8cf2d693a9..ab52345e66 100644 --- a/be/src/io/cache/sub_file_cache.cpp +++ b/be/src/io/cache/sub_file_cache.cpp @@ -70,19 +70,10 @@ Status SubFileCache::read_at(size_t offset, Slice result, size_t* bytes_read) { if (offset_begin + req_size > _remote_file_reader->size()) { req_size = _remote_file_reader->size() - offset_begin; } - auto st = _generate_cache_reader(offset_begin, req_size); - if (!st.ok()) { - WARN_IF_ERROR(_remote_file_reader->close(), - fmt::format("Close remote file reader failed: {}", - _remote_file_reader->path().native())); - return st; - } + RETURN_IF_ERROR(_generate_cache_reader(offset_begin, req_size)); } } - RETURN_NOT_OK_STATUS_WITH_WARN(_remote_file_reader->close(), - fmt::format("Close remote file reader failed: {}", - _remote_file_reader->path().native())); - _cache_file_size = _get_cache_file_size(); + _cache_file_size = _calc_cache_file_size(); } { std::shared_lock rlock(_cache_map_lock); @@ -119,6 +110,7 @@ Status SubFileCache::read_at(size_t offset, Slice result, size_t* bytes_read) { _last_match_times[*iter] = time(nullptr); } } + update_last_match_time(); return Status::OK(); } @@ -183,6 +175,7 @@ Status SubFileCache::_generate_cache_reader(size_t offset, size_t req_size) { RETURN_IF_ERROR(io::global_local_filesystem()->open_file(cache_file, &cache_reader)); _cache_file_readers.emplace(offset, cache_reader); _last_match_times.emplace(offset, time(nullptr)); + update_last_match_time(); LOG(INFO) << "Create cache file from remote file successfully: " << _remote_file_reader->path().native() << "(" << offset << ", " << req_size << ") -> " << cache_file.native(); @@ -217,7 +210,7 @@ Status SubFileCache::clean_timeout_cache() { iter != timeout_keys.cend(); ++iter) { RETURN_IF_ERROR(_clean_cache_internal(*iter)); } - _cache_file_size = _get_cache_file_size(); + _cache_file_size = _calc_cache_file_size(); } return Status::OK(); } @@ -228,7 +221,7 @@ Status SubFileCache::clean_all_cache() { iter != _last_match_times.cend(); ++iter) { RETURN_IF_ERROR(_clean_cache_internal(iter->first)); } - _cache_file_size = _get_cache_file_size(); + _cache_file_size = _calc_cache_file_size(); return Status::OK(); } @@ -262,7 +255,7 @@ Status SubFileCache::_clean_cache_internal(size_t offset) { return Status::OK(); } -size_t SubFileCache::_get_cache_file_size() { +size_t SubFileCache::_calc_cache_file_size() { size_t cache_file_size = 0; for (std::map::const_iterator iter = _cache_file_readers.cbegin(); iter != _cache_file_readers.cend(); ++iter) { diff --git a/be/src/io/cache/sub_file_cache.h b/be/src/io/cache/sub_file_cache.h index dca5da0fe8..f3e8630385 100644 --- a/be/src/io/cache/sub_file_cache.h +++ b/be/src/io/cache/sub_file_cache.h @@ -45,8 +45,6 @@ public: const Path& cache_dir() const override { return _cache_dir; } - size_t cache_file_size() const override { return _cache_file_size; } - io::FileReaderSPtr remote_file_reader() const override { return _remote_file_reader; } Status clean_timeout_cache() override; @@ -61,11 +59,10 @@ private: Status _get_need_cache_offsets(size_t offset, size_t req_size, std::vector* cache_offsets); - size_t _get_cache_file_size(); + size_t _calc_cache_file_size(); private: Path _cache_dir; - size_t _cache_file_size; int64_t _alive_time_sec; io::FileReaderSPtr _remote_file_reader; diff --git a/be/src/io/cache/whole_file_cache.cpp b/be/src/io/cache/whole_file_cache.cpp index 0a33cbea0d..50b61f596a 100644 --- a/be/src/io/cache/whole_file_cache.cpp +++ b/be/src/io/cache/whole_file_cache.cpp @@ -29,23 +29,13 @@ WholeFileCache::WholeFileCache(const Path& cache_dir, int64_t alive_time_sec, : _cache_dir(cache_dir), _alive_time_sec(alive_time_sec), _remote_file_reader(remote_file_reader), - _last_match_time(time(nullptr)), _cache_file_reader(nullptr) {} WholeFileCache::~WholeFileCache() {} Status WholeFileCache::read_at(size_t offset, Slice result, size_t* bytes_read) { if (_cache_file_reader == nullptr) { - auto st = _generate_cache_reader(offset, result.size); - if (!st.ok()) { - WARN_IF_ERROR(_remote_file_reader->close(), - fmt::format("Close remote file reader failed: {}", - _remote_file_reader->path().native())); - return st; - } - RETURN_NOT_OK_STATUS_WITH_WARN(_remote_file_reader->close(), - fmt::format("Close remote file reader failed: {}", - _remote_file_reader->path().native())); + RETURN_IF_ERROR(_generate_cache_reader(offset, result.size)); } std::shared_lock rlock(_cache_lock); RETURN_NOT_OK_STATUS_WITH_WARN( @@ -56,7 +46,7 @@ Status WholeFileCache::read_at(size_t offset, Slice result, size_t* bytes_read) << ", bytes read: " << bytes_read << " vs required size: " << result.size; return Status::OLAPInternalError(OLAP_ERR_OS_ERROR); } - _last_match_time = time(nullptr); + update_last_match_time(); return Status::OK(); } @@ -132,7 +122,7 @@ Status WholeFileCache::_generate_cache_reader(size_t offset, size_t req_size) { } RETURN_IF_ERROR(io::global_local_filesystem()->open_file(cache_file, &_cache_file_reader)); _cache_file_size = _cache_file_reader->size(); - _last_match_time = time(nullptr); + update_last_match_time(); LOG(INFO) << "Create cache file from remote file successfully: " << _remote_file_reader->path().native() << " -> " << cache_file.native(); return Status::OK(); diff --git a/be/src/io/cache/whole_file_cache.h b/be/src/io/cache/whole_file_cache.h index 1ad0addff6..444ee7e012 100644 --- a/be/src/io/cache/whole_file_cache.h +++ b/be/src/io/cache/whole_file_cache.h @@ -45,8 +45,6 @@ public: const Path& cache_dir() const override { return _cache_dir; } - size_t cache_file_size() const override { return _cache_file_size; } - io::FileReaderSPtr remote_file_reader() const override { return _remote_file_reader; } Status clean_timeout_cache() override; @@ -60,12 +58,10 @@ private: private: Path _cache_dir; - size_t _cache_file_size; int64_t _alive_time_sec; io::FileReaderSPtr _remote_file_reader; std::shared_mutex _cache_lock; - int64_t _last_match_time; io::FileReaderSPtr _cache_file_reader; }; diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp index 7facdb4c19..7a66e49d6f 100644 --- a/be/src/olap/olap_server.cpp +++ b/be/src/olap/olap_server.cpp @@ -767,28 +767,7 @@ void StorageEngine::_cache_file_cleaner_tasks_producer_callback() { int64_t interval = config::generate_cache_cleaner_task_interval_sec; do { LOG(INFO) << "Begin to Clean cache files"; - FileCacheManager::instance()->clean_timeout_caches(); - std::vector tablets = - StorageEngine::instance()->tablet_manager()->get_all_tablet(); - for (const auto& tablet : tablets) { - std::vector seg_file_paths; - if (io::global_local_filesystem()->list(tablet->tablet_path(), &seg_file_paths).ok()) { - for (Path seg_file : seg_file_paths) { - std::string seg_filename = seg_file.native(); - // check if it is a dir name - if (ends_with(seg_filename, ".dat")) { - continue; - } - std::stringstream ss; - ss << tablet->tablet_path() << "/" << seg_filename; - std::string cache_path = ss.str(); - if (FileCacheManager::instance()->exist(cache_path)) { - continue; - } - FileCacheManager::instance()->clean_timeout_file_not_in_mem(cache_path); - } - } - } + FileCacheManager::instance()->gc_file_caches(); } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval))); }