From d3da1d9ffc1b4fa8ba2d14962275ff99fb950fea Mon Sep 17 00:00:00 2001 From: obdev Date: Thu, 8 Feb 2024 21:50:06 +0000 Subject: [PATCH] create savepoint not abort tx to avoid concurrent with stmt execute --- src/storage/tx/ob_trans_service_v4.h | 3 ++- src/storage/tx/ob_tx_api.cpp | 16 +++++++++++----- unittest/storage/tx/it/test_tx.cpp | 23 +++++++++++++++++++++++ 3 files changed, 36 insertions(+), 6 deletions(-) diff --git a/src/storage/tx/ob_trans_service_v4.h b/src/storage/tx/ob_trans_service_v4.h index 5a0bad7c5b..3ef7af9de0 100644 --- a/src/storage/tx/ob_trans_service_v4.h +++ b/src/storage/tx/ob_trans_service_v4.h @@ -389,7 +389,8 @@ int ls_sync_rollback_savepoint__(ObPartTransCtx *part_ctx, ObIArray &downstream_parts); void tx_post_terminate_(ObTxDesc &tx); int start_epoch_(ObTxDesc &tx); -int tx_sanity_check_(ObTxDesc &tx); +// in_stmt means stmt is executing +int tx_sanity_check_(ObTxDesc &tx, const bool in_stmt = false); bool tx_need_reset_(const int error_code) const; int get_tx_table_guard_(ObLS *ls, const share::ObLSID &ls_id, diff --git a/src/storage/tx/ob_tx_api.cpp b/src/storage/tx/ob_tx_api.cpp index 6bde477a7e..3febc7a346 100644 --- a/src/storage/tx/ob_tx_api.cpp +++ b/src/storage/tx/ob_tx_api.cpp @@ -935,7 +935,7 @@ int ObTransService::create_implicit_savepoint(ObTxDesc &tx, // TODO: rework this interface, allow skip pass tx_param if not required ret = OB_INVALID_ARGUMENT; TRANS_LOG(WARN, "tx param invalid", K(ret), K(tx_param), K(tx)); - } else if (OB_FAIL(tx_sanity_check_(tx))) { + } else if (OB_FAIL(tx_sanity_check_(tx, !release))) { } else if (tx.state_ >= ObTxDesc::State::IN_TERMINATE) { ret = OB_TRANS_INVALID_STATE; TRANS_LOG(WARN, "create implicit savepoint but tx terminated", K(ret), K(tx)); @@ -1038,7 +1038,7 @@ int ObTransService::rollback_to_implicit_savepoint(ObTxDesc &tx, { int ret = OB_SUCCESS; ObSpinLockGuard guard(tx.lock_); - if (OB_FAIL(tx_sanity_check_(tx))) { + if (OB_FAIL(tx_sanity_check_(tx, true))) { } else if (savepoint.get_branch() // NOTE: branch savepoint only support local rollback || tx.flags_.SHADOW_) { if (OB_NOT_NULL(extra_touched_ls)) { @@ -1981,7 +1981,7 @@ int ObTransService::release_tx_ref(ObTxDesc &tx) return tx_desc_mgr_.release_tx_ref(&tx); } -OB_INLINE int ObTransService::tx_sanity_check_(ObTxDesc &tx) +OB_INLINE int ObTransService::tx_sanity_check_(ObTxDesc &tx, const bool in_stmt) { int ret = OB_SUCCESS; if (tx.expire_ts_ <= ObClockGenerator::getClock()) { @@ -1996,8 +1996,14 @@ OB_INLINE int ObTransService::tx_sanity_check_(ObTxDesc &tx) case ObTxDesc::State::ACTIVE: case ObTxDesc::State::IMPLICIT_ACTIVE: if (tx.flags_.PART_ABORTED_) { - TRANS_LOG(WARN, "some participant was aborted, abort tx now"); - abort_tx_(tx, tx.abort_cause_); + if (!in_stmt) { + // not inside of stmt(stmt has not began to execute), abort tx now + TRANS_LOG(WARN, "some participant was aborted, abort tx now"); + abort_tx_(tx, tx.abort_cause_); + } else { + // abort tx after stmt has executed + TRANS_LOG(WARN, "some participant was aborted, but inside stmt, not abort tx now"); + } // go through } else { break; diff --git a/unittest/storage/tx/it/test_tx.cpp b/unittest/storage/tx/it/test_tx.cpp index f00c901a30..fd525a83c7 100644 --- a/unittest/storage/tx/it/test_tx.cpp +++ b/unittest/storage/tx/it/test_tx.cpp @@ -337,6 +337,29 @@ TEST_F(ObTestTx, rollback_savepoint_with_need_retry_error) } } +TEST_F(ObTestTx, create_savepoint_with_sanity_check_tx_abort) +{ + START_ONE_TX_NODE(n1); + PREPARE_TX(n1, tx); + PREPARE_TX_PARAM(tx_param); + GET_READ_SNAPSHOT(n1, tx, tx_param, snapshot); + tx.flags_.PART_ABORTED_ = 1; + ObTxSEQ sp; + ASSERT_EQ(OB_TRANS_NEED_ROLLBACK, n1->create_implicit_savepoint(tx, tx_param, sp)); + +} + +TEST_F(ObTestTx, rollback_savepoint_with_sanity_check_tx_abort) +{ + START_ONE_TX_NODE(n1); + PREPARE_TX(n1, tx); + PREPARE_TX_PARAM(tx_param); + GET_READ_SNAPSHOT(n1, tx, tx_param, snapshot); + CREATE_IMPLICIT_SAVEPOINT(n1, tx, tx_param, sp); + tx.flags_.PART_ABORTED_ = 1; + ASSERT_EQ(OB_TRANS_NEED_ROLLBACK, n1->rollback_to_implicit_savepoint(tx, sp, n1->ts_after_ms(5), nullptr)); +} + TEST_F(ObTestTx, switch_to_follower_gracefully) { int ret = OB_SUCCESS;