// owner: weixiaoxian.wxx // owner group: transaction /** * Copyright (c) 2021 OceanBase * OceanBase CE is licensed under Mulan PubL v2. * You can use this software according to the terms and conditions of the Mulan PubL v2. * You may obtain a copy of Mulan PubL v2 at: * http://license.coscl.org.cn/MulanPubL-2.0 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. * See the Mulan PubL v2 for more details. */ #define USING_LOG_PREFIX SERVER #define protected public #define private public #include "env/ob_fast_bootstrap.h" #include "env/ob_multi_replica_util.h" using namespace oceanbase::transaction; using namespace oceanbase::storage; #define CUR_TEST_CASE_NAME ObMdsReplayFromCtxTable DEFINE_MULTI_ZONE_TEST_CASE_CLASS APPEND_RESTART_TEST_CASE_CLASS(2, 1); MULTI_REPLICA_TEST_MAIN_FUNCTION(test_mds_replay_from_ctx_table_); namespace oceanbase { namespace unittest { // using namespace storage; using namespace oceanbase::storage::checkpoint; struct RegisterSuccTxArg { transaction::ObTransID tx_id_; share::SCN register_scn1_; int64_t register_no1_; share::SCN register_scn2_; int64_t register_no2_; TO_STRING_KV(K(tx_id_), K(register_scn1_), K(register_no1_), K(register_scn2_), K(register_no2_)); OB_UNIS_VERSION(1); }; OB_SERIALIZE_MEMBER(RegisterSuccTxArg, tx_id_, register_scn1_, register_no1_, register_scn2_, register_no2_); RegisterSuccTxArg register_succ_arg; common::ObMySQLTransaction mysql_trans; oceanbase::observer::ObInnerSQLConnection *register_mds_conn = nullptr; void minor_freeze_tx_ctx_memtable(ObLS *ls) { TRANS_LOG(INFO, "minor_freeze_tx_ctx_memtable begin"); int ret = OB_SUCCESS; ObCheckpointExecutor *checkpoint_executor = ls->get_checkpoint_executor(); ASSERT_NE(nullptr, checkpoint_executor); ObTxCtxMemtable *tx_ctx_memtable = dynamic_cast( dynamic_cast( checkpoint_executor->handlers_[logservice::TRANS_SERVICE_LOG_BASE_TYPE]) ->common_checkpoints_[ObCommonCheckpointType::TX_CTX_MEMTABLE_TYPE]); ASSERT_EQ(true, tx_ctx_memtable->is_active_memtable()); ASSERT_EQ(OB_SUCCESS, tx_ctx_memtable->flush(share::SCN::max_scn(), -1)); // // TODO(handora.qc): use more graceful wait // usleep(10 * 1000 * 1000); // usleep(100 * 1000); RETRY_UNTIL_TIMEOUT(tx_ctx_memtable->is_active_memtable(), 10 * 1000 * 1000, 100 * 1000); ASSERT_EQ(OB_SUCCESS, ret); TRANS_LOG(INFO, "minor_freeze_tx_ctx_memtable end"); } TEST_F(GET_ZONE_TEST_CLASS_NAME(1), register_mds_without_commit) { int ret = OB_SUCCESS; common::ObMySQLProxy &sys_tenant_proxy = get_curr_simple_server().get_sql_proxy(); ASSERT_EQ(OB_SUCCESS, mysql_trans.start(GCTX.sql_proxy_, 1)); // ACQUIRE_CONN_FROM_SQL_PROXY(sys_conn, sys_tenant_proxy); // ASSERT_EQ(OB_SUCCESS, mysql_trans.start_transaction(1, false)); register_mds_conn = static_cast(mysql_trans.get_connection()); transaction::ObTxDesc *desc = register_mds_conn->get_session().get_tx_desc(); ASSERT_NE(nullptr, desc); ASSERT_EQ(true, desc->is_valid()); std::string test_register_str1 = "TEST REGISTER MDS 1"; transaction::ObRegisterMdsFlag register_mds_flag1; register_mds_flag1.need_flush_redo_instantly_ = true; ASSERT_EQ(OB_SUCCESS, register_mds_flag1.mds_base_scn_.convert_for_tx(1000)); ASSERT_EQ(OB_SUCCESS, register_mds_conn->register_multi_data_source( 1, ObLSID(1), transaction::ObTxDataSourceType::DDL_TRANS, test_register_str1.c_str(), test_register_str1.size(), register_mds_flag1)); ObPartTransCtx *tx_ctx = nullptr; GET_LS(1, 1, ls_handle); ASSERT_EQ(ls_handle.is_valid(), true); ASSERT_EQ(OB_SUCCESS, ls_handle.get_ls()->get_tx_ctx(desc->get_tx_id(), false, tx_ctx)); RETRY_UNTIL_TIMEOUT(tx_ctx->busy_cbs_.is_empty(), 5 * 1000 * 1000, 5000); ASSERT_EQ(ret, OB_SUCCESS); share::SCN register_1_scn = tx_ctx->exec_info_.max_applying_log_ts_; std::string test_register_str2 = "TEST REGISTER MDS 2"; transaction::ObRegisterMdsFlag register_mds_flag2; register_mds_flag2.need_flush_redo_instantly_ = true; ASSERT_EQ(OB_SUCCESS, register_mds_flag2.mds_base_scn_.convert_for_tx(2000)); ASSERT_EQ(OB_SUCCESS, register_mds_conn->register_multi_data_source( 1, ObLSID(1), transaction::ObTxDataSourceType::DDL_TRANS, test_register_str2.c_str(), test_register_str2.size(), register_mds_flag2)); RETRY_UNTIL_TIMEOUT(tx_ctx->busy_cbs_.is_empty(), 5 * 1000 * 1000, 5000); ASSERT_EQ(ret, OB_SUCCESS); share::SCN register_2_scn = tx_ctx->exec_info_.max_applying_log_ts_; ASSERT_EQ(true, register_2_scn > register_1_scn); ASSERT_EQ(tx_ctx->exec_info_.multi_data_source_.count(), 2); register_succ_arg.tx_id_ = desc->get_tx_id(); register_succ_arg.register_scn1_ = register_1_scn; register_succ_arg.register_no1_ = tx_ctx->exec_info_.multi_data_source_[0].get_register_no(); register_succ_arg.register_scn2_ = register_2_scn; register_succ_arg.register_no2_ = tx_ctx->exec_info_.multi_data_source_[1].get_register_no(); TRANS_LOG(INFO, "[ObMultiReplicaTestBase] register mds into tx and submit mds redo success", K(ret), K(register_succ_arg), K(tx_ctx->exec_info_.multi_data_source_)); ASSERT_EQ(OB_SUCCESS, ls_handle.get_ls()->revert_tx_ctx(tx_ctx)); std::string tmp_str; ASSERT_EQ(OB_SUCCESS, EventArgSerTool::serialize_arg(register_succ_arg, tmp_str)); ASSERT_EQ(OB_SUCCESS, finish_event("REGISTER_MDS_SUCC", tmp_str)); } TEST_F(GET_ZONE_TEST_CLASS_NAME(2), replay_mds_log_normal) { int ret = OB_SUCCESS; std::string tmp_event_val; ASSERT_EQ(OB_SUCCESS, wait_event_finish("REGISTER_MDS_SUCC", tmp_event_val, 30 * 60 * 1000)); ASSERT_EQ(OB_SUCCESS, EventArgSerTool::deserialize_arg(register_succ_arg, tmp_event_val)); ObPartTransCtx *tx_ctx = nullptr; GET_LS(1, 1, ls_handle); ASSERT_EQ(ls_handle.is_valid(), true); ASSERT_EQ(OB_SUCCESS, ls_handle.get_ls()->get_tx_ctx(register_succ_arg.tx_id_, true, tx_ctx)); RETRY_UNTIL_TIMEOUT(tx_ctx->exec_info_.max_applied_log_ts_ == register_succ_arg.register_scn2_, 5 * 1000 * 1000, 5000); ASSERT_EQ(ret, OB_SUCCESS); ASSERT_EQ(tx_ctx->exec_info_.multi_data_source_.count(), 2); // ASSERT_EQ(tx_ctx->exec_info_.multi_data_source_[0].get_register_no(), // register_succ_arg.register_no1_); // ASSERT_EQ(tx_ctx->exec_info_.multi_data_source_[1].get_register_no(), // register_succ_arg.register_no2_); ASSERT_EQ(OB_SUCCESS, ls_handle.get_ls()->revert_tx_ctx(tx_ctx)); } TEST_F(GET_ZONE_TEST_CLASS_NAME(2), dump_tx_ctx_table) { int ret = OB_SUCCESS; GET_LS(1, 1, ls_handle); ASSERT_EQ(ls_handle.is_valid(), true); { share::ObTenantSwitchGuard tenant_guard; ASSERT_EQ(OB_SUCCESS, tenant_guard.switch_to(1)); minor_freeze_tx_ctx_memtable(ls_handle.get_ls()); } ASSERT_EQ(restart_zone(2, 1), OB_SUCCESS); } TEST_F(GET_RESTART_ZONE_TEST_CLASS_NAME(2, 1), restart_zone2_from_tx_ctx_table) { int ret = OB_SUCCESS; std::string tmp_event_val; ASSERT_EQ(OB_SUCCESS, wait_event_finish("REGISTER_MDS_SUCC", tmp_event_val, 30 * 60 * 1000)); ASSERT_EQ(OB_SUCCESS, EventArgSerTool::deserialize_arg(register_succ_arg, tmp_event_val)); ObPartTransCtx *tx_ctx = nullptr; GET_LS(1, 1, ls_handle); ASSERT_EQ(ls_handle.is_valid(), true); ASSERT_EQ(OB_SUCCESS, ls_handle.get_ls()->get_tx_ctx(register_succ_arg.tx_id_, true, tx_ctx)); RETRY_UNTIL_TIMEOUT(tx_ctx->ctx_source_ == PartCtxSource::RECOVER && tx_ctx->create_ctx_scn_ == register_succ_arg.register_scn2_, 5 * 1000 * 1000, 5000); // tx_ctx->print_trace_log(); // TRANS_LOG(INFO, "after restart, print tx ctx",K(*tx_ctx)); ASSERT_EQ(ret, OB_SUCCESS); RETRY_UNTIL_TIMEOUT(tx_ctx->exec_info_.max_applying_log_ts_ == register_succ_arg.register_scn2_, 5 * 1000 * 1000, 5000); ASSERT_EQ(ret, OB_SUCCESS); TRANS_LOG(INFO, "[ObMultiReplicaTestBase] Print mds in tx ctx after retarted", K(ret), K(tx_ctx->trans_id_), K(tx_ctx->exec_info_.multi_data_source_)); ASSERT_EQ(tx_ctx->exec_info_.multi_data_source_.count(), 2); ASSERT_EQ(tx_ctx->exec_info_.multi_data_source_[0].get_register_no(), register_succ_arg.register_no1_); ASSERT_EQ(tx_ctx->exec_info_.multi_data_source_[1].get_register_no(), register_succ_arg.register_no2_); ASSERT_EQ(OB_SUCCESS, ls_handle.get_ls()->revert_tx_ctx(tx_ctx)); } } // namespace unittest } // namespace oceanbase