/** * Copyright (c) 2021 OceanBase * OceanBase CE is licensed under Mulan PubL v2. * You can use this software according to the terms and conditions of the Mulan PubL v2. * You may obtain a copy of Mulan PubL v2 at: * http://license.coscl.org.cn/MulanPubL-2.0 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. * See the Mulan PubL v2 for more details. */ #ifndef OCEANBASE_SHARE_OB_CACHE_TEST_UTILS_H_ #define OCEANBASE_SHARE_OB_CACHE_TEST_UTILS_H_ #include "share/cache/ob_kv_storecache.h" #include "share/ob_define.h" #include "share/ob_thread_pool.h" #include "lib/atomic/ob_atomic.h" #include "lib/queue/ob_lighty_queue.h" #include "lib/alloc/ob_malloc_allocator.h" #include "share/cache/ob_kvcache_hazard_version.h" namespace oceanbase { using namespace lib; namespace common { template struct TestKVCacheKey: public ObIKVCacheKey { TestKVCacheKey(void) : v_(0), tenant_id_(0) { memset(buf_, 0, sizeof(buf_)); } virtual bool operator ==(const ObIKVCacheKey &other) const; virtual uint64_t get_tenant_id() const { return tenant_id_; } virtual uint64_t hash() const { return v_; } virtual int64_t size() const { return sizeof(*this); } virtual int deep_copy(char *buf, const int64_t buf_len, ObIKVCacheKey *&key) const; uint64_t v_; uint64_t tenant_id_; char buf_[SIZE > sizeof(v_) ? SIZE - sizeof(v_) : 0]; }; template struct TestKVCacheValue: public ObIKVCacheValue { TestKVCacheValue(void) : v_(0) { memset(buf_, 0, sizeof(buf_)); } virtual int64_t size() const { return sizeof(*this); } virtual int deep_copy(char *buf, const int64_t buf_len, ObIKVCacheValue *&value) const; uint64_t v_; char buf_[SIZE > sizeof(v_) ? SIZE - sizeof(v_) : 0]; }; template bool TestKVCacheKey::operator ==(const ObIKVCacheKey &other) const { const TestKVCacheKey &other_key = reinterpret_cast(other); return v_ == other_key.v_ && tenant_id_ == other_key.tenant_id_; } template int TestKVCacheKey::deep_copy(char *buf, const int64_t buf_len, ObIKVCacheKey *&key) const { int ret = OB_SUCCESS; TestKVCacheKey *pkey = NULL; if (NULL == buf || buf_len < size()) { ret = OB_INVALID_ARGUMENT; } else { pkey = new (buf) TestKVCacheKey(); pkey->v_ = v_; pkey->tenant_id_ = tenant_id_; key = pkey; } return ret; } template int TestKVCacheValue::deep_copy( char *buf, const int64_t buf_len, ObIKVCacheValue *&value) const { int ret = OB_SUCCESS; TestKVCacheValue *pvalue = NULL; if (NULL == buf || buf_len < size()) { ret = OB_INVALID_ARGUMENT; } else { pvalue = new (buf) TestKVCacheValue(); pvalue->v_ = v_; value = pvalue; } return ret; } struct AllocBuf { void *ptr_; AllocBuf *next_; }; class ObCacheTestTask; class ObICacheTestStat { public: virtual void add_task(ObCacheTestTask *task) = 0; virtual ObCacheTestTask *pop_oppo_task(ObCacheTestTask *task) = 0; virtual void inc_fail_count() = 0; }; class ObCacheTestTask { public: ObCacheTestTask(const int64_t tenant_id, const bool is_alloc, const int64_t alloc_size, const int64_t alloc_count, ObICacheTestStat *stat) : tenant_id_(tenant_id), is_alloc_(is_alloc), alloc_size_(alloc_size), alloc_count_(alloc_count), next_(NULL), stat_(stat), alloc_list_(NULL) { } virtual ~ObCacheTestTask() {} virtual int process() { int ret = OB_SUCCESS; if (is_alloc_) { ObMemAttr attr; attr.tenant_id_ = tenant_id_; attr.label_ = "CacheTestTask"; //xxx for (int64_t i = 0; i < alloc_count_; ++i) { void *ptr = ob_malloc(alloc_size_, attr); if (NULL == ptr) { ret = OB_ALLOCATE_MEMORY_FAILED; COMMON_LOG(WARN, "ob_malloc failed", K(ret), K_(alloc_size)); ObMallocAllocator::get_instance()->print_tenant_memory_usage(tenant_id_); ObMallocAllocator::get_instance()->print_tenant_ctx_memory_usage(tenant_id_); break; } else { append_alloc_list(ptr); } } if (OB_SUCC(ret)) { stat_->add_task(this); } } else { ObCacheTestTask *oppo_task = stat_->pop_oppo_task(this); if (NULL == oppo_task) { ret = OB_ERR_UNEXPECTED; COMMON_LOG(WARN, "oppo_task not exist", K(ret), "this", *this); } else { int64_t free_count = 0; AllocBuf *buf = oppo_task->alloc_list_; AllocBuf *next = NULL; while (NULL != buf) { next = buf->next_; ob_free(buf->ptr_); buf = next; ++free_count; } if (free_count != alloc_count_) { ret = OB_ERR_UNEXPECTED; COMMON_LOG(WARN, "free_count != alloc_count_", K(ret), K(free_count), K_(alloc_count)); } ob_free(oppo_task); } } if (OB_FAIL(ret)) { COMMON_LOG(WARN, "task process failed", K(ret)); ObMallocAllocator::get_instance()->print_tenant_memory_usage(tenant_id_); ObMallocAllocator::get_instance()->print_tenant_ctx_memory_usage(tenant_id_); stat_->inc_fail_count(); } return ret; } void set_next(ObCacheTestTask *task) { next_ = task; } void append_alloc_list(void *ptr) { AllocBuf *buf = new (ptr) AllocBuf(); buf->ptr_ = ptr; buf->next_ = alloc_list_; alloc_list_ = buf; } TO_STRING_KV(K_(tenant_id), K_(is_alloc), K_(alloc_size), K_(alloc_count)); public: uint64_t tenant_id_; bool is_alloc_; // alloc or free int64_t alloc_size_; int64_t alloc_count_; ObCacheTestTask *next_; ObICacheTestStat *stat_; AllocBuf *alloc_list_; }; class CacheTestStat : public ObICacheTestStat { public: CacheTestStat() : fail_count_(0), task_list_(NULL) {} virtual ~CacheTestStat() {} virtual void inc_fail_count() { ATOMIC_AAF(&fail_count_, 1); } virtual int64_t get_fail_count() const { return ATOMIC_LOAD(&fail_count_); } virtual void add_task(ObCacheTestTask *task) { if (NULL != task) { task->next_ = task_list_; task_list_ = task; } } virtual ObCacheTestTask *pop_oppo_task(ObCacheTestTask *task) { ObCacheTestTask *oppo_task = NULL; ObCacheTestTask *prev_task = NULL; ObCacheTestTask *cur_task = NULL; if (NULL != task && !task->is_alloc_) { cur_task = task_list_; while (NULL != cur_task) { if (cur_task->is_alloc_ && cur_task->tenant_id_ == task->tenant_id_ && cur_task->alloc_size_ == task->alloc_size_ && cur_task->alloc_count_ == task->alloc_count_) { oppo_task = cur_task; // delete it from list if (NULL != prev_task) { prev_task->next_ = cur_task->next_; cur_task->next_ = NULL; } else { task_list_ = cur_task->next_; cur_task->next_ = NULL; } break; } cur_task = cur_task->next_; prev_task = cur_task; } } return oppo_task; } private: int64_t fail_count_; ObCacheTestTask *task_list_; }; class ObAllocatorStress : public share::ObThreadPool { public: ObAllocatorStress() : inited_(false), stat_(), queue_() {} virtual ~ObAllocatorStress() {} int init() { int ret = OB_SUCCESS; if (inited_) { ret = OB_INIT_TWICE; COMMON_LOG(WARN, "init twice", K(ret)); } else if (OB_FAIL(queue_.init(1024))) { COMMON_LOG(WARN, "queue init failed", K(ret)); } else { inited_ = true; } return ret; } virtual void run1() { // UNUSED(arg); int ret = OB_SUCCESS; COMMON_LOG(INFO, "allocator stress thread start"); if (!inited_) { ret = OB_NOT_INIT; COMMON_LOG(WARN, "not init", K(ret)); } else { // will process all task before exit while (!has_set_stop() || OB_ENTRY_NOT_EXIST != ret) { ObCacheTestTask *task = NULL; if (OB_FAIL(pop(task))) { if (OB_ENTRY_NOT_EXIST != ret) { COMMON_LOG(WARN, "pop task failed", K(ret)); } } else { if (OB_FAIL(task->process())) { COMMON_LOG(WARN, "task process failed", K(ret)); } else { COMMON_LOG(INFO, "task process succeed", "task", *task); } ObMallocAllocator::get_instance()->print_tenant_memory_usage(500); ObMallocAllocator::get_instance()->print_tenant_ctx_memory_usage(500); ObMallocAllocator::get_instance()->print_tenant_memory_usage(task->tenant_id_); ObMallocAllocator::get_instance()->print_tenant_ctx_memory_usage(task->tenant_id_); } } } COMMON_LOG(INFO, "allocator stress thread end"); } int add_task(const ObCacheTestTask &task) { int ret = OB_SUCCESS; const int64_t buf_size = sizeof(task); void *ptr = NULL; if (NULL == (ptr = ob_malloc(buf_size, ObNewModIds::TEST))) { ret = OB_ALLOCATE_MEMORY_FAILED; COMMON_LOG(WARN, "ob_malloc failed", K(ret), K(buf_size)); } else { ObCacheTestTask *copy_task = new (ptr) ObCacheTestTask( task.tenant_id_, task.is_alloc_, task.alloc_size_, task.alloc_count_, task.stat_); if (OB_FAIL(queue_.push(copy_task))) { COMMON_LOG(WARN, "push task failed", K(ret)); } } return ret; } int pop(ObCacheTestTask *&task) { int ret = OB_SUCCESS; void *vp = NULL; const int64_t timeout = 1000 * 1000; if (!inited_) { ret = OB_NOT_INIT; COMMON_LOG(WARN,"not init", K(ret)); } else { ret = queue_.pop(vp, timeout); if (OB_FAIL(ret)) { if (OB_ENTRY_NOT_EXIST != ret) { COMMON_LOG(WARN, "queue pop failed", K(ret)); } } else { task = static_cast(vp); } } return ret; } int64_t get_fail_count() { return stat_.get_fail_count(); } CacheTestStat *get_stat() { return &stat_; } private: bool inited_; CacheTestStat stat_; common::ObLightyQueue queue_; }; template class ObCacheStress : public share::ObThreadPool { public: typedef TestKVCacheKey TestKey; typedef TestKVCacheValue TestValue; ObCacheStress() : inited_(false), tenant_id_(OB_INVALID_ID), put_count_(0), fail_count_(0), cache_() {} virtual ~ObCacheStress() {} int init(const uint64_t tenant_id, int64_t index) { int ret = OB_SUCCESS; char cache_name[1024]; snprintf(cache_name, 1024, "%s_%ld", "test", index); if (inited_) { ret = OB_INIT_TWICE; COMMON_LOG(WARN, "init twice", K(ret)); } else if (OB_FAIL(cache_.init(cache_name))) { COMMON_LOG(WARN, "cache init failed", K(ret)); } else { tenant_id_ = tenant_id; inited_ = true; } return ret; } virtual void run1() { // UNUSED(arg); int ret = OB_SUCCESS; COMMON_LOG(INFO, "cache stress thread start"); TestKey key; TestValue value; if (!inited_) { ret = OB_NOT_INIT; COMMON_LOG(WARN, "not init", K(ret)); } else { while (!has_set_stop()) { key.tenant_id_ = tenant_id_; key.v_ = put_count_; if (OB_FAIL(cache_.put(key, value))) { COMMON_LOG(WARN, "cache put failed", K(ret)); } else { const TestValue *get_value = NULL; ObKVCacheHandle handle; if (OB_FAIL(cache_.get(key, get_value, handle))) { COMMON_LOG(WARN, "cache get failed", K(ret)); } } ++put_count_; if (OB_FAIL(ret)) { ++fail_count_; } } } COMMON_LOG(INFO, "cache stress thread exit"); } uint64_t get_tenant_id() const { return tenant_id_; } int64_t get_put_count() const { return put_count_; } int64_t get_fail_count() const { return fail_count_; } private: bool inited_; uint64_t tenant_id_; int64_t put_count_; int64_t fail_count_; ObKVCache cache_; }; template class ObCacheGetStress : public share::ObThreadPool { public: typedef TestKVCacheKey TestKey; typedef TestKVCacheValue TestValue; ObCacheGetStress() : inited_(false), tenant_id_(OB_INVALID_ID), kv_cnt_(0), hit_cnt_(0), total_cnt_(0), fail_cnt_(0), cache_() { } int init(const uint64_t tenant_id, const int64_t kv_cnt) { int ret = OB_SUCCESS; if (inited_) { ret = OB_INIT_TWICE; COMMON_LOG(WARN, "init twice", K(ret)); } else if (OB_INVALID_ID == tenant_id || kv_cnt <= 0) { ret = OB_INVALID_ARGUMENT; COMMON_LOG(WARN, "invalid arguments", K(ret), K(tenant_id), K(kv_cnt)); } else if (OB_FAIL(cache_.init("test_cache"))) { COMMON_LOG(WARN, "cache init failed", K(ret)); } else { // put kv pairs to cache TestKey key; TestValue value; for (int64_t i = 0; OB_SUCC(ret) && i < kv_cnt; ++i) { key.tenant_id_ = tenant_id;; key.v_ = i; if (OB_FAIL(cache_.put(key, value))) { COMMON_LOG(WARN, "put failed", K(ret)); } } tenant_id_ = tenant_id; kv_cnt_ = kv_cnt; hit_cnt_ = 0; total_cnt_ = 0; fail_cnt_ = 0; inited_ = true; } return ret; } // will create monitor thread print hit ratio per second virtual void set_thread_count(const int64_t thread_count) { // extra thread for monitor thread share::ObThreadPool::set_thread_count(static_cast(thread_count + 1)); } virtual void run1() { // int64_t thread_id = (int64_t)(arg); int64_t thread_id = (int64_t)(this->get_thread_idx()); if (0 == thread_id) { do_monitor(); } else { do_work(); } } ObKVCache &get_cache() { return cache_; } double get_hit_ratio() { int ret = OB_SUCCESS; int64_t hit_cnt = 0; TestKey key; const TestValue *pvalue = NULL; ObKVCacheHandle handle; for (int64_t i = 0; i < kv_cnt_; ++i) { key.tenant_id_ = tenant_id_; key.v_ = i; if (OB_FAIL(cache_.get(key, pvalue, handle))) { if (OB_ENTRY_NOT_EXIST != ret) { COMMON_LOG(ERROR, "get failed", K(ret)); } } else { ++hit_cnt; } } return (double)hit_cnt / (double)kv_cnt_; } private: void do_monitor() { while (!has_set_stop()) { double hit_ratio = 0; const int64_t hit_cnt = ATOMIC_LOAD(&hit_cnt_); const int64_t total_cnt = ATOMIC_LOAD(&total_cnt_); ATOMIC_STORE(&hit_cnt_, 0); ATOMIC_STORE(&total_cnt_, 0); if (total_cnt > 0) { hit_ratio = (double)hit_cnt / (double)total_cnt; } COMMON_LOG(INFO, "get stress stat", K(hit_ratio)); sleep(1); } } void do_work() { int ret = OB_SUCCESS; TestKey key; const TestValue *pvalue = NULL; ObKVCacheHandle handle; while (!has_set_stop()) { for (int64_t i = 0; i < kv_cnt_ && !has_set_stop(); ++i) { const int64_t get_cnt = i < (int64_t)((double)kv_cnt_ * 0.8) ? 1 : 4; key.tenant_id_ = tenant_id_; key.v_ = i; for (int64_t i = 0; i < get_cnt; ++i) { ATOMIC_INC(&total_cnt_); if (OB_FAIL(cache_.get(key, pvalue, handle))) { if (OB_ENTRY_NOT_EXIST != ret) { COMMON_LOG(ERROR, "get failed", K(ret)); } else { ATOMIC_INC(&fail_cnt_); } } else { ATOMIC_INC(&hit_cnt_); } } } } } bool inited_; uint64_t tenant_id_; int64_t kv_cnt_; int64_t hit_cnt_; int64_t total_cnt_; int64_t fail_cnt_; ObKVCache cache_; }; template class ObWorkingSetStress : public share::ObThreadPool { public: typedef TestKVCacheKey TestKey; typedef TestKVCacheValue TestValue; ObWorkingSetStress() : inited_(false), tenant_id_(OB_INVALID_ID), put_count_(0), fail_count_(0) {} virtual ~ObWorkingSetStress() {} int init(const uint64_t tenant_id, const bool only_put) { int ret = OB_SUCCESS; if (inited_) { ret = OB_INIT_TWICE; COMMON_LOG(WARN, "init twice", K(ret)); } else if (OB_FAIL(cache_.init("test_cache"))) { COMMON_LOG(WARN, "cache init failed", K(ret)); } else if (OB_FAIL(ws_.init(tenant_id, cache_))) { COMMON_LOG(WARN, "init ws failed", K(ret), K(tenant_id)); } else { tenant_id_ = tenant_id; put_count_ = 0; fail_count_ = 0; only_put_ = only_put; pcache_ = &ws_; inited_ = true; } return ret; } int init(const uint64_t tenant_id, ObKVCache &cache, const bool use_ws, const int64_t start_key = 0) { int ret = OB_SUCCESS; if (inited_) { ret = OB_INIT_TWICE; COMMON_LOG(WARN, "init twice", K(ret)); } else { tenant_id_ = tenant_id; put_count_ = 0; fail_count_ = 0; only_put_ = false; pcache_ = &cache; start_key_ = start_key; if (use_ws) { if (OB_FAIL(ws_.init(tenant_id, cache))) { COMMON_LOG(WARN, "init ws failed", K(ret), K(tenant_id)); } else { pcache_ = &ws_; } } if (OB_SUCC(ret)) { inited_ = true; } } return ret; } virtual void run1() { // const int64_t thread_id = (int64_t)(arg); const int64_t thread_id = this->get_thread_idx(); const int64_t count = this->get_thread_count(); int64_t put_count = 0; int64_t fail_count = 0; int ret = OB_SUCCESS; COMMON_LOG(INFO, "working set stress thread start"); TestKey key; TestValue value; if (!inited_) { ret = OB_NOT_INIT; COMMON_LOG(WARN, "not init", K(ret)); } else { while (!has_set_stop()) { key.tenant_id_ = tenant_id_; key.v_ = start_key_ + thread_id + put_count * count; if (OB_FAIL(pcache_->put(key, value))) { COMMON_LOG(WARN, "cache put failed", K(ret)); } else if (!only_put_) { const TestValue *get_value = NULL; ObKVCacheHandle handle; if (OB_FAIL(pcache_->get(key, get_value, handle))) { if (OB_ENTRY_NOT_EXIST == ret) { ret = OB_SUCCESS; } else { COMMON_LOG(WARN, "cache get failed", K(ret)); } } } ++put_count; if (OB_FAIL(ret)) { ++fail_count; ObMallocAllocator::get_instance()->print_tenant_memory_usage(tenant_id_); ObMallocAllocator::get_instance()->print_tenant_ctx_memory_usage(tenant_id_); } } } ATOMIC_AAF(&put_count_, put_count); ATOMIC_AAF(&fail_count_, fail_count); COMMON_LOG(INFO, "working set stress thread exit"); } uint64_t get_tenant_id() const { return tenant_id_; } int64_t get_put_count() const { return put_count_; } int64_t get_fail_count() const { return fail_count_; } int64_t get_used() const { return ws_.working_set_->get_used(); } int64_t get_limit() const { return ws_.working_set_->get_limit(); } private: bool inited_; uint64_t tenant_id_; int64_t put_count_; int64_t fail_count_; bool only_put_; ObIKVCache *pcache_; ObKVCache cache_; ObCacheWorkingSet ws_; int64_t start_key_; }; class TestNode : public ObKVCacheHazardNode{ public: TestNode() : id_(0) {} virtual void retire() {COMMON_LOG(INFO, "TestNode(HazardNode) retire", K(this->get_version()), K(id_));} private: int64_t id_; }; }//end namespace common }//end namespace oceanbase #endif //OCEANBASE_SHARE_OB_CACHE_TEST_UTILS_H_