diff --git a/mittest/env/ob_simple_server_helper.cpp b/mittest/env/ob_simple_server_helper.cpp index 843142c24..b8b263fc7 100644 --- a/mittest/env/ob_simple_server_helper.cpp +++ b/mittest/env/ob_simple_server_helper.cpp @@ -187,6 +187,25 @@ int SimpleServerHelper::select_int64(sqlclient::ObISQLConnection *conn, const ch return ret; } +int SimpleServerHelper::select_int64(ObMySQLTransaction &trans, uint64_t tenant_id, const char *sql, int64_t &val) +{ + int ret = OB_SUCCESS; + SMART_VAR(ObMySQLProxy::MySQLResult, res) { + if (OB_FAIL(trans.read(res, tenant_id, sql))) { + } else { + sqlclient::ObMySQLResult *result = res.get_result(); + if (result == nullptr) { + ret = OB_ENTRY_NOT_EXIST; + } else if (OB_FAIL(result->next())) { + } else if (OB_FAIL(result->get_int("val", val))) { + } + } + } + if (OB_FAIL(ret)) { + LOG_WARN("select failed", KR(ret), K(sql)); + } + return ret; +} int SimpleServerHelper::g_select_varchar(uint64_t tenant_id, const char *sql, ObString &val) { int ret = OB_SUCCESS; diff --git a/mittest/env/ob_simple_server_helper.h b/mittest/env/ob_simple_server_helper.h index 6bf4bcbf8..45a463b81 100644 --- a/mittest/env/ob_simple_server_helper.h +++ b/mittest/env/ob_simple_server_helper.h @@ -39,6 +39,7 @@ public: static int g_select_uint64(uint64_t tenant_id, const char *sql, uint64_t &val); static int select_int64(sqlclient::ObISQLConnection *conn, const char *sql, int64_t &val); + static int select_int64(ObMySQLTransaction &trans, uint64_t tenant_id, const char *sql, int64_t &val); static int select_varchar(sqlclient::ObISQLConnection *conn, const char *sql, ObString &val); static int g_select_varchar(uint64_t tenant_id, const char *sql, ObString &val); diff --git a/mittest/mtlenv/storage/test_memtable_v2.cpp b/mittest/mtlenv/storage/test_memtable_v2.cpp index b6b243658..083236b8d 100644 --- a/mittest/mtlenv/storage/test_memtable_v2.cpp +++ b/mittest/mtlenv/storage/test_memtable_v2.cpp @@ -259,7 +259,8 @@ public: ObPartTransCtx *tx_ctx = store_ctx->mvcc_acc_ctx_.tx_ctx_; ObTxDataGuard tx_data_guard; EXPECT_EQ(OB_SUCCESS, tx_ctx->ls_tx_ctx_mgr_->get_tx_table()->alloc_tx_data(tx_data_guard)); - EXPECT_EQ(OB_SUCCESS, tx_ctx->insert_undo_action_to_tx_table_(undo, tx_data_guard, SCN::min_scn())); + ObUndoStatusNode *undo_node = NULL; + EXPECT_EQ(OB_SUCCESS, tx_ctx->insert_undo_action_to_tx_table_(undo, tx_data_guard, undo_node, SCN::min_scn())); ObMemtableCtx *mt_ctx = store_ctx->mvcc_acc_ctx_.mem_ctx_; ObTxCallbackList &cb_list = mt_ctx->trans_mgr_.callback_list_; for (ObMvccRowCallback *iter = (ObMvccRowCallback *)(cb_list.get_guard()->get_next()); diff --git a/mittest/simple_server/rewrite_function_for_test_big_tx_data.cpp b/mittest/simple_server/rewrite_function_for_test_big_tx_data.cpp index 1e5be9d36..bf4e63cda 100644 --- a/mittest/simple_server/rewrite_function_for_test_big_tx_data.cpp +++ b/mittest/simple_server/rewrite_function_for_test_big_tx_data.cpp @@ -79,7 +79,7 @@ int ObTxTable::check_with_tx_data(ObReadTxDataArg &read_tx_data_arg, ObITxDataCh int ObTxData::add_undo_action(ObTxTable *tx_table, transaction::ObUndoAction &new_undo_action, - ObUndoStatusNode *undo_node) + ObUndoStatusNode *&undo_node) { // STORAGE_LOG(DEBUG, "do add_undo_action"); UNUSED(undo_node); diff --git a/mittest/simple_server/test_transfer_tx.cpp b/mittest/simple_server/test_transfer_tx.cpp index e30a7133b..b903bb6cb 100644 --- a/mittest/simple_server/test_transfer_tx.cpp +++ b/mittest/simple_server/test_transfer_tx.cpp @@ -54,11 +54,11 @@ public: TestRunCtx R; -class ObSimpleClusterExampleTest : public ObSimpleClusterTestBase +class ObTransferTx : public ObSimpleClusterTestBase { public: // 指定case运行目录前缀 test_ob_simple_cluster_ - ObSimpleClusterExampleTest() : ObSimpleClusterTestBase("test_transfer_tx_", "50G", "50G") {} + ObTransferTx() : ObSimpleClusterTestBase("test_transfer_tx_", "50G", "50G") {} int do_balance(uint64_t tenant_id); private: int do_balance_inner_(uint64_t tenant_id); @@ -66,7 +66,7 @@ private: int wait_balance_clean(uint64_t tenant_id); }; -int ObSimpleClusterExampleTest::do_balance_inner_(uint64_t tenant_id) +int ObTransferTx::do_balance_inner_(uint64_t tenant_id) { int ret = OB_SUCCESS; static std::mutex mutex; @@ -102,7 +102,7 @@ int ObSimpleClusterExampleTest::do_balance_inner_(uint64_t tenant_id) return ret; } -int ObSimpleClusterExampleTest::wait_balance_clean(uint64_t tenant_id) +int ObTransferTx::wait_balance_clean(uint64_t tenant_id) { int ret = OB_SUCCESS; int64_t begin_time = ObTimeUtil::current_time(); @@ -134,7 +134,7 @@ int ObSimpleClusterExampleTest::wait_balance_clean(uint64_t tenant_id) return ret; } -int ObSimpleClusterExampleTest::do_balance(uint64_t tenant_id) +int ObTransferTx::do_balance(uint64_t tenant_id) { int ret = OB_SUCCESS; if (OB_FAIL(do_balance_inner_(tenant_id))) { @@ -143,7 +143,7 @@ int ObSimpleClusterExampleTest::do_balance(uint64_t tenant_id) return ret; } -int ObSimpleClusterExampleTest::do_transfer_start_abort(uint64_t tenant_id, ObLSID dest_ls_id, ObLSID src_ls_id, ObTransferTabletInfo tablet_info) +int ObTransferTx::do_transfer_start_abort(uint64_t tenant_id, ObLSID dest_ls_id, ObLSID src_ls_id, ObTransferTabletInfo tablet_info) { int ret = OB_SUCCESS; MTL_SWITCH(tenant_id) { @@ -184,15 +184,11 @@ int ObSimpleClusterExampleTest::do_transfer_start_abort(uint64_t tenant_id, ObLS return ret; } -TEST_F(ObSimpleClusterExampleTest, observer_start) +TEST_F(ObTransferTx, prepare) { - LOG_INFO("observer_start succ"); LOGI("observer start"); -} -// 创建租户并不轻量,看场景必要性使用 -TEST_F(ObSimpleClusterExampleTest, add_tenant) -{ + LOGI("create tenant begin"); // 创建普通租户tt1 EQ(OB_SUCCESS, create_tenant("tt1", "40G", "40G", false, 10)); // 获取租户tt1的tenant_id @@ -200,18 +196,9 @@ TEST_F(ObSimpleClusterExampleTest, add_tenant) ASSERT_NE(0, R.tenant_id_); // 初始化普通租户tt1的sql proxy EQ(OB_SUCCESS, get_curr_simple_server().init_sql_proxy2()); -} - -/* -TEST_F(ObSimpleClusterExampleTest, delete_tenant) -{ - EQ(OB_SUCCESS, delete_tenant()); -} -*/ - -TEST_F(ObSimpleClusterExampleTest, worker) -{ int tenant_id = R.tenant_id_; + LOGI("create tenant finish"); + R.worker_ = std::thread([this, tenant_id] () { int ret = OB_SUCCESS; lib::set_thread_name_inner("MY_BALANCE"); @@ -223,10 +210,7 @@ TEST_F(ObSimpleClusterExampleTest, worker) ::sleep(3); } }); -} -TEST_F(ObSimpleClusterExampleTest, create_new_ls) -{ // 在单节点ObServer下创建新的日志流, 注意避免被RS任务GC掉 EQ(0, SSH::create_ls(R.tenant_id_, get_curr_observer().self_addr_)); int64_t ls_count = 0; @@ -253,7 +237,7 @@ TEST_F(ObSimpleClusterExampleTest, create_new_ls) NEQ(loc1, loc2); \ EQ(0, sql_proxy.write("create tablegroup tg1 sharding='NONE';", affected_rows)); -TEST_F(ObSimpleClusterExampleTest, tx_exit) +TEST_F(ObTransferTx, tx_exit) { TRANSFER_CASE_PREPARE; ObLSID ls_id; @@ -355,7 +339,7 @@ TEST_F(ObSimpleClusterExampleTest, tx_exit) } /* -TEST_F(ObSimpleClusterExampleTest, large_query) +TEST_F(ObTransferTx, large_query) { TRANSFER_CASE_PREPARE; @@ -440,7 +424,7 @@ TEST_F(ObSimpleClusterExampleTest, large_query) } */ -TEST_F(ObSimpleClusterExampleTest, epoch_recover_from_active_info) +TEST_F(ObTransferTx, epoch_recover_from_active_info) { TRANSFER_CASE_PREPARE; @@ -467,7 +451,7 @@ TEST_F(ObSimpleClusterExampleTest, epoch_recover_from_active_info) EQ(0, conn->commit()); } -TEST_F(ObSimpleClusterExampleTest, epoch_recover_from_ctx_checkpoint) +TEST_F(ObTransferTx, epoch_recover_from_ctx_checkpoint) { TRANSFER_CASE_PREPARE; @@ -513,7 +497,7 @@ TEST_F(ObSimpleClusterExampleTest, epoch_recover_from_ctx_checkpoint) EQ(0, commit_ret); } -TEST_F(ObSimpleClusterExampleTest, epoch_recover_from_ctx_checkpoint2) +TEST_F(ObTransferTx, epoch_recover_from_ctx_checkpoint2) { TRANSFER_CASE_PREPARE; @@ -559,7 +543,7 @@ TEST_F(ObSimpleClusterExampleTest, epoch_recover_from_ctx_checkpoint2) } // 空sstable、没有活跃事务 -TEST_F(ObSimpleClusterExampleTest, transfer_empty_tablet) +TEST_F(ObTransferTx, transfer_empty_tablet) { // 关掉observer内部的均衡,防止LS均衡,只调度分区均衡 TRANSFER_CASE_PREPARE; @@ -582,7 +566,7 @@ TEST_F(ObSimpleClusterExampleTest, transfer_empty_tablet) } // sstable有数据,没有活跃事务 transfer -TEST_F(ObSimpleClusterExampleTest, transfer_no_active_tx) +TEST_F(ObTransferTx, transfer_no_active_tx) { TRANSFER_CASE_PREPARE; @@ -609,7 +593,7 @@ TEST_F(ObSimpleClusterExampleTest, transfer_no_active_tx) } // sstable有数据,有活跃事务 transfer -TEST_F(ObSimpleClusterExampleTest, transfer_active_tx) +TEST_F(ObTransferTx, transfer_active_tx) { TRANSFER_CASE_PREPARE; @@ -681,7 +665,7 @@ TEST_F(ObSimpleClusterExampleTest, transfer_active_tx) // transfer active tx A->B B->A -TEST_F(ObSimpleClusterExampleTest, transfer_A_B_AND_B_A) +TEST_F(ObTransferTx, transfer_A_B_AND_B_A) { TRANSFER_CASE_PREPARE; @@ -723,7 +707,7 @@ TEST_F(ObSimpleClusterExampleTest, transfer_A_B_AND_B_A) EQ(600, val); } -TEST_F(ObSimpleClusterExampleTest, transfer_replay) +TEST_F(ObTransferTx, transfer_replay) { TRANSFER_CASE_PREPARE; @@ -772,7 +756,7 @@ TEST_F(ObSimpleClusterExampleTest, transfer_replay) } // sstable有数据,有活跃事务 transfer -TEST_F(ObSimpleClusterExampleTest, transfer_abort_active_tx) +TEST_F(ObTransferTx, transfer_abort_active_tx) { TRANSFER_CASE_PREPARE; @@ -827,7 +811,7 @@ TEST_F(ObSimpleClusterExampleTest, transfer_abort_active_tx) EQ(1300, sum1); } -TEST_F(ObSimpleClusterExampleTest, transfer_resume) +TEST_F(ObTransferTx, transfer_resume) { TRANSFER_CASE_PREPARE; @@ -861,7 +845,7 @@ TEST_F(ObSimpleClusterExampleTest, transfer_resume) } // sstable有数据,有活跃事务,但事务数据丢失不完整 transfer -TEST_F(ObSimpleClusterExampleTest, transfer_query_lost) +TEST_F(ObTransferTx, transfer_query_lost) { TRANSFER_CASE_PREPARE; @@ -906,7 +890,7 @@ TEST_F(ObSimpleClusterExampleTest, transfer_query_lost) } // transfer active tx A->B A->B -TEST_F(ObSimpleClusterExampleTest, transfer_A_B_AND_A_B) +TEST_F(ObTransferTx, transfer_A_B_AND_A_B) { TRANSFER_CASE_PREPARE; @@ -955,7 +939,7 @@ TEST_F(ObSimpleClusterExampleTest, transfer_A_B_AND_A_B) NEQ(0, conn->commit()); } -TEST_F(ObSimpleClusterExampleTest, transfer_AND_ddl) +TEST_F(ObTransferTx, transfer_AND_ddl) { TRANSFER_CASE_PREPARE; @@ -1008,7 +992,7 @@ TEST_F(ObSimpleClusterExampleTest, transfer_AND_ddl) EQ(0, case_err); } -TEST_F(ObSimpleClusterExampleTest, transfer_AND_rollback) +TEST_F(ObTransferTx, transfer_AND_rollback) { TRANSFER_CASE_PREPARE; @@ -1044,9 +1028,16 @@ TEST_F(ObSimpleClusterExampleTest, transfer_AND_rollback) EQ(0, conn->commit()); EQ(0, SSH::select_int64(conn, "select sum(col) as val from stu2", val)); EQ(400, val); + + EQ(0, SSH::freeze_tx_ctx(R.tenant_id_, loc1)); + EQ(0, SSH::freeze_tx_ctx(R.tenant_id_, loc2)); + EQ(0, SSH::freeze_tx_data(R.tenant_id_, loc1)); + EQ(0, SSH::freeze_tx_data(R.tenant_id_, loc2)); + EQ(0, SSH::ls_reboot(R.tenant_id_, loc1)); + EQ(0, SSH::ls_reboot(R.tenant_id_, loc2)); } -TEST_F(ObSimpleClusterExampleTest, transfer_AND_rollback2) +TEST_F(ObTransferTx, transfer_AND_rollback2) { TRANSFER_CASE_PREPARE; @@ -1093,7 +1084,7 @@ TEST_F(ObSimpleClusterExampleTest, transfer_AND_rollback2) EQ(100, val); } -TEST_F(ObSimpleClusterExampleTest, transfer_tx_ctx_merge) +TEST_F(ObTransferTx, transfer_tx_ctx_merge) { TRANSFER_CASE_PREPARE; @@ -1211,11 +1202,15 @@ TEST_F(ObSimpleClusterExampleTest, transfer_tx_ctx_merge) EQ(400, val1); EQ(400, val2); + EQ(0, SSH::freeze_tx_ctx(R.tenant_id_, loc1)); + EQ(0, SSH::freeze_tx_ctx(R.tenant_id_, loc2)); + EQ(0, SSH::freeze_tx_data(R.tenant_id_, loc1)); + EQ(0, SSH::freeze_tx_data(R.tenant_id_, loc2)); EQ(0, SSH::ls_reboot(R.tenant_id_, loc1)); EQ(0, SSH::ls_reboot(R.tenant_id_, loc2)); } -TEST_F(ObSimpleClusterExampleTest, transfer_batch) +TEST_F(ObTransferTx, transfer_batch) { TRANSFER_CASE_PREPARE; sql_proxy.write("alter system set _transfer_start_trans_timeout='5s'",affected_rows); @@ -1249,17 +1244,70 @@ TEST_F(ObSimpleClusterExampleTest, transfer_batch) EQ(0, SSH::select_int64(sql_proxy, "select sum(col) as val from stu2", sum)); EQ(100 * 5000, sum); sql_proxy.write("alter system set _transfer_start_trans_timeout='1s'",affected_rows); + + EQ(0, SSH::freeze_tx_ctx(R.tenant_id_, loc1)); + EQ(0, SSH::freeze_tx_ctx(R.tenant_id_, loc2)); + EQ(0, SSH::freeze_tx_data(R.tenant_id_, loc1)); + EQ(0, SSH::freeze_tx_data(R.tenant_id_, loc2)); + EQ(0, SSH::ls_reboot(R.tenant_id_, loc1)); + EQ(0, SSH::ls_reboot(R.tenant_id_, loc2)); } -TEST_F(ObSimpleClusterExampleTest, transfer_retain_ctx) +/* +TEST_F(ObTransferTx, replay_from_middle) +{ + TRANSFER_CASE_PREPARE; + + sqlclient::ObISQLConnection *conn = NULL; + EQ(0, sql_proxy.acquire(conn)); + EQ(0, SSH::write(conn, "set autocommit=0")); + EQ(0, SSH::write(conn, "insert into test.stu1 values(100)", affected_rows)); + EQ(0, SSH::wait_checkpoint_newest(R.tenant_id_, loc1)); + int64_t val = -1; + EQ(0, SSH::select_int64(conn, "select get_lock('test_lock', 10) val", val)); + EQ(1, val); + EQ(0, conn->commit()); + + EQ(0, SSH::freeze_tx_ctx(R.tenant_id_, loc1)); + EQ(0, SSH::freeze_tx_ctx(R.tenant_id_, loc2)); + EQ(0, SSH::freeze_tx_ctx(R.tenant_id_, ObLSID(1))); + EQ(0, SSH::freeze_tx_data(R.tenant_id_, loc1)); + EQ(0, SSH::freeze_tx_data(R.tenant_id_, loc2)); + EQ(0, SSH::freeze_tx_data(R.tenant_id_, ObLSID(1))); + EQ(0, SSH::ls_reboot(R.tenant_id_, loc1)); + EQ(0, SSH::ls_reboot(R.tenant_id_, loc2)); + EQ(0, SSH::ls_reboot(R.tenant_id_, ObLSID(1))); + + LOGI("release_lock"); + val = -1; + EQ(0, SSH::select_int64(conn, "select release_lock('test_lock')", val)); + // session end lock release + LOGI("get_lock"); + val = -1; + EQ(0, SSH::select_int64(sql_proxy, "select get_lock('test_lock', 1) val", val)); + EQ(1, val); + // session end lock release + LOGI("get_lock"); + val = -1; + EQ(0, SSH::select_int64(sql_proxy, "select get_lock('test_lock', 1) val", val)); + EQ(1, val); +} +*/ +TEST_F(ObTransferTx, transfer_mds_trans) { TRANSFER_CASE_PREPARE; ObMySQLTransaction trans; EQ(0, trans.start(GCTX.sql_proxy_, R.tenant_id_)); + EQ(0, trans.write(R.tenant_id_, "insert into test.stu1 values(100)", affected_rows)); + EQ(0, trans.write(R.tenant_id_, "insert into test.stu2 values(100)", affected_rows)); + //EQ(0, sql_proxy.write("alter system minor freeze", affected_rows)); + // make it replay from middle + EQ(0, SSH::wait_checkpoint_newest(R.tenant_id_, loc1)); + EQ(0, SSH::wait_checkpoint_newest(R.tenant_id_, loc2)); + EQ(0, trans.write(R.tenant_id_, "insert into test.stu1 values(200)", affected_rows)); + EQ(0, trans.write(R.tenant_id_, "insert into test.stu2 values(200)", affected_rows)); observer::ObInnerSQLConnection *conn = static_cast(trans.get_connection()); - //EQ(0, conn->execute_write(R.tenant_id_, "insert into stu1 values(100)", affected_rows)); - //EQ(0, conn->execute_write(R.tenant_id_, "insert into stu2 values(100)", affected_rows)); char buf[10]; ObRegisterMdsFlag flag; EQ(0, conn->register_multi_data_source(R.tenant_id_, @@ -1269,6 +1317,13 @@ TEST_F(ObSimpleClusterExampleTest, transfer_retain_ctx) 10, flag)); + EQ(0, conn->register_multi_data_source(R.tenant_id_, + loc2, + ObTxDataSourceType::TEST3, + buf, + 10, + flag)); + EQ(0, conn->register_multi_data_source(R.tenant_id_, loc2, ObTxDataSourceType::TEST3, @@ -1279,28 +1334,9 @@ TEST_F(ObSimpleClusterExampleTest, transfer_retain_ctx) EQ(0, SSH::g_select_int64(R.tenant_id_, "select trans_id as val from __all_virtual_trans_stat where is_exiting=0 and session_id<=1 limit 1", tx_id.tx_id_)); LOGI("find active_tx tx_id:%ld", tx_id.get_id()); - InjectTxFaultHelper inject_tx_fault_helper; - EQ(0, inject_tx_fault_helper.inject_tx_block(R.tenant_id_, loc2, tx_id, ObTxLogType::TX_ABORT_LOG)); - - // make tx ctx enter retain - EQ(0, SSH::submit_redo(R.tenant_id_, loc1)); - EQ(0, SSH::abort_tx(R.tenant_id_, loc1, tx_id)); - EQ(0, sql_proxy.write("alter tablegroup tg1 add stu1,stu2;", affected_rows)); EQ(0, do_balance(R.tenant_id_)); - // make wrs check approve - EQ(0, SSH::modify_wrs(R.tenant_id_, loc2)); - - // transfer task failed bacause tx retain_ctx - NEQ(loc1, loc2); - ob_usleep(3 * 1000 * 1000); - NEQ(loc1, loc2); - - inject_tx_fault_helper.release(); - - // transfer task failed bacause tx retain_ctx - EQ(0, SSH::abort_tx(R.tenant_id_, loc2, tx_id)); // wait while (true) { EQ(0, SSH::select_table_loc(R.tenant_id_, "stu1", loc1)); @@ -1311,7 +1347,7 @@ TEST_F(ObSimpleClusterExampleTest, transfer_retain_ctx) ::sleep(1); } - NEQ(0, trans.end(true)); + EQ(0, trans.end(false)); int64_t val1 = 0; int64_t val2 = 0; EQ(0, SSH::select_int64(sql_proxy, "select count(col) as val from stu1", val1)); @@ -1320,11 +1356,16 @@ TEST_F(ObSimpleClusterExampleTest, transfer_retain_ctx) EQ(0, val1); EQ(0, val2); + EQ(0, SSH::freeze_tx_ctx(R.tenant_id_, loc1)); + EQ(0, SSH::freeze_tx_ctx(R.tenant_id_, loc2)); + EQ(0, SSH::freeze_tx_data(R.tenant_id_, loc1)); + EQ(0, SSH::freeze_tx_data(R.tenant_id_, loc2)); + LOGI("ls_reboot"); EQ(0, SSH::ls_reboot(R.tenant_id_, loc1)); EQ(0, SSH::ls_reboot(R.tenant_id_, loc2)); } -TEST_F(ObSimpleClusterExampleTest, create_more_ls) +TEST_F(ObTransferTx, create_more_ls) { for (int i=0;i<8;i++) { EQ(0, SSH::create_ls(R.tenant_id_, get_curr_observer().self_addr_)); @@ -1334,7 +1375,7 @@ TEST_F(ObSimpleClusterExampleTest, create_more_ls) EQ(10, ls_count); } -TEST_F(ObSimpleClusterExampleTest, bench) +TEST_F(ObTransferTx, bench) { int64_t affected_rows = 0; common::ObMySQLProxy &sql_proxy = get_curr_simple_server().get_sql_proxy2(); @@ -1449,7 +1490,7 @@ TEST_F(ObSimpleClusterExampleTest, bench) EQ(0, bench_err); } -TEST_F(ObSimpleClusterExampleTest, end) +TEST_F(ObTransferTx, end) { int64_t wait_us = R.time_sec_ * 1000 * 1000; while (ObTimeUtil::current_time() - R.start_time_ < wait_us) { diff --git a/src/storage/tx/ob_trans_part_ctx.cpp b/src/storage/tx/ob_trans_part_ctx.cpp index b0294b891..b1968a6a3 100644 --- a/src/storage/tx/ob_trans_part_ctx.cpp +++ b/src/storage/tx/ob_trans_part_ctx.cpp @@ -2357,6 +2357,12 @@ int ObPartTransCtx::replay_mds_to_tx_table_(const ObTxBufferNodeArray &mds_node_ ls_id_, op_scn, tx_op_batch))) { TRANS_LOG(WARN, "add_tx_op_batch failed", KR(ret)); } + // tx_op_batch not put into tx_data, need to release + if (OB_FAIL(ret)) { + for (int64_t idx = 0; idx < tx_op_batch.count(); idx++) { + tx_op_batch.at(idx).release(); + } + } } // tx_ctx and tx_data checkpoint independent if (OB_FAIL(ret)) { @@ -2442,6 +2448,7 @@ int ObPartTransCtx::insert_mds_to_tx_table_(ObTxLogCb &log_cb) int ObPartTransCtx::insert_undo_action_to_tx_table_(ObUndoAction &undo_action, ObTxDataGuard &new_tx_data_guard, + storage::ObUndoStatusNode *&undo_node, const share::SCN op_scn) { int ret = OB_SUCCESS; @@ -2451,7 +2458,7 @@ int ObPartTransCtx::insert_undo_action_to_tx_table_(ObUndoAction &undo_action, TRANS_LOG(WARN, "get tx data failed", KR(ret)); } else if (OB_FAIL(tx_data_guard.tx_data()->init_tx_op())) { TRANS_LOG(WARN, "init tx op failed", KR(ret)); - } else if (OB_FAIL(tx_data_guard.tx_data()->add_undo_action(ls_tx_ctx_mgr_->get_tx_table(), undo_action))) { + } else if (OB_FAIL(tx_data_guard.tx_data()->add_undo_action(ls_tx_ctx_mgr_->get_tx_table(), undo_action, undo_node))) { TRANS_LOG(WARN, "add undo action failed", KR(ret)); } else { *new_tx_data_guard.tx_data() = *tx_data_guard.tx_data(); @@ -2461,7 +2468,7 @@ int ObPartTransCtx::insert_undo_action_to_tx_table_(ObUndoAction &undo_action, TRANS_LOG(WARN, "insert tx data failed", KR(ret)); } } - TRANS_LOG(INFO, "insert undo_action to tx_table", KR(ret), K(undo_action), K(trans_id_), K(ls_id_), K(op_scn)); + TRANS_LOG(INFO, "insert undo_action to tx_table", KR(ret), K(undo_action), K(trans_id_), K(ls_id_), K(op_scn), KP(undo_node)); return ret; } @@ -2585,10 +2592,14 @@ int ObPartTransCtx::on_success_ops_(ObTxLogCb *log_cb) TRANS_LOG(INFO, "apply commit info log", KR(ret), K(*this), K(two_phase_log_type)); } } else if (ObTxLogType::TX_ROLLBACK_TO_LOG == log_type) { - if (OB_FAIL(insert_undo_action_to_tx_table_(log_cb->get_undo_action(), log_cb->get_tx_data_guard(), log_ts))) { + if (OB_FAIL(insert_undo_action_to_tx_table_(log_cb->get_undo_action(), + log_cb->get_tx_data_guard(), + log_cb->get_undo_node(), + log_ts))) { TRANS_LOG(WARN, "insert to tx table failed", KR(ret), K(*this)); } else { log_cb->set_tx_data(nullptr); + log_cb->reset_undo_node(); } } else if (ObTxLogTypeChecker::is_state_log(log_type)) { sub_state_.clear_state_log_submitting(); @@ -7164,7 +7175,9 @@ int ObPartTransCtx::prepare_mds_tx_op_(const ObTxBufferNodeArray &mds_array, } else if (OB_FAIL(tx_op_array.push_back(tx_op))) { TRANS_LOG(WARN, "push buffer_node to list fail", KR(ret)); } + // attention tx_op is not put into tx_op_array if (OB_FAIL(ret) && OB_NOT_NULL(new_node_wrapper)) { + new_node_wrapper->~ObTxBufferNodeWrapper(); tx_op_allocator.free(new_node_wrapper); } } @@ -8276,7 +8289,7 @@ int ObPartTransCtx::supplement_tx_op_if_exist_(const bool for_replay, const SCN int ObPartTransCtx::recover_tx_ctx_from_tx_op_(ObTxOpVector &tx_op_list, const SCN replay_scn) { - TRANS_LOG(INFO, "recover_tx_ctx_from_tx_op_", K(tx_op_list.get_count()), K(replay_scn), KPC(this)); + TRANS_LOG(INFO, "recover tx_ctx from_tx_op begin", K(tx_op_list.get_count()), K(replay_scn), KPC(this)); int ret = OB_SUCCESS; // filter tx_op for this tx_ctx life_cycle ObTxOpArray ctx_tx_op; @@ -8323,7 +8336,7 @@ int ObPartTransCtx::recover_tx_ctx_from_tx_op_(ObTxOpVector &tx_op_list, const S if (exec_info_.multi_data_source_.count() > 0) { ctx_max_register_no = exec_info_.multi_data_source_.at(exec_info_.multi_data_source_.count() - 1).get_register_no(); } - TRANS_LOG(INFO, "recover tx_ctx from tx_op", KR(ret), K(tx_op_list.get_count()), K(ctx_tx_op.count()), + TRANS_LOG(INFO, "recover tx_ctx from tx_op finish", KR(ret), K(tx_op_list.get_count()), K(ctx_tx_op.count()), K(mds_array.count()), K(exec_info_.multi_data_source_.count()), K(mds_max_register_no), K(ctx_max_register_no), KPC(this)); @@ -8677,6 +8690,7 @@ int ObPartTransCtx::submit_rollback_to_log_(const ObTxSEQ from_scn, ObTxRollbackToLog log(from_scn, to_scn); ObTxLogCb *log_cb = NULL; int64_t replay_hint = trans_id_.get_id(); + ObUndoStatusNode *undo_node = NULL; logservice::ObReplayBarrierType barrier = logservice::ObReplayBarrierType::NO_NEED_BARRIER; if (to_scn.support_branch() && is_parallel_logging()) { const int16_t branch_id = to_scn.get_branch(); @@ -8703,6 +8717,15 @@ int ObPartTransCtx::submit_rollback_to_log_(const ObTxSEQ from_scn, log_cb = NULL; } else if (OB_FAIL(ls_tx_ctx_mgr_->get_tx_table()->alloc_tx_data(log_cb->get_tx_data_guard(), true, INT64_MAX))) { TRANS_LOG(WARN, "alloc_tx_data failed", KR(ret), KPC(this)); + return_log_cb_(log_cb); + log_cb = NULL; + } else if (OB_ISNULL(undo_node = (ObUndoStatusNode*)MTL(ObSharedMemAllocMgr*)->tx_data_allocator().alloc(true, INT64_MAX))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + TRANS_LOG(WARN, "alloc_undo_status_node failed", KR(ret), KPC(this)); + return_log_cb_(log_cb); + log_cb = NULL; + } else if (FALSE_IT(new (undo_node) ObUndoStatusNode())) { + } else if (FALSE_IT(log_cb->get_undo_node() = undo_node)) { } else if (OB_FAIL(submit_log_block_out_(log_block, SCN::min_scn(), log_cb, replay_hint, barrier))) { TRANS_LOG(WARN, "submit log fail", K(ret), K(log_block), KPC(this)); return_log_cb_(log_cb); diff --git a/src/storage/tx/ob_trans_part_ctx.h b/src/storage/tx/ob_trans_part_ctx.h index ef2060957..032339538 100644 --- a/src/storage/tx/ob_trans_part_ctx.h +++ b/src/storage/tx/ob_trans_part_ctx.h @@ -666,7 +666,10 @@ private: bool is_replay); int replay_mds_to_tx_table_(const ObTxBufferNodeArray &mds_node_array, const share::SCN op_scn); int insert_mds_to_tx_table_(ObTxLogCb &log_cb); - int insert_undo_action_to_tx_table_(ObUndoAction &undo_action, ObTxDataGuard &new_tx_data_guard, const share::SCN op_scn); + int insert_undo_action_to_tx_table_(ObUndoAction &undo_action, + ObTxDataGuard &new_tx_data_guard, + storage::ObUndoStatusNode *&undo_node, + const share::SCN op_scn); int replay_undo_action_to_tx_table_(ObUndoAction &undo_action, const share::SCN op_scn); int decide_state_log_barrier_type_(const ObTxLogType &state_log_type, logservice::ObReplayBarrierType &final_barrier_type); diff --git a/src/storage/tx/ob_trans_submit_log_cb.cpp b/src/storage/tx/ob_trans_submit_log_cb.cpp index a79fd8b94..21ec7412c 100644 --- a/src/storage/tx/ob_trans_submit_log_cb.cpp +++ b/src/storage/tx/ob_trans_submit_log_cb.cpp @@ -15,6 +15,7 @@ #include "share/ob_cluster_version.h" #include "ob_trans_service.h" #include "ob_trans_part_ctx.h" +#include "share/allocator/ob_shared_memory_allocator_mgr.h" namespace oceanbase { @@ -105,6 +106,14 @@ void ObTxLogCb::reset_tx_op_array() } } +void ObTxLogCb::reset_undo_node() +{ + if (OB_NOT_NULL(undo_node_)) { + MTL(share::ObSharedMemAllocMgr*)->tx_data_allocator().free(undo_node_); + undo_node_ = NULL; + } +} + void ObTxLogCb::reset() { ObTxBaseLogCb::reset(); @@ -128,6 +137,7 @@ void ObTxLogCb::reset() // is_callbacking_ = false; first_part_scn_.invalid_scn(); reset_tx_op_array(); + reset_undo_node(); } void ObTxLogCb::reuse() @@ -146,6 +156,7 @@ void ObTxLogCb::reuse() first_part_scn_.invalid_scn(); reset_tx_op_array(); + reset_undo_node(); } ObTxLogType ObTxLogCb::get_last_log_type() const diff --git a/src/storage/tx/ob_trans_submit_log_cb.h b/src/storage/tx/ob_trans_submit_log_cb.h index 9e3f957b4..2b0833832 100644 --- a/src/storage/tx/ob_trans_submit_log_cb.h +++ b/src/storage/tx/ob_trans_submit_log_cb.h @@ -75,7 +75,8 @@ class ObTxLogCb : public ObTxBaseLogCb, public common::ObDLinkBase { public: - ObTxLogCb() : extra_cb_(nullptr), need_free_extra_cb_(false), tx_op_array_(nullptr) { reset(); } + ObTxLogCb() : extra_cb_(nullptr), need_free_extra_cb_(false), tx_op_array_(nullptr), + undo_node_(nullptr) { reset(); } ~ObTxLogCb() { destroy(); } int init(const share::ObLSID &key, const ObTransID &trans_id, @@ -95,6 +96,8 @@ public: tx_data_guard_.init(tx_data); } } + void reset_undo_node(); + storage::ObUndoStatusNode *&get_undo_node() { return undo_node_; } void set_undo_action(const ObUndoAction &undo_action) { undo_action_ = undo_action; } @@ -171,6 +174,7 @@ private: bool need_free_extra_cb_; ObUndoAction undo_action_; storage::ObTxOpArray *tx_op_array_; + storage::ObUndoStatusNode *undo_node_; //bool is_callbacking_; }; diff --git a/src/storage/tx/ob_tx_data_define.cpp b/src/storage/tx/ob_tx_data_define.cpp index fc1b85f43..0b1c8ff67 100644 --- a/src/storage/tx/ob_tx_data_define.cpp +++ b/src/storage/tx/ob_tx_data_define.cpp @@ -549,7 +549,7 @@ int ObTxData::reserve_undo(ObTxTable *tx_table) return ret; } -int ObTxData::add_undo_action(ObTxTable *tx_table, transaction::ObUndoAction &new_undo_action, ObUndoStatusNode *undo_node) +int ObTxData::add_undo_action(ObTxTable *tx_table, transaction::ObUndoAction &new_undo_action, ObUndoStatusNode *&undo_node) { // STORAGE_LOG(DEBUG, "do add_undo_action"); int ret = OB_SUCCESS; @@ -599,10 +599,6 @@ int ObTxData::add_undo_action(ObTxTable *tx_table, transaction::ObUndoAction &ne } } } - - if (OB_NOT_NULL(undo_node)) { - tx_data_table->free_undo_status_node(undo_node); - } } return ret; } diff --git a/src/storage/tx/ob_tx_data_define.h b/src/storage/tx/ob_tx_data_define.h index 115a6c71c..2379b62f7 100644 --- a/src/storage/tx/ob_tx_data_define.h +++ b/src/storage/tx/ob_tx_data_define.h @@ -317,7 +317,12 @@ public: */ OB_NOINLINE int add_undo_action(ObTxTable *tx_table, transaction::ObUndoAction &undo_action, - ObUndoStatusNode *undo_node = nullptr); + ObUndoStatusNode *&undo_node); + OB_NOINLINE int add_undo_action(ObTxTable *tx_table, + transaction::ObUndoAction &undo_action) { + ObUndoStatusNode *undo_status_node = nullptr; + return add_undo_action(tx_table, undo_action, undo_status_node); + } /** * @brief Check if this tx data is valid */ diff --git a/unittest/storage/tx/it/tx_node.cpp b/unittest/storage/tx/it/tx_node.cpp index 980125c62..72c7cb10d 100644 --- a/unittest/storage/tx/it/tx_node.cpp +++ b/unittest/storage/tx/it/tx_node.cpp @@ -123,6 +123,8 @@ ObTxNode::ObTxNode(const int64_t ls_id, tenant_.enable_tenant_ctx_check_ = false; tenant_.set(&fake_tenant_freezer_); tenant_.set(&fake_part_trans_ctx_pool_); + fake_shared_mem_alloc_mgr_.init(); + tenant_.set(&fake_shared_mem_alloc_mgr_); tenant_.start(); ObTenantEnv::set_tenant(&tenant_); ObTableHandleV2 lock_memtable_handle; diff --git a/unittest/storage/tx/it/tx_node.h b/unittest/storage/tx/it/tx_node.h index 1d9c29b66..7f86881e7 100644 --- a/unittest/storage/tx/it/tx_node.h +++ b/unittest/storage/tx/it/tx_node.h @@ -305,6 +305,7 @@ public: ObLockTable fake_lock_table_; ObFakeTxTable fake_tx_table_; ObTenantFreezer fake_tenant_freezer_; + ObSharedMemAllocMgr fake_shared_mem_alloc_mgr_; ObLS fake_ls_; ObFreezer fake_freezer_; ObTxNodeRole role_; diff --git a/unittest/storage/tx/mock_utils/basic_fake_define.h b/unittest/storage/tx/mock_utils/basic_fake_define.h index d821326ec..9246b34c8 100644 --- a/unittest/storage/tx/mock_utils/basic_fake_define.h +++ b/unittest/storage/tx/mock_utils/basic_fake_define.h @@ -56,6 +56,7 @@ public: slice_allocator_.set_nway(32); FAKE_ALLOCATOR->init("FAKE_A"); FAKE_ALLOCATOR2->init(); + tx_data_allocator_ = FAKE_ALLOCATOR; is_inited_ = true; } virtual int init(ObLS *ls, ObTxCtxTable *tx_ctx_table) override