236 lines
		
	
	
		
			8.8 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			236 lines
		
	
	
		
			8.8 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
/**
 | 
						|
 * Copyright (c) 2021 OceanBase
 | 
						|
 * OceanBase CE is licensed under Mulan PubL v2.
 | 
						|
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 | 
						|
 * You may obtain a copy of Mulan PubL v2 at:
 | 
						|
 *          http://license.coscl.org.cn/MulanPubL-2.0
 | 
						|
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 | 
						|
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 | 
						|
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 | 
						|
 * See the Mulan PubL v2 for more details.
 | 
						|
 */
 | 
						|
 | 
						|
#include <gtest/gtest.h>
 | 
						|
#define USING_LOG_PREFIX SERVER
 | 
						|
#define protected public
 | 
						|
#define private public
 | 
						|
 | 
						|
#include "env/ob_fast_bootstrap.h"
 | 
						|
#include "env/ob_multi_replica_util.h"
 | 
						|
#include "lib/mysqlclient/ob_mysql_result.h"
 | 
						|
#include "storage/tx/ob_trans_part_ctx.h"
 | 
						|
 | 
						|
using namespace oceanbase::transaction;
 | 
						|
using namespace oceanbase::storage;
 | 
						|
 | 
						|
#define CUR_TEST_CASE_NAME ObMdsReplayFromCtxTable
 | 
						|
 | 
						|
DEFINE_MULTI_ZONE_TEST_CASE_CLASS
 | 
						|
 | 
						|
APPEND_RESTART_TEST_CASE_CLASS(2, 1);
 | 
						|
 | 
						|
MULTI_REPLICA_TEST_MAIN_FUNCTION(test_mds_replay_from_ctx_table_);
 | 
						|
 | 
						|
