316 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			316 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
| /**
 | |
|  * Copyright (c) 2021 OceanBase
 | |
|  * OceanBase CE is licensed under Mulan PubL v2.
 | |
|  * You can use this software according to the terms and conditions of the Mulan PubL v2.
 | |
|  * You may obtain a copy of Mulan PubL v2 at:
 | |
|  *          http://license.coscl.org.cn/MulanPubL-2.0
 | |
|  * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 | |
|  * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 | |
|  * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 | |
|  * See the Mulan PubL v2 for more details.
 | |
|  */
 | |
| 
 | |
| #include "share/ob_common_rpc_proxy.h"
 | |
| #include "lib/lock/ob_mutex.h"
 | |
| #include "storage/ob_server_frozen_status.h"
 | |
| #include <gtest/gtest.h>
 | |
| #include <gmock/gmock.h>
 | |
| 
 | |
| using namespace oceanbase::common;
 | |
| using namespace oceanbase::storage;
 | |
| using namespace oceanbase::obrpc;
 | |
| 
 | |
| // > MAX_STORE_CNT_IN_STORAGE
 | |
| const int32_t INIT_RPC_MAX_VERSION = 74;
 | |
| 
 | |
| class MockRootRpcProxy : public ObCommonRpcProxy
 | |
| {
 | |
| public:
 | |
|   MockRootRpcProxy() : latest_version_lock_(),
 | |
|                        rpc_read_count_(0)
 | |
|   {}
 | |
|   virtual ~MockRootRpcProxy() {}
 | |
| 
 | |
|   int get_frozen_status(const Int64 &arg,
 | |
|                         ObFrozenStatus &frozen_status,
 | |
|                         const ObRpcOpts &opts);
 | |
|   int64_t get_rpc_read_count() const { return rpc_read_count_; }
 | |
| private:
 | |
|   oceanbase::lib::ObMutex latest_version_lock_;
 | |
|   int64_t rpc_read_count_;
 | |
| };
 | |
| 
 | |
| int MockRootRpcProxy::get_frozen_status(
 | |
|     const Int64 &arg,
 | |
|     ObFrozenStatus &frozen_status,
 | |
|     const ObRpcOpts &opts)
 | |
| {
 | |
|   int ret = OB_SUCCESS;
 | |
|   UNUSED(opts);
 | |
|   if (0 == arg) {
 | |
|     oceanbase::lib::ObMutexGuard guard(latest_version_lock_);
 | |
|     frozen_status.frozen_version_.major_ = INIT_RPC_MAX_VERSION;
 | |
|     frozen_status.frozen_timestamp_ = INIT_RPC_MAX_VERSION;
 | |
|     frozen_status.status_ = COMMIT_SUCCEED;
 | |
|     frozen_status.schema_version_ = INIT_RPC_MAX_VERSION;
 | |
|     ++rpc_read_count_;
 | |
|   } else {
 | |
|     frozen_status.frozen_version_.major_ = static_cast<int32_t>(arg);
 | |
|     frozen_status.frozen_timestamp_ = arg;
 | |
|     frozen_status.status_ = COMMIT_SUCCEED;
 | |
|     frozen_status.schema_version_ = arg;
 | |
|     oceanbase::lib::ObMutexGuard guard(latest_version_lock_);
 | |
|     ++rpc_read_count_;
 | |
|   }
 | |
| 
 | |
|   return ret;
 | |
| }
 | |
| 
 | |
| class TestObServerFrozenStatus : public ::testing::Test
 | |
| {
 | |
| public:
 | |
|   virtual void SetUp() {}
 | |
|   virtual void TearDown() {}
 | |
|   static void SetUpTestCase();
 | |
|   static void TearDownTestCase();
 | |
| 
 | |
|   static MockRootRpcProxy rs_rpc_proxy_;
 | |
|   static ObServerFrozenStatus observer_frozen_status_;
 | |
| };
 | |
| MockRootRpcProxy TestObServerFrozenStatus::rs_rpc_proxy_;
 | |
