Files
oceanbase/deps/oblib/unittest/lib/queue/test_ext_ms_queue.cpp
gm 4a92b6d7df reformat source code
according to code styles, 'AccessModifierOffset' should be -2.
2021-06-17 10:40:36 +08:00

320 lines
8.4 KiB
C++

/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#include <gtest/gtest.h>
#include "common/ob_partition_key.h" // ObPartitionKey
#include "lib/queue/ob_ext_ms_queue.h" // ObExtMsQueue
#define DEFAULT_LOG_LEVEL "INFO"
#define DEFAULT_LOG_FILE "test_ext_ms_queue.log"
using namespace oceanbase::common;
// Test fixture that exercises a single ObExtMsQueue<ObPartitionKey> from several
// producer and consumer pthreads. The threads receive `this` as their argument and
// work directly on the public members below.
class ExtMsQueueTest : public ::testing::Test {
public:
// Number of distinct keys; kv_ holds one KeyValue per key.
static const int64_t KEY_COUNT = 10;
// First argument passed to ext_ms_queue_.init() in basic_test.
static const int64_t MAX_CACHED_MS_QUEUE_ITEM_COUNT = KEY_COUNT;
// Second argument passed to ext_ms_queue_.init(); also the size of task_ and the
// upper bound of the per-batch task count chosen by the producers.
static const int64_t MSQ_QUEUE_COUNT = 4;
// Third argument passed to ext_ms_queue_.init().
static const int64_t MSQ_QUEUE_LEN = 1024;
// Number of producer / consumer threads started by start_producers_() / start_consumers_().
static const int64_t PRODUCER_NUM = 4;
static const int64_t CONSUMER_NUM = 4;
// Default run duration in seconds, used when the "run_time_sec" env variable is unset or <= 0.
static const int64_t RUN_TIME_SEC = 5;
// Timeout handed to push() / get() / terminate_ms_queue(); callers retry on OB_TIMEOUT.
// NOTE(review): unit is presumably microseconds - confirm against ObExtMsQueue.
static const int64_t OP_TIMEOUT = 100 * 1000;
// Argument of this_routine::usleep() between two producer iterations.
static const int64_t WAIT_TIME = 100;
public:
typedef ObPartitionKey KeyType;
typedef ObLink TaskType;
// A key together with the next sequence number to be used for it.
// Producers advance seq_ with ATOMIC_FAA.
struct KeyValue {
KeyType key_;
int64_t seq_;
KeyValue() : key_(), seq_(0)
{}
~KeyValue()
{}
// Resets the key and restarts the sequence at 0.
void reset()
{
key_.reset();
seq_ = 0;
}
};
// pthread entry points; `args` is the ExtMsQueueTest instance.
static void* produce_func(void* args);
static void* consume_func(void* args);
// Thread lifecycle helpers used by basic_test.
int start_consumers_();
int start_producers_();
void wait_producers_();
void wait_consumers_();
// Calls terminate_ms_queue() for every key in kv_, using the key's current seq_ as end sequence.
int terminate_all_ms_queue_();
public:
ExtMsQueueTest();
virtual ~ExtMsQueueTest();
virtual void SetUp();
virtual void TearDown();
public:
// Timestamp (ObTimeUtility::current_time() scale) after which producers stop; set in SetUp().
int64_t end_time_;
KeyValue kv_[KEY_COUNT];
TaskType task_[MSQ_QUEUE_COUNT]; // one shared task object per queue index; producers push task_[index] for batch slot `index`
// Thread handles; a value of 0 is treated as "not started / already joined" by wait_*_().
pthread_t producers_[PRODUCER_NUM];
pthread_t consumers_[CONSUMER_NUM];
// Atomically incremented by each thread on start-up to obtain an index for logging.
int64_t consumer_thread_index_counter_;
int64_t producer_thread_index_counter_;
// The queue under test.
ObExtMsQueue<KeyType> ext_ms_queue_;
private:
DISALLOW_COPY_AND_ASSIGN(ExtMsQueueTest);
};
/**
 * Constructs the fixture with every plain-data member in a defined state.
 *
 * SetUp() assigns the effective values before each test, but initializing here
 * guarantees that the deadline, the thread-index counters and the thread handle
 * arrays never hold indeterminate values, e.g. if a helper such as
 * wait_producers_() is reached without SetUp() having run.
 */
ExtMsQueueTest::ExtMsQueueTest()
    : end_time_(0), consumer_thread_index_counter_(0), producer_thread_index_counter_(0)
{
  // A zero handle is what wait_producers_() / wait_consumers_() treat as "nothing to join".
  (void)memset(producers_, 0, sizeof(producers_));
  (void)memset(consumers_, 0, sizeof(consumers_));
}
/**
 * Destructor. Performs no work of its own; the test body joins the worker threads
 * and destroys the queue explicitly.
 */
