From 82635d4b59e951975988a28fe83ecc97340fc74c Mon Sep 17 00:00:00 2001 From: Xinyi Zou Date: Fri, 29 Dec 2023 16:15:56 +0800 Subject: [PATCH] [opt](memory) All LRU Cache inherit from LRUCachePolicy (#28940) After all LRU Cache inherits from LRUCachePolicy, this will allow prune stale entry, eviction when memory exceeds limit, and define common properties. LRUCache constructor change to private, only allow LRUCachePolicy to construct it. Impl DummyLRUCache, when LRU Cache capacity is 0, will no longer be meaningless insert and evict. --- be/src/common/config.cpp | 6 +- be/src/common/config.h | 7 +- be/src/olap/lru_cache.cpp | 35 ++++++++++ be/src/olap/lru_cache.h | 50 +++++++++++--- be/src/olap/page_cache.cpp | 6 +- be/src/olap/page_cache.h | 25 ++----- .../segment_v2/inverted_index_cache.cpp | 52 ++++++-------- .../rowset/segment_v2/inverted_index_cache.h | 32 ++++++--- be/src/olap/rowset/segment_v2/page_io.cpp | 5 +- be/src/olap/schema_cache.h | 10 +-- be/src/olap/segment_loader.cpp | 12 ++-- be/src/olap/tablet_meta.cpp | 2 +- be/src/olap/tablet_meta.h | 16 +++-- be/src/olap/txn_manager.cpp | 15 ++-- be/src/olap/txn_manager.h | 11 ++- be/src/runtime/exec_env.h | 6 ++ be/src/runtime/exec_env_init.cpp | 14 ++-- be/src/runtime/load_channel_mgr.cpp | 12 ++-- be/src/runtime/load_channel_mgr.h | 11 ++- be/src/runtime/memory/cache_manager.cpp | 3 + be/src/runtime/memory/cache_policy.cpp | 4 +- be/src/runtime/memory/cache_policy.h | 26 ++++++- be/src/runtime/memory/lru_cache_policy.h | 68 +++++++++++++++---- be/src/service/point_query_executor.cpp | 19 +++--- be/src/service/point_query_executor.h | 22 +++--- be/src/util/obj_lru_cache.cpp | 15 ++-- be/src/util/obj_lru_cache.h | 14 ++-- be/test/olap/lru_cache_test.cpp | 46 ++++++++----- .../inverted_index_searcher_cache_test.cpp | 29 +++----- be/test/testutil/run_all_tests.cpp | 1 + 30 files changed, 367 insertions(+), 207 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 1d297ece45..120ca28be7 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -275,7 +275,8 @@ DEFINE_mInt64(memory_limitation_per_thread_for_storage_migration_bytes, "1000000 DEFINE_mInt32(cache_prune_stale_interval, "10"); // the clean interval of tablet lookup cache -DEFINE_mInt32(tablet_lookup_cache_clean_interval, "30"); +DEFINE_mInt32(tablet_lookup_cache_stale_sweep_time_sec, "30"); +DEFINE_mInt32(point_query_row_cache_stale_sweep_time_sec, "300"); DEFINE_mInt32(disk_stat_monitor_interval, "5"); DEFINE_mInt32(unused_rowset_monitor_interval, "30"); DEFINE_String(storage_root_path, "${DORIS_HOME}/storage"); @@ -809,6 +810,9 @@ DEFINE_mInt32(external_table_connect_timeout_sec, "30"); // Global bitmap cache capacity for aggregation cache, size in bytes DEFINE_Int64(delete_bitmap_agg_cache_capacity, "104857600"); +DEFINE_mInt32(delete_bitmap_agg_cache_stale_sweep_time_sec, "1800"); + +DEFINE_mInt32(common_obj_lru_cache_stale_sweep_time_sec, "900"); // s3 config DEFINE_mInt32(max_remote_storage_count, "10"); diff --git a/be/src/common/config.h b/be/src/common/config.h index 4e09bb2b08..68c02941cd 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -318,7 +318,8 @@ DECLARE_mInt64(memory_limitation_per_thread_for_storage_migration_bytes); // the prune stale interval of all cache DECLARE_mInt32(cache_prune_stale_interval); // the clean interval of tablet lookup cache -DECLARE_mInt32(tablet_lookup_cache_clean_interval); +DECLARE_mInt32(tablet_lookup_cache_stale_sweep_time_sec); +DECLARE_mInt32(point_query_row_cache_stale_sweep_time_sec); DECLARE_mInt32(disk_stat_monitor_interval); DECLARE_mInt32(unused_rowset_monitor_interval); DECLARE_String(storage_root_path); @@ -863,6 +864,10 @@ DECLARE_mInt32(external_table_connect_timeout_sec); // Global bitmap cache capacity for aggregation cache, size in bytes DECLARE_Int64(delete_bitmap_agg_cache_capacity); +DECLARE_mInt32(delete_bitmap_agg_cache_stale_sweep_time_sec); + +// A common object cache depends on an Sharded LRU Cache. +DECLARE_mInt32(common_obj_lru_cache_stale_sweep_time_sec); // s3 config DECLARE_mInt32(max_remote_storage_count); diff --git a/be/src/olap/lru_cache.cpp b/be/src/olap/lru_cache.cpp index 0de99e20b2..efc9287047 100644 --- a/be/src/olap/lru_cache.cpp +++ b/be/src/olap/lru_cache.cpp @@ -667,4 +667,39 @@ void ShardedLRUCache::update_cache_metrics() const { total_lookup_count == 0 ? 0 : ((double)total_hit_count / total_lookup_count)); } +Cache::Handle* DummyLRUCache::insert(const CacheKey& key, void* value, size_t charge, + void (*deleter)(const CacheKey& key, void* value), + CachePriority priority, size_t bytes) { + size_t handle_size = sizeof(LRUHandle) - 1 + key.size(); + auto* e = reinterpret_cast(malloc(handle_size)); + e->value = value; + e->deleter = deleter; + e->charge = charge; + e->key_length = 0; + e->total_size = 0; + e->bytes = 0; + e->hash = 0; + e->refs = 1; // only one for the returned handle + e->next = e->prev = nullptr; + e->in_cache = false; + return reinterpret_cast(e); +} + +void DummyLRUCache::release(Cache::Handle* handle) { + if (handle == nullptr) { + return; + } + auto* e = reinterpret_cast(handle); + e->free(); +} + +void* DummyLRUCache::value(Handle* handle) { + return reinterpret_cast(handle)->value; +} + +Slice DummyLRUCache::value_slice(Handle* handle) { + auto* lru_handle = reinterpret_cast(handle); + return Slice((char*)lru_handle->value, lru_handle->charge); +} + } // namespace doris diff --git a/be/src/olap/lru_cache.h b/be/src/olap/lru_cache.h index 8608a85bf2..6ecab80022 100644 --- a/be/src/olap/lru_cache.h +++ b/be/src/olap/lru_cache.h @@ -52,12 +52,17 @@ namespace doris { } while (0) class Cache; +class LRUCachePolicy; enum LRUCacheType { SIZE, // The capacity of cache is based on the size of cache entry. NUMBER // The capacity of cache is based on the number of cache entry. }; +static constexpr LRUCacheType DEFAULT_LRU_CACHE_TYPE = LRUCacheType::SIZE; +static constexpr uint32_t DEFAULT_LRU_CACHE_NUM_SHARDS = 16; +static constexpr size_t DEFAULT_LRU_CACHE_ELEMENT_COUNT_CAPACITY = 0; + class CacheKey { public: CacheKey() : _data(nullptr), _size(0) {} @@ -263,8 +268,10 @@ struct LRUHandle { void free() { (*deleter)(key(), value); - THREAD_MEM_TRACKER_TRANSFER_FROM(bytes, mem_tracker); - DorisMetrics::instance()->lru_cache_memory_bytes->increment(-bytes); + if (bytes != 0) { // DummyLRUCache bytes always equal to 0 + THREAD_MEM_TRACKER_TRANSFER_FROM(bytes, mem_tracker); + DorisMetrics::instance()->lru_cache_memory_bytes->increment(-bytes); + } ::free(this); } }; @@ -330,6 +337,7 @@ public: } // Like Cache methods, but with an extra "hash" parameter. + // Must call release on the returned handle pointer. Cache::Handle* insert(const CacheKey& key, uint32_t hash, void* value, size_t charge, void (*deleter)(const CacheKey& key, void* value), MemTrackerLimiter* tracker, @@ -389,14 +397,6 @@ private: class ShardedLRUCache : public Cache { public: - explicit ShardedLRUCache(const std::string& name, size_t total_capacity, - LRUCacheType type = LRUCacheType::SIZE, uint32_t num_shards = 16, - uint32_t element_count_capacity = 0); - explicit ShardedLRUCache(const std::string& name, size_t total_capacity, LRUCacheType type, - uint32_t num_shards, - CacheValueTimeExtractor cache_value_time_extractor, - bool cache_value_check_timestamp, uint32_t element_count_capacity = 0); - // TODO(fdy): 析构时清除所有cache元素 virtual ~ShardedLRUCache(); virtual Handle* insert(const CacheKey& key, void* value, size_t charge, void (*deleter)(const CacheKey& key, void* value), @@ -415,6 +415,16 @@ public: size_t get_total_capacity() override { return _total_capacity; }; private: + // LRUCache can only be created and managed with LRUCachePolicy. + friend class LRUCachePolicy; + + explicit ShardedLRUCache(const std::string& name, size_t total_capacity, LRUCacheType type, + uint32_t num_shards, uint32_t element_count_capacity); + explicit ShardedLRUCache(const std::string& name, size_t total_capacity, LRUCacheType type, + uint32_t num_shards, + CacheValueTimeExtractor cache_value_time_extractor, + bool cache_value_check_timestamp, uint32_t element_count_capacity); + void update_cache_metrics() const; static std::string lru_cache_type_string(LRUCacheType type) { @@ -456,4 +466,24 @@ private: std::unique_ptr>> _lookup_count_per_second; }; +// Compatible with ShardedLRUCache usage, but will not actually cache. +class DummyLRUCache : public Cache { +public: + // Must call release on the returned handle pointer. + Handle* insert(const CacheKey& key, void* value, size_t charge, + void (*deleter)(const CacheKey& key, void* value), + CachePriority priority = CachePriority::NORMAL, size_t bytes = -1) override; + Handle* lookup(const CacheKey& key) override { return nullptr; }; + void release(Handle* handle) override; + void erase(const CacheKey& key) override {}; + void* value(Handle* handle) override; + Slice value_slice(Handle* handle) override; + uint64_t new_id() override { return 0; }; + int64_t prune() override { return 0; }; + int64_t prune_if(CacheValuePredicate pred, bool lazy_mode = false) override { return 0; }; + int64_t mem_consumption() override { return 0; }; + int64_t get_usage() override { return 0; }; + size_t get_total_capacity() override { return 0; }; +}; + } // namespace doris diff --git a/be/src/olap/page_cache.cpp b/be/src/olap/page_cache.cpp index 1779c293f8..6fbcbbe19f 100644 --- a/be/src/olap/page_cache.cpp +++ b/be/src/olap/page_cache.cpp @@ -48,10 +48,8 @@ StoragePageCache::StoragePageCache(size_t capacity, int32_t index_cache_percenta } else { CHECK(false) << "invalid index page cache percentage"; } - if (pk_index_cache_capacity > 0) { - _pk_index_page_cache = - std::make_unique(pk_index_cache_capacity, num_shards); - } + + _pk_index_page_cache = std::make_unique(pk_index_cache_capacity, num_shards); } bool StoragePageCache::lookup(const CacheKey& key, PageCacheHandle* handle, diff --git a/be/src/olap/page_cache.h b/be/src/olap/page_cache.h index 2d93574455..a604d20550 100644 --- a/be/src/olap/page_cache.h +++ b/be/src/olap/page_cache.h @@ -157,12 +157,6 @@ public: void insert(const CacheKey& key, DataPage* data, PageCacheHandle* handle, segment_v2::PageTypePB page_type, bool in_memory = false); - // Page cache available check. - // When percentage is set to 0 or 100, the index or data cache will not be allocated. - bool is_cache_available(segment_v2::PageTypePB page_type) { - return _get_page_cache(page_type) != nullptr; - } - private: StoragePageCache(); @@ -177,26 +171,19 @@ private: Cache* _get_page_cache(segment_v2::PageTypePB page_type) { switch (page_type) { case segment_v2::DATA_PAGE: { - if (_data_page_cache) { - return _data_page_cache->get(); - } - return nullptr; + return _data_page_cache->cache(); } case segment_v2::INDEX_PAGE: { - if (_index_page_cache) { - return _index_page_cache->get(); - } - return nullptr; + return _index_page_cache->cache(); } case segment_v2::PRIMARY_KEY_INDEX_PAGE: { - if (_pk_index_page_cache) { - return _pk_index_page_cache->get(); - } - return nullptr; + return _pk_index_page_cache->cache(); } default: - return nullptr; + LOG(FATAL) << "get error type page cache"; } + LOG(FATAL) << "__builtin_unreachable"; + __builtin_unreachable(); } }; diff --git a/be/src/olap/rowset/segment_v2/inverted_index_cache.cpp b/be/src/olap/rowset/segment_v2/inverted_index_cache.cpp index e88573c1be..f5259e8139 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_cache.cpp +++ b/be/src/olap/rowset/segment_v2/inverted_index_cache.cpp @@ -86,11 +86,7 @@ InvertedIndexSearcherCache* InvertedIndexSearcherCache::create_global_instance( return new InvertedIndexSearcherCache(capacity, num_shards); } -InvertedIndexSearcherCache::InvertedIndexSearcherCache(size_t capacity, uint32_t num_shards) - : LRUCachePolicy(CachePolicy::CacheType::INVERTEDINDEX_SEARCHER_CACHE, - config::inverted_index_cache_stale_sweep_time_sec), - _mem_tracker(std::make_unique("InvertedIndexSearcherCache")) { - SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); +InvertedIndexSearcherCache::InvertedIndexSearcherCache(size_t capacity, uint32_t num_shards) { uint64_t fd_number = config::min_file_descriptor_number; struct rlimit l; int ret = getrlimit(RLIMIT_NOFILE, &l); @@ -113,13 +109,11 @@ InvertedIndexSearcherCache::InvertedIndexSearcherCache(size_t capacity, uint32_t auto* cache_value = (InvertedIndexSearcherCache::CacheValue*)value; return cache_value->last_visit_time; }; - _cache = std::unique_ptr( - new ShardedLRUCache("InvertedIndexSearcherCache", capacity, LRUCacheType::SIZE, - num_shards, get_last_visit_time, true, open_searcher_limit)); + _policy = std::make_unique( + capacity, num_shards, open_searcher_limit, get_last_visit_time, true); } else { - _cache = std::unique_ptr(new ShardedLRUCache("InvertedIndexSearcherCache", capacity, - LRUCacheType::SIZE, num_shards, - open_searcher_limit)); + _policy = std::make_unique(capacity, num_shards, + open_searcher_limit); } } @@ -208,8 +202,8 @@ Status InvertedIndexSearcherCache::get_index_searcher( IndexCacheValuePtr cache_value = std::make_unique(); cache_value->index_searcher = std::move(index_searcher); cache_value->size = mem_tracker->consumption(); - *cache_handle = - InvertedIndexCacheHandle(_cache.get(), _insert(cache_key, cache_value.release())); + *cache_handle = InvertedIndexCacheHandle(_policy->cache(), + _insert(cache_key, cache_value.release())); } else { cache_handle->index_searcher = std::move(index_searcher); } @@ -280,30 +274,27 @@ Status InvertedIndexSearcherCache::insert(const io::FileSystemSPtr& fs, cache_value->size = mem_tracker->consumption(); cache_value->last_visit_time = UnixMillis(); auto* lru_handle = _insert(cache_key, cache_value.release()); - _cache->release(lru_handle); + _policy->cache()->release(lru_handle); return Status::OK(); } Status InvertedIndexSearcherCache::erase(const std::string& index_file_path) { InvertedIndexSearcherCache::CacheKey cache_key(index_file_path); - _cache->erase(cache_key.index_file_path); + _policy->cache()->erase(cache_key.index_file_path); return Status::OK(); } int64_t InvertedIndexSearcherCache::mem_consumption() { - if (_cache) { - return _cache->mem_consumption(); - } - return 0L; + return _policy->cache()->mem_consumption(); } bool InvertedIndexSearcherCache::_lookup(const InvertedIndexSearcherCache::CacheKey& key, InvertedIndexCacheHandle* handle) { - auto* lru_handle = _cache->lookup(key.index_file_path); + auto* lru_handle = _policy->cache()->lookup(key.index_file_path); if (lru_handle == nullptr) { return false; } - *handle = InvertedIndexCacheHandle(_cache.get(), lru_handle); + *handle = InvertedIndexCacheHandle(_policy->cache(), lru_handle); return true; } @@ -314,8 +305,8 @@ Cache::Handle* InvertedIndexSearcherCache::_insert(const InvertedIndexSearcherCa delete cache_value; }; - Cache::Handle* lru_handle = - _cache->insert(key.index_file_path, value, value->size, deleter, CachePriority::NORMAL); + Cache::Handle* lru_handle = _policy->cache()->insert(key.index_file_path, value, value->size, + deleter, CachePriority::NORMAL); return lru_handle; } @@ -323,11 +314,11 @@ bool InvertedIndexQueryCache::lookup(const CacheKey& key, InvertedIndexQueryCach if (key.encode().empty()) { return false; } - auto* lru_handle = _cache->lookup(key.encode()); + auto* lru_handle = cache()->lookup(key.encode()); if (lru_handle == nullptr) { return false; } - *handle = InvertedIndexQueryCacheHandle(_cache.get(), lru_handle); + *handle = InvertedIndexQueryCacheHandle(cache(), lru_handle); return true; } @@ -346,16 +337,13 @@ void InvertedIndexQueryCache::insert(const CacheKey& key, std::shared_ptrinsert(key.encode(), (void*)cache_value_ptr.release(), - bitmap->getSizeInBytes(), deleter, CachePriority::NORMAL); - *handle = InvertedIndexQueryCacheHandle(_cache.get(), lru_handle); + auto* lru_handle = cache()->insert(key.encode(), (void*)cache_value_ptr.release(), + bitmap->getSizeInBytes(), deleter, CachePriority::NORMAL); + *handle = InvertedIndexQueryCacheHandle(cache(), lru_handle); } int64_t InvertedIndexQueryCache::mem_consumption() { - if (_cache) { - return _cache->mem_consumption(); - } - return 0L; + return cache()->mem_consumption(); } } // namespace doris::segment_v2 diff --git a/be/src/olap/rowset/segment_v2/inverted_index_cache.h b/be/src/olap/rowset/segment_v2/inverted_index_cache.h index 8d7d90eea2..51791f0e10 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_cache.h +++ b/be/src/olap/rowset/segment_v2/inverted_index_cache.h @@ -94,7 +94,7 @@ public: OptionalIndexSearcherPtr& output_searcher) override; }; -class InvertedIndexSearcherCache : public LRUCachePolicy { +class InvertedIndexSearcherCache { public: // The cache key of index_searcher lru cache struct CacheKey { @@ -113,12 +113,6 @@ public: static InvertedIndexSearcherCache* create_global_instance(size_t capacity, uint32_t num_shards = 16); - void reset() { - _cache.reset(); - _mem_tracker.reset(); - // Reset or clear the state of the object. - } - // Return global instance. // Client should call create_global_cache before. static InvertedIndexSearcherCache* instance() { @@ -139,11 +133,32 @@ public: // function `erase` called after compaction remove segment Status erase(const std::string& index_file_path); + void release(Cache::Handle* handle) { _policy->cache()->release(handle); } + int64_t mem_consumption(); private: InvertedIndexSearcherCache(); + class InvertedIndexSearcherCachePolicy : public LRUCachePolicy { + public: + InvertedIndexSearcherCachePolicy(size_t capacity, uint32_t num_shards, + uint32_t element_count_capacity) + : LRUCachePolicy(CachePolicy::CacheType::INVERTEDINDEX_SEARCHER_CACHE, capacity, + LRUCacheType::SIZE, + config::inverted_index_cache_stale_sweep_time_sec, num_shards, + element_count_capacity, true) {} + InvertedIndexSearcherCachePolicy(size_t capacity, uint32_t num_shards, + uint32_t element_count_capacity, + CacheValueTimeExtractor cache_value_time_extractor, + bool cache_value_check_timestamp) + : LRUCachePolicy(CachePolicy::CacheType::INVERTEDINDEX_SEARCHER_CACHE, capacity, + LRUCacheType::SIZE, + config::inverted_index_cache_stale_sweep_time_sec, num_shards, + element_count_capacity, cache_value_time_extractor, + cache_value_check_timestamp, true) {} + }; + // Lookup the given index_searcher in the cache. // If the index_searcher is found, the cache entry will be written into handle. // Return true if entry is found, otherwise return false. @@ -154,8 +169,7 @@ private: // This function is thread-safe. Cache::Handle* _insert(const InvertedIndexSearcherCache::CacheKey& key, CacheValue* value); -private: - std::unique_ptr _mem_tracker; + std::unique_ptr _policy; }; using IndexCacheValuePtr = std::unique_ptr; diff --git a/be/src/olap/rowset/segment_v2/page_io.cpp b/be/src/olap/rowset/segment_v2/page_io.cpp index c712478850..dac9012abc 100644 --- a/be/src/olap/rowset/segment_v2/page_io.cpp +++ b/be/src/olap/rowset/segment_v2/page_io.cpp @@ -120,8 +120,7 @@ Status PageIO::read_and_decompress_page(const PageReadOptions& opts, PageHandle* PageCacheHandle cache_handle; StoragePageCache::CacheKey cache_key(opts.file_reader->path().native(), opts.file_reader->size(), opts.page_pointer.offset); - if (opts.use_page_cache && cache->is_cache_available(opts.type) && - cache->lookup(cache_key, &cache_handle, opts.type)) { + if (opts.use_page_cache && cache && cache->lookup(cache_key, &cache_handle, opts.type)) { // we find page in cache, use it *handle = PageHandle(std::move(cache_handle)); opts.stats->cached_pages_num++; @@ -218,7 +217,7 @@ Status PageIO::read_and_decompress_page(const PageReadOptions& opts, PageHandle* *body = Slice(page_slice.data, page_slice.size - 4 - footer_size); page->reset_size(page_slice.size); - if (opts.use_page_cache && cache->is_cache_available(opts.type)) { + if (opts.use_page_cache && cache) { // insert this page into cache and return the cache handle cache->insert(cache_key, page.get(), &cache_handle, opts.type, opts.kept_in_memory); *handle = PageHandle(std::move(cache_handle)); diff --git a/be/src/olap/schema_cache.h b/be/src/olap/schema_cache.h index b7f640551d..58bf05bbfc 100644 --- a/be/src/olap/schema_cache.h +++ b/be/src/olap/schema_cache.h @@ -65,10 +65,10 @@ public: if (!instance() || schema_key.empty()) { return {}; } - auto lru_handle = _cache->lookup(schema_key); + auto lru_handle = cache()->lookup(schema_key); if (lru_handle) { - Defer release([cache = _cache.get(), lru_handle] { cache->release(lru_handle); }); - auto value = (CacheValue*)_cache->value(lru_handle); + Defer release([cache = cache(), lru_handle] { cache->release(lru_handle); }); + auto value = (CacheValue*)cache()->value(lru_handle); value->last_visit_time = UnixMillis(); VLOG_DEBUG << "use cache schema"; if constexpr (std::is_same_v) { @@ -101,8 +101,8 @@ public: delete cache_value; }; auto lru_handle = - _cache->insert(key, value, 1, deleter, CachePriority::NORMAL, schema->mem_size()); - _cache->release(lru_handle); + cache()->insert(key, value, 1, deleter, CachePriority::NORMAL, schema->mem_size()); + cache()->release(lru_handle); } // Try to prune the cache if expired. diff --git a/be/src/olap/segment_loader.cpp b/be/src/olap/segment_loader.cpp index 02539a3f16..0d068bfcbe 100644 --- a/be/src/olap/segment_loader.cpp +++ b/be/src/olap/segment_loader.cpp @@ -29,11 +29,11 @@ SegmentLoader* SegmentLoader::instance() { } bool SegmentCache::lookup(const SegmentCache::CacheKey& key, SegmentCacheHandle* handle) { - auto lru_handle = _cache->lookup(key.encode()); + auto lru_handle = cache()->lookup(key.encode()); if (lru_handle == nullptr) { return false; } - handle->push_segment(_cache.get(), lru_handle); + handle->push_segment(cache(), lru_handle); return true; } @@ -45,13 +45,13 @@ void SegmentCache::insert(const SegmentCache::CacheKey& key, SegmentCache::Cache delete cache_value; }; - auto lru_handle = _cache->insert(key.encode(), &value, 1, deleter, CachePriority::NORMAL, - value.segment->meta_mem_usage()); - handle->push_segment(_cache.get(), lru_handle); + auto lru_handle = cache()->insert(key.encode(), &value, 1, deleter, CachePriority::NORMAL, + value.segment->meta_mem_usage()); + handle->push_segment(cache(), lru_handle); } void SegmentCache::erase(const SegmentCache::CacheKey& key) { - _cache->erase(key.encode()); + cache()->erase(key.encode()); } Status SegmentLoader::load_segments(const BetaRowsetSharedPtr& rowset, diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp index a67103a893..793bde7b08 100644 --- a/be/src/olap/tablet_meta.cpp +++ b/be/src/olap/tablet_meta.cpp @@ -1074,7 +1074,7 @@ std::shared_ptr DeleteBitmap::get_agg(const BitmapKey& bmk) co &val->bitmap, [this, handle](...) { _agg_cache->repr()->release(handle); }); } -std::atomic DeleteBitmap::AggCache::s_repr {nullptr}; +std::atomic DeleteBitmap::AggCache::s_repr {nullptr}; std::string tablet_state_name(TabletState state) { switch (state) { diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h index 00886bb848..2d14ef599f 100644 --- a/be/src/olap/tablet_meta.h +++ b/be/src/olap/tablet_meta.h @@ -46,6 +46,7 @@ #include "olap/olap_common.h" #include "olap/rowset/rowset_meta.h" #include "olap/tablet_schema.h" +#include "runtime/memory/lru_cache_policy.h" #include "util/uid_util.h" namespace json2pb { @@ -469,6 +470,14 @@ public: */ std::shared_ptr get_agg(const BitmapKey& bmk) const; + class AggCachePolicy : public LRUCachePolicy { + public: + AggCachePolicy(size_t capacity) + : LRUCachePolicy(CachePolicy::CacheType::DELETE_BITMAP_AGG_CACHE, capacity, + LRUCacheType::SIZE, + config::delete_bitmap_agg_cache_stale_sweep_time_sec, 256) {} + }; + class AggCache { public: struct Value { @@ -478,8 +487,7 @@ public: AggCache(size_t size_in_bytes) { static std::once_flag once; std::call_once(once, [size_in_bytes] { - auto tmp = new ShardedLRUCache("DeleteBitmap AggCache", size_in_bytes, - LRUCacheType::SIZE, 256); + auto* tmp = new AggCachePolicy(size_in_bytes); AggCache::s_repr.store(tmp, std::memory_order_release); }); @@ -487,8 +495,8 @@ public: } } - static ShardedLRUCache* repr() { return s_repr.load(std::memory_order_acquire); } - static std::atomic s_repr; + static Cache* repr() { return s_repr.load(std::memory_order_acquire)->cache(); } + static std::atomic s_repr; }; private: diff --git a/be/src/olap/txn_manager.cpp b/be/src/olap/txn_manager.cpp index 2b1e57925f..90c277d8ad 100644 --- a/be/src/olap/txn_manager.cpp +++ b/be/src/olap/txn_manager.cpp @@ -116,8 +116,7 @@ TxnManager::TxnManager(int32_t txn_map_shard_size, int32_t txn_shard_size) _txn_tablet_delta_writer_map = new txn_tablet_delta_writer_map_t[_txn_map_shard_size]; _txn_tablet_delta_writer_map_locks = new std::shared_mutex[_txn_map_shard_size]; // For debugging - _tablet_version_cache = std::unique_ptr( - new ShardedLRUCache("TabletVersionCache", 100000, LRUCacheType::NUMBER, 32)); + _tablet_version_cache = std::make_unique(100000); } // prepare txn should always be allowed because ingest task will be retried @@ -889,12 +888,12 @@ int64_t TxnManager::get_txn_by_tablet_version(int64_t tablet_id, int64_t version memcpy(key + sizeof(int64_t), &version, sizeof(int64_t)); CacheKey cache_key((const char*)&key, sizeof(key)); - auto handle = _tablet_version_cache->lookup(cache_key); + auto* handle = _tablet_version_cache->cache()->lookup(cache_key); if (handle == nullptr) { return -1; } - int64_t res = *(int64_t*)_tablet_version_cache->value(handle); - _tablet_version_cache->release(handle); + int64_t res = *(int64_t*)_tablet_version_cache->cache()->value(handle); + _tablet_version_cache->cache()->release(handle); return res; } @@ -911,9 +910,9 @@ void TxnManager::update_tablet_version_txn(int64_t tablet_id, int64_t version, i delete cache_value; }; - auto handle = _tablet_version_cache->insert(cache_key, value, 1, deleter, CachePriority::NORMAL, - sizeof(txn_id)); - _tablet_version_cache->release(handle); + auto* handle = _tablet_version_cache->cache()->insert(cache_key, value, 1, deleter, + CachePriority::NORMAL, sizeof(txn_id)); + _tablet_version_cache->cache()->release(handle); } TxnState TxnManager::get_txn_state(TPartitionId partition_id, TTransactionId transaction_id, diff --git a/be/src/olap/txn_manager.h b/be/src/olap/txn_manager.h index 74ad589cba..25b8da90b7 100644 --- a/be/src/olap/txn_manager.h +++ b/be/src/olap/txn_manager.h @@ -44,6 +44,7 @@ #include "olap/rowset/segment_v2/segment_writer.h" #include "olap/tablet.h" #include "olap/tablet_meta.h" +#include "runtime/memory/lru_cache_policy.h" #include "util/time.h" #include "vec/core/block.h" @@ -220,6 +221,14 @@ private: void _insert_txn_partition_map_unlocked(int64_t transaction_id, int64_t partition_id); void _clear_txn_partition_map_unlocked(int64_t transaction_id, int64_t partition_id); + class TabletVersionCache : public LRUCachePolicy { + public: + TabletVersionCache(size_t capacity) + : LRUCachePolicy(CachePolicy::CacheType::TABLET_VERSION_CACHE, capacity, + LRUCacheType::NUMBER, -1, DEFAULT_LRU_CACHE_NUM_SHARDS, + DEFAULT_LRU_CACHE_ELEMENT_COUNT_CAPACITY, false) {} + }; + private: const int32_t _txn_map_shard_size; @@ -238,7 +247,7 @@ private: std::shared_mutex* _txn_mutex = nullptr; txn_tablet_delta_writer_map_t* _txn_tablet_delta_writer_map = nullptr; - std::unique_ptr _tablet_version_cache; + std::unique_ptr _tablet_version_cache; std::shared_mutex* _txn_tablet_delta_writer_map_locks = nullptr; DISALLOW_COPY_AND_ASSIGN(TxnManager); }; // TxnManager diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index f8a52ad1ec..c97ff55a20 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -102,6 +102,7 @@ class StoragePageCache; class SegmentLoader; class LookupConnectionCache; class RowCache; +class DummyLRUCache; class CacheManager; class WalManager; @@ -232,6 +233,9 @@ public: this->_routine_load_task_executor = r; } void set_wal_mgr(std::shared_ptr wm) { this->_wal_manager = wm; } + void set_dummy_lru_cache(std::shared_ptr dummy_lru_cache) { + this->_dummy_lru_cache = dummy_lru_cache; + } #endif stream_load::LoadStreamStubPool* load_stream_stub_pool() { @@ -261,6 +265,7 @@ public: segment_v2::InvertedIndexQueryCache* get_inverted_index_query_cache() { return _inverted_index_query_cache; } + std::shared_ptr get_dummy_lru_cache() { return _dummy_lru_cache; } std::shared_ptr get_global_block_scheduler() { return _global_block_scheduler; @@ -371,6 +376,7 @@ private: CacheManager* _cache_manager = nullptr; segment_v2::InvertedIndexSearcherCache* _inverted_index_searcher_cache = nullptr; segment_v2::InvertedIndexQueryCache* _inverted_index_query_cache = nullptr; + std::shared_ptr _dummy_lru_cache = nullptr; // used for query with group cpu hard limit std::shared_ptr _global_block_scheduler; diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index c204ead069..e612cb816b 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -216,7 +216,6 @@ Status ExecEnv::_init(const std::vector& store_paths, _small_file_mgr = new SmallFileMgr(this, config::small_file_dir); _block_spill_mgr = new BlockSpillManager(store_paths); _group_commit_mgr = new GroupCommitMgr(this); - _file_meta_cache = new FileMetaCache(config::max_external_file_meta_cache_num); _memtable_memory_limiter = std::make_unique(); _load_stream_stub_pool = std::make_unique(); _delta_writer_v2_pool = std::make_unique(); @@ -371,13 +370,13 @@ Status ExecEnv::_init_mem_env() { init_hook(); #endif - // 2. init buffer pool if (!BitUtil::IsPowerOf2(config::min_buffer_size)) { ss << "Config min_buffer_size must be a power-of-two: " << config::min_buffer_size; return Status::InternalError(ss.str()); } - // 3. init storage page cache + _dummy_lru_cache = std::make_shared(); + _cache_manager = CacheManager::create_global_instance(); int64_t storage_cache_limit = @@ -396,6 +395,12 @@ Status ExecEnv::_init_mem_env() { << ". Please modify the 'storage_page_cache_shard_size' parameter in your " "conf file to be a power of two for better performance."; } + if (storage_cache_limit < num_shards * 2) { + LOG(WARNING) << "storage_cache_limit(" << storage_cache_limit << ") less than num_shards(" + << num_shards + << ") * 2, cache capacity will be 0, continuing to use " + "cache will only have negative effects, will be disabled."; + } int64_t pk_storage_page_cache_limit = ParseUtil::parse_mem_spec(config::pk_storage_page_cache_limit, MemInfo::mem_limit(), MemInfo::physical_mem(), &is_percent); @@ -442,6 +447,8 @@ Status ExecEnv::_init_mem_env() { _schema_cache = new SchemaCache(config::schema_cache_capacity); + _file_meta_cache = new FileMetaCache(config::max_external_file_meta_cache_num); + _lookup_connection_cache = LookupConnectionCache::create_global_instance( config::lookup_connection_cache_bytes_limit); @@ -473,7 +480,6 @@ Status ExecEnv::_init_mem_env() { << PrettyPrinter::print(inverted_index_cache_limit, TUnit::BYTES) << ", origin config value: " << config::inverted_index_query_cache_limit; - // 4. init other managers RETURN_IF_ERROR(_block_spill_mgr->init()); return Status::OK(); } diff --git a/be/src/runtime/load_channel_mgr.cpp b/be/src/runtime/load_channel_mgr.cpp index 2eb6812ac5..4d98276f0d 100644 --- a/be/src/runtime/load_channel_mgr.cpp +++ b/be/src/runtime/load_channel_mgr.cpp @@ -82,8 +82,7 @@ void LoadChannelMgr::stop() { } Status LoadChannelMgr::init(int64_t process_mem_limit) { - _last_success_channel = - std::unique_ptr(new ShardedLRUCache("LastSuccessChannelCache", 1024)); + _last_success_channels = std::make_unique(1024); RETURN_IF_ERROR(_start_bg_worker()); return Status::OK(); } @@ -124,10 +123,10 @@ Status LoadChannelMgr::_get_load_channel(std::shared_ptr& channel, std::lock_guard l(_lock); auto it = _load_channels.find(load_id); if (it == _load_channels.end()) { - auto handle = _last_success_channel->lookup(load_id.to_string()); + auto handle = _last_success_channels->cache()->lookup(load_id.to_string()); // success only when eos be true if (handle != nullptr) { - _last_success_channel->release(handle); + _last_success_channels->cache()->release(handle); if (request.has_eos() && request.eos()) { is_eof = true; return Status::OK(); @@ -183,8 +182,9 @@ void LoadChannelMgr::_finish_load_channel(const UniqueId load_id) { if (_load_channels.find(load_id) != _load_channels.end()) { _load_channels.erase(load_id); } - auto handle = _last_success_channel->insert(load_id.to_string(), nullptr, 1, dummy_deleter); - _last_success_channel->release(handle); + auto handle = _last_success_channels->cache()->insert(load_id.to_string(), nullptr, 1, + dummy_deleter); + _last_success_channels->cache()->release(handle); } VLOG_CRITICAL << "removed load channel " << load_id; } diff --git a/be/src/runtime/load_channel_mgr.h b/be/src/runtime/load_channel_mgr.h index 0aeec52f24..94bd210f26 100644 --- a/be/src/runtime/load_channel_mgr.h +++ b/be/src/runtime/load_channel_mgr.h @@ -33,6 +33,7 @@ #include "olap/lru_cache.h" #include "olap/memtable_memory_limiter.h" #include "runtime/load_channel.h" +#include "runtime/memory/lru_cache_policy.h" #include "runtime/memory/mem_tracker_limiter.h" #include "runtime/thread_context.h" #include "util/countdown_latch.h" @@ -71,12 +72,20 @@ private: Status _start_bg_worker(); + class LastSuccessChannelCache : public LRUCachePolicy { + public: + LastSuccessChannelCache(size_t capacity) + : LRUCachePolicy(CachePolicy::CacheType::LAST_SUCCESS_CHANNEL_CACHE, capacity, + LRUCacheType::SIZE, -1, DEFAULT_LRU_CACHE_NUM_SHARDS, + DEFAULT_LRU_CACHE_ELEMENT_COUNT_CAPACITY, false) {} + }; + protected: // lock protect the load channel map std::mutex _lock; // load id -> load channel std::unordered_map> _load_channels; - std::unique_ptr _last_success_channel; + std::unique_ptr _last_success_channels; MemTableMemoryLimiter* _memtable_memory_limiter = nullptr; diff --git a/be/src/runtime/memory/cache_manager.cpp b/be/src/runtime/memory/cache_manager.cpp index eada36836d..0910781708 100644 --- a/be/src/runtime/memory/cache_manager.cpp +++ b/be/src/runtime/memory/cache_manager.cpp @@ -27,6 +27,9 @@ int64_t CacheManager::for_each_cache_prune_stale_wrap( int64_t freed_size = 0; std::lock_guard l(_caches_lock); for (auto cache_policy : _caches) { + if (!cache_policy->enable_prune()) { + continue; + } func(cache_policy); freed_size += cache_policy->profile()->get_counter("FreedMemory")->value(); if (cache_policy->profile()->get_counter("FreedMemory")->value() != 0 && profile) { diff --git a/be/src/runtime/memory/cache_policy.cpp b/be/src/runtime/memory/cache_policy.cpp index e79beaffa8..99af485703 100644 --- a/be/src/runtime/memory/cache_policy.cpp +++ b/be/src/runtime/memory/cache_policy.cpp @@ -21,8 +21,8 @@ namespace doris { -CachePolicy::CachePolicy(CacheType type, uint32_t stale_sweep_time_s) - : _type(type), _stale_sweep_time_s(stale_sweep_time_s) { +CachePolicy::CachePolicy(CacheType type, uint32_t stale_sweep_time_s, bool enable_prune) + : _type(type), _stale_sweep_time_s(stale_sweep_time_s), _enable_prune(enable_prune) { _it = CacheManager::instance()->register_cache(this); init_profile(); } diff --git a/be/src/runtime/memory/cache_policy.h b/be/src/runtime/memory/cache_policy.h index e931b8f131..0a57f838bf 100644 --- a/be/src/runtime/memory/cache_policy.h +++ b/be/src/runtime/memory/cache_policy.h @@ -34,7 +34,13 @@ public: SEGMENT_CACHE = 4, INVERTEDINDEX_SEARCHER_CACHE = 5, INVERTEDINDEX_QUERY_CACHE = 6, - LOOKUP_CONNECTION_CACHE = 7 + LOOKUP_CONNECTION_CACHE = 7, + POINT_QUERY_ROW_CACHE = 8, + DELETE_BITMAP_AGG_CACHE = 9, + TABLET_VERSION_CACHE = 10, + LAST_SUCCESS_CHANNEL_CACHE = 11, + COMMON_OBJ_LRU_CACHE = 12, + FOR_UT = 13 }; static std::string type_string(CacheType type) { @@ -54,7 +60,19 @@ public: case CacheType::INVERTEDINDEX_QUERY_CACHE: return "InvertedIndexQueryCache"; case CacheType::LOOKUP_CONNECTION_CACHE: - return "LookupConnectionCache"; + return "PointQueryLookupConnectionCache"; + case CacheType::POINT_QUERY_ROW_CACHE: + return "PointQueryRowCache"; + case CacheType::DELETE_BITMAP_AGG_CACHE: + return "MowDeleteBitmapAggCache"; + case CacheType::TABLET_VERSION_CACHE: + return "MowTabletVersionCache"; + case CacheType::LAST_SUCCESS_CHANNEL_CACHE: + return "LastSuccessChannelCache"; + case CacheType::COMMON_OBJ_LRU_CACHE: + return "CommonObjLRUCache"; + case CacheType::FOR_UT: + return "ForUT"; default: LOG(FATAL) << "not match type of cache policy :" << static_cast(type); } @@ -62,13 +80,14 @@ public: __builtin_unreachable(); } - CachePolicy(CacheType type, uint32_t stale_sweep_time_s); + CachePolicy(CacheType type, uint32_t stale_sweep_time_s, bool enable_prune); virtual ~CachePolicy(); virtual void prune_stale() = 0; virtual void prune_all(bool clear) = 0; CacheType type() { return _type; } + bool enable_prune() const { return _enable_prune; } RuntimeProfile* profile() { return _profile.get(); } protected: @@ -94,6 +113,7 @@ protected: RuntimeProfile::Counter* _cost_timer = nullptr; uint32_t _stale_sweep_time_s; + bool _enable_prune = true; }; } // namespace doris diff --git a/be/src/runtime/memory/lru_cache_policy.h b/be/src/runtime/memory/lru_cache_policy.h index bd301cd47b..717c90d9ef 100644 --- a/be/src/runtime/memory/lru_cache_policy.h +++ b/be/src/runtime/memory/lru_cache_policy.h @@ -17,6 +17,8 @@ #pragma once +#include + #include "olap/lru_cache.h" #include "runtime/memory/cache_policy.h" #include "util/time.h" @@ -34,29 +36,64 @@ struct LRUCacheValueBase { // Base of lru cache, allow prune stale entry and prune all entry. class LRUCachePolicy : public CachePolicy { public: - LRUCachePolicy(CacheType type, uint32_t stale_sweep_time_s) - : CachePolicy(type, stale_sweep_time_s) {}; LRUCachePolicy(CacheType type, size_t capacity, LRUCacheType lru_cache_type, - uint32_t stale_sweep_time_s, uint32_t num_shards = -1) - : CachePolicy(type, stale_sweep_time_s) { - _cache = num_shards == -1 - ? std::unique_ptr( - new ShardedLRUCache(type_string(type), capacity, lru_cache_type)) - : std::unique_ptr(new ShardedLRUCache(type_string(type), capacity, - lru_cache_type, num_shards)); + uint32_t stale_sweep_time_s, uint32_t num_shards = DEFAULT_LRU_CACHE_NUM_SHARDS, + uint32_t element_count_capacity = DEFAULT_LRU_CACHE_ELEMENT_COUNT_CAPACITY, + bool enable_prune = true) + : CachePolicy(type, stale_sweep_time_s, enable_prune) { + if (check_capacity(capacity, num_shards)) { + _cache = std::shared_ptr( + new ShardedLRUCache(type_string(type), capacity, lru_cache_type, num_shards, + element_count_capacity)); + } else { + CHECK(ExecEnv::GetInstance()->get_dummy_lru_cache()); + _cache = ExecEnv::GetInstance()->get_dummy_lru_cache(); + } + } + + LRUCachePolicy(CacheType type, size_t capacity, LRUCacheType lru_cache_type, + uint32_t stale_sweep_time_s, uint32_t num_shards, + uint32_t element_count_capacity, + CacheValueTimeExtractor cache_value_time_extractor, + bool cache_value_check_timestamp, bool enable_prune = true) + : CachePolicy(type, stale_sweep_time_s, enable_prune) { + if (check_capacity(capacity, num_shards)) { + _cache = std::shared_ptr( + new ShardedLRUCache(type_string(type), capacity, lru_cache_type, num_shards, + cache_value_time_extractor, cache_value_check_timestamp, + element_count_capacity)); + } else { + CHECK(ExecEnv::GetInstance()->get_dummy_lru_cache()); + _cache = ExecEnv::GetInstance()->get_dummy_lru_cache(); + } + } + + bool check_capacity(size_t capacity, uint32_t num_shards) { + if (capacity < num_shards) { + LOG(INFO) << fmt::format( + "{} lru cache capacity({} B) less than num_shards({}), init failed, will be " + "disabled.", + type_string(type()), capacity, num_shards); + _enable_prune = false; + return false; + } + return true; } ~LRUCachePolicy() override = default; // Try to prune the cache if expired. void prune_stale() override { + if (_stale_sweep_time_s <= 0 && _cache == ExecEnv::GetInstance()->get_dummy_lru_cache()) { + return; + } if (_cache->mem_consumption() > CACHE_MIN_FREE_SIZE) { COUNTER_SET(_cost_timer, (int64_t)0); SCOPED_TIMER(_cost_timer); const int64_t curtime = UnixMillis(); int64_t byte_size = 0L; auto pred = [this, curtime, &byte_size](const void* value) -> bool { - LRUCacheValueBase* cache_value = (LRUCacheValueBase*)value; + auto* cache_value = (LRUCacheValueBase*)value; if ((cache_value->last_visit_time + _stale_sweep_time_s * 1000) < curtime) { byte_size += cache_value->size; return true; @@ -76,6 +113,9 @@ public: } void prune_all(bool clear) override { + if (_cache == ExecEnv::GetInstance()->get_dummy_lru_cache()) { + return; + } if ((clear && _cache->mem_consumption() != 0) || _cache->mem_consumption() > CACHE_MIN_FREE_SIZE) { COUNTER_SET(_cost_timer, (int64_t)0); @@ -91,10 +131,12 @@ public: } } - Cache* get() { return _cache.get(); } + // if check_capacity failed, will return dummy lru cache, + // compatible with ShardedLRUCache usage, but will not actually cache. + Cache* cache() const { return _cache.get(); } -protected: - std::unique_ptr _cache; +private: + std::shared_ptr _cache; }; } // namespace doris diff --git a/be/src/service/point_query_executor.cpp b/be/src/service/point_query_executor.cpp index 6a0000a3c4..d122a9d3cb 100644 --- a/be/src/service/point_query_executor.cpp +++ b/be/src/service/point_query_executor.cpp @@ -111,11 +111,10 @@ LookupConnectionCache* LookupConnectionCache::create_global_instance(size_t capa return res; } -RowCache::RowCache(int64_t capacity, int num_shards) { - // Create Row Cache - _cache = std::unique_ptr( - new ShardedLRUCache("RowCache", capacity, LRUCacheType::SIZE, num_shards)); -} +RowCache::RowCache(int64_t capacity, int num_shards) + : LRUCachePolicy(CachePolicy::CacheType::POINT_QUERY_ROW_CACHE, capacity, + LRUCacheType::SIZE, config::point_query_row_cache_stale_sweep_time_sec, + num_shards) {} // Create global instance of this class RowCache* RowCache::create_global_cache(int64_t capacity, uint32_t num_shards) { @@ -130,12 +129,12 @@ RowCache* RowCache::instance() { bool RowCache::lookup(const RowCacheKey& key, CacheHandle* handle) { const std::string& encoded_key = key.encode(); - auto lru_handle = _cache->lookup(encoded_key); + auto lru_handle = cache()->lookup(encoded_key); if (!lru_handle) { // cache miss return false; } - *handle = CacheHandle(_cache.get(), lru_handle); + *handle = CacheHandle(cache(), lru_handle); return true; } @@ -145,14 +144,14 @@ void RowCache::insert(const RowCacheKey& key, const Slice& value) { memcpy(cache_value, value.data, value.size); const std::string& encoded_key = key.encode(); auto handle = - _cache->insert(encoded_key, cache_value, value.size, deleter, CachePriority::NORMAL); + cache()->insert(encoded_key, cache_value, value.size, deleter, CachePriority::NORMAL); // handle will released - auto tmp = CacheHandle {_cache.get(), handle}; + auto tmp = CacheHandle {cache(), handle}; } void RowCache::erase(const RowCacheKey& key) { const std::string& encoded_key = key.encode(); - _cache->erase(encoded_key); + cache()->erase(encoded_key); } Status PointQueryExecutor::init(const PTabletKeyLookupRequest* request, diff --git a/be/src/service/point_query_executor.h b/be/src/service/point_query_executor.h index c951cc6746..77fa8c69cd 100644 --- a/be/src/service/point_query_executor.h +++ b/be/src/service/point_query_executor.h @@ -109,7 +109,7 @@ private: }; // RowCache is a LRU cache for row store -class RowCache { +class RowCache : public LRUCachePolicy { public: // The cache key for row lru cache struct RowCacheKey { @@ -187,7 +187,6 @@ public: private: static constexpr uint32_t kDefaultNumShards = 128; RowCache(int64_t capacity, int num_shards = kDefaultNumShards); - std::unique_ptr _cache; }; // A cache used for prepare stmt. @@ -204,7 +203,8 @@ private: friend class PointQueryExecutor; LookupConnectionCache(size_t capacity) : LRUCachePolicy(CachePolicy::CacheType::LOOKUP_CONNECTION_CACHE, capacity, - LRUCacheType::SIZE, config::tablet_lookup_cache_clean_interval) {} + LRUCacheType::SIZE, config::tablet_lookup_cache_stale_sweep_time_sec) { + } std::string encode_key(__int128_t cache_id) { fmt::memory_buffer buffer; @@ -222,20 +222,20 @@ private: delete cache_value; }; LOG(INFO) << "Add item mem size " << item->mem_size() - << ", cache_capacity: " << _cache->get_total_capacity() - << ", cache_usage: " << _cache->get_usage() - << ", mem_consum: " << _cache->mem_consumption(); + << ", cache_capacity: " << cache()->get_total_capacity() + << ", cache_usage: " << cache()->get_usage() + << ", mem_consum: " << cache()->mem_consumption(); auto lru_handle = - _cache->insert(key, value, item->mem_size(), deleter, CachePriority::NORMAL); - _cache->release(lru_handle); + cache()->insert(key, value, item->mem_size(), deleter, CachePriority::NORMAL); + cache()->release(lru_handle); } std::shared_ptr get(__int128_t cache_id) { std::string key = encode_key(cache_id); - auto lru_handle = _cache->lookup(key); + auto lru_handle = cache()->lookup(key); if (lru_handle) { - Defer release([cache = _cache.get(), lru_handle] { cache->release(lru_handle); }); - auto value = (CacheValue*)_cache->value(lru_handle); + Defer release([cache = cache(), lru_handle] { cache->release(lru_handle); }); + auto value = (CacheValue*)cache()->value(lru_handle); value->last_visit_time = UnixMillis(); return value->item; } diff --git a/be/src/util/obj_lru_cache.cpp b/be/src/util/obj_lru_cache.cpp index b1f2b80258..4b61b245f2 100644 --- a/be/src/util/obj_lru_cache.cpp +++ b/be/src/util/obj_lru_cache.cpp @@ -19,30 +19,29 @@ namespace doris { -ObjLRUCache::ObjLRUCache(int64_t capacity, uint32_t num_shards) { +ObjLRUCache::ObjLRUCache(int64_t capacity, uint32_t num_shards) + : LRUCachePolicy(CachePolicy::CacheType::COMMON_OBJ_LRU_CACHE, capacity, + LRUCacheType::NUMBER, config::common_obj_lru_cache_stale_sweep_time_sec, + num_shards) { _enabled = (capacity > 0); - if (_enabled) { - _cache = std::unique_ptr( - new ShardedLRUCache("ObjLRUCache", capacity, LRUCacheType::NUMBER, num_shards)); - } } bool ObjLRUCache::lookup(const ObjKey& key, CacheHandle* handle) { if (!_enabled) { return false; } - auto lru_handle = _cache->lookup(key.key); + auto lru_handle = cache()->lookup(key.key); if (!lru_handle) { // cache miss return false; } - *handle = CacheHandle(_cache.get(), lru_handle); + *handle = CacheHandle(cache(), lru_handle); return true; } void ObjLRUCache::erase(const ObjKey& key) { if (_enabled) { - _cache->erase(key.key); + cache()->erase(key.key); } } diff --git a/be/src/util/obj_lru_cache.h b/be/src/util/obj_lru_cache.h index d280567650..f43e7a1e8d 100644 --- a/be/src/util/obj_lru_cache.h +++ b/be/src/util/obj_lru_cache.h @@ -18,14 +18,14 @@ #pragma once #include "olap/lru_cache.h" +#include "runtime/memory/lru_cache_policy.h" namespace doris { // A common object cache depends on an Sharded LRU Cache. // It has a certain capacity, which determin how many objects it can cache. // Caller must hold a CacheHandle instance when visiting the cached object. -// TODO shouble add gc prune -class ObjLRUCache { +class ObjLRUCache : public LRUCachePolicy { public: struct ObjKey { ObjKey(const std::string& key_) : key(key_) {} @@ -67,7 +67,7 @@ public: DISALLOW_COPY_AND_ASSIGN(CacheHandle); }; - ObjLRUCache(int64_t capacity, uint32_t num_shards = kDefaultNumShards); + ObjLRUCache(int64_t capacity, uint32_t num_shards = DEFAULT_LRU_CACHE_NUM_SHARDS); bool lookup(const ObjKey& key, CacheHandle* handle); @@ -85,9 +85,9 @@ public: void (*deleter)(const CacheKey& key, void* value)) { if (_enabled) { const std::string& encoded_key = key.key; - auto handle = _cache->insert(encoded_key, (void*)value, 1, deleter, - CachePriority::NORMAL, sizeof(T)); - *cache_handle = CacheHandle {_cache.get(), handle}; + auto handle = cache()->insert(encoded_key, (void*)value, 1, deleter, + CachePriority::NORMAL, sizeof(T)); + *cache_handle = CacheHandle {cache(), handle}; } else { cache_handle = nullptr; } @@ -96,8 +96,6 @@ public: void erase(const ObjKey& key); private: - static constexpr uint32_t kDefaultNumShards = 16; - std::unique_ptr _cache; bool _enabled; }; diff --git a/be/test/olap/lru_cache_test.cpp b/be/test/olap/lru_cache_test.cpp index 4f9c661e43..60dbd0402b 100644 --- a/be/test/olap/lru_cache_test.cpp +++ b/be/test/olap/lru_cache_test.cpp @@ -25,6 +25,7 @@ #include #include "gtest/gtest_pred_impl.h" +#include "runtime/memory/lru_cache_policy.h" #include "runtime/memory/mem_tracker_limiter.h" #include "testutil/test_util.h" @@ -72,25 +73,34 @@ public: _s_current->_deleted_values.push_back(DecodeValue(v)); } + class CacheTestPolicy : public LRUCachePolicy { + public: + CacheTestPolicy(size_t capacity) + : LRUCachePolicy(CachePolicy::CacheType::FOR_UT, capacity, LRUCacheType::SIZE, -1) { + } + }; + // there is 16 shards in ShardedLRUCache // And the LRUHandle size is about 100B. So the cache size should big enough // to run the UT. static const int kCacheSize = 1000 * 16; std::vector _deleted_keys; std::vector _deleted_values; - Cache* _cache; + CacheTestPolicy* _cache; - CacheTest() : _cache(new ShardedLRUCache("test", kCacheSize)) { _s_current = this; } + CacheTest() : _cache(new CacheTestPolicy(kCacheSize)) { _s_current = this; } ~CacheTest() { delete _cache; } + Cache* cache() { return _cache->cache(); } + int Lookup(int key) { std::string result; - Cache::Handle* handle = _cache->lookup(EncodeKey(&result, key)); - const int r = (handle == nullptr) ? -1 : DecodeValue(_cache->value(handle)); + Cache::Handle* handle = cache()->lookup(EncodeKey(&result, key)); + const int r = (handle == nullptr) ? -1 : DecodeValue(cache()->value(handle)); if (handle != nullptr) { - _cache->release(handle); + cache()->release(handle); } return r; @@ -98,19 +108,19 @@ public: void Insert(int key, int value, int charge) { std::string result; - _cache->release(_cache->insert(EncodeKey(&result, key), EncodeValue(value), charge, - &CacheTest::Deleter)); + cache()->release(cache()->insert(EncodeKey(&result, key), EncodeValue(value), charge, + &CacheTest::Deleter)); } void InsertDurable(int key, int value, int charge) { std::string result; - _cache->release(_cache->insert(EncodeKey(&result, key), EncodeValue(value), charge, - &CacheTest::Deleter, CachePriority::DURABLE)); + cache()->release(cache()->insert(EncodeKey(&result, key), EncodeValue(value), charge, + &CacheTest::Deleter, CachePriority::DURABLE)); } void Erase(int key) { std::string result; - _cache->erase(EncodeKey(&result, key)); + cache()->erase(EncodeKey(&result, key)); } void SetUp() {} @@ -164,16 +174,16 @@ TEST_F(CacheTest, Erase) { TEST_F(CacheTest, EntriesArePinned) { Insert(100, 101, 1); std::string result1; - Cache::Handle* h1 = _cache->lookup(EncodeKey(&result1, 100)); - EXPECT_EQ(101, DecodeValue(_cache->value(h1))); + Cache::Handle* h1 = cache()->lookup(EncodeKey(&result1, 100)); + EXPECT_EQ(101, DecodeValue(cache()->value(h1))); Insert(100, 102, 1); std::string result2; - Cache::Handle* h2 = _cache->lookup(EncodeKey(&result2, 100)); - EXPECT_EQ(102, DecodeValue(_cache->value(h2))); + Cache::Handle* h2 = cache()->lookup(EncodeKey(&result2, 100)); + EXPECT_EQ(102, DecodeValue(cache()->value(h2))); EXPECT_EQ(0, _deleted_keys.size()); - _cache->release(h1); + cache()->release(h1); EXPECT_EQ(1, _deleted_keys.size()); EXPECT_EQ(100, _deleted_keys[0]); EXPECT_EQ(101, _deleted_values[0]); @@ -182,7 +192,7 @@ TEST_F(CacheTest, EntriesArePinned) { EXPECT_EQ(-1, Lookup(100)); EXPECT_EQ(1, _deleted_keys.size()); - _cache->release(h2); + cache()->release(h2); EXPECT_EQ(2, _deleted_keys.size()); EXPECT_EQ(100, _deleted_keys[1]); EXPECT_EQ(102, _deleted_values[1]); @@ -400,8 +410,8 @@ TEST_F(CacheTest, HeavyEntries) { } TEST_F(CacheTest, NewId) { - uint64_t a = _cache->new_id(); - uint64_t b = _cache->new_id(); + uint64_t a = cache()->new_id(); + uint64_t b = cache()->new_id(); EXPECT_NE(a, b); } diff --git a/be/test/olap/rowset/segment_v2/inverted_index_searcher_cache_test.cpp b/be/test/olap/rowset/segment_v2/inverted_index_searcher_cache_test.cpp index a9e381b169..4ea9ac62a1 100644 --- a/be/test/olap/rowset/segment_v2/inverted_index_searcher_cache_test.cpp +++ b/be/test/olap/rowset/segment_v2/inverted_index_searcher_cache_test.cpp @@ -161,8 +161,7 @@ TEST_F(InvertedIndexSearcherCacheTest, evict_by_usage) { cache_value_1->size = 200; //cache_value_1->index_searcher; cache_value_1->last_visit_time = 10; - index_searcher_cache->_cache->release( - index_searcher_cache->_insert(key_1, cache_value_1.release())); + index_searcher_cache->release(index_searcher_cache->_insert(key_1, cache_value_1.release())); // should evict {key_1, cache_value_1} std::string file_name_2 = "test_2.idx"; @@ -171,8 +170,7 @@ TEST_F(InvertedIndexSearcherCacheTest, evict_by_usage) { cache_value_2->size = 800; //cache_value_2->index_searcher; cache_value_2->last_visit_time = 20; - index_searcher_cache->_cache->release( - index_searcher_cache->_insert(key_2, cache_value_2.release())); + index_searcher_cache->release(index_searcher_cache->_insert(key_2, cache_value_2.release())); { InvertedIndexCacheHandle cache_handle; // lookup key_1 @@ -188,8 +186,7 @@ TEST_F(InvertedIndexSearcherCacheTest, evict_by_usage) { cache_value_3->size = 400; //cache_value_3->index_searcher; cache_value_3->last_visit_time = 30; - index_searcher_cache->_cache->release( - index_searcher_cache->_insert(key_3, cache_value_3.release())); + index_searcher_cache->release(index_searcher_cache->_insert(key_3, cache_value_3.release())); { InvertedIndexCacheHandle cache_handle; // lookup key_2 @@ -205,8 +202,7 @@ TEST_F(InvertedIndexSearcherCacheTest, evict_by_usage) { cache_value_4->size = 100; //cache_value_4->index_searcher; cache_value_4->last_visit_time = 40; - index_searcher_cache->_cache->release( - index_searcher_cache->_insert(key_4, cache_value_4.release())); + index_searcher_cache->release(index_searcher_cache->_insert(key_4, cache_value_4.release())); { InvertedIndexCacheHandle cache_handle; // lookup key_3 @@ -228,8 +224,7 @@ TEST_F(InvertedIndexSearcherCacheTest, evict_by_element_count_limit) { cache_value_1->size = 20; //cache_value_1->index_searcher; cache_value_1->last_visit_time = 10; - index_searcher_cache->_cache->release( - index_searcher_cache->_insert(key_1, cache_value_1.release())); + index_searcher_cache->release(index_searcher_cache->_insert(key_1, cache_value_1.release())); // no need evict std::string file_name_2 = "test_2.idx"; @@ -238,8 +233,7 @@ TEST_F(InvertedIndexSearcherCacheTest, evict_by_element_count_limit) { cache_value_2->size = 50; //cache_value_2->index_searcher; cache_value_2->last_visit_time = 20; - index_searcher_cache->_cache->release( - index_searcher_cache->_insert(key_2, cache_value_2.release())); + index_searcher_cache->release(index_searcher_cache->_insert(key_2, cache_value_2.release())); { InvertedIndexCacheHandle cache_handle; // lookup key_1 @@ -259,8 +253,7 @@ TEST_F(InvertedIndexSearcherCacheTest, evict_by_element_count_limit) { cache_value_3->size = 80; //cache_value_3->index_searcher; cache_value_3->last_visit_time = 30; - index_searcher_cache->_cache->release( - index_searcher_cache->_insert(key_3, cache_value_3.release())); + index_searcher_cache->release(index_searcher_cache->_insert(key_3, cache_value_3.release())); { InvertedIndexCacheHandle cache_handle; // lookup key_1 @@ -286,8 +279,7 @@ TEST_F(InvertedIndexSearcherCacheTest, evict_by_element_count_limit) { cache_value_4->size = 100; //cache_value_4->index_searcher; cache_value_4->last_visit_time = 40; - index_searcher_cache->_cache->release( - index_searcher_cache->_insert(key_4, cache_value_4.release())); + index_searcher_cache->release(index_searcher_cache->_insert(key_4, cache_value_4.release())); { InvertedIndexCacheHandle cache_handle; // lookup key_1 @@ -321,8 +313,7 @@ TEST_F(InvertedIndexSearcherCacheTest, remove_element_only_in_table) { cache_value_1->size = 200; //cache_value_1->index_searcher; cache_value_1->last_visit_time = 10; - index_searcher_cache->_cache->release( - index_searcher_cache->_insert(key_1, cache_value_1.release())); + index_searcher_cache->release(index_searcher_cache->_insert(key_1, cache_value_1.release())); std::string file_name_2 = "test_2.idx"; InvertedIndexSearcherCache::CacheKey key_2(file_name_2); @@ -338,7 +329,7 @@ TEST_F(InvertedIndexSearcherCacheTest, remove_element_only_in_table) { cache_value_2->size = 800; //cache_value_2->index_searcher; cache_value_2->last_visit_time = 20; - index_searcher_cache->_cache->release( + index_searcher_cache->release( index_searcher_cache->_insert(key_2, cache_value_2.release())); // lookup key_2, key_2 has removed from table due to cache is full diff --git a/be/test/testutil/run_all_tests.cpp b/be/test/testutil/run_all_tests.cpp index c9ea1c552e..fe514d1a38 100644 --- a/be/test/testutil/run_all_tests.cpp +++ b/be/test/testutil/run_all_tests.cpp @@ -46,6 +46,7 @@ int main(int argc, char** argv) { doris::ExecEnv::GetInstance()->set_tablet_schema_cache( doris::TabletSchemaCache::create_global_schema_cache()); doris::ExecEnv::GetInstance()->get_tablet_schema_cache()->start(); + doris::ExecEnv::GetInstance()->set_dummy_lru_cache(std::make_shared()); doris::ExecEnv::GetInstance()->set_storage_page_cache( doris::StoragePageCache::create_global_cache(1 << 30, 10, 0)); doris::ExecEnv::GetInstance()->set_segment_loader(new doris::SegmentLoader(1000));