[opt](filecache) use weak_ptr to cache the file handle of file segment (#21975)

Use weak_ptr to cache the file handle of file segment. The max cached number of file handles can be configured by `file_cache_max_file_reader_cache_size`, default `1000000`.
Users can inspect the number of cached file handles by request BE metrics: `http://be_host:be_webserver_port/metrics`:
```
# TYPE doris_be_file_cache_segment_reader_cache_size gauge
doris_be_file_cache_segment_reader_cache_size{path="/mnt/datadisk1/gaoxin/file_cache"} 2500
```
This commit is contained in:
Ashin Gau
2023-07-24 19:09:27 +08:00
committed by GitHub
parent 3ba3690f93
commit 30c21789c8
9 changed files with 238 additions and 13 deletions

View File

@ -1009,6 +1009,7 @@ DEFINE_mInt32(s3_write_buffer_size, "5242880");
// can at most buffer 50MB data. And the num of multi part upload task is
// s3_write_buffer_whole_size / s3_write_buffer_size
DEFINE_mInt32(s3_write_buffer_whole_size, "524288000");
DEFINE_mInt64(file_cache_max_file_reader_cache_size, "1000000");
//disable shrink memory by default
DEFINE_Bool(enable_shrink_memory, "false");

View File

@ -1029,6 +1029,8 @@ DECLARE_mInt32(s3_write_buffer_size);
// can at most buffer 50MB data. And the num of multi part upload task is
// s3_write_buffer_whole_size / s3_write_buffer_size
DECLARE_mInt32(s3_write_buffer_whole_size);
// the max number of cached file handle for block segemnt
DECLARE_mInt64(file_cache_max_file_reader_cache_size);
//enable shrink memory
DECLARE_Bool(enable_shrink_memory);
// enable cache for high concurrent point query work load

View File

@ -162,5 +162,48 @@ void IFileCache::QueryFileCacheContext::reserve(const Key& key, size_t offset, s
}
}
void IFileCache::set_read_only(bool read_only) {
s_read_only = read_only;
if (read_only) {
std::lock_guard lock(s_file_reader_cache_mtx);
s_file_reader_cache.clear();
s_file_name_to_reader.clear();
}
}
std::weak_ptr<FileReader> IFileCache::cache_file_reader(const AccessKeyAndOffset& key,
std::shared_ptr<FileReader> file_reader) {
std::weak_ptr<FileReader> wp;
if (!s_read_only) [[likely]] {
std::lock_guard lock(s_file_reader_cache_mtx);
if (config::file_cache_max_file_reader_cache_size == s_file_reader_cache.size()) {
s_file_name_to_reader.erase(s_file_reader_cache.back().first);
s_file_reader_cache.pop_back();
}
wp = file_reader;
s_file_reader_cache.emplace_front(key, std::move(file_reader));
s_file_name_to_reader.insert(std::make_pair(key, s_file_reader_cache.begin()));
}
return wp;
}
void IFileCache::remove_file_reader(const AccessKeyAndOffset& key) {
std::lock_guard lock(s_file_reader_cache_mtx);
if (auto iter = s_file_name_to_reader.find(key); iter != s_file_name_to_reader.end()) {
s_file_reader_cache.erase(iter->second);
s_file_name_to_reader.erase(key);
}
}
bool IFileCache::contains_file_reader(const AccessKeyAndOffset& key) {
std::lock_guard lock(s_file_reader_cache_mtx);
return s_file_name_to_reader.find(key) != s_file_name_to_reader.end();
}
size_t IFileCache::file_reader_cache_size() {
std::lock_guard lock(s_file_reader_cache_mtx);
return s_file_name_to_reader.size();
}
} // namespace io
} // namespace doris

View File