| ObServerFrozenStatus TestObServerFrozenStatus::observer_frozen_status_;
 | |
| 
 | |
| void TestObServerFrozenStatus::SetUpTestCase()
 | |
| {
 | |
|   observer_frozen_status_.set_rs_rpc_proxy(&rs_rpc_proxy_);
 | |
| }
 | |
| 
 | |
| void TestObServerFrozenStatus::TearDownTestCase()
 | |
| {
 | |
| }
 | |
| 
 | |
| void check_frozen_status(const int32_t major_version,
 | |
|                          const int64_t freeze_status,
 | |
|                          const ObFrozenStatus &frozen_status)
 | |
| {
 | |
|   ASSERT_EQ(frozen_status.frozen_version_, ObVersion(major_version, 0));
 | |
|   ASSERT_EQ(frozen_status.status_, freeze_status);
 | |
|   ASSERT_EQ(frozen_status.frozen_timestamp_, major_version);
 | |
|   ASSERT_EQ(frozen_status.schema_version_, major_version);
 | |
| }
 | |
| 
 | |
| TEST_F(TestObServerFrozenStatus, test_get_from_rpc)
 | |
| {
 | |
|   int ret = OB_SUCCESS;
 | |
|   ObFrozenStatus frozen_status;
 | |
| 
 | |
| 
 | |
| 
 | |
|   ret = observer_frozen_status_.get_frozen_status(ObVersion(5, 0),
 | |
|                                                   frozen_status);
 | |
|   ASSERT_EQ(OB_SUCCESS, ret);
 | |
|   check_frozen_status(5, COMMIT_SUCCEED, frozen_status);
 | |
|   ASSERT_EQ(observer_frozen_status_.count(), 1);
 | |
|   ASSERT_EQ(rs_rpc_proxy_.get_rpc_read_count(), 1);
 | |
|   // get from local
 | |
|   ret = observer_frozen_status_.get_frozen_status(ObVersion(5, 0),
 | |
|                                                   frozen_status);
 | |
|   ASSERT_EQ(OB_SUCCESS, ret);
 | |
|   check_frozen_status(5, COMMIT_SUCCEED, frozen_status);
 | |
|   ASSERT_EQ(observer_frozen_status_.count(), 1);
 | |
|   ASSERT_EQ(rs_rpc_proxy_.get_rpc_read_count(), 1);
 | |
| 
 | |
| 
 | |
|   ret = observer_frozen_status_.get_frozen_status(ObVersion(6, 0),
 | |
|                                                   frozen_status);
 | |
|   ASSERT_EQ(OB_SUCCESS, ret);
 | |
|   check_frozen_status(6, COMMIT_SUCCEED, frozen_status);
 | |
|   ASSERT_EQ(observer_frozen_status_.count(), 2);
 | |
|   ASSERT_EQ(rs_rpc_proxy_.get_rpc_read_count(), 2);
 | |
| 
 | |
| 
 | |
|   ret = observer_frozen_status_.get_frozen_status(frozen_status);
 | |
|   ASSERT_EQ(OB_SUCCESS, ret);
 | |
|   check_frozen_status(static_cast<int32_t>(INIT_RPC_MAX_VERSION),
 | |
|                       COMMIT_SUCCEED, frozen_status);
 | |
|   ASSERT_EQ(observer_frozen_status_.count(), 1);
 | |
|   ASSERT_EQ(rs_rpc_proxy_.get_rpc_read_count(), 3);
 | |
| 
 | |
| 
 | |
|   ret = observer_frozen_status_.get_frozen_status(ObVersion(150, 0),
 | |
|                                                   frozen_status);
 | |
|   ASSERT_EQ(OB_ERR_UNEXPECTED, ret);
 | |
|   ASSERT_EQ(observer_frozen_status_.count(), 1);
 | |
|   ASSERT_EQ(rs_rpc_proxy_.get_rpc_read_count(), 4);
 | |
| 
 | |
| 
 | |
|   ret = observer_frozen_status_.get_frozen_status(ObVersion(1, 0),
 | |
|                                                   frozen_status);
 | |
|   ASSERT_EQ(OB_SUCCESS, ret);
 | |
|   check_frozen_status(1, COMMIT_SUCCEED, frozen_status);
 | |
|   ASSERT_EQ(observer_frozen_status_.count(), 1);
 | |
|   ASSERT_EQ(rs_rpc_proxy_.get_rpc_read_count(), 5);
 | |
| 
 | |
|   ret = observer_frozen_status_.get_frozen_status(ObVersion(1, 0),
 | |
|                                                   frozen_status);
 | |
|   ASSERT_EQ(OB_SUCCESS, ret);
 | |
|   check_frozen_status(1, COMMIT_SUCCEED, frozen_status);
 | |
|   ASSERT_EQ(observer_frozen_status_.count(), 1);
 | |
|   ASSERT_EQ(rs_rpc_proxy_.get_rpc_read_count(), 6);
 | |
| 
 | |
|   for (int32_t i = 2; OB_SUCC(ret) && i < 5; ++i) {
 | |
|     ret = observer_frozen_status_.get_frozen_status(ObVersion(i, 0),
 | |
|                                                     frozen_status);
 | |
|     ASSERT_EQ(OB_SUCCESS, ret);
 | |
|     check_frozen_status(i, COMMIT_SUCCEED, frozen_status);
 | |
|   }
 | |
|   ASSERT_EQ(observer_frozen_status_.count(), 1);
 | |
|   ASSERT_EQ(rs_rpc_proxy_.get_rpc_read_count(), 9);
 | |
| 
 | |
| 
 | |
|   for(int32_t i = 7; OB_SUCC(ret) && i < INIT_RPC_MAX_VERSION; ++i) {
 | |
|     ret = observer_frozen_status_.get_frozen_status(ObVersion(i, 0),
 | |
|                                                     frozen_status);
 | |
|     ASSERT_EQ(OB_SUCCESS, ret);
 | |
|     check_frozen_status(i, COMMIT_SUCCEED, frozen_status);
 | |
|   }
 | |
|   ASSERT_EQ(observer_frozen_status_.count(), MAX_STORE_CNT_IN_STORAGE);
 | |
|   ASSERT_EQ(rs_rpc_proxy_.get_rpc_read_count(), 76);
 | |
| 
 | |
| 
 | |
|   int32_t i = static_cast<int32_t>(INIT_RPC_MAX_VERSION) -
 | |
|               MAX_STORE_CNT_IN_STORAGE + 1;
 | |
|   for(; OB_SUCC(ret) && i <= INIT_RPC_MAX_VERSION; ++i) {
 | |
|     ret = observer_frozen_status_.get_frozen_status(ObVersion(i, 0),
 | |
|                                                     frozen_status);
 | |
|     ASSERT_EQ(OB_SUCCESS, ret);
 | |
|     check_frozen_status(i, COMMIT_SUCCEED, frozen_status);
 | |
|   }
 | |
|   ASSERT_EQ(observer_frozen_status_.count(), MAX_STORE_CNT_IN_STORAGE);
 | |
|   ASSERT_EQ(rs_rpc_proxy_.get_rpc_read_count(), 76);
 | |
| }
 | |
