327 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			327 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
| // Copyright 2014 Alibaba Inc. All Rights Reserved.
 | |
| // Author:
 | |
| //     zhenzhong.jzz@alibaba-inc.com
 | |
| // Owner:
 | |
| //     zhenzhong.jzz@alibaba-inc.com
 | |
| //
 | |
| //This file is for the unit test of log_callback_engine and the related
 | |
| //threadpool and handler.
 | |
| 
 | |
| #include <gtest/gtest.h>
 | |
| 
 | |
| #include "clog/ob_log_callback_engine.h"
 | |
| #include "clog/ob_log_callback_task.h"
 | |
| #include "clog/ob_log_callback_thread_pool.h"
 | |
| #include "clog/ob_log_callback_handler.h"
 | |
| #include "clog/ob_log_define.h"
 | |
| //#include "storage/ob_replay_status.h"
 | |
| #include "storage/ob_partition_component_factory.h"
 | |
| #include "storage/ob_i_partition_group.h"
 | |
| #include "common/storage/ob_freeze_define.h"
 | |
| #include "../storage/mockcontainer/mock_ob_partition.h"
 | |
| #include "../storage/mockcontainer/mock_ob_partition_service.h"
 | |
| #include "common/ob_queue_thread.h"
 | |
| #include "common/ob_partition_key.h"
 | |
| #include "share/ob_errno.h"
 | |
| #include "lib/net/ob_addr.h"
 | |
| #include "lib/allocator/ob_concurrent_fifo_allocator.h"
 | |
| #include "lib/utility/ob_tracepoint.h"
 | |
| #include "gtest/gtest.h"
 | |
| #include "share/ob_i_ps_cb.h"
 | |
| #include "clog_mock_container/mock_ps_cb.h"
 | |
| namespace oceanbase
 | |
