[improve](cache) File cache async init (#39036)
## Proposed changes Run `load_cache_info_into_memory()` asynchronously in a background thread started by `LRUFileCache::initialize()`. While the cache is not yet ready, `LRUFileCache::get_or_set()` returns a FileBlock whose state is SKIP_CACHE.
This commit is contained in:
@ -996,6 +996,9 @@ DEFINE_Bool(enable_file_cache, "false");
|
||||
// format: [{"path":"/path/to/file_cache","total_size":21474836480,"query_limit":10737418240}]
|
||||
// format: [{"path":"/path/to/file_cache","total_size":21474836480,"query_limit":10737418240},{"path":"/path/to/file_cache2","total_size":21474836480,"query_limit":10737418240}]
|
||||
DEFINE_String(file_cache_path, "");
|
||||
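// to limit IO while loading the file cache, the loading thread sleeps async_file_cache_init_sleep_interval_ms after every async_file_cache_init_file_num_interval scanned files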
|
||||
DEFINE_Int64(async_file_cache_init_file_num_interval, "1000");
|
||||
DEFINE_Int64(async_file_cache_init_sleep_interval_ms, "20");
|
||||
DEFINE_Int64(file_cache_max_file_segment_size, "4194304"); // 4MB
|
||||
// 4KB <= file_cache_max_file_segment_size <= 256MB
|
||||
DEFINE_Validator(file_cache_max_file_segment_size, [](const int64_t config) -> bool {
|
||||
|
||||
@ -1047,6 +1047,8 @@ DECLARE_Bool(enable_file_cache);
|
||||
// format: [{"path":"/path/to/file_cache","total_size":21474836480,"query_limit":10737418240},{"path":"/path/to/file_cache2","total_size":21474836480,"query_limit":10737418240}]
|
||||
// format: [{"path":"/path/to/file_cache","total_size":21474836480,"query_limit":10737418240,"normal_percent":85, "disposable_percent":10, "index_percent":5}]
|
||||
DECLARE_String(file_cache_path);
|
||||
DECLARE_Int64(async_file_cache_init_file_num_interval);
|
||||
DECLARE_Int64(async_file_cache_init_sleep_interval_ms);
|
||||
DECLARE_Int64(file_cache_min_file_segment_size);
|
||||
DECLARE_Int64(file_cache_max_file_segment_size);
|
||||
DECLARE_Bool(clear_file_cache);
|
||||
|
||||
57
be/src/io/cache/block/block_lru_file_cache.cpp
vendored
57
be/src/io/cache/block/block_lru_file_cache.cpp
vendored
@ -39,6 +39,7 @@
|
||||
#include <system_error>
|
||||
#include <utility>
|
||||
|
||||
#include "common/logging.h"
|
||||
#include "common/status.h"
|
||||
#include "io/cache/block/block_file_cache.h"
|
||||
#include "io/cache/block/block_file_cache_fwd.h"
|
||||
@ -119,10 +120,33 @@ LRUFileCache::LRUFileCache(const std::string& cache_base_path,
|
||||
Status LRUFileCache::initialize() {
|
||||
MonotonicStopWatch watch;
|
||||
watch.start();
|
||||
std::lock_guard cache_lock(_mutex);
|
||||
if (!_is_initialized) {
|
||||
if (fs::exists(_cache_base_path)) {
|
||||
RETURN_IF_ERROR(load_cache_info_into_memory(cache_lock));
|
||||
// the cache already exists, try to load cache info asynchronously
|
||||
_lazy_open_done = false;
|
||||
_cache_background_load_thread = std::thread([this]() {
|
||||
MonotonicStopWatch watch;
|
||||
watch.start();
|
||||
std::lock_guard<std::mutex> cache_lock(_mutex);
|
||||
Status s = load_cache_info_into_memory(cache_lock);
|
||||
if (s.ok()) {
|
||||
_lazy_open_done = true;
|
||||
} else {
|
||||
LOG(WARNING) << fmt::format("Failed to load cache info from {}: {}",
|
||||
_cache_base_path, s.to_string());
|
||||
}
|
||||
int64_t cost = watch.elapsed_time() / 1000 / 1000;
|
||||
LOG(INFO) << fmt::format(
|
||||
"FileCache lazy load done path={}, disposable queue size={} elements={}, "
|
||||
"index queue size={} elements={}, query queue size={} elements={}, init "
|
||||
"cost(ms)={}",
|
||||
_cache_base_path, _disposable_queue.get_total_cache_size(cache_lock),
|
||||
_disposable_queue.get_elements_num(cache_lock),
|
||||
_index_queue.get_total_cache_size(cache_lock),
|
||||
_index_queue.get_elements_num(cache_lock),
|
||||
_normal_queue.get_total_cache_size(cache_lock),
|
||||
_normal_queue.get_elements_num(cache_lock), cost);
|
||||
});
|
||||
} else {
|
||||
std::error_code ec;
|
||||
fs::create_directories(_cache_base_path, ec);
|
||||
@ -136,17 +160,8 @@ Status LRUFileCache::initialize() {
|
||||
_is_initialized = true;
|
||||
_cache_background_thread = std::thread(&LRUFileCache::run_background_operation, this);
|
||||
int64_t cost = watch.elapsed_time() / 1000 / 1000;
|
||||
LOG(INFO) << fmt::format(
|
||||
"After initialize file cache path={}, disposable queue size={} elements={}, index "
|
||||
"queue size={} "
|
||||
"elements={}, query queue "
|
||||
"size={} elements={}, init cost(ms)={}",
|
||||
_cache_base_path, _disposable_queue.get_total_cache_size(cache_lock),
|
||||
_disposable_queue.get_elements_num(cache_lock),
|
||||
_index_queue.get_total_cache_size(cache_lock),
|
||||
_index_queue.get_elements_num(cache_lock),
|
||||
_normal_queue.get_total_cache_size(cache_lock),
|
||||
_normal_queue.get_elements_num(cache_lock), cost);
|
||||
LOG(INFO) << fmt::format("After initialize file cache path={}, init cost(ms)={}",
|
||||
_cache_base_path, cost);
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
@ -376,6 +391,16 @@ void LRUFileCache::fill_holes_with_empty_file_blocks(FileBlocks& file_blocks, co
|
||||
|
||||
FileBlocksHolder LRUFileCache::get_or_set(const Key& key, size_t offset, size_t size,
|
||||
const CacheContext& context) {
|
||||
if (!_lazy_open_done) {
|
||||
// Cache is not ready yet
|
||||
VLOG_NOTICE << fmt::format(
|
||||
"Cache is not ready yet, skip cache for key: {}, offset: {}, size: {}.",
|
||||
key.to_string(), offset, size);
|
||||
FileBlocks file_blocks = {std::make_shared<FileBlock>(
|
||||
offset, size, key, this, FileBlock::State::SKIP_CACHE, context.cache_type)};
|
||||
return FileBlocksHolder(std::move(file_blocks));
|
||||
}
|
||||
|
||||
FileBlock::Range range(offset, offset + size - 1);
|
||||
|
||||
std::lock_guard cache_lock(_mutex);
|
||||
@ -827,6 +852,7 @@ Status LRUFileCache::load_cache_info_into_memory(std::lock_guard<std::mutex>& ca
|
||||
std::vector<std::pair<Key, size_t>> queue_entries;
|
||||
std::vector<std::string> need_to_check_if_empty_dir;
|
||||
Status st = Status::OK();
|
||||
size_t scan_file_num = 0;
|
||||
auto scan_file_cache = [&](fs::directory_iterator& key_it) {
|
||||
for (; key_it != fs::directory_iterator(); ++key_it) {
|
||||
key = Key(
|
||||
@ -888,6 +914,11 @@ Status LRUFileCache::load_cache_info_into_memory(std::lock_guard<std::mutex>& ca
|
||||
}
|
||||
need_to_check_if_empty_dir.push_back(key_it->path());
|
||||
}
|
||||
scan_file_num += 1;
|
||||
if (scan_file_num % config::async_file_cache_init_file_num_interval == 0) {
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(
|
||||
config::async_file_cache_init_sleep_interval_ms));
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
5
be/src/io/cache/block/block_lru_file_cache.h
vendored
5
be/src/io/cache/block/block_lru_file_cache.h
vendored
@ -53,6 +53,9 @@ public:
|
||||
LRUFileCache(const std::string& cache_base_path, const FileCacheSettings& cache_settings);
|
||||
~LRUFileCache() override {
|
||||
_close = true;
|
||||
if (_cache_background_load_thread.joinable()) {
|
||||
_cache_background_thread.join();
|
||||
}
|
||||
if (_cache_background_thread.joinable()) {
|
||||
_cache_background_thread.join();
|
||||
}
|
||||
@ -201,6 +204,8 @@ public:
|
||||
private:
|
||||
std::atomic_bool _close {false};
|
||||
std::thread _cache_background_thread;
|
||||
std::atomic_bool _lazy_open_done {true};
|
||||
std::thread _cache_background_load_thread;
|
||||
size_t _num_read_segments = 0;
|
||||
size_t _num_hit_segments = 0;
|
||||
size_t _num_removed_segments = 0;
|
||||
|
||||
Reference in New Issue
Block a user