| 
 | |
| TEST_F(TestObServerFrozenStatus, set_frozen_status)
 | |
| {
 | |
|   int ret = OB_SUCCESS;
 | |
|   int32_t next = INIT_RPC_MAX_VERSION + 1;
 | |
|   ObVersion version(next, 0);
 | |
| 
 | |
|   // must be 75-prepare
 | |
|   ObFrozenStatus frozen_status(version, next, COMMIT_SUCCEED, next);
 | |
|   ret = observer_frozen_status_.set_frozen_status(frozen_status);
 | |
|   ASSERT_EQ(OB_INVALID_ARGUMENT, ret);
 | |
|   frozen_status.status_ = INIT_STATUS;
 | |
|   ret = observer_frozen_status_.set_frozen_status(frozen_status);
 | |
|   ASSERT_EQ(OB_INVALID_ARGUMENT, ret);
 | |
|   frozen_status.status_ = PREPARED_SUCCEED;
 | |
|   frozen_status.frozen_version_ = ObVersion(INIT_RPC_MAX_VERSION + 2, 0);
 | |
|   ret = observer_frozen_status_.set_frozen_status(frozen_status);
 | |
|   ASSERT_EQ(OB_INVALID_ARGUMENT, ret);
 | |
|   frozen_status.frozen_version_ = next;
 | |
|   ret = observer_frozen_status_.set_frozen_status(frozen_status);
 | |
|   ASSERT_EQ(OB_SUCCESS, ret);
 | |
|   ObFrozenStatus read_frozen_status;
 | |
|   ret = observer_frozen_status_.get_frozen_status(read_frozen_status);
 | |
|   ASSERT_EQ(OB_SUCCESS, ret);
 | |
|   check_frozen_status(INIT_RPC_MAX_VERSION + 1,
 | |
|                       PREPARED_SUCCEED,
 | |
|                       read_frozen_status);
 | |
|   ASSERT_EQ(rs_rpc_proxy_.get_rpc_read_count(), 76);
 | |
| 
 | |
|   // must be 75-commit or 75-abort
 | |
|   ret = observer_frozen_status_.set_frozen_status(frozen_status);
 | |
|   ASSERT_EQ(OB_INVALID_ARGUMENT, ret);
 | |
|   frozen_status.status_ = COMMIT_SUCCEED;
 | |
|   ret = observer_frozen_status_.set_frozen_status(frozen_status);
 | |
|   ASSERT_EQ(OB_SUCCESS, ret);
 | |
|   ret = observer_frozen_status_.get_frozen_status(read_frozen_status);
 | |
|   ASSERT_EQ(OB_SUCCESS, ret);
 | |
|   check_frozen_status(INIT_RPC_MAX_VERSION + 1,
 | |
|                       COMMIT_SUCCEED,
 | |
|                       read_frozen_status);
 | |
|   ASSERT_EQ(rs_rpc_proxy_.get_rpc_read_count(), 76);
 | |
| 
 | |
|   // must be 76-prepare
 | |
|   frozen_status.status_ = PREPARED_SUCCEED;
 | |
|   ret = observer_frozen_status_.set_frozen_status(frozen_status);
 | |
|   ASSERT_EQ(OB_INVALID_ARGUMENT, ret);
 | |
|   frozen_status.status_ = COMMIT_SUCCEED;
 | |
|   ret = observer_frozen_status_.set_frozen_status(frozen_status);
 | |
|   ASSERT_EQ(OB_INVALID_ARGUMENT, ret);
 | |
|   frozen_status.status_ = INIT_STATUS;
 | |
|   ret = observer_frozen_status_.set_frozen_status(frozen_status);
 | |
|   ASSERT_EQ(OB_INVALID_ARGUMENT, ret);
 | |
|   frozen_status.status_ = PREPARED_SUCCEED;
 | |
|   frozen_status.frozen_version_ = ObVersion(INIT_RPC_MAX_VERSION + 2, 0);
 | |
|   frozen_status.frozen_timestamp_ = INIT_RPC_MAX_VERSION + 2;
 | |
|   frozen_status.schema_version_ = INIT_RPC_MAX_VERSION + 2;
 | |
|   ret = observer_frozen_status_.set_frozen_status(frozen_status);
 | |
|   ASSERT_EQ(OB_SUCCESS, ret);
 | |
|   ret = observer_frozen_status_.get_frozen_status(read_frozen_status);
 | |
|   ASSERT_EQ(OB_SUCCESS, ret);
 | |
|   check_frozen_status(INIT_RPC_MAX_VERSION + 2,
 | |
|                       PREPARED_SUCCEED,
 | |
|                       read_frozen_status);
 | |
| 
 | |
|   // can abort unless committed
 | |
|   frozen_status.status_ = INIT_STATUS;
 | |
|   ret = observer_frozen_status_.set_frozen_status(frozen_status);
 | |
|   ASSERT_EQ(OB_SUCCESS, ret);
 | |
|   ret = observer_frozen_status_.get_frozen_status(read_frozen_status);
 | |
|   ASSERT_EQ(OB_SUCCESS, ret);
 | |
|   check_frozen_status(INIT_RPC_MAX_VERSION + 2,
 | |
|                       INIT_STATUS,
 | |
|                       read_frozen_status);
 | |
|   // abort again
 | |
|   ret = observer_frozen_status_.set_frozen_status(frozen_status);
 | |
|   ASSERT_EQ(OB_SUCCESS, ret);
 | |
|   ret = observer_frozen_status_.get_frozen_status(read_frozen_status);
 | |
|   ASSERT_EQ(OB_SUCCESS, ret);
 | |
|   check_frozen_status(INIT_RPC_MAX_VERSION + 2,
 | |
|                       INIT_STATUS,
 | |
|                       read_frozen_status);
 | |
|   frozen_status.frozen_version_ = ObVersion(INIT_RPC_MAX_VERSION + 1, 0);
 | |
|   frozen_status.frozen_timestamp_ = INIT_RPC_MAX_VERSION + 1;
 | |
|   frozen_status.schema_version_ = INIT_RPC_MAX_VERSION + 1;
 | |
|   ret = observer_frozen_status_.set_frozen_status(frozen_status);
 | |
|   ASSERT_EQ(OB_INVALID_ARGUMENT, ret);
 | |
| 
 | |
| }
 | |
