oceanbase/unittest/libobcdc/test_ob_map_queue.cpp
2023-04-13 06:46:35 +00:00

426 lines
12 KiB
C++

/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#define USING_LOG_PREFIX OBLOG_FETCHER
#include <gtest/gtest.h>
#include "share/ob_define.h"
#include "logservice/libobcdc/src/ob_map_queue.h"
#include "ob_log_utils.h"
using namespace oceanbase;
using namespace common;
using namespace libobcdc;
namespace oceanbase
{
namespace unittest
{
class TestObMapQueue : public ::testing::Test
{
public :
virtual void SetUp() {}
virtual void TearDown() {}
public :
// ObMapQueue label
static constexpr const char *LABEL = "TestObMapQueue";
// push thread
static const int64_t ONE_PUSH_THREAD_NUM = 1;
static const int64_t MULTI_PUSH_THREAD_NUM = 3;
// pop thread
static const int64_t ONE_POP_THREAD_NUM = 1;
static const int64_t MULTI_POP_THREAD_NUM = 5;
static const int64_t TEST_TIME_LIMIT = 10 * _MIN_;
};
// ObMapQueue type
typedef int64_t Type;
// push ObMapQueue value
static const int64_t START_VALUE = 0;
static const int64_t END_VALUE = 1 * 1000 * 1000 - 1;
static const int64_t VALUE_COUNT = END_VALUE - START_VALUE + 1;
class TestPushWorker : public libobcdc::Runnable
{
public:
enum State
{
IDLE, //
REQ, // pushing
DONE // push DONE
};
// Identifies the current thread status
State state_;
// thread index
int64_t thread_idx_;
// thread count
int64_t thread_count_;
// ObMapQueue
ObMapQueue<Type> *map_queue_;
// record map_queue push count
int64_t push_count_;
// value interval
int64_t interval_;
virtual int routine()
{
int64_t start = thread_idx_ * interval_;
int64_t end = (thread_count_ - 1 != thread_idx_) ? start + interval_ - 1 : END_VALUE;
LOG_INFO("TestPushWorker", K(start), K(end));
int64_t val = start;
while (val <= end) {
EXPECT_EQ(OB_SUCCESS, map_queue_->push(val++));
push_count_++;
}
if (end + 1 == val) {
state_ = DONE;
}
return OB_SUCCESS;
}
};
class TestPopWorker: public libobcdc::Runnable
{
public:
// thread index
int64_t thread_idx_;
// ObMapQueue
ObMapQueue<Type> *map_queue_;
// record thread map_queue pop count
int64_t pop_count_ CACHE_ALIGNED;
// record poped count for all threads
int64_t *end_pop_count_ CACHE_ALIGNED;
// 保存pop出来的数据
Type *array_;
virtual int routine()
{
int ret = OB_SUCCESS;
while (OB_SUCC(ret)) {
Type val;
while (OB_SUCC(map_queue_->pop(val))) {
if (val >= START_VALUE && val <= END_VALUE) {
if (0 == array_[val]) {
array_[val] = val;
ATOMIC_INC(&pop_count_);
}
}
}
if (OB_EAGAIN == ret) {
ret = OB_SUCCESS;
}
if (ATOMIC_LOAD(end_pop_count_) == VALUE_COUNT) {
break;
}
}
return ret;
}
};
////////////////////// Basic function tests //////////////////////////////////////////
// ObMapQueue init
TEST_F(TestObMapQueue, init)
{
ObMapQueue<Type> map_queue;
EXPECT_EQ(OB_SUCCESS, map_queue.init(LABEL));
EXPECT_TRUE(map_queue.is_inited());
map_queue.destroy();
EXPECT_FALSE(map_queue.is_inited());
}
// Test scenarios.
// 1. single-threaded push - single-threaded pop
// 2. single-threaded push - multi-threaded pop
// 3. multi-threaded push - single-threaded pop
// 4. multi-threaded push - multi-threaded pop
TEST_F(TestObMapQueue, push_pop_test)
{
ObMapQueue<Type> map_queue;
EXPECT_EQ(OB_SUCCESS, map_queue.init(LABEL));
EXPECT_TRUE(map_queue.is_inited());
// malloc array
Type *array = (Type *)ob_malloc(sizeof(Type) * VALUE_COUNT, ObNewModIds::TEST);
OB_ASSERT(NULL != array);
for (int64_t test_type = 0, test_cnt = 4; test_type < test_cnt; ++test_type) {
memset(array, 0, sizeof(Type) * VALUE_COUNT);
int64_t PUSH_THREAD_NUM = 0;
int64_t POP_THREAD_NUM = 0;
int64_t end_push_count = 0;
int64_t end_pop_count = 0;
switch (test_type) {
// single-threaded push - single-threaded pop
case 0:
PUSH_THREAD_NUM = ONE_PUSH_THREAD_NUM;
POP_THREAD_NUM = ONE_POP_THREAD_NUM;
break;
// single-threaded push - multi-threaded pop
case 1:
PUSH_THREAD_NUM = ONE_PUSH_THREAD_NUM;
POP_THREAD_NUM = MULTI_POP_THREAD_NUM;
break;
// multi-threaded push - single-threaded pop
case 2:
PUSH_THREAD_NUM = MULTI_PUSH_THREAD_NUM;
POP_THREAD_NUM = ONE_POP_THREAD_NUM;
break;
// multi-threaded push - multi-threaded pop
case 3:
PUSH_THREAD_NUM = MULTI_PUSH_THREAD_NUM;
POP_THREAD_NUM = MULTI_POP_THREAD_NUM;
break;
default:
break;
}
LOG_INFO("push_pop_test", K(test_type), K(PUSH_THREAD_NUM), K(POP_THREAD_NUM));
// push thread
TestPushWorker push_workers[PUSH_THREAD_NUM];
const int64_t INTERVAL = VALUE_COUNT / PUSH_THREAD_NUM;
for (int64_t idx = 0, cnt = PUSH_THREAD_NUM; idx < cnt; ++idx) {
TestPushWorker &w = push_workers[idx];
// assign value
w.state_ = TestPushWorker::REQ;
w.thread_idx_ = idx;
w.thread_count_ = PUSH_THREAD_NUM;
w.map_queue_ = &map_queue;
w.push_count_ = 0;
w.interval_ = INTERVAL;
// create threads
w.create();
LOG_INFO("push_pop_test", "push thread", "create OB_SUCCESS");
}
// pop thread
TestPopWorker pop_workers[POP_THREAD_NUM];
for (int64_t idx = 0, cnt = POP_THREAD_NUM; idx < cnt; ++idx) {
TestPopWorker &w = pop_workers[idx];
// addign value
w.map_queue_ = &map_queue;
w.array_ = array;
w.pop_count_ = 0;
w.end_pop_count_ = &end_pop_count;
// create threads
w.create();
LOG_INFO("push_pop_test", "pop thread", "create OB_SUCCESS");
}
// Verify the correctness of the push: verify the total number of pushes into the ObMapQueue-Type
int64_t start_test_tstamp = get_timestamp();
while (((get_timestamp() - start_test_tstamp) < TEST_TIME_LIMIT)
&& (end_push_count < VALUE_COUNT)) {
for (int64_t idx = 0, cnt = PUSH_THREAD_NUM; idx < cnt; ++idx) {
TestPushWorker &w = push_workers[idx];
if (TestPushWorker::DONE == w.state_) {
end_push_count += w.push_count_;
w.state_ = TestPushWorker::IDLE;
}
}
}
EXPECT_EQ(VALUE_COUNT, end_push_count);
// Verify that the pop is correct:
// 1. verify the total number of -Types popped from ObMapQueue
// 2. Correctness of the fields
start_test_tstamp = get_timestamp();
while (((get_timestamp() - start_test_tstamp) < TEST_TIME_LIMIT)
&& (end_pop_count < VALUE_COUNT)) {
for (int64_t idx = 0, cnt = POP_THREAD_NUM; idx < cnt; ++idx) {
TestPopWorker &w = pop_workers[idx];
int64_t pop_cnt = ATOMIC_LOAD(&w.pop_count_);
while (!ATOMIC_BCAS(&w.pop_count_, pop_cnt, 0)) {
pop_cnt = ATOMIC_LOAD(&w.pop_count_);
}
end_pop_count += pop_cnt;
//LOG_DEBUG("pop verify", K(idx), K(pop_cnt), K(end_pop_count));
LOG_INFO("pop verify", K(idx), K(pop_cnt), K(end_pop_count));
}
}
EXPECT_EQ(VALUE_COUNT, end_pop_count);
int64_t correct_field = 0;
for (int64_t idx = 0, cnt = VALUE_COUNT; idx < cnt; ++idx) {
if (idx == array[idx]) {
correct_field++;
}
}
EXPECT_EQ(VALUE_COUNT, correct_field);
// push thread join
for (int64_t idx = 0, cnt = PUSH_THREAD_NUM; idx < cnt; ++idx) {
TestPushWorker &w = push_workers[idx];
w.join();
LOG_INFO("push_pop_test", "push thread", "join OB_SUCCESS");
}
// pop thread join
for (int64_t idx = 0, cnt = POP_THREAD_NUM; idx < cnt; ++idx) {
TestPopWorker &w = pop_workers[idx];
w.join();
LOG_INFO("push_pop_test", "pop thread", "join OB_SUCCESS");
}
EXPECT_EQ(OB_SUCCESS, map_queue.reset());
}
// free array
ob_free(array);
map_queue.destroy();
EXPECT_FALSE(map_queue.is_inited());
}
// 1. push performance test: push data with 10 threads
// 2. pop performance test: pop data with 10 threads
TEST_F(TestObMapQueue, DISABLED_performance)
{
int64_t start_test_tstamp = 0;
int64_t end_test_tstamp = 0;
ObMapQueue<Type> map_queue;
EXPECT_EQ(OB_SUCCESS, map_queue.init(LABEL));
// push
int64_t PUSH_THREAD_NUM = 10;
const int64_t INTERVAL = VALUE_COUNT / PUSH_THREAD_NUM;
int64_t end_push_count = 0;
TestPushWorker push_workers[PUSH_THREAD_NUM];
start_test_tstamp = get_timestamp();
for (int64_t idx = 0, cnt = PUSH_THREAD_NUM; idx < cnt; ++idx) {
TestPushWorker &w = push_workers[idx];
w.state_ = TestPushWorker::REQ;
w.thread_idx_ = idx;
w.thread_count_ = PUSH_THREAD_NUM;
w.map_queue_ = &map_queue;
w.push_count_ = 0;
w.interval_ = INTERVAL;
w.create();
LOG_INFO("push_performance", "push thread", "create OB_SUCCESS");
}
// Detect the end of push in all threads
while (((get_timestamp() - start_test_tstamp) < TEST_TIME_LIMIT)
&& (end_push_count < VALUE_COUNT)) {
for (int64_t idx = 0, cnt = PUSH_THREAD_NUM; idx < cnt; ++idx) {
TestPushWorker &w = push_workers[idx];
if (TestPushWorker::DONE == w.state_) {
end_push_count += w.push_count_;
w.state_ = TestPushWorker::IDLE;
}
}
}
EXPECT_EQ(VALUE_COUNT, end_push_count);
end_test_tstamp = get_timestamp();
double push_time = static_cast<double>(end_test_tstamp - start_test_tstamp) * 1.0 / 1000000;
double push_cnt_per_second = static_cast<double>(VALUE_COUNT) * 1.0 / (push_time);
LOG_INFO("push_performance", K(end_push_count), K(push_time), "push count/s", push_cnt_per_second);
// pop
int64_t POP_THREAD_NUM = 10;
int64_t end_pop_count = 0;
TestPopWorker pop_workers[POP_THREAD_NUM];
// malloc array
Type *array = (Type *)ob_malloc(sizeof(Type) * VALUE_COUNT, ObNewModIds::TEST);
OB_ASSERT(NULL != array);
memset(array, 0, sizeof(Type) * VALUE_COUNT);
start_test_tstamp = get_timestamp();
for (int64_t idx = 0, cnt = POP_THREAD_NUM; idx < cnt; ++idx) {
TestPopWorker &w = pop_workers[idx];
w.map_queue_ = &map_queue;
w.array_ = array;
w.pop_count_ = 0;
w.end_pop_count_ = &end_pop_count;
w.create();
LOG_INFO("pop_performance", "pop thread", "create OB_SUCCESS");
}
while (((get_timestamp() - start_test_tstamp) < TEST_TIME_LIMIT)
&& (end_pop_count < VALUE_COUNT)) {
for (int64_t idx = 0, cnt = POP_THREAD_NUM; idx < cnt; ++idx) {
TestPopWorker &w = pop_workers[idx];
int64_t pop_cnt = ATOMIC_LOAD(&w.pop_count_);
while (!ATOMIC_BCAS(&w.pop_count_, pop_cnt, 0)) {
pop_cnt = ATOMIC_LOAD(&w.pop_count_);
}
end_pop_count += pop_cnt;
LOG_DEBUG("pop verify", K(idx), K(pop_cnt), K(end_pop_count));
}
}
EXPECT_EQ(VALUE_COUNT, end_pop_count);
end_test_tstamp = get_timestamp();
double pop_time = static_cast<double>(end_test_tstamp - start_test_tstamp) * 1.0 / 1000000;
double pop_cnt_per_second = static_cast<double>(VALUE_COUNT) * 1.0 / (pop_time);
LOG_INFO("pop_performance", K(end_pop_count), K(pop_time), "pop count/s", pop_cnt_per_second);
// push thread join
for (int64_t idx = 0, cnt = PUSH_THREAD_NUM; idx < cnt; ++idx) {
TestPushWorker &w = push_workers[idx];
w.join();
LOG_INFO("performance", "push thread", "join OB_SUCCESS");
}
// pop thread join
for (int64_t idx = 0, cnt = POP_THREAD_NUM; idx < cnt; ++idx) {
TestPopWorker &w = pop_workers[idx];
w.join();
LOG_INFO("performance", "pop thread", "join OB_SUCCESS");
}
ob_free(array);
map_queue.destroy();
}
//////////////////////// Boundary condition testing //////////////////////////////////////////
// ObMapQueue init fail
TEST_F(TestObMapQueue, init_failed)
{
ObMapQueue<Type> map_queue;
EXPECT_EQ(OB_SUCCESS, map_queue.init(LABEL));
EXPECT_EQ(OB_INIT_TWICE, map_queue.init(LABEL));
}
}//end of unittest
}//end of oceanbase
int main(int argc, char **argv)
{
// ObLogger::get_logger().set_mod_log_levels("ALL.*:DEBUG, TLOG.*:DEBUG");
// testing::InitGoogleTest(&argc,argv);
// testing::FLAGS_gtest_filter = "DO_NOT_RUN";
int ret = 1;
ObLogger &logger = ObLogger::get_logger();
logger.set_file_name("test_ob_map_queue.log", true);
logger.set_log_level(OB_LOG_LEVEL_INFO);
testing::InitGoogleTest(&argc, argv);
ret = RUN_ALL_TESTS();
return ret;
}