[feature](remote) support local cache GC by disk usage (#12897)

* support local cache gc by disk usage

* support gc per disk

* refractor file cache size logic

* also consider unused file caches while GC by disk size

* change config file_cache_max_size_per_disk from GB to B

* bugfix

* update

* use two stage locks to avoid lock while disk io

* rdlock one by one for dummy file cache gc

Co-authored-by: cambyzju <zhuxiaoli01@baidu.com>
This commit is contained in:
camby
2022-10-14 15:11:29 +08:00
committed by GitHub
parent 50ae9e6b19
commit 005c2cd43b
12 changed files with 391 additions and 131 deletions

View File

@ -18,15 +18,41 @@
#include "io/cache/file_cache_manager.h"
#include "gutil/strings/util.h"
#include "io/cache/dummy_file_cache.h"
#include "io/cache/sub_file_cache.h"
#include "io/cache/whole_file_cache.h"
#include "io/fs/local_file_system.h"
#include "olap/storage_engine.h"
#include "util/file_utils.h"
#include "util/string_util.h"
namespace doris {
namespace io {
void GCContextPerDisk::init(const std::string& path, int64_t max_size) {
_disk_path = path;
_conf_max_size = max_size;
_used_size = 0;
}
bool GCContextPerDisk::try_add_file_cache(FileCachePtr cache, int64_t file_size) {
if (cache->cache_dir().string().substr(0, _disk_path.size()) == _disk_path) {
_lru_queue.push(cache);
_used_size += file_size;
return true;
}
return false;
}
void GCContextPerDisk::get_gc_file_caches(std::list<FileCachePtr>& result) {
while (!_lru_queue.empty() && _used_size > _conf_max_size) {
auto file_cache = _lru_queue.top();
_used_size -= file_cache->cache_file_size();
result.push_back(file_cache);
_lru_queue.pop();
}
}
void FileCacheManager::add_file_cache(const std::string& cache_path, FileCachePtr file_cache) {
std::lock_guard<std::shared_mutex> wrlock(_cache_map_lock);
_file_cache_map.emplace(cache_path, file_cache);
@ -56,87 +82,109 @@ void FileCacheManager::remove_file_cache(const std::string& cache_path) {
}
}
void FileCacheManager::clean_timeout_caches() {
std::shared_lock<std::shared_mutex> rdlock(_cache_map_lock);
for (std::map<std::string, FileCachePtr>::const_iterator iter = _file_cache_map.cbegin();
iter != _file_cache_map.cend(); ++iter) {
if (iter->second == nullptr) {
continue;
void FileCacheManager::_add_file_cache_for_gc_by_disk(std::vector<GCContextPerDisk>& contexts,
FileCachePtr file_cache) {
// sort file cache by last match time
if (config::file_cache_max_size_per_disk > 0) {
auto file_size = file_cache->cache_file_size();
if (file_size <= 0) {
return;
}
for (size_t i = 0; i < contexts.size(); ++i) {
if (contexts[i].try_add_file_cache(file_cache, file_size)) {
break;
}
}
iter->second->clean_timeout_cache();
}
}
void FileCacheManager::clean_timeout_file_not_in_mem(const std::string& cache_path) {
time_t now = time(nullptr);
std::shared_lock<std::shared_mutex> rdlock(_cache_map_lock);
// Deal with caches not in _file_cache_map
if (_file_cache_map.find(cache_path) == _file_cache_map.end()) {
std::vector<Path> cache_file_names;
if (io::global_local_filesystem()->list(cache_path, &cache_file_names).ok()) {
std::map<std::string, bool> cache_names;
std::list<std::string> done_names;
for (Path cache_file_name : cache_file_names) {
std::string filename = cache_file_name.native();
if (!ends_with(filename, CACHE_DONE_FILE_SUFFIX)) {
cache_names[filename] = true;
void FileCacheManager::_gc_unused_file_caches(std::list<FileCachePtr>& result) {
std::vector<TabletSharedPtr> tablets =
StorageEngine::instance()->tablet_manager()->get_all_tablet();
for (const auto& tablet : tablets) {
std::vector<Path> seg_file_paths;
if (io::global_local_filesystem()->list(tablet->tablet_path(), &seg_file_paths).ok()) {
for (Path seg_file : seg_file_paths) {
std::string seg_filename = seg_file.native();
// check if it is a dir name
if (ends_with(seg_filename, ".dat")) {
continue;
}
done_names.push_back(filename);
std::stringstream done_file_ss;
done_file_ss << cache_path << "/" << filename;
std::string done_file_path = done_file_ss.str();
time_t m_time;
if (!FileUtils::mtime(done_file_path, &m_time).ok()) {
continue;
}
if (now - m_time < config::file_cache_alive_time_sec) {
continue;
}
std::string cache_file_path =
StringReplace(done_file_path, CACHE_DONE_FILE_SUFFIX, "", true);
LOG(INFO) << "Delete timeout done_cache_path: " << done_file_path
<< ", cache_file_path: " << cache_file_path << ", m_time: " << m_time;
if (!io::global_local_filesystem()->delete_file(done_file_path).ok()) {
LOG(ERROR) << "delete_file failed: " << done_file_path;
continue;
}
if (!io::global_local_filesystem()->delete_file(cache_file_path).ok()) {
LOG(ERROR) << "delete_file failed: " << cache_file_path;
// skip file cache already in memory
std::stringstream ss;
ss << tablet->tablet_path() << "/" << seg_filename;
std::string cache_path = ss.str();
std::shared_lock<std::shared_mutex> rdlock(_cache_map_lock);
if (_file_cache_map.find(cache_path) != _file_cache_map.end()) {
continue;
}
auto file_cache = std::make_shared<DummyFileCache>(
cache_path, config::file_cache_alive_time_sec);
// load cache meta from disk and clean unfinished cache files
file_cache->load_and_clean();
// policy1: GC file cache by timeout
file_cache->clean_timeout_cache();
result.push_back(file_cache);
}
// find cache file without done file.
for (std::list<std::string>::iterator itr = done_names.begin(); itr != done_names.end();
++itr) {
std::string cache_filename = StringReplace(*itr, CACHE_DONE_FILE_SUFFIX, "", true);
if (cache_names.find(cache_filename) != cache_names.end()) {
cache_names.erase(cache_filename);
}
}
}
}
void FileCacheManager::gc_file_caches() {
int64_t gc_conf_size = config::file_cache_max_size_per_disk;
std::vector<GCContextPerDisk> contexts;
// init for GC by disk size
if (gc_conf_size > 0) {
std::vector<DataDir*> data_dirs = doris::StorageEngine::instance()->get_stores();
contexts.resize(data_dirs.size());
for (size_t i = 0; i < contexts.size(); ++i) {
contexts[i].init(data_dirs[i]->path(), gc_conf_size);
}
}
// process unused file caches
std::list<FileCachePtr> dummy_file_list;
_gc_unused_file_caches(dummy_file_list);
{
std::shared_lock<std::shared_mutex> rdlock(_cache_map_lock);
for (auto item : dummy_file_list) {
// check again after _cache_map_lock hold
if (_file_cache_map.find(item->cache_dir().native()) != _file_cache_map.end()) {
continue;
}
// remove cache file without done file
for (std::map<std::string, bool>::iterator itr = cache_names.begin();
itr != cache_names.end(); ++itr) {
std::stringstream cache_file_ss;
cache_file_ss << cache_path << "/" << itr->first;
std::string cache_file_path = cache_file_ss.str();
time_t m_time;
if (!FileUtils::mtime(cache_file_path, &m_time).ok()) {
// sort file cache by last match time
_add_file_cache_for_gc_by_disk(contexts, item);
}
// process file caches in memory
for (std::map<std::string, FileCachePtr>::const_iterator iter = _file_cache_map.cbegin();
iter != _file_cache_map.cend(); ++iter) {
if (iter->second == nullptr) {
continue;
}
// policy1: GC file cache by timeout
iter->second->clean_timeout_cache();
// sort file cache by last match time
_add_file_cache_for_gc_by_disk(contexts, iter->second);
}
}
// policy2: GC file cache by disk size
if (gc_conf_size > 0) {
for (size_t i = 0; i < contexts.size(); ++i) {
std::list<FileCachePtr> gc_file_list;
contexts[i].get_gc_file_caches(gc_file_list);
for (auto item : gc_file_list) {
std::shared_lock<std::shared_mutex> rdlock(_cache_map_lock);
// for dummy file cache, check already used or not again
if (item->is_dummy_file_cache() &&
_file_cache_map.find(item->cache_dir().native()) != _file_cache_map.end()) {
continue;
}
if (now - m_time < config::file_cache_alive_time_sec) {
continue;
}
LOG(INFO) << "Delete cache file without done file: " << cache_file_path;
if (!io::global_local_filesystem()->delete_file(cache_file_path).ok()) {
LOG(ERROR) << "delete_file failed: " << cache_file_path;
}
}
if (io::global_local_filesystem()->list(cache_path, &cache_file_names).ok() &&
cache_file_names.size() == 0) {
if (global_local_filesystem()->delete_directory(cache_path).ok()) {
LOG(INFO) << "Delete empty dir: " << cache_path;
}
item->clean_all_cache();
}
}
}