| 
 | |
| TEST_F(TestObServerFrozenStatus, set_frozen_status_4096)
 | |
| {
 | |
|   int ret = OB_SUCCESS;
 | |
|   ObFrozenStatus old;
 | |
|   ObFrozenStatus tgt;
 | |
|   ObFrozenStatus frozen_status;
 | |
|   ret = observer_frozen_status_.get_frozen_status(old);
 | |
|   ASSERT_EQ(OB_SUCCESS, ret);
 | |
|   for (int32_t i = 0; OB_SUCC(ret) && i <= 4096; ++i) {
 | |
|     tgt.frozen_version_.major_ = old.frozen_version_.major_ + i;
 | |
|     tgt.frozen_version_.minor_ = 0;
 | |
|     tgt.status_ = PREPARED_SUCCEED;
 | |
|     tgt.frozen_timestamp_ = ObTimeUtility::current_time();
 | |
|     tgt.schema_version_ = old.schema_version_;
 | |
|     ret = observer_frozen_status_.set_frozen_status(tgt);
 | |
|     ASSERT_EQ(OB_SUCCESS, ret);
 | |
|     ret = observer_frozen_status_.get_frozen_status(tgt.frozen_version_, frozen_status);
 | |
|     ASSERT_EQ(OB_SUCCESS, ret);
 | |
|     tgt.status_ = COMMIT_SUCCEED;
 | |
|     ret = observer_frozen_status_.set_frozen_status(tgt);
 | |
|     ASSERT_EQ(OB_SUCCESS, ret);
 | |
|     ret = observer_frozen_status_.get_frozen_status(tgt.frozen_version_, frozen_status);
 | |
|     ASSERT_EQ(OB_SUCCESS, ret);
 | |
|   }
 | |
| }
 | |
| 
 | |
| int main(int argc, char **argv)
 | |
| {
 | |
|   OB_LOGGER.set_file_name("test_server_frozen_status.log");
 | |
|   OB_LOGGER.set_log_level(OB_LOG_LEVEL_INFO);
 | |
|   CLOG_LOG(INFO, "begin unittest: test_server_frozen_status");
 | |
|   ::testing::InitGoogleTest(&argc, argv);
 | |
|   return RUN_ALL_TESTS();
 | |
| }
 | |
| 
 | 