ExtMsQueueTest::~ExtMsQueueTest()
{
}
/**
 * Per-test initialization.
 *
 * Determines how long the producers should run (environment variable
 * "run_time_sec", falling back to RUN_TIME_SEC when it is missing or not
 * positive), clears the thread handles, seeds random(), computes the producer
 * deadline and resets the thread-index counters.
 */
void ExtMsQueueTest::SetUp()
{
  int64_t duration_sec = 0;
  const char* duration_env = getenv("run_time_sec");
  if (NULL != duration_env) {
    duration_sec = strtoll(duration_env, NULL, 10);
  }
  if (duration_sec <= 0) {
    // unset, unparsable or non-positive: use the built-in default
    duration_sec = RUN_TIME_SEC;
  }
  _LIB_LOG(INFO, "run_time_sec = %ld sec", duration_sec);

  (void)memset(producers_, 0, sizeof(producers_));
  (void)memset(consumers_, 0, sizeof(consumers_));

  srandom((unsigned int)ObTimeUtility::current_time());
  // seconds scaled by 1000 * 1000 onto the ObTimeUtility::current_time() scale
  end_time_ = ObTimeUtility::current_time() + duration_sec * 1000 * 1000;

  consumer_thread_index_counter_ = 0;
  producer_thread_index_counter_ = 0;
}
/**
 * Per-test cleanup: zeroes both thread handle arrays.
 * No thread is joined here; the test body does that through wait_producers_() /
 * wait_consumers_().
 */
void ExtMsQueueTest::TearDown()
{
  (void)memset(consumers_, 0, sizeof(consumers_));
  (void)memset(producers_, 0, sizeof(producers_));
}
void* ExtMsQueueTest::produce_func(void* args)
{
int ret = OB_SUCCESS;
ExtMsQueueTest* test = (ExtMsQueueTest*)args;
if (NULL != test) {
int64_t end_time = test->end_time_;
int64_t thread_index = ATOMIC_FAA(&test->producer_thread_index_counter_, 1);
ObExtMsQueue<KeyType>& queue = test->ext_ms_queue_;
LIB_LOG(INFO, "producer thread started", K(thread_index));
while (OB_SUCC(ret)) {
int key_index = (int)(random() % KEY_COUNT);
// every transaction push atmost one task to every Queue in MsQueue because of share Task global
int64_t stmt_count = (random() % MSQ_QUEUE_COUNT) + 1;
int64_t seq = ATOMIC_FAA(&(test->kv_[key_index].seq_), 1);
KeyType& key = test->kv_[key_index].key_;
for (int64_t index = 0; OB_SUCC(ret) && index < stmt_count; index++) {
TaskType* task = test->task_ + index; // push Task
while (true) {
ret = queue.push(key, task, seq, index, OP_TIMEOUT);
if (OB_TIMEOUT != ret) {
if (OB_FAIL(ret)) {
LIB_LOG(ERROR, "producer push queue fail", K(ret), K(key), K(seq), K(index));
}
break;
}
}
EXPECT_EQ(OB_SUCCESS, ret);
}
if (OB_SUCC(ret)) {
if (OB_FAIL(queue.end_batch(key, seq, stmt_count))) {
LIB_LOG(ERROR, "producer end_batch fail", K(ret), K(key), K(seq));
}
EXPECT_EQ(OB_SUCCESS, ret);
}
this_routine::usleep(WAIT_TIME);
if (ObTimeUtility::current_time() >= end_time) {
break;
}
}
LIB_LOG(INFO, "producer thread stoped", K(thread_index));
}
return NULL;
}
void* ExtMsQueueTest::consume_func(void* args)
{
int ret = OB_SUCCESS;
ExtMsQueueTest* test = (ExtMsQueueTest*)args;
if (NULL != test) {
int64_t thread_index = ATOMIC_FAA(&test->consumer_thread_index_counter_, 1);
ObExtMsQueue<KeyType>& queue = test->ext_ms_queue_;
LIB_LOG(INFO, "consumer thread started", K(thread_index), "ms_queue_count", queue.get_ms_queue_count());
TaskType* task = NULL;
void* ctx = NULL;
while (OB_SUCCESS == ret && queue.get_ms_queue_count() > 0) {
while (true) {
ret = queue.get(task, ctx, OP_TIMEOUT);
if (OB_TIMEOUT != ret) {
if (OB_FAIL(ret)) {
LIB_LOG(ERROR, "get task from queue fail", K(ret));
}
break;
}
if (queue.get_ms_queue_count() <= 0) {
ret = OB_SUCCESS;
break;
}
}
EXPECT_EQ(OB_SUCCESS, ret);
}
LIB_LOG(INFO, "consumer thread stoped", K(thread_index), "ms_queue_count", queue.get_ms_queue_count());
}
return NULL;
}
/**
 * Joins every producer thread whose handle is non-zero and then clears the handle,
 * so a second call finds nothing left to join.
 */
