/** * Copyright (c) 2021 OceanBase * OceanBase CE is licensed under Mulan PubL v2. * You can use this software according to the terms and conditions of the Mulan PubL v2. * You may obtain a copy of Mulan PubL v2 at: * http://license.coscl.org.cn/MulanPubL-2.0 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. * See the Mulan PubL v2 for more details. */ #define USING_LOG_PREFIX OBLOG_FETCHER #include #include #include "share/ob_define.h" #include "logservice/libobcdc/src/ob_map_queue_thread.h" #include "ob_log_utils.h" using namespace oceanbase; using namespace common; using namespace libobcdc; using namespace std; namespace oceanbase { namespace unittest { // ObMapQueue label static const char *label = "test"; // Thread num of ObMapQueueThread static const int THREAD_NUM = 6; // ObMapQueue type struct MapQueueType { int64_t value_; uint64_t hash_val_; void reset(int64_t value, uint64_t hash_val) { value_ = value; hash_val_ = hash_val; } TO_STRING_KV(K(value_), K(hash_val_)); }; typedef MapQueueType Type; // ObMapQueue value range static const int64_t START_VALUE = 0; static const int64_t END_VALUE = 1 * 100 * 1000 - 1; static const int64_t VALUE_COUNT = END_VALUE - START_VALUE + 1; class TestObMapQueueThread : public ::testing::Test { public : TestObMapQueueThread() {} virtual ~TestObMapQueueThread() {} virtual void SetUp() {} virtual void TearDown() {} public : // push thread static const int64_t ONE_PUSH_THREAD_NUM = 1; static const int64_t MULTI_PUSH_THREAD_NUM = 3; // time limit static const int64_t TEST_TIME_LIMIT = 1 * _MIN_; public: // generate data void generate_data(const int64_t count, Type *&datas); }; void TestObMapQueueThread::generate_data(const int64_t count, Type *&datas) { datas = (Type *)ob_malloc(sizeof(Type) * count, ObNewModIds::TEST); OB_ASSERT(NULL != datas); for (int64_t idx = 0; idx < count; idx++) { datas[idx].reset(idx, idx % THREAD_NUM); } for (int64_t idx = 0; idx < count; idx++) { LOG_DEBUG("data", K(datas[idx])); } } static const int MAX_THREAD_NUM = 16; typedef common::ObMapQueueThread QueueThread; // DerivedQueueThread1 // Overload handle, test the correctness of ObMapQueueThread execution class DerivedQueueThread1 : public QueueThread { public: DerivedQueueThread1(vector > &handle_result) : end_handle_count_(0), handle_result_(handle_result), inited_(false) {} virtual ~DerivedQueueThread1() { destroy(); } public: int init(); void destroy(); int start(); public: // Record the number of data that has been processed by the thread int64_t end_handle_count_ CACHE_ALIGNED; public: // Implement ObMapQueueThread dummy function-handle to overload the thread handling function virtual int handle(void *data, const int64_t thread_index, volatile bool &stop_flag); private: vector > &handle_result_; bool inited_; }; // DerivedQueueThread2 // Overload run, test the correctness of ObMapQueueThread execution class DerivedQueueThread2 : public QueueThread { public: DerivedQueueThread2(vector > &handle_result) : end_handle_count_(0), handle_result_(handle_result), inited_(false) {} virtual ~DerivedQueueThread2() { destroy(); } public: int init(); void destroy(); int start(); public: // Record the number of data that has been processed by the thread int64_t end_handle_count_ CACHE_ALIGNED; public: // 实Overload run, test the correctness of ObMapQueueThread execution virtual void run(const int64_t thread_index); private: static const int64_t IDLE_WAIT_TIME = 10 * 1000; private: vector > &handle_result_; bool inited_; }; int DerivedQueueThread1::init() { int ret = OB_SUCCESS; if (OB_UNLIKELY(inited_)) { LOG_ERROR("DerivedQueueThread1 init twice"); ret = OB_INIT_TWICE; } else if (OB_FAIL(QueueThread::init(THREAD_NUM, label))) { LOG_ERROR("init QueueThread fail", K(ret), K(THREAD_NUM), K(label)); } else { EXPECT_EQ(THREAD_NUM, QueueThread::get_thread_num()); EXPECT_TRUE(QueueThread::is_stoped()); end_handle_count_ = 0; inited_ = true; LOG_INFO("DerivedQueueThread1 init ok", K(ret)); } return ret; } void DerivedQueueThread1::destroy() { if (inited_) { QueueThread::destroy(); EXPECT_TRUE(QueueThread::is_stoped()); inited_ = false; LOG_INFO("DerivedQueueThread1 destory"); } } int DerivedQueueThread1::start() { int ret = OB_SUCCESS; if (OB_UNLIKELY(!inited_)) { LOG_ERROR("DerivedQueueThread1 not init"); ret = OB_NOT_INIT; } else if (OB_FAIL(QueueThread::start())) { LOG_ERROR("DerivedQueueThread1 start error", K(ret)); } else { LOG_INFO("DerivedQueueThread1 start ok"); } EXPECT_FALSE(QueueThread::is_stoped()); return ret; } int DerivedQueueThread1::handle(void *data, const int64_t thread_index, volatile bool &stop_flag) { int ret = OB_SUCCESS; stop_flag = stop_flag; Type *task = NULL; if (OB_UNLIKELY(!inited_)) { LOG_ERROR("DerivedQueueThread1 not init"); ret = OB_NOT_INIT; } else if (OB_ISNULL(data) || OB_UNLIKELY(thread_index < 0) || OB_UNLIKELY(thread_index >= get_thread_num())) { LOG_ERROR("invalid argument", K(thread_index), K(get_thread_num())); ret = OB_ERR_UNEXPECTED; } else if (OB_ISNULL(task = static_cast(data))) { ret = OB_INVALID_ARGUMENT; LOG_ERROR("invalid argument", K(ret), KP(task), K(thread_index)); } else { LOG_DEBUG("DerivedQueueThread1 handle", K(ret), K(*task), K(thread_index)); handle_result_[thread_index].push_back(*task); ATOMIC_INC(&end_handle_count_); } return ret; } int DerivedQueueThread2::init() { int ret = OB_SUCCESS; if (OB_UNLIKELY(inited_)) { LOG_ERROR("DerivedQueueThread2 init twice"); ret = OB_INIT_TWICE; } else if (OB_FAIL(QueueThread::init(THREAD_NUM, label))) { LOG_ERROR("init QueueThread fail", K(ret), K(THREAD_NUM), K(label)); } else { EXPECT_EQ(THREAD_NUM, QueueThread::get_thread_num()); EXPECT_TRUE(QueueThread::is_stoped()); end_handle_count_ = 0; inited_ = true; LOG_INFO("DerivedQueueThread2 init ok", K(ret)); } return ret; } void DerivedQueueThread2::destroy() { if (inited_) { QueueThread::destroy(); EXPECT_TRUE(QueueThread::is_stoped()); inited_ = false; LOG_INFO("DerivedQueueThread2 destory"); } } int DerivedQueueThread2::start() { int ret = OB_SUCCESS; if (OB_UNLIKELY(!inited_)) { LOG_ERROR("DerivedQueueThread2 not init"); ret = OB_NOT_INIT; } else if (OB_FAIL(QueueThread::start())) { LOG_ERROR("DerivedQueueThread2 start error", K(ret)); } else { LOG_INFO("DerivedQueueThread2 start ok"); } EXPECT_FALSE(QueueThread::is_stoped()); return ret; } void DerivedQueueThread2::run(const int64_t thread_index) { int ret = OB_SUCCESS; if (OB_UNLIKELY(!inited_)) { LOG_ERROR("DerivedQueueThread2 not init"); ret = OB_NOT_INIT; } else if (OB_UNLIKELY(thread_index < 0) || OB_UNLIKELY(thread_index >= get_thread_num())) { LOG_ERROR("invalid argument", K(thread_index), K(get_thread_num())); ret = OB_ERR_UNEXPECTED; } else { LOG_INFO("DerivedQueueThread2 run start", K(thread_index)); while (!stop_flag_ && OB_SUCCESS == ret) { void *data = NULL; Type *task = NULL; if (OB_FAIL(pop(thread_index, data))) { if (OB_EAGAIN == ret) { // empty ret = OB_SUCCESS; cond_timedwait(thread_index, IDLE_WAIT_TIME); LOG_DEBUG("DerivedQueueThread2 pop empty"); } else { LOG_ERROR("DerivedQueueThread2 pop data error", K(ret)); } } else if (OB_ISNULL(data) || OB_ISNULL(task = static_cast(data))) { LOG_ERROR("invalid argument", KPC(task), K(thread_index)); ret = OB_ERR_UNEXPECTED; } else { LOG_DEBUG("DerivedQueueThread2 handle", K(ret), K(*task), K(thread_index)); handle_result_[thread_index].push_back(*task); ATOMIC_INC(&end_handle_count_); } } } } class TestPushWorker : public libobcdc::Runnable { public: enum State { IDLE, // empty REQ, // pushing DONE // push done }; // Identifies the current thread status State state_; // thread index int64_t thread_idx_; // thread count int64_t thread_count_; // push data Type *datas_; // value interval int64_t interval_; // ObMapQueueThread QueueThread *host_; // record thread map_queue push count int64_t push_count_; void reset(const int64_t thread_idx, const int64_t push_thread_num, Type *datas, QueueThread *host) { state_ = TestPushWorker::REQ; thread_idx_ = thread_idx; thread_count_ = push_thread_num; datas_ = datas; interval_ = VALUE_COUNT / push_thread_num; host_ = host; push_count_ = 0; } void start() { // create threads create(); LOG_INFO("TestPushWorker start", "push worker thread", "create OB_SUCCESS"); } void stop() { join(); LOG_INFO("TestPushWorker join", "push worker thread", "join OB_SUCCESS"); } virtual int routine() { int64_t start = thread_idx_ * interval_; int64_t end = (thread_count_ - 1 != thread_idx_) ? start + interval_ - 1 : END_VALUE; LOG_INFO("TestPushWorker", K(start), K(end)); int64_t idx = 0; for (idx = start; idx <= end; idx++) { Type *type = datas_ + idx; EXPECT_EQ(OB_SUCCESS, host_->push(type, type->hash_val_)); push_count_++; LOG_DEBUG("TestPushWorker", K(idx), KPC(type)); } if (end + 1 == idx) { state_ = DONE; } return OB_SUCCESS; } }; const int64_t MAX_PUSH_WORKER_NUM = 32; TestPushWorker push_workers[MAX_PUSH_WORKER_NUM]; // start push worker static void start_push_worker(const int64_t push_thread_num, Type *datas, QueueThread *host) { //push thread for (int64_t idx = 0, cnt = push_thread_num; idx < cnt; ++idx) { TestPushWorker &w = push_workers[idx]; w.reset(idx, push_thread_num, datas, host); w.start(); } } // stop push worker static void stop_push_worker(const int64_t push_thread_num) { // push thread join for (int64_t idx = 0, cnt = push_thread_num; idx < cnt; ++idx) { TestPushWorker &w = push_workers[idx]; w.stop(); } } ////////////////////// Basic function tests ////////////////////////////////////////// // ObMapQueueThread init, destory, start, stop, is_stoped, get_thread_num // overload ObMapQueueThread-handle TEST_F(TestObMapQueueThread, DerivedQueueThread1) { // genereate data Type *datas = NULL; generate_data(VALUE_COUNT, datas); OB_ASSERT(NULL != datas); // savd result of DerivedQueueThread1 handle vector > handle_result; for (int64_t idx = 0; idx < THREAD_NUM; idx++) { vector res; res.clear(); handle_result.push_back(res); } // init and start worker thread DerivedQueueThread1 derived1(handle_result); EXPECT_EQ(OB_SUCCESS, derived1.init()); EXPECT_EQ(OB_SUCCESS, derived1.start()); // start push thread int64_t PUSH_THREAD_NUM = ONE_PUSH_THREAD_NUM; start_push_worker(PUSH_THREAD_NUM, datas, &derived1); // Check handle completion and verify that the result totals and fields are correct int64_t end_handle_count = 0; int64_t start_test_tstamp = get_timestamp(); while (((get_timestamp() - start_test_tstamp) < TEST_TIME_LIMIT) && (end_handle_count < VALUE_COUNT)) { end_handle_count = ATOMIC_LOAD(&derived1.end_handle_count_); usleep(static_cast<__useconds_t>(1000)); LOG_DEBUG("handle verify", K(end_handle_count)); } EXPECT_EQ(VALUE_COUNT, end_handle_count); int64_t handle_result_count = 0; for (int64_t idx = 0; idx < THREAD_NUM; idx++) { int64_t cnt = handle_result[idx].size(); LOG_INFO("DerivedQueueThread1 vector count", K(idx), K(cnt)); handle_result_count += cnt; for (int64_t i = 0; i < cnt; i++) { Type t = handle_result[idx][i]; LOG_DEBUG("type", K(t)); EXPECT_TRUE(idx == (t.value_ % THREAD_NUM)); EXPECT_TRUE(idx == t.hash_val_); } } EXPECT_EQ(VALUE_COUNT, handle_result_count); stop_push_worker(PUSH_THREAD_NUM); derived1.destroy(); ob_free(datas); } // ObMapQueueThread run, pop, cond_timewait // overload ObMapQueueThread-run TEST_F(TestObMapQueueThread, DerivedQueueThread2) { // genereate data Type *datas = NULL; generate_data(VALUE_COUNT, datas); OB_ASSERT(NULL != datas); // save result of DerivedQueueThread2 handle vector > handle_result; for (int64_t idx = 0; idx < THREAD_NUM; idx++) { vector res; res.clear(); handle_result.push_back(res); } // init and start worker thread DerivedQueueThread2 derived2(handle_result); EXPECT_EQ(OB_SUCCESS, derived2.init()); EXPECT_EQ(OB_SUCCESS, derived2.start()); // start push thread int64_t PUSH_THREAD_NUM = ONE_PUSH_THREAD_NUM; start_push_worker(PUSH_THREAD_NUM, datas, &derived2); // Check handle completion and verify that the result totals and fields are correct int64_t end_handle_count = 0; int64_t start_test_tstamp = get_timestamp(); while (((get_timestamp() - start_test_tstamp) < TEST_TIME_LIMIT) && (end_handle_count < VALUE_COUNT)) { end_handle_count = ATOMIC_LOAD(&derived2.end_handle_count_); usleep(static_cast<__useconds_t>(1000)); LOG_DEBUG("handle verify", K(end_handle_count)); } EXPECT_EQ(VALUE_COUNT, end_handle_count); int64_t handle_result_count = 0; for (int64_t idx = 0; idx < THREAD_NUM; idx++) { int64_t cnt = handle_result[idx].size(); LOG_INFO("DerivedQueueThread2 vector count", K(idx), K(cnt)); handle_result_count += cnt; for (int64_t i = 0; i < cnt; i++) { Type t = handle_result[idx][i]; LOG_DEBUG("type", K(t)); EXPECT_TRUE(idx == (t.value_ % THREAD_NUM)); EXPECT_TRUE(idx == t.hash_val_); } } EXPECT_EQ(VALUE_COUNT, handle_result_count); stop_push_worker(PUSH_THREAD_NUM); derived2.destroy(); ob_free(datas); } ////////////////////////Boundary condition testing////////////////////////////////////////// // ObMapQueue init fail TEST_F(TestObMapQueueThread, init_failed) { QueueThread queue_thread; EXPECT_EQ(OB_SUCCESS, queue_thread.init(THREAD_NUM, label)); EXPECT_EQ(OB_INIT_TWICE, queue_thread.init(THREAD_NUM, label)); queue_thread.destroy(); // MAX_THREAD_NUM = 16 const int64_t INVALID_THREAD_NUM1 = 0; const int64_t INVALID_THREAD_NUM2 = 17; EXPECT_EQ(OB_INVALID_ARGUMENT, queue_thread.init(INVALID_THREAD_NUM1, label)); EXPECT_EQ(OB_INVALID_ARGUMENT, queue_thread.init(INVALID_THREAD_NUM2, label)); EXPECT_EQ(OB_SUCCESS, queue_thread.init(THREAD_NUM, label)); queue_thread.destroy(); } }//end of unittest }//end of oceanbase int main(int argc, char **argv) { // ObLogger::get_logger().set_mod_log_levels("ALL.*:DEBUG, TLOG.*:DEBUG"); // testing::InitGoogleTest(&argc,argv); // testing::FLAGS_gtest_filter = "DO_NOT_RUN"; int ret = 1; ObLogger &logger = ObLogger::get_logger(); logger.set_file_name("test_ob_map_queue_thread.log", true); logger.set_log_level(OB_LOG_LEVEL_INFO); testing::InitGoogleTest(&argc, argv); ret = RUN_ALL_TESTS(); return ret; }