| {
 | |
| 
 | |
| using namespace common;
 | |
| using namespace clog;
 | |
| using namespace storage;
 | |
| namespace storage
 | |
| {
 | |
| class ObPartitionComponentFactory;
 | |
| }
 | |
| 
 | |
| namespace unittest
 | |
| {
 | |
| class MockPartition: public MockObIPartition
 | |
| {
 | |
| public:
 | |
|   MockPartition() {partition_key_.init(1, 1, 1);}
 | |
|   ~MockPartition() {}
 | |
|   const common::ObPartitionKey &get_partition_key() const
 | |
|   {
 | |
|     return partition_key_;
 | |
|   }
 | |
|   virtual int get_safe_publish_version(int64_t& publish_version)
 | |
|   {
 | |
|     UNUSED(publish_version);
 | |
|     return 0;
 | |
|   }
 | |
| private:
 | |
|   ObPartitionKey partition_key_;
 | |
|   ObReplayStatus relay_ststus_;
 | |
|   //Uncommenting this line will cause coverity to report an error, if necessary,
 | |
|   //please perform related initialization operations.
 | |
|   //  ObPartitionMCState partition_mc_state_;
 | |
| };
 | |
| 
 | |
| class MockPartitionMgr : public MockObIPartitionService
 | |
| {
 | |
| public:
 | |
|   MockPartitionMgr(): is_inited_(true) {}
 | |
|   int get_partition(const ObPartitionKey &partition_key,
 | |
|                     ObIPartitionGroup *&partition)
 | |
|   {
 | |
|     int ret = OB_SUCCESS;
 | |
|     if (partition_key == mock_partition_.get_partition_key()) {
 | |
|       partition = &mock_partition_;
 | |
|     } else {
 | |
|       ret = OB_ENTRY_NOT_EXIST;
 | |
|     }
 | |
|     return ret;
 | |
|   }
 | |
| private:
 | |
|   bool is_inited_;
 | |
|   MockPartition mock_partition_;
 | |
| };
 | |
| 
 | |
| //class MockObPSCb : public share::ObIPSCb
 | |
| //{
 | |
| //public:
 | |
| //  int on_leader_revoke(const ObPartitionKey &partition_key)
 | |
| //  {
 | |
| //    UNUSED(partition_key);
 | |
| //    return OB_SUCCESS;
 | |
| //  }
 | |
| //  int on_leader_takeover(const ObPartitionKey &partition_key)
 | |
| //  {
 | |
| //    UNUSED(partition_key);
 | |
| //    return OB_SUCCESS;
 | |
| //
 | |
| //  }
 | |
| //  int on_leader_active(const ObPartitionKey &partition_key)
 | |
| //  {
 | |
| //    UNUSED(partition_key);
 | |
| //    return OB_SUCCESS;
 | |
| //
 | |
| //  }
 | |
| //  int on_member_change_success(
 | |
| //      const ObPartitionKey &partition_key,
 | |
| //      const int64_t mc_timestamp,
 | |
| //      const ObMemberList &prev_member_list,
 | |
| //      const ObMemberList &curr_member_list)
 | |
| //  {
 | |
| //    UNUSED(partition_key);
 | |
| //    UNUSED(mc_timestamp);
 | |
| //    UNUSED(prev_member_list);
 | |
| //    UNUSED(curr_member_list);
 | |
| //    return OB_SUCCESS;
 | |
| //  }
 | |
| //  int64_t get_min_using_file_id() const { return 0; }
 | |
| //};
 | |
| class ObTestLogCallback : public ::testing::Test
 | |
| {
 | |
| public :
 | |
|   void SetUp()
 | |
|   {
 | |
|     int ret = OB_SUCCESS;
 | |
| 
 | |
|     const uint64_t TABLE_ID = 198;
 | |
|     const int32_t PARTITION_IDX = 125;
 | |
|     const int32_t PARTITION_CNT = 168;
 | |
|     const int64_t TOTAL_LIMIT = 1 << 30;
 | |
|     const int64_t HOLD_LIMIT = 1 << 29;
 | |
|     const int64_t PAGE_SIZE = 1 << 16;
 | |
|     const char *ip = "127.0.0.1";
 | |
|     const int32_t PORT = 80;
 | |
| 
 | |
|     if (OB_SUCCESS != (ret = partition_key_.init(TABLE_ID, PARTITION_IDX, PARTITION_CNT))) {
 | |
|       CLOG_LOG(ERROR, "partition_key_.init failed");
 | |
|     }
 | |
|     if (OB_SUCCESS != (ret = allocator_.init(TOTAL_LIMIT, HOLD_LIMIT, PAGE_SIZE))) {
 | |
|       CLOG_LOG(ERROR, "allocator_.init failed");
 | |
|     }
 | |
|     self_addr_.set_ip_addr(ip, PORT);
 | |
|   }
 | |
|   void TearDown()
 | |
|   {
 | |
|     allocator_.destroy();
 | |
|   }
 | |
| 
 | |
|   MockPartitionMgr partition_service_;
 | |
|   common::ObAddr self_addr_ ;
 | |
|   common::ObConcurrentFIFOAllocator allocator_;
 | |
|   common::ObPartitionKey partition_key_;
 | |
|   MockObPSCb partition_service_cb_;
 | |
|   clog::ObLogCallbackThreadPool log_callback_thread_pool_;
 | |
|   clog::ObLogCallbackThreadPool worker_thread_pool_;
 | |
|   clog::ObLogCallbackThreadPool sp_thread_pool_;
 | |
| };// class ObTestLogCallback
 | |
| 
 | |
| TEST_F(ObTestLogCallback, init_test)
 | |
| {
 | |
|   const int64_t THREAD_NUM = 15;
 | |
|   const int64_t TASK_LIMIT_NUM = 12;
 | |
|   clog::ObLogCallbackEngine log_callback_engine;
 | |
|   clog::ObLogCallbackHandler log_callback_handler;
 | |
| 
 | |
|   //test the init method of ObLogCallbackHandler
 | |
|   EXPECT_EQ(OB_INVALID_ARGUMENT, log_callback_handler.init(NULL, NULL));
 | |
|   EXPECT_EQ(OB_SUCCESS, log_callback_handler.init(&partition_service_, &partition_service_cb_));
 | |
|   EXPECT_EQ(OB_INIT_TWICE, log_callback_handler.init(&partition_service_, &partition_service_cb_));
 | |
| 
 | |
|   //test the init method of ObLogCallbackThreadPool
 | |
|   EXPECT_EQ(OB_INVALID_ARGUMENT, log_callback_thread_pool_.init(NULL, THREAD_NUM, TASK_LIMIT_NUM,
 | |
|                                                                 self_addr_));
 | |
|   TP_SET_ERROR("ob_log_callback_thread_pool_.cpp", "init", "test_a", OB_ERROR);
 | |
|   //  EXPECT_EQ(OB_ERROR, log_callback_thread_pool_.init(&log_callback_handler, THREAD_NUM,
 | |
|   //                                                     TASK_LIMIT_NUM,
 | |
|   //                                                     self_addr_));
 | |
|   TP_SET("ob_log_callback_thread_pool_.cpp", "init", "test_a", NULL);
 | |
|   EXPECT_EQ(OB_SUCCESS, log_callback_thread_pool_.init(&log_callback_handler, THREAD_NUM,
 | |
|                                                        TASK_LIMIT_NUM, self_addr_));
 | |
|   EXPECT_EQ(OB_INIT_TWICE, log_callback_thread_pool_.init(&log_callback_handler, THREAD_NUM,
 | |
|                                                           TASK_LIMIT_NUM, self_addr_));
 | |
| 
 | |
|   //test the init method of ObLogCallbackEngine
 | |
|   worker_thread_pool_.init(&log_callback_handler, THREAD_NUM, TASK_LIMIT_NUM, self_addr_);
 | |
|   sp_thread_pool_.init(&log_callback_handler, THREAD_NUM, TASK_LIMIT_NUM, self_addr_);
 | |
|   EXPECT_EQ(OB_INVALID_ARGUMENT, log_callback_engine.init(NULL, NULL));
 | |
|   EXPECT_EQ(OB_SUCCESS, log_callback_engine.init(&worker_thread_pool_, &sp_thread_pool_));
 | |
|   EXPECT_EQ(OB_INIT_TWICE, log_callback_engine.init(&worker_thread_pool_, &sp_thread_pool_));
 | |
|   sleep(10);
 | |
| 
 | |
|   log_callback_engine.destroy();
 | |
|   log_callback_thread_pool_.destroy();
 | |
|   worker_thread_pool_.destroy();
 | |
|   sp_thread_pool_.destroy();
 | |
| }
 | |
| 
 | |
| //TEST_F(ObTestLogCallback, submit_flush_cb_task_test)
 | |
| //{
 | |
| //  const uint64_t LOG_ID = 190;
 | |
| //  const int64_t MC_TIMESTAMP = 865;
 | |
| //  const common::ObProposalID PROPOSAL_ID;
 | |
| //  const int64_t THREAD_NUM = 15;
 | |
| //  const int64_t TASK_LIMIT_NUM = 12;
 | |
| //  bool NEED_ACK = false;
 | |
| //
 | |
| //  clog::ObLogCallbackEngine log_callback_engine;
 | |
| //  clog::ObLogCallbackThreadPool log_callback_thread_pool_;
 | |
| //  clog::ObLogCallbackHandler log_callback_handler;
 | |
| //  clog::ObLogCallbackThreadPool worker_thread_pool_;
 | |
| //  clog::ObLogCallbackThreadPool sp_thread_pool_;
 | |
| //
 | |
| //  ObLogType type = OB_LOG_MEMBER_CHANGE;
 | |
| //  clog::ObLogCursor log_cursor;
 | |
| //  log_cursor.file_id_ = 0;
 | |
| //  log_cursor.offset_ = 0;
 | |
| //  log_cursor.size_ = 0;
 | |
| //  clog::ObLogFlushCbArg flush_cb_arg(type, LOG_ID, MC_TIMESTAMP, PROPOSAL_ID, NEED_ACK, self_addr_,
 | |
| //                                     log_cursor, 0);
 | |
| //
 | |
| //  EXPECT_EQ(OB_NOT_INIT, log_callback_engine.submit_flush_cb_task(partition_key_, flush_cb_arg));
 | |
| //  ASSERT_EQ(OB_SUCCESS, log_callback_handler.init(&partition_service_, &partition_service_cb_));
 | |
| //  ASSERT_EQ(OB_SUCCESS, log_callback_thread_pool_.init(&log_callback_handler, THREAD_NUM,
 | |
| //                                                      TASK_LIMIT_NUM, self_addr_));
 | |
| //  worker_thread_pool_.init(&log_callback_handler, THREAD_NUM, TASK_LIMIT_NUM, self_addr_);
 | |
| //  sp_thread_pool_.init(&log_callback_handler, THREAD_NUM, TASK_LIMIT_NUM, self_addr_);
 | |
| //  ASSERT_EQ(OB_SUCCESS, log_callback_engine.init(&worker_thread_pool_, &sp_thread_pool_));
 | |
| //  EXPECT_EQ(OB_SUCCESS, log_callback_engine.submit_flush_cb_task(partition_key_, flush_cb_arg));
 | |
| //
 | |
| //  sleep(10);
 | |
| //  log_callback_engine.destroy();
 | |
| //  log_callback_thread_pool_.destroy();
 | |
| //  worker_thread_pool_.destroy();
 | |
| //  sp_thread_pool_.destroy();
 | |
| //}
 | |
| TEST_F(ObTestLogCallback, submit_member_change_success_cb_task_test)
 | |
| {
 | |
|   const int64_t THREAD_NUM = 15;
 | |
|   const int64_t TASK_LIMIT_NUM = 12;
 | |
|   clog::ObLogCallbackEngine log_callback_engine;
 | |
|   clog::ObLogCallbackHandler log_callback_handler;
 | |
|   const int64_t MC_TIMESTAMP = 64;
 | |
|   const common::ObMemberList PREV_MEMBER_LIST;
 | |
|   const common::ObMemberList CURR_MEMBER_LIST;
 | |
| 
 | |
|   EXPECT_EQ(OB_NOT_INIT, log_callback_engine.submit_member_change_success_cb_task(partition_key_,
 | |
|                                                                                   MC_TIMESTAMP, PREV_MEMBER_LIST, CURR_MEMBER_LIST));
 | |
|   ASSERT_EQ(OB_SUCCESS, log_callback_handler.init(&partition_service_, &partition_service_cb_));
 | |
|   ASSERT_EQ(OB_SUCCESS, log_callback_thread_pool_.init(&log_callback_handler, THREAD_NUM,
 | |
|                                                        TASK_LIMIT_NUM, self_addr_));
 | |
|   worker_thread_pool_.init(&log_callback_handler, THREAD_NUM, TASK_LIMIT_NUM, self_addr_);
 | |
|   sp_thread_pool_.init(&log_callback_handler, THREAD_NUM, TASK_LIMIT_NUM, self_addr_);
 | |
|   ASSERT_EQ(OB_SUCCESS, log_callback_engine.init(&worker_thread_pool_, &sp_thread_pool_));
 | |
|   EXPECT_EQ(OB_SUCCESS, log_callback_engine.submit_member_change_success_cb_task(partition_key_,
 | |
|                                                                                  MC_TIMESTAMP, PREV_MEMBER_LIST, CURR_MEMBER_LIST));
 | |
| 
 | |
|   sleep(10);
 | |
|   log_callback_engine.destroy();
 | |
|   log_callback_thread_pool_.destroy();
 | |
|   worker_thread_pool_.destroy();
 | |
|   sp_thread_pool_.destroy();
 | |
| }
 | |
| 
 | |
| //TEST_F(ObTestLogCallback, submit_leader_takeover_cb_task_test)
 | |
| //{
 | |
| //  const int64_t THREAD_NUM = 15;
 | |
| //  const int64_t TASK_LIMIT_NUM = 12;
 | |
| //  clog::ObLogCallbackEngine log_callback_engine;
 | |
| //  clog::ObLogCallbackThreadPool log_callback_thread_pool_;
 | |
| //  clog::ObLogCallbackHandler log_callback_handler;
 | |
| //  clog::ObLogCallbackThreadPool worker_thread_pool_;
 | |
| //  clog::ObLogCallbackThreadPool sp_thread_pool_;
 | |
| //
 | |
| //  EXPECT_EQ(OB_NOT_INIT, log_callback_engine.submit_leader_takeover_cb_task(partition_key_));
 | |
| //  ASSERT_EQ(OB_SUCCESS, log_callback_handler.init(&partition_service_, &partition_service_cb_));
 | |
| //  ASSERT_EQ(OB_SUCCESS, log_callback_thread_pool_.init(&log_callback_handler, THREAD_NUM,
 | |
| //                                                      TASK_LIMIT_NUM, self_addr_));
 | |
| //  worker_thread_pool_.init(&log_callback_handler, THREAD_NUM, TASK_LIMIT_NUM, self_addr_);
 | |
| //  sp_thread_pool_.init(&log_callback_handler, THREAD_NUM, TASK_LIMIT_NUM, self_addr_);
 | |
| //  ASSERT_EQ(OB_SUCCESS, log_callback_engine.init(&worker_thread_pool_, &sp_thread_pool_));
 | |
| //  EXPECT_EQ(OB_SUCCESS, log_callback_engine.submit_leader_takeover_cb_task(partition_key_));
 | |
| //
 | |
| //  sleep(10);
 | |
| //  log_callback_engine.destroy();
 | |
| //  log_callback_thread_pool_.destroy();
 | |
| //  worker_thread_pool_.destroy();
 | |
| //  sp_thread_pool_.destroy();
 | |
| //}
 | |
| //TEST_F(ObTestLogCallback, submit_leader_revoke_cb_task_test)
 | |
| //{
 | |
| //  const int64_t THREAD_NUM = 15;
 | |
| //  const int64_t TASK_LIMIT_NUM = 12;
 | |
| //  clog::ObLogCallbackEngine log_callback_engine;
 | |
| //  clog::ObLogCallbackThreadPool log_callback_thread_pool_;
 | |
| //  clog::ObLogCallbackHandler log_callback_handler;
 | |
| //  clog::ObLogCallbackThreadPool worker_thread_pool_;
 | |
| //  clog::ObLogCallbackThreadPool sp_thread_pool_;
 | |
| //
 | |
| //  EXPECT_EQ(OB_NOT_INIT, log_callback_engine.submit_leader_revoke_cb_task(partition_key_));
 | |
| //  ASSERT_EQ(OB_SUCCESS, log_callback_handler.init(&partition_service_, &partition_service_cb_));
 | |
| //  ASSERT_EQ(OB_SUCCESS, log_callback_thread_pool_.init(&log_callback_handler, THREAD_NUM,
 | |
| //                                                      TASK_LIMIT_NUM, self_addr_));
 | |
| //  worker_thread_pool_.init(&log_callback_handler, THREAD_NUM, TASK_LIMIT_NUM, self_addr_);
 | |
| //  sp_thread_pool_.init(&log_callback_handler, THREAD_NUM, TASK_LIMIT_NUM, self_addr_);
 | |
| //  ASSERT_EQ(OB_SUCCESS, log_callback_engine.init(&worker_thread_pool_, &sp_thread_pool_));
 | |
| //  EXPECT_EQ(OB_SUCCESS, log_callback_engine.submit_leader_revoke_cb_task(partition_key_));
 | |
| //
 | |
| //  sleep(10);
 | |
| //  log_callback_engine.destroy();
 | |
| //  log_callback_thread_pool_.destroy();
 | |
| //  worker_thread_pool_.destroy();
 | |
| //  sp_thread_pool_.destroy();
 | |
| //}
 | |
| 
 | |
| }//namespace unittest
 | |
| }//namespace oceanbase
 | |
| int main(int argc, char **argv)
 | |
| {
 | |
|   OB_LOGGER.set_file_name("test_log_callback_engine.log", true);
 | |
|   OB_LOGGER.set_log_level("INFO");
 | |
|   CLOG_LOG(INFO, "begin unittest: test_ob_log_callback_engine");
 | |
| 
 | |
|   ::testing::InitGoogleTest(&argc, argv);
 | |
|   return RUN_ALL_TESTS();
 | |
| }
 | 
