diff --git a/be/src/common/config.h b/be/src/common/config.h index db103062fc..a293dc64a4 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -824,10 +824,11 @@ CONF_Int32(quick_compaction_min_rowsets, "10"); // cooldown task configs CONF_Int32(cooldown_thread_num, "5"); CONF_mInt64(generate_cooldown_task_interval_sec, "20"); +CONF_mInt64(generate_cache_cleaner_task_interval_sec, "43200"); // 12 h CONF_Int32(concurrency_per_dir, "2"); -CONF_mInt64(cooldown_lag_time_sec, "10800"); // 3h -CONF_mInt64(max_sub_cache_file_size, "1073741824"); // 1GB -CONF_mInt64(file_cache_alive_time_sec, "604800"); // 1 week +CONF_mInt64(cooldown_lag_time_sec, "10800"); // 3h +CONF_mInt64(max_sub_cache_file_size, "104857600"); // 100MB +CONF_mInt64(file_cache_alive_time_sec, "604800"); // 1 week // file_cache_type is used to set the type of file cache for remote files. // "": no cache, "sub_file_cache": split sub files from remote file. // "whole_file_cache": the whole file. diff --git a/be/src/io/cache/file_cache.h b/be/src/io/cache/file_cache.h index fee37505bd..d68c840cc7 100644 --- a/be/src/io/cache/file_cache.h +++ b/be/src/io/cache/file_cache.h @@ -27,6 +27,8 @@ namespace doris { namespace io { +const std::string CACHE_DONE_FILE_SUFFIX = "_DONE"; + class FileCache : public FileReader { public: FileCache() = default; diff --git a/be/src/io/cache/file_cache_manager.cpp b/be/src/io/cache/file_cache_manager.cpp index 103a8ee0c0..d65e9d75d6 100644 --- a/be/src/io/cache/file_cache_manager.cpp +++ b/be/src/io/cache/file_cache_manager.cpp @@ -17,31 +17,132 @@ #include "io/cache/file_cache_manager.h" +#include "gutil/strings/util.h" #include "io/cache/sub_file_cache.h" #include "io/cache/whole_file_cache.h" +#include "io/fs/local_file_system.h" +#include "util/file_utils.h" +#include "util/string_util.h" namespace doris { namespace io { -void FileCacheManager::add_file_cache(const Path& cache_path, FileCachePtr file_cache) { +void FileCacheManager::add_file_cache(const std::string& cache_path, FileCachePtr file_cache) { std::lock_guard wrlock(_cache_map_lock); - _file_cache_map.emplace(cache_path.native(), file_cache); + _file_cache_map.emplace(cache_path, file_cache); } -void FileCacheManager::remove_file_cache(const Path& cache_path) { - std::lock_guard wrlock(_cache_map_lock); - _file_cache_map.erase(cache_path.native()); +void FileCacheManager::remove_file_cache(const std::string& cache_path) { + bool cache_path_exist = false; + { + std::shared_lock rdlock(_cache_map_lock); + if (_file_cache_map.find(cache_path) == _file_cache_map.end()) { + bool cache_dir_exist = false; + if (global_local_filesystem()->exists(cache_path, &cache_dir_exist).ok() && + cache_dir_exist) { + Status st = global_local_filesystem()->delete_directory(cache_path); + if (!st.ok()) { + LOG(WARNING) << st.to_string(); + } + } + } else { + cache_path_exist = true; + _file_cache_map.find(cache_path)->second->clean_all_cache(); + } + } + if (cache_path_exist) { + std::lock_guard wrlock(_cache_map_lock); + _file_cache_map.erase(cache_path); + } } void FileCacheManager::clean_timeout_caches() { std::shared_lock rdlock(_cache_map_lock); for (std::map::const_iterator iter = _file_cache_map.cbegin(); iter != _file_cache_map.cend(); ++iter) { + if (iter->second == nullptr) { + continue; + } iter->second->clean_timeout_cache(); } } -FileCachePtr FileCacheManager::new_file_cache(const Path& cache_dir, int64_t alive_time_sec, +void FileCacheManager::clean_timeout_file_not_in_mem(const std::string& cache_path) { + time_t now = time(nullptr); + std::shared_lock rdlock(_cache_map_lock); + // Deal with caches not in _file_cache_map + if (_file_cache_map.find(cache_path) == _file_cache_map.end()) { + std::vector cache_file_names; + if (io::global_local_filesystem()->list(cache_path, &cache_file_names).ok()) { + std::map cache_names; + std::list done_names; + for (Path cache_file_name : cache_file_names) { + std::string filename = cache_file_name.native(); + if (ends_with(filename, CACHE_DONE_FILE_SUFFIX)) { + cache_names[filename] = true; + continue; + } + done_names.push_back(filename); + std::stringstream done_file_ss; + done_file_ss << cache_path << "/" << filename; + std::string done_file_path = done_file_ss.str(); + time_t m_time; + if (!FileUtils::mtime(done_file_path, &m_time).ok()) { + continue; + } + if (now - m_time < config::file_cache_alive_time_sec) { + continue; + } + std::string cache_file_path = + StringReplace(done_file_path, CACHE_DONE_FILE_SUFFIX, "", true); + LOG(INFO) << "Delete timeout done_cache_path: " << done_file_path + << ", cache_file_path: " << cache_file_path << ", m_time: " << m_time; + if (!io::global_local_filesystem()->delete_file(done_file_path).ok()) { + LOG(ERROR) << "delete_file failed: " << done_file_path; + continue; + } + if (!io::global_local_filesystem()->delete_file(cache_file_path).ok()) { + LOG(ERROR) << "delete_file failed: " << cache_file_path; + continue; + } + } + // find cache file without done file. + for (std::list::iterator itr = done_names.begin(); itr != done_names.end(); + ++itr) { + std::string cache_filename = StringReplace(*itr, CACHE_DONE_FILE_SUFFIX, "", true); + if (cache_names.find(cache_filename) != cache_names.end()) { + cache_names.erase(cache_filename); + } + } + // remove cache file without done file + for (std::map::iterator itr = cache_names.begin(); + itr != cache_names.end(); ++itr) { + std::stringstream cache_file_ss; + cache_file_ss << cache_path << "/" << itr->first; + std::string cache_file_path = cache_file_ss.str(); + time_t m_time; + if (!FileUtils::mtime(cache_file_path, &m_time).ok()) { + continue; + } + if (now - m_time < config::file_cache_alive_time_sec) { + continue; + } + LOG(INFO) << "Delete cache file without done file: " << cache_file_path; + if (!io::global_local_filesystem()->delete_file(cache_file_path).ok()) { + LOG(ERROR) << "delete_file failed: " << cache_file_path; + } + } + if (io::global_local_filesystem()->list(cache_path, &cache_file_names).ok() && + cache_file_names.size() == 0) { + if (global_local_filesystem()->delete_directory(cache_path).ok()) { + LOG(INFO) << "Delete empty dir: " << cache_path; + } + } + } + } +} + +FileCachePtr FileCacheManager::new_file_cache(const std::string& cache_dir, int64_t alive_time_sec, io::FileReaderSPtr remote_file_reader, const std::string& file_cache_type) { if (file_cache_type == "whole_file_cache") { @@ -53,6 +154,11 @@ FileCachePtr FileCacheManager::new_file_cache(const Path& cache_dir, int64_t ali } } +bool FileCacheManager::exist(const std::string& cache_path) { + std::shared_lock rdlock(_cache_map_lock); + return _file_cache_map.find(cache_path) != _file_cache_map.end(); +} + FileCacheManager* FileCacheManager::instance() { static FileCacheManager cache_manager; return &cache_manager; diff --git a/be/src/io/cache/file_cache_manager.h b/be/src/io/cache/file_cache_manager.h index 4a3e31ba92..c4195ab59b 100644 --- a/be/src/io/cache/file_cache_manager.h +++ b/be/src/io/cache/file_cache_manager.h @@ -33,16 +33,20 @@ public: static FileCacheManager* instance(); - void add_file_cache(const Path& cache_path, FileCachePtr file_cache); + void add_file_cache(const std::string& cache_path, FileCachePtr file_cache); - void remove_file_cache(const Path& cache_path); + void remove_file_cache(const std::string& cache_path); void clean_timeout_caches(); - FileCachePtr new_file_cache(const Path& cache_dir, int64_t alive_time_sec, + void clean_timeout_file_not_in_mem(const std::string& cache_path); + + FileCachePtr new_file_cache(const std::string& cache_dir, int64_t alive_time_sec, io::FileReaderSPtr remote_file_reader, const std::string& file_cache_type); + bool exist(const std::string& cache_path); + private: std::shared_mutex _cache_map_lock; // cache_path -> FileCache diff --git a/be/src/io/cache/sub_file_cache.cpp b/be/src/io/cache/sub_file_cache.cpp index 8bf75f9f38..c647eeb60a 100644 --- a/be/src/io/cache/sub_file_cache.cpp +++ b/be/src/io/cache/sub_file_cache.cpp @@ -26,7 +26,6 @@ namespace io { using std::vector; const static std::string SUB_FILE_CACHE_PREFIX = "SUB_CACHE"; -const static std::string SUB_FILE_DONE_PREFIX = "SUB_CACHE_DONE"; SubFileCache::SubFileCache(const Path& cache_dir, int64_t alive_time_sec, io::FileReaderSPtr remote_file_reader) @@ -116,7 +115,8 @@ Status SubFileCache::read_at(size_t offset, Slice result, size_t* bytes_read) { Status SubFileCache::_generate_cache_reader(size_t offset, size_t req_size) { Path cache_file = _cache_dir / fmt::format("{}_{}", SUB_FILE_CACHE_PREFIX, offset); - Path cache_done_file = _cache_dir / fmt::format("{}_{}", SUB_FILE_DONE_PREFIX, offset); + Path cache_done_file = _cache_dir / fmt::format("{}_{}{}", SUB_FILE_CACHE_PREFIX, offset, + CACHE_DONE_FILE_SUFFIX); bool done_file_exist = false; RETURN_NOT_OK_STATUS_WITH_WARN( io::global_local_filesystem()->exists(cache_done_file, &done_file_exist), @@ -259,7 +259,8 @@ Status SubFileCache::_clean_cache_internal(size_t offset) { } _cache_file_size = 0; Path cache_file = _cache_dir / fmt::format("{}_{}", SUB_FILE_CACHE_PREFIX, offset); - Path done_file = _cache_dir / fmt::format("{}_{}", SUB_FILE_DONE_PREFIX, offset); + Path done_file = _cache_dir / + fmt::format("{}_{}{}", SUB_FILE_CACHE_PREFIX, offset, CACHE_DONE_FILE_SUFFIX); bool done_file_exist = false; RETURN_NOT_OK_STATUS_WITH_WARN( io::global_local_filesystem()->exists(done_file, &done_file_exist), diff --git a/be/src/io/cache/whole_file_cache.cpp b/be/src/io/cache/whole_file_cache.cpp index b835b51114..2c450d26e3 100644 --- a/be/src/io/cache/whole_file_cache.cpp +++ b/be/src/io/cache/whole_file_cache.cpp @@ -23,7 +23,6 @@ namespace doris { namespace io { const static std::string WHOLE_FILE_CACHE_NAME = "WHOLE_FILE_CACHE"; -const static std::string WHOLE_FILE_CACHE_DONE_NAME = "WHOLE_FILE_CACHE_DONE"; WholeFileCache::WholeFileCache(const Path& cache_dir, int64_t alive_time_sec, io::FileReaderSPtr remote_file_reader) @@ -55,7 +54,8 @@ Status WholeFileCache::read_at(size_t offset, Slice result, size_t* bytes_read) Status WholeFileCache::_generate_cache_reader(size_t offset, size_t req_size) { std::unique_lock wrlock(_cache_lock); Path cache_file = _cache_dir / WHOLE_FILE_CACHE_NAME; - Path cache_done_file = _cache_dir / WHOLE_FILE_CACHE_DONE_NAME; + Path cache_done_file = + _cache_dir / fmt::format("{}{}", WHOLE_FILE_CACHE_NAME, CACHE_DONE_FILE_SUFFIX); bool done_file_exist = false; RETURN_NOT_OK_STATUS_WITH_WARN( io::global_local_filesystem()->exists(cache_done_file, &done_file_exist), @@ -175,7 +175,8 @@ Status WholeFileCache::_clean_cache_internal() { _cache_file_reader.reset(); _cache_file_size = 0; Path cache_file = _cache_dir / WHOLE_FILE_CACHE_NAME; - Path done_file = _cache_dir / WHOLE_FILE_CACHE_DONE_NAME; + Path done_file = + _cache_dir / fmt::format("{}{}", WHOLE_FILE_CACHE_NAME, CACHE_DONE_FILE_SUFFIX); bool done_file_exist = false; RETURN_NOT_OK_STATUS_WITH_WARN( io::global_local_filesystem()->exists(done_file, &done_file_exist), diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp index 5fa1ec6837..d35ec4d712 100644 --- a/be/src/olap/olap_server.cpp +++ b/be/src/olap/olap_server.cpp @@ -28,16 +28,21 @@ #include "agent/cgroups_mgr.h" #include "common/status.h" #include "gutil/strings/substitute.h" +#include "io/cache/file_cache_manager.h" #include "olap/cumulative_compaction.h" #include "olap/olap_common.h" #include "olap/olap_define.h" #include "olap/storage_engine.h" +#include "util/file_utils.h" #include "util/time.h" using std::string; namespace doris { +using io::FileCacheManager; +using io::Path; + // number of running SCHEMA-CHANGE threads volatile uint32_t g_schema_change_active_threads = 0; @@ -143,6 +148,12 @@ Status StorageEngine::start_bg_threads() { &_cooldown_tasks_producer_thread)); LOG(INFO) << "cooldown tasks producer thread started"; + RETURN_IF_ERROR(Thread::create( + "StorageEngine", "cache_file_cleaner_tasks_producer_thread", + [this]() { this->_cache_file_cleaner_tasks_producer_callback(); }, + &_cache_file_cleaner_tasks_producer_thread)); + LOG(INFO) << "cache file cleaner tasks producer thread started"; + // add tablet publish version thread pool ThreadPoolBuilder("TabletPublishTxnThreadPool") .set_min_threads(config::tablet_publish_txn_max_thread) @@ -743,4 +754,33 @@ void StorageEngine::_cooldown_tasks_producer_callback() { } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval))); } +void StorageEngine::_cache_file_cleaner_tasks_producer_callback() { + int64_t interval = config::generate_cache_cleaner_task_interval_sec; + do { + LOG(INFO) << "Begin to Clean cache files"; + FileCacheManager::instance()->clean_timeout_caches(); + std::vector tablets = + StorageEngine::instance()->tablet_manager()->get_all_tablet(); + for (const auto& tablet : tablets) { + std::vector seg_file_paths; + if (io::global_local_filesystem()->list(tablet->tablet_path(), &seg_file_paths).ok()) { + for (Path seg_file : seg_file_paths) { + std::string seg_filename = seg_file.native(); + // check if it is a dir name + if (ends_with(seg_filename, ".dat")) { + continue; + } + std::stringstream ss; + ss << tablet->tablet_path() << "/" << seg_filename; + std::string cache_path = ss.str(); + if (FileCacheManager::instance()->exist(cache_path)) { + continue; + } + FileCacheManager::instance()->clean_timeout_file_not_in_mem(cache_path); + } + } + } + } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval))); +} + } // namespace doris diff --git a/be/src/olap/rowset/beta_rowset.cpp b/be/src/olap/rowset/beta_rowset.cpp index ebe9ce4e27..b6d02e2293 100644 --- a/be/src/olap/rowset/beta_rowset.cpp +++ b/be/src/olap/rowset/beta_rowset.cpp @@ -25,6 +25,7 @@ #include "common/status.h" #include "gutil/strings/substitute.h" +#include "io/cache/file_cache_manager.h" #include "io/fs/s3_file_system.h" #include "olap/olap_define.h" #include "olap/rowset/beta_rowset_reader.h" @@ -34,6 +35,8 @@ namespace doris { +using io::FileCacheManager; + std::string BetaRowset::segment_file_path(int segment_id) { if (is_local()) { return local_segment_path(_tablet_path, rowset_id(), segment_id); @@ -65,6 +68,12 @@ std::string BetaRowset::remote_segment_path(int64_t tablet_id, const RowsetId& r segment_id); } +std::string BetaRowset::local_cache_path(const std::string& tablet_path, const RowsetId& rowset_id, + int segment_id) { + // {root_path}/data/{shard_id}/{tablet_id}/{schema_hash}/{rowset_id}_{seg_num} + return fmt::format("{}/{}_{}", tablet_path, rowset_id.to_string(), segment_id); +} + BetaRowset::BetaRowset(TabletSchemaSPtr schema, const std::string& tablet_path, RowsetMetaSharedPtr rowset_meta) : Rowset(schema, tablet_path, std::move(rowset_meta)) {} @@ -151,6 +160,10 @@ Status BetaRowset::remove() { LOG(WARNING) << st.to_string(); success = false; } + if (fs->type() != io::FileSystemType::LOCAL) { + auto cache_path = segment_cache_path(i); + FileCacheManager::instance()->remove_file_cache(cache_path); + } } if (!success) { LOG(WARNING) << "failed to remove files in rowset " << unique_id(); diff --git a/be/src/olap/rowset/beta_rowset.h b/be/src/olap/rowset/beta_rowset.h index c5b23e4fe8..dde891d6cb 100644 --- a/be/src/olap/rowset/beta_rowset.h +++ b/be/src/olap/rowset/beta_rowset.h @@ -55,6 +55,9 @@ public: static std::string remote_segment_path(int64_t tablet_id, const std::string& rowset_id, int segment_id); + static std::string local_cache_path(const std::string& tablet_path, const RowsetId& rowset_id, + int segment_id); + Status split_range(const RowCursor& start_key, const RowCursor& end_key, uint64_t request_block_row_count, size_t key_num, std::vector* ranges) override; diff --git a/be/src/olap/rowset/segment_v2/segment.cpp b/be/src/olap/rowset/segment_v2/segment.cpp index b637487f42..6e44914911 100644 --- a/be/src/olap/rowset/segment_v2/segment.cpp +++ b/be/src/olap/rowset/segment_v2/segment.cpp @@ -49,10 +49,11 @@ Status Segment::open(io::FileSystem* fs, const std::string& path, const std::str if (config::file_cache_type.empty()) { segment->_file_reader = std::move(file_reader); } else { - io::FileReaderSPtr cache_reader = FileCacheManager::instance()->new_file_cache( + io::FileCachePtr cache_reader = FileCacheManager::instance()->new_file_cache( cache_path, config::file_cache_alive_time_sec, file_reader, config::file_cache_type); - segment->_file_reader = std::move(cache_reader); + segment->_file_reader = cache_reader; + FileCacheManager::instance()->add_file_cache(cache_path, cache_reader); } RETURN_IF_ERROR(segment->_open()); *output = std::move(segment); diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h index 9ad9d4a8d7..198aba8aaf 100644 --- a/be/src/olap/storage_engine.h +++ b/be/src/olap/storage_engine.h @@ -276,6 +276,8 @@ private: void _cooldown_tasks_producer_callback(); + void _cache_file_cleaner_tasks_producer_callback(); + private: struct CompactionCandidate { CompactionCandidate(uint32_t nicumulative_compaction_, int64_t tablet_id_, uint32_t index_) @@ -399,6 +401,8 @@ private: scoped_refptr _cooldown_tasks_producer_thread; + scoped_refptr _cache_file_cleaner_tasks_producer_thread; + std::unique_ptr _cooldown_thread_pool; std::mutex _running_cooldown_mutex; diff --git a/be/src/util/file_utils.cpp b/be/src/util/file_utils.cpp index 014d344f0c..ae3b2837d0 100644 --- a/be/src/util/file_utils.cpp +++ b/be/src/util/file_utils.cpp @@ -211,6 +211,21 @@ Status FileUtils::md5sum(const std::string& file, std::string* md5sum) { return Status::OK(); } +Status FileUtils::mtime(const std::string& file, time_t* m_time) { + int fd = open(file.c_str(), O_RDONLY); + if (fd < 0) { + return Status::InternalError("failed to open file"); + } + + Defer defer {[&]() { close(fd); }}; + struct stat statbuf; + if (fstat(fd, &statbuf) < 0) { + return Status::InternalError("failed to stat file"); + } + *m_time = statbuf.st_mtime; + return Status::OK(); +} + bool FileUtils::check_exist(const std::string& path) { return Env::Default()->path_exists(path).ok(); } diff --git a/be/src/util/file_utils.h b/be/src/util/file_utils.h index ae64c93dab..a6419ff39b 100644 --- a/be/src/util/file_utils.h +++ b/be/src/util/file_utils.h @@ -96,6 +96,8 @@ public: // calc md5sum of a local file static Status md5sum(const std::string& file, std::string* md5sum); + static Status mtime(const std::string& file, time_t* m_time); + // check path(file or directory) exist with default env static bool check_exist(const std::string& path); diff --git a/docs/en/docs/admin-manual/config/be-config.md b/docs/en/docs/admin-manual/config/be-config.md index 157bef854c..9b9f24b39c 100644 --- a/docs/en/docs/admin-manual/config/be-config.md +++ b/docs/en/docs/admin-manual/config/be-config.md @@ -1588,3 +1588,22 @@ Translated with www.DeepL.com/Translator (free version) * Description: at least the number of versions to be compaction, and the number of rowsets with a small amount of data in the selection. If it is greater than this value, the real compaction will be carried out * Default: 10 +### `generate_cache_cleaner_task_interval_sec` +* Type:int64 +* Description:Cleaning interval of cache files, in seconds +* Default:43200(12 hours) + +### `file_cache_type` +* Type:string +* Description:Type of cache file. whole_ file_ Cache: download the entire segment file, sub_ file_ Cache: the segment file is divided into multiple files by size. +* Default:"" + +### `max_sub_cache_file_size` +* Type:int64 +* Description:Cache files using sub_ file_ The maximum size of the split file during cache, unit: B +* Default:104857600(100MB) + +### `file_cache_alive_time_sec` +* Type:int64 +* Description:Save time of cache file, in seconds +* Default:604800(1 week) diff --git a/docs/zh-CN/docs/admin-manual/config/be-config.md b/docs/zh-CN/docs/admin-manual/config/be-config.md index 1ac0aa720d..c94cda7b76 100644 --- a/docs/zh-CN/docs/admin-manual/config/be-config.md +++ b/docs/zh-CN/docs/admin-manual/config/be-config.md @@ -1612,3 +1612,22 @@ webserver默认工作线程数 * 描述: 最少进行合并的版本数,当选中的小数据量的rowset个数,大于这个值是才会进行真正的合并 * 默认值: 10 +### `generate_cache_cleaner_task_interval_sec` +* 类型:int64 +* 描述:缓存文件的清理间隔,单位:秒 +* 默认值:43200(12小时) + +### `file_cache_type` +* 类型:string +* 描述:缓存文件的类型。whole_file_cache:将segment文件整个下载,sub_file_cache:将segment文件按大小切分成多个文件。 +* 默认值:"" + +### `max_sub_cache_file_size` +* 类型:int64 +* 描述:缓存文件使用sub_file_cache时,切分文件的最大大小,单位B +* 默认值:104857600(100MB) + +### `file_cache_alive_time_sec` +* 类型:int64 +* 描述:缓存文件的保存时间,单位:秒 +* 默认值:604800(1个星期)