Files
oceanbase/mittest/multi_replica/test_mds_replay_from_ctx_table.cpp
2024-11-26 15:16:25 +00:00

239 lines
8.8 KiB
C++

// owner: weixiaoxian.wxx
// owner group: transaction
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#include <gtest/gtest.h>
#define USING_LOG_PREFIX SERVER
#define protected public
#define private public
#include "env/ob_fast_bootstrap.h"
#include "env/ob_multi_replica_util.h"
#include "lib/mysqlclient/ob_mysql_result.h"
#include "storage/tx/ob_trans_part_ctx.h"
using namespace oceanbase::transaction;
using namespace oceanbase::storage;
#define CUR_TEST_CASE_NAME ObMdsReplayFromCtxTable
DEFINE_MULTI_ZONE_TEST_CASE_CLASS
APPEND_RESTART_TEST_CASE_CLASS(2, 1);
MULTI_REPLICA_TEST_MAIN_FUNCTION(test_mds_replay_from_ctx_table_);
namespace oceanbase
{
namespace unittest
{
// using namespace storage;
using namespace oceanbase::storage::checkpoint;
struct RegisterSuccTxArg
{
transaction::ObTransID tx_id_;
share::SCN register_scn1_;
int64_t register_no1_;
share::SCN register_scn2_;
int64_t register_no2_;
TO_STRING_KV(K(tx_id_), K(register_scn1_), K(register_no1_), K(register_scn2_), K(register_no2_));
OB_UNIS_VERSION(1);
};
OB_SERIALIZE_MEMBER(RegisterSuccTxArg,
tx_id_,
register_scn1_,
register_no1_,
register_scn2_,
register_no2_);
RegisterSuccTxArg register_succ_arg;
common::ObMySQLTransaction mysql_trans;
oceanbase::observer::ObInnerSQLConnection *register_mds_conn = nullptr;
void minor_freeze_tx_ctx_memtable(ObLS *ls)
{
TRANS_LOG(INFO, "minor_freeze_tx_ctx_memtable begin");
int ret = OB_SUCCESS;
ObCheckpointExecutor *checkpoint_executor = ls->get_checkpoint_executor();
ASSERT_NE(nullptr, checkpoint_executor);
ObTxCtxMemtable *tx_ctx_memtable = dynamic_cast<ObTxCtxMemtable *>(
dynamic_cast<ObLSTxService *>(
checkpoint_executor->handlers_[logservice::TRANS_SERVICE_LOG_BASE_TYPE])
->common_checkpoints_[ObCommonCheckpointType::TX_CTX_MEMTABLE_TYPE]);
ASSERT_EQ(true, tx_ctx_memtable->is_active_memtable());
ASSERT_EQ(OB_SUCCESS, tx_ctx_memtable->flush(share::SCN::max_scn(), -1));
// // TODO(handora.qc): use more graceful wait
// usleep(10 * 1000 * 1000);
// usleep(100 * 1000);
RETRY_UNTIL_TIMEOUT(tx_ctx_memtable->is_active_memtable(), 10 * 1000 * 1000, 100 * 1000);
ASSERT_EQ(OB_SUCCESS, ret);
TRANS_LOG(INFO, "minor_freeze_tx_ctx_memtable end");
}
TEST_F(GET_ZONE_TEST_CLASS_NAME(1), register_mds_without_commit)
{
int ret = OB_SUCCESS;
common::ObMySQLProxy &sys_tenant_proxy = get_curr_simple_server().get_sql_proxy();
ASSERT_EQ(OB_SUCCESS, mysql_trans.start(GCTX.sql_proxy_, 1));
// ACQUIRE_CONN_FROM_SQL_PROXY(sys_conn, sys_tenant_proxy);
// ASSERT_EQ(OB_SUCCESS, mysql_trans.start_transaction(1, false));
register_mds_conn = static_cast<observer::ObInnerSQLConnection *>(mysql_trans.get_connection());
transaction::ObTxDesc *desc = register_mds_conn->get_session().get_tx_desc();
ASSERT_NE(nullptr, desc);
ASSERT_EQ(true, desc->is_valid());
std::string test_register_str1 = "TEST REGISTER MDS 1";
transaction::ObRegisterMdsFlag register_mds_flag1;
register_mds_flag1.need_flush_redo_instantly_ = true;
ASSERT_EQ(OB_SUCCESS, register_mds_flag1.mds_base_scn_.convert_for_tx(1000));
ASSERT_EQ(OB_SUCCESS,
register_mds_conn->register_multi_data_source(
1, ObLSID(1), transaction::ObTxDataSourceType::DDL_TRANS,
test_register_str1.c_str(), test_register_str1.size(), register_mds_flag1));
ObPartTransCtx *tx_ctx = nullptr;
GET_LS(1, 1, ls_handle);
ASSERT_EQ(ls_handle.is_valid(), true);
ASSERT_EQ(OB_SUCCESS, ls_handle.get_ls()->get_tx_ctx(desc->get_tx_id(), false, tx_ctx));
RETRY_UNTIL_TIMEOUT(tx_ctx->busy_cbs_.is_empty(), 5 * 1000 * 1000, 5000);
ASSERT_EQ(ret, OB_SUCCESS);
share::SCN register_1_scn = tx_ctx->exec_info_.max_applying_log_ts_;
std::string test_register_str2 = "TEST REGISTER MDS 2";
transaction::ObRegisterMdsFlag register_mds_flag2;
register_mds_flag2.need_flush_redo_instantly_ = true;
ASSERT_EQ(OB_SUCCESS, register_mds_flag2.mds_base_scn_.convert_for_tx(2000));
ASSERT_EQ(OB_SUCCESS,
register_mds_conn->register_multi_data_source(
1, ObLSID(1), transaction::ObTxDataSourceType::DDL_TRANS,
test_register_str2.c_str(), test_register_str2.size(), register_mds_flag2));
RETRY_UNTIL_TIMEOUT(tx_ctx->busy_cbs_.is_empty(), 5 * 1000 * 1000, 5000);
ASSERT_EQ(ret, OB_SUCCESS);
share::SCN register_2_scn = tx_ctx->exec_info_.max_applying_log_ts_;
ASSERT_EQ(true, register_2_scn > register_1_scn);
ASSERT_EQ(tx_ctx->exec_info_.multi_data_source_.count(), 2);
register_succ_arg.tx_id_ = desc->get_tx_id();
register_succ_arg.register_scn1_ = register_1_scn;
register_succ_arg.register_no1_ = tx_ctx->exec_info_.multi_data_source_[0].get_register_no();
register_succ_arg.register_scn2_ = register_2_scn;
register_succ_arg.register_no2_ = tx_ctx->exec_info_.multi_data_source_[1].get_register_no();
TRANS_LOG(INFO, "[ObMultiReplicaTestBase] register mds into tx and submit mds redo success",
K(ret), K(register_succ_arg), K(tx_ctx->exec_info_.multi_data_source_));
ASSERT_EQ(OB_SUCCESS, ls_handle.get_ls()->revert_tx_ctx(tx_ctx));
std::string tmp_str;
ASSERT_EQ(OB_SUCCESS,
EventArgSerTool<RegisterSuccTxArg>::serialize_arg(register_succ_arg, tmp_str));
ASSERT_EQ(OB_SUCCESS, finish_event("REGISTER_MDS_SUCC", tmp_str));
}
TEST_F(GET_ZONE_TEST_CLASS_NAME(2), replay_mds_log_normal)
{
int ret = OB_SUCCESS;
std::string tmp_event_val;
ASSERT_EQ(OB_SUCCESS, wait_event_finish("REGISTER_MDS_SUCC", tmp_event_val, 30 * 60 * 1000));
ASSERT_EQ(OB_SUCCESS,
EventArgSerTool<RegisterSuccTxArg>::deserialize_arg(register_succ_arg, tmp_event_val));
ObPartTransCtx *tx_ctx = nullptr;
GET_LS(1, 1, ls_handle);
ASSERT_EQ(ls_handle.is_valid(), true);
ASSERT_EQ(OB_SUCCESS, ls_handle.get_ls()->get_tx_ctx(register_succ_arg.tx_id_, true, tx_ctx));
RETRY_UNTIL_TIMEOUT(tx_ctx->exec_info_.max_applied_log_ts_ == register_succ_arg.register_scn2_,
5 * 1000 * 1000, 5000);
ASSERT_EQ(ret, OB_SUCCESS);
ASSERT_EQ(tx_ctx->exec_info_.multi_data_source_.count(), 2);
// ASSERT_EQ(tx_ctx->exec_info_.multi_data_source_[0].get_register_no(),
// register_succ_arg.register_no1_);
// ASSERT_EQ(tx_ctx->exec_info_.multi_data_source_[1].get_register_no(),
// register_succ_arg.register_no2_);
ASSERT_EQ(OB_SUCCESS, ls_handle.get_ls()->revert_tx_ctx(tx_ctx));
}
TEST_F(GET_ZONE_TEST_CLASS_NAME(2), dump_tx_ctx_table)
{
int ret = OB_SUCCESS;
GET_LS(1, 1, ls_handle);
ASSERT_EQ(ls_handle.is_valid(), true);
{
share::ObTenantSwitchGuard tenant_guard;
ASSERT_EQ(OB_SUCCESS, tenant_guard.switch_to(1));
minor_freeze_tx_ctx_memtable(ls_handle.get_ls());
}
ASSERT_EQ(restart_zone(2, 1), OB_SUCCESS);
}
TEST_F(GET_RESTART_ZONE_TEST_CLASS_NAME(2, 1), restart_zone2_from_tx_ctx_table)
{
int ret = OB_SUCCESS;
std::string tmp_event_val;
ASSERT_EQ(OB_SUCCESS, wait_event_finish("REGISTER_MDS_SUCC", tmp_event_val, 30 * 60 * 1000));
ASSERT_EQ(OB_SUCCESS,
EventArgSerTool<RegisterSuccTxArg>::deserialize_arg(register_succ_arg, tmp_event_val));
ObPartTransCtx *tx_ctx = nullptr;
GET_LS(1, 1, ls_handle);
ASSERT_EQ(ls_handle.is_valid(), true);
ASSERT_EQ(OB_SUCCESS, ls_handle.get_ls()->get_tx_ctx(register_succ_arg.tx_id_, true, tx_ctx));
RETRY_UNTIL_TIMEOUT(tx_ctx->ctx_source_ == PartCtxSource::RECOVER
&& tx_ctx->create_ctx_scn_ == register_succ_arg.register_scn2_,
5 * 1000 * 1000, 5000);
// tx_ctx->print_trace_log();
// TRANS_LOG(INFO, "after restart, print tx ctx",K(*tx_ctx));
ASSERT_EQ(ret, OB_SUCCESS);
RETRY_UNTIL_TIMEOUT(tx_ctx->exec_info_.max_applying_log_ts_ == register_succ_arg.register_scn2_,
5 * 1000 * 1000, 5000);
ASSERT_EQ(ret, OB_SUCCESS);
TRANS_LOG(INFO, "[ObMultiReplicaTestBase] Print mds in tx ctx after retarted", K(ret),
K(tx_ctx->trans_id_), K(tx_ctx->exec_info_.multi_data_source_));
ASSERT_EQ(tx_ctx->exec_info_.multi_data_source_.count(), 2);
ASSERT_EQ(tx_ctx->exec_info_.multi_data_source_[0].get_register_no(),
register_succ_arg.register_no1_);
ASSERT_EQ(tx_ctx->exec_info_.multi_data_source_[1].get_register_no(),
register_succ_arg.register_no2_);
ASSERT_EQ(OB_SUCCESS, ls_handle.get_ls()->revert_tx_ctx(tx_ctx));
}
} // namespace unittest
} // namespace oceanbase