 566e920620
			
		
	
	566e920620
	
	
	
		
			
			Co-authored-by: wangt1xiuyi <13547954130@163.com> Co-authored-by: yangqise7en <877793735@qq.com> Co-authored-by: Zach41 <zach_41@163.com>
		
			
				
	
	
		
			718 lines
		
	
	
		
			19 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			718 lines
		
	
	
		
			19 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
| /**
 | |
|  * Copyright (c) 2021 OceanBase
 | |
|  * OceanBase CE is licensed under Mulan PubL v2.
 | |
|  * You can use this software according to the terms and conditions of the Mulan PubL v2.
 | |
|  * You may obtain a copy of Mulan PubL v2 at:
 | |
|  *          http://license.coscl.org.cn/MulanPubL-2.0
 | |
|  * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 | |
|  * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 | |
|  * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 | |
|  * See the Mulan PubL v2 for more details.
 | |
|  */
 | |
| 
 | |
| #ifndef OCEANBASE_SHARE_OB_CACHE_TEST_UTILS_H_
 | |
| #define OCEANBASE_SHARE_OB_CACHE_TEST_UTILS_H_
 | |
| 
 | |
| #include "share/cache/ob_kv_storecache.h"
 | |
| #include "share/ob_define.h"
 | |
| #include "share/ob_thread_pool.h"
 | |
| #include "lib/atomic/ob_atomic.h"
 | |
| #include "lib/queue/ob_lighty_queue.h"
 | |
| #include "lib/alloc/ob_malloc_allocator.h"
 | |
| #include "share/cache/ob_kvcache_hazard_version.h"
 | |
| namespace oceanbase
 | |