namespace oceanbase
 | 
						|
{
 | 
						|
namespace unittest
 | 
						|
{
 | 
						|
 | 
						|
// using namespace storage;
 | 
						|
using namespace oceanbase::storage::checkpoint;
 | 
						|
 | 
						|
struct RegisterSuccTxArg
 | 
						|
{
 | 
						|
  transaction::ObTransID tx_id_;
 | 
						|
  share::SCN register_scn1_;
 | 
						|
  int64_t register_no1_;
 | 
						|
  share::SCN register_scn2_;
 | 
						|
  int64_t register_no2_;
 | 
						|
 | 
						|
  TO_STRING_KV(K(tx_id_), K(register_scn1_), K(register_no1_), K(register_scn2_), K(register_no2_));
 | 
						|
 | 
						|
  OB_UNIS_VERSION(1);
 | 
						|
};
 | 
						|
 | 
						|
OB_SERIALIZE_MEMBER(RegisterSuccTxArg,
 | 
						|
                    tx_id_,
 | 
						|
                    register_scn1_,
 | 
						|
                    register_no1_,
 | 
						|
                    register_scn2_,
 | 
						|
                    register_no2_);
 | 
						|
 | 
						|
RegisterSuccTxArg register_succ_arg;
 | 
						|
 | 
						|
common::ObMySQLTransaction mysql_trans;
 | 
						|
oceanbase::observer::ObInnerSQLConnection *register_mds_conn = nullptr;
 | 
						|
 | 
						|
void minor_freeze_tx_ctx_memtable(ObLS *ls)
 | 
						|
{
 | 
						|
  TRANS_LOG(INFO, "minor_freeze_tx_ctx_memtable begin");
 | 
						|
  int ret = OB_SUCCESS;
 | 
						|
 | 
						|
  ObCheckpointExecutor *checkpoint_executor = ls->get_checkpoint_executor();
 | 
						|
  ASSERT_NE(nullptr, checkpoint_executor);
 | 
						|
  ObTxCtxMemtable *tx_ctx_memtable = dynamic_cast<ObTxCtxMemtable *>(
 | 
						|
      dynamic_cast<ObLSTxService *>(
 | 
						|
          checkpoint_executor->handlers_[logservice::TRANS_SERVICE_LOG_BASE_TYPE])
 | 
						|
          ->common_checkpoints_[ObCommonCheckpointType::TX_CTX_MEMTABLE_TYPE]);
 | 
						|
  ASSERT_EQ(true, tx_ctx_memtable->is_active_memtable());
 | 
						|
  ASSERT_EQ(OB_SUCCESS, tx_ctx_memtable->flush(share::SCN::max_scn(), -1));
 | 
						|
 | 
						|
  // // TODO(handora.qc): use more graceful wait
 | 
						|
  // usleep(10 * 1000 * 1000);
 | 
						|
  // usleep(100 * 1000);
 | 
						|
 | 
						|
  RETRY_UNTIL_TIMEOUT(tx_ctx_memtable->is_active_memtable(), 10 * 1000 * 1000, 100 * 1000);
 | 
						|
  ASSERT_EQ(OB_SUCCESS, ret);
 | 
						|
 | 
						|
  TRANS_LOG(INFO, "minor_freeze_tx_ctx_memtable end");
 | 
						|
}
 | 
						|
 | 
						|
TEST_F(GET_ZONE_TEST_CLASS_NAME(1), register_mds_without_commit)
 | 
						|
{
 | 
						|
  int ret = OB_SUCCESS;
 | 
						|
  common::ObMySQLProxy &sys_tenant_proxy = get_curr_simple_server().get_sql_proxy();
 | 
						|
 | 
						|
  ASSERT_EQ(OB_SUCCESS, mysql_trans.start(GCTX.sql_proxy_, 1));
 | 
						|
 | 
						|
  // ACQUIRE_CONN_FROM_SQL_PROXY(sys_conn, sys_tenant_proxy);
 | 
						|
 | 
						|
  // ASSERT_EQ(OB_SUCCESS, mysql_trans.start_transaction(1, false));
 | 
						|
  register_mds_conn = static_cast<observer::ObInnerSQLConnection *>(mysql_trans.get_connection());
 | 
						|
  transaction::ObTxDesc *desc = register_mds_conn->get_session().get_tx_desc();
 | 
						|
  ASSERT_NE(nullptr, desc);
 | 
						|
  ASSERT_EQ(true, desc->is_valid());
 | 
						|
 | 
						|
  std::string test_register_str1 = "TEST REGISTER MDS 1";
 | 
						|
  transaction::ObRegisterMdsFlag register_mds_flag1;
 | 
						|
  register_mds_flag1.need_flush_redo_instantly_ = true;
 | 
						|
  ASSERT_EQ(OB_SUCCESS, register_mds_flag1.mds_base_scn_.convert_for_tx(1000));
 | 
						|
  ASSERT_EQ(OB_SUCCESS,
 | 
						|
            register_mds_conn->register_multi_data_source(
 | 
						|
                1, ObLSID(1), transaction::ObTxDataSourceType::DDL_TRANS,
 | 
						|
                test_register_str1.c_str(), test_register_str1.size(), register_mds_flag1));
 | 
						|
 | 
						|
  ObPartTransCtx *tx_ctx = nullptr;
 | 
						|
  GET_LS(1, 1, ls_handle);
 | 
						|
  ASSERT_EQ(ls_handle.is_valid(), true);
 | 
						|
  ASSERT_EQ(OB_SUCCESS, ls_handle.get_ls()->get_tx_ctx(desc->get_tx_id(), false, tx_ctx));
 | 
						|
 | 
						|
  RETRY_UNTIL_TIMEOUT(tx_ctx->busy_cbs_.is_empty(), 5 * 1000 * 1000, 5000);
 | 
						|
  ASSERT_EQ(ret, OB_SUCCESS);
 | 
						|
  share::SCN register_1_scn = tx_ctx->exec_info_.max_applying_log_ts_;
 | 
						|
 | 
						|
  std::string test_register_str2 = "TEST REGISTER MDS 2";
 | 
						|
  transaction::ObRegisterMdsFlag register_mds_flag2;
 | 
						|
  register_mds_flag2.need_flush_redo_instantly_ = true;
 | 
						|
  ASSERT_EQ(OB_SUCCESS, register_mds_flag2.mds_base_scn_.convert_for_tx(2000));
 | 
						|
  ASSERT_EQ(OB_SUCCESS,
 | 
						|
            register_mds_conn->register_multi_data_source(
 | 
						|
                1, ObLSID(1), transaction::ObTxDataSourceType::DDL_TRANS,
 | 
						|
                test_register_str2.c_str(), test_register_str2.size(), register_mds_flag2));
 | 
						|
 | 
						|
  RETRY_UNTIL_TIMEOUT(tx_ctx->busy_cbs_.is_empty(), 5 * 1000 * 1000, 5000);
 | 
						|
  ASSERT_EQ(ret, OB_SUCCESS);
 | 
						|
  share::SCN register_2_scn = tx_ctx->exec_info_.max_applying_log_ts_;
 | 
						|
 | 
						|
  ASSERT_EQ(true, register_2_scn > register_1_scn);
 | 
						|
  ASSERT_EQ(tx_ctx->exec_info_.multi_data_source_.count(), 2);
 | 
						|
 | 
						|
  register_succ_arg.tx_id_ = desc->get_tx_id();
 | 
						|
  register_succ_arg.register_scn1_ = register_1_scn;
 | 
						|
  register_succ_arg.register_no1_ = tx_ctx->exec_info_.multi_data_source_[0].get_register_no();
 | 
						|
  register_succ_arg.register_scn2_ = register_2_scn;
 | 
						|
  register_succ_arg.register_no2_ = tx_ctx->exec_info_.multi_data_source_[1].get_register_no();
 | 
						|
 | 
						|
  TRANS_LOG(INFO, "[ObMultiReplicaTestBase] register mds into tx and submit mds redo success",
 | 
						|
            K(ret), K(register_succ_arg), K(tx_ctx->exec_info_.multi_data_source_));
 | 
						|
 | 
						|
  ASSERT_EQ(OB_SUCCESS, ls_handle.get_ls()->revert_tx_ctx(tx_ctx));
 | 
						|
 | 
						|
  std::string tmp_str;
 | 
						|
  ASSERT_EQ(OB_SUCCESS,
 | 
						|
            EventArgSerTool<RegisterSuccTxArg>::serialize_arg(register_succ_arg, tmp_str));
 | 
						|
  ASSERT_EQ(OB_SUCCESS, finish_event("REGISTER_MDS_SUCC", tmp_str));
 | 
						|
}
 | 
						|
 | 
						|
TEST_F(GET_ZONE_TEST_CLASS_NAME(2), replay_mds_log_normal)
 | 
						|
{
 | 
						|
  int ret = OB_SUCCESS;
 | 
						|
  std::string tmp_event_val;
 | 
						|
  ASSERT_EQ(OB_SUCCESS, wait_event_finish("REGISTER_MDS_SUCC", tmp_event_val, 30 * 60 * 1000));
 | 
						|
  ASSERT_EQ(OB_SUCCESS,
 | 
						|
            EventArgSerTool<RegisterSuccTxArg>::deserialize_arg(register_succ_arg, tmp_event_val));
 | 
						|
 | 
						|
  ObPartTransCtx *tx_ctx = nullptr;
 | 
						|
  GET_LS(1, 1, ls_handle);
 | 
						|
  ASSERT_EQ(ls_handle.is_valid(), true);
 | 
						|
  ASSERT_EQ(OB_SUCCESS, ls_handle.get_ls()->get_tx_ctx(register_succ_arg.tx_id_, true, tx_ctx));
 | 
						|
 | 
						|
  RETRY_UNTIL_TIMEOUT(tx_ctx->exec_info_.max_applied_log_ts_ == register_succ_arg.register_scn2_,
 | 
						|
                      5 * 1000 * 1000, 5000);
 | 
						|
  ASSERT_EQ(ret, OB_SUCCESS);
 | 
						|
 | 
						|
  ASSERT_EQ(tx_ctx->exec_info_.multi_data_source_.count(), 2);
 | 
						|
  // ASSERT_EQ(tx_ctx->exec_info_.multi_data_source_[0].get_register_no(),
 | 
						|
  //           register_succ_arg.register_no1_);
 | 
						|
  // ASSERT_EQ(tx_ctx->exec_info_.multi_data_source_[1].get_register_no(),
 | 
						|
  //           register_succ_arg.register_no2_);
 | 
						|
 | 
						|
  ASSERT_EQ(OB_SUCCESS, ls_handle.get_ls()->revert_tx_ctx(tx_ctx));
 | 
						|
}
 | 
						|
 | 
						|
TEST_F(GET_ZONE_TEST_CLASS_NAME(2), dump_tx_ctx_table)
 | 
						|
{
 | 
						|
  int ret = OB_SUCCESS;
 | 
						|
  GET_LS(1, 1, ls_handle);
 | 
						|
  ASSERT_EQ(ls_handle.is_valid(), true);
 | 
						|
 | 
						|
  {
 | 
						|
    share::ObTenantSwitchGuard tenant_guard;
 | 
						|
    ASSERT_EQ(OB_SUCCESS, tenant_guard.switch_to(1));
 | 
						|
    minor_freeze_tx_ctx_memtable(ls_handle.get_ls());
 | 
						|
  }
 | 
						|
 | 
						|
  ASSERT_EQ(restart_zone(2, 1), OB_SUCCESS);
 | 
						|
}
 | 
						|
 | 
						|
TEST_F(GET_RESTART_ZONE_TEST_CLASS_NAME(2, 1), restart_zone2_from_tx_ctx_table)
 | 
						|
{
 | 
						|
  int ret = OB_SUCCESS;
 | 
						|
 | 
						|
  std::string tmp_event_val;
 | 
						|
  ASSERT_EQ(OB_SUCCESS, wait_event_finish("REGISTER_MDS_SUCC", tmp_event_val, 30 * 60 * 1000));
 | 
						|
  ASSERT_EQ(OB_SUCCESS,
 | 
						|
            EventArgSerTool<RegisterSuccTxArg>::deserialize_arg(register_succ_arg, tmp_event_val));
 | 
						|
 | 
						|
  ObPartTransCtx *tx_ctx = nullptr;
 | 
						|
  GET_LS(1, 1, ls_handle);
 | 
						|
  ASSERT_EQ(ls_handle.is_valid(), true);
 | 
						|
  ASSERT_EQ(OB_SUCCESS, ls_handle.get_ls()->get_tx_ctx(register_succ_arg.tx_id_, true, tx_ctx));
 | 
						|
 | 
						|
  RETRY_UNTIL_TIMEOUT(tx_ctx->ctx_source_ == PartCtxSource::RECOVER
 | 
						|
                   && tx_ctx->create_ctx_scn_ == register_succ_arg.register_scn2_,
 | 
						|
                      5 * 1000 * 1000, 5000);
 | 
						|
  // tx_ctx->print_trace_log();
 | 
						|
  // TRANS_LOG(INFO, "after restart, print tx ctx",K(*tx_ctx));
 | 
						|
  ASSERT_EQ(ret, OB_SUCCESS);
 | 
						|
  RETRY_UNTIL_TIMEOUT(tx_ctx->exec_info_.max_applying_log_ts_ == register_succ_arg.register_scn2_,
 | 
						|
                      5 * 1000 * 1000, 5000);
 | 
						|
  ASSERT_EQ(ret, OB_SUCCESS);
 | 
						|
 | 
						|
  TRANS_LOG(INFO, "[ObMultiReplicaTestBase] Print mds in tx ctx after retarted", K(ret),
 | 
						|
            K(tx_ctx->trans_id_), K(tx_ctx->exec_info_.multi_data_source_));
 | 
						|
  ASSERT_EQ(tx_ctx->exec_info_.multi_data_source_.count(), 2);
 | 
						|
  ASSERT_EQ(tx_ctx->exec_info_.multi_data_source_[0].get_register_no(),
 | 
						|
            register_succ_arg.register_no1_);
 | 
						|
  ASSERT_EQ(tx_ctx->exec_info_.multi_data_source_[1].get_register_no(),
 | 
						|
            register_succ_arg.register_no2_);
 | 
						|
 | 
						|
  ASSERT_EQ(OB_SUCCESS, ls_handle.get_ls()->revert_tx_ctx(tx_ctx));
 | 
						|
}
 | 
						|
 | 
						|
} // namespace unittest
 | 
						|
 | 
						|
} // namespace oceanbase
 |