// owner: gengli.wzy // owner group: transaction /** * Copyright (c) 2021 OceanBase * OceanBase CE is licensed under Mulan PubL v2. * You can use this software according to the terms and conditions of the Mulan PubL v2. * You may obtain a copy of Mulan PubL v2 at: * http://license.coscl.org.cn/MulanPubL-2.0 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. * See the Mulan PubL v2 for more details. */ #include #define USING_LOG_PREFIX SERVER #define protected public #define private public #include "env/ob_simple_cluster_test_base.h" #include "mittest/env/ob_simple_server_helper.h" namespace oceanbase { namespace unittest { using namespace oceanbase::transaction; using namespace oceanbase::storage; #define EQ(x, y) GTEST_ASSERT_EQ(x, y); #define NEQ(x, y) GTEST_ASSERT_NE(x, y); #define LE(x, y) GTEST_ASSERT_LE(x, y); #define GE(x, y) GTEST_ASSERT_GE(x, y); class TestRunCtx { public: uint64_t tenant_id_ = 0; int64_t time_sec_ = 0; }; TestRunCtx R; class ObTxDataTest : public ObSimpleClusterTestBase { public: // 指定case运行目录前缀 test_ob_simple_cluster_ ObTxDataTest() : ObSimpleClusterTestBase("test_tx_data_", "50G", "50G") {} }; TEST_F(ObTxDataTest, observer_start) { SERVER_LOG(INFO, "observer_start succ"); } // 创建租户并不轻量,看场景必要性使用 TEST_F(ObTxDataTest, add_tenant) { // 创建普通租户tt1 ASSERT_EQ(OB_SUCCESS, create_tenant("tt1", "40G", "40G", false, 10)); // 获取租户tt1的tenant_id ASSERT_EQ(OB_SUCCESS, get_tenant_id(R.tenant_id_)); ASSERT_NE(0, R.tenant_id_); // 初始化普通租户tt1的sql proxy ASSERT_EQ(OB_SUCCESS, get_curr_simple_server().init_sql_proxy2()); } TEST_F(ObTxDataTest, create_new_ls) { // 在单节点ObServer下创建新的日志流, 注意避免被RS任务GC掉 EQ(0, SSH::create_ls(R.tenant_id_, get_curr_observer().self_addr_)); int64_t ls_count = 0; EQ(0, SSH::g_select_int64(R.tenant_id_, "select count(ls_id) as val from __all_ls where ls_id!=1", ls_count)); EQ(2, ls_count); } TEST_F(ObTxDataTest, rollback_to) { common::ObMySQLProxy &sql_proxy = get_curr_simple_server().get_sql_proxy2(); int64_t affected_rows; EQ(0, sql_proxy.write("drop table if exists stu1", affected_rows)); EQ(0, sql_proxy.write("create table stu1(col1 int)", affected_rows)); sqlclient::ObISQLConnection *conn1 = NULL; EQ(0, sql_proxy.acquire(conn1)); EQ(0, SSH::write(conn1, "set autocommit=0", affected_rows)); EQ(0, SSH::write(conn1, "insert into stu1 values(100)", affected_rows)); ObTransID tx_id; EQ(0, SSH::find_tx(conn1, tx_id)); LOGI("find_tx:%ld", tx_id.get_id()); EQ(0, SSH::write(conn1, "savepoint sp1")); EQ(0, SSH::write(conn1, "insert into stu1 values(200)", affected_rows)); EQ(0, SSH::write(conn1, "rollback to sp1")); int64_t val = 0; EQ(0, SSH::select_int64(conn1, "select sum(col1) val from stu1", val)); EQ(100, val); EQ(0, SSH::write(conn1, "commit")); } TEST_F(ObTxDataTest, rollback_to_with_redo) { common::ObMySQLProxy &sql_proxy = get_curr_simple_server().get_sql_proxy2(); int64_t affected_rows; EQ(0, sql_proxy.write("drop table if exists stu1", affected_rows)); EQ(0, sql_proxy.write("create table stu1(col1 int)", affected_rows)); sqlclient::ObISQLConnection *conn1 = NULL; EQ(0, sql_proxy.acquire(conn1)); EQ(0, SSH::write(conn1, "set autocommit=0", affected_rows)); EQ(0, SSH::write(conn1, "insert into stu1 values(100)", affected_rows)); ObTransID tx_id; EQ(0, SSH::find_tx(conn1, tx_id)); LOGI("find_tx:%ld", tx_id.get_id()); EQ(0, SSH::write(conn1, "savepoint sp1")); EQ(0, SSH::write(conn1, "insert into stu1 values(200)", affected_rows)); ObLSID loc1; EQ(0, SSH::select_table_loc(R.tenant_id_, "stu1", loc1)); // when tx has redo, rollback to need write log EQ(0, SSH::submit_redo(R.tenant_id_, loc1)); EQ(0, SSH::write(conn1, "rollback to sp1")); int64_t val = 0; EQ(0, SSH::select_int64(conn1, "select sum(col1) val from stu1", val)); EQ(100, val); EQ(0, SSH::write(conn1, "commit")); } TEST_F(ObTxDataTest, rollback_to_with_read_sstable_uncommit) { common::ObMySQLProxy &sql_proxy = get_curr_simple_server().get_sql_proxy2(); int64_t affected_rows; EQ(0, sql_proxy.write("drop table if exists stu1", affected_rows)); EQ(0, sql_proxy.write("create table stu1(col1 int)", affected_rows)); sqlclient::ObISQLConnection *conn1 = NULL; EQ(0, sql_proxy.acquire(conn1)); EQ(0, SSH::write(conn1, "set autocommit=0", affected_rows)); EQ(0, SSH::write(conn1, "insert into stu1 values(100)", affected_rows)); ObTransID tx_id; EQ(0, SSH::find_tx(conn1, tx_id)); LOGI("find_tx:%ld", tx_id.get_id()); EQ(0, SSH::write(conn1, "savepoint sp1")); EQ(0, SSH::write(conn1, "insert into stu1 values(200)", affected_rows)); ObLSID loc1; EQ(0, SSH::select_table_loc(R.tenant_id_, "stu1", loc1)); // when tx has redo, rollback to need write log EQ(0, SSH::submit_redo(R.tenant_id_, loc1)); EQ(0, SSH::write(conn1, "rollback to sp1")); int64_t val = 0; EQ(0, SSH::select_int64(conn1, "select sum(col1) val from stu1", val)); EQ(100, val); EQ(0, sql_proxy.write("alter system minor freeze", affected_rows)); EQ(0, SSH::wait_checkpoint_newest(R.tenant_id_, loc1)); // read from sstable uncommit row EQ(0, SSH::select_int64(conn1, "select sum(col1) val from stu1", val)); EQ(100, val); EQ(0, SSH::write(conn1, "commit")); EQ(0, SSH::select_int64(sql_proxy, "select sum(col1) as val from stu1",val)); EQ(100, val); } TEST_F(ObTxDataTest, rollback_to_with_ls_replay) { common::ObMySQLProxy &sql_proxy = get_curr_simple_server().get_sql_proxy2(); int64_t affected_rows; EQ(0, sql_proxy.write("drop table if exists stu1", affected_rows)); EQ(0, sql_proxy.write("create table stu1(col1 int)", affected_rows)); sqlclient::ObISQLConnection *conn1 = NULL; EQ(0, sql_proxy.acquire(conn1)); EQ(0, SSH::write(conn1, "set autocommit=0", affected_rows)); EQ(0, SSH::write(conn1, "insert into stu1 values(100)", affected_rows)); ObTransID tx_id; EQ(0, SSH::find_tx(conn1, tx_id)); LOGI("find_tx:%ld", tx_id.get_id()); EQ(0, SSH::write(conn1, "savepoint sp1")); EQ(0, SSH::write(conn1, "insert into stu1 values(200)", affected_rows)); ObLSID loc1; EQ(0, SSH::select_table_loc(R.tenant_id_, "stu1", loc1)); // when tx has redo, rollback to need write log EQ(0, SSH::submit_redo(R.tenant_id_, loc1)); EQ(0, SSH::write(conn1, "rollback to sp1")); int64_t val = 0; EQ(0, SSH::select_int64(conn1, "select sum(col1) val from stu1", val)); EQ(100, val); EQ(0, sql_proxy.write("alter system minor freeze", affected_rows)); EQ(0, SSH::wait_checkpoint_newest(R.tenant_id_, loc1)); // read from sstable uncommit row EQ(0, SSH::select_int64(conn1, "select sum(col1) val from stu1", val)); EQ(100, val); LOGI("ls_reboot:%ld", loc1.id()); // tx has not commit, tx ctx recover from tx_sstable EQ(0, SSH::ls_reboot(R.tenant_id_, loc1)); EQ(0, SSH::write(conn1, "commit")); EQ(0, SSH::select_int64(sql_proxy, "select sum(col1) as val from stu1",val)); EQ(100, val); } TEST_F(ObTxDataTest, rollback_to_with_ls_replay_from_middle) { common::ObMySQLProxy &sql_proxy = get_curr_simple_server().get_sql_proxy2(); int64_t affected_rows; EQ(0, sql_proxy.write("drop table if exists stu1", affected_rows)); EQ(0, sql_proxy.write("create table stu1(col1 int)", affected_rows)); sqlclient::ObISQLConnection *conn1 = NULL; EQ(0, sql_proxy.acquire(conn1)); EQ(0, SSH::write(conn1, "set autocommit=0", affected_rows)); EQ(0, SSH::write(conn1, "insert into stu1 values(100)", affected_rows)); ObTransID tx_id; EQ(0, SSH::find_tx(conn1, tx_id)); LOGI("find_tx:%ld", tx_id.get_id()); EQ(0, SSH::write(conn1, "savepoint sp1")); EQ(0, SSH::write(conn1, "insert into stu1 values(200)", affected_rows)); ObLSID loc1; EQ(0, SSH::select_table_loc(R.tenant_id_, "stu1", loc1)); // when tx has redo, rollback to need write log EQ(0, SSH::submit_redo(R.tenant_id_, loc1)); EQ(0, sql_proxy.write("alter system minor freeze", affected_rows)); EQ(0, SSH::wait_checkpoint_newest(R.tenant_id_, loc1)); EQ(0, SSH::write(conn1, "rollback to sp1")); int64_t val = 0; EQ(0, SSH::select_int64(conn1, "select sum(col1) val from stu1", val)); EQ(100, val); EQ(0, SSH::write(conn1, "commit")); // make tx_ctx checkpoint EQ(0, SSH::freeze_tx_ctx(R.tenant_id_, loc1)); LOGI("ls_reboot:%ld", loc1.id()); EQ(0, SSH::ls_reboot(R.tenant_id_, loc1)); EQ(0, SSH::select_int64(sql_proxy, "select sum(col1) as val from stu1",val)); EQ(100, val); } TEST_F(ObTxDataTest, retain_ctx) { common::ObMySQLProxy &sql_proxy = get_curr_simple_server().get_sql_proxy2(); int64_t affected_rows = 0; ObMySQLTransaction trans; EQ(0, trans.start(GCTX.sql_proxy_, R.tenant_id_)); observer::ObInnerSQLConnection *conn = static_cast(trans.get_connection()); char buf[10]; ObRegisterMdsFlag flag; ObLSID ls_id1(1001); ObLSID ls_id2(1002); EQ(0, conn->register_multi_data_source(R.tenant_id_, ls_id1, ObTxDataSourceType::TEST3, buf, 10, flag)); EQ(0, SSH::submit_redo(R.tenant_id_, ls_id1)); EQ(0, sql_proxy.write("alter system minor freeze", affected_rows)); EQ(0, SSH::wait_checkpoint_newest(R.tenant_id_, ls_id1)); EQ(0, conn->register_multi_data_source(R.tenant_id_, ls_id2, ObTxDataSourceType::TEST3, buf, 10, flag)); ObTransID tx_id; EQ(0, SSH::g_select_int64(R.tenant_id_, "select trans_id as val from __all_virtual_trans_stat where is_exiting=0 and session_id<=1 limit 1", tx_id.tx_id_)); LOGI("find active_tx tx_id:%ld", tx_id.get_id()); EQ(0, trans.end(true)); // make tx_ctx checkpoint EQ(0, SSH::freeze_tx_ctx(R.tenant_id_, ls_id1)); LOGI("ls_reboot:%ld", ls_id1.id()); EQ(0, SSH::ls_reboot(R.tenant_id_, ls_id1)); EQ(0, SSH::freeze_tx_ctx(R.tenant_id_, ls_id2)); LOGI("ls_reboot:%ld", ls_id2.id()); EQ(0, SSH::ls_reboot(R.tenant_id_, ls_id2)); } TEST_F(ObTxDataTest, retain_ctx2) { common::ObMySQLProxy &sql_proxy = get_curr_simple_server().get_sql_proxy2(); int64_t affected_rows = 0; EQ(0, sql_proxy.write("drop table if exists stu1", affected_rows)); EQ(0, sql_proxy.write("create table stu1(col1 int)", affected_rows)); ObMySQLTransaction trans; EQ(0, trans.start(GCTX.sql_proxy_, R.tenant_id_)); observer::ObInnerSQLConnection *conn = static_cast(trans.get_connection()); char buf[10]; ObRegisterMdsFlag flag; ObLSID ls_id1(1001); EQ(0, conn->register_multi_data_source(R.tenant_id_, ls_id1, ObTxDataSourceType::TEST3, buf, 10, flag)); EQ(0, SSH::submit_redo(R.tenant_id_, ls_id1)); EQ(0, sql_proxy.write("alter system minor freeze", affected_rows)); EQ(0, SSH::wait_checkpoint_newest(R.tenant_id_, ls_id1)); EQ(0, sql_proxy.write("insert into stu1 values(100)", affected_rows)); EQ(0, conn->register_multi_data_source(R.tenant_id_, ls_id1, ObTxDataSourceType::TEST3, buf, 10, flag)); ObTransID tx_id; EQ(0, SSH::g_select_int64(R.tenant_id_, "select trans_id as val from __all_virtual_trans_stat where is_exiting=0 and session_id<=1 limit 1", tx_id.tx_id_)); LOGI("find active_tx tx_id:%ld", tx_id.get_id()); EQ(0, trans.end(true)); // make tx_ctx checkpoint EQ(0, SSH::freeze_tx_ctx(R.tenant_id_, ls_id1)); // make tx_data checkpoint EQ(0, SSH::freeze_tx_data(R.tenant_id_, ls_id1)); LOGI("ls_reboot:%ld", ls_id1.id()); EQ(0, SSH::ls_reboot(R.tenant_id_, ls_id1)); } TEST_F(ObTxDataTest, end) { if (R.time_sec_ > 0) { ::sleep(R.time_sec_); } } } // end unittest } // end oceanbase int main(int argc, char **argv) { int64_t c = 0; int64_t time_sec = 0; char *log_level = (char*)"INFO"; while(EOF != (c = getopt(argc,argv,"t:l:"))) { switch(c) { case 't': time_sec = atoi(optarg); break; case 'l': log_level = optarg; oceanbase::unittest::ObSimpleClusterTestBase::enable_env_warn_log_ = false; break; default: break; } } oceanbase::unittest::init_log_and_gtest(argc, argv); OB_LOGGER.set_log_level(log_level); LOG_INFO("main>>>"); oceanbase::unittest::R.time_sec_ = time_sec; ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); }