@ -37,6 +37,7 @@
#include "common/status.h"
#include "io/cache/block/block_file_cache_fwd.h"
#include "io/cache/block/block_file_cache_settings.h"
#include "io/fs/file_reader.h"
#include "io/io_common.h"
#include "util/hash_util.hpp"
#include "vec/common/uint128.h"
@ -54,6 +55,7 @@ enum CacheType {
NORMAL,
DISPOSABLE,
};
struct CacheContext {
CacheContext(const IOContext* io_ctx) {
if (io_ctx->read_segment_index) {
@ -292,6 +294,29 @@ public:
};
using QueryFileCacheContextHolderPtr = std::unique_ptr<QueryFileCacheContextHolder>;
QueryFileCacheContextHolderPtr get_query_context_holder(const TUniqueId& query_id);
private:
static inline std::list<std::pair<AccessKeyAndOffset, std::shared_ptr<FileReader>>>
s_file_reader_cache;
static inline std::unordered_map<AccessKeyAndOffset, decltype(s_file_reader_cache.begin()),
KeyAndOffsetHash>
s_file_name_to_reader;
static inline std::mutex s_file_reader_cache_mtx;
static inline std::atomic_bool s_read_only {false};
public:
static void set_read_only(bool read_only);
static bool read_only() { return s_read_only; }
static std::weak_ptr<FileReader> cache_file_reader(const AccessKeyAndOffset& key,
std::shared_ptr<FileReader> file_reader);
static void remove_file_reader(const AccessKeyAndOffset& key);
// use for test
static bool contains_file_reader(const AccessKeyAndOffset& key);
static size_t file_reader_cache_size();
};
using CloudFileCachePtr = IFileCache*;

View File

@ -68,6 +68,13 @@ FileBlock::FileBlock(size_t offset_, size_t size_, const Key& key_, IFileCache*
}
}
FileBlock::~FileBlock() {
std::shared_ptr<FileReader> reader;
if ((reader = _cache_reader.lock())) {
IFileCache::remove_file_reader(std::make_pair(_file_key, offset()));
}
}
FileBlock::State FileBlock::state() const {
std::lock_guard segment_lock(_mutex);
return _download_state;
@ -171,21 +178,20 @@ std::string FileBlock::get_path_in_local_cache() const {
return _cache->get_path_in_local_cache(key(), offset(), _cache_type);
}
Status FileBlock::read_at(Slice buffer, size_t offset) {
Status FileBlock::read_at(Slice buffer, size_t read_offset) {
Status st = Status::OK();
if (!_cache_reader) {
std::lock_guard segment_lock(_mutex);
if (!_cache_reader) {
std::shared_ptr<FileReader> reader;
if (!(reader = _cache_reader.lock())) {
std::lock_guard<std::mutex> lock(_mutex);
if (!(reader = _cache_reader.lock())) {
auto download_path = get_path_in_local_cache();
st = global_local_filesystem()->open_file(download_path, &_cache_reader);
if (!st) {
_cache_reader.reset();
return st;
}
RETURN_IF_ERROR(global_local_filesystem()->open_file(download_path, &reader));
_cache_reader =
IFileCache::cache_file_reader(std::make_pair(_file_key, offset()), reader);
}
}
size_t bytes_reads = buffer.size;
RETURN_IF_ERROR(_cache_reader->read_at(offset, buffer, &bytes_reads));
RETURN_IF_ERROR(reader->read_at(read_offset, buffer, &bytes_reads));
DCHECK(bytes_reads == buffer.size);
return st;
}

View File

@ -52,7 +52,7 @@ class FileBlock {
public:
using Key = IFileCache::Key;
using LocalWriterPtr = std::unique_ptr<FileWriter>;
using LocalReaderPtr = std::shared_ptr<FileReader>;
using LocalReaderPtr = std::weak_ptr<FileReader>;
enum class State {
DOWNLOADED,
@ -74,7 +74,7 @@ public:
FileBlock(size_t offset, size_t size, const Key& key, IFileCache* cache, State download_state,
CacheType cache_type);
~FileBlock() = default;
~FileBlock();
State state() const;
@ -110,7 +110,7 @@ public:
Status append(Slice data);
// read data from cache file
Status read_at(Slice buffer, size_t offset_);
Status read_at(Slice buffer, size_t read_offset);
// finish write, release the file writer
Status finalize_write();

View File

@ -72,6 +72,7 @@ DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(file_cache_disposable_queue_max_size, MetricU
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(file_cache_disposable_queue_curr_size, MetricUnit::BYTES);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(file_cache_disposable_queue_max_elements, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(file_cache_disposable_queue_curr_elements, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(file_cache_segment_reader_cache_size, MetricUnit::NOUNIT);
LRUFileCache::LRUFileCache(const std::string& cache_base_path,
const FileCacheSettings& cache_settings)
@ -104,6 +105,7 @@ LRUFileCache::LRUFileCache(const std::string& cache_base_path,
INT_UGAUGE_METRIC_REGISTER(_entity, file_cache_disposable_queue_curr_size);
INT_UGAUGE_METRIC_REGISTER(_entity, file_cache_disposable_queue_max_elements);
INT_UGAUGE_METRIC_REGISTER(_entity, file_cache_disposable_queue_curr_elements);
INT_UGAUGE_METRIC_REGISTER(_entity, file_cache_segment_reader_cache_size);
LOG(INFO) << fmt::format(
"file cache path={}, disposable queue size={} elements={}, index queue size={} "
@ -1116,6 +1118,7 @@ void LRUFileCache::update_cache_metrics() const {
file_cache_disposable_queue_curr_size->set_value(_disposable_queue.get_total_cache_size(l));
file_cache_disposable_queue_max_elements->set_value(_disposable_queue.get_max_element_size());
file_cache_disposable_queue_curr_elements->set_value(_disposable_queue.get_elements_num(l));
file_cache_segment_reader_cache_size->set_value(IFileCache::file_reader_cache_size());
}
} // namespace io

View File

@ -221,6 +221,7 @@ private:
UIntGauge* file_cache_disposable_queue_curr_size = nullptr;
UIntGauge* file_cache_disposable_queue_max_elements = nullptr;
UIntGauge* file_cache_disposable_queue_curr_elements = nullptr;
UIntGauge* file_cache_segment_reader_cache_size = nullptr;
};
} // namespace io

View File

@ -816,4 +816,148 @@ TEST(LRUFileCache, query_limit_dcheck) {
}
}
TEST(LRUFileCache, fd_cache_remove) {
if (fs::exists(cache_base_path)) {
fs::remove_all(cache_base_path);
}
doris::config::enable_file_cache_query_limit = true;
fs::create_directories(cache_base_path);
io::FileCacheSettings settings;
settings.index_queue_elements = 0;
settings.index_queue_size = 0;
settings.disposable_queue_size = 0;
settings.disposable_queue_elements = 0;
settings.query_queue_size = 15;
settings.query_queue_elements = 5;
settings.max_file_segment_size = 10;
settings.max_query_cache_size = 15;
settings.total_size = 15;
io::LRUFileCache cache(cache_base_path, settings);
ASSERT_TRUE(cache.initialize());
io::CacheContext context;
context.cache_type = io::CacheType::NORMAL;
auto key = io::LRUFileCache::hash("key1");
{
auto holder = cache.get_or_set(key, 0, 9, context); /// Add range [0, 8]
auto segments = fromHolder(holder);
ASSERT_GE(segments.size(), 1);
assert_range(1, segments[0], io::FileBlock::Range(0, 8), io::FileBlock::State::EMPTY);
ASSERT_TRUE(segments[0]->get_or_set_downloader() == io::FileBlock::get_caller_id());
assert_range(2, segments[0], io::FileBlock::Range(0, 8), io::FileBlock::State::DOWNLOADING);
download(segments[0]);
std::unique_ptr<char[]> buffer = std::make_unique<char[]>(9);
segments[0]->read_at(Slice(buffer.get(), 9), 0);
EXPECT_TRUE(io::IFileCache::contains_file_reader(std::make_pair(key, 0)));
}
{
auto holder = cache.get_or_set(key, 9, 1, context); /// Add range [9, 9]
auto segments = fromHolder(holder);
ASSERT_GE(segments.size(), 1);
assert_range(1, segments[0], io::FileBlock::Range(9, 9), io::FileBlock::State::EMPTY);
ASSERT_TRUE(segments[0]->get_or_set_downloader() == io::FileBlock::get_caller_id());
assert_range(2, segments[0], io::FileBlock::Range(9, 9), io::FileBlock::State::DOWNLOADING);
download(segments[0]);
std::unique_ptr<char[]> buffer = std::make_unique<char[]>(1);
segments[0]->read_at(Slice(buffer.get(), 1), 0);
EXPECT_TRUE(io::IFileCache::contains_file_reader(std::make_pair(key, 9)));
}
{
auto holder = cache.get_or_set(key, 10, 5, context); /// Add range [10, 14]
auto segments = fromHolder(holder);
ASSERT_GE(segments.size(), 1);
assert_range(3, segments[0], io::FileBlock::Range(10, 14), io::FileBlock::State::EMPTY);
ASSERT_TRUE(segments[0]->get_or_set_downloader() == io::FileBlock::get_caller_id());
assert_range(4, segments[0], io::FileBlock::Range(10, 14),
io::FileBlock::State::DOWNLOADING);
download(segments[0]);
std::unique_ptr<char[]> buffer = std::make_unique<char[]>(5);
segments[0]->read_at(Slice(buffer.get(), 5), 0);
EXPECT_TRUE(io::IFileCache::contains_file_reader(std::make_pair(key, 10)));
}
{
auto holder = cache.get_or_set(key, 15, 10, context); /// Add range [15, 24]
auto segments = fromHolder(holder);
ASSERT_EQ(segments.size(), 1);
assert_range(3, segments[0], io::FileBlock::Range(15, 24), io::FileBlock::State::EMPTY);
ASSERT_TRUE(segments[0]->get_or_set_downloader() == io::FileBlock::get_caller_id());
assert_range(4, segments[0], io::FileBlock::Range(15, 24),
io::FileBlock::State::DOWNLOADING);
download(segments[0]);
std::unique_ptr<char[]> buffer = std::make_unique<char[]>(10);
segments[0]->read_at(Slice(buffer.get(), 10), 0);
EXPECT_TRUE(io::IFileCache::contains_file_reader(std::make_pair(key, 15)));
}
EXPECT_FALSE(io::IFileCache::contains_file_reader(std::make_pair(key, 0)));
EXPECT_EQ(io::IFileCache::file_reader_cache_size(), 2);
if (fs::exists(cache_base_path)) {
fs::remove_all(cache_base_path);
}
}
TEST(LRUFileCache, fd_cache_evict) {
if (fs::exists(cache_base_path)) {
fs::remove_all(cache_base_path);
}
doris::config::enable_file_cache_query_limit = true;
fs::create_directories(cache_base_path);
io::FileCacheSettings settings;
settings.index_queue_elements = 0;
settings.index_queue_size = 0;
settings.disposable_queue_size = 0;
settings.disposable_queue_elements = 0;
settings.query_queue_size = 15;
settings.query_queue_elements = 5;
settings.max_file_segment_size = 10;
settings.max_query_cache_size = 15;
settings.total_size = 15;
io::LRUFileCache cache(cache_base_path, settings);
ASSERT_TRUE(cache.initialize());
io::CacheContext context;
context.cache_type = io::CacheType::NORMAL;
auto key = io::LRUFileCache::hash("key1");
config::file_cache_max_file_reader_cache_size = 2;
{
auto holder = cache.get_or_set(key, 0, 9, context); /// Add range [0, 8]
auto segments = fromHolder(holder);
ASSERT_GE(segments.size(), 1);
assert_range(1, segments[0], io::FileBlock::Range(0, 8), io::FileBlock::State::EMPTY);
ASSERT_TRUE(segments[0]->get_or_set_downloader() == io::FileBlock::get_caller_id());
assert_range(2, segments[0], io::FileBlock::Range(0, 8), io::FileBlock::State::DOWNLOADING);
download(segments[0]);
std::unique_ptr<char[]> buffer = std::make_unique<char[]>(9);
segments[0]->read_at(Slice(buffer.get(), 9), 0);
EXPECT_TRUE(io::IFileCache::contains_file_reader(std::make_pair(key, 0)));
}
{
auto holder = cache.get_or_set(key, 9, 1, context); /// Add range [9, 9]
auto segments = fromHolder(holder);
ASSERT_GE(segments.size(), 1);
assert_range(1, segments[0], io::FileBlock::Range(9, 9), io::FileBlock::State::EMPTY);
ASSERT_TRUE(segments[0]->get_or_set_downloader() == io::FileBlock::get_caller_id());
assert_range(2, segments[0], io::FileBlock::Range(9, 9), io::FileBlock::State::DOWNLOADING);
download(segments[0]);
std::unique_ptr<char[]> buffer = std::make_unique<char[]>(1);
segments[0]->read_at(Slice(buffer.get(), 1), 0);
EXPECT_TRUE(io::IFileCache::contains_file_reader(std::make_pair(key, 9)));
}
{
auto holder = cache.get_or_set(key, 10, 5, context); /// Add range [10, 14]
auto segments = fromHolder(holder);
ASSERT_GE(segments.size(), 1);
assert_range(3, segments[0], io::FileBlock::Range(10, 14), io::FileBlock::State::EMPTY);
ASSERT_TRUE(segments[0]->get_or_set_downloader() == io::FileBlock::get_caller_id());
assert_range(4, segments[0], io::FileBlock::Range(10, 14),
io::FileBlock::State::DOWNLOADING);
download(segments[0]);
std::unique_ptr<char[]> buffer = std::make_unique<char[]>(5);
segments[0]->read_at(Slice(buffer.get(), 5), 0);
EXPECT_TRUE(io::IFileCache::contains_file_reader(std::make_pair(key, 10)));
}
EXPECT_FALSE(io::IFileCache::contains_file_reader(std::make_pair(key, 0)));
EXPECT_EQ(io::IFileCache::file_reader_cache_size(), 2);
if (fs::exists(cache_base_path)) {
fs::remove_all(cache_base_path);
}
}
} // namespace doris::io