KVCache support memory limit

This commit is contained in:
z404289981
2024-03-12 13:45:05 +00:00
committed by ob-robot
parent efa23e5fee
commit bde77696c9
6 changed files with 60 additions and 7 deletions

View File

@ -602,6 +602,7 @@ int ObKVGlobalCache::erase_cache(const char *cache_name)
int ObKVGlobalCache::register_cache(
const char *cache_name,
const int64_t priority,
const int64_t mem_limit_pct,
int64_t &cache_id)
{
int ret = OB_SUCCESS;
@ -632,6 +633,7 @@ int ObKVGlobalCache::register_cache(
STRNCPY(configs_[cache_id].cache_name_, cache_name, MAX_CACHE_NAME_LENGTH - 1);
configs_[cache_id].cache_name_[MAX_CACHE_NAME_LENGTH - 1] = '\0';
configs_[cache_id].priority_ = priority;
configs_[cache_id].mem_limit_pct_ = mem_limit_pct;
configs_[cache_id].is_valid_ = true;
}
}
@ -717,6 +719,24 @@ int ObKVGlobalCache::set_priority(const int64_t cache_id, const int64_t priority
return ret;
}
int ObKVGlobalCache::set_mem_limit_pct(const int64_t cache_id, const int64_t mem_limit_pct)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!inited_)) {
ret = OB_NOT_INIT;
COMMON_LOG(WARN, "The ObKVGlobalCache has not been inited, ", K(ret));
} else if (OB_UNLIKELY(cache_id < 0) || OB_UNLIKELY(cache_id >= MAX_CACHE_NUM)
|| OB_UNLIKELY(mem_limit_pct <= 0) || OB_UNLIKELY(mem_limit_pct > 100)) {
ret = OB_INVALID_ARGUMENT;
COMMON_LOG(WARN, "Invalid argument, ", K(cache_id), K(mem_limit_pct), K(ret));
} else {
ATOMIC_STORE(&configs_[cache_id].mem_limit_pct_, mem_limit_pct);
}
return ret;
}
void ObKVGlobalCache::wash()
{
if (OB_LIKELY(inited_ && !stopped_)) {

View File

@ -58,9 +58,10 @@ class ObKVCache : public ObIKVCache<Key, Value>
public:
ObKVCache();
virtual ~ObKVCache();
int init(const char *cache_name, const int64_t priority = 1);
int init(const char *cache_name, const int64_t priority = 1, const int64_t mem_limit_pct = 100);
void destroy();
int set_priority(const int64_t priority);
int set_mem_limit_pct(const int64_t mem_limit_pct);
virtual int put(const Key &key, const Value &value, bool overwrite = true);
virtual int put_and_fetch(
const Key &key,
@ -168,11 +169,12 @@ private:
friend class ObKVCacheHandle;
ObKVGlobalCache();
virtual ~ObKVGlobalCache();
int register_cache(const char *cache_name, const int64_t priority, int64_t &cache_id);
int register_cache(const char *cache_name, const int64_t priority, const int64_t mem_limit_pct, int64_t &cache_id);
void deregister_cache(const int64_t cache_id);
int create_working_set(const ObKVCacheInstKey &inst_key, ObWorkingSet *&working_set);
int delete_working_set(ObWorkingSet *working_set);
int set_priority(const int64_t cache_id, const int64_t priority);
int set_mem_limit_pct(const int64_t cache_id, const int64_t mem_limit_pct);
int put(
const int64_t cache_id,
const ObIKVCacheKey &key,
@ -411,17 +413,17 @@ ObKVCache<Key, Value>::~ObKVCache()
}
template <class Key, class Value>
int ObKVCache<Key, Value>::init(const char *cache_name, const int64_t priority)
int ObKVCache<Key, Value>::init(const char *cache_name, const int64_t priority, const int64_t mem_limit_pct)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(inited_)) {
ret = OB_INIT_TWICE;
COMMON_LOG(WARN, "The ObKVCache has been inited, ", K(ret));
} else if (OB_UNLIKELY(NULL == cache_name)
|| OB_UNLIKELY(priority <= 0)) {
|| OB_UNLIKELY(priority <= 0 || mem_limit_pct <= 0 || mem_limit_pct > 100)) {
ret = OB_INVALID_ARGUMENT;
COMMON_LOG(WARN, "Invalid argument, ", KP(cache_name), K(priority), K(ret));
} else if (OB_FAIL(ObKVGlobalCache::get_instance().register_cache(cache_name, priority, cache_id_))) {
} else if (OB_FAIL(ObKVGlobalCache::get_instance().register_cache(cache_name, priority, mem_limit_pct, cache_id_))) {
COMMON_LOG(WARN, "Fail to register cache, ", K(ret));
} else {
COMMON_LOG(INFO, "Succ to register cache", K(cache_name), K(priority), K_(cache_id));
@ -455,6 +457,24 @@ int ObKVCache<Key, Value>::set_priority(const int64_t priority)
return ret;
}
template <class Key, class Value>
int ObKVCache<Key, Value>::set_mem_limit_pct(const int64_t mem_limit_pct)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!inited_)) {
ret = OB_NOT_INIT;
COMMON_LOG(WARN, "The ObKVCache has not been inited, ", K(ret));
} else if (OB_UNLIKELY(mem_limit_pct <= 0 || mem_limit_pct > 100)) {
ret = OB_INVALID_ARGUMENT;
COMMON_LOG(WARN, "Invalid argument, ", K(mem_limit_pct), K(ret));
} else if (OB_FAIL(ObKVGlobalCache::get_instance().set_mem_limit_pct(cache_id_, mem_limit_pct))) {
COMMON_LOG(WARN, "Fail to set mem_limit_pct, ", K(ret));
}
return ret;
}
template <class Key, class Value>
int64_t ObKVCache<Key, Value>::size(const uint64_t tenant_id) const
{

View File

@ -109,7 +109,7 @@ struct ObKVCacheInst
// hold size related
inline bool need_hold_cache() { return ATOMIC_LOAD(&status_.hold_size_) > 0; }
inline int64_t get_memory_limit_pct() { return status_.get_memory_limit_pct(); }
common::ObDLink *get_mb_list() { return mb_list_handle_.get_head(); }
TO_STRING_KV(K_(cache_id), K_(tenant_id), K_(is_delete), K_(status), K_(ref_cnt));

View File

@ -882,10 +882,16 @@ int ObKVCacheStore::alloc_mbhandle(
ObKVStoreMemBlock *mem_block = NULL;
char *buf = NULL;
const uint64_t tenant_id = inst.tenant_id_;
const int64_t memory_limit_pct = inst.get_memory_limit_pct();
const int64_t cache_store_size = ATOMIC_AAF(&inst.status_.store_size_, block_size);
if (!inst.mb_list_handle_.is_valid()) {
ret = OB_ERR_UNEXPECTED;
COMMON_LOG(ERROR, "mb_list_handle is invalid", K(ret));
} else if (memory_limit_pct < 100 && cache_store_size >
(inst.mb_list_handle_.get_resource_handle()->get_memory_mgr()->get_limit() * memory_limit_pct / 100)) {
ret = OB_SIZE_OVERFLOW;
COMMON_LOG(INFO, "Fail to allocate memory, ", K(ret), K(block_size), K(cache_store_size), K(memory_limit_pct));
} else if (NULL == (buf = static_cast<char*>(alloc_mb(
*inst.mb_list_handle_.get_resource_handle(), tenant_id, block_size)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
@ -908,7 +914,6 @@ int ObKVCacheStore::alloc_mbhandle(
ret = OB_ERR_UNEXPECTED;
COMMON_LOG(ERROR, "Fail to set mb_handle status from FREE to USING, ", K(ret));
} else {
(void) ATOMIC_AAF(&inst.status_.store_size_, block_size);
if (LRU == policy) {
(void) ATOMIC_AAF(&inst.status_.lru_mb_cnt_, 1);
} else {
@ -929,6 +934,8 @@ int ObKVCacheStore::alloc_mbhandle(
} else if (OB_FAIL(insert_mb_handle(head, mb_handle))) {
COMMON_LOG(WARN, "insert_mb_handle failed", K(ret));
}
} else {
ATOMIC_SAF(&inst.status_.store_size_, block_size);
}
return ret;

View File

@ -30,6 +30,7 @@ void ObKVCacheConfig::reset()
{
is_valid_ = false;
priority_ = 0;
mem_limit_pct_ = 100;
MEMSET(cache_name_, 0, MAX_CACHE_NAME_LENGTH);
}

View File

@ -176,6 +176,7 @@ public:
void reset();
bool is_valid_;
int64_t priority_;
int64_t mem_limit_pct_;
char cache_name_[MAX_CACHE_NAME_LENGTH];
};
@ -187,6 +188,10 @@ public:
double get_hit_ratio() const;
inline void set_hold_size(const int64_t hold_size) { ATOMIC_STORE(&hold_size_, hold_size); }
inline int64_t get_hold_size() const { return ATOMIC_LOAD(&hold_size_); }
inline int64_t get_memory_limit_pct()
{
return ATOMIC_LOAD(&config_->mem_limit_pct_);
}
void reset();
TO_STRING_KV(KP_(config), K_(kv_cnt), K_(store_size), K_(map_size), K_(lru_mb_cnt),
K_(lfu_mb_cnt), K_(base_mb_score), K_(hold_size));