426 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			426 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
/**
 * Copyright (c) 2021 OceanBase
 * OceanBase CE is licensed under Mulan PubL v2.
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 * You may obtain a copy of Mulan PubL v2 at:
 *          http://license.coscl.org.cn/MulanPubL-2.0
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PubL v2 for more details.
 */
 | 
						|
 | 
						|
#define USING_LOG_PREFIX OBLOG_FETCHER
 | 
						|
 | 
						|
#include <gtest/gtest.h>
 | 
						|
#include "share/ob_define.h"
 | 
						|
#include "liboblog/src/ob_map_queue.h"
 | 
						|
#include "ob_log_utils.h"
 | 
						|
 | 
						|
using namespace oceanbase;
 | 
						|
using namespace common;
 | 
						|
using namespace liboblog;
 | 
						|
 | 
						|
namespace oceanbase
 | 
						|
{
 | 
						|
namespace unittest
 | 
						|
{
 | 
						|
class TestObMapQueue : public ::testing::Test
 | 
						|
{
 | 
						|
public :
 | 
						|
  virtual void SetUp() {}
 | 
						|
  virtual void TearDown() {}
 | 
						|
public :
 | 
						|
  // ObMapQueue label
 | 
						|
  static constexpr const char *LABEL = "TestObMapQueue";
 | 
						|
  // push thread
 | 
						|
  static const int64_t ONE_PUSH_THREAD_NUM = 1;
 | 
						|
  static const int64_t MULTI_PUSH_THREAD_NUM = 3;
 | 
						|
  // pop thread
 | 
						|
  static const int64_t ONE_POP_THREAD_NUM = 1;
 | 
						|
  static const int64_t MULTI_POP_THREAD_NUM = 5;
 | 
						|
 | 
						|
  static const int64_t TEST_TIME_LIMIT = 10 * _MIN_;
 | 
						|
};
 | 
						|
 | 
						|
// Element type stored in the ObMapQueue under test
using Type = int64_t;

// Values pushed into the queue form the inclusive range [START_VALUE, END_VALUE]
static const int64_t START_VALUE = 0;
static const int64_t END_VALUE = 1 * 1000 * 1000 - 1;
// Number of distinct values in that range
static const int64_t VALUE_COUNT = END_VALUE - START_VALUE + 1;
 | 
						|
 | 
						|
class TestPushWorker : public liboblog::Runnable
 | 
						|
{
 | 
						|
public:
 | 
						|
  enum State
 | 
						|
  {
 | 
						|
	  IDLE, // 
 | 
						|
		REQ,  // pushing
 | 
						|
		DONE  // push DONE
 | 
						|
	};
 | 
						|
	// Identifies the current thread status
 | 
						|
	State state_;
 | 
						|
 | 
						|
	// thread index
 | 
						|
	int64_t thread_idx_;
 | 
						|
	// thread count
 | 
						|
	int64_t thread_count_;
 | 
						|
	// ObMapQueue
 | 
						|
	ObMapQueue<Type> *map_queue_;
 | 
						|
	// record map_queue push count
 | 
						|
	int64_t push_count_;
 | 
						|
  // value interval
 | 
						|
	int64_t interval_;
 | 
						|
 | 
						|
	virtual int routine()
 | 
						|
	{
 | 
						|
		int64_t start = thread_idx_ * interval_;
 | 
						|
		int64_t end = (thread_count_ - 1 != thread_idx_) ? start + interval_ - 1 : END_VALUE;
 | 
						|
		LOG_INFO("TestPushWorker", K(start), K(end));
 | 
						|
 | 
						|
		int64_t val = start;
 | 
						|
		while (val <= end) {
 | 
						|
			EXPECT_EQ(OB_SUCCESS, map_queue_->push(val++));
 | 
						|
			push_count_++;
 | 
						|
		}
 | 
						|
 | 
						|
		if (end + 1 == val) {
 | 
						|
			state_ = DONE;
 | 
						|
		}
 | 
						|
 | 
						|
    return OB_SUCCESS;
 | 
						|
	}
 | 
						|
};
 | 
						|
 | 
						|
class TestPopWorker: public liboblog::Runnable
 | 
						|
{
 | 
						|
public:
 | 
						|
	// thread index
 | 
						|
	int64_t thread_idx_;
 | 
						|
	// ObMapQueue
 | 
						|
	ObMapQueue<Type> *map_queue_;
 | 
						|
	// record thread map_queue pop count
 | 
						|
	int64_t pop_count_ CACHE_ALIGNED;
 | 
						|
	// record poped count for all threads
 | 
						|
	int64_t *end_pop_count_ CACHE_ALIGNED;
 | 
						|
  // 保存pop出来的数据
 | 
						|
	Type *array_;
 | 
						|
 | 
						|
	virtual int routine()
 | 
						|
	{
 | 
						|
		int ret = OB_SUCCESS;
 | 
						|
 | 
						|
		while (OB_SUCC(ret)) {
 | 
						|
		  Type val;
 | 
						|
			while (OB_SUCC(map_queue_->pop(val))) {
 | 
						|
				if (val >= START_VALUE && val <= END_VALUE) {
 | 
						|
					if (0 == array_[val]) {
 | 
						|
						array_[val] = val;
 | 
						|
						ATOMIC_INC(&pop_count_);
 | 
						|
					}
 | 
						|
				}
 | 
						|
			}
 | 
						|
 | 
						|
		  if (OB_EAGAIN == ret) {
 | 
						|
		  	ret = OB_SUCCESS;
 | 
						|
		  }
 | 
						|
			if (ATOMIC_LOAD(end_pop_count_) == VALUE_COUNT) {
 | 
						|
				break;
 | 
						|
			}
 | 
						|
		}
 | 
						|
 | 
						|
    return ret;
 | 
						|
	}
 | 
						|
};
 | 
						|
 | 
						|
 | 
						|
////////////////////// Basic function tests //////////////////////////////////////////
 | 
						|
// ObMapQueue init
 | 
						|
// init() followed by destroy() must toggle is_inited() on and off again.
TEST_F(TestObMapQueue, init)
{
  ObMapQueue<Type> queue;

  // A fresh queue initializes successfully and reports itself as inited
  EXPECT_EQ(OB_SUCCESS, queue.init(LABEL));
  EXPECT_TRUE(queue.is_inited());

  // destroy() brings it back to the un-inited state
  queue.destroy();
  EXPECT_FALSE(queue.is_inited());
}
 | 
						|
 | 
						|
// Test scenarios.
 | 
						|
// 1. single-threaded push - single-threaded pop
 | 
						|
// 2. single-threaded push - multi-threaded pop
 | 
						|
// 3. multi-threaded push - single-threaded pop
 | 
						|
// 4. multi-threaded push - multi-threaded pop
 | 
						|
// Pushes every value of [START_VALUE, END_VALUE] through one ObMapQueue and
// pops it back, once for each combination of single/multiple push threads and
// single/multiple pop threads. Each round checks the number of pushes, the
// number of pops and the content recorded by the pop workers.
TEST_F(TestObMapQueue, push_pop_test)
{
  ObMapQueue<Type> map_queue;
  EXPECT_EQ(OB_SUCCESS, map_queue.init(LABEL));
  EXPECT_TRUE(map_queue.is_inited());

  // array[v] is set to v by whichever pop worker pops v
  Type *array = static_cast<Type *>(ob_malloc(sizeof(Type) * VALUE_COUNT));
  // Fatal on purpose: everything below dereferences array
  ASSERT_TRUE(NULL != array);

  // {push thread count, pop thread count} of each scenario, in run order
  const int64_t SCENARIOS[][2] = {
    // single-threaded push - single-threaded pop
    {ONE_PUSH_THREAD_NUM, ONE_POP_THREAD_NUM},
    // single-threaded push - multi-threaded pop
    {ONE_PUSH_THREAD_NUM, MULTI_POP_THREAD_NUM},
    // multi-threaded push - single-threaded pop
    {MULTI_PUSH_THREAD_NUM, ONE_POP_THREAD_NUM},
    // multi-threaded push - multi-threaded pop
    {MULTI_PUSH_THREAD_NUM, MULTI_POP_THREAD_NUM}
  };
  const int64_t test_cnt = static_cast<int64_t>(sizeof(SCENARIOS) / sizeof(SCENARIOS[0]));

  for (int64_t test_type = 0; test_type < test_cnt; ++test_type) {
    memset(array, 0, sizeof(Type) * VALUE_COUNT);
    const int64_t PUSH_THREAD_NUM = SCENARIOS[test_type][0];
    const int64_t POP_THREAD_NUM = SCENARIOS[test_type][1];
    int64_t end_push_count = 0;
    int64_t end_pop_count = 0;
    LOG_INFO("push_pop_test", K(test_type), K(PUSH_THREAD_NUM), K(POP_THREAD_NUM));

    // push thread (runtime-sized array: relies on the compiler's VLA extension)
    TestPushWorker push_workers[PUSH_THREAD_NUM];
    const int64_t INTERVAL = VALUE_COUNT / PUSH_THREAD_NUM;
    for (int64_t idx = 0; idx < PUSH_THREAD_NUM; ++idx) {
      TestPushWorker &w = push_workers[idx];
      // assign value
      w.state_ = TestPushWorker::REQ;
      w.thread_idx_ = idx;
      w.thread_count_ = PUSH_THREAD_NUM;
      w.map_queue_ = &map_queue;
      w.push_count_ = 0;
      w.interval_ = INTERVAL;
      // create threads
      w.create();
      LOG_INFO("push_pop_test", "push thread", "create OB_SUCCESS");
    }

    // pop thread (runtime-sized array: relies on the compiler's VLA extension)
    TestPopWorker pop_workers[POP_THREAD_NUM];
    for (int64_t idx = 0; idx < POP_THREAD_NUM; ++idx) {
      TestPopWorker &w = pop_workers[idx];
      // assign value
      w.thread_idx_ = idx;
      w.map_queue_ = &map_queue;
      w.array_ = array;
      w.pop_count_ = 0;
      w.end_pop_count_ = &end_pop_count;
      // create threads
      w.create();
      LOG_INFO("push_pop_test", "pop thread", "create OB_SUCCESS");
    }

    // Verify the correctness of the push: total number of values pushed into
    // the ObMapQueue. Each worker's count is collected exactly once, when it
    // reports DONE, and the worker is then marked IDLE.
    int64_t start_test_tstamp = get_timestamp();
    while (((get_timestamp() - start_test_tstamp) < TEST_TIME_LIMIT)
           && (end_push_count < VALUE_COUNT)) {
      for (int64_t idx = 0; idx < PUSH_THREAD_NUM; ++idx) {
        TestPushWorker &w = push_workers[idx];
        // state_ is written by the worker thread, so read it atomically
        if (TestPushWorker::DONE == ATOMIC_LOAD(&w.state_)) {
          end_push_count += w.push_count_;
          w.state_ = TestPushWorker::IDLE;
        }
      }
    }
    EXPECT_EQ(VALUE_COUNT, end_push_count);

    // Verify that the pop is correct:
    // 1. total number of values popped from the ObMapQueue
    // 2. correctness of the recorded values
    start_test_tstamp = get_timestamp();
    while (((get_timestamp() - start_test_tstamp) < TEST_TIME_LIMIT)
           && (end_pop_count < VALUE_COUNT)) {
      for (int64_t idx = 0; idx < POP_THREAD_NUM; ++idx) {
        TestPopWorker &w = pop_workers[idx];

        // Atomically take the worker's counter and reset it to 0, so that no
        // increment made in between is lost or counted twice
        int64_t pop_cnt = ATOMIC_LOAD(&w.pop_count_);
        while (!ATOMIC_BCAS(&w.pop_count_, pop_cnt, 0)) {
          pop_cnt = ATOMIC_LOAD(&w.pop_count_);
        }

        end_pop_count += pop_cnt;
        // DEBUG level: this runs on every iteration of a busy-poll loop
        LOG_DEBUG("pop verify", K(idx), K(pop_cnt), K(end_pop_count));
      }
    }
    EXPECT_EQ(VALUE_COUNT, end_pop_count);
    // The pop workers only leave their loop once they observe VALUE_COUNT.
    // Store it explicitly so a timed-out (already failed) round cannot block
    // forever in join() below; on success this is a no-op.
    ATOMIC_STORE(&end_pop_count, VALUE_COUNT);

    int64_t correct_field = 0;
    for (int64_t idx = 0; idx < VALUE_COUNT; ++idx) {
      if (idx == array[idx]) {
        correct_field++;
      }
    }
    EXPECT_EQ(VALUE_COUNT, correct_field);

    // push thread join
    for (int64_t idx = 0; idx < PUSH_THREAD_NUM; ++idx) {
      TestPushWorker &w = push_workers[idx];
      w.join();
      LOG_INFO("push_pop_test", "push thread", "join OB_SUCCESS");
    }

    // pop thread join
    for (int64_t idx = 0; idx < POP_THREAD_NUM; ++idx) {
      TestPopWorker &w = pop_workers[idx];
      w.join();
      LOG_INFO("push_pop_test", "pop thread", "join OB_SUCCESS");
    }

    // Start the next scenario from an empty queue
    EXPECT_EQ(OB_SUCCESS, map_queue.reset());
  }

  // free array
  ob_free(array);
  map_queue.destroy();
  EXPECT_FALSE(map_queue.is_inited());
}
 | 
						|
 | 
						|
// 1. push performance test: push data with 10 threads
 | 
						|
// 2. pop performance test: pop data with 10 threads
 | 
						|
// Throughput measurement (disabled by default): first pushes VALUE_COUNT
// values with 10 threads, then pops them with 10 threads, and logs the
// elapsed time and the rate of each phase.
TEST_F(TestObMapQueue, DISABLED_performance)
{
  // Compile-time constants, so the worker arrays below have a fixed size
  const int64_t PUSH_THREAD_NUM = 10;
  const int64_t POP_THREAD_NUM = 10;
  const int64_t INTERVAL = VALUE_COUNT / PUSH_THREAD_NUM;

  int64_t start_test_tstamp = 0;
  int64_t end_test_tstamp = 0;

  ObMapQueue<Type> map_queue;
  EXPECT_EQ(OB_SUCCESS, map_queue.init(LABEL));

  // Allocate the pop result array before any thread is started, so that the
  // fatal assertion cannot return while workers are still running.
  Type *array = static_cast<Type *>(ob_malloc(sizeof(Type) * VALUE_COUNT));
  ASSERT_TRUE(NULL != array);
  memset(array, 0, sizeof(Type) * VALUE_COUNT);

  // push
  int64_t end_push_count = 0;
  TestPushWorker push_workers[PUSH_THREAD_NUM];

  start_test_tstamp = get_timestamp();
  for (int64_t idx = 0; idx < PUSH_THREAD_NUM; ++idx) {
    TestPushWorker &w = push_workers[idx];
    w.state_ = TestPushWorker::REQ;
    w.thread_idx_ = idx;
    w.thread_count_ = PUSH_THREAD_NUM;
    w.map_queue_ = &map_queue;
    w.push_count_ = 0;
    w.interval_ = INTERVAL;
    w.create();
    LOG_INFO("push_performance", "push thread", "create OB_SUCCESS");
  }

  // Detect the end of push in all threads: each worker's count is collected
  // once, when it reports DONE, and the worker is then marked IDLE.
  while (((get_timestamp() - start_test_tstamp) < TEST_TIME_LIMIT)
         && (end_push_count < VALUE_COUNT)) {
    for (int64_t idx = 0; idx < PUSH_THREAD_NUM; ++idx) {
      TestPushWorker &w = push_workers[idx];
      // state_ is written by the worker thread, so read it atomically
      if (TestPushWorker::DONE == ATOMIC_LOAD(&w.state_)) {
        end_push_count += w.push_count_;
        w.state_ = TestPushWorker::IDLE;
      }
    }
  }
  EXPECT_EQ(VALUE_COUNT, end_push_count);
  end_test_tstamp = get_timestamp();

  // The division by 1000000 assumes get_timestamp() is in microseconds - TODO confirm
  double push_time = static_cast<double>(end_test_tstamp - start_test_tstamp) * 1.0 / 1000000;
  double push_cnt_per_second = static_cast<double>(VALUE_COUNT) * 1.0 / (push_time);
  LOG_INFO("push_performance", K(end_push_count), K(push_time), "push count/s", push_cnt_per_second);

  // pop
  int64_t end_pop_count = 0;
  TestPopWorker pop_workers[POP_THREAD_NUM];

  start_test_tstamp = get_timestamp();
  for (int64_t idx = 0; idx < POP_THREAD_NUM; ++idx) {
    TestPopWorker &w = pop_workers[idx];
    w.thread_idx_ = idx;
    w.map_queue_ = &map_queue;
    w.array_ = array;
    w.pop_count_ = 0;
    w.end_pop_count_ = &end_pop_count;
    w.create();
    LOG_INFO("pop_performance", "pop thread", "create OB_SUCCESS");
  }

  while (((get_timestamp() - start_test_tstamp) < TEST_TIME_LIMIT)
         && (end_pop_count < VALUE_COUNT)) {
    for (int64_t idx = 0; idx < POP_THREAD_NUM; ++idx) {
      TestPopWorker &w = pop_workers[idx];

      // Atomically take the worker's counter and reset it to 0
      int64_t pop_cnt = ATOMIC_LOAD(&w.pop_count_);
      while (!ATOMIC_BCAS(&w.pop_count_, pop_cnt, 0)) {
        pop_cnt = ATOMIC_LOAD(&w.pop_count_);
      }

      end_pop_count += pop_cnt;
      LOG_DEBUG("pop verify", K(idx), K(pop_cnt), K(end_pop_count));
    }
  }
  EXPECT_EQ(VALUE_COUNT, end_pop_count);
  end_test_tstamp = get_timestamp();
  // The pop workers only leave their loop once they observe VALUE_COUNT.
  // Store it explicitly so a timed-out (already failed) run cannot block
  // forever in join() below; on success this is a no-op.
  ATOMIC_STORE(&end_pop_count, VALUE_COUNT);

  double pop_time = static_cast<double>(end_test_tstamp - start_test_tstamp) * 1.0 / 1000000;
  double pop_cnt_per_second = static_cast<double>(VALUE_COUNT) * 1.0 / (pop_time);
  LOG_INFO("pop_performance", K(end_pop_count), K(pop_time), "pop count/s", pop_cnt_per_second);

  // push thread join
  for (int64_t idx = 0; idx < PUSH_THREAD_NUM; ++idx) {
    TestPushWorker &w = push_workers[idx];
    w.join();
    LOG_INFO("performance", "push thread", "join OB_SUCCESS");
  }

  // pop thread join
  for (int64_t idx = 0; idx < POP_THREAD_NUM; ++idx) {
    TestPopWorker &w = pop_workers[idx];
    w.join();
    LOG_INFO("performance", "pop thread", "join OB_SUCCESS");
  }

  ob_free(array);
  map_queue.destroy();
}
 | 
						|
 | 
						|
//////////////////////// Boundary condition testing //////////////////////////////////////////
 | 
						|
// ObMapQueue init fail
 | 
						|
// Initializing an already-initialized queue must be rejected.
TEST_F(TestObMapQueue, init_failed)
{
  ObMapQueue<Type> queue;

  // First init() succeeds ...
  EXPECT_EQ(OB_SUCCESS, queue.init(LABEL));
  // ... the second one on the same object reports OB_INIT_TWICE
  EXPECT_EQ(OB_INIT_TWICE, queue.init(LABEL));
}
 | 
						|
 | 
						|
}//end of unittest
 | 
						|
}//end of oceanbase
 | 
						|
 | 
						|
int main(int argc, char **argv)
 | 
						|
{
 | 
						|
  // ObLogger::get_logger().set_mod_log_levels("ALL.*:DEBUG, TLOG.*:DEBUG");
 | 
						|
  // testing::InitGoogleTest(&argc,argv);
 | 
						|
  // testing::FLAGS_gtest_filter = "DO_NOT_RUN";
 | 
						|
  int ret = 1;
 | 
						|
  ObLogger &logger = ObLogger::get_logger();
 | 
						|
  logger.set_file_name("test_ob_map_queue.log", true);
 | 
						|
  logger.set_log_level(OB_LOG_LEVEL_INFO);
 | 
						|
  testing::InitGoogleTest(&argc, argv);
 | 
						|
  ret = RUN_ALL_TESTS();
 | 
						|
  return ret;
 | 
						|
}
 |