fix undo_node pre_alloc and add tx_ctx replay from middle case

This commit is contained in:
obdev 2024-05-29 07:45:25 +00:00 committed by ob-robot
parent 0c935cf425
commit 0e82d014fd
14 changed files with 194 additions and 86 deletions

View File

@ -187,6 +187,25 @@ int SimpleServerHelper::select_int64(sqlclient::ObISQLConnection *conn, const ch
return ret;
}
int SimpleServerHelper::select_int64(ObMySQLTransaction &trans, uint64_t tenant_id, const char *sql, int64_t &val)
{
int ret = OB_SUCCESS;
SMART_VAR(ObMySQLProxy::MySQLResult, res) {
if (OB_FAIL(trans.read(res, tenant_id, sql))) {
} else {
sqlclient::ObMySQLResult *result = res.get_result();
if (result == nullptr) {
ret = OB_ENTRY_NOT_EXIST;
} else if (OB_FAIL(result->next())) {
} else if (OB_FAIL(result->get_int("val", val))) {
}
}
}
if (OB_FAIL(ret)) {
LOG_WARN("select failed", KR(ret), K(sql));
}
return ret;
}
int SimpleServerHelper::g_select_varchar(uint64_t tenant_id, const char *sql, ObString &val)
{
int ret = OB_SUCCESS;

View File

@ -39,6 +39,7 @@ public:
static int g_select_uint64(uint64_t tenant_id, const char *sql, uint64_t &val);
static int select_int64(sqlclient::ObISQLConnection *conn, const char *sql, int64_t &val);
static int select_int64(ObMySQLTransaction &trans, uint64_t tenant_id, const char *sql, int64_t &val);
static int select_varchar(sqlclient::ObISQLConnection *conn, const char *sql, ObString &val);
static int g_select_varchar(uint64_t tenant_id, const char *sql, ObString &val);

View File

@ -259,7 +259,8 @@ public:
ObPartTransCtx *tx_ctx = store_ctx->mvcc_acc_ctx_.tx_ctx_;
ObTxDataGuard tx_data_guard;
EXPECT_EQ(OB_SUCCESS, tx_ctx->ls_tx_ctx_mgr_->get_tx_table()->alloc_tx_data(tx_data_guard));
EXPECT_EQ(OB_SUCCESS, tx_ctx->insert_undo_action_to_tx_table_(undo, tx_data_guard, SCN::min_scn()));
ObUndoStatusNode *undo_node = NULL;
EXPECT_EQ(OB_SUCCESS, tx_ctx->insert_undo_action_to_tx_table_(undo, tx_data_guard, undo_node, SCN::min_scn()));
ObMemtableCtx *mt_ctx = store_ctx->mvcc_acc_ctx_.mem_ctx_;
ObTxCallbackList &cb_list = mt_ctx->trans_mgr_.callback_list_;
for (ObMvccRowCallback *iter = (ObMvccRowCallback *)(cb_list.get_guard()->get_next());

View File

@ -79,7 +79,7 @@ int ObTxTable::check_with_tx_data(ObReadTxDataArg &read_tx_data_arg, ObITxDataCh
int ObTxData::add_undo_action(ObTxTable *tx_table,
transaction::ObUndoAction &new_undo_action,
ObUndoStatusNode *undo_node)
ObUndoStatusNode *&undo_node)
{
// STORAGE_LOG(DEBUG, "do add_undo_action");
UNUSED(undo_node);

View File

@ -54,11 +54,11 @@ public:
TestRunCtx R;
class ObSimpleClusterExampleTest : public ObSimpleClusterTestBase
class ObTransferTx : public ObSimpleClusterTestBase
{
public:
// 指定case运行目录前缀 test_ob_simple_cluster_
ObSimpleClusterExampleTest() : ObSimpleClusterTestBase("test_transfer_tx_", "50G", "50G") {}
ObTransferTx() : ObSimpleClusterTestBase("test_transfer_tx_", "50G", "50G") {}
int do_balance(uint64_t tenant_id);
private:
int do_balance_inner_(uint64_t tenant_id);
@ -66,7 +66,7 @@ private:
int wait_balance_clean(uint64_t tenant_id);
};
int ObSimpleClusterExampleTest::do_balance_inner_(uint64_t tenant_id)
int ObTransferTx::do_balance_inner_(uint64_t tenant_id)
{
int ret = OB_SUCCESS;
static std::mutex mutex;
@ -102,7 +102,7 @@ int ObSimpleClusterExampleTest::do_balance_inner_(uint64_t tenant_id)
return ret;
}
int ObSimpleClusterExampleTest::wait_balance_clean(uint64_t tenant_id)
int ObTransferTx::wait_balance_clean(uint64_t tenant_id)
{
int ret = OB_SUCCESS;
int64_t begin_time = ObTimeUtil::current_time();
@ -134,7 +134,7 @@ int ObSimpleClusterExampleTest::wait_balance_clean(uint64_t tenant_id)
return ret;
}
int ObSimpleClusterExampleTest::do_balance(uint64_t tenant_id)
int ObTransferTx::do_balance(uint64_t tenant_id)
{
int ret = OB_SUCCESS;
if (OB_FAIL(do_balance_inner_(tenant_id))) {
@ -143,7 +143,7 @@ int ObSimpleClusterExampleTest::do_balance(uint64_t tenant_id)
return ret;
}
int ObSimpleClusterExampleTest::do_transfer_start_abort(uint64_t tenant_id, ObLSID dest_ls_id, ObLSID src_ls_id, ObTransferTabletInfo tablet_info)
int ObTransferTx::do_transfer_start_abort(uint64_t tenant_id, ObLSID dest_ls_id, ObLSID src_ls_id, ObTransferTabletInfo tablet_info)
{
int ret = OB_SUCCESS;
MTL_SWITCH(tenant_id) {
@ -184,15 +184,11 @@ int ObSimpleClusterExampleTest::do_transfer_start_abort(uint64_t tenant_id, ObLS
return ret;
}
TEST_F(ObSimpleClusterExampleTest, observer_start)
TEST_F(ObTransferTx, prepare)
{
LOG_INFO("observer_start succ");
LOGI("observer start");
}
// 创建租户并不轻量,看场景必要性使用
TEST_F(ObSimpleClusterExampleTest, add_tenant)
{
LOGI("create tenant begin");
// 创建普通租户tt1
EQ(OB_SUCCESS, create_tenant("tt1", "40G", "40G", false, 10));
// 获取租户tt1的tenant_id
@ -200,18 +196,9 @@ TEST_F(ObSimpleClusterExampleTest, add_tenant)
ASSERT_NE(0, R.tenant_id_);
// 初始化普通租户tt1的sql proxy
EQ(OB_SUCCESS, get_curr_simple_server().init_sql_proxy2());
}
/*
TEST_F(ObSimpleClusterExampleTest, delete_tenant)
{
EQ(OB_SUCCESS, delete_tenant());
}
*/
TEST_F(ObSimpleClusterExampleTest, worker)
{
int tenant_id = R.tenant_id_;
LOGI("create tenant finish");
R.worker_ = std::thread([this, tenant_id] () {
int ret = OB_SUCCESS;
lib::set_thread_name_inner("MY_BALANCE");
@ -223,10 +210,7 @@ TEST_F(ObSimpleClusterExampleTest, worker)
::sleep(3);
}
});
}
TEST_F(ObSimpleClusterExampleTest, create_new_ls)
{
// 在单节点ObServer下创建新的日志流, 注意避免被RS任务GC掉
EQ(0, SSH::create_ls(R.tenant_id_, get_curr_observer().self_addr_));
int64_t ls_count = 0;
@ -253,7 +237,7 @@ TEST_F(ObSimpleClusterExampleTest, create_new_ls)
NEQ(loc1, loc2); \
EQ(0, sql_proxy.write("create tablegroup tg1 sharding='NONE';", affected_rows));
TEST_F(ObSimpleClusterExampleTest, tx_exit)
TEST_F(ObTransferTx, tx_exit)
{
TRANSFER_CASE_PREPARE;
ObLSID ls_id;
@ -355,7 +339,7 @@ TEST_F(ObSimpleClusterExampleTest, tx_exit)
}
/*
TEST_F(ObSimpleClusterExampleTest, large_query)
TEST_F(ObTransferTx, large_query)
{
TRANSFER_CASE_PREPARE;
@ -440,7 +424,7 @@ TEST_F(ObSimpleClusterExampleTest, large_query)
}
*/
TEST_F(ObSimpleClusterExampleTest, epoch_recover_from_active_info)
TEST_F(ObTransferTx, epoch_recover_from_active_info)
{
TRANSFER_CASE_PREPARE;
@ -467,7 +451,7 @@ TEST_F(ObSimpleClusterExampleTest, epoch_recover_from_active_info)
EQ(0, conn->commit());
}
TEST_F(ObSimpleClusterExampleTest, epoch_recover_from_ctx_checkpoint)
TEST_F(ObTransferTx, epoch_recover_from_ctx_checkpoint)
{
TRANSFER_CASE_PREPARE;
@ -513,7 +497,7 @@ TEST_F(ObSimpleClusterExampleTest, epoch_recover_from_ctx_checkpoint)
EQ(0, commit_ret);
}
TEST_F(ObSimpleClusterExampleTest, epoch_recover_from_ctx_checkpoint2)
TEST_F(ObTransferTx, epoch_recover_from_ctx_checkpoint2)
{
TRANSFER_CASE_PREPARE;
@ -559,7 +543,7 @@ TEST_F(ObSimpleClusterExampleTest, epoch_recover_from_ctx_checkpoint2)
}
// 空sstable、没有活跃事务
TEST_F(ObSimpleClusterExampleTest, transfer_empty_tablet)
TEST_F(ObTransferTx, transfer_empty_tablet)
{
// 关掉observer内部的均衡,防止LS均衡,只调度分区均衡
TRANSFER_CASE_PREPARE;
@ -582,7 +566,7 @@ TEST_F(ObSimpleClusterExampleTest, transfer_empty_tablet)
}
// sstable有数据,没有活跃事务 transfer
TEST_F(ObSimpleClusterExampleTest, transfer_no_active_tx)
TEST_F(ObTransferTx, transfer_no_active_tx)
{
TRANSFER_CASE_PREPARE;
@ -609,7 +593,7 @@ TEST_F(ObSimpleClusterExampleTest, transfer_no_active_tx)
}
// sstable有数据,有活跃事务 transfer
TEST_F(ObSimpleClusterExampleTest, transfer_active_tx)
TEST_F(ObTransferTx, transfer_active_tx)
{
TRANSFER_CASE_PREPARE;
@ -681,7 +665,7 @@ TEST_F(ObSimpleClusterExampleTest, transfer_active_tx)
// transfer active tx A->B B->A
TEST_F(ObSimpleClusterExampleTest, transfer_A_B_AND_B_A)
TEST_F(ObTransferTx, transfer_A_B_AND_B_A)
{
TRANSFER_CASE_PREPARE;
@ -723,7 +707,7 @@ TEST_F(ObSimpleClusterExampleTest, transfer_A_B_AND_B_A)
EQ(600, val);
}
TEST_F(ObSimpleClusterExampleTest, transfer_replay)
TEST_F(ObTransferTx, transfer_replay)
{
TRANSFER_CASE_PREPARE;
@ -772,7 +756,7 @@ TEST_F(ObSimpleClusterExampleTest, transfer_replay)
}
// sstable有数据,有活跃事务 transfer
TEST_F(ObSimpleClusterExampleTest, transfer_abort_active_tx)
TEST_F(ObTransferTx, transfer_abort_active_tx)
{
TRANSFER_CASE_PREPARE;
@ -827,7 +811,7 @@ TEST_F(ObSimpleClusterExampleTest, transfer_abort_active_tx)
EQ(1300, sum1);
}
TEST_F(ObSimpleClusterExampleTest, transfer_resume)
TEST_F(ObTransferTx, transfer_resume)
{
TRANSFER_CASE_PREPARE;
@ -861,7 +845,7 @@ TEST_F(ObSimpleClusterExampleTest, transfer_resume)
}
// sstable有数据,有活跃事务,但事务数据丢失不完整 transfer
TEST_F(ObSimpleClusterExampleTest, transfer_query_lost)
TEST_F(ObTransferTx, transfer_query_lost)
{
TRANSFER_CASE_PREPARE;
@ -906,7 +890,7 @@ TEST_F(ObSimpleClusterExampleTest, transfer_query_lost)
}
// transfer active tx A->B A->B
TEST_F(ObSimpleClusterExampleTest, transfer_A_B_AND_A_B)
TEST_F(ObTransferTx, transfer_A_B_AND_A_B)
{
TRANSFER_CASE_PREPARE;
@ -955,7 +939,7 @@ TEST_F(ObSimpleClusterExampleTest, transfer_A_B_AND_A_B)
NEQ(0, conn->commit());
}
TEST_F(ObSimpleClusterExampleTest, transfer_AND_ddl)
TEST_F(ObTransferTx, transfer_AND_ddl)
{
TRANSFER_CASE_PREPARE;
@ -1008,7 +992,7 @@ TEST_F(ObSimpleClusterExampleTest, transfer_AND_ddl)
EQ(0, case_err);
}
TEST_F(ObSimpleClusterExampleTest, transfer_AND_rollback)
TEST_F(ObTransferTx, transfer_AND_rollback)
{
TRANSFER_CASE_PREPARE;
@ -1044,9 +1028,16 @@ TEST_F(ObSimpleClusterExampleTest, transfer_AND_rollback)
EQ(0, conn->commit());
EQ(0, SSH::select_int64(conn, "select sum(col) as val from stu2", val));
EQ(400, val);
EQ(0, SSH::freeze_tx_ctx(R.tenant_id_, loc1));
EQ(0, SSH::freeze_tx_ctx(R.tenant_id_, loc2));
EQ(0, SSH::freeze_tx_data(R.tenant_id_, loc1));
EQ(0, SSH::freeze_tx_data(R.tenant_id_, loc2));
EQ(0, SSH::ls_reboot(R.tenant_id_, loc1));
EQ(0, SSH::ls_reboot(R.tenant_id_, loc2));
}
TEST_F(ObSimpleClusterExampleTest, transfer_AND_rollback2)
TEST_F(ObTransferTx, transfer_AND_rollback2)
{
TRANSFER_CASE_PREPARE;
@ -1093,7 +1084,7 @@ TEST_F(ObSimpleClusterExampleTest, transfer_AND_rollback2)
EQ(100, val);
}
TEST_F(ObSimpleClusterExampleTest, transfer_tx_ctx_merge)
TEST_F(ObTransferTx, transfer_tx_ctx_merge)
{
TRANSFER_CASE_PREPARE;
@ -1211,11 +1202,15 @@ TEST_F(ObSimpleClusterExampleTest, transfer_tx_ctx_merge)
EQ(400, val1);
EQ(400, val2);
EQ(0, SSH::freeze_tx_ctx(R.tenant_id_, loc1));
EQ(0, SSH::freeze_tx_ctx(R.tenant_id_, loc2));
EQ(0, SSH::freeze_tx_data(R.tenant_id_, loc1));
EQ(0, SSH::freeze_tx_data(R.tenant_id_, loc2));
EQ(0, SSH::ls_reboot(R.tenant_id_, loc1));
EQ(0, SSH::ls_reboot(R.tenant_id_, loc2));
}
TEST_F(ObSimpleClusterExampleTest, transfer_batch)
TEST_F(ObTransferTx, transfer_batch)
{
TRANSFER_CASE_PREPARE;
sql_proxy.write("alter system set _transfer_start_trans_timeout='5s'",affected_rows);
@ -1249,17 +1244,70 @@ TEST_F(ObSimpleClusterExampleTest, transfer_batch)
EQ(0, SSH::select_int64(sql_proxy, "select sum(col) as val from stu2", sum));
EQ(100 * 5000, sum);
sql_proxy.write("alter system set _transfer_start_trans_timeout='1s'",affected_rows);
EQ(0, SSH::freeze_tx_ctx(R.tenant_id_, loc1));
EQ(0, SSH::freeze_tx_ctx(R.tenant_id_, loc2));
EQ(0, SSH::freeze_tx_data(R.tenant_id_, loc1));
EQ(0, SSH::freeze_tx_data(R.tenant_id_, loc2));
EQ(0, SSH::ls_reboot(R.tenant_id_, loc1));
EQ(0, SSH::ls_reboot(R.tenant_id_, loc2));
}
TEST_F(ObSimpleClusterExampleTest, transfer_retain_ctx)
/*
TEST_F(ObTransferTx, replay_from_middle)
{
TRANSFER_CASE_PREPARE;
sqlclient::ObISQLConnection *conn = NULL;
EQ(0, sql_proxy.acquire(conn));
EQ(0, SSH::write(conn, "set autocommit=0"));
EQ(0, SSH::write(conn, "insert into test.stu1 values(100)", affected_rows));
EQ(0, SSH::wait_checkpoint_newest(R.tenant_id_, loc1));
int64_t val = -1;
EQ(0, SSH::select_int64(conn, "select get_lock('test_lock', 10) val", val));
EQ(1, val);
EQ(0, conn->commit());
EQ(0, SSH::freeze_tx_ctx(R.tenant_id_, loc1));
EQ(0, SSH::freeze_tx_ctx(R.tenant_id_, loc2));
EQ(0, SSH::freeze_tx_ctx(R.tenant_id_, ObLSID(1)));
EQ(0, SSH::freeze_tx_data(R.tenant_id_, loc1));
EQ(0, SSH::freeze_tx_data(R.tenant_id_, loc2));
EQ(0, SSH::freeze_tx_data(R.tenant_id_, ObLSID(1)));
EQ(0, SSH::ls_reboot(R.tenant_id_, loc1));
EQ(0, SSH::ls_reboot(R.tenant_id_, loc2));
EQ(0, SSH::ls_reboot(R.tenant_id_, ObLSID(1)));
LOGI("release_lock");
val = -1;
EQ(0, SSH::select_int64(conn, "select release_lock('test_lock')", val));
// session end lock release
LOGI("get_lock");
val = -1;
EQ(0, SSH::select_int64(sql_proxy, "select get_lock('test_lock', 1) val", val));
EQ(1, val);
// session end lock release
LOGI("get_lock");
val = -1;
EQ(0, SSH::select_int64(sql_proxy, "select get_lock('test_lock', 1) val", val));
EQ(1, val);
}
*/
TEST_F(ObTransferTx, transfer_mds_trans)
{
TRANSFER_CASE_PREPARE;
ObMySQLTransaction trans;
EQ(0, trans.start(GCTX.sql_proxy_, R.tenant_id_));
EQ(0, trans.write(R.tenant_id_, "insert into test.stu1 values(100)", affected_rows));
EQ(0, trans.write(R.tenant_id_, "insert into test.stu2 values(100)", affected_rows));
//EQ(0, sql_proxy.write("alter system minor freeze", affected_rows));
// make it replay from middle
EQ(0, SSH::wait_checkpoint_newest(R.tenant_id_, loc1));
EQ(0, SSH::wait_checkpoint_newest(R.tenant_id_, loc2));
EQ(0, trans.write(R.tenant_id_, "insert into test.stu1 values(200)", affected_rows));
EQ(0, trans.write(R.tenant_id_, "insert into test.stu2 values(200)", affected_rows));
observer::ObInnerSQLConnection *conn = static_cast<observer::ObInnerSQLConnection *>(trans.get_connection());
//EQ(0, conn->execute_write(R.tenant_id_, "insert into stu1 values(100)", affected_rows));
//EQ(0, conn->execute_write(R.tenant_id_, "insert into stu2 values(100)", affected_rows));
char buf[10];
ObRegisterMdsFlag flag;
EQ(0, conn->register_multi_data_source(R.tenant_id_,
@ -1269,6 +1317,13 @@ TEST_F(ObSimpleClusterExampleTest, transfer_retain_ctx)
10,
flag));
EQ(0, conn->register_multi_data_source(R.tenant_id_,
loc2,
ObTxDataSourceType::TEST3,
buf,
10,
flag));
EQ(0, conn->register_multi_data_source(R.tenant_id_,
loc2,
ObTxDataSourceType::TEST3,
@ -1279,28 +1334,9 @@ TEST_F(ObSimpleClusterExampleTest, transfer_retain_ctx)
EQ(0, SSH::g_select_int64(R.tenant_id_, "select trans_id as val from __all_virtual_trans_stat where is_exiting=0 and session_id<=1 limit 1", tx_id.tx_id_));
LOGI("find active_tx tx_id:%ld", tx_id.get_id());
InjectTxFaultHelper inject_tx_fault_helper;
EQ(0, inject_tx_fault_helper.inject_tx_block(R.tenant_id_, loc2, tx_id, ObTxLogType::TX_ABORT_LOG));
// make tx ctx enter retain
EQ(0, SSH::submit_redo(R.tenant_id_, loc1));
EQ(0, SSH::abort_tx(R.tenant_id_, loc1, tx_id));
EQ(0, sql_proxy.write("alter tablegroup tg1 add stu1,stu2;", affected_rows));
EQ(0, do_balance(R.tenant_id_));
// make wrs check approve
EQ(0, SSH::modify_wrs(R.tenant_id_, loc2));
// transfer task failed bacause tx retain_ctx
NEQ(loc1, loc2);
ob_usleep(3 * 1000 * 1000);
NEQ(loc1, loc2);
inject_tx_fault_helper.release();
// transfer task failed bacause tx retain_ctx
EQ(0, SSH::abort_tx(R.tenant_id_, loc2, tx_id));
// wait
while (true) {
EQ(0, SSH::select_table_loc(R.tenant_id_, "stu1", loc1));
@ -1311,7 +1347,7 @@ TEST_F(ObSimpleClusterExampleTest, transfer_retain_ctx)
::sleep(1);
}
NEQ(0, trans.end(true));
EQ(0, trans.end(false));
int64_t val1 = 0;
int64_t val2 = 0;
EQ(0, SSH::select_int64(sql_proxy, "select count(col) as val from stu1", val1));
@ -1320,11 +1356,16 @@ TEST_F(ObSimpleClusterExampleTest, transfer_retain_ctx)
EQ(0, val1);
EQ(0, val2);
EQ(0, SSH::freeze_tx_ctx(R.tenant_id_, loc1));
EQ(0, SSH::freeze_tx_ctx(R.tenant_id_, loc2));
EQ(0, SSH::freeze_tx_data(R.tenant_id_, loc1));
EQ(0, SSH::freeze_tx_data(R.tenant_id_, loc2));
LOGI("ls_reboot");
EQ(0, SSH::ls_reboot(R.tenant_id_, loc1));
EQ(0, SSH::ls_reboot(R.tenant_id_, loc2));
}
TEST_F(ObSimpleClusterExampleTest, create_more_ls)
TEST_F(ObTransferTx, create_more_ls)
{
for (int i=0;i<8;i++) {
EQ(0, SSH::create_ls(R.tenant_id_, get_curr_observer().self_addr_));
@ -1334,7 +1375,7 @@ TEST_F(ObSimpleClusterExampleTest, create_more_ls)
EQ(10, ls_count);
}
TEST_F(ObSimpleClusterExampleTest, bench)
TEST_F(ObTransferTx, bench)
{
int64_t affected_rows = 0;
common::ObMySQLProxy &sql_proxy = get_curr_simple_server().get_sql_proxy2();
@ -1449,7 +1490,7 @@ TEST_F(ObSimpleClusterExampleTest, bench)
EQ(0, bench_err);
}
TEST_F(ObSimpleClusterExampleTest, end)
TEST_F(ObTransferTx, end)
{
int64_t wait_us = R.time_sec_ * 1000 * 1000;
while (ObTimeUtil::current_time() - R.start_time_ < wait_us) {

View File

@ -2357,6 +2357,12 @@ int ObPartTransCtx::replay_mds_to_tx_table_(const ObTxBufferNodeArray &mds_node_
ls_id_, op_scn, tx_op_batch))) {
TRANS_LOG(WARN, "add_tx_op_batch failed", KR(ret));
}
// tx_op_batch not put into tx_data, need to release
if (OB_FAIL(ret)) {
for (int64_t idx = 0; idx < tx_op_batch.count(); idx++) {
tx_op_batch.at(idx).release();
}
}
}
// tx_ctx and tx_data checkpoint independent
if (OB_FAIL(ret)) {
@ -2442,6 +2448,7 @@ int ObPartTransCtx::insert_mds_to_tx_table_(ObTxLogCb &log_cb)
int ObPartTransCtx::insert_undo_action_to_tx_table_(ObUndoAction &undo_action,
ObTxDataGuard &new_tx_data_guard,
storage::ObUndoStatusNode *&undo_node,
const share::SCN op_scn)
{
int ret = OB_SUCCESS;
@ -2451,7 +2458,7 @@ int ObPartTransCtx::insert_undo_action_to_tx_table_(ObUndoAction &undo_action,
TRANS_LOG(WARN, "get tx data failed", KR(ret));
} else if (OB_FAIL(tx_data_guard.tx_data()->init_tx_op())) {
TRANS_LOG(WARN, "init tx op failed", KR(ret));
} else if (OB_FAIL(tx_data_guard.tx_data()->add_undo_action(ls_tx_ctx_mgr_->get_tx_table(), undo_action))) {
} else if (OB_FAIL(tx_data_guard.tx_data()->add_undo_action(ls_tx_ctx_mgr_->get_tx_table(), undo_action, undo_node))) {
TRANS_LOG(WARN, "add undo action failed", KR(ret));
} else {
*new_tx_data_guard.tx_data() = *tx_data_guard.tx_data();
@ -2461,7 +2468,7 @@ int ObPartTransCtx::insert_undo_action_to_tx_table_(ObUndoAction &undo_action,
TRANS_LOG(WARN, "insert tx data failed", KR(ret));
}
}
TRANS_LOG(INFO, "insert undo_action to tx_table", KR(ret), K(undo_action), K(trans_id_), K(ls_id_), K(op_scn));
TRANS_LOG(INFO, "insert undo_action to tx_table", KR(ret), K(undo_action), K(trans_id_), K(ls_id_), K(op_scn), KP(undo_node));
return ret;
}
@ -2585,10 +2592,14 @@ int ObPartTransCtx::on_success_ops_(ObTxLogCb *log_cb)
TRANS_LOG(INFO, "apply commit info log", KR(ret), K(*this), K(two_phase_log_type));
}
} else if (ObTxLogType::TX_ROLLBACK_TO_LOG == log_type) {
if (OB_FAIL(insert_undo_action_to_tx_table_(log_cb->get_undo_action(), log_cb->get_tx_data_guard(), log_ts))) {
if (OB_FAIL(insert_undo_action_to_tx_table_(log_cb->get_undo_action(),
log_cb->get_tx_data_guard(),
log_cb->get_undo_node(),
log_ts))) {
TRANS_LOG(WARN, "insert to tx table failed", KR(ret), K(*this));
} else {
log_cb->set_tx_data(nullptr);
log_cb->reset_undo_node();
}
} else if (ObTxLogTypeChecker::is_state_log(log_type)) {
sub_state_.clear_state_log_submitting();
@ -7164,7 +7175,9 @@ int ObPartTransCtx::prepare_mds_tx_op_(const ObTxBufferNodeArray &mds_array,
} else if (OB_FAIL(tx_op_array.push_back(tx_op))) {
TRANS_LOG(WARN, "push buffer_node to list fail", KR(ret));
}
// attention tx_op is not put into tx_op_array
if (OB_FAIL(ret) && OB_NOT_NULL(new_node_wrapper)) {
new_node_wrapper->~ObTxBufferNodeWrapper();
tx_op_allocator.free(new_node_wrapper);
}
}
@ -8276,7 +8289,7 @@ int ObPartTransCtx::supplement_tx_op_if_exist_(const bool for_replay, const SCN
int ObPartTransCtx::recover_tx_ctx_from_tx_op_(ObTxOpVector &tx_op_list, const SCN replay_scn)
{
TRANS_LOG(INFO, "recover_tx_ctx_from_tx_op_", K(tx_op_list.get_count()), K(replay_scn), KPC(this));
TRANS_LOG(INFO, "recover tx_ctx from_tx_op begin", K(tx_op_list.get_count()), K(replay_scn), KPC(this));
int ret = OB_SUCCESS;
// filter tx_op for this tx_ctx life_cycle
ObTxOpArray ctx_tx_op;
@ -8323,7 +8336,7 @@ int ObPartTransCtx::recover_tx_ctx_from_tx_op_(ObTxOpVector &tx_op_list, const S
if (exec_info_.multi_data_source_.count() > 0) {
ctx_max_register_no = exec_info_.multi_data_source_.at(exec_info_.multi_data_source_.count() - 1).get_register_no();
}
TRANS_LOG(INFO, "recover tx_ctx from tx_op", KR(ret), K(tx_op_list.get_count()), K(ctx_tx_op.count()),
TRANS_LOG(INFO, "recover tx_ctx from tx_op finish", KR(ret), K(tx_op_list.get_count()), K(ctx_tx_op.count()),
K(mds_array.count()), K(exec_info_.multi_data_source_.count()),
K(mds_max_register_no), K(ctx_max_register_no),
KPC(this));
@ -8677,6 +8690,7 @@ int ObPartTransCtx::submit_rollback_to_log_(const ObTxSEQ from_scn,
ObTxRollbackToLog log(from_scn, to_scn);
ObTxLogCb *log_cb = NULL;
int64_t replay_hint = trans_id_.get_id();
ObUndoStatusNode *undo_node = NULL;
logservice::ObReplayBarrierType barrier = logservice::ObReplayBarrierType::NO_NEED_BARRIER;
if (to_scn.support_branch() && is_parallel_logging()) {
const int16_t branch_id = to_scn.get_branch();
@ -8703,6 +8717,15 @@ int ObPartTransCtx::submit_rollback_to_log_(const ObTxSEQ from_scn,
log_cb = NULL;
} else if (OB_FAIL(ls_tx_ctx_mgr_->get_tx_table()->alloc_tx_data(log_cb->get_tx_data_guard(), true, INT64_MAX))) {
TRANS_LOG(WARN, "alloc_tx_data failed", KR(ret), KPC(this));
return_log_cb_(log_cb);
log_cb = NULL;
} else if (OB_ISNULL(undo_node = (ObUndoStatusNode*)MTL(ObSharedMemAllocMgr*)->tx_data_allocator().alloc(true, INT64_MAX))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
TRANS_LOG(WARN, "alloc_undo_status_node failed", KR(ret), KPC(this));
return_log_cb_(log_cb);
log_cb = NULL;
} else if (FALSE_IT(new (undo_node) ObUndoStatusNode())) {
} else if (FALSE_IT(log_cb->get_undo_node() = undo_node)) {
} else if (OB_FAIL(submit_log_block_out_(log_block, SCN::min_scn(), log_cb, replay_hint, barrier))) {
TRANS_LOG(WARN, "submit log fail", K(ret), K(log_block), KPC(this));
return_log_cb_(log_cb);

View File

@ -666,7 +666,10 @@ private:
bool is_replay);
int replay_mds_to_tx_table_(const ObTxBufferNodeArray &mds_node_array, const share::SCN op_scn);
int insert_mds_to_tx_table_(ObTxLogCb &log_cb);
int insert_undo_action_to_tx_table_(ObUndoAction &undo_action, ObTxDataGuard &new_tx_data_guard, const share::SCN op_scn);
int insert_undo_action_to_tx_table_(ObUndoAction &undo_action,
ObTxDataGuard &new_tx_data_guard,
storage::ObUndoStatusNode *&undo_node,
const share::SCN op_scn);
int replay_undo_action_to_tx_table_(ObUndoAction &undo_action, const share::SCN op_scn);
int decide_state_log_barrier_type_(const ObTxLogType &state_log_type,
logservice::ObReplayBarrierType &final_barrier_type);

View File

@ -15,6 +15,7 @@
#include "share/ob_cluster_version.h"
#include "ob_trans_service.h"
#include "ob_trans_part_ctx.h"
#include "share/allocator/ob_shared_memory_allocator_mgr.h"
namespace oceanbase
{
@ -105,6 +106,14 @@ void ObTxLogCb::reset_tx_op_array()
}
}
void ObTxLogCb::reset_undo_node()
{
if (OB_NOT_NULL(undo_node_)) {
MTL(share::ObSharedMemAllocMgr*)->tx_data_allocator().free(undo_node_);
undo_node_ = NULL;
}
}
void ObTxLogCb::reset()
{
ObTxBaseLogCb::reset();
@ -128,6 +137,7 @@ void ObTxLogCb::reset()
// is_callbacking_ = false;
first_part_scn_.invalid_scn();
reset_tx_op_array();
reset_undo_node();
}
void ObTxLogCb::reuse()
@ -146,6 +156,7 @@ void ObTxLogCb::reuse()
first_part_scn_.invalid_scn();
reset_tx_op_array();
reset_undo_node();
}
ObTxLogType ObTxLogCb::get_last_log_type() const

View File

@ -75,7 +75,8 @@ class ObTxLogCb : public ObTxBaseLogCb,
public common::ObDLinkBase<ObTxLogCb>
{
public:
ObTxLogCb() : extra_cb_(nullptr), need_free_extra_cb_(false), tx_op_array_(nullptr) { reset(); }
ObTxLogCb() : extra_cb_(nullptr), need_free_extra_cb_(false), tx_op_array_(nullptr),
undo_node_(nullptr) { reset(); }
~ObTxLogCb() { destroy(); }
int init(const share::ObLSID &key,
const ObTransID &trans_id,
@ -95,6 +96,8 @@ public:
tx_data_guard_.init(tx_data);
}
}
void reset_undo_node();
storage::ObUndoStatusNode *&get_undo_node() { return undo_node_; }
void set_undo_action(const ObUndoAction &undo_action) {
undo_action_ = undo_action;
}
@ -171,6 +174,7 @@ private:
bool need_free_extra_cb_;
ObUndoAction undo_action_;
storage::ObTxOpArray *tx_op_array_;
storage::ObUndoStatusNode *undo_node_;
//bool is_callbacking_;
};

View File

@ -549,7 +549,7 @@ int ObTxData::reserve_undo(ObTxTable *tx_table)
return ret;
}
int ObTxData::add_undo_action(ObTxTable *tx_table, transaction::ObUndoAction &new_undo_action, ObUndoStatusNode *undo_node)
int ObTxData::add_undo_action(ObTxTable *tx_table, transaction::ObUndoAction &new_undo_action, ObUndoStatusNode *&undo_node)
{
// STORAGE_LOG(DEBUG, "do add_undo_action");
int ret = OB_SUCCESS;
@ -599,10 +599,6 @@ int ObTxData::add_undo_action(ObTxTable *tx_table, transaction::ObUndoAction &ne
}
}
}
if (OB_NOT_NULL(undo_node)) {
tx_data_table->free_undo_status_node(undo_node);
}
}
return ret;
}

View File

@ -317,7 +317,12 @@ public:
*/
OB_NOINLINE int add_undo_action(ObTxTable *tx_table,
transaction::ObUndoAction &undo_action,
ObUndoStatusNode *undo_node = nullptr);
ObUndoStatusNode *&undo_node);
OB_NOINLINE int add_undo_action(ObTxTable *tx_table,
transaction::ObUndoAction &undo_action) {
ObUndoStatusNode *undo_status_node = nullptr;
return add_undo_action(tx_table, undo_action, undo_status_node);
}
/**
* @brief Check if this tx data is valid
*/

View File

@ -123,6 +123,8 @@ ObTxNode::ObTxNode(const int64_t ls_id,
tenant_.enable_tenant_ctx_check_ = false;
tenant_.set(&fake_tenant_freezer_);
tenant_.set(&fake_part_trans_ctx_pool_);
fake_shared_mem_alloc_mgr_.init();
tenant_.set(&fake_shared_mem_alloc_mgr_);
tenant_.start();
ObTenantEnv::set_tenant(&tenant_);
ObTableHandleV2 lock_memtable_handle;

View File

@ -305,6 +305,7 @@ public:
ObLockTable fake_lock_table_;
ObFakeTxTable fake_tx_table_;
ObTenantFreezer fake_tenant_freezer_;
ObSharedMemAllocMgr fake_shared_mem_alloc_mgr_;
ObLS fake_ls_;
ObFreezer fake_freezer_;
ObTxNodeRole role_;

View File

@ -56,6 +56,7 @@ public:
slice_allocator_.set_nway(32);
FAKE_ALLOCATOR->init("FAKE_A");
FAKE_ALLOCATOR2->init();
tx_data_allocator_ = FAKE_ALLOCATOR;
is_inited_ = true;
}
virtual int init(ObLS *ls, ObTxCtxTable *tx_ctx_table) override