[feature](remote)Add cache files cleaner for remote olap files (#11959)

This commit is contained in:
pengxiangyu
2022-08-26 23:59:36 +08:00
committed by GitHub
parent 0b5bb565a7
commit a6e2e2f3bc
15 changed files with 251 additions and 20 deletions

View File

@ -824,10 +824,11 @@ CONF_Int32(quick_compaction_min_rowsets, "10");
// cooldown task configs
CONF_Int32(cooldown_thread_num, "5");
CONF_mInt64(generate_cooldown_task_interval_sec, "20");
CONF_mInt64(generate_cache_cleaner_task_interval_sec, "43200"); // 12 h
CONF_Int32(concurrency_per_dir, "2");
CONF_mInt64(cooldown_lag_time_sec, "10800"); // 3h
CONF_mInt64(max_sub_cache_file_size, "1073741824"); // 1GB
CONF_mInt64(file_cache_alive_time_sec, "604800"); // 1 week
CONF_mInt64(cooldown_lag_time_sec, "10800"); // 3h
CONF_mInt64(max_sub_cache_file_size, "104857600"); // 100MB
CONF_mInt64(file_cache_alive_time_sec, "604800"); // 1 week
// file_cache_type is used to set the type of file cache for remote files.
// "": no cache, "sub_file_cache": split sub files from remote file.
// "whole_file_cache": the whole file.

View File

@ -27,6 +27,8 @@
namespace doris {
namespace io {
const std::string CACHE_DONE_FILE_SUFFIX = "_DONE";
class FileCache : public FileReader {
public:
FileCache() = default;

View File

@ -17,31 +17,132 @@
#include "io/cache/file_cache_manager.h"
#include "gutil/strings/util.h"
#include "io/cache/sub_file_cache.h"
#include "io/cache/whole_file_cache.h"
#include "io/fs/local_file_system.h"
#include "util/file_utils.h"
#include "util/string_util.h"
namespace doris {
namespace io {
void FileCacheManager::add_file_cache(const Path& cache_path, FileCachePtr file_cache) {
void FileCacheManager::add_file_cache(const std::string& cache_path, FileCachePtr file_cache) {
std::lock_guard<std::shared_mutex> wrlock(_cache_map_lock);
_file_cache_map.emplace(cache_path.native(), file_cache);
_file_cache_map.emplace(cache_path, file_cache);
}
void FileCacheManager::remove_file_cache(const Path& cache_path) {
std::lock_guard<std::shared_mutex> wrlock(_cache_map_lock);
_file_cache_map.erase(cache_path.native());
void FileCacheManager::remove_file_cache(const std::string& cache_path) {
bool cache_path_exist = false;
{
std::shared_lock<std::shared_mutex> rdlock(_cache_map_lock);
if (_file_cache_map.find(cache_path) == _file_cache_map.end()) {
bool cache_dir_exist = false;
if (global_local_filesystem()->exists(cache_path, &cache_dir_exist).ok() &&
cache_dir_exist) {
Status st = global_local_filesystem()->delete_directory(cache_path);
if (!st.ok()) {
LOG(WARNING) << st.to_string();
}
}
} else {
cache_path_exist = true;
_file_cache_map.find(cache_path)->second->clean_all_cache();
}
}
if (cache_path_exist) {
std::lock_guard<std::shared_mutex> wrlock(_cache_map_lock);
_file_cache_map.erase(cache_path);
}
}
void FileCacheManager::clean_timeout_caches() {
std::shared_lock<std::shared_mutex> rdlock(_cache_map_lock);
for (std::map<std::string, FileCachePtr>::const_iterator iter = _file_cache_map.cbegin();
iter != _file_cache_map.cend(); ++iter) {
if (iter->second == nullptr) {
continue;
}
iter->second->clean_timeout_cache();
}
}
FileCachePtr FileCacheManager::new_file_cache(const Path& cache_dir, int64_t alive_time_sec,
void FileCacheManager::clean_timeout_file_not_in_mem(const std::string& cache_path) {
time_t now = time(nullptr);
std::shared_lock<std::shared_mutex> rdlock(_cache_map_lock);
// Deal with caches not in _file_cache_map
if (_file_cache_map.find(cache_path) == _file_cache_map.end()) {
std::vector<Path> cache_file_names;
if (io::global_local_filesystem()->list(cache_path, &cache_file_names).ok()) {
std::map<std::string, bool> cache_names;
std::list<std::string> done_names;
for (Path cache_file_name : cache_file_names) {
std::string filename = cache_file_name.native();
if (ends_with(filename, CACHE_DONE_FILE_SUFFIX)) {
cache_names[filename] = true;
continue;
}
done_names.push_back(filename);
std::stringstream done_file_ss;
done_file_ss << cache_path << "/" << filename;
std::string done_file_path = done_file_ss.str();
time_t m_time;
if (!FileUtils::mtime(done_file_path, &m_time).ok()) {
continue;
}
if (now - m_time < config::file_cache_alive_time_sec) {
continue;
}
std::string cache_file_path =
StringReplace(done_file_path, CACHE_DONE_FILE_SUFFIX, "", true);
LOG(INFO) << "Delete timeout done_cache_path: " << done_file_path
<< ", cache_file_path: " << cache_file_path << ", m_time: " << m_time;
if (!io::global_local_filesystem()->delete_file(done_file_path).ok()) {
LOG(ERROR) << "delete_file failed: " << done_file_path;
continue;
}
if (!io::global_local_filesystem()->delete_file(cache_file_path).ok()) {
LOG(ERROR) << "delete_file failed: " << cache_file_path;
continue;
}
}
// find cache file without done file.
for (std::list<std::string>::iterator itr = done_names.begin(); itr != done_names.end();
++itr) {
std::string cache_filename = StringReplace(*itr, CACHE_DONE_FILE_SUFFIX, "", true);
if (cache_names.find(cache_filename) != cache_names.end()) {
cache_names.erase(cache_filename);
}
}
// remove cache file without done file
for (std::map<std::string, bool>::iterator itr = cache_names.begin();
itr != cache_names.end(); ++itr) {
std::stringstream cache_file_ss;
cache_file_ss << cache_path << "/" << itr->first;
std::string cache_file_path = cache_file_ss.str();
time_t m_time;
if (!FileUtils::mtime(cache_file_path, &m_time).ok()) {
continue;
}
if (now - m_time < config::file_cache_alive_time_sec) {
continue;
}
LOG(INFO) << "Delete cache file without done file: " << cache_file_path;
if (!io::global_local_filesystem()->delete_file(cache_file_path).ok()) {
LOG(ERROR) << "delete_file failed: " << cache_file_path;
}
}
if (io::global_local_filesystem()->list(cache_path, &cache_file_names).ok() &&
cache_file_names.size() == 0) {
if (global_local_filesystem()->delete_directory(cache_path).ok()) {
LOG(INFO) << "Delete empty dir: " << cache_path;
}
}
}
}
}
FileCachePtr FileCacheManager::new_file_cache(const std::string& cache_dir, int64_t alive_time_sec,
io::FileReaderSPtr remote_file_reader,
const std::string& file_cache_type) {
if (file_cache_type == "whole_file_cache") {
@ -53,6 +154,11 @@ FileCachePtr FileCacheManager::new_file_cache(const Path& cache_dir, int64_t ali
}
}
bool FileCacheManager::exist(const std::string& cache_path) {
std::shared_lock<std::shared_mutex> rdlock(_cache_map_lock);
return _file_cache_map.find(cache_path) != _file_cache_map.end();
}
FileCacheManager* FileCacheManager::instance() {
static FileCacheManager cache_manager;
return &cache_manager;

View File

@ -33,16 +33,20 @@ public:
static FileCacheManager* instance();
void add_file_cache(const Path& cache_path, FileCachePtr file_cache);
void add_file_cache(const std::string& cache_path, FileCachePtr file_cache);
void remove_file_cache(const Path& cache_path);
void remove_file_cache(const std::string& cache_path);
void clean_timeout_caches();
FileCachePtr new_file_cache(const Path& cache_dir, int64_t alive_time_sec,
void clean_timeout_file_not_in_mem(const std::string& cache_path);
FileCachePtr new_file_cache(const std::string& cache_dir, int64_t alive_time_sec,
io::FileReaderSPtr remote_file_reader,
const std::string& file_cache_type);
bool exist(const std::string& cache_path);
private:
std::shared_mutex _cache_map_lock;
// cache_path -> FileCache

View File

@ -26,7 +26,6 @@ namespace io {
using std::vector;
const static std::string SUB_FILE_CACHE_PREFIX = "SUB_CACHE";
const static std::string SUB_FILE_DONE_PREFIX = "SUB_CACHE_DONE";
SubFileCache::SubFileCache(const Path& cache_dir, int64_t alive_time_sec,
io::FileReaderSPtr remote_file_reader)
@ -116,7 +115,8 @@ Status SubFileCache::read_at(size_t offset, Slice result, size_t* bytes_read) {
Status SubFileCache::_generate_cache_reader(size_t offset, size_t req_size) {
Path cache_file = _cache_dir / fmt::format("{}_{}", SUB_FILE_CACHE_PREFIX, offset);
Path cache_done_file = _cache_dir / fmt::format("{}_{}", SUB_FILE_DONE_PREFIX, offset);
Path cache_done_file = _cache_dir / fmt::format("{}_{}{}", SUB_FILE_CACHE_PREFIX, offset,
CACHE_DONE_FILE_SUFFIX);
bool done_file_exist = false;
RETURN_NOT_OK_STATUS_WITH_WARN(
io::global_local_filesystem()->exists(cache_done_file, &done_file_exist),
@ -259,7 +259,8 @@ Status SubFileCache::_clean_cache_internal(size_t offset) {
}
_cache_file_size = 0;
Path cache_file = _cache_dir / fmt::format("{}_{}", SUB_FILE_CACHE_PREFIX, offset);
Path done_file = _cache_dir / fmt::format("{}_{}", SUB_FILE_DONE_PREFIX, offset);
Path done_file = _cache_dir /
fmt::format("{}_{}{}", SUB_FILE_CACHE_PREFIX, offset, CACHE_DONE_FILE_SUFFIX);
bool done_file_exist = false;
RETURN_NOT_OK_STATUS_WITH_WARN(
io::global_local_filesystem()->exists(done_file, &done_file_exist),

View File

@ -23,7 +23,6 @@ namespace doris {
namespace io {
const static std::string WHOLE_FILE_CACHE_NAME = "WHOLE_FILE_CACHE";
const static std::string WHOLE_FILE_CACHE_DONE_NAME = "WHOLE_FILE_CACHE_DONE";
WholeFileCache::WholeFileCache(const Path& cache_dir, int64_t alive_time_sec,
io::FileReaderSPtr remote_file_reader)
@ -55,7 +54,8 @@ Status WholeFileCache::read_at(size_t offset, Slice result, size_t* bytes_read)
Status WholeFileCache::_generate_cache_reader(size_t offset, size_t req_size) {
std::unique_lock<std::shared_mutex> wrlock(_cache_lock);
Path cache_file = _cache_dir / WHOLE_FILE_CACHE_NAME;
Path cache_done_file = _cache_dir / WHOLE_FILE_CACHE_DONE_NAME;
Path cache_done_file =
_cache_dir / fmt::format("{}{}", WHOLE_FILE_CACHE_NAME, CACHE_DONE_FILE_SUFFIX);
bool done_file_exist = false;
RETURN_NOT_OK_STATUS_WITH_WARN(
io::global_local_filesystem()->exists(cache_done_file, &done_file_exist),
@ -175,7 +175,8 @@ Status WholeFileCache::_clean_cache_internal() {
_cache_file_reader.reset();
_cache_file_size = 0;
Path cache_file = _cache_dir / WHOLE_FILE_CACHE_NAME;
Path done_file = _cache_dir / WHOLE_FILE_CACHE_DONE_NAME;
Path done_file =
_cache_dir / fmt::format("{}{}", WHOLE_FILE_CACHE_NAME, CACHE_DONE_FILE_SUFFIX);
bool done_file_exist = false;
RETURN_NOT_OK_STATUS_WITH_WARN(
io::global_local_filesystem()->exists(done_file, &done_file_exist),

View File

@ -28,16 +28,21 @@
#include "agent/cgroups_mgr.h"
#include "common/status.h"
#include "gutil/strings/substitute.h"
#include "io/cache/file_cache_manager.h"
#include "olap/cumulative_compaction.h"
#include "olap/olap_common.h"
#include "olap/olap_define.h"
#include "olap/storage_engine.h"
#include "util/file_utils.h"
#include "util/time.h"
using std::string;
namespace doris {
using io::FileCacheManager;
using io::Path;
// number of running SCHEMA-CHANGE threads
volatile uint32_t g_schema_change_active_threads = 0;
@ -143,6 +148,12 @@ Status StorageEngine::start_bg_threads() {
&_cooldown_tasks_producer_thread));
LOG(INFO) << "cooldown tasks producer thread started";
RETURN_IF_ERROR(Thread::create(
"StorageEngine", "cache_file_cleaner_tasks_producer_thread",
[this]() { this->_cache_file_cleaner_tasks_producer_callback(); },
&_cache_file_cleaner_tasks_producer_thread));
LOG(INFO) << "cache file cleaner tasks producer thread started";
// add tablet publish version thread pool
ThreadPoolBuilder("TabletPublishTxnThreadPool")
.set_min_threads(config::tablet_publish_txn_max_thread)
@ -743,4 +754,33 @@ void StorageEngine::_cooldown_tasks_producer_callback() {
} while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval)));
}
void StorageEngine::_cache_file_cleaner_tasks_producer_callback() {
int64_t interval = config::generate_cache_cleaner_task_interval_sec;
do {
LOG(INFO) << "Begin to Clean cache files";
FileCacheManager::instance()->clean_timeout_caches();
std::vector<TabletSharedPtr> tablets =
StorageEngine::instance()->tablet_manager()->get_all_tablet();
for (const auto& tablet : tablets) {
std::vector<Path> seg_file_paths;
if (io::global_local_filesystem()->list(tablet->tablet_path(), &seg_file_paths).ok()) {
for (Path seg_file : seg_file_paths) {
std::string seg_filename = seg_file.native();
// check if it is a dir name
if (ends_with(seg_filename, ".dat")) {
continue;
}
std::stringstream ss;
ss << tablet->tablet_path() << "/" << seg_filename;
std::string cache_path = ss.str();
if (FileCacheManager::instance()->exist(cache_path)) {
continue;
}
FileCacheManager::instance()->clean_timeout_file_not_in_mem(cache_path);
}
}
}
} while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval)));
}
} // namespace doris

View File

@ -25,6 +25,7 @@
#include "common/status.h"
#include "gutil/strings/substitute.h"
#include "io/cache/file_cache_manager.h"
#include "io/fs/s3_file_system.h"
#include "olap/olap_define.h"
#include "olap/rowset/beta_rowset_reader.h"
@ -34,6 +35,8 @@
namespace doris {
using io::FileCacheManager;
std::string BetaRowset::segment_file_path(int segment_id) {
if (is_local()) {
return local_segment_path(_tablet_path, rowset_id(), segment_id);
@ -65,6 +68,12 @@ std::string BetaRowset::remote_segment_path(int64_t tablet_id, const RowsetId& r
segment_id);
}
std::string BetaRowset::local_cache_path(const std::string& tablet_path, const RowsetId& rowset_id,
int segment_id) {
// {root_path}/data/{shard_id}/{tablet_id}/{schema_hash}/{rowset_id}_{seg_num}
return fmt::format("{}/{}_{}", tablet_path, rowset_id.to_string(), segment_id);
}
BetaRowset::BetaRowset(TabletSchemaSPtr schema, const std::string& tablet_path,
RowsetMetaSharedPtr rowset_meta)
: Rowset(schema, tablet_path, std::move(rowset_meta)) {}
@ -151,6 +160,10 @@ Status BetaRowset::remove() {
LOG(WARNING) << st.to_string();
success = false;
}
if (fs->type() != io::FileSystemType::LOCAL) {
auto cache_path = segment_cache_path(i);
FileCacheManager::instance()->remove_file_cache(cache_path);
}
}
if (!success) {
LOG(WARNING) << "failed to remove files in rowset " << unique_id();

View File

@ -55,6 +55,9 @@ public:
static std::string remote_segment_path(int64_t tablet_id, const std::string& rowset_id,
int segment_id);
static std::string local_cache_path(const std::string& tablet_path, const RowsetId& rowset_id,
int segment_id);
Status split_range(const RowCursor& start_key, const RowCursor& end_key,
uint64_t request_block_row_count, size_t key_num,
std::vector<OlapTuple>* ranges) override;

View File

@ -49,10 +49,11 @@ Status Segment::open(io::FileSystem* fs, const std::string& path, const std::str
if (config::file_cache_type.empty()) {
segment->_file_reader = std::move(file_reader);
} else {
io::FileReaderSPtr cache_reader = FileCacheManager::instance()->new_file_cache(
io::FileCachePtr cache_reader = FileCacheManager::instance()->new_file_cache(
cache_path, config::file_cache_alive_time_sec, file_reader,
config::file_cache_type);
segment->_file_reader = std::move(cache_reader);
segment->_file_reader = cache_reader;
FileCacheManager::instance()->add_file_cache(cache_path, cache_reader);
}
RETURN_IF_ERROR(segment->_open());
*output = std::move(segment);

View File

@ -276,6 +276,8 @@ private:
void _cooldown_tasks_producer_callback();
void _cache_file_cleaner_tasks_producer_callback();
private:
struct CompactionCandidate {
CompactionCandidate(uint32_t nicumulative_compaction_, int64_t tablet_id_, uint32_t index_)
@ -399,6 +401,8 @@ private:
scoped_refptr<Thread> _cooldown_tasks_producer_thread;
scoped_refptr<Thread> _cache_file_cleaner_tasks_producer_thread;
std::unique_ptr<ThreadPool> _cooldown_thread_pool;
std::mutex _running_cooldown_mutex;

View File

@ -211,6 +211,21 @@ Status FileUtils::md5sum(const std::string& file, std::string* md5sum) {
return Status::OK();
}
Status FileUtils::mtime(const std::string& file, time_t* m_time) {
int fd = open(file.c_str(), O_RDONLY);
if (fd < 0) {
return Status::InternalError("failed to open file");
}
Defer defer {[&]() { close(fd); }};
struct stat statbuf;
if (fstat(fd, &statbuf) < 0) {
return Status::InternalError("failed to stat file");
}
*m_time = statbuf.st_mtime;
return Status::OK();
}
bool FileUtils::check_exist(const std::string& path) {
return Env::Default()->path_exists(path).ok();
}

View File

@ -96,6 +96,8 @@ public:
// calc md5sum of a local file
static Status md5sum(const std::string& file, std::string* md5sum);
static Status mtime(const std::string& file, time_t* m_time);
// check path(file or directory) exist with default env
static bool check_exist(const std::string& path);

View File

@ -1588,3 +1588,22 @@ Translated with www.DeepL.com/Translator (free version)
* Description: at least the number of versions to be compaction, and the number of rowsets with a small amount of data in the selection. If it is greater than this value, the real compaction will be carried out
* Default: 10
### `generate_cache_cleaner_task_interval_sec`
* Type:int64
* Description:Cleaning interval of cache files, in seconds
* Default:43200(12 hours)
### `file_cache_type`
* Type:string
* Description:Type of cache file. whole_ file_ Cache: download the entire segment file, sub_ file_ Cache: the segment file is divided into multiple files by size.
* Default:""
### `max_sub_cache_file_size`
* Type:int64
* Description:Cache files using sub_ file_ The maximum size of the split file during cache, unit: B
* Default:104857600(100MB)
### `file_cache_alive_time_sec`
* Type:int64
* Description:Save time of cache file, in seconds
* Default:604800(1 week)

View File

@ -1612,3 +1612,22 @@ webserver默认工作线程数
* 描述: 最少进行合并的版本数,当选中的小数据量的rowset个数,大于这个值是才会进行真正的合并
* 默认值: 10
### `generate_cache_cleaner_task_interval_sec`
* 类型:int64
* 描述:缓存文件的清理间隔,单位:秒
* 默认值:43200(12小时)
### `file_cache_type`
* 类型:string
* 描述:缓存文件的类型。whole_file_cache:将segment文件整个下载,sub_file_cache:将segment文件按大小切分成多个文件。
* 默认值:""
### `max_sub_cache_file_size`
* 类型:int64
* 描述:缓存文件使用sub_file_cache时,切分文件的最大大小,单位B
* 默认值:104857600(100MB)
### `file_cache_alive_time_sec`
* 类型:int64
* 描述:缓存文件的保存时间,单位:秒
* 默认值:604800(1个星期)