void ExtMsQueueTest::wait_producers_()
{
  LIB_LOG(INFO, "wait producers");
  pthread_t* const last = producers_ + PRODUCER_NUM;
  for (pthread_t* handle = producers_; handle != last; ++handle) {
    if (0 != *handle) {
      pthread_join(*handle, NULL);
    }
    *handle = 0;
  }
}
/**
 * Joins every consumer thread whose handle is non-zero and then clears the handle,
 * so a second call finds nothing left to join.
 */
void ExtMsQueueTest::wait_consumers_()
{
  LIB_LOG(INFO, "wait consumers");
  pthread_t* const last = consumers_ + CONSUMER_NUM;
  for (pthread_t* handle = consumers_; handle != last; ++handle) {
    if (0 != *handle) {
      pthread_join(*handle, NULL);
    }
    *handle = 0;
  }
}
int ExtMsQueueTest::start_consumers_()
{
int ret = OB_SUCCESS;
for (int64_t index = 0; index < CONSUMER_NUM; index++) {
pthread_create(consumers_ + index, NULL, consume_func, this);
}
return ret;
}
int ExtMsQueueTest::start_producers_()
{
int ret = OB_SUCCESS;
for (int64_t index = 0; index < PRODUCER_NUM; index++) {
pthread_create(producers_ + index, NULL, produce_func, this);
}
return ret;
}
/**
 * Terminates the MsQueue of every key in kv_, passing the key's current seq_ as
 * the end sequence. A call that times out is repeated; the first other error
 * stops the walk over the keys.
 *
 * @return OB_SUCCESS when every key was terminated, otherwise the first error code
 */
int ExtMsQueueTest::terminate_all_ms_queue_()
{
  int ret = OB_SUCCESS;
  LIB_LOG(INFO, "terminate_all_ms_queue");

  int64_t slot = 0;
  while (OB_SUCC(ret) && slot < KEY_COUNT) {
    KeyType& key = kv_[slot].key_;
    int64_t end_seq = kv_[slot].seq_;
    LIB_LOG(DEBUG, "terminate_ms_queue", K(key), K(end_seq));

    // retry for as long as the queue answers with a timeout
    do {
      ret = ext_ms_queue_.terminate_ms_queue(key, end_seq, OP_TIMEOUT);
    } while (OB_TIMEOUT == ret);

    if (OB_SUCCESS != ret) {
      LIB_LOG(ERROR, "terminate_ms_queue fail", K(ret), K(key), K(end_seq));
    }
    slot++;
  }
  return ret;
}
// End-to-end scenario: register KEY_COUNT keys, run producers and consumers
// concurrently until the producers reach their deadline, terminate every MsQueue
// and wait for the consumers to finish.
TEST_F(ExtMsQueueTest, basic_test)
{
// Initialize the queue under test with the fixture's sizing constants.
EXPECT_EQ(OB_SUCCESS, ext_ms_queue_.init(MAX_CACHED_MS_QUEUE_ITEM_COUNT, MSQ_QUEUE_COUNT, MSQ_QUEUE_LEN));
// Register one MsQueue per key; each key is initialized from its index and its sequence restarts at 0.
for (int64_t index = 0; index < KEY_COUNT; index++) {
kv_[index].reset();
ASSERT_EQ(OB_SUCCESS, kv_[index].key_.init(index, 0, 1));
EXPECT_EQ(OB_SUCCESS, ext_ms_queue_.add_ms_queue(kv_[index].key_));
}
// Adding an already registered key must be rejected with OB_ENTRY_EXIST.
EXPECT_EQ(OB_ENTRY_EXIST, ext_ms_queue_.add_ms_queue(kv_[0].key_));
// Start the consumers before the producers.
EXPECT_EQ(OB_SUCCESS, start_consumers_());
// Start the producers.
EXPECT_EQ(OB_SUCCESS, start_producers_());
// Producers exit on their own once end_time_ is reached (or on error).
wait_producers_();
// Terminate every MsQueue; consumers loop while get_ms_queue_count() > 0.
// NOTE(review): presumably termination is what drives that count to zero - confirm against ObExtMsQueue.
EXPECT_EQ(OB_SUCCESS, terminate_all_ms_queue_());
// Wait until all consumers have exited.
wait_consumers_();
ext_ms_queue_.destroy();
}
/**
 * Test binary entry point.
 *
 * Configures the logger from the environment variables "log_level" and
 * "log_file" (defaults: DEFAULT_LOG_LEVEL / DEFAULT_LOG_FILE) and then runs all
 * registered GoogleTest cases.
 *
 * @return the result of RUN_ALL_TESTS()
 */
int main(int argc, char** argv)
{
  const char* level_name = getenv("log_level");
  const char* file_name = getenv("log_file");
  if (NULL == level_name) {
    level_name = DEFAULT_LOG_LEVEL;
  }
  if (NULL == file_name) {
    file_name = DEFAULT_LOG_FILE;
  }

  OB_LOGGER.set_log_level(level_name);
  OB_LOGGER.set_file_name(file_name, true);

  ::testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}