From e2245cbdd38920941c60950d8143ccc049e5467e Mon Sep 17 00:00:00 2001 From: Ashin Gau Date: Thu, 16 Feb 2023 16:04:29 +0800 Subject: [PATCH] [improvement](filecache) split file cache into sharding directories (#16767) Save cached file segment into path like `cache_path / hash(filepath).substr(0, 3) / hash(filepath) / offset` to prevent too many directories in `cache_path`. --- be/src/io/cache/block/block_file_cache.cpp | 12 +- be/src/io/cache/block/block_file_cache.h | 5 + .../io/cache/block/block_lru_file_cache.cpp | 182 +++++++++++++----- be/src/io/cache/block/block_lru_file_cache.h | 4 + be/test/io/cache/file_block_cache_test.cpp | 4 +- 5 files changed, 153 insertions(+), 54 deletions(-) diff --git a/be/src/io/cache/block/block_file_cache.cpp b/be/src/io/cache/block/block_file_cache.cpp index 147a0a3339..292903c812 100644 --- a/be/src/io/cache/block/block_file_cache.cpp +++ b/be/src/io/cache/block/block_file_cache.cpp @@ -32,6 +32,9 @@ namespace fs = std::filesystem; namespace doris { namespace io { +const std::string IFileCache::FILE_CACHE_VERSION = "2.0"; +const int IFileCache::KEY_PREFIX_LENGTH = 3; + IFileCache::IFileCache(const std::string& cache_base_path, const FileCacheSettings& cache_settings) : _cache_base_path(cache_base_path), _max_size(cache_settings.max_size), @@ -55,12 +58,17 @@ std::string IFileCache::get_path_in_local_cache(const Key& key, size_t offset, bool is_persistent) const { auto key_str = key.to_string(); std::string suffix = is_persistent ? "_persistent" : ""; - return fs::path(_cache_base_path) / key_str / (std::to_string(offset) + suffix); + return fs::path(_cache_base_path) / key_str.substr(0, KEY_PREFIX_LENGTH) / key_str / + (std::to_string(offset) + suffix); } std::string IFileCache::get_path_in_local_cache(const Key& key) const { auto key_str = key.to_string(); - return fs::path(_cache_base_path) / key_str; + return fs::path(_cache_base_path) / key_str.substr(0, KEY_PREFIX_LENGTH) / key_str; +} + +std::string IFileCache::get_version_path() const { + return fs::path(_cache_base_path) / "version"; } IFileCache::QueryFileCacheContextHolderPtr IFileCache::get_query_context_holder( diff --git a/be/src/io/cache/block/block_file_cache.h b/be/src/io/cache/block/block_file_cache.h index 466a9938d8..3e3113eb3c 100644 --- a/be/src/io/cache/block/block_file_cache.h +++ b/be/src/io/cache/block/block_file_cache.h @@ -43,6 +43,9 @@ class IFileCache { friend struct FileBlocksHolder; public: + static const std::string FILE_CACHE_VERSION; + static const int KEY_PREFIX_LENGTH; + struct Key { uint128_t key; std::string to_string() const; @@ -73,6 +76,8 @@ public: std::string get_path_in_local_cache(const Key& key) const; + std::string get_version_path() const; + const std::string& get_base_path() const { return _cache_base_path; } virtual std::vector try_get_cache_paths(const Key& key, bool is_persistent) = 0; diff --git a/be/src/io/cache/block/block_lru_file_cache.cpp b/be/src/io/cache/block/block_lru_file_cache.cpp index db3418870e..446c662b88 100644 --- a/be/src/io/cache/block/block_lru_file_cache.cpp +++ b/be/src/io/cache/block/block_lru_file_cache.cpp @@ -28,6 +28,8 @@ #include "common/status.h" #include "io/cache/block/block_file_cache.h" #include "io/cache/block/block_file_cache_settings.h" +#include "io/fs/local_file_system.h" +#include "olap/iterators.h" #include "util/time.h" #include "vec/common/hex.h" #include "vec/common/sip_hash.h" @@ -53,6 +55,7 @@ Status LRUFileCache::initialize() { return Status::IOError("cannot create {}: {}", _cache_base_path, std::strerror(ec.value())); } + RETURN_IF_ERROR(write_file_cache_version()); } } _is_initialized = true; @@ -634,63 +637,113 @@ void LRUFileCache::remove(const Key& key, bool is_persistent, size_t offset, } void LRUFileCache::load_cache_info_into_memory(std::lock_guard& cache_lock) { + /// version 1.0: cache_base_path / key / offset + /// version 2.0: cache_base_path / key_prefix / key / offset + if (read_file_cache_version() != FILE_CACHE_VERSION) { + // move directories format as version 2.0 + fs::directory_iterator key_it {_cache_base_path}; + for (; key_it != fs::directory_iterator(); ++key_it) { + if (key_it->is_directory()) { + std::string cache_key = key_it->path().filename().native(); + if (cache_key.size() > KEY_PREFIX_LENGTH) { + std::string key_prefix = + fs::path(_cache_base_path) / cache_key.substr(0, KEY_PREFIX_LENGTH); + if (!fs::exists(key_prefix)) { + std::error_code ec; + fs::create_directories(key_prefix, ec); + if (ec) { + LOG(WARNING) << "Failed to create new version cached directory: " + << ec.message(); + continue; + } + } + std::error_code ec; + std::filesystem::rename(key_it->path(), key_prefix / cache_key, ec); + if (ec) { + LOG(WARNING) + << "Failed to move old version cached directory: " << ec.message(); + } + } + } + } + if (!write_file_cache_version().ok()) { + LOG(WARNING) << "Failed to write version hints for file cache"; + } + } + Key key; uint64_t offset = 0; size_t size = 0; std::vector> queue_entries; - /// cache_base_path / key / offset - fs::directory_iterator key_it {_cache_base_path}; - for (; key_it != fs::directory_iterator(); ++key_it) { - key = Key(vectorized::unhex_uint(key_it->path().filename().native().c_str())); + /// version 2.0: cache_base_path / key_prefix / key / offset + fs::directory_iterator key_prefix_it {_cache_base_path}; + for (; key_prefix_it != fs::directory_iterator(); ++key_prefix_it) { + if (!key_prefix_it->is_directory()) { + // maybe version hits file + continue; + } + if (key_prefix_it->path().filename().native().size() != KEY_PREFIX_LENGTH) { + LOG(WARNING) << "Unknown directory " << key_prefix_it->path().native() + << ", try to remove it"; + std::filesystem::remove(key_prefix_it->path()); + continue; + } - fs::directory_iterator offset_it {key_it->path()}; - for (; offset_it != fs::directory_iterator(); ++offset_it) { - auto offset_with_suffix = offset_it->path().filename().native(); - auto delim_pos = offset_with_suffix.find('_'); - bool is_persistent = false; - bool parsed = true; - try { - if (delim_pos == std::string::npos) { - offset = stoull(offset_with_suffix); + fs::directory_iterator key_it {key_prefix_it->path()}; + for (; key_it != fs::directory_iterator(); ++key_it) { + key = Key( + vectorized::unhex_uint(key_it->path().filename().native().c_str())); + + fs::directory_iterator offset_it {key_it->path()}; + for (; offset_it != fs::directory_iterator(); ++offset_it) { + auto offset_with_suffix = offset_it->path().filename().native(); + auto delim_pos = offset_with_suffix.find('_'); + bool is_persistent = false; + bool parsed = true; + try { + if (delim_pos == std::string::npos) { + offset = stoull(offset_with_suffix); + } else { + offset = stoull(offset_with_suffix.substr(0, delim_pos)); + is_persistent = offset_with_suffix.substr(delim_pos + 1) == "persistent"; + } + } catch (...) { + parsed = false; + } + + if (!parsed) { + LOG(WARNING) << "Unexpected file: " << offset_it->path().native(); + continue; /// Or just remove? Some unexpected file. + } + + size = offset_it->file_size(); + if (size == 0) { + std::error_code ec; + fs::remove(offset_it->path(), ec); + if (ec) { + LOG(WARNING) << ec.message(); + } + continue; + } + + if (try_reserve(key, TUniqueId(), is_persistent, offset, size, cache_lock)) { + auto* cell = add_cell(key, is_persistent, offset, size, + FileBlock::State::DOWNLOADED, cache_lock); + if (cell) { + queue_entries.emplace_back(*cell->queue_iterator, is_persistent); + } } else { - offset = stoull(offset_with_suffix.substr(0, delim_pos)); - is_persistent = offset_with_suffix.substr(delim_pos + 1) == "persistent"; - } - } catch (...) { - parsed = false; - } - - if (!parsed) { - LOG(WARNING) << "Unexpected file: " << offset_it->path().native(); - continue; /// Or just remove? Some unexpected file. - } - - size = offset_it->file_size(); - if (size == 0) { - std::error_code ec; - fs::remove(offset_it->path(), ec); - if (ec) { - LOG(WARNING) << ec.message(); - } - continue; - } - - if (try_reserve(key, TUniqueId(), is_persistent, offset, size, cache_lock)) { - auto* cell = add_cell(key, is_persistent, offset, size, - FileBlock::State::DOWNLOADED, cache_lock); - if (cell) { - queue_entries.emplace_back(*cell->queue_iterator, is_persistent); - } - } else { - LOG(WARNING) << "Cache capacity changed (max size: " << _max_size << ", available: " - << get_available_cache_size_unlocked(is_persistent, cache_lock) - << "), cached file " << key_it->path().string() - << " does not fit in cache anymore (size: " << size << ")"; - std::error_code ec; - fs::remove(offset_it->path(), ec); - if (ec) { - LOG(WARNING) << ec.message(); + LOG(WARNING) << "Cache capacity changed (max size: " << _max_size + << ", available: " + << get_available_cache_size_unlocked(is_persistent, cache_lock) + << "), cached file " << key_it->path().string() + << " does not fit in cache anymore (size: " << size << ")"; + std::error_code ec; + fs::remove(offset_it->path(), ec); + if (ec) { + LOG(WARNING) << ec.message(); + } } } } @@ -706,6 +759,35 @@ void LRUFileCache::load_cache_info_into_memory(std::lock_guard& cach } } +Status LRUFileCache::write_file_cache_version() const { + std::string version_path = get_version_path(); + Slice version(FILE_CACHE_VERSION); + FileWriterPtr version_writer; + RETURN_IF_ERROR(global_local_filesystem()->create_file(version_path, &version_writer)); + RETURN_IF_ERROR(version_writer->append(version)); + return version_writer->close(); +} + +std::string LRUFileCache::read_file_cache_version() const { + std::string version_path = get_version_path(); + const FileSystemSPtr& fs = global_local_filesystem(); + bool exists = false; + fs->exists(version_path, &exists); + if (!exists) { + return "1.0"; + } + FileReaderSPtr version_reader; + size_t file_size = 0; + fs->file_size(version_path, &file_size); + char version[file_size]; + + IOContext io_ctx; + fs->open_file(version_path, &version_reader, &io_ctx); + version_reader->read_at(0, Slice(version, file_size), io_ctx, &file_size); + version_reader->close(); + return std::string(version, file_size); +} + std::vector LRUFileCache::try_get_cache_paths(const Key& key, bool is_persistent) { std::lock_guard cache_lock(_mutex); diff --git a/be/src/io/cache/block/block_lru_file_cache.h b/be/src/io/cache/block/block_lru_file_cache.h index f9f0862d7b..ac6c09371e 100644 --- a/be/src/io/cache/block/block_lru_file_cache.h +++ b/be/src/io/cache/block/block_lru_file_cache.h @@ -130,6 +130,10 @@ private: void load_cache_info_into_memory(std::lock_guard& cache_lock); + Status write_file_cache_version() const; + + std::string read_file_cache_version() const; + FileBlocks split_range_into_cells(const Key& key, const TUniqueId& query_id, bool is_persistent, size_t offset, size_t size, FileBlock::State state, std::lock_guard& cache_lock); diff --git a/be/test/io/cache/file_block_cache_test.cpp b/be/test/io/cache/file_block_cache_test.cpp index d24bcc8732..157e20b2f7 100644 --- a/be/test/io/cache/file_block_cache_test.cpp +++ b/be/test/io/cache/file_block_cache_test.cpp @@ -54,7 +54,7 @@ std::vector fromHolder(const io::FileBlocksHolder& holder) { std::string getFileBlockPath(const std::string& base_path, const io::IFileCache::Key& key, size_t offset) { auto key_str = key.to_string(); - return fs::path(base_path) / key_str / std::to_string(offset); + return fs::path(base_path) / key_str.substr(0, 3) / key_str / std::to_string(offset); } void download(io::FileBlockSPtr file_segment) { @@ -62,7 +62,7 @@ void download(io::FileBlockSPtr file_segment) { size_t size = file_segment->range().size(); auto key_str = key.to_string(); - auto subdir = fs::path(cache_base_path) / key_str; + auto subdir = fs::path(cache_base_path) / key_str.substr(0, 3) / key_str; ASSERT_TRUE(fs::exists(subdir)); std::string data(size, '0');