diff --git a/src/sql/ob_sql_trans_control.cpp b/src/sql/ob_sql_trans_control.cpp index c3b27b1f2d..25c2ff453e 100644 --- a/src/sql/ob_sql_trans_control.cpp +++ b/src/sql/ob_sql_trans_control.cpp @@ -1028,6 +1028,7 @@ int ObSqlTransControl::end_stmt(ObExecContext &exec_ctx, const bool rollback) sql::stmt::StmtType stmt_type = sql::stmt::StmtType::T_NONE; bool is_plain_select = false; transaction::ObTxSEQ savepoint = das_ctx.get_savepoint(); + int exec_errcode = exec_ctx.get_errcode(); CK (OB_NOT_NULL(session), OB_NOT_NULL(plan_ctx)); CK (OB_NOT_NULL(plan = plan_ctx->get_phy_plan())); @@ -1053,7 +1054,7 @@ int ObSqlTransControl::end_stmt(ObExecContext &exec_ctx, const bool rollback) } else if (rollback) { auto stmt_expire_ts = get_stmt_expire_ts(plan_ctx, *session); auto &touched_ls = tx_result.get_touched_ls(); - OZ (txs->rollback_to_implicit_savepoint(*tx_desc, savepoint, stmt_expire_ts, &touched_ls), + OZ (txs->rollback_to_implicit_savepoint(*tx_desc, savepoint, stmt_expire_ts, &touched_ls, exec_errcode), savepoint, stmt_expire_ts, touched_ls); // prioritize returning session error code if (session->is_terminate(ret)) { diff --git a/src/storage/tx/ob_trans_define_v4.cpp b/src/storage/tx/ob_trans_define_v4.cpp index 95ae23dd64..a0b027b079 100644 --- a/src/storage/tx/ob_trans_define_v4.cpp +++ b/src/storage/tx/ob_trans_define_v4.cpp @@ -731,8 +731,15 @@ void ObTxDesc::implicit_start_tx_() { if (parts_.count() > 0 && state_ == ObTxDesc::State::IDLE) { state_ = ObTxDesc::State::IMPLICIT_ACTIVE; - active_ts_ = ObClockGenerator::getClock(); - expire_ts_ = active_ts_ + timeout_us_; + if (expire_ts_ == INT64_MAX ) { + /* + * To calculate transaction's execution time + * and determine whether transaction has timeout + * just set active_ts and expire_ts on stmt's first execution + */ + active_ts_ = ObClockGenerator::getClock(); + expire_ts_ = active_ts_ + timeout_us_; + } active_scn_ = get_tx_seq(); state_change_flags_.mark_all(); } diff --git a/src/storage/tx/ob_trans_service_v4.h b/src/storage/tx/ob_trans_service_v4.h index 2ec4f4c2cd..c2c78419c5 100644 --- a/src/storage/tx/ob_trans_service_v4.h +++ b/src/storage/tx/ob_trans_service_v4.h @@ -368,7 +368,8 @@ int rollback_to_local_implicit_savepoint_(ObTxDesc &tx, int rollback_to_global_implicit_savepoint_(ObTxDesc &tx, const ObTxSEQ savepoint, const int64_t expire_ts, - const share::ObLSArray *extra_touched_ls); + const share::ObLSArray *extra_touched_ls, + const int exec_errcode); int ls_sync_rollback_savepoint__(ObPartTransCtx *part_ctx, const ObTxSEQ savepoint, const int64_t op_sn, @@ -376,6 +377,7 @@ int ls_sync_rollback_savepoint__(ObPartTransCtx *part_ctx, void tx_post_terminate_(ObTxDesc &tx); int start_epoch_(ObTxDesc &tx); int tx_sanity_check_(ObTxDesc &tx); +bool tx_need_reset_(const int error_code) const; int get_tx_table_guard_(ObLS *ls, const share::ObLSID &ls_id, ObTxTableGuard &guard); diff --git a/src/storage/tx/ob_tx_api.cpp b/src/storage/tx/ob_tx_api.cpp index f67e9630a4..221f11bba9 100644 --- a/src/storage/tx/ob_tx_api.cpp +++ b/src/storage/tx/ob_tx_api.cpp @@ -997,7 +997,8 @@ int ObTransService::create_global_implicit_savepoint_(ObTxDesc &tx, int ObTransService::rollback_to_implicit_savepoint(ObTxDesc &tx, const ObTxSEQ savepoint, const int64_t expire_ts, - const share::ObLSArray *extra_touched_ls) + const share::ObLSArray *extra_touched_ls, + const int exec_errcode) { int ret = OB_SUCCESS; ObSpinLockGuard guard(tx.lock_); @@ -1014,7 +1015,8 @@ int ObTransService::rollback_to_implicit_savepoint(ObTxDesc &tx, ret = rollback_to_global_implicit_savepoint_(tx, savepoint, expire_ts, - extra_touched_ls); + extra_touched_ls, + exec_errcode); } return ret; } @@ -1067,12 +1069,13 @@ int ObTransService::rollback_to_local_implicit_savepoint_(ObTxDesc &tx, int ObTransService::rollback_to_global_implicit_savepoint_(ObTxDesc &tx, const ObTxSEQ savepoint, const int64_t expire_ts, - const share::ObLSArray *extra_touched_ls) + const share::ObLSArray *extra_touched_ls, + const int exec_errcode) { int ret = OB_SUCCESS; int64_t start_ts = ObTimeUtility::current_time(); tx.inc_op_sn(); - bool reset_tx = false, normal_rollback = false; + bool reset_tx = false, normal_rollback = false, reset_tx_state = false; // merge extra touched ls if (OB_NOT_NULL(extra_touched_ls) && !extra_touched_ls->empty()) { if (OB_FAIL(tx.update_parts(*extra_touched_ls))) { @@ -1099,12 +1102,15 @@ int ObTransService::rollback_to_global_implicit_savepoint_(ObTxDesc &tx, && !tx.has_implicit_savepoint() // to first savepoint && tx.active_scn_ >= savepoint // rollback all dirty state && !tx.has_extra_state_()) { // hasn't explicit savepoint or serializable snapshot - reset_tx = true; /* - * Avoid lock conflicts between first stmt retry and tx async abort(end first stmt caused) - * Add a sync rollback process before async abort tx. + * if sql execute error code don't need reset(abort) tx but need rollback stmt and retry + * e.g. "lock conflict error" + * to ensure next retry can still recognize it is first stmt in transaction + * we should reset tx state */ - normal_rollback = true; + reset_tx = tx_need_reset_(exec_errcode); + reset_tx_state = !reset_tx; + normal_rollback = !reset_tx; } else { normal_rollback = true; } @@ -1140,7 +1146,13 @@ int ObTransService::rollback_to_global_implicit_savepoint_(ObTxDesc &tx, tx.inc_op_sn(); abort_tx_(tx, ObTxAbortCause::SAVEPOINT_ROLLBACK_FAIL); } else { - /* + if (reset_tx_state) { + // first stmt retry need reset tx state + tx.state_ = ObTxDesc::State::IDLE; + tx.parts_.reset(); + tx.active_scn_.reset(); + } + /* * advance txn op_seqence to barrier duplicate rollback msg * otherwise, rollback may erase following write */ @@ -2001,6 +2013,24 @@ int ObTransService::sql_stmt_end_hook(const ObXATransID &xid, ObTxDesc &tx) } return ret; } + +// error code whether need reset tx when rollback fisrt stmt +// error codes do not need reset tx +// 1. lock conflict error code +bool ObTransService::tx_need_reset_(const int error_code) const +{ + bool ret = true; + switch (error_code) { + case OB_TRANSACTION_SET_VIOLATION: + case OB_TRY_LOCK_ROW_CONFLICT: + ret = false; + break; + default: + ret = true; + break; + } + return ret; +} } // transaction } // namespace #undef TXN_API_SANITY_CHECK_FOR_TXN_FREE_ROUTE diff --git a/src/storage/tx/ob_tx_api.h b/src/storage/tx/ob_tx_api.h index 0be089c043..a5f073f695 100644 --- a/src/storage/tx/ob_tx_api.h +++ b/src/storage/tx/ob_tx_api.h @@ -361,6 +361,7 @@ int create_explicit_savepoint(ObTxDesc &tx, * the savepoint but not sensed by this * transaction for some reason * (eg. network partition, OutOfMemory) + * @exec_errcode: stmt execution error code * * Return: * OB_SUCCESS - OK @@ -372,7 +373,8 @@ int create_explicit_savepoint(ObTxDesc &tx, int rollback_to_implicit_savepoint(ObTxDesc &tx, const ObTxSEQ savepoint, const int64_t expire_ts, - const share::ObLSArray *extra_touched_ls); + const share::ObLSArray *extra_touched_ls, + const int exec_errcode = OB_SUCCESS); /** * rollback_to_explicit_savepoint - rollback to a explicit savepoint diff --git a/unittest/storage/tx/it/test_tx.cpp b/unittest/storage/tx/it/test_tx.cpp index 79e1afea38..30e9a27f94 100644 --- a/unittest/storage/tx/it/test_tx.cpp +++ b/unittest/storage/tx/it/test_tx.cpp @@ -223,6 +223,36 @@ TEST_F(ObTestTx, rollback_savepoint_with_uncertain_participants) ASSERT_EQ(0, tx.parts_.count()); } +TEST_F(ObTestTx, rollback_savepoint_with_need_retry_error) +{ + START_ONE_TX_NODE(n1); + PREPARE_TX(n1, tx); + PREPARE_TX_PARAM(tx_param); + GET_READ_SNAPSHOT(n1, tx, tx_param, snapshot); + + { + CREATE_IMPLICIT_SAVEPOINT(n1, tx, tx_param, sp); + ASSERT_TRUE(sp.is_valid()); + ASSERT_EQ(OB_SUCCESS, n1->write(tx, snapshot, 100, 200)); + ASSERT_EQ(OB_SUCCESS, n1->rollback_to_implicit_savepoint(tx, sp, n1->ts_after_ms(5), nullptr, OB_TRANSACTION_SET_VIOLATION)); + ASSERT_EQ(ObTxDesc::State::IDLE, tx.state_); + ASSERT_EQ(0, tx.parts_.count()); + ASSERT_EQ(ObTxSEQ::INVL(), tx.active_scn_); + ASSERT_EQ(OB_SUCCESS, n1->rollback_to_implicit_savepoint(tx, sp, n1->ts_after_ms(5), nullptr)); + } + + { + CREATE_IMPLICIT_SAVEPOINT(n1, tx, tx_param, sp); + ASSERT_TRUE(sp.is_valid()); + ASSERT_EQ(OB_SUCCESS, n1->write(tx, snapshot, 100, 200)); + ASSERT_EQ(OB_SUCCESS, n1->rollback_to_implicit_savepoint(tx, sp, n1->ts_after_ms(5), nullptr, OB_TRY_LOCK_ROW_CONFLICT)); + ASSERT_EQ(ObTxDesc::State::IDLE, tx.state_); + ASSERT_EQ(0, tx.parts_.count()); + ASSERT_EQ(ObTxSEQ::INVL(), tx.active_scn_); + ASSERT_EQ(OB_SUCCESS, n1->rollback_to_implicit_savepoint(tx, sp, n1->ts_after_ms(5), nullptr)); + } +} + TEST_F(ObTestTx, switch_to_follower_gracefully) { int ret = OB_SUCCESS;