[improvement](filecache) split file cache into sharding directories (#16767)

Save cached file segment into path like `cache_path / hash(filepath).substr(0, 3) / hash(filepath) / offset`
to prevent too many directories in `cache_path`.
This commit is contained in:
Ashin Gau
2023-02-16 16:04:29 +08:00
committed by GitHub
parent 292926e5aa
commit e2245cbdd3
5 changed files with 153 additions and 54 deletions

View File

@ -32,6 +32,9 @@ namespace fs = std::filesystem;
namespace doris {
namespace io {
const std::string IFileCache::FILE_CACHE_VERSION = "2.0";
const int IFileCache::KEY_PREFIX_LENGTH = 3;
IFileCache::IFileCache(const std::string& cache_base_path, const FileCacheSettings& cache_settings)
: _cache_base_path(cache_base_path),
_max_size(cache_settings.max_size),
@ -55,12 +58,17 @@ std::string IFileCache::get_path_in_local_cache(const Key& key, size_t offset,
bool is_persistent) const {
auto key_str = key.to_string();
std::string suffix = is_persistent ? "_persistent" : "";
return fs::path(_cache_base_path) / key_str / (std::to_string(offset) + suffix);
return fs::path(_cache_base_path) / key_str.substr(0, KEY_PREFIX_LENGTH) / key_str /
(std::to_string(offset) + suffix);
}
std::string IFileCache::get_path_in_local_cache(const Key& key) const {
auto key_str = key.to_string();
return fs::path(_cache_base_path) / key_str;
return fs::path(_cache_base_path) / key_str.substr(0, KEY_PREFIX_LENGTH) / key_str;
}
std::string IFileCache::get_version_path() const {
return fs::path(_cache_base_path) / "version";
}
IFileCache::QueryFileCacheContextHolderPtr IFileCache::get_query_context_holder(

View File

@ -43,6 +43,9 @@ class IFileCache {
friend struct FileBlocksHolder;
public:
static const std::string FILE_CACHE_VERSION;
static const int KEY_PREFIX_LENGTH;
struct Key {
uint128_t key;
std::string to_string() const;
@ -73,6 +76,8 @@ public:
std::string get_path_in_local_cache(const Key& key) const;
std::string get_version_path() const;
const std::string& get_base_path() const { return _cache_base_path; }
virtual std::vector<std::string> try_get_cache_paths(const Key& key, bool is_persistent) = 0;

View File

@ -28,6 +28,8 @@
#include "common/status.h"
#include "io/cache/block/block_file_cache.h"
#include "io/cache/block/block_file_cache_settings.h"
#include "io/fs/local_file_system.h"
#include "olap/iterators.h"
#include "util/time.h"
#include "vec/common/hex.h"
#include "vec/common/sip_hash.h"
@ -53,6 +55,7 @@ Status LRUFileCache::initialize() {
return Status::IOError("cannot create {}: {}", _cache_base_path,
std::strerror(ec.value()));
}
RETURN_IF_ERROR(write_file_cache_version());
}
}
_is_initialized = true;
@ -634,63 +637,113 @@ void LRUFileCache::remove(const Key& key, bool is_persistent, size_t offset,
}
void LRUFileCache::load_cache_info_into_memory(std::lock_guard<std::mutex>& cache_lock) {
/// version 1.0: cache_base_path / key / offset
/// version 2.0: cache_base_path / key_prefix / key / offset
if (read_file_cache_version() != FILE_CACHE_VERSION) {
// move directories format as version 2.0
fs::directory_iterator key_it {_cache_base_path};
for (; key_it != fs::directory_iterator(); ++key_it) {
if (key_it->is_directory()) {
std::string cache_key = key_it->path().filename().native();
if (cache_key.size() > KEY_PREFIX_LENGTH) {
std::string key_prefix =
fs::path(_cache_base_path) / cache_key.substr(0, KEY_PREFIX_LENGTH);
if (!fs::exists(key_prefix)) {
std::error_code ec;
fs::create_directories(key_prefix, ec);
if (ec) {
LOG(WARNING) << "Failed to create new version cached directory: "
<< ec.message();
continue;
}
}
std::error_code ec;
std::filesystem::rename(key_it->path(), key_prefix / cache_key, ec);
if (ec) {
LOG(WARNING)
<< "Failed to move old version cached directory: " << ec.message();
}
}
}
}
if (!write_file_cache_version().ok()) {
LOG(WARNING) << "Failed to write version hints for file cache";
}
}
Key key;
uint64_t offset = 0;
size_t size = 0;
std::vector<std::pair<LRUQueue::Iterator, bool>> queue_entries;
/// cache_base_path / key / offset
fs::directory_iterator key_it {_cache_base_path};
for (; key_it != fs::directory_iterator(); ++key_it) {
key = Key(vectorized::unhex_uint<uint128_t>(key_it->path().filename().native().c_str()));
/// version 2.0: cache_base_path / key_prefix / key / offset
fs::directory_iterator key_prefix_it {_cache_base_path};
for (; key_prefix_it != fs::directory_iterator(); ++key_prefix_it) {
if (!key_prefix_it->is_directory()) {
// maybe version hits file
continue;
}
if (key_prefix_it->path().filename().native().size() != KEY_PREFIX_LENGTH) {
LOG(WARNING) << "Unknown directory " << key_prefix_it->path().native()
<< ", try to remove it";
std::filesystem::remove(key_prefix_it->path());
continue;
}
fs::directory_iterator offset_it {key_it->path()};
for (; offset_it != fs::directory_iterator(); ++offset_it) {
auto offset_with_suffix = offset_it->path().filename().native();
auto delim_pos = offset_with_suffix.find('_');
bool is_persistent = false;
bool parsed = true;
try {
if (delim_pos == std::string::npos) {
offset = stoull(offset_with_suffix);
fs::directory_iterator key_it {key_prefix_it->path()};
for (; key_it != fs::directory_iterator(); ++key_it) {
key = Key(
vectorized::unhex_uint<uint128_t>(key_it->path().filename().native().c_str()));
fs::directory_iterator offset_it {key_it->path()};
for (; offset_it != fs::directory_iterator(); ++offset_it) {
auto offset_with_suffix = offset_it->path().filename().native();
auto delim_pos = offset_with_suffix.find('_');
bool is_persistent = false;
bool parsed = true;
try {
if (delim_pos == std::string::npos) {
offset = stoull(offset_with_suffix);
} else {
offset = stoull(offset_with_suffix.substr(0, delim_pos));
is_persistent = offset_with_suffix.substr(delim_pos + 1) == "persistent";
}
} catch (...) {
parsed = false;
}
if (!parsed) {
LOG(WARNING) << "Unexpected file: " << offset_it->path().native();
continue; /// Or just remove? Some unexpected file.
}
size = offset_it->file_size();
if (size == 0) {
std::error_code ec;
fs::remove(offset_it->path(), ec);
if (ec) {
LOG(WARNING) << ec.message();
}
continue;
}
if (try_reserve(key, TUniqueId(), is_persistent, offset, size, cache_lock)) {
auto* cell = add_cell(key, is_persistent, offset, size,
FileBlock::State::DOWNLOADED, cache_lock);
if (cell) {
queue_entries.emplace_back(*cell->queue_iterator, is_persistent);
}
} else {
offset = stoull(offset_with_suffix.substr(0, delim_pos));
is_persistent = offset_with_suffix.substr(delim_pos + 1) == "persistent";
}
} catch (...) {
parsed = false;
}
if (!parsed) {
LOG(WARNING) << "Unexpected file: " << offset_it->path().native();
continue; /// Or just remove? Some unexpected file.
}
size = offset_it->file_size();
if (size == 0) {
std::error_code ec;
fs::remove(offset_it->path(), ec);
if (ec) {
LOG(WARNING) << ec.message();
}
continue;
}
if (try_reserve(key, TUniqueId(), is_persistent, offset, size, cache_lock)) {
auto* cell = add_cell(key, is_persistent, offset, size,
FileBlock::State::DOWNLOADED, cache_lock);
if (cell) {
queue_entries.emplace_back(*cell->queue_iterator, is_persistent);
}
} else {
LOG(WARNING) << "Cache capacity changed (max size: " << _max_size << ", available: "
<< get_available_cache_size_unlocked(is_persistent, cache_lock)
<< "), cached file " << key_it->path().string()
<< " does not fit in cache anymore (size: " << size << ")";
std::error_code ec;
fs::remove(offset_it->path(), ec);
if (ec) {
LOG(WARNING) << ec.message();
LOG(WARNING) << "Cache capacity changed (max size: " << _max_size
<< ", available: "
<< get_available_cache_size_unlocked(is_persistent, cache_lock)
<< "), cached file " << key_it->path().string()
<< " does not fit in cache anymore (size: " << size << ")";
std::error_code ec;
fs::remove(offset_it->path(), ec);
if (ec) {
LOG(WARNING) << ec.message();
}
}
}
}
@ -706,6 +759,35 @@ void LRUFileCache::load_cache_info_into_memory(std::lock_guard<std::mutex>& cach
}
}
Status LRUFileCache::write_file_cache_version() const {
std::string version_path = get_version_path();
Slice version(FILE_CACHE_VERSION);
FileWriterPtr version_writer;
RETURN_IF_ERROR(global_local_filesystem()->create_file(version_path, &version_writer));
RETURN_IF_ERROR(version_writer->append(version));
return version_writer->close();
}
std::string LRUFileCache::read_file_cache_version() const {
std::string version_path = get_version_path();
const FileSystemSPtr& fs = global_local_filesystem();
bool exists = false;
fs->exists(version_path, &exists);
if (!exists) {
return "1.0";
}
FileReaderSPtr version_reader;
size_t file_size = 0;
fs->file_size(version_path, &file_size);
char version[file_size];
IOContext io_ctx;
fs->open_file(version_path, &version_reader, &io_ctx);
version_reader->read_at(0, Slice(version, file_size), io_ctx, &file_size);
version_reader->close();
return std::string(version, file_size);
}
std::vector<std::string> LRUFileCache::try_get_cache_paths(const Key& key, bool is_persistent) {
std::lock_guard cache_lock(_mutex);

View File

@ -130,6 +130,10 @@ private:
void load_cache_info_into_memory(std::lock_guard<std::mutex>& cache_lock);
Status write_file_cache_version() const;
std::string read_file_cache_version() const;
FileBlocks split_range_into_cells(const Key& key, const TUniqueId& query_id, bool is_persistent,
size_t offset, size_t size, FileBlock::State state,
std::lock_guard<std::mutex>& cache_lock);

View File

@ -54,7 +54,7 @@ std::vector<io::FileBlockSPtr> fromHolder(const io::FileBlocksHolder& holder) {
std::string getFileBlockPath(const std::string& base_path, const io::IFileCache::Key& key,
size_t offset) {
auto key_str = key.to_string();
return fs::path(base_path) / key_str / std::to_string(offset);
return fs::path(base_path) / key_str.substr(0, 3) / key_str / std::to_string(offset);
}
void download(io::FileBlockSPtr file_segment) {
@ -62,7 +62,7 @@ void download(io::FileBlockSPtr file_segment) {
size_t size = file_segment->range().size();
auto key_str = key.to_string();
auto subdir = fs::path(cache_base_path) / key_str;
auto subdir = fs::path(cache_base_path) / key_str.substr(0, 3) / key_str;
ASSERT_TRUE(fs::exists(subdir));
std::string data(size, '0');