// owner: xuwang.txw // owner group: transaction /** * Copyright (c) 2021 OceanBase * OceanBase CE is licensed under Mulan PubL v2. * You can use this software according to the terms and conditions of the Mulan PubL v2. * You may obtain a copy of Mulan PubL v2 at: * http://license.coscl.org.cn/MulanPubL-2.0 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. * See the Mulan PubL v2 for more details. */ #ifndef DEBUG_FOR_MDS #define DEBUG_FOR_MDS #define TEST_MDS_TRANSACTION #define USING_LOG_PREFIX SERVER #define protected public #define private public #include "storage/tx_storage/ob_ls_service.h" #include "env/ob_simple_cluster_test_base.h" #include "storage/multi_data_source/test/example_user_helper_define.cpp" namespace oceanbase { share::SCN mock_tablet_oldest_scn = unittest::mock_scn(1); ObTabletID tablet_id; namespace compaction { int ObMediumCompactionInfo::assign(ObIAllocator &allocator, const ObMediumCompactionInfo &medium_info) { UNUSED(allocator); data_version_ = medium_info.data_version_; return OB_SUCCESS; } } namespace storage { namespace mds { int ObTenantMdsTimer::get_tablet_oldest_scn_(ObTablet &tablet, share::SCN &oldest_scn) { #define PRINT_WRAPPER KR(ret), K(tablet), K(tablet_oldest_scn), KPC(this) int ret = OB_SUCCESS; if (tablet.tablet_meta_.tablet_id_ == tablet_id) {// for tested tablet oldest_scn = mock_tablet_oldest_scn; } else {// for other tablet oldest_scn = unittest::mock_scn(1000);// FIXME: get correct min scn from oldest version tablet } return ret; #undef PRINT_WRAPPER } } } namespace unittest { using namespace oceanbase::transaction; using namespace oceanbase::storage; using namespace mds; using namespace compaction; class TestRunCtx { public: uint64_t tenant_id_ = 0; int time_sec_ = 0; }; TestRunCtx RunCtx; class TestMdsTransactionTest : public ObSimpleClusterTestBase { public: // 指定case运行目录前缀 test_ob_simple_cluster_ TestMdsTransactionTest() : ObSimpleClusterTestBase("test_mds_transaction_") {} virtual void SetUp() override { ObSimpleClusterTestBase::SetUp(); OB_LOGGER.set_log_level("TRACE"); } virtual void TearDown() override { OB_LOGGER.set_log_level("WDIAG"); ObSimpleClusterTestBase::TearDown(); } }; TEST_F(TestMdsTransactionTest, simple_test) { common::ObMySQLProxy *sql_proxy = GCTX.ddl_sql_proxy_; sqlclient::ObISQLConnection *connection = nullptr; ASSERT_EQ(OB_SUCCESS, sql_proxy->acquire(connection)); observer::ObInnerSQLConnection *inner_conn = dynamic_cast(connection); ASSERT_EQ(OB_SUCCESS, inner_conn->start_transaction(OB_SYS_TENANT_ID)); constexpr int64_t LEN = 128; char buffer[LEN]; int64_t pos = 0; int64_t magic_number = 12345; ASSERT_EQ(OB_SUCCESS, serialization::encode(buffer, LEN, pos, magic_number)); ASSERT_EQ(OB_SUCCESS, inner_conn->register_multi_data_source(OB_SYS_TENANT_ID, share::ObLSID(1), transaction::ObTxDataSourceType::TEST1, buffer, pos)); ASSERT_EQ(OB_SUCCESS, inner_conn->commit()); } TEST_F(TestMdsTransactionTest, add_tenant) { // 创建普通租户tt1 ASSERT_EQ(OB_SUCCESS, create_tenant()); // 获取租户tt1的tenant_id ASSERT_EQ(OB_SUCCESS, get_tenant_id(RunCtx.tenant_id_)); ASSERT_NE(0, RunCtx.tenant_id_); // 初始化普通租户tt1的sql proxy ASSERT_EQ(OB_SUCCESS, get_curr_simple_server().init_sql_proxy2()); } TEST_F(TestMdsTransactionTest, write_mds_table) { std::this_thread::sleep_for(std::chrono::seconds(5)); common::ObMySQLProxy *sql_proxy = GCTX.ddl_sql_proxy_; sqlclient::ObISQLConnection *connection = nullptr; ASSERT_EQ(OB_SUCCESS, sql_proxy->acquire(connection)); observer::ObInnerSQLConnection *inner_conn = dynamic_cast(connection); ASSERT_EQ(OB_SUCCESS, inner_conn->start_transaction(RunCtx.tenant_id_)); constexpr int64_t LEN = 128; char buffer[LEN]; int64_t pos = 0; int64_t magic_number = 12345; ASSERT_EQ(OB_SUCCESS, serialization::encode(buffer, LEN, pos, magic_number)); ASSERT_EQ(OB_SUCCESS, inner_conn->register_multi_data_source(RunCtx.tenant_id_, share::ObLSID(1), transaction::ObTxDataSourceType::TEST3, buffer, pos)); ASSERT_EQ(OB_SUCCESS, inner_conn->register_multi_data_source(RunCtx.tenant_id_, share::ObLSID(1001), transaction::ObTxDataSourceType::TEST3, buffer, pos)); // ASSERT_EQ(OB_SUCCESS, inner_conn->commit()); // ASSERT_EQ(OB_SUCCESS, inner_conn->rollback()); ASSERT_EQ(OB_SUCCESS, inner_conn->commit()); // int64_t val = 0; // ASSERT_EQ(OB_SUCCESS, TestMdsTable.get_snapshot([&val](const ExampleUserData1 &data) { // val = data.value_; // return OB_SUCCESS; // }, share::SCN::max_scn())); // ASSERT_EQ(magic_number, val); } TEST_F(TestMdsTransactionTest, test_for_each_kv_in_unit_in_tablet) { int ret = OB_SUCCESS; ObMediumCompactionInfo data_1, data_2, data_3, data_11; ObMediumCompactionInfo &data_1_ref = data_1; data_1.data_version_ = 1; data_2.data_version_ = 2; data_3.data_version_ = 3; data_11.data_version_ = 11; MTL_SWITCH(OB_SYS_TENANT_ID) { int64_t _; // 1. 新建一个tablet ASSERT_EQ(OB_SUCCESS, GCTX.ddl_sql_proxy_->write(OB_SYS_TENANT_ID, "create table test_mds_table(a int)", _)); // 2. 从表名拿到它的tablet_id ObTabletID tablet_id; ASSERT_EQ(OB_SUCCESS, ObTableAccessHelper::read_single_row(OB_SYS_TENANT_ID, {"tablet_id"}, OB_ALL_TABLE_TNAME, "where table_name = 'test_mds_table'", tablet_id)); // 3. 从tablet_id拿到它的ls_id ObLSID ls_id; char where_condition[512] = { 0 }; databuff_printf(where_condition, 512, "where tablet_id = %ld", tablet_id.id()); ASSERT_EQ(OB_SUCCESS, ObTableAccessHelper::read_single_row(OB_SYS_TENANT_ID, {"ls_id"}, OB_ALL_TABLET_TO_LS_TNAME, where_condition, ls_id)); // 4. 从ls_id找到ls storage::ObLSHandle ls_handle; ASSERT_EQ(OB_SUCCESS, MTL(storage::ObLSService *)->get_ls(ls_id, ls_handle, ObLSGetMod::TRANS_MOD)); // 5. 从LS找到tablet结构 storage::ObTabletHandle tablet_handle; ASSERT_EQ(OB_SUCCESS, ls_handle.get_ls()->get_tablet(tablet_id, tablet_handle)); // 6. 调用tablet接口第一次写入多源数据,提交 MdsCtx ctx1(mds::MdsWriter(ObTransID(1))); ASSERT_EQ(OB_SUCCESS, tablet_handle.get_obj()->set(ObMediumCompactionInfoKey(1), data_1_ref, ctx1)); ASSERT_EQ(OB_SUCCESS, tablet_handle.get_obj()->set(ObMediumCompactionInfoKey(3), data_3, ctx1)); ctx1.single_log_commit(mock_scn(10), mock_scn(10)); // 7. 调用tablet接口第二次写入多源数据,不提交 MdsCtx ctx2(mds::MdsWriter(ObTransID(2))); ASSERT_EQ(OB_SUCCESS, tablet_handle.get_obj()->set(ObMediumCompactionInfoKey(1), data_11, ctx2)); ASSERT_EQ(OB_SUCCESS, tablet_handle.get_obj()->set(ObMediumCompactionInfoKey(2), data_2, ctx2)); } }// ctx2 is auto aborted, but print ERROR log TEST_F(TestMdsTransactionTest, test_mds_table_gc_and_recycle) { MDS_LOG(INFO, "test_mds_table_gc_and_recycle"); int ret = OB_SUCCESS; ObTabletBindingMdsUserData data_to_write; data_to_write.schema_version_ = 1; data_to_write.snapshot_version_ = 1; data_to_write.data_tablet_id_ = ObTabletID(2); data_to_write.hidden_tablet_id_ = ObTabletID(3); data_to_write.lob_meta_tablet_id_ = ObTabletID(4); data_to_write.lob_piece_tablet_id_ = ObTabletID(5); MTL_SWITCH(OB_SYS_TENANT_ID) { int64_t _; // 1. 新建一个tablet ASSERT_EQ(OB_SUCCESS, GCTX.ddl_sql_proxy_->write(OB_SYS_TENANT_ID, "create table test_mds_table2(a int)", _)); // 2. 从表名拿到它的tablet_id ASSERT_EQ(OB_SUCCESS, ObTableAccessHelper::read_single_row(OB_SYS_TENANT_ID, {"tablet_id"}, OB_ALL_TABLE_TNAME, "where table_name = 'test_mds_table2'", tablet_id)); // 3. 从tablet_id拿到它的ls_id ObLSID ls_id; char where_condition[512] = { 0 }; databuff_printf(where_condition, 512, "where tablet_id = %ld", tablet_id.id()); ASSERT_EQ(OB_SUCCESS, ObTableAccessHelper::read_single_row(OB_SYS_TENANT_ID, {"ls_id"}, OB_ALL_TABLET_TO_LS_TNAME, where_condition, ls_id)); // 4. 从ls_id找到ls storage::ObLSHandle ls_handle; ASSERT_EQ(OB_SUCCESS, MTL(storage::ObLSService *)->get_ls(ls_id, ls_handle, ObLSGetMod::TRANS_MOD)); // 5. 从LS找到tablet结构1 storage::ObTabletHandle tablet_handle; ASSERT_EQ(OB_SUCCESS, ls_handle.get_ls()->get_tablet(tablet_id, tablet_handle)); // 6. 调用tablet接口写入多源数据,提交 MdsCtx ctx1(mds::MdsWriter(ObTransID(1))); share::SCN rec_scn; ASSERT_EQ(OB_SUCCESS, tablet_handle.get_obj()->set(data_to_write, ctx1)); // ASSERT_EQ(OB_SUCCESS, static_cast(tablet_handle.get_obj()->pointer_hdl_.get_resource_ptr())->mds_table_handler_.mds_table_handle_.get_rec_scn(rec_scn)); // ASSERT_EQ(share::SCN::max_scn(), rec_scn); ctx1.single_log_commit(mock_scn(10), mock_scn(10000000)); ASSERT_EQ(true, static_cast(tablet_handle.get_obj()->pointer_hdl_.get_resource_ptr())->mds_table_handler_.mds_table_handle_.is_valid()); // ASSERT_EQ(OB_SUCCESS, static_cast(tablet_handle.get_obj()->pointer_hdl_.get_resource_ptr())->mds_table_handler_.mds_table_handle_.get_rec_scn(rec_scn)); // ASSERT_EQ(mock_scn(10), rec_scn); std::this_thread::sleep_for(std::chrono::seconds(5)); share::SCN max_decided_scn; ASSERT_EQ(OB_SUCCESS, ls_handle.get_ls()->get_max_decided_scn(max_decided_scn)); ASSERT_EQ(OB_SUCCESS, tablet_handle.get_obj()->mds_table_flush(max_decided_scn)); // 7. 检查mds table的存在情况 std::this_thread::sleep_for(std::chrono::seconds(5)); ASSERT_EQ(true, static_cast(tablet_handle.get_obj()->pointer_hdl_.get_resource_ptr())->mds_table_handler_.mds_table_handle_.is_valid()); ASSERT_EQ(OB_SUCCESS, static_cast(tablet_handle.get_obj()->pointer_hdl_.get_resource_ptr())->mds_table_handler_.mds_table_handle_.get_rec_scn(rec_scn)); ASSERT_EQ(share::SCN::max_scn(), rec_scn); MDS_LOG(INFO, "change mock_tablet_oldest_scn", K(tablet_id)); mock_tablet_oldest_scn = unittest::mock_scn(2074916885902668817); std::this_thread::sleep_for(std::chrono::seconds(15)); ASSERT_EQ(false, static_cast(tablet_handle.get_obj()->pointer_hdl_.get_resource_ptr())->mds_table_handler_.mds_table_handle_.is_valid()); ASSERT_EQ(OB_SUCCESS, ls_handle.get_ls()->get_tablet(tablet_id, tablet_handle));// 重新获取一下tablet handle ASSERT_EQ(OB_SUCCESS, (tablet_handle.get_obj()->get_mds_data_from_tablet(mds::DummyKey(), share::SCN::max_scn(), 1_s, [&data_to_write](const ObTabletBindingMdsUserData &data_to_read) -> int { OB_ASSERT(data_to_write.schema_version_ == data_to_read.schema_version_); return OB_SUCCESS; }))); mock_tablet_oldest_scn = unittest::mock_scn(1000); } } TEST_F(TestMdsTransactionTest, test_mds_table_get_tablet_status_transfer_in_written_state) { MDS_LOG(INFO, "test_mds_table_get_tablet_status_transfer_in_written_state"); int ret = OB_SUCCESS; bool written = false; ObTabletCreateDeleteMdsUserData data_to_write; data_to_write.tablet_status_ = ObTabletStatus::TRANSFER_IN; data_to_write.create_commit_scn_ = mock_scn(10); data_to_write.create_commit_version_ = 10; data_to_write.transfer_ls_id_ = ObLSID(10); data_to_write.transfer_scn_ = mock_scn(10); MTL_SWITCH(OB_SYS_TENANT_ID) { int64_t _; // 1. 新建一个tablet ASSERT_EQ(OB_SUCCESS, GCTX.ddl_sql_proxy_->write(OB_SYS_TENANT_ID, "create table test_mds_table3(a int)", _)); // 2. 从表名拿到它的tablet_id ASSERT_EQ(OB_SUCCESS, ObTableAccessHelper::read_single_row(OB_SYS_TENANT_ID, {"tablet_id"}, OB_ALL_TABLE_TNAME, "where table_name = 'test_mds_table3'", tablet_id)); // 3. 从tablet_id拿到它的ls_id ObLSID ls_id; char where_condition[512] = { 0 }; databuff_printf(where_condition, 512, "where tablet_id = %ld", tablet_id.id()); ASSERT_EQ(OB_SUCCESS, ObTableAccessHelper::read_single_row(OB_SYS_TENANT_ID, {"ls_id"}, OB_ALL_TABLET_TO_LS_TNAME, where_condition, ls_id)); // 4. 从ls_id找到ls storage::ObLSHandle ls_handle; ASSERT_EQ(OB_SUCCESS, MTL(storage::ObLSService *)->get_ls(ls_id, ls_handle, ObLSGetMod::TRANS_MOD)); // 5. 从LS找到tablet结构 storage::ObTabletHandle tablet_handle; ASSERT_EQ(OB_SUCCESS, ls_handle.get_ls()->get_tablet(tablet_id, tablet_handle)); // 6. 调用tablet接口写入多源数据,提交 share::SCN max_decided_scn; ASSERT_EQ(OB_SUCCESS, ls_handle.get_ls()->get_max_decided_scn(max_decided_scn)); MdsCtx ctx1(mds::MdsWriter(ObTransID(1))); share::SCN rec_scn; ASSERT_EQ(OB_STATE_NOT_MATCH, tablet_handle.get_obj()->check_transfer_in_redo_written(written));// 这个时候因为tablet status不是TRANSFER IN所以查不出来 ASSERT_EQ(OB_SUCCESS, tablet_handle.get_obj()->set(data_to_write, ctx1)); ASSERT_EQ(OB_SUCCESS, tablet_handle.get_obj()->check_transfer_in_redo_written(written));// 这个时候tablet status是TRANSFER IN, 但事务还没写日志,所以可以查出结果,但结果是false ASSERT_EQ(false, written); ctx1.single_log_commit(max_decided_scn, max_decided_scn); ASSERT_EQ(OB_SUCCESS, tablet_handle.get_obj()->check_transfer_in_redo_written(written));// 这个时候tablet status是TRANSFER IN, 并且事务已经提交,所以可以查出结果,并且结果是true ASSERT_EQ(true, written); ASSERT_EQ(true, static_cast(tablet_handle.get_obj()->pointer_hdl_.get_resource_ptr())->mds_table_handler_.mds_table_handle_.is_valid()); std::this_thread::sleep_for(std::chrono::seconds(5)); ASSERT_EQ(OB_SUCCESS, tablet_handle.get_obj()->mds_table_flush(max_decided_scn)); // 7. 检查mds table的存在情况 std::this_thread::sleep_for(std::chrono::seconds(5)); MDS_LOG(INFO, "print tablet id", K(tablet_id)); ASSERT_EQ(true, static_cast(tablet_handle.get_obj()->pointer_hdl_.get_resource_ptr())->mds_table_handler_.mds_table_handle_.is_valid()); ASSERT_EQ(OB_SUCCESS, static_cast(tablet_handle.get_obj()->pointer_hdl_.get_resource_ptr())->mds_table_handler_.mds_table_handle_.get_rec_scn(rec_scn)); ASSERT_EQ(share::SCN::max_scn(), rec_scn); MDS_LOG(INFO, "change mock_tablet_oldest_scn", K(tablet_id)); mock_tablet_oldest_scn = unittest::mock_scn(2074916885902668817); std::this_thread::sleep_for(std::chrono::seconds(15)); ASSERT_EQ(false, static_cast(tablet_handle.get_obj()->pointer_hdl_.get_resource_ptr())->mds_table_handler_.mds_table_handle_.is_valid());// mds table已经释放 ASSERT_EQ(OB_SUCCESS, ls_handle.get_ls()->get_tablet(tablet_id, tablet_handle));// 重新获取一下tablet handle ASSERT_EQ(OB_SUCCESS, tablet_handle.get_obj()->check_transfer_in_redo_written(written));// 这个时候tablet status是TRANSFER IN, 并且事务已经提交,所以可以查出结果,并且结果是true ASSERT_EQ(true, written); mock_tablet_oldest_scn = unittest::mock_scn(1000); } } TEST_F(TestMdsTransactionTest, end) { if (RunCtx.time_sec_ > 0) { ::sleep(RunCtx.time_sec_); } } } // end unittest } // end oceanbase int main(int argc, char **argv) { int c = 0; int time_sec = 0; char *log_level = (char*)"WDIAG"; while(EOF != (c = getopt(argc,argv,"t:l:"))) { switch(c) { case 't': time_sec = atoi(optarg); break; case 'l': log_level = optarg; oceanbase::unittest::ObSimpleClusterTestBase::enable_env_warn_log_ = false; break; default: break; } } oceanbase::unittest::init_log_and_gtest(argc, argv); OB_LOGGER.set_log_level(log_level); LOG_INFO("main>>>"); oceanbase::unittest::RunCtx.time_sec_ = time_sec; ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); } #endif