
Co-authored-by: wangt1xiuyi <13547954130@163.com> Co-authored-by: yangqise7en <877793735@qq.com> Co-authored-by: Zach41 <zach_41@163.com>
718 lines
19 KiB
C++
718 lines
19 KiB
C++
/**
 * Copyright (c) 2021 OceanBase
 * OceanBase CE is licensed under Mulan PubL v2.
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 * You may obtain a copy of Mulan PubL v2 at:
 *          http://license.coscl.org.cn/MulanPubL-2.0
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PubL v2 for more details.
 */
|
|
|
|
#ifndef OCEANBASE_SHARE_OB_CACHE_TEST_UTILS_H_
|
|
#define OCEANBASE_SHARE_OB_CACHE_TEST_UTILS_H_
|
|
|
|
#include "share/cache/ob_kv_storecache.h"
|
|
#include "share/ob_define.h"
|
|
#include "share/ob_thread_pool.h"
|
|
#include "lib/atomic/ob_atomic.h"
|
|
#include "lib/queue/ob_lighty_queue.h"
|
|
#include "lib/alloc/ob_malloc_allocator.h"
|
|
#include "share/cache/ob_kvcache_hazard_version.h"
|
|
namespace oceanbase
|
|
{
|
|
using namespace lib;
|
|
namespace common
|
|
{
|
|
|
|
template<int64_t SIZE>
|
|
struct TestKVCacheKey: public ObIKVCacheKey
|
|
{
|
|
TestKVCacheKey(void)
|
|
: v_(0), tenant_id_(0)
|
|
{
|
|
memset(buf_, 0, sizeof(buf_));
|
|
}
|
|
virtual bool operator ==(const ObIKVCacheKey &other) const;
|
|
virtual uint64_t get_tenant_id() const
|
|
{
|
|
return tenant_id_;
|
|
}
|
|
virtual uint64_t hash() const
|
|
{
|
|
return v_;
|
|
}
|
|
virtual int64_t size() const
|
|
{
|
|
return sizeof(*this);
|
|
}
|
|
virtual int deep_copy(char *buf, const int64_t buf_len, ObIKVCacheKey *&key) const;
|
|
uint64_t v_;
|
|
uint64_t tenant_id_;
|
|
char buf_[SIZE > sizeof(v_) ? SIZE - sizeof(v_) : 0];
|
|
};
|
|
|
|
template<int64_t SIZE>
|
|
struct TestKVCacheValue: public ObIKVCacheValue
|
|
{
|
|
TestKVCacheValue(void)
|
|
: v_(0)
|
|
{
|
|
memset(buf_, 0, sizeof(buf_));
|
|
}
|
|
virtual int64_t size() const
|
|
{
|
|
return sizeof(*this);
|
|
}
|
|
virtual int deep_copy(char *buf, const int64_t buf_len, ObIKVCacheValue *&value) const;
|
|
uint64_t v_;
|
|
char buf_[SIZE > sizeof(v_) ? SIZE - sizeof(v_) : 0];
|
|
};
|
|
|
|
|
|
template<int64_t SIZE>
|
|
bool TestKVCacheKey<SIZE>::operator ==(const ObIKVCacheKey &other) const
|
|
{
|
|
const TestKVCacheKey &other_key = reinterpret_cast<const TestKVCacheKey &>(other);
|
|
return v_ == other_key.v_ && tenant_id_ == other_key.tenant_id_;
|
|
}
|
|
|
|
template<int64_t SIZE>
|
|
int TestKVCacheKey<SIZE>::deep_copy(char *buf, const int64_t buf_len, ObIKVCacheKey *&key) const
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
TestKVCacheKey<SIZE> *pkey = NULL;
|
|
if (NULL == buf || buf_len < size()) {
|
|
ret = OB_INVALID_ARGUMENT;
|
|
} else {
|
|
pkey = new (buf) TestKVCacheKey<SIZE>();
|
|
pkey->v_ = v_;
|
|
pkey->tenant_id_ = tenant_id_;
|
|
key = pkey;
|
|
}
|
|
return ret;
|
|
}
|
|
|
|
template<int64_t SIZE>
|
|
int TestKVCacheValue<SIZE>::deep_copy(
|
|
char *buf,
|
|
const int64_t buf_len,
|
|
ObIKVCacheValue *&value) const
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
TestKVCacheValue<SIZE> *pvalue = NULL;
|
|
if (NULL == buf || buf_len < size()) {
|
|
ret = OB_INVALID_ARGUMENT;
|
|
} else {
|
|
pvalue = new (buf) TestKVCacheValue<SIZE>();
|
|
pvalue->v_ = v_;
|
|
value = pvalue;
|
|
}
|
|
return ret;
|
|
}
|
|
|
|
|
|
|
|
/**
 * Node of a singly linked list of allocated buffers.
 *
 * ObCacheTestTask::append_alloc_list() constructs one of these at the start of every
 * buffer it allocates, so the node lives inside the memory it describes.
 */
