diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 1d297ece45..120ca28be7 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -275,7 +275,8 @@ DEFINE_mInt64(memory_limitation_per_thread_for_storage_migration_bytes, "1000000 DEFINE_mInt32(cache_prune_stale_interval, "10"); // the clean interval of tablet lookup cache -DEFINE_mInt32(tablet_lookup_cache_clean_interval, "30"); +DEFINE_mInt32(tablet_lookup_cache_stale_sweep_time_sec, "30"); +DEFINE_mInt32(point_query_row_cache_stale_sweep_time_sec, "300"); DEFINE_mInt32(disk_stat_monitor_interval, "5"); DEFINE_mInt32(unused_rowset_monitor_interval, "30"); DEFINE_String(storage_root_path, "${DORIS_HOME}/storage"); @@ -809,6 +810,9 @@ DEFINE_mInt32(external_table_connect_timeout_sec, "30"); // Global bitmap cache capacity for aggregation cache, size in bytes DEFINE_Int64(delete_bitmap_agg_cache_capacity, "104857600"); +DEFINE_mInt32(delete_bitmap_agg_cache_stale_sweep_time_sec, "1800"); + +DEFINE_mInt32(common_obj_lru_cache_stale_sweep_time_sec, "900"); // s3 config DEFINE_mInt32(max_remote_storage_count, "10"); diff --git a/be/src/common/config.h b/be/src/common/config.h index 4e09bb2b08..68c02941cd 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -318,7 +318,8 @@ DECLARE_mInt64(memory_limitation_per_thread_for_storage_migration_bytes); // the prune stale interval of all cache DECLARE_mInt32(cache_prune_stale_interval); // the clean interval of tablet lookup cache -DECLARE_mInt32(tablet_lookup_cache_clean_interval); +DECLARE_mInt32(tablet_lookup_cache_stale_sweep_time_sec); +DECLARE_mInt32(point_query_row_cache_stale_sweep_time_sec); DECLARE_mInt32(disk_stat_monitor_interval); DECLARE_mInt32(unused_rowset_monitor_interval); DECLARE_String(storage_root_path); @@ -863,6 +864,10 @@ DECLARE_mInt32(external_table_connect_timeout_sec); // Global bitmap cache capacity for aggregation cache, size in bytes DECLARE_Int64(delete_bitmap_agg_cache_capacity); +DECLARE_mInt32(delete_bitmap_agg_cache_stale_sweep_time_sec); + +// A common object cache depends on an Sharded LRU Cache. +DECLARE_mInt32(common_obj_lru_cache_stale_sweep_time_sec); // s3 config DECLARE_mInt32(max_remote_storage_count); diff --git a/be/src/olap/lru_cache.cpp b/be/src/olap/lru_cache.cpp index 0de99e20b2..efc9287047 100644 --- a/be/src/olap/lru_cache.cpp +++ b/be/src/olap/lru_cache.cpp @@ -667,4 +667,39 @@ void ShardedLRUCache::update_cache_metrics() const { total_lookup_count == 0 ? 0 : ((double)total_hit_count / total_lookup_count)); } +Cache::Handle* DummyLRUCache::insert(const CacheKey& key, void* value, size_t charge, + void (*deleter)(const CacheKey& key, void* value), + CachePriority priority, size_t bytes) { + size_t handle_size = sizeof(LRUHandle) - 1 + key.size(); + auto* e = reinterpret_cast(malloc(handle_size)); + e->value = value; + e->deleter = deleter; + e->charge = charge; + e->key_length = 0; + e->total_size = 0; + e->bytes = 0; + e->hash = 0; + e->refs = 1; // only one for the returned handle + e->next = e->prev = nullptr; + e->in_cache = false; + return reinterpret_cast(e); +} + +void DummyLRUCache::release(Cache::Handle* handle) { + if (handle == nullptr) { + return; + } + auto* e = reinterpret_cast(handle); + e->free(); +} + +void* DummyLRUCache::value(Handle* handle) { + return reinterpret_cast(handle)->value; +} + +Slice DummyLRUCache::value_slice(Handle* handle) { + auto* lru_handle = reinterpret_cast(handle); + return Slice((char*)lru_handle->value, lru_handle->charge); +} + } // namespace doris diff --git a/be/src/olap/lru_cache.h b/be/src/olap/lru_cache.h index 8608a85bf2..6ecab80022 100644 --- a/be/src/olap/lru_cache.h +++ b/be/src/olap/lru_cache.h @@ -52,12 +52,17 @@ namespace doris { } while (0) class Cache; +class LRUCachePolicy; enum LRUCacheType { SIZE, // The capacity of cache is based on the size of cache entry. NUMBER // The capacity of cache is based on the number of cache entry. }; +static constexpr LRUCacheType DEFAULT_LRU_CACHE_TYPE = LRUCacheType::SIZE; +static constexpr uint32_t DEFAULT_LRU_CACHE_NUM_SHARDS = 16; +static constexpr size_t DEFAULT_LRU_CACHE_ELEMENT_COUNT_CAPACITY = 0; + class CacheKey { public: CacheKey() : _data(nullptr), _size(0) {} @@ -263,8 +268,10 @@ struct LRUHandle { void free() { (*deleter)(key(), value); - THREAD_MEM_TRACKER_TRANSFER_FROM(bytes, mem_tracker); - DorisMetrics::instance()->lru_cache_memory_bytes->increment(-bytes); + if (bytes != 0) { // DummyLRUCache bytes always equal to 0 + THREAD_MEM_TRACKER_TRANSFER_FROM(bytes, mem_tracker); + DorisMetrics::instance()->lru_cache_memory_bytes->increment(-bytes); + } ::free(this); } }; @@ -330,6 +337,7 @@ public: } // Like Cache methods, but with an extra "hash" parameter. + // Must call release on the returned handle pointer. Cache::Handle* insert(const CacheKey& key, uint32_t hash, void* value, size_t charge, void (*deleter)(const CacheKey& key, void* value), MemTrackerLimiter* tracker, @@ -389,14 +397,6 @@ private: class ShardedLRUCache : public Cache { public: - explicit ShardedLRUCache(const std::string& name, size_t total_capacity, - LRUCacheType type = LRUCacheType::SIZE, uint32_t num_shards = 16, - uint32_t element_count_capacity = 0); - explicit ShardedLRUCache(const std::string& name, size_t total_capacity, LRUCacheType type, - uint32_t num_shards, - CacheValueTimeExtractor cache_value_time_extractor, - bool cache_value_check_timestamp, uint32_t element_count_capacity = 0); - // TODO(fdy): 析构时清除所有cache元素 virtual ~ShardedLRUCache(); virtual Handle* insert(const CacheKey& key, void* value, size_t charge, void (*deleter)(const CacheKey& key, void* value), @@ -415,6 +415,16 @@ public: size_t get_total_capacity() override { return _total_capacity; }; private: + // LRUCache can only be created and managed with LRUCachePolicy. + friend class LRUCachePolicy; + + explicit ShardedLRUCache(const std::string& name, size_t total_capacity, LRUCacheType type, + uint32_t num_shards, uint32_t element_count_capacity); + explicit ShardedLRUCache(const std::string& name, size_t total_capacity, LRUCacheType type, + uint32_t num_shards, + CacheValueTimeExtractor cache_value_time_extractor, + bool cache_value_check_timestamp, uint32_t element_count_capacity); + void update_cache_metrics() const; static std::string lru_cache_type_string(LRUCacheType type) { @@ -456,4 +466,24 @@ private: std::unique_ptr>> _lookup_count_per_second; }; +// Compatible with ShardedLRUCache usage, but will not actually cache. +class DummyLRUCache : public Cache { +public: + // Must call release on the returned handle pointer. + Handle* insert(const CacheKey& key, void* value, size_t charge, + void (*deleter)(const CacheKey& key, void* value), + CachePriority priority = CachePriority::NORMAL, size_t bytes = -1) override; + Handle* lookup(const CacheKey& key) override { return nullptr; }; + void release(Handle* handle) override; + void erase(const CacheKey& key) override {}; + void* value(Handle* handle) override; + Slice value_slice(Handle* handle) override; + uint64_t new_id() override { return 0; }; + int64_t prune() override { return 0; }; + int64_t prune_if(CacheValuePredicate pred, bool lazy_mode = false) override { return 0; }; + int64_t mem_consumption() override { return 0; }; + int64_t get_usage() override { return 0; }; + size_t get_total_capacity() override { return 0; }; +}; + } // namespace doris diff --git a/be/src/olap/page_cache.cpp b/be/src/olap/page_cache.cpp index 1779c293f8..6fbcbbe19f 100644 --- a/be/src/olap/page_cache.cpp +++ b/be/src/olap/page_cache.cpp @@ -48,10 +48,8 @@ StoragePageCache::StoragePageCache(size_t capacity, int32_t index_cache_percenta } else { CHECK(false) << "invalid index page cache percentage"; } - if (pk_index_cache_capacity > 0) { - _pk_index_page_cache = - std::make_unique(pk_index_cache_capacity, num_shards); - } + + _pk_index_page_cache = std::make_unique(pk_index_cache_capacity, num_shards); } bool StoragePageCache::lookup(const CacheKey& key, PageCacheHandle* handle, diff --git a/be/src/olap/page_cache.h b/be/src/olap/page_cache.h index 2d93574455..a604d20550 100644 --- a/be/src/olap/page_cache.h +++ b/be/src/olap/page_cache.h @@ -157,12 +157,6 @@ public: void insert(const CacheKey& key, DataPage* data, PageCacheHandle* handle, segment_v2::PageTypePB page_type, bool in_memory = false); - // Page cache available check. - // When percentage is set to 0 or 100, the index or data cache will not be allocated. - bool is_cache_available(segment_v2::PageTypePB page_type) { - return _get_page_cache(page_type) != nullptr; - } - private: StoragePageCache(); @@ -177,26 +171,19 @@ private: Cache* _get_page_cache(segment_v2::PageTypePB page_type) { switch (page_type) { case segment_v2::DATA_PAGE: { - if (_data_page_cache) { - return _data_page_cache->get(); - } - return nullptr; + return _data_page_cache->cache(); } case segment_v2::INDEX_PAGE: { - if (_index_page_cache) { - return _index_page_cache->get(); - } - return nullptr; + return _index_page_cache->cache(); } case segment_v2::PRIMARY_KEY_INDEX_PAGE: { - if (_pk_index_page_cache) { - return _pk_index_page_cache->get(); - } - return nullptr; + return _pk_index_page_cache->cache(); } default: - return nullptr; + LOG(FATAL) << "get error type page cache"; } + LOG(FATAL) << "__builtin_unreachable"; + __builtin_unreachable(); } }; diff --git a/be/src/olap/rowset/segment_v2/inverted_index_cache.cpp b/be/src/olap/rowset/segment_v2/inverted_index_cache.cpp index e88573c1be..f5259e8139 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_cache.cpp +++ b/be/src/olap/rowset/segment_v2/inverted_index_cache.cpp @@ -86,11 +86,7 @@ InvertedIndexSearcherCache* InvertedIndexSearcherCache::create_global_instance( return new InvertedIndexSearcherCache(capacity, num_shards); } -InvertedIndexSearcherCache::InvertedIndexSearcherCache(size_t capacity, uint32_t num_shards) - : LRUCachePolicy(CachePolicy::CacheType::INVERTEDINDEX_SEARCHER_CACHE, - config::inverted_index_cache_stale_sweep_time_sec), - _mem_tracker(std::make_unique("InvertedIndexSearcherCache")) { - SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); +InvertedIndexSearcherCache::InvertedIndexSearcherCache(size_t capacity, uint32_t num_shards) { uint64_t fd_number = config::min_file_descriptor_number; struct rlimit l; int ret = getrlimit(RLIMIT_NOFILE, &l); @@ -113,13 +109,11 @@ InvertedIndexSearcherCache::InvertedIndexSearcherCache(size_t capacity, uint32_t auto* cache_value = (InvertedIndexSearcherCache::CacheValue*)value; return cache_value->last_visit_time; }; - _cache = std::unique_ptr( - new ShardedLRUCache("InvertedIndexSearcherCache", capacity, LRUCacheType::SIZE, - num_shards, get_last_visit_time, true, open_searcher_limit)); + _policy = std::make_unique( + capacity, num_shards, open_searcher_limit, get_last_visit_time, true); } else { - _cache = std::unique_ptr(new ShardedLRUCache("InvertedIndexSearcherCache", capacity, - LRUCacheType::SIZE, num_shards, - open_searcher_limit)); + _policy = std::make_unique(capacity, num_shards, + open_searcher_limit); } } @@ -208,8 +202,8 @@ Status InvertedIndexSearcherCache::get_index_searcher( IndexCacheValuePtr cache_value = std::make_unique(); cache_value->index_searcher = std::move(index_searcher); cache_value->size = mem_tracker->consumption(); - *cache_handle = - InvertedIndexCacheHandle(_cache.get(), _insert(cache_key, cache_value.release())); + *cache_handle = InvertedIndexCacheHandle(_policy->cache(), + _insert(cache_key, cache_value.release())); } else { cache_handle->index_searcher = std::move(index_searcher); } @@ -280,30 +274,27 @@ Status InvertedIndexSearcherCache::insert(const io::FileSystemSPtr& fs, cache_value->size = mem_tracker->consumption(); cache_value->last_visit_time = UnixMillis(); auto* lru_handle = _insert(cache_key, cache_value.release()); - _cache->release(lru_handle); + _policy->cache()->release(lru_handle); return Status::OK(); } Status InvertedIndexSearcherCache::erase(const std::string& index_file_path) { InvertedIndexSearcherCache::CacheKey cache_key(index_file_path); - _cache->erase(cache_key.index_file_path); + _policy->cache()->erase(cache_key.index_file_path); return Status::OK(); } int64_t InvertedIndexSearcherCache::mem_consumption() { - if (_cache) { - return _cache->mem_consumption(); - } - return 0L; + return _policy->cache()->mem_consumption(); } bool InvertedIndexSearcherCache::_lookup(const InvertedIndexSearcherCache::CacheKey& key, InvertedIndexCacheHandle* handle) { - auto* lru_handle = _cache->lookup(key.index_file_path); + auto* lru_handle = _policy->cache()->lookup(key.index_file_path); if (lru_handle == nullptr) { return false; } - *handle = InvertedIndexCacheHandle(_cache.get(), lru_handle); + *handle = InvertedIndexCacheHandle(_policy->cache(), lru_handle); return true; } @@ -314,8 +305,8 @@ Cache::Handle* InvertedIndexSearcherCache::_insert(const InvertedIndexSearcherCa delete cache_value; }; - Cache::Handle* lru_handle = - _cache->insert(key.index_file_path, value, value->size, deleter, CachePriority::NORMAL); + Cache::Handle* lru_handle = _policy->cache()->insert(key.index_file_path, value, value->size, + deleter, CachePriority::NORMAL); return lru_handle; } @@ -323,11 +314,11 @@ bool InvertedIndexQueryCache::lookup(const CacheKey& key, InvertedIndexQueryCach if (key.encode().empty()) { return false; } - auto* lru_handle = _cache->lookup(key.encode()); + auto* lru_handle = cache()->lookup(key.encode()); if (lru_handle == nullptr) { return false; } - *handle = InvertedIndexQueryCacheHandle(_cache.get(), lru_handle); + *handle = InvertedIndexQueryCacheHandle(cache(), lru_handle); return true; } @@ -346,16 +337,13 @@ void InvertedIndexQueryCache::insert(const CacheKey& key, std::shared_ptrinsert(key.encode(), (void*)cache_value_ptr.release(), - bitmap->getSizeInBytes(), deleter, CachePriority::NORMAL); - *handle = InvertedIndexQueryCacheHandle(_cache.get(), lru_handle); + auto* lru_handle = cache()->insert(key.encode(), (void*)cache_value_ptr.release(), + bitmap->getSizeInBytes(), deleter, CachePriority::NORMAL); + *handle = InvertedIndexQueryCacheHandle(cache(), lru_handle); } int64_t InvertedIndexQueryCache::mem_consumption() { - if (_cache) { - return _cache->mem_consumption(); - } - return 0L; + return cache()->mem_consumption(); } } // namespace doris::segment_v2 diff --git a/be/src/olap/rowset/segment_v2/inverted_index_cache.h b/be/src/olap/rowset/segment_v2/inverted_index_cache.h index 8d7d90eea2..51791f0e10 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_cache.h +++ b/be/src/olap/rowset/segment_v2/inverted_index_cache.h @@ -94,7 +94,7 @@ public: OptionalIndexSearcherPtr& output_searcher) override; }; -class InvertedIndexSearcherCache : public LRUCachePolicy { +class InvertedIndexSearcherCache { public: // The cache key of index_searcher lru cache struct CacheKey { @@ -113,12 +113,6 @@ public: static InvertedIndexSearcherCache* create_global_instance(size_t capacity, uint32_t num_shards = 16); - void reset() { - _cache.reset(); - _mem_tracker.reset(); - // Reset or clear the state of the object. - } - // Return global instance. // Client should call create_global_cache before. static InvertedIndexSearcherCache* instance() { @@ -139,11 +133,32 @@ public: // function `erase` called after compaction remove segment Status erase(const std::string& index_file_path); + void release(Cache::Handle* handle) { _policy->cache()->release(handle); } + int64_t mem_consumption(); private: InvertedIndexSearcherCache(); + class InvertedIndexSearcherCachePolicy : public LRUCachePolicy { + public: + InvertedIndexSearcherCachePolicy(size_t capacity, uint32_t num_shards, + uint32_t element_count_capacity) + : LRUCachePolicy(CachePolicy::CacheType::INVERTEDINDEX_SEARCHER_CACHE, capacity, + LRUCacheType::SIZE, + config::inverted_index_cache_stale_sweep_time_sec, num_shards, + element_count_capacity, true) {} + InvertedIndexSearcherCachePolicy(size_t capacity, uint32_t num_shards, + uint32_t element_count_capacity, + CacheValueTimeExtractor cache_value_time_extractor, + bool cache_value_check_timestamp) + : LRUCachePolicy(CachePolicy::CacheType::INVERTEDINDEX_SEARCHER_CACHE, capacity, + LRUCacheType::SIZE, + config::inverted_index_cache_stale_sweep_time_sec, num_shards, + element_count_capacity, cache_value_time_extractor, + cache_value_check_timestamp, true) {} + }; + // Lookup the given index_searcher in the cache. // If the index_searcher is found, the cache entry will be written into handle. // Return true if entry is found, otherwise return false. @@ -154,8 +169,7 @@ private: // This function is thread-safe. Cache::Handle* _insert(const InvertedIndexSearcherCache::CacheKey& key, CacheValue* value); -private: - std::unique_ptr _mem_tracker; + std::unique_ptr _policy; }; using IndexCacheValuePtr = std::unique_ptr; diff --git a/be/src/olap/rowset/segment_v2/page_io.cpp b/be/src/olap/rowset/segment_v2/page_io.cpp index c712478850..dac9012abc 100644 --- a/be/src/olap/rowset/segment_v2/page_io.cpp +++ b/be/src/olap/rowset/segment_v2/page_io.cpp @@ -120,8 +120,7 @@ Status PageIO::read_and_decompress_page(const PageReadOptions& opts, PageHandle* PageCacheHandle cache_handle; StoragePageCache::CacheKey cache_key(opts.file_reader->path().native(), opts.file_reader->size(), opts.page_pointer.offset); - if (opts.use_page_cache && cache->is_cache_available(opts.type) && - cache->lookup(cache_key, &cache_handle, opts.type)) { + if (opts.use_page_cache && cache && cache->lookup(cache_key, &cache_handle, opts.type)) { // we find page in cache, use it *handle = PageHandle(std::move(cache_handle)); opts.stats->cached_pages_num++; @@ -218,7 +217,7 @@ Status PageIO::read_and_decompress_page(const PageReadOptions& opts, PageHandle* *body = Slice(page_slice.data, page_slice.size - 4 - footer_size); page->reset_size(page_slice.size); - if (opts.use_page_cache && cache->is_cache_available(opts.type)) { + if (opts.use_page_cache && cache) { // insert this page into cache and return the cache handle cache->insert(cache_key, page.get(), &cache_handle, opts.type, opts.kept_in_memory); *handle = PageHandle(std::move(cache_handle)); diff --git a/be/src/olap/schema_cache.h b/be/src/olap/schema_cache.h index b7f640551d..58bf05bbfc 100644 --- a/be/src/olap/schema_cache.h +++ b/be/src/olap/schema_cache.h @@ -65,10 +65,10 @@ public: if (!instance() || schema_key.empty()) { return {}; } - auto lru_handle = _cache->lookup(schema_key); + auto lru_handle = cache()->lookup(schema_key); if (lru_handle) { - Defer release([cache = _cache.get(), lru_handle] { cache->release(lru_handle); }); - auto value = (CacheValue*)_cache->value(lru_handle); + Defer release([cache = cache(), lru_handle] { cache->release(lru_handle); }); + auto value = (CacheValue*)cache()->value(lru_handle); value->last_visit_time = UnixMillis(); VLOG_DEBUG << "use cache schema"; if constexpr (std::is_same_v) { @@ -101,8 +101,8 @@ public: delete cache_value; }; auto lru_handle = - _cache->insert(key, value, 1, deleter, CachePriority::NORMAL, schema->mem_size()); - _cache->release(lru_handle); + cache()->insert(key, value, 1, deleter, CachePriority::NORMAL, schema->mem_size()); + cache()->release(lru_handle); } // Try to prune the cache if expired. diff --git a/be/src/olap/segment_loader.cpp b/be/src/olap/segment_loader.cpp index 02539a3f16..0d068bfcbe 100644 --- a/be/src/olap/segment_loader.cpp +++ b/be/src/olap/segment_loader.cpp @@ -29,11 +29,11 @@ SegmentLoader* SegmentLoader::instance() { } bool SegmentCache::lookup(const SegmentCache::CacheKey& key, SegmentCacheHandle* handle) { - auto lru_handle = _cache->lookup(key.encode()); + auto lru_handle = cache()->lookup(key.encode()); if (lru_handle == nullptr) { return false; } - handle->push_segment(_cache.get(), lru_handle); + handle->push_segment(cache(), lru_handle); return true; } @@ -45,13 +45,13 @@ void SegmentCache::insert(const SegmentCache::CacheKey& key, SegmentCache::Cache delete cache_value; }; - auto lru_handle = _cache->insert(key.encode(), &value, 1, deleter, CachePriority::NORMAL, - value.segment->meta_mem_usage()); - handle->push_segment(_cache.get(), lru_handle); + auto lru_handle = cache()->insert(key.encode(), &value, 1, deleter, CachePriority::NORMAL, + value.segment->meta_mem_usage()); + handle->push_segment(cache(), lru_handle); } void SegmentCache::erase(const SegmentCache::CacheKey& key) { - _cache->erase(key.encode()); + cache()->erase(key.encode()); } Status SegmentLoader::load_segments(const BetaRowsetSharedPtr& rowset, diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp index a67103a893..793bde7b08 100644 --- a/be/src/olap/tablet_meta.cpp +++ b/be/src/olap/tablet_meta.cpp @@ -1074,7 +1074,7 @@ std::shared_ptr DeleteBitmap::get_agg(const BitmapKey& bmk) co &val->bitmap, [this, handle](...) { _agg_cache->repr()->release(handle); }); } -std::atomic DeleteBitmap::AggCache::s_repr {nullptr}; +std::atomic DeleteBitmap::AggCache::s_repr {nullptr}; std::string tablet_state_name(TabletState state) { switch (state) { diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h index 00886bb848..2d14ef599f 100644 --- a/be/src/olap/tablet_meta.h +++ b/be/src/olap/tablet_meta.h @@ -46,6 +46,7 @@ #include "olap/olap_common.h" #include "olap/rowset/rowset_meta.h" #include "olap/tablet_schema.h" +#include "runtime/memory/lru_cache_policy.h" #include "util/uid_util.h" namespace json2pb { @@ -469,6 +470,14 @@ public: */ std::shared_ptr get_agg(const BitmapKey& bmk) const; + class AggCachePolicy : public LRUCachePolicy { + public: + AggCachePolicy(size_t capacity) + : LRUCachePolicy(CachePolicy::CacheType::DELETE_BITMAP_AGG_CACHE, capacity, + LRUCacheType::SIZE, + config::delete_bitmap_agg_cache_stale_sweep_time_sec, 256) {} + }; + class AggCache { public: struct Value { @@ -478,8 +487,7 @@ public: AggCache(size_t size_in_bytes) { static std::once_flag once; std::call_once(once, [size_in_bytes] { - auto tmp = new ShardedLRUCache("DeleteBitmap AggCache", size_in_bytes, - LRUCacheType::SIZE, 256); + auto* tmp = new AggCachePolicy(size_in_bytes); AggCache::s_repr.store(tmp, std::memory_order_release); }); @@ -487,8 +495,8 @@ public: } } - static ShardedLRUCache* repr() { return s_repr.load(std::memory_order_acquire); } - static std::atomic s_repr; + static Cache* repr() { return s_repr.load(std::memory_order_acquire)->cache(); } + static std::atomic s_repr; }; private: diff --git a/be/src/olap/txn_manager.cpp b/be/src/olap/txn_manager.cpp index 2b1e57925f..90c277d8ad 100644 --- a/be/src/olap/txn_manager.cpp +++ b/be/src/olap/txn_manager.cpp @@ -116,8 +116,7 @@ TxnManager::TxnManager(int32_t txn_map_shard_size, int32_t txn_shard_size) _txn_tablet_delta_writer_map = new txn_tablet_delta_writer_map_t[_txn_map_shard_size]; _txn_tablet_delta_writer_map_locks = new std::shared_mutex[_txn_map_shard_size]; // For debugging - _tablet_version_cache = std::unique_ptr( - new ShardedLRUCache("TabletVersionCache", 100000, LRUCacheType::NUMBER, 32)); + _tablet_version_cache = std::make_unique(100000); } // prepare txn should always be allowed because ingest task will be retried @@ -889,12 +888,12 @@ int64_t TxnManager::get_txn_by_tablet_version(int64_t tablet_id, int64_t version memcpy(key + sizeof(int64_t), &version, sizeof(int64_t)); CacheKey cache_key((const char*)&key, sizeof(key)); - auto handle = _tablet_version_cache->lookup(cache_key); + auto* handle = _tablet_version_cache->cache()->lookup(cache_key); if (handle == nullptr) { return -1; } - int64_t res = *(int64_t*)_tablet_version_cache->value(handle); - _tablet_version_cache->release(handle); + int64_t res = *(int64_t*)_tablet_version_cache->cache()->value(handle); + _tablet_version_cache->cache()->release(handle); return res; } @@ -911,9 +910,9 @@ void TxnManager::update_tablet_version_txn(int64_t tablet_id, int64_t version, i delete cache_value; }; - auto handle = _tablet_version_cache->insert(cache_key, value, 1, deleter, CachePriority::NORMAL, - sizeof(txn_id)); - _tablet_version_cache->release(handle); + auto* handle = _tablet_version_cache->cache()->insert(cache_key, value, 1, deleter, + CachePriority::NORMAL, sizeof(txn_id)); + _tablet_version_cache->cache()->release(handle); } TxnState TxnManager::get_txn_state(TPartitionId partition_id, TTransactionId transaction_id, diff --git a/be/src/olap/txn_manager.h b/be/src/olap/txn_manager.h index 74ad589cba..25b8da90b7 100644 --- a/be/src/olap/txn_manager.h +++ b/be/src/olap/txn_manager.h @@ -44,6 +44,7 @@ #include "olap/rowset/segment_v2/segment_writer.h" #include "olap/tablet.h" #include "olap/tablet_meta.h" +#include "runtime/memory/lru_cache_policy.h" #include "util/time.h" #include "vec/core/block.h" @@ -220,6 +221,14 @@ private: void _insert_txn_partition_map_unlocked(int64_t transaction_id, int64_t partition_id); void _clear_txn_partition_map_unlocked(int64_t transaction_id, int64_t partition_id); + class TabletVersionCache : public LRUCachePolicy { + public: + TabletVersionCache(size_t capacity) + : LRUCachePolicy(CachePolicy::CacheType::TABLET_VERSION_CACHE, capacity, + LRUCacheType::NUMBER, -1, DEFAULT_LRU_CACHE_NUM_SHARDS, + DEFAULT_LRU_CACHE_ELEMENT_COUNT_CAPACITY, false) {} + }; + private: const int32_t _txn_map_shard_size; @@ -238,7 +247,7 @@ private: std::shared_mutex* _txn_mutex = nullptr; txn_tablet_delta_writer_map_t* _txn_tablet_delta_writer_map = nullptr; - std::unique_ptr _tablet_version_cache; + std::unique_ptr _tablet_version_cache; std::shared_mutex* _txn_tablet_delta_writer_map_locks = nullptr; DISALLOW_COPY_AND_ASSIGN(TxnManager); }; // TxnManager diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index f8a52ad1ec..c97ff55a20 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -102,6 +102,7 @@ class StoragePageCache; class SegmentLoader; class LookupConnectionCache; class RowCache; +class DummyLRUCache; class CacheManager; class WalManager; @@ -232,6 +233,9 @@ public: this->_routine_load_task_executor = r; } void set_wal_mgr(std::shared_ptr wm) { this->_wal_manager = wm; } + void set_dummy_lru_cache(std::shared_ptr dummy_lru_cache) { + this->_dummy_lru_cache = dummy_lru_cache; + } #endif stream_load::LoadStreamStubPool* load_stream_stub_pool() { @@ -261,6 +265,7 @@ public: segment_v2::InvertedIndexQueryCache* get_inverted_index_query_cache() { return _inverted_index_query_cache; } + std::shared_ptr get_dummy_lru_cache() { return _dummy_lru_cache; } std::shared_ptr get_global_block_scheduler() { return _global_block_scheduler; @@ -371,6 +376,7 @@ private: CacheManager* _cache_manager = nullptr; segment_v2::InvertedIndexSearcherCache* _inverted_index_searcher_cache = nullptr; segment_v2::InvertedIndexQueryCache* _inverted_index_query_cache = nullptr; + std::shared_ptr _dummy_lru_cache = nullptr; // used for query with group cpu hard limit std::shared_ptr _global_block_scheduler; diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index c204ead069..e612cb816b 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -216,7 +216,6 @@ Status ExecEnv::_init(const std::vector& store_paths, _small_file_mgr = new SmallFileMgr(this, config::small_file_dir); _block_spill_mgr = new BlockSpillManager(store_paths); _group_commit_mgr = new GroupCommitMgr(this); - _file_meta_cache = new FileMetaCache(config::max_external_file_meta_cache_num); _memtable_memory_limiter = std::make_unique(); _load_stream_stub_pool = std::make_unique(); _delta_writer_v2_pool = std::make_unique(); @@ -371,13 +370,13 @@ Status ExecEnv::_init_mem_env() { init_hook(); #endif - // 2. init buffer pool if (!BitUtil::IsPowerOf2(config::min_buffer_size)) { ss << "Config min_buffer_size must be a power-of-two: " << config::min_buffer_size; return Status::InternalError(ss.str()); } - // 3. init storage page cache + _dummy_lru_cache = std::make_shared(); + _cache_manager = CacheManager::create_global_instance(); int64_t storage_cache_limit = @@ -396,6 +395,12 @@ Status ExecEnv::_init_mem_env() { << ". Please modify the 'storage_page_cache_shard_size' parameter in your " "conf file to be a power of two for better performance."; } + if (storage_cache_limit < num_shards * 2) { + LOG(WARNING) << "storage_cache_limit(" << storage_cache_limit << ") less than num_shards(" + << num_shards + << ") * 2, cache capacity will be 0, continuing to use " + "cache will only have negative effects, will be disabled."; + } int64_t pk_storage_page_cache_limit = ParseUtil::parse_mem_spec(config::pk_storage_page_cache_limit, MemInfo::mem_limit(), MemInfo::physical_mem(), &is_percent); @@ -442,6 +447,8 @@ Status ExecEnv::_init_mem_env() { _schema_cache = new SchemaCache(config::schema_cache_capacity); + _file_meta_cache = new FileMetaCache(config::max_external_file_meta_cache_num); + _lookup_connection_cache = LookupConnectionCache::create_global_instance( config::lookup_connection_cache_bytes_limit); @@ -473,7 +480,6 @@ Status ExecEnv::_init_mem_env() { << PrettyPrinter::print(inverted_index_cache_limit, TUnit::BYTES) << ", origin config value: " << config::inverted_index_query_cache_limit; - // 4. init other managers RETURN_IF_ERROR(_block_spill_mgr->init()); return Status::OK(); } diff --git a/be/src/runtime/load_channel_mgr.cpp b/be/src/runtime/load_channel_mgr.cpp index 2eb6812ac5..4d98276f0d 100644 --- a/be/src/runtime/load_channel_mgr.cpp +++ b/be/src/runtime/load_channel_mgr.cpp @@ -82,8 +82,7 @@ void LoadChannelMgr::stop() { } Status LoadChannelMgr::init(int64_t process_mem_limit) { - _last_success_channel = - std::unique_ptr(new ShardedLRUCache("LastSuccessChannelCache", 1024)); + _last_success_channels = std::make_unique(1024); RETURN_IF_ERROR(_start_bg_worker()); return Status::OK(); } @@ -124,10 +123,10 @@ Status LoadChannelMgr::_get_load_channel(std::shared_ptr& channel, std::lock_guard l(_lock); auto it = _load_channels.find(load_id); if (it == _load_channels.end()) { - auto handle = _last_success_channel->lookup(load_id.to_string()); + auto handle = _last_success_channels->cache()->lookup(load_id.to_string()); // success only when eos be true if (handle != nullptr) { - _last_success_channel->release(handle); + _last_success_channels->cache()->release(handle); if (request.has_eos() && request.eos()) { is_eof = true; return Status::OK(); @@ -183,8 +182,9 @@ void LoadChannelMgr::_finish_load_channel(const UniqueId load_id) { if (_load_channels.find(load_id) != _load_channels.end()) { _load_channels.erase(load_id); } - auto handle = _last_success_channel->insert(load_id.to_string(), nullptr, 1, dummy_deleter); - _last_success_channel->release(handle); + auto handle = _last_success_channels->cache()->insert(load_id.to_string(), nullptr, 1, + dummy_deleter); + _last_success_channels->cache()->release(handle); } VLOG_CRITICAL << "removed load channel " << load_id; } diff --git a/be/src/runtime/load_channel_mgr.h b/be/src/runtime/load_channel_mgr.h index 0aeec52f24..94bd210f26 100644 --- a/be/src/runtime/load_channel_mgr.h +++ b/be/src/runtime/load_channel_mgr.h @@ -33,6 +33,7 @@ #include "olap/lru_cache.h" #include "olap/memtable_memory_limiter.h" #include "runtime/load_channel.h" +#include "runtime/memory/lru_cache_policy.h" #include "runtime/memory/mem_tracker_limiter.h" #include "runtime/thread_context.h" #include "util/countdown_latch.h" @@ -71,12 +72,20 @@ private: Status _start_bg_worker(); + class LastSuccessChannelCache : public LRUCachePolicy { + public: + LastSuccessChannelCache(size_t capacity) + : LRUCachePolicy(CachePolicy::CacheType::LAST_SUCCESS_CHANNEL_CACHE, capacity, + LRUCacheType::SIZE, -1, DEFAULT_LRU_CACHE_NUM_SHARDS, + DEFAULT_LRU_CACHE_ELEMENT_COUNT_CAPACITY, false) {} + }; + protected: // lock protect the load channel map std::mutex _lock; // load id -> load channel std::unordered_map> _load_channels; - std::unique_ptr _last_success_channel; + std::unique_ptr _last_success_channels; MemTableMemoryLimiter* _memtable_memory_limiter = nullptr; diff --git a/be/src/runtime/memory/cache_manager.cpp b/be/src/runtime/memory/cache_manager.cpp index eada36836d..0910781708 100644 --- a/be/src/runtime/memory/cache_manager.cpp +++ b/be/src/runtime/memory/cache_manager.cpp @@ -27,6 +27,9 @@ int64_t CacheManager::for_each_cache_prune_stale_wrap( int64_t freed_size = 0; std::lock_guard l(_caches_lock); for (auto cache_policy : _caches) { + if (!cache_policy->enable_prune()) { + continue; + } func(cache_policy); freed_size += cache_policy->profile()->get_counter("FreedMemory")->value(); if (cache_policy->profile()->get_counter("FreedMemory")->value() != 0 && profile) { diff --git a/be/src/runtime/memory/cache_policy.cpp b/be/src/runtime/memory/cache_policy.cpp index e79beaffa8..99af485703 100644 --- a/be/src/runtime/memory/cache_policy.cpp +++ b/be/src/runtime/memory/cache_policy.cpp @@ -21,8 +21,8 @@ namespace doris { -CachePolicy::CachePolicy(CacheType type, uint32_t stale_sweep_time_s) - : _type(type), _stale_sweep_time_s(stale_sweep_time_s) { +CachePolicy::CachePolicy(CacheType type, uint32_t stale_sweep_time_s, bool enable_prune) + : _type(type), _stale_sweep_time_s(stale_sweep_time_s), _enable_prune(enable_prune) { _it = CacheManager::instance()->register_cache(this); init_profile(); } diff --git a/be/src/runtime/memory/cache_policy.h b/be/src/runtime/memory/cache_policy.h index e931b8f131..0a57f838bf 100644 --- a/be/src/runtime/memory/cache_policy.h +++ b/be/src/runtime/memory/cache_policy.h @@ -34,7 +34,13 @@ public: SEGMENT_CACHE = 4, INVERTEDINDEX_SEARCHER_CACHE = 5, INVERTEDINDEX_QUERY_CACHE = 6, - LOOKUP_CONNECTION_CACHE = 7 + LOOKUP_CONNECTION_CACHE = 7, + POINT_QUERY_ROW_CACHE = 8, + DELETE_BITMAP_AGG_CACHE = 9, + TABLET_VERSION_CACHE = 10, + LAST_SUCCESS_CHANNEL_CACHE = 11, + COMMON_OBJ_LRU_CACHE = 12, + FOR_UT = 13 }; static std::string type_string(CacheType type) { @@ -54,7 +60,19 @@ public: case CacheType::INVERTEDINDEX_QUERY_CACHE: return "InvertedIndexQueryCache"; case CacheType::LOOKUP_CONNECTION_CACHE: - return "LookupConnectionCache"; + return "PointQueryLookupConnectionCache"; + case CacheType::POINT_QUERY_ROW_CACHE: + return "PointQueryRowCache"; + case CacheType::DELETE_BITMAP_AGG_CACHE: + return "MowDeleteBitmapAggCache"; + case CacheType::TABLET_VERSION_CACHE: + return "MowTabletVersionCache"; + case CacheType::LAST_SUCCESS_CHANNEL_CACHE: + return "LastSuccessChannelCache"; + case CacheType::COMMON_OBJ_LRU_CACHE: + return "CommonObjLRUCache"; + case CacheType::FOR_UT: + return "ForUT"; default: LOG(FATAL) << "not match type of cache policy :" << static_cast(type); } @@ -62,13 +80,14 @@ public: __builtin_unreachable(); } - CachePolicy(CacheType type, uint32_t stale_sweep_time_s); + CachePolicy(CacheType type, uint32_t stale_sweep_time_s, bool enable_prune); virtual ~CachePolicy(); virtual void prune_stale() = 0; virtual void prune_all(bool clear) = 0; CacheType type() { return _type; } + bool enable_prune() const { return _enable_prune; } RuntimeProfile* profile() { return _profile.get(); } protected: @@ -94,6 +113,7 @@ protected: RuntimeProfile::Counter* _cost_timer = nullptr; uint32_t _stale_sweep_time_s; + bool _enable_prune = true; }; } // namespace doris diff --git a/be/src/runtime/memory/lru_cache_policy.h b/be/src/runtime/memory/lru_cache_policy.h index bd301cd47b..717c90d9ef 100644 --- a/be/src/runtime/memory/lru_cache_policy.h +++ b/be/src/runtime/memory/lru_cache_policy.h @@ -17,6 +17,8 @@ #pragma once +#include + #include "olap/lru_cache.h" #include "runtime/memory/cache_policy.h" #include "util/time.h" @@ -34,29 +36,64 @@ struct LRUCacheValueBase { // Base of lru cache, allow prune stale entry and prune all entry. class LRUCachePolicy : public CachePolicy { public: - LRUCachePolicy(CacheType type, uint32_t stale_sweep_time_s) - : CachePolicy(type, stale_sweep_time_s) {}; LRUCachePolicy(CacheType type, size_t capacity, LRUCacheType lru_cache_type, - uint32_t stale_sweep_time_s, uint32_t num_shards = -1) - : CachePolicy(type, stale_sweep_time_s) { - _cache = num_shards == -1 - ? std::unique_ptr( - new ShardedLRUCache(type_string(type), capacity, lru_cache_type)) - : std::unique_ptr(new ShardedLRUCache(type_string(type), capacity, - lru_cache_type, num_shards)); + uint32_t stale_sweep_time_s, uint32_t num_shards = DEFAULT_LRU_CACHE_NUM_SHARDS, + uint32_t element_count_capacity = DEFAULT_LRU_CACHE_ELEMENT_COUNT_CAPACITY, + bool enable_prune = true) + : CachePolicy(type, stale_sweep_time_s, enable_prune) { + if (check_capacity(capacity, num_shards)) { + _cache = std::shared_ptr( + new ShardedLRUCache(type_string(type), capacity, lru_cache_type, num_shards, + element_count_capacity)); + } else { + CHECK(ExecEnv::GetInstance()->get_dummy_lru_cache()); + _cache = ExecEnv::GetInstance()->get_dummy_lru_cache(); + } + } + + LRUCachePolicy(CacheType type, size_t capacity, LRUCacheType lru_cache_type, + uint32_t stale_sweep_time_s, uint32_t num_shards, + uint32_t element_count_capacity, + CacheValueTimeExtractor cache_value_time_extractor, + bool cache_value_check_timestamp, bool enable_prune = true) + : CachePolicy(type, stale_sweep_time_s, enable_prune) { + if (check_capacity(capacity, num_shards)) { + _cache = std::shared_ptr( + new ShardedLRUCache(type_string(type), capacity, lru_cache_type, num_shards, + cache_value_time_extractor, cache_value_check_timestamp, + element_count_capacity)); + } else { + CHECK(ExecEnv::GetInstance()->get_dummy_lru_cache()); + _cache = ExecEnv::GetInstance()->get_dummy_lru_cache(); + } + } + + bool check_capacity(size_t capacity, uint32_t num_shards) { + if (capacity < num_shards) { + LOG(INFO) << fmt::format( + "{} lru cache capacity({} B) less than num_shards({}), init failed, will be " + "disabled.", + type_string(type()), capacity, num_shards); + _enable_prune = false; + return false; + } + return true; } ~LRUCachePolicy() override = default; // Try to prune the cache if expired. void prune_stale() override { + if (_stale_sweep_time_s <= 0 && _cache == ExecEnv::GetInstance()->get_dummy_lru_cache()) { + return; + } if (_cache->mem_consumption() > CACHE_MIN_FREE_SIZE) { COUNTER_SET(_cost_timer, (int64_t)0); SCOPED_TIMER(_cost_timer); const int64_t curtime = UnixMillis(); int64_t byte_size = 0L; auto pred = [this, curtime, &byte_size](const void* value) -> bool { - LRUCacheValueBase* cache_value = (LRUCacheValueBase*)value; + auto* cache_value = (LRUCacheValueBase*)value; if ((cache_value->last_visit_time + _stale_sweep_time_s * 1000) < curtime) { byte_size += cache_value->size; return true; @@ -76,6 +113,9 @@ public: } void prune_all(bool clear) override { + if (_cache == ExecEnv::GetInstance()->get_dummy_lru_cache()) { + return; + } if ((clear && _cache->mem_consumption() != 0) || _cache->mem_consumption() > CACHE_MIN_FREE_SIZE) { COUNTER_SET(_cost_timer, (int64_t)0); @@ -91,10 +131,12 @@ public: } } - Cache* get() { return _cache.get(); } + // if check_capacity failed, will return dummy lru cache, + // compatible with ShardedLRUCache usage, but will not actually cache. + Cache* cache() const { return _cache.get(); } -protected: - std::unique_ptr _cache; +private: + std::shared_ptr _cache; }; } // namespace doris diff --git a/be/src/service/point_query_executor.cpp b/be/src/service/point_query_executor.cpp index 6a0000a3c4..d122a9d3cb 100644 --- a/be/src/service/point_query_executor.cpp +++ b/be/src/service/point_query_executor.cpp @@ -111,11 +111,10 @@ LookupConnectionCache* LookupConnectionCache::create_global_instance(size_t capa return res; } -RowCache::RowCache(int64_t capacity, int num_shards) { - // Create Row Cache - _cache = std::unique_ptr( - new ShardedLRUCache("RowCache", capacity, LRUCacheType::SIZE, num_shards)); -} +RowCache::RowCache(int64_t capacity, int num_shards) + : LRUCachePolicy(CachePolicy::CacheType::POINT_QUERY_ROW_CACHE, capacity, + LRUCacheType::SIZE, config::point_query_row_cache_stale_sweep_time_sec, + num_shards) {} // Create global instance of this class RowCache* RowCache::create_global_cache(int64_t capacity, uint32_t num_shards) { @@ -130,12 +129,12 @@ RowCache* RowCache::instance() { bool RowCache::lookup(const RowCacheKey& key, CacheHandle* handle) { const std::string& encoded_key = key.encode(); - auto lru_handle = _cache->lookup(encoded_key); + auto lru_handle = cache()->lookup(encoded_key); if (!lru_handle) { // cache miss return false; } - *handle = CacheHandle(_cache.get(), lru_handle); + *handle = CacheHandle(cache(), lru_handle); return true; } @@ -145,14 +144,14 @@ void RowCache::insert(const RowCacheKey& key, const Slice& value) { memcpy(cache_value, value.data, value.size); const std::string& encoded_key = key.encode(); auto handle = - _cache->insert(encoded_key, cache_value, value.size, deleter, CachePriority::NORMAL); + cache()->insert(encoded_key, cache_value, value.size, deleter, CachePriority::NORMAL); // handle will released - auto tmp = CacheHandle {_cache.get(), handle}; + auto tmp = CacheHandle {cache(), handle}; } void RowCache::erase(const RowCacheKey& key) { const std::string& encoded_key = key.encode(); - _cache->erase(encoded_key); + cache()->erase(encoded_key); } Status PointQueryExecutor::init(const PTabletKeyLookupRequest* request, diff --git a/be/src/service/point_query_executor.h b/be/src/service/point_query_executor.h index c951cc6746..77fa8c69cd 100644 --- a/be/src/service/point_query_executor.h +++ b/be/src/service/point_query_executor.h @@ -109,7 +109,7 @@ private: }; // RowCache is a LRU cache for row store -class RowCache { +class RowCache : public LRUCachePolicy { public: // The cache key for row lru cache struct RowCacheKey { @@ -187,7 +187,6 @@ public: private: static constexpr uint32_t kDefaultNumShards = 128; RowCache(int64_t capacity, int num_shards = kDefaultNumShards); - std::unique_ptr _cache; }; // A cache used for prepare stmt. @@ -204,7 +203,8 @@ private: friend class PointQueryExecutor; LookupConnectionCache(size_t capacity) : LRUCachePolicy(CachePolicy::CacheType::LOOKUP_CONNECTION_CACHE, capacity, - LRUCacheType::SIZE, config::tablet_lookup_cache_clean_interval) {} + LRUCacheType::SIZE, config::tablet_lookup_cache_stale_sweep_time_sec) { + } std::string encode_key(__int128_t cache_id) { fmt::memory_buffer buffer; @@ -222,20 +222,20 @@ private: delete cache_value; }; LOG(INFO) << "Add item mem size " << item->mem_size() - << ", cache_capacity: " << _cache->get_total_capacity() - << ", cache_usage: " << _cache->get_usage() - << ", mem_consum: " << _cache->mem_consumption(); + << ", cache_capacity: " << cache()->get_total_capacity() + << ", cache_usage: " << cache()->get_usage() + << ", mem_consum: " << cache()->mem_consumption(); auto lru_handle = - _cache->insert(key, value, item->mem_size(), deleter, CachePriority::NORMAL); - _cache->release(lru_handle); + cache()->insert(key, value, item->mem_size(), deleter, CachePriority::NORMAL); + cache()->release(lru_handle); } std::shared_ptr get(__int128_t cache_id) { std::string key = encode_key(cache_id); - auto lru_handle = _cache->lookup(key); + auto lru_handle = cache()->lookup(key); if (lru_handle) { - Defer release([cache = _cache.get(), lru_handle] { cache->release(lru_handle); }); - auto value = (CacheValue*)_cache->value(lru_handle); + Defer release([cache = cache(), lru_handle] { cache->release(lru_handle); }); + auto value = (CacheValue*)cache()->value(lru_handle); value->last_visit_time = UnixMillis(); return value->item; } diff --git a/be/src/util/obj_lru_cache.cpp b/be/src/util/obj_lru_cache.cpp index b1f2b80258..4b61b245f2 100644 --- a/be/src/util/obj_lru_cache.cpp +++ b/be/src/util/obj_lru_cache.cpp @@ -19,30 +19,29 @@ namespace doris { -ObjLRUCache::ObjLRUCache(int64_t capacity, uint32_t num_shards) { +ObjLRUCache::ObjLRUCache(int64_t capacity, uint32_t num_shards) + : LRUCachePolicy(CachePolicy::CacheType::COMMON_OBJ_LRU_CACHE, capacity, + LRUCacheType::NUMBER, config::common_obj_lru_cache_stale_sweep_time_sec, + num_shards) { _enabled = (capacity > 0); - if (_enabled) { - _cache = std::unique_ptr( - new ShardedLRUCache("ObjLRUCache", capacity, LRUCacheType::NUMBER, num_shards)); - } } bool ObjLRUCache::lookup(const ObjKey& key, CacheHandle* handle) { if (!_enabled) { return false; } - auto lru_handle = _cache->lookup(key.key); + auto lru_handle = cache()->lookup(key.key); if (!lru_handle) { // cache miss return false; } - *handle = CacheHandle(_cache.get(), lru_handle); + *handle = CacheHandle(cache(), lru_handle); return true; } void ObjLRUCache::erase(const ObjKey& key) { if (_enabled) { - _cache->erase(key.key); + cache()->erase(key.key); } } diff --git a/be/src/util/obj_lru_cache.h b/be/src/util/obj_lru_cache.h index d280567650..f43e7a1e8d 100644 --- a/be/src/util/obj_lru_cache.h +++ b/be/src/util/obj_lru_cache.h @@ -18,14 +18,14 @@ #pragma once #include "olap/lru_cache.h" +#include "runtime/memory/lru_cache_policy.h" namespace doris { // A common object cache depends on an Sharded LRU Cache. // It has a certain capacity, which determin how many objects it can cache. // Caller must hold a CacheHandle instance when visiting the cached object. -// TODO shouble add gc prune -class ObjLRUCache { +class ObjLRUCache : public LRUCachePolicy { public: struct ObjKey { ObjKey(const std::string& key_) : key(key_) {} @@ -67,7 +67,7 @@ public: DISALLOW_COPY_AND_ASSIGN(CacheHandle); }; - ObjLRUCache(int64_t capacity, uint32_t num_shards = kDefaultNumShards); + ObjLRUCache(int64_t capacity, uint32_t num_shards = DEFAULT_LRU_CACHE_NUM_SHARDS); bool lookup(const ObjKey& key, CacheHandle* handle); @@ -85,9 +85,9 @@ public: void (*deleter)(const CacheKey& key, void* value)) { if (_enabled) { const std::string& encoded_key = key.key; - auto handle = _cache->insert(encoded_key, (void*)value, 1, deleter, - CachePriority::NORMAL, sizeof(T)); - *cache_handle = CacheHandle {_cache.get(), handle}; + auto handle = cache()->insert(encoded_key, (void*)value, 1, deleter, + CachePriority::NORMAL, sizeof(T)); + *cache_handle = CacheHandle {cache(), handle}; } else { cache_handle = nullptr; } @@ -96,8 +96,6 @@ public: void erase(const ObjKey& key); private: - static constexpr uint32_t kDefaultNumShards = 16; - std::unique_ptr _cache; bool _enabled; }; diff --git a/be/test/olap/lru_cache_test.cpp b/be/test/olap/lru_cache_test.cpp index 4f9c661e43..60dbd0402b 100644 --- a/be/test/olap/lru_cache_test.cpp +++ b/be/test/olap/lru_cache_test.cpp @@ -25,6 +25,7 @@ #include #include "gtest/gtest_pred_impl.h" +#include "runtime/memory/lru_cache_policy.h" #include "runtime/memory/mem_tracker_limiter.h" #include "testutil/test_util.h" @@ -72,25 +73,34 @@ public: _s_current->_deleted_values.push_back(DecodeValue(v)); } + class CacheTestPolicy : public LRUCachePolicy { + public: + CacheTestPolicy(size_t capacity) + : LRUCachePolicy(CachePolicy::CacheType::FOR_UT, capacity, LRUCacheType::SIZE, -1) { + } + }; + // there is 16 shards in ShardedLRUCache // And the LRUHandle size is about 100B. So the cache size should big enough // to run the UT. static const int kCacheSize = 1000 * 16; std::vector _deleted_keys; std::vector _deleted_values; - Cache* _cache; + CacheTestPolicy* _cache; - CacheTest() : _cache(new ShardedLRUCache("test", kCacheSize)) { _s_current = this; } + CacheTest() : _cache(new CacheTestPolicy(kCacheSize)) { _s_current = this; } ~CacheTest() { delete _cache; } + Cache* cache() { return _cache->cache(); } + int Lookup(int key) { std::string result; - Cache::Handle* handle = _cache->lookup(EncodeKey(&result, key)); - const int r = (handle == nullptr) ? -1 : DecodeValue(_cache->value(handle)); + Cache::Handle* handle = cache()->lookup(EncodeKey(&result, key)); + const int r = (handle == nullptr) ? -1 : DecodeValue(cache()->value(handle)); if (handle != nullptr) { - _cache->release(handle); + cache()->release(handle); } return r; @@ -98,19 +108,19 @@ public: void Insert(int key, int value, int charge) { std::string result; - _cache->release(_cache->insert(EncodeKey(&result, key), EncodeValue(value), charge, - &CacheTest::Deleter)); + cache()->release(cache()->insert(EncodeKey(&result, key), EncodeValue(value), charge, + &CacheTest::Deleter)); } void InsertDurable(int key, int value, int charge) { std::string result; - _cache->release(_cache->insert(EncodeKey(&result, key), EncodeValue(value), charge, - &CacheTest::Deleter, CachePriority::DURABLE)); + cache()->release(cache()->insert(EncodeKey(&result, key), EncodeValue(value), charge, + &CacheTest::Deleter, CachePriority::DURABLE)); } void Erase(int key) { std::string result; - _cache->erase(EncodeKey(&result, key)); + cache()->erase(EncodeKey(&result, key)); } void SetUp() {} @@ -164,16 +174,16 @@ TEST_F(CacheTest, Erase) { TEST_F(CacheTest, EntriesArePinned) { Insert(100, 101, 1); std::string result1; - Cache::Handle* h1 = _cache->lookup(EncodeKey(&result1, 100)); - EXPECT_EQ(101, DecodeValue(_cache->value(h1))); + Cache::Handle* h1 = cache()->lookup(EncodeKey(&result1, 100)); + EXPECT_EQ(101, DecodeValue(cache()->value(h1))); Insert(100, 102, 1); std::string result2; - Cache::Handle* h2 = _cache->lookup(EncodeKey(&result2, 100)); - EXPECT_EQ(102, DecodeValue(_cache->value(h2))); + Cache::Handle* h2 = cache()->lookup(EncodeKey(&result2, 100)); + EXPECT_EQ(102, DecodeValue(cache()->value(h2))); EXPECT_EQ(0, _deleted_keys.size()); - _cache->release(h1); + cache()->release(h1); EXPECT_EQ(1, _deleted_keys.size()); EXPECT_EQ(100, _deleted_keys[0]); EXPECT_EQ(101, _deleted_values[0]); @@ -182,7 +192,7 @@ TEST_F(CacheTest, EntriesArePinned) { EXPECT_EQ(-1, Lookup(100)); EXPECT_EQ(1, _deleted_keys.size()); - _cache->release(h2); + cache()->release(h2); EXPECT_EQ(2, _deleted_keys.size()); EXPECT_EQ(100, _deleted_keys[1]); EXPECT_EQ(102, _deleted_values[1]); @@ -400,8 +410,8 @@ TEST_F(CacheTest, HeavyEntries) { } TEST_F(CacheTest, NewId) { - uint64_t a = _cache->new_id(); - uint64_t b = _cache->new_id(); + uint64_t a = cache()->new_id(); + uint64_t b = cache()->new_id(); EXPECT_NE(a, b); } diff --git a/be/test/olap/rowset/segment_v2/inverted_index_searcher_cache_test.cpp b/be/test/olap/rowset/segment_v2/inverted_index_searcher_cache_test.cpp index a9e381b169..4ea9ac62a1 100644 --- a/be/test/olap/rowset/segment_v2/inverted_index_searcher_cache_test.cpp +++ b/be/test/olap/rowset/segment_v2/inverted_index_searcher_cache_test.cpp @@ -161,8 +161,7 @@ TEST_F(InvertedIndexSearcherCacheTest, evict_by_usage) { cache_value_1->size = 200; //cache_value_1->index_searcher; cache_value_1->last_visit_time = 10; - index_searcher_cache->_cache->release( - index_searcher_cache->_insert(key_1, cache_value_1.release())); + index_searcher_cache->release(index_searcher_cache->_insert(key_1, cache_value_1.release())); // should evict {key_1, cache_value_1} std::string file_name_2 = "test_2.idx"; @@ -171,8 +170,7 @@ TEST_F(InvertedIndexSearcherCacheTest, evict_by_usage) { cache_value_2->size = 800; //cache_value_2->index_searcher; cache_value_2->last_visit_time = 20; - index_searcher_cache->_cache->release( - index_searcher_cache->_insert(key_2, cache_value_2.release())); + index_searcher_cache->release(index_searcher_cache->_insert(key_2, cache_value_2.release())); { InvertedIndexCacheHandle cache_handle; // lookup key_1 @@ -188,8 +186,7 @@ TEST_F(InvertedIndexSearcherCacheTest, evict_by_usage) { cache_value_3->size = 400; //cache_value_3->index_searcher; cache_value_3->last_visit_time = 30; - index_searcher_cache->_cache->release( - index_searcher_cache->_insert(key_3, cache_value_3.release())); + index_searcher_cache->release(index_searcher_cache->_insert(key_3, cache_value_3.release())); { InvertedIndexCacheHandle cache_handle; // lookup key_2 @@ -205,8 +202,7 @@ TEST_F(InvertedIndexSearcherCacheTest, evict_by_usage) { cache_value_4->size = 100; //cache_value_4->index_searcher; cache_value_4->last_visit_time = 40; - index_searcher_cache->_cache->release( - index_searcher_cache->_insert(key_4, cache_value_4.release())); + index_searcher_cache->release(index_searcher_cache->_insert(key_4, cache_value_4.release())); { InvertedIndexCacheHandle cache_handle; // lookup key_3 @@ -228,8 +224,7 @@ TEST_F(InvertedIndexSearcherCacheTest, evict_by_element_count_limit) { cache_value_1->size = 20; //cache_value_1->index_searcher; cache_value_1->last_visit_time = 10; - index_searcher_cache->_cache->release( - index_searcher_cache->_insert(key_1, cache_value_1.release())); + index_searcher_cache->release(index_searcher_cache->_insert(key_1, cache_value_1.release())); // no need evict std::string file_name_2 = "test_2.idx"; @@ -238,8 +233,7 @@ TEST_F(InvertedIndexSearcherCacheTest, evict_by_element_count_limit) { cache_value_2->size = 50; //cache_value_2->index_searcher; cache_value_2->last_visit_time = 20; - index_searcher_cache->_cache->release( - index_searcher_cache->_insert(key_2, cache_value_2.release())); + index_searcher_cache->release(index_searcher_cache->_insert(key_2, cache_value_2.release())); { InvertedIndexCacheHandle cache_handle; // lookup key_1 @@ -259,8 +253,7 @@ TEST_F(InvertedIndexSearcherCacheTest, evict_by_element_count_limit) { cache_value_3->size = 80; //cache_value_3->index_searcher; cache_value_3->last_visit_time = 30; - index_searcher_cache->_cache->release( - index_searcher_cache->_insert(key_3, cache_value_3.release())); + index_searcher_cache->release(index_searcher_cache->_insert(key_3, cache_value_3.release())); { InvertedIndexCacheHandle cache_handle; // lookup key_1 @@ -286,8 +279,7 @@ TEST_F(InvertedIndexSearcherCacheTest, evict_by_element_count_limit) { cache_value_4->size = 100; //cache_value_4->index_searcher; cache_value_4->last_visit_time = 40; - index_searcher_cache->_cache->release( - index_searcher_cache->_insert(key_4, cache_value_4.release())); + index_searcher_cache->release(index_searcher_cache->_insert(key_4, cache_value_4.release())); { InvertedIndexCacheHandle cache_handle; // lookup key_1 @@ -321,8 +313,7 @@ TEST_F(InvertedIndexSearcherCacheTest, remove_element_only_in_table) { cache_value_1->size = 200; //cache_value_1->index_searcher; cache_value_1->last_visit_time = 10; - index_searcher_cache->_cache->release( - index_searcher_cache->_insert(key_1, cache_value_1.release())); + index_searcher_cache->release(index_searcher_cache->_insert(key_1, cache_value_1.release())); std::string file_name_2 = "test_2.idx"; InvertedIndexSearcherCache::CacheKey key_2(file_name_2); @@ -338,7 +329,7 @@ TEST_F(InvertedIndexSearcherCacheTest, remove_element_only_in_table) { cache_value_2->size = 800; //cache_value_2->index_searcher; cache_value_2->last_visit_time = 20; - index_searcher_cache->_cache->release( + index_searcher_cache->release( index_searcher_cache->_insert(key_2, cache_value_2.release())); // lookup key_2, key_2 has removed from table due to cache is full diff --git a/be/test/testutil/run_all_tests.cpp b/be/test/testutil/run_all_tests.cpp index c9ea1c552e..fe514d1a38 100644 --- a/be/test/testutil/run_all_tests.cpp +++ b/be/test/testutil/run_all_tests.cpp @@ -46,6 +46,7 @@ int main(int argc, char** argv) { doris::ExecEnv::GetInstance()->set_tablet_schema_cache( doris::TabletSchemaCache::create_global_schema_cache()); doris::ExecEnv::GetInstance()->get_tablet_schema_cache()->start(); + doris::ExecEnv::GetInstance()->set_dummy_lru_cache(std::make_shared()); doris::ExecEnv::GetInstance()->set_storage_page_cache( doris::StoragePageCache::create_global_cache(1 << 30, 10, 0)); doris::ExecEnv::GetInstance()->set_segment_loader(new doris::SegmentLoader(1000));