| {
 | |
| using namespace lib;
 | |
| namespace common
 | |
| {
 | |
| 
 | |
| template<int64_t SIZE>
 | |
| struct TestKVCacheKey: public ObIKVCacheKey
 | |
| {
 | |
|   TestKVCacheKey(void)
 | |
|       : v_(0), tenant_id_(0)
 | |
|   {
 | |
|     memset(buf_, 0, sizeof(buf_));
 | |
|   }
 | |
|   virtual bool operator ==(const ObIKVCacheKey &other) const;
 | |
|   virtual uint64_t get_tenant_id() const
 | |
|   {
 | |
|     return tenant_id_;
 | |
|   }
 | |
|   virtual uint64_t hash() const
 | |
|   {
 | |
|     return v_;
 | |
|   }
 | |
|   virtual int64_t size() const
 | |
|   {
 | |
|     return sizeof(*this);
 | |
|   }
 | |
|   virtual int deep_copy(char *buf, const int64_t buf_len, ObIKVCacheKey *&key) const;
 | |
|   uint64_t v_;
 | |
|   uint64_t tenant_id_;
 | |
|   char buf_[SIZE > sizeof(v_) ? SIZE - sizeof(v_) : 0];
 | |
| };
 | |
| 
 | |
| template<int64_t SIZE>
 | |
| struct TestKVCacheValue: public ObIKVCacheValue
 | |
| {
 | |
|   TestKVCacheValue(void)
 | |
|       : v_(0)
 | |
|   {
 | |
|     memset(buf_, 0, sizeof(buf_));
 | |
|   }
 | |
|   virtual int64_t size() const
 | |
|   {
 | |
|     return sizeof(*this);
 | |
|   }
 | |
|   virtual int deep_copy(char *buf, const int64_t buf_len, ObIKVCacheValue *&value) const;
 | |
|   uint64_t v_;
 | |
|   char buf_[SIZE > sizeof(v_) ? SIZE - sizeof(v_) : 0];
 | |
| };
 | |
| 
 | |
| 
 | |
| template<int64_t SIZE>
 | |
| bool TestKVCacheKey<SIZE>::operator ==(const ObIKVCacheKey &other) const
 | |
| {
 | |
|   const TestKVCacheKey &other_key = reinterpret_cast<const TestKVCacheKey &>(other);
 | |
|   return v_ == other_key.v_ && tenant_id_ ==  other_key.tenant_id_;
 | |
| }
 | |
| 
 | |
| template<int64_t SIZE>
 | |
| int TestKVCacheKey<SIZE>::deep_copy(char *buf, const int64_t buf_len, ObIKVCacheKey *&key) const
 | |
| {
 | |
|   int ret = OB_SUCCESS;
 | |
|   TestKVCacheKey<SIZE> *pkey = NULL;
 | |
|   if (NULL == buf || buf_len < size()) {
 | |
|     ret = OB_INVALID_ARGUMENT;
 | |
|   } else {
 | |
|     pkey = new (buf) TestKVCacheKey<SIZE>();
 | |
|     pkey->v_ = v_;
 | |
|     pkey->tenant_id_ = tenant_id_;
 | |
|     key = pkey;
 | |
|   }
 | |
|   return ret;
 | |
| }
 | |
| 
 | |
| template<int64_t SIZE>
 | |
| int TestKVCacheValue<SIZE>::deep_copy(
 | |
|   char *buf,
 | |
|   const int64_t buf_len,
 | |
|   ObIKVCacheValue *&value) const
 | |
| {
 | |
|   int ret = OB_SUCCESS;
 | |
|   TestKVCacheValue<SIZE> *pvalue = NULL;
 | |
|   if (NULL == buf || buf_len < size()) {
 | |
|     ret = OB_INVALID_ARGUMENT;
 | |
|   } else {
 | |
|     pvalue = new (buf) TestKVCacheValue<SIZE>();
 | |
|     pvalue->v_ = v_;
 | |
|     value = pvalue;
 | |
|   }
 | |
|   return ret;
 | |
| }
 | |
| 
 | |
| 
 | |
| 
 | |
| struct AllocBuf
 | |
| {
 | |
|   void *ptr_;
 | |
|   AllocBuf *next_;
 | |
| };
 | |
| 
 | |
| class ObCacheTestTask;
 | |
| class ObICacheTestStat
 | |
| {
 | |
| public:
 | |
|   virtual void add_task(ObCacheTestTask *task) = 0;
 | |
|   virtual ObCacheTestTask *pop_oppo_task(ObCacheTestTask *task) = 0;
 | |
|   virtual void inc_fail_count() = 0;
 | |
| };
 | |
| 
 | |
| class ObCacheTestTask
 | |
| {
 | |
| public:
 | |
|   ObCacheTestTask(const int64_t tenant_id, const bool is_alloc,
 | |
|       const int64_t alloc_size, const int64_t alloc_count,
 | |
|       ObICacheTestStat *stat)
 | |
|     : tenant_id_(tenant_id), is_alloc_(is_alloc),
 | |
|       alloc_size_(alloc_size), alloc_count_(alloc_count),
 | |
|       next_(NULL), stat_(stat), alloc_list_(NULL)
 | |
|   {
 | |
|   }
 | |
| 
 | |
|   virtual ~ObCacheTestTask() {}
 | |
| 
 | |
|   virtual int process()
 | |
|   {
 | |
|     int ret = OB_SUCCESS;
 | |
|     if (is_alloc_) {
 | |
|       ObMemAttr attr;
 | |
|       attr.tenant_id_ = tenant_id_;
 | |
|       attr.label_ = "CacheTestTask"; //xxx
 | |
|       for (int64_t i = 0; i < alloc_count_; ++i) {
 | |
|         void *ptr = ob_malloc(alloc_size_, attr);
 | |
|         if (NULL == ptr) {
 | |
|           ret = OB_ALLOCATE_MEMORY_FAILED;
 | |
|           COMMON_LOG(WARN, "ob_malloc failed", K(ret), K_(alloc_size));
 | |
|           ObMallocAllocator::get_instance()->print_tenant_memory_usage(tenant_id_);
 | |
|           ObMallocAllocator::get_instance()->print_tenant_ctx_memory_usage(tenant_id_);
 | |
|           break;
 | |
|         } else {
 | |
|           append_alloc_list(ptr);
 | |
|         }
 | |
|       }
 | |
| 
 | |
|       if (OB_SUCC(ret)) {
 | |
|         stat_->add_task(this);
 | |
|       }
 | |
|     } else {
 | |
|       ObCacheTestTask *oppo_task = stat_->pop_oppo_task(this);
 | |
|       if (NULL == oppo_task) {
 | |
|         ret = OB_ERR_UNEXPECTED;
 | |
|         COMMON_LOG(WARN, "oppo_task not exist", K(ret), "this", *this);
 | |
|       } else {
 | |
|         int64_t free_count = 0;
 | |
|         AllocBuf *buf = oppo_task->alloc_list_;
 | |
|         AllocBuf *next = NULL;
 | |
|         while (NULL != buf) {
 | |
|           next = buf->next_;
 | |
|           ob_free(buf->ptr_);
 | |
|           buf = next;
 | |
|           ++free_count;
 | |
|         }
 | |
| 
 | |
|         if (free_count != alloc_count_) {
 | |
|           ret = OB_ERR_UNEXPECTED;
 | |
|           COMMON_LOG(WARN, "free_count != alloc_count_", K(ret), K(free_count), K_(alloc_count));
 | |
|         }
 | |
| 
 | |
|         ob_free(oppo_task);
 | |
|       }
 | |
|     }
 | |
| 
 | |
|     if (OB_FAIL(ret)) {
 | |
|       COMMON_LOG(WARN, "task process failed", K(ret));
 | |
|       ObMallocAllocator::get_instance()->print_tenant_memory_usage(tenant_id_);
 | |
|       ObMallocAllocator::get_instance()->print_tenant_ctx_memory_usage(tenant_id_);
 | |
|       stat_->inc_fail_count();
 | |
|     }
 | |
| 
 | |
|     return ret;
 | |
|   }
 | |
|   void set_next(ObCacheTestTask *task) { next_ = task; }
 | |
|   void append_alloc_list(void *ptr)
 | |
|   {
 | |
|     AllocBuf *buf = new (ptr) AllocBuf();
 | |
|     buf->ptr_ = ptr;
 | |
|     buf->next_ = alloc_list_;
 | |
|     alloc_list_ = buf;
 | |
|   }
 | |
|   TO_STRING_KV(K_(tenant_id), K_(is_alloc), K_(alloc_size), K_(alloc_count));
 | |
| public:
 | |
|   uint64_t tenant_id_;
 | |
|   bool is_alloc_;  // alloc or free
 | |
|   int64_t alloc_size_;
 | |
|   int64_t alloc_count_;
 | |
|   ObCacheTestTask *next_;
 | |
|   ObICacheTestStat *stat_;
 | |
|   AllocBuf *alloc_list_;
 | |
| };
 | |
| 
 | |
| class CacheTestStat : public ObICacheTestStat
 | |
| {
 | |
| public:
 | |
|   CacheTestStat() : fail_count_(0), task_list_(NULL) {}
 | |
|   virtual ~CacheTestStat() {}
 | |
| 
 | |
|   virtual void inc_fail_count() { ATOMIC_AAF(&fail_count_, 1); }
 | |
|   virtual int64_t get_fail_count() const { return ATOMIC_LOAD(&fail_count_); }
 | |
|   virtual void add_task(ObCacheTestTask *task)
 | |
|   {
 | |
|     if (NULL != task) {
 | |
|       task->next_ = task_list_;
 | |
|       task_list_ = task;
 | |
|     }
 | |
|   }
 | |
|   virtual ObCacheTestTask *pop_oppo_task(ObCacheTestTask *task)
 | |
|   {
 | |
|     ObCacheTestTask *oppo_task = NULL;
 | |
|     ObCacheTestTask *prev_task = NULL;
 | |
|     ObCacheTestTask *cur_task = NULL;
 | |
|     if (NULL != task && !task->is_alloc_) {
 | |
|       cur_task = task_list_;
 | |
|       while (NULL != cur_task) {
 | |
|         if (cur_task->is_alloc_ && cur_task->tenant_id_ == task->tenant_id_
 | |
|             && cur_task->alloc_size_ == task->alloc_size_ && cur_task->alloc_count_ == task->alloc_count_) {
 | |
|           oppo_task = cur_task;
 | |
|           // delete it from list
 | |
|           if (NULL != prev_task) {
 | |
|             prev_task->next_ = cur_task->next_;
 | |
|             cur_task->next_ = NULL;
 | |
|           } else {
 | |
|             task_list_ = cur_task->next_;
 | |
|             cur_task->next_ = NULL;
 | |
|           }
 | |
|           break;
 | |
|         }
 | |
|         cur_task = cur_task->next_;
 | |
|         prev_task = cur_task;
 | |
|       }
 | |
|     }
 | |
|     return oppo_task;
 | |
|   }
 | |
| private:
 | |
|   int64_t fail_count_;
 | |
|   ObCacheTestTask *task_list_;
 | |
| };
 | |
| 
 | |
| class ObAllocatorStress : public share::ObThreadPool
 | |
| {
 | |
| public:
 | |
|   ObAllocatorStress() : inited_(false), stat_(), queue_() {}
 | |
|   virtual ~ObAllocatorStress() {}
 | |
| 
 | |
|   int init()
 | |
|   {
 | |
|     int ret = OB_SUCCESS;
 | |
|     if (inited_) {
 | |
|       ret = OB_INIT_TWICE;
 | |
|       COMMON_LOG(WARN, "init twice", K(ret));
 | |
|     } else if (OB_FAIL(queue_.init(1024))) {
 | |
|       COMMON_LOG(WARN, "queue init failed", K(ret));
 | |
|     } else {
 | |
|       inited_ = true;
 | |
|     }
 | |
|     return ret;
 | |
|   }
 | |
| 
 | |
|   virtual void run1()
 | |
|   {
 | |
| 
 | |
|     // UNUSED(arg);
 | |
|     int ret = OB_SUCCESS;
 | |
|     COMMON_LOG(INFO, "allocator stress thread start");
 | |
|     if (!inited_) {
 | |
|       ret = OB_NOT_INIT;
 | |
|       COMMON_LOG(WARN, "not init", K(ret));
 | |
|     } else {
 | |
|       // will process all task before exit
 | |
|       while (!has_set_stop() || OB_ENTRY_NOT_EXIST != ret) {
 | |
|         ObCacheTestTask *task = NULL;
 | |
|         if (OB_FAIL(pop(task))) {
 | |
|           if (OB_ENTRY_NOT_EXIST != ret) {
 | |
|             COMMON_LOG(WARN, "pop task failed", K(ret));
 | |
|           }
 | |
|         } else {
 | |
|           if (OB_FAIL(task->process())) {
 | |
|             COMMON_LOG(WARN, "task process failed", K(ret));
 | |
|           } else {
 | |
|             COMMON_LOG(INFO, "task process succeed", "task", *task);
 | |
|           }
 | |
|           ObMallocAllocator::get_instance()->print_tenant_memory_usage(500);
 | |
|           ObMallocAllocator::get_instance()->print_tenant_ctx_memory_usage(500);
 | |
| 
 | |
|           ObMallocAllocator::get_instance()->print_tenant_memory_usage(task->tenant_id_);
 | |
|           ObMallocAllocator::get_instance()->print_tenant_ctx_memory_usage(task->tenant_id_);
 | |
|         }
 | |
|       }
 | |
|     }
 | |
|     COMMON_LOG(INFO, "allocator stress thread end");
 | |
|   }
 | |
| 
 | |
|   int add_task(const ObCacheTestTask &task)
 | |
|   {
 | |
|     int ret = OB_SUCCESS;
 | |
|     const int64_t buf_size = sizeof(task);
 | |
|     void *ptr = NULL;
 | |
|     if (NULL == (ptr = ob_malloc(buf_size, ObNewModIds::TEST))) {
 | |
|       ret = OB_ALLOCATE_MEMORY_FAILED;
 | |
|       COMMON_LOG(WARN, "ob_malloc failed", K(ret), K(buf_size));
 | |
|     } else {
 | |
|       ObCacheTestTask *copy_task = new (ptr) ObCacheTestTask(
 | |
|           task.tenant_id_, task.is_alloc_, task.alloc_size_, task.alloc_count_, task.stat_);
 | |
|       if (OB_FAIL(queue_.push(copy_task))) {
 | |
|         COMMON_LOG(WARN, "push task failed", K(ret));
 | |
|       }
 | |
|     }
 | |
|     return ret;
 | |
|   }
 | |
| 
 | |
|   int pop(ObCacheTestTask *&task)
 | |
|   {
 | |
|     int ret = OB_SUCCESS;
 | |
|     void *vp = NULL;
 | |
|     const int64_t timeout = 1000 * 1000;
 | |
|     if (!inited_) {
 | |
|       ret = OB_NOT_INIT;
 | |
|       COMMON_LOG(WARN,"not init", K(ret));
 | |
|     } else {
 | |
|       ret = queue_.pop(vp, timeout);
 | |
|       if (OB_FAIL(ret)) {
 | |
|         if (OB_ENTRY_NOT_EXIST != ret) {
 | |
|           COMMON_LOG(WARN, "queue pop failed", K(ret));
 | |
|         }
 | |
|       } else {
 | |
|         task = static_cast<ObCacheTestTask *>(vp);
 | |
|       }
 | |
|     }
 | |
|     return ret;
 | |
|   }
 | |
| 
 | |
|   int64_t get_fail_count() { return stat_.get_fail_count(); }
 | |
|   CacheTestStat *get_stat() { return &stat_; }
 | |
| private:
 | |
|   bool inited_;
 | |
|   CacheTestStat stat_;
 | |
|   common::ObLightyQueue queue_;
 | |
| };
 | |
| 
 | |
| template<int64_t K_SIZE, int64_t V_SIZE>
 | |
| class ObCacheStress : public share::ObThreadPool
 | |
| {
 | |
| public:
 | |
|   typedef TestKVCacheKey<K_SIZE> TestKey;
 | |
|   typedef TestKVCacheValue<V_SIZE> TestValue;
 | |
| 
 | |
|   ObCacheStress() : inited_(false), tenant_id_(OB_INVALID_ID),
 | |
|       put_count_(0), fail_count_(0), cache_()  {}
 | |
|   virtual ~ObCacheStress() {}
 | |
| 
 | |
|   int init(const uint64_t tenant_id, int64_t index)
 | |
|   {
 | |
|     int ret = OB_SUCCESS;
 | |
|     char cache_name[1024];
 | |
|     snprintf(cache_name, 1024, "%s_%ld", "test", index);
 | |
|     if (inited_) {
 | |
|       ret = OB_INIT_TWICE;
 | |
|       COMMON_LOG(WARN, "init twice", K(ret));
 | |
|     } else if (OB_FAIL(cache_.init(cache_name))) {
 | |
|       COMMON_LOG(WARN, "cache init failed", K(ret));
 | |
|     } else {
 | |
|       tenant_id_ = tenant_id;
 | |
|       inited_ = true;
 | |
|     }
 | |
|     return ret;
 | |
|   }
 | |
|   virtual void run1()
 | |
|   {
 | |
| 
 | |
|     // UNUSED(arg);
 | |
|     int ret = OB_SUCCESS;
 | |
|     COMMON_LOG(INFO, "cache stress thread start");
 | |
|     TestKey key;
 | |
|     TestValue value;
 | |
|     if (!inited_) {
 | |
|       ret = OB_NOT_INIT;
 | |
|       COMMON_LOG(WARN, "not init", K(ret));
 | |
|     } else {
 | |
|       while (!has_set_stop()) {
 | |
|         key.tenant_id_ = tenant_id_;
 | |
|         key.v_ = put_count_;
 | |
|         if (OB_FAIL(cache_.put(key, value))) {
 | |
|           COMMON_LOG(WARN, "cache put failed", K(ret));
 | |
|         } else {
 | |
|           const TestValue *get_value = NULL;
 | |
|           ObKVCacheHandle handle;
 | |
|           if (OB_FAIL(cache_.get(key, get_value, handle))) {
 | |
|             COMMON_LOG(WARN, "cache get failed", K(ret));
 | |
|           }
 | |
|         }
 | |
|         ++put_count_;
 | |
|         if (OB_FAIL(ret)) {
 | |
|           ++fail_count_;
 | |
|         }
 | |
|       }
 | |
|     }
 | |
|     COMMON_LOG(INFO, "cache stress thread exit");
 | |
|   }
 | |
| 
 | |
|   uint64_t get_tenant_id() const { return tenant_id_; }
 | |
|   int64_t get_put_count() const { return put_count_; }
 | |
|   int64_t get_fail_count() const { return fail_count_; }
 | |
| private:
 | |
|   bool inited_;
 | |
|   uint64_t tenant_id_;
 | |
|   int64_t put_count_;
 | |
|   int64_t fail_count_;
 | |
|   ObKVCache<TestKey, TestValue> cache_;
 | |
| };
 | |
| 
 | |
| template<int64_t K_SIZE, int64_t V_SIZE>
 | |
| class ObCacheGetStress : public share::ObThreadPool
 | |
| {
 | |
| public:
 | |
|   typedef TestKVCacheKey<K_SIZE> TestKey;
 | |
|   typedef TestKVCacheValue<V_SIZE> TestValue;
 | |
| 
 | |
|   ObCacheGetStress()
 | |
|     : inited_(false), tenant_id_(OB_INVALID_ID),
 | |
|       kv_cnt_(0), hit_cnt_(0), total_cnt_(0), fail_cnt_(0), cache_()
 | |
|   {
 | |
|   }
 | |
| 
 | |
|   int init(const uint64_t tenant_id, const int64_t kv_cnt)
 | |
|   {
 | |
|     int ret = OB_SUCCESS;
 | |
|     if (inited_) {
 | |
|       ret = OB_INIT_TWICE;
 | |
|       COMMON_LOG(WARN, "init twice", K(ret));
 | |
|     } else if (OB_INVALID_ID == tenant_id || kv_cnt <= 0) {
 | |
|       ret = OB_INVALID_ARGUMENT;
 | |
|       COMMON_LOG(WARN, "invalid arguments", K(ret), K(tenant_id), K(kv_cnt));
 | |
|     } else if (OB_FAIL(cache_.init("test_cache"))) {
 | |
|       COMMON_LOG(WARN, "cache init failed", K(ret));
 | |
|     } else {
 | |
|       // put kv pairs to cache
 | |
|       TestKey key;
 | |
|       TestValue value;
 | |
|       for (int64_t i = 0; OB_SUCC(ret) && i < kv_cnt; ++i) {
 | |
|         key.tenant_id_ = tenant_id;;
 | |
|         key.v_ = i;
 | |
|         if (OB_FAIL(cache_.put(key, value))) {
 | |
|           COMMON_LOG(WARN, "put failed", K(ret));
 | |
|         }
 | |
|       }
 | |
|       tenant_id_ = tenant_id;
 | |
|       kv_cnt_ = kv_cnt;
 | |
|       hit_cnt_ = 0;
 | |
|       total_cnt_ = 0;
 | |
|       fail_cnt_ = 0;
 | |
|       inited_ = true;
 | |
|     }
 | |
|     return ret;
 | |
|   }
 | |
| 
 | |
|   // will create monitor thread print hit ratio per second
 | |
|   virtual void set_thread_count(const int64_t thread_count)
 | |
|   {
 | |
|     // extra thread for monitor thread
 | |
|     share::ObThreadPool::set_thread_count(static_cast<int32_t>(thread_count + 1));
 | |
|   }
 | |
| 
 | |
|   virtual void run1()
 | |
|   {
 | |
| 
 | |
|     // int64_t thread_id = (int64_t)(arg);
 | |
|     int64_t thread_id = (int64_t)(this->get_thread_idx());
 | |
|     if (0 == thread_id) {
 | |
|       do_monitor();
 | |
|     } else {
 | |
|       do_work();
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   ObKVCache<TestKey, TestValue> &get_cache()
 | |
|   {
 | |
|     return cache_;
 | |
|   }
 | |
|   double get_hit_ratio()
 | |
|   {
 | |
|     int ret = OB_SUCCESS;
 | |
|     int64_t hit_cnt = 0;
 | |
|     TestKey key;
 | |
|     const TestValue *pvalue = NULL;
 | |
|     ObKVCacheHandle handle;
 | |
|     for (int64_t i = 0; i < kv_cnt_; ++i) {
 | |
|       key.tenant_id_ = tenant_id_;
 | |
|       key.v_ = i;
 | |
|       if (OB_FAIL(cache_.get(key, pvalue, handle))) {
 | |
|         if (OB_ENTRY_NOT_EXIST != ret) {
 | |
|           COMMON_LOG(ERROR, "get failed", K(ret));
 | |
|         }
 | |
|       } else {
 | |
|         ++hit_cnt;
 | |
|       }
 | |
|     }
 | |
|     return (double)hit_cnt / (double)kv_cnt_;
 | |
|   }
 | |
| private:
 | |
|   void do_monitor()
 | |
|   {
 | |
|     while (!has_set_stop()) {
 | |
|       double hit_ratio = 0;
 | |
|       const int64_t hit_cnt = ATOMIC_LOAD(&hit_cnt_);
 | |
|       const int64_t total_cnt = ATOMIC_LOAD(&total_cnt_);
 | |
|       ATOMIC_STORE(&hit_cnt_, 0);
 | |
|       ATOMIC_STORE(&total_cnt_, 0);
 | |
|       if (total_cnt > 0) {
 | |
|         hit_ratio = (double)hit_cnt / (double)total_cnt;
 | |
|       }
 | |
|       COMMON_LOG(INFO, "get stress stat", K(hit_ratio));
 | |
|       sleep(1);
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   void do_work()
 | |
|   {
 | |
|     int ret = OB_SUCCESS;
 | |
|     TestKey key;
 | |
|     const TestValue *pvalue = NULL;
 | |
|     ObKVCacheHandle handle;
 | |
|     while (!has_set_stop()) {
 | |
|       for (int64_t i = 0; i < kv_cnt_ && !has_set_stop(); ++i) {
 | |
|         const int64_t get_cnt = i < (int64_t)((double)kv_cnt_ * 0.8) ? 1 : 4;
 | |
|         key.tenant_id_ = tenant_id_;
 | |
|         key.v_ = i;
 | |
|         for (int64_t i = 0; i < get_cnt; ++i) {
 | |
|           ATOMIC_INC(&total_cnt_);
 | |
|           if (OB_FAIL(cache_.get(key, pvalue, handle))) {
 | |
|             if (OB_ENTRY_NOT_EXIST != ret) {
 | |
|               COMMON_LOG(ERROR, "get failed", K(ret));
 | |
|             } else {
 | |
|               ATOMIC_INC(&fail_cnt_);
 | |
|             }
 | |
|           } else {
 | |
|             ATOMIC_INC(&hit_cnt_);
 | |
|           }
 | |
|         }
 | |
|       }
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   bool inited_;
 | |
|   uint64_t tenant_id_;
 | |
|   int64_t kv_cnt_;
 | |
|   int64_t hit_cnt_;
 | |
|   int64_t total_cnt_;
 | |
|   int64_t fail_cnt_;
 | |
|   ObKVCache<TestKey, TestValue> cache_;
 | |
| };
 | |
| 
 | |
| template<int64_t K_SIZE, int64_t V_SIZE>
 | |
| class ObWorkingSetStress : public share::ObThreadPool
 | |
| {
 | |
| public:
 | |
|   typedef TestKVCacheKey<K_SIZE> TestKey;
 | |
|   typedef TestKVCacheValue<V_SIZE> TestValue;
 | |
| 
 | |
|   ObWorkingSetStress() : inited_(false), tenant_id_(OB_INVALID_ID),
 | |
|       put_count_(0), fail_count_(0)  {}
 | |
|   virtual ~ObWorkingSetStress() {}
 | |
| 
 | |
|   int init(const uint64_t tenant_id, const bool only_put)
 | |
|   {
 | |
|     int ret = OB_SUCCESS;
 | |
|     if (inited_) {
 | |
|       ret = OB_INIT_TWICE;
 | |
|       COMMON_LOG(WARN, "init twice", K(ret));
 | |
|     } else if (OB_FAIL(cache_.init("test_cache"))) {
 | |
|       COMMON_LOG(WARN, "cache init failed", K(ret));
 | |
|     } else if (OB_FAIL(ws_.init(tenant_id, cache_))) {
 | |
|       COMMON_LOG(WARN, "init ws failed", K(ret), K(tenant_id));
 | |
|     } else {
 | |
|       tenant_id_ = tenant_id;
 | |
|       put_count_ = 0;
 | |
|       fail_count_ = 0;
 | |
|       only_put_ = only_put;
 | |
|       pcache_ = &ws_;
 | |
|       inited_ = true;
 | |
|     }
 | |
|     return ret;
 | |
|   }
 | |
| 
 | |
|   int init(const uint64_t tenant_id, ObKVCache<TestKey, TestValue> &cache, const bool use_ws, const int64_t start_key = 0)
 | |
|   {
 | |
|     int ret = OB_SUCCESS;
 | |
|     if (inited_) {
 | |
|       ret = OB_INIT_TWICE;
 | |
|       COMMON_LOG(WARN, "init twice", K(ret));
 | |
|     } else {
 | |
|       tenant_id_ = tenant_id;
 | |
|       put_count_ = 0;
 | |
|       fail_count_ = 0;
 | |
|       only_put_ = false;
 | |
|       pcache_ = &cache;
 | |
|       start_key_ = start_key;
 | |
|       if (use_ws) {
 | |
|         if (OB_FAIL(ws_.init(tenant_id, cache))) {
 | |
|           COMMON_LOG(WARN, "init ws failed", K(ret), K(tenant_id));
 | |
|         } else {
 | |
|           pcache_ = &ws_;
 | |
|         }
 | |
|       }
 | |
|       if (OB_SUCC(ret)) {
 | |
|         inited_ = true;
 | |
|       }
 | |
|     }
 | |
|     return ret;
 | |
|   }
 | |
| 
 | |
|   virtual void run1()
 | |
|   {
 | |
| 
 | |
|     // const int64_t thread_id = (int64_t)(arg);
 | |
|     const int64_t thread_id = this->get_thread_idx();
 | |
|     const int64_t count = this->get_thread_count();
 | |
|     int64_t put_count = 0;
 | |
|     int64_t fail_count = 0;
 | |
|     int ret = OB_SUCCESS;
 | |
|     COMMON_LOG(INFO, "working set stress thread start");
 | |
|     TestKey key;
 | |
|     TestValue value;
 | |
|     if (!inited_) {
 | |
|       ret = OB_NOT_INIT;
 | |
|       COMMON_LOG(WARN, "not init", K(ret));
 | |
|     } else {
 | |
|       while (!has_set_stop()) {
 | |
|         key.tenant_id_ = tenant_id_;
 | |
|         key.v_ = start_key_ + thread_id + put_count * count;
 | |
|         if (OB_FAIL(pcache_->put(key, value))) {
 | |
|           COMMON_LOG(WARN, "cache put failed", K(ret));
 | |
|         } else if (!only_put_) {
 | |
|           const TestValue *get_value = NULL;
 | |
|           ObKVCacheHandle handle;
 | |
|           if (OB_FAIL(pcache_->get(key, get_value, handle))) {
 | |
|             if (OB_ENTRY_NOT_EXIST == ret) {
 | |
|               ret = OB_SUCCESS;
 | |
|             } else {
 | |
|               COMMON_LOG(WARN, "cache get failed", K(ret));
 | |
|             }
 | |
|           }
 | |
|         }
 | |
|         ++put_count;
 | |
|         if (OB_FAIL(ret)) {
 | |
|           ++fail_count;
 | |
|           ObMallocAllocator::get_instance()->print_tenant_memory_usage(tenant_id_);
 | |
|           ObMallocAllocator::get_instance()->print_tenant_ctx_memory_usage(tenant_id_);
 | |
| 
 | |
|         }
 | |
|       }
 | |
|     }
 | |
|     ATOMIC_AAF(&put_count_, put_count);
 | |
|     ATOMIC_AAF(&fail_count_, fail_count);
 | |
|     COMMON_LOG(INFO, "working set stress thread exit");
 | |
|   }
 | |
| 
 | |
|   uint64_t get_tenant_id() const { return tenant_id_; }
 | |
|   int64_t get_put_count() const { return put_count_; }
 | |
|   int64_t get_fail_count() const { return fail_count_; }
 | |
|   int64_t get_used() const { return ws_.working_set_->get_used(); }
 | |
|   int64_t get_limit() const { return ws_.working_set_->get_limit(); }
 | |
| private:
 | |
|   bool inited_;
 | |
|   uint64_t tenant_id_;
 | |
|   int64_t put_count_;
 | |
|   int64_t fail_count_;
 | |
|   bool only_put_;
 | |
|   ObIKVCache<TestKey, TestValue> *pcache_;
 | |
|   ObKVCache<TestKey, TestValue> cache_;
 | |
|   ObCacheWorkingSet<TestKey, TestValue> ws_;
 | |
|   int64_t start_key_;
 | |
| };
 | |
| 
 | |
| class TestNode : public ObKVCacheHazardNode{
 | |
| public:
 | |
|   TestNode()
 | |
|     : id_(0) 
 | |
|   {}
 | |
|   virtual void retire() {COMMON_LOG(INFO, "TestNode(HazardNode) retire", K(this->get_version()), K(id_));}
 | |
| private:
 | |
|   int64_t id_;
 | |
| };
 | |
| 
 | |
| }//end namespace common
 | |
| }//end namespace oceanbase
 | |
| 
 | |
| #endif //OCEANBASE_SHARE_OB_CACHE_TEST_UTILS_H_
 |