// Copyright (c) 2011 The LevelDB Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. #include "olap/lru_cache.h" #include #include #include #include #include #include "gutil/bits.h" #include "olap/olap_common.h" #include "olap/olap_define.h" #include "olap/row_block.h" #include "olap/utils.h" #include "runtime/thread_context.h" #include "util/doris_metrics.h" using std::string; using std::stringstream; namespace doris { DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(capacity, MetricUnit::BYTES); DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(usage, MetricUnit::BYTES); DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(usage_ratio, MetricUnit::NOUNIT); DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(lookup_count, MetricUnit::OPERATIONS); DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(hit_count, MetricUnit::OPERATIONS); DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(hit_ratio, MetricUnit::NOUNIT); uint32_t CacheKey::hash(const char* data, size_t n, uint32_t seed) const { // Similar to murmur hash const uint32_t m = 0xc6a4a793; const uint32_t r = 24; const char* limit = data + n; uint32_t h = seed ^ (n * m); // Pick up four bytes at a time while (data + 4 <= limit) { uint32_t w = _decode_fixed32(data); data += 4; h += w; h *= m; h ^= (h >> 16); } // Pick up remaining bytes switch (limit - data) { case 3: h += static_cast(data[2]) << 16; // fall through case 2: h += static_cast(data[1]) << 8; // fall through case 1: h += static_cast(data[0]); h *= m; h ^= (h >> r); break; default: break; } return h; } Cache::~Cache() {} HandleTable::~HandleTable() { delete[] _list; } // LRU cache implementation LRUHandle* HandleTable::lookup(const CacheKey& key, uint32_t hash) { return *_find_pointer(key, hash); } LRUHandle* HandleTable::insert(LRUHandle* h) { LRUHandle** ptr = _find_pointer(h->key(), h->hash); LRUHandle* old = *ptr; h->next_hash = old ? old->next_hash : nullptr; *ptr = h; if (old == nullptr) { ++_elems; if (_elems > _length) { // Since each cache entry is fairly large, we aim for a small // average linked list length (<= 1). _resize(); } } return old; } LRUHandle* HandleTable::remove(const CacheKey& key, uint32_t hash) { LRUHandle** ptr = _find_pointer(key, hash); LRUHandle* result = *ptr; if (result != nullptr) { *ptr = result->next_hash; _elems--; } return result; } bool HandleTable::remove(const LRUHandle* h) { LRUHandle** ptr = &(_list[h->hash & (_length - 1)]); while (*ptr != nullptr && *ptr != h) { ptr = &(*ptr)->next_hash; } LRUHandle* result = *ptr; if (result != nullptr) { *ptr = result->next_hash; _elems--; return true; } return false; } LRUHandle** HandleTable::_find_pointer(const CacheKey& key, uint32_t hash) { LRUHandle** ptr = &(_list[hash & (_length - 1)]); while (*ptr != nullptr && ((*ptr)->hash != hash || key != (*ptr)->key())) { ptr = &(*ptr)->next_hash; } return ptr; } void HandleTable::_resize() { uint32_t new_length = 16; while (new_length < _elems * 1.5) { new_length *= 2; } LRUHandle** new_list = new (std::nothrow) LRUHandle*[new_length]; memset(new_list, 0, sizeof(new_list[0]) * new_length); uint32_t count = 0; for (uint32_t i = 0; i < _length; i++) { LRUHandle* h = _list[i]; while (h != nullptr) { LRUHandle* next = h->next_hash; uint32_t hash = h->hash; LRUHandle** ptr = &new_list[hash & (new_length - 1)]; h->next_hash = *ptr; *ptr = h; h = next; count++; } } DCHECK_EQ(_elems, count); delete[] _list; _list = new_list; _length = new_length; } LRUCache::LRUCache(LRUCacheType type) : _type(type) { // Make empty circular linked list _lru_normal.next = &_lru_normal; _lru_normal.prev = &_lru_normal; _lru_durable.next = &_lru_durable; _lru_durable.prev = &_lru_durable; } LRUCache::~LRUCache() { prune(); } bool LRUCache::_unref(LRUHandle* e) { DCHECK(e->refs > 0); e->refs--; return e->refs == 0; } void LRUCache::_lru_remove(LRUHandle* e) { e->next->prev = e->prev; e->prev->next = e->next; e->prev = e->next = nullptr; } void LRUCache::_lru_append(LRUHandle* list, LRUHandle* e) { // Make "e" newest entry by inserting just before *list e->next = list; e->prev = list->prev; e->prev->next = e; e->next->prev = e; } Cache::Handle* LRUCache::lookup(const CacheKey& key, uint32_t hash) { std::lock_guard l(_mutex); ++_lookup_count; LRUHandle* e = _table.lookup(key, hash); if (e != nullptr) { // we get it from _table, so in_cache must be true DCHECK(e->in_cache); if (e->refs == 1) { // only in LRU free list, remove it from list _lru_remove(e); } e->refs++; ++_hit_count; } return reinterpret_cast(e); } void LRUCache::release(Cache::Handle* handle) { if (handle == nullptr) { return; } LRUHandle* e = reinterpret_cast(handle); bool last_ref = false; { std::lock_guard l(_mutex); last_ref = _unref(e); if (last_ref) { _usage -= e->total_size; } else if (e->in_cache && e->refs == 1) { // only exists in cache if (_usage > _capacity) { // take this opportunity and remove the item bool removed = _table.remove(e); DCHECK(removed); e->in_cache = false; _unref(e); _usage -= e->total_size; last_ref = true; } else { // put it to LRU free list if (e->priority == CachePriority::NORMAL) { _lru_append(&_lru_normal, e); } else if (e->priority == CachePriority::DURABLE) { _lru_append(&_lru_durable, e); } } } } // free handle out of mutex if (last_ref) { e->free(); } } void LRUCache::_evict_from_lru(size_t total_size, LRUHandle** to_remove_head) { // 1. evict normal cache entries while (_usage + total_size > _capacity && _lru_normal.next != &_lru_normal) { LRUHandle* old = _lru_normal.next; DCHECK(old->priority == CachePriority::NORMAL); _evict_one_entry(old); old->next = *to_remove_head; *to_remove_head = old; } // 2. evict durable cache entries if need while (_usage + total_size > _capacity && _lru_durable.next != &_lru_durable) { LRUHandle* old = _lru_durable.next; DCHECK(old->priority == CachePriority::DURABLE); _evict_one_entry(old); old->next = *to_remove_head; *to_remove_head = old; } } void LRUCache::_evict_one_entry(LRUHandle* e) { DCHECK(e->in_cache); DCHECK(e->refs == 1); // LRU list contains elements which may be evicted _lru_remove(e); bool removed = _table.remove(e); DCHECK(removed); e->in_cache = false; _unref(e); _usage -= e->total_size; } Cache::Handle* LRUCache::insert(const CacheKey& key, uint32_t hash, void* value, size_t charge, void (*deleter)(const CacheKey& key, void* value), CachePriority priority, MemTracker* tracker) { size_t handle_size = sizeof(LRUHandle) - 1 + key.size(); LRUHandle* e = reinterpret_cast(malloc(handle_size)); e->value = value; e->deleter = deleter; e->charge = charge; e->key_length = key.size(); e->total_size = (_type == LRUCacheType::SIZE ? handle_size + charge : 1); e->hash = hash; e->refs = 2; // one for the returned handle, one for LRUCache. e->next = e->prev = nullptr; e->in_cache = true; e->priority = priority; e->mem_tracker = tracker; memcpy(e->key_data, key.data(), key.size()); // The memory of the parameter value should be recorded in the tls mem tracker, // transfer the memory ownership of the value to ShardedLRUCache::_mem_tracker. if (tracker) tls_ctx()->_thread_mem_tracker_mgr->mem_tracker()->transfer_to(tracker, e->total_size); LRUHandle* to_remove_head = nullptr; { std::lock_guard l(_mutex); // Free the space following strict LRU policy until enough space // is freed or the lru list is empty _evict_from_lru(e->total_size, &to_remove_head); // insert into the cache // note that the cache might get larger than its capacity if not enough // space was freed auto old = _table.insert(e); _usage += e->total_size; if (old != nullptr) { old->in_cache = false; if (_unref(old)) { _usage -= old->total_size; // old is on LRU because it's in cache and its reference count // was just 1 (Unref returned 0) _lru_remove(old); old->next = to_remove_head; to_remove_head = old; } } } // we free the entries here outside of mutex for // performance reasons while (to_remove_head != nullptr) { LRUHandle* next = to_remove_head->next; to_remove_head->free(); to_remove_head = next; } return reinterpret_cast(e); } void LRUCache::erase(const CacheKey& key, uint32_t hash) { LRUHandle* e = nullptr; bool last_ref = false; { std::lock_guard l(_mutex); e = _table.remove(key, hash); if (e != nullptr) { last_ref = _unref(e); if (last_ref) { _usage -= e->total_size; if (e->in_cache) { // locate in free list _lru_remove(e); } } e->in_cache = false; } } // free handle out of mutex, when last_ref is true, e must not be nullptr if (last_ref) { e->free(); } } int64_t LRUCache::prune() { LRUHandle* to_remove_head = nullptr; { std::lock_guard l(_mutex); while (_lru_normal.next != &_lru_normal) { LRUHandle* old = _lru_normal.next; _evict_one_entry(old); old->next = to_remove_head; to_remove_head = old; } while (_lru_durable.next != &_lru_durable) { LRUHandle* old = _lru_durable.next; _evict_one_entry(old); old->next = to_remove_head; to_remove_head = old; } } int64_t pruned_count = 0; while (to_remove_head != nullptr) { ++pruned_count; LRUHandle* next = to_remove_head->next; to_remove_head->free(); to_remove_head = next; } return pruned_count; } int64_t LRUCache::prune_if(CacheValuePredicate pred) { LRUHandle* to_remove_head = nullptr; { std::lock_guard l(_mutex); LRUHandle* p = _lru_normal.next; while (p != &_lru_normal) { LRUHandle* next = p->next; if (pred(p->value)) { _evict_one_entry(p); p->next = to_remove_head; to_remove_head = p; } p = next; } p = _lru_durable.next; while (p != &_lru_durable) { LRUHandle* next = p->next; if (pred(p->value)) { _evict_one_entry(p); p->next = to_remove_head; to_remove_head = p; } p = next; } } int64_t pruned_count = 0; while (to_remove_head != nullptr) { ++pruned_count; LRUHandle* next = to_remove_head->next; to_remove_head->free(); to_remove_head = next; } return pruned_count; } inline uint32_t ShardedLRUCache::_hash_slice(const CacheKey& s) { return s.hash(s.data(), s.size(), 0); } ShardedLRUCache::ShardedLRUCache(const std::string& name, size_t total_capacity, LRUCacheType type, uint32_t num_shards) : _name(name), _num_shard_bits(Bits::FindLSBSetNonZero(num_shards)), _num_shards(num_shards), _shards(nullptr), _last_id(1), _mem_tracker(MemTracker::create_tracker(-1, name, nullptr, MemTrackerLevel::OVERVIEW)) { CHECK(num_shards > 0) << "num_shards cannot be 0"; CHECK_EQ((num_shards & (num_shards - 1)), 0) << "num_shards should be power of two, but got " << num_shards; const size_t per_shard = (total_capacity + (_num_shards - 1)) / _num_shards; LRUCache** shards = new (std::nothrow) LRUCache*[_num_shards]; for (int s = 0; s < _num_shards; s++) { shards[s] = new LRUCache(type); shards[s]->set_capacity(per_shard); } _shards = shards; _entity = DorisMetrics::instance()->metric_registry()->register_entity( std::string("lru_cache:") + name, {{"name", name}}); _entity->register_hook(name, std::bind(&ShardedLRUCache::update_cache_metrics, this)); INT_GAUGE_METRIC_REGISTER(_entity, capacity); INT_GAUGE_METRIC_REGISTER(_entity, usage); INT_DOUBLE_METRIC_REGISTER(_entity, usage_ratio); INT_ATOMIC_COUNTER_METRIC_REGISTER(_entity, lookup_count); INT_ATOMIC_COUNTER_METRIC_REGISTER(_entity, hit_count); INT_DOUBLE_METRIC_REGISTER(_entity, hit_ratio); } ShardedLRUCache::~ShardedLRUCache() { if (_shards) { for (int s = 0; s < _num_shards; s++) { delete _shards[s]; } delete[] _shards; } _entity->deregister_hook(_name); DorisMetrics::instance()->metric_registry()->deregister_entity(_entity); } Cache::Handle* ShardedLRUCache::insert(const CacheKey& key, void* value, size_t charge, void (*deleter)(const CacheKey& key, void* value), CachePriority priority) { const uint32_t hash = _hash_slice(key); return _shards[_shard(hash)]->insert(key, hash, value, charge, deleter, priority, _mem_tracker.get()); } Cache::Handle* ShardedLRUCache::lookup(const CacheKey& key) { const uint32_t hash = _hash_slice(key); return _shards[_shard(hash)]->lookup(key, hash); } void ShardedLRUCache::release(Handle* handle) { LRUHandle* h = reinterpret_cast(handle); _shards[_shard(h->hash)]->release(handle); } void ShardedLRUCache::erase(const CacheKey& key) { const uint32_t hash = _hash_slice(key); _shards[_shard(hash)]->erase(key, hash); } void* ShardedLRUCache::value(Handle* handle) { return reinterpret_cast(handle)->value; } Slice ShardedLRUCache::value_slice(Handle* handle) { auto lru_handle = reinterpret_cast(handle); return Slice((char*)lru_handle->value, lru_handle->charge); } uint64_t ShardedLRUCache::new_id() { return _last_id.fetch_add(1, std::memory_order_relaxed); } int64_t ShardedLRUCache::prune() { int64_t num_prune = 0; for (int s = 0; s < _num_shards; s++) { num_prune += _shards[s]->prune(); } return num_prune; } int64_t ShardedLRUCache::prune_if(CacheValuePredicate pred) { int64_t num_prune = 0; for (int s = 0; s < _num_shards; s++) { num_prune += _shards[s]->prune_if(pred); } return num_prune; } void ShardedLRUCache::update_cache_metrics() const { size_t total_capacity = 0; size_t total_usage = 0; size_t total_lookup_count = 0; size_t total_hit_count = 0; for (int i = 0; i < _num_shards; i++) { total_capacity += _shards[i]->get_capacity(); total_usage += _shards[i]->get_usage(); total_lookup_count += _shards[i]->get_lookup_count(); total_hit_count += _shards[i]->get_hit_count(); } capacity->set_value(total_capacity); usage->set_value(total_usage); lookup_count->set_value(total_lookup_count); hit_count->set_value(total_hit_count); usage_ratio->set_value(total_capacity == 0 ? 0 : ((double)total_usage / total_capacity)); hit_ratio->set_value(total_lookup_count == 0 ? 0 : ((double)total_hit_count / total_lookup_count)); } Cache* new_lru_cache(const std::string& name, size_t capacity, LRUCacheType type, uint32_t num_shards) { return new ShardedLRUCache(name, capacity, type, num_shards); } } // namespace doris