278 lines
		
	
	
		
			8.8 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			278 lines
		
	
	
		
			8.8 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
| /**
 | |
|  * Copyright (c) 2021 OceanBase
 | |
|  * OceanBase CE is licensed under Mulan PubL v2.
 | |
|  * You can use this software according to the terms and conditions of the Mulan PubL v2.
 | |
|  * You may obtain a copy of Mulan PubL v2 at:
 | |
|  *          http://license.coscl.org.cn/MulanPubL-2.0
 | |
|  * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 | |
|  * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 | |
|  * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 | |
|  * See the Mulan PubL v2 for more details.
 | |
|  */
 | |
| 
 | |
| #define UNIITTEST_DEBUG
 | |
| 
 | |
| #include "share/ob_occam_thread_pool.h"
 | |
| #include <gtest/gtest.h>
 | |
| #include <thread>
 | |
| #include <iostream>
 | |
| #include <vector>
 | |
| #include <chrono>
 | |
| #include "common/ob_clock_generator.h"
 | |
| 
 | |
| namespace oceanbase {
 | |
| namespace unittest {
 | |
| 
 | |
| using namespace common;
 | |
| using namespace std;
 | |
| 
 | |
| class TestObOccamThreadPool: public ::testing::Test
 | |
| {
 | |
| public:
 | |
|   TestObOccamThreadPool() : thread_pool(nullptr) {};
 | |
|   virtual ~TestObOccamThreadPool() {};
 | |
|   virtual void SetUp() {
 | |
|     thread_pool = new ObOccamThreadPool();
 | |
|     ASSERT_EQ(thread_pool->init_and_start(2, 1), OB_SUCCESS);
 | |
|   };
 | |
|   virtual void TearDown() {
 | |
|     delete thread_pool;
 | |
|     ASSERT_EQ(occam::DefaultAllocator::get_default_allocator().total_alive_num, 0);
 | |
|     ASSERT_EQ(function::DefaultFunctionAllocator::get_default_allocator().total_alive_num, 0);
 | |
|     ASSERT_EQ(future::DefaultFutureAllocator::get_default_allocator().total_alive_num, 0);
 | |
|     ASSERT_EQ(guard::DefaultSharedGuardAllocator::get_default_allocator().total_alive_num, 0);
 | |
|   };
 | |
|   ObOccamThreadPool *thread_pool;
 | |
| private:
 | |
|   // disallow copy
 | |
|   DISALLOW_COPY_AND_ASSIGN(TestObOccamThreadPool);
 | |
| };
 | |
| 
 | |
| TEST_F(TestObOccamThreadPool, ObOccamThread) {
 | |
|   cout << "main thread id:" << this_thread::get_id() << endl;
 | |
|   int ret = OB_SUCCESS;
 | |
|   occam::ObOccamThread th;
 | |
|   if (OB_FAIL(th.init_and_start([](){ cout << "th thread id:" << this_thread::get_id() << endl; }))) {}
 | |
|   ASSERT_EQ(ret, OB_SUCCESS);
 | |
|   th.wait();
 | |
|   th.destroy();
 | |
| }
 | |
| 
 | |
| struct TestInt {
 | |
|   TestInt(int a) : data_(a) {}
 | |
|   int data_;
 | |
| };
 | |
| 
 | |
| struct TestBigInt {
 | |
|   TestBigInt(int a) { data_[0] = a; }
 | |
|   int data_[100];
 | |
| };
 | |
| 
 | |
| TEST_F(TestObOccamThreadPool, thread_pool) {
 | |
|   ObOccamThreadPool thread_pool_not_vaild;
 | |
|   ASSERT_EQ(thread_pool_not_vaild.init_and_start(2, 17), OB_INVALID_ARGUMENT);
 | |
|   int ret = OB_SUCCESS;
 | |
|   {
 | |
|     ObFuture<TestInt> result;
 | |
|     int a = 3;
 | |
|     int b = 4;
 | |
|     cout << "origin b addr:" << &b << endl;
 | |
|     ret = thread_pool->commit_task(result,
 | |
|                                   [](int a, int &b)->TestInt{
 | |
|                                     cout << "b addr:" << &b << endl;
 | |
|                                     return TestInt(++a + ++b);
 | |
|                                   },
 | |
|                                   a,
 | |
|                                   b);
 | |
|     ASSERT_EQ(ret, OB_SUCCESS);
 | |
|     TestInt *resultp;
 | |
|     ASSERT_EQ(result.get(resultp), OB_SUCCESS);
 | |
|     ASSERT_EQ(resultp->data_, 9);
 | |
|     ASSERT_EQ(a, 3);
 | |
|     ASSERT_EQ(b, 5);
 | |
|   }
 | |
|   ASSERT_EQ(ret, OB_SUCCESS);
 | |
| }
 | |
| 
 | |
| TEST_F(TestObOccamThreadPool, thread_pool_big_obj) {
 | |
|   int ret = OB_SUCCESS;
 | |
|   {
 | |
|     ObFuture<TestBigInt> result;
 | |
|     int b = 4;
 | |
|     cout << "origin b addr:" << &b << endl;
 | |
|     ret = thread_pool->commit_task(result,
 | |
|                                   [](const int a, int &b)->TestBigInt{
 | |
|                                     cout << "b addr:" << &b << endl;
 | |
|                                     return TestBigInt(a + ++b);
 | |
|                                   },
 | |
|                                   3,
 | |
|                                   b);
 | |
|     TestBigInt *resultp;
 | |
|     result.get(resultp);
 | |
|     ASSERT_EQ(resultp->data_[0], 8);
 | |
|     ASSERT_EQ(b, 5);
 | |
|   }
 | |
|   ASSERT_EQ(ret, OB_SUCCESS);
 | |
| }
 | |
| 
 | |
| TEST_F(TestObOccamThreadPool, queue_full) {
 | |
|   // 线程池里有两个线程,队列长度是2,只能同事容纳两个等待执行的任务
 | |
|   auto fun = []() { 
 | |
|     this_thread::sleep_for(chrono::milliseconds(100));
 | |
|     OB_LOG(DEBUG, "task done time", K(ObClockGenerator::getRealClock()));
 | |
|   };
 | |
|   int ret = OB_SUCCESS;
 | |
|   OB_LOG(DEBUG, "1", K(ObClockGenerator::getRealClock()));
 | |
|   vector<ObFuture<void>> results;
 | |
|   ObFuture<void> result;
 | |
|   // start commit task
 | |
|   ret = thread_pool->commit_task(result, fun);// 提交第一个任务
 | |
|   ASSERT_EQ(ret, OB_SUCCESS);
 | |
|   results.push_back(result);
 | |
|   ret = thread_pool->commit_task(result, fun); // 提交第二个任务
 | |
|   ASSERT_EQ(ret, OB_SUCCESS);
 | |
|   results.push_back(result);
 | |
|   OB_LOG(DEBUG, "2", K(ObClockGenerator::getRealClock()));
 | |
|   this_thread::sleep_for(chrono::milliseconds(50));// 等待worker线程fetch任务,预期两个线程都把任务去出来了,现在任务队列空了
 | |
|   // now all 2 threads are busy
 | |
|   ret = thread_pool->commit_task(result, fun);// 提交第三个任务
 | |
|   ASSERT_EQ(ret, OB_SUCCESS);
 | |
|   results.push_back(result);
 | |
|   ret = thread_pool->commit_task(result, fun);// 提交第四个任务
 | |
|   ASSERT_EQ(ret, OB_SUCCESS);
 | |
|   results.push_back(result);
 | |
|   OB_LOG(DEBUG, "3", K(ObClockGenerator::getRealClock()));
 | |
|   // now the queue is full
 | |
|   ret = thread_pool->commit_task(result, fun);
 | |
|   ASSERT_EQ(ret, OB_BUF_NOT_ENOUGH);// commit task failed
 | |
|   this_thread::sleep_for(chrono::milliseconds(100));
 | |
|   ret = thread_pool->commit_task(result, fun);
 | |
|   OB_LOG(DEBUG, "4", K(ObClockGenerator::getRealClock()));
 | |
|   ASSERT_EQ(ret, OB_SUCCESS);
 | |
|   results.push_back(result);
 | |
|   for (auto &result : results)
 | |
|     result.wait();
 | |
| }
 | |
| 
 | |
| TEST_F(TestObOccamThreadPool, task_priority) {
 | |
|   ObOccamThreadPool *thread_pool = new ObOccamThreadPool();
 | |
|   cout << "size of:" << sizeof(thread_pool) << endl;
 | |
|   thread_pool->init_and_start(2);
 | |
|   auto fun = []() { this_thread::sleep_for(chrono::milliseconds(100)); };
 | |
|   int ret = OB_SUCCESS;
 | |
|   vector<ObFuture<void>> results;
 | |
|   ObFuture<void> result;
 | |
|   // start commit task
 | |
|   ret = thread_pool->commit_task(result, fun);
 | |
|   ASSERT_EQ(ret, OB_SUCCESS);
 | |
|   results.push_back(result);
 | |
|   ret = thread_pool->commit_task(result, fun);
 | |
|   ASSERT_EQ(ret, OB_SUCCESS);
 | |
|   results.push_back(result);
 | |
|   this_thread::sleep_for(chrono::milliseconds(1));
 | |
|   // now all 2 threads are busy
 | |
|   int a = 0;
 | |
|   ret = thread_pool->commit_task(result, [&a](){ a = 3; });// commit first, but will execute later
 | |
|   ASSERT_EQ(ret, OB_SUCCESS);
 | |
|   results.push_back(result);
 | |
|   ret = thread_pool->commit_task(result, [&a](){ a = 4; });// commit second, but will execute later
 | |
|   ASSERT_EQ(ret, OB_SUCCESS);
 | |
|   results.push_back(result);
 | |
|   ret = thread_pool->commit_task<occam::TASK_PRIORITY::HIGH>(result, [&a](){ a = 2; });// last committed task, but will execute first
 | |
|   ASSERT_EQ(ret, OB_SUCCESS);
 | |
|   results.push_back(result);
 | |
|   // now the queue is full
 | |
|   for (auto &result : results)
 | |
|     result.wait();
 | |
|   ASSERT_NE(a, 2);
 | |
|   delete thread_pool;
 | |
| }
 | |
| 
 | |
| int64_t task(int64_t n)
 | |
| {
 | |
|   while(--n);
 | |
|   return n;
 | |
| }
 | |
| 
 | |
| vector<int64_t> run_all_without_thread_pool(vector<int64_t> &inputs)
 | |
| {
 | |
|   vector<int64_t> outputs;
 | |
|   for (auto &input : inputs) {
 | |
|     outputs.emplace_back(task(input));
 | |
|   }
 | |
|   return outputs;
 | |
| }
 | |
| 
 | |
| vector<int64_t> run_all_with_thread_pool(vector<int64_t> &inputs, ObOccamThreadPool &th_pool)
 | |
| {
 | |
|   vector<int64_t> outputs;
 | |
|   // 拆分提交异步任务
 | |
|   vector<ObFuture<int64_t>> futures;
 | |
|   ObFuture<int64_t> future;
 | |
|   for (auto &input : inputs) {
 | |
|     th_pool.commit_task(future, task, input);
 | |
|     futures.push_back(future);
 | |
|   }
 | |
|   // 同步等待异步任务的结果
 | |
|   int64_t *p_result;
 | |
|   for (auto &future : futures) {
 | |
|     future.get(p_result);
 | |
|     outputs.push_back(*p_result);
 | |
|   }
 | |
|   return outputs;
 | |
| }
 | |
| 
 | |
| bool compare_result_equal(const vector<int64_t> &v1, const vector<int64_t> &v2)
 | |
| {
 | |
|   bool ret = true;
 | |
|   if (v1.size() != v2.size()) {
 | |
|     ret = false;
 | |
|   } else {
 | |
|     for (size_t idx = 0; idx < v1.size() && ret; ++idx) {
 | |
|       ret = (v1[idx] == v2[idx]);
 | |
|     }
 | |
|   }
 | |
|   return ret;
 | |
| }
 | |
| 
 | |
| TEST_F(TestObOccamThreadPool, heavy_task_sperate) {
 | |
|   ObOccamThreadPool *thread_pool = new ObOccamThreadPool();
 | |
|   thread_pool->init_and_start(2, 10);
 | |
|   vector<int64_t> inputs(1000, 1000000);
 | |
|   auto timepoint1 = chrono::steady_clock::now();
 | |
|   vector<int64_t> outputs1 = run_all_without_thread_pool(inputs);
 | |
|   auto timepoint2 = chrono::steady_clock::now();
 | |
|   vector<int64_t> outputs2 = run_all_with_thread_pool(inputs, *thread_pool);
 | |
|   auto timepoint3 = chrono::steady_clock::now();
 | |
| 
 | |
|   ASSERT_EQ(compare_result_equal(outputs1, outputs2), true);
 | |
| 
 | |
|   cout << "cost 1:" << chrono::duration_cast<chrono::milliseconds>(timepoint2 - timepoint1).count() << "ms" << endl;
 | |
|   cout << "cost 2:" << chrono::duration_cast<chrono::milliseconds>(timepoint3 - timepoint2).count() << "ms" << endl;
 | |
|   cout << "inputs[0]:" << inputs[0] << endl;
 | |
|   delete thread_pool;
 | |
| }
 | |
| 
 | |
| int test_f(int a, int b) {
 | |
|   UNUSED(b);
 | |
|   return a;
 | |
| }
 | |
| 
 | |
| TEST_F(TestObOccamThreadPool, test_) {
 | |
|   thread_pool->commit_task_ignore_ret(test_f, 1, 1);
 | |
| }
 | |
| 
 | |
| }
 | |
| }
 | |
| 
 | |
| int main(int argc, char **argv)
 | |
| {
 | |
|   system("rm -rf test_ob_occam_thread_pool.log");
 | |
|   oceanbase::common::ObLogger &logger = oceanbase::common::ObLogger::get_logger();
 | |
|   logger.set_file_name("test_ob_occam_thread_pool.log", false);
 | |
|   logger.set_log_level(OB_LOG_LEVEL_DEBUG);
 | |
|   testing::InitGoogleTest(&argc, argv);
 | |
|   return RUN_ALL_TESTS();
 | |
| }
 | |
| 
 | 