struct AllocBuf
{
  void *ptr_;       // address of the buffer this node describes
  AllocBuf *next_;  // next node in the list, NULL at the tail
};
|
|
|
|
class ObCacheTestTask;

/**
 * Bookkeeping interface shared by cache test tasks.
 *
 * Alloc tasks register themselves through add_task(); free tasks look up the matching
 * alloc task through pop_oppo_task(); any task reports a failure via inc_fail_count().
 */
class ObICacheTestStat
{
public:
  // Virtual so that an implementation can be destroyed through an interface pointer.
  virtual ~ObICacheTestStat() {}

  // Registers a task so that a later opposite task can find it.
  virtual void add_task(ObCacheTestTask *task) = 0;

  // Removes and returns the registered task that is the counterpart of `task`,
  // or NULL when there is none.
  virtual ObCacheTestTask *pop_oppo_task(ObCacheTestTask *task) = 0;

  // Records one failed task.
  virtual void inc_fail_count() = 0;
};
|
|
|
|
class ObCacheTestTask
|
|
{
|
|
public:
|
|
ObCacheTestTask(const int64_t tenant_id, const bool is_alloc,
|
|
const int64_t alloc_size, const int64_t alloc_count,
|
|
ObICacheTestStat *stat)
|
|
: tenant_id_(tenant_id), is_alloc_(is_alloc),
|
|
alloc_size_(alloc_size), alloc_count_(alloc_count),
|
|
next_(NULL), stat_(stat), alloc_list_(NULL)
|
|
{
|
|
}
|
|
|
|
virtual ~ObCacheTestTask() {}
|
|
|
|
virtual int process()
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
if (is_alloc_) {
|
|
ObMemAttr attr;
|
|
attr.tenant_id_ = tenant_id_;
|
|
attr.label_ = "CacheTestTask"; //xxx
|
|
for (int64_t i = 0; i < alloc_count_; ++i) {
|
|
void *ptr = ob_malloc(alloc_size_, attr);
|
|
if (NULL == ptr) {
|
|
ret = OB_ALLOCATE_MEMORY_FAILED;
|
|
COMMON_LOG(WARN, "ob_malloc failed", K(ret), K_(alloc_size));
|
|
ObMallocAllocator::get_instance()->print_tenant_memory_usage(tenant_id_);
|
|
ObMallocAllocator::get_instance()->print_tenant_ctx_memory_usage(tenant_id_);
|
|
break;
|
|
} else {
|
|
append_alloc_list(ptr);
|
|
}
|
|
}
|
|
|
|
if (OB_SUCC(ret)) {
|
|
stat_->add_task(this);
|
|
}
|
|
} else {
|
|
ObCacheTestTask *oppo_task = stat_->pop_oppo_task(this);
|
|
if (NULL == oppo_task) {
|
|
ret = OB_ERR_UNEXPECTED;
|
|
COMMON_LOG(WARN, "oppo_task not exist", K(ret), "this", *this);
|
|
} else {
|
|
int64_t free_count = 0;
|
|
AllocBuf *buf = oppo_task->alloc_list_;
|
|
AllocBuf *next = NULL;
|
|
while (NULL != buf) {
|
|
next = buf->next_;
|
|
ob_free(buf->ptr_);
|
|
buf = next;
|
|
++free_count;
|
|
}
|
|
|
|
if (free_count != alloc_count_) {
|
|
ret = OB_ERR_UNEXPECTED;
|
|
COMMON_LOG(WARN, "free_count != alloc_count_", K(ret), K(free_count), K_(alloc_count));
|
|
}
|
|
|
|
ob_free(oppo_task);
|
|
}
|
|
}
|
|
|
|
if (OB_FAIL(ret)) {
|
|
COMMON_LOG(WARN, "task process failed", K(ret));
|
|
ObMallocAllocator::get_instance()->print_tenant_memory_usage(tenant_id_);
|
|
ObMallocAllocator::get_instance()->print_tenant_ctx_memory_usage(tenant_id_);
|
|
stat_->inc_fail_count();
|
|
}
|
|
|
|
return ret;
|
|
}
|
|
void set_next(ObCacheTestTask *task) { next_ = task; }
|
|
void append_alloc_list(void *ptr)
|
|
{
|
|
AllocBuf *buf = new (ptr) AllocBuf();
|
|
buf->ptr_ = ptr;
|
|
buf->next_ = alloc_list_;
|
|
alloc_list_ = buf;
|
|
}
|
|
TO_STRING_KV(K_(tenant_id), K_(is_alloc), K_(alloc_size), K_(alloc_count));
|
|
public:
|
|
uint64_t tenant_id_;
|
|
bool is_alloc_; // alloc or free
|
|
int64_t alloc_size_;
|
|
int64_t alloc_count_;
|
|
ObCacheTestTask *next_;
|
|
ObICacheTestStat *stat_;
|
|
AllocBuf *alloc_list_;
|
|
};
|
|
|
|
class CacheTestStat : public ObICacheTestStat
|
|
{
|
|
public:
|
|
CacheTestStat() : fail_count_(0), task_list_(NULL) {}
|
|
virtual ~CacheTestStat() {}
|
|
|
|
virtual void inc_fail_count() { ATOMIC_AAF(&fail_count_, 1); }
|
|
virtual int64_t get_fail_count() const { return ATOMIC_LOAD(&fail_count_); }
|
|
virtual void add_task(ObCacheTestTask *task)
|
|
{
|
|
if (NULL != task) {
|
|
task->next_ = task_list_;
|
|
task_list_ = task;
|
|
}
|
|
}
|
|
virtual ObCacheTestTask *pop_oppo_task(ObCacheTestTask *task)
|
|
{
|
|
ObCacheTestTask *oppo_task = NULL;
|
|
ObCacheTestTask *prev_task = NULL;
|
|
ObCacheTestTask *cur_task = NULL;
|
|
if (NULL != task && !task->is_alloc_) {
|
|
cur_task = task_list_;
|
|
while (NULL != cur_task) {
|
|
if (cur_task->is_alloc_ && cur_task->tenant_id_ == task->tenant_id_
|
|
&& cur_task->alloc_size_ == task->alloc_size_ && cur_task->alloc_count_ == task->alloc_count_) {
|
|
oppo_task = cur_task;
|
|
// delete it from list
|
|
if (NULL != prev_task) {
|
|
prev_task->next_ = cur_task->next_;
|
|
cur_task->next_ = NULL;
|
|
} else {
|
|
task_list_ = cur_task->next_;
|
|
cur_task->next_ = NULL;
|
|
}
|
|
break;
|
|
}
|
|
cur_task = cur_task->next_;
|
|
prev_task = cur_task;
|
|
}
|
|
}
|
|
return oppo_task;
|
|
}
|
|
private:
|
|
int64_t fail_count_;
|
|
ObCacheTestTask *task_list_;
|
|
};
|
|
|
|
class ObAllocatorStress : public share::ObThreadPool
|
|
{
|
|
public:
|
|
ObAllocatorStress() : inited_(false), stat_(), queue_() {}
|
|
virtual ~ObAllocatorStress() {}
|
|
|
|
int init()
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
if (inited_) {
|
|
ret = OB_INIT_TWICE;
|
|
COMMON_LOG(WARN, "init twice", K(ret));
|
|
} else if (OB_FAIL(queue_.init(1024))) {
|
|
COMMON_LOG(WARN, "queue init failed", K(ret));
|
|
} else {
|
|
inited_ = true;
|
|
}
|
|
return ret;
|
|
}
|
|
|
|
virtual void run1()
|
|
{
|
|
|
|
// UNUSED(arg);
|
|
int ret = OB_SUCCESS;
|
|
COMMON_LOG(INFO, "allocator stress thread start");
|
|
if (!inited_) {
|
|
ret = OB_NOT_INIT;
|
|
COMMON_LOG(WARN, "not init", K(ret));
|
|
} else {
|
|
// will process all task before exit
|
|
while (!has_set_stop() || OB_ENTRY_NOT_EXIST != ret) {
|
|
ObCacheTestTask *task = NULL;
|
|
if (OB_FAIL(pop(task))) {
|
|
if (OB_ENTRY_NOT_EXIST != ret) {
|
|
COMMON_LOG(WARN, "pop task failed", K(ret));
|
|
}
|
|
} else {
|
|
if (OB_FAIL(task->process())) {
|
|
COMMON_LOG(WARN, "task process failed", K(ret));
|
|
} else {
|
|
COMMON_LOG(INFO, "task process succeed", "task", *task);
|
|
}
|
|
ObMallocAllocator::get_instance()->print_tenant_memory_usage(500);
|
|
ObMallocAllocator::get_instance()->print_tenant_ctx_memory_usage(500);
|
|
|
|
ObMallocAllocator::get_instance()->print_tenant_memory_usage(task->tenant_id_);
|
|
ObMallocAllocator::get_instance()->print_tenant_ctx_memory_usage(task->tenant_id_);
|
|
}
|
|
}
|
|
}
|
|
COMMON_LOG(INFO, "allocator stress thread end");
|
|
}
|
|
|
|
int add_task(const ObCacheTestTask &task)
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
const int64_t buf_size = sizeof(task);
|
|
void *ptr = NULL;
|
|
if (NULL == (ptr = ob_malloc(buf_size, ObNewModIds::TEST))) {
|
|
ret = OB_ALLOCATE_MEMORY_FAILED;
|
|
COMMON_LOG(WARN, "ob_malloc failed", K(ret), K(buf_size));
|
|
} else {
|
|
ObCacheTestTask *copy_task = new (ptr) ObCacheTestTask(
|
|
task.tenant_id_, task.is_alloc_, task.alloc_size_, task.alloc_count_, task.stat_);
|
|
if (OB_FAIL(queue_.push(copy_task))) {
|
|
COMMON_LOG(WARN, "push task failed", K(ret));
|
|
}
|
|
}
|
|
return ret;
|
|
}
|
|
|
|
int pop(ObCacheTestTask *&task)
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
void *vp = NULL;
|
|
const int64_t timeout = 1000 * 1000;
|
|
if (!inited_) {
|
|
ret = OB_NOT_INIT;
|
|
COMMON_LOG(WARN,"not init", K(ret));
|
|
} else {
|
|
ret = queue_.pop(vp, timeout);
|
|
if (OB_FAIL(ret)) {
|
|
if (OB_ENTRY_NOT_EXIST != ret) {
|
|
COMMON_LOG(WARN, "queue pop failed", K(ret));
|
|
}
|
|
} else {
|
|
task = static_cast<ObCacheTestTask *>(vp);
|
|
}
|
|
}
|
|
return ret;
|
|
}
|
|
|
|
int64_t get_fail_count() { return stat_.get_fail_count(); }
|
|
CacheTestStat *get_stat() { return &stat_; }
|
|
private:
|
|
bool inited_;
|
|
CacheTestStat stat_;
|
|
common::ObLightyQueue queue_;
|
|
};
|
|
|
|
template<int64_t K_SIZE, int64_t V_SIZE>
|
|
class ObCacheStress : public share::ObThreadPool
|
|
{
|
|
public:
|
|
typedef TestKVCacheKey<K_SIZE> TestKey;
|
|
typedef TestKVCacheValue<V_SIZE> TestValue;
|
|
|
|
ObCacheStress() : inited_(false), tenant_id_(OB_INVALID_ID),
|
|
put_count_(0), fail_count_(0), cache_() {}
|
|
virtual ~ObCacheStress() {}
|
|
|
|
int init(const uint64_t tenant_id, int64_t index)
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
char cache_name[1024];
|
|
snprintf(cache_name, 1024, "%s_%ld", "test", index);
|
|
if (inited_) {
|
|
ret = OB_INIT_TWICE;
|
|
COMMON_LOG(WARN, "init twice", K(ret));
|
|
} else if (OB_FAIL(cache_.init(cache_name))) {
|
|
COMMON_LOG(WARN, "cache init failed", K(ret));
|
|
} else {
|
|
tenant_id_ = tenant_id;
|
|
inited_ = true;
|
|
}
|
|
return ret;
|
|
}
|
|
virtual void run1()
|
|
{
|
|
|
|
// UNUSED(arg);
|
|
int ret = OB_SUCCESS;
|
|
COMMON_LOG(INFO, "cache stress thread start");
|
|
TestKey key;
|
|
TestValue value;
|
|
if (!inited_) {
|
|
ret = OB_NOT_INIT;
|
|
COMMON_LOG(WARN, "not init", K(ret));
|
|
} else {
|
|
while (!has_set_stop()) {
|
|
key.tenant_id_ = tenant_id_;
|
|
key.v_ = put_count_;
|
|
if (OB_FAIL(cache_.put(key, value))) {
|
|
COMMON_LOG(WARN, "cache put failed", K(ret));
|
|
} else {
|
|
const TestValue *get_value = NULL;
|
|
ObKVCacheHandle handle;
|
|
if (OB_FAIL(cache_.get(key, get_value, handle))) {
|
|
COMMON_LOG(WARN, "cache get failed", K(ret));
|
|
}
|
|
}
|
|
++put_count_;
|
|
if (OB_FAIL(ret)) {
|
|
++fail_count_;
|
|
}
|
|
}
|
|
}
|
|
COMMON_LOG(INFO, "cache stress thread exit");
|
|
}
|
|
|
|
uint64_t get_tenant_id() const { return tenant_id_; }
|
|
int64_t get_put_count() const { return put_count_; }
|
|
int64_t get_fail_count() const { return fail_count_; }
|
|
private:
|
|
bool inited_;
|
|
uint64_t tenant_id_;
|
|
int64_t put_count_;
|
|
int64_t fail_count_;
|
|
ObKVCache<TestKey, TestValue> cache_;
|
|
};
|
|
|
|
template<int64_t K_SIZE, int64_t V_SIZE>
|
|
class ObCacheGetStress : public share::ObThreadPool
|
|
{
|
|
public:
|
|
typedef TestKVCacheKey<K_SIZE> TestKey;
|
|
typedef TestKVCacheValue<V_SIZE> TestValue;
|
|
|
|
ObCacheGetStress()
|
|
: inited_(false), tenant_id_(OB_INVALID_ID),
|
|
kv_cnt_(0), hit_cnt_(0), total_cnt_(0), fail_cnt_(0), cache_()
|
|
{
|
|
}
|
|
|
|
int init(const uint64_t tenant_id, const int64_t kv_cnt)
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
if (inited_) {
|
|
ret = OB_INIT_TWICE;
|
|
COMMON_LOG(WARN, "init twice", K(ret));
|
|
} else if (OB_INVALID_ID == tenant_id || kv_cnt <= 0) {
|
|
ret = OB_INVALID_ARGUMENT;
|
|
COMMON_LOG(WARN, "invalid arguments", K(ret), K(tenant_id), K(kv_cnt));
|
|
} else if (OB_FAIL(cache_.init("test_cache"))) {
|
|
COMMON_LOG(WARN, "cache init failed", K(ret));
|
|
} else {
|
|
// put kv pairs to cache
|
|
TestKey key;
|
|
TestValue value;
|
|
for (int64_t i = 0; OB_SUCC(ret) && i < kv_cnt; ++i) {
|
|
key.tenant_id_ = tenant_id;;
|
|
key.v_ = i;
|
|
if (OB_FAIL(cache_.put(key, value))) {
|
|
COMMON_LOG(WARN, "put failed", K(ret));
|
|
}
|
|
}
|
|
tenant_id_ = tenant_id;
|
|
kv_cnt_ = kv_cnt;
|
|
hit_cnt_ = 0;
|
|
total_cnt_ = 0;
|
|
fail_cnt_ = 0;
|
|
inited_ = true;
|
|
}
|
|
return ret;
|
|
}
|
|
|
|
// will create monitor thread print hit ratio per second
|
|
virtual void set_thread_count(const int64_t thread_count)
|
|
{
|
|
// extra thread for monitor thread
|
|
share::ObThreadPool::set_thread_count(static_cast<int32_t>(thread_count + 1));
|
|
}
|
|
|
|
virtual void run1()
|
|
{
|
|
|
|
// int64_t thread_id = (int64_t)(arg);
|
|
int64_t thread_id = (int64_t)(this->get_thread_idx());
|
|
if (0 == thread_id) {
|
|
do_monitor();
|
|
} else {
|
|
do_work();
|
|
}
|
|
}
|
|
|
|
ObKVCache<TestKey, TestValue> &get_cache()
|
|
{
|
|
return cache_;
|
|
}
|
|
double get_hit_ratio()
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
int64_t hit_cnt = 0;
|
|
TestKey key;
|
|
const TestValue *pvalue = NULL;
|
|
ObKVCacheHandle handle;
|
|
for (int64_t i = 0; i < kv_cnt_; ++i) {
|
|
key.tenant_id_ = tenant_id_;
|
|
key.v_ = i;
|
|
if (OB_FAIL(cache_.get(key, pvalue, handle))) {
|
|
if (OB_ENTRY_NOT_EXIST != ret) {
|
|
COMMON_LOG(ERROR, "get failed", K(ret));
|
|
}
|
|
} else {
|
|
++hit_cnt;
|
|
}
|
|
}
|
|
return (double)hit_cnt / (double)kv_cnt_;
|
|
}
|
|
private:
|
|
void do_monitor()
|
|
{
|
|
while (!has_set_stop()) {
|
|
double hit_ratio = 0;
|
|
const int64_t hit_cnt = ATOMIC_LOAD(&hit_cnt_);
|
|
const int64_t total_cnt = ATOMIC_LOAD(&total_cnt_);
|
|
ATOMIC_STORE(&hit_cnt_, 0);
|
|
ATOMIC_STORE(&total_cnt_, 0);
|
|
if (total_cnt > 0) {
|
|
hit_ratio = (double)hit_cnt / (double)total_cnt;
|
|
}
|
|
COMMON_LOG(INFO, "get stress stat", K(hit_ratio));
|
|
sleep(1);
|
|
}
|
|
}
|
|
|
|
void do_work()
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
TestKey key;
|
|
const TestValue *pvalue = NULL;
|
|
ObKVCacheHandle handle;
|
|
while (!has_set_stop()) {
|
|
for (int64_t i = 0; i < kv_cnt_ && !has_set_stop(); ++i) {
|
|
const int64_t get_cnt = i < (int64_t)((double)kv_cnt_ * 0.8) ? 1 : 4;
|
|
key.tenant_id_ = tenant_id_;
|
|
key.v_ = i;
|
|
for (int64_t i = 0; i < get_cnt; ++i) {
|
|
ATOMIC_INC(&total_cnt_);
|
|
if (OB_FAIL(cache_.get(key, pvalue, handle))) {
|
|
if (OB_ENTRY_NOT_EXIST != ret) {
|
|
COMMON_LOG(ERROR, "get failed", K(ret));
|
|
} else {
|
|
ATOMIC_INC(&fail_cnt_);
|
|
}
|
|
} else {
|
|
ATOMIC_INC(&hit_cnt_);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
bool inited_;
|
|
uint64_t tenant_id_;
|
|
int64_t kv_cnt_;
|
|
int64_t hit_cnt_;
|
|
int64_t total_cnt_;
|
|
int64_t fail_cnt_;
|
|
ObKVCache<TestKey, TestValue> cache_;
|
|
};
|
|
|
|
template<int64_t K_SIZE, int64_t V_SIZE>
|
|
class ObWorkingSetStress : public share::ObThreadPool
|
|
{
|
|
public:
|
|
typedef TestKVCacheKey<K_SIZE> TestKey;
|
|
typedef TestKVCacheValue<V_SIZE> TestValue;
|
|
|
|
ObWorkingSetStress() : inited_(false), tenant_id_(OB_INVALID_ID),
|
|
put_count_(0), fail_count_(0) {}
|
|
virtual ~ObWorkingSetStress() {}
|
|
|
|
int init(const uint64_t tenant_id, const bool only_put)
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
if (inited_) {
|
|
ret = OB_INIT_TWICE;
|
|
COMMON_LOG(WARN, "init twice", K(ret));
|
|
} else if (OB_FAIL(cache_.init("test_cache"))) {
|
|
COMMON_LOG(WARN, "cache init failed", K(ret));
|
|
} else if (OB_FAIL(ws_.init(tenant_id, cache_))) {
|
|
COMMON_LOG(WARN, "init ws failed", K(ret), K(tenant_id));
|
|
} else {
|
|
tenant_id_ = tenant_id;
|
|
put_count_ = 0;
|
|
fail_count_ = 0;
|
|
only_put_ = only_put;
|
|
pcache_ = &ws_;
|
|
inited_ = true;
|
|
}
|
|
return ret;
|
|
}
|
|
|
|
int init(const uint64_t tenant_id, ObKVCache<TestKey, TestValue> &cache, const bool use_ws, const int64_t start_key = 0)
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
if (inited_) {
|
|
ret = OB_INIT_TWICE;
|
|
COMMON_LOG(WARN, "init twice", K(ret));
|
|
} else {
|
|
tenant_id_ = tenant_id;
|
|
put_count_ = 0;
|
|
fail_count_ = 0;
|
|
only_put_ = false;
|
|
pcache_ = &cache;
|
|
start_key_ = start_key;
|
|
if (use_ws) {
|
|
if (OB_FAIL(ws_.init(tenant_id, cache))) {
|
|
COMMON_LOG(WARN, "init ws failed", K(ret), K(tenant_id));
|
|
} else {
|
|
pcache_ = &ws_;
|
|
}
|
|
}
|
|
if (OB_SUCC(ret)) {
|
|
inited_ = true;
|
|
}
|
|
}
|
|
return ret;
|
|
}
|
|
|
|
virtual void run1()
|
|
{
|
|
|
|
// const int64_t thread_id = (int64_t)(arg);
|
|
const int64_t thread_id = this->get_thread_idx();
|
|
const int64_t count = this->get_thread_count();
|
|
int64_t put_count = 0;
|
|
int64_t fail_count = 0;
|
|
int ret = OB_SUCCESS;
|
|
COMMON_LOG(INFO, "working set stress thread start");
|
|
TestKey key;
|
|
TestValue value;
|
|
if (!inited_) {
|
|
ret = OB_NOT_INIT;
|
|
COMMON_LOG(WARN, "not init", K(ret));
|
|
} else {
|
|
while (!has_set_stop()) {
|
|
key.tenant_id_ = tenant_id_;
|
|
key.v_ = start_key_ + thread_id + put_count * count;
|
|
if (OB_FAIL(pcache_->put(key, value))) {
|
|
COMMON_LOG(WARN, "cache put failed", K(ret));
|
|
} else if (!only_put_) {
|
|
const TestValue *get_value = NULL;
|
|
ObKVCacheHandle handle;
|
|
if (OB_FAIL(pcache_->get(key, get_value, handle))) {
|
|
if (OB_ENTRY_NOT_EXIST == ret) {
|
|
ret = OB_SUCCESS;
|
|
} else {
|
|
COMMON_LOG(WARN, "cache get failed", K(ret));
|
|
}
|
|
}
|
|
}
|
|
++put_count;
|
|
if (OB_FAIL(ret)) {
|
|
++fail_count;
|
|
ObMallocAllocator::get_instance()->print_tenant_memory_usage(tenant_id_);
|
|
ObMallocAllocator::get_instance()->print_tenant_ctx_memory_usage(tenant_id_);
|
|
|
|
}
|
|
}
|
|
}
|
|
ATOMIC_AAF(&put_count_, put_count);
|
|
ATOMIC_AAF(&fail_count_, fail_count);
|
|
COMMON_LOG(INFO, "working set stress thread exit");
|
|
}
|
|
|
|
uint64_t get_tenant_id() const { return tenant_id_; }
|
|
int64_t get_put_count() const { return put_count_; }
|
|
int64_t get_fail_count() const { return fail_count_; }
|
|
int64_t get_used() const { return ws_.working_set_->get_used(); }
|
|
int64_t get_limit() const { return ws_.working_set_->get_limit(); }
|
|
private:
|
|
bool inited_;
|
|
uint64_t tenant_id_;
|
|
int64_t put_count_;
|
|
int64_t fail_count_;
|
|
bool only_put_;
|
|
ObIKVCache<TestKey, TestValue> *pcache_;
|
|
ObKVCache<TestKey, TestValue> cache_;
|
|
ObCacheWorkingSet<TestKey, TestValue> ws_;
|
|
int64_t start_key_;
|
|
};
|
|
|
|
class TestNode : public ObKVCacheHazardNode{
|
|
public:
|
|
TestNode()
|
|
: id_(0)
|
|
{}
|
|
virtual void retire() {COMMON_LOG(INFO, "TestNode(HazardNode) retire", K(this->get_version()), K(id_));}
|
|
private:
|
|
int64_t id_;
|
|
};
|
|
|
|
}//end namespace common
|
|
}//end namespace oceanbase
|
|
|
|
#endif //OCEANBASE_SHARE_OB_CACHE_TEST_UTILS_H_
|