implicit trx first stmt retry does not abort tx
This commit is contained in:
@ -1028,6 +1028,7 @@ int ObSqlTransControl::end_stmt(ObExecContext &exec_ctx, const bool rollback)
|
|||||||
sql::stmt::StmtType stmt_type = sql::stmt::StmtType::T_NONE;
|
sql::stmt::StmtType stmt_type = sql::stmt::StmtType::T_NONE;
|
||||||
bool is_plain_select = false;
|
bool is_plain_select = false;
|
||||||
transaction::ObTxSEQ savepoint = das_ctx.get_savepoint();
|
transaction::ObTxSEQ savepoint = das_ctx.get_savepoint();
|
||||||
|
int exec_errcode = exec_ctx.get_errcode();
|
||||||
|
|
||||||
CK (OB_NOT_NULL(session), OB_NOT_NULL(plan_ctx));
|
CK (OB_NOT_NULL(session), OB_NOT_NULL(plan_ctx));
|
||||||
CK (OB_NOT_NULL(plan = plan_ctx->get_phy_plan()));
|
CK (OB_NOT_NULL(plan = plan_ctx->get_phy_plan()));
|
||||||
@ -1053,7 +1054,7 @@ int ObSqlTransControl::end_stmt(ObExecContext &exec_ctx, const bool rollback)
|
|||||||
} else if (rollback) {
|
} else if (rollback) {
|
||||||
auto stmt_expire_ts = get_stmt_expire_ts(plan_ctx, *session);
|
auto stmt_expire_ts = get_stmt_expire_ts(plan_ctx, *session);
|
||||||
auto &touched_ls = tx_result.get_touched_ls();
|
auto &touched_ls = tx_result.get_touched_ls();
|
||||||
OZ (txs->rollback_to_implicit_savepoint(*tx_desc, savepoint, stmt_expire_ts, &touched_ls),
|
OZ (txs->rollback_to_implicit_savepoint(*tx_desc, savepoint, stmt_expire_ts, &touched_ls, exec_errcode),
|
||||||
savepoint, stmt_expire_ts, touched_ls);
|
savepoint, stmt_expire_ts, touched_ls);
|
||||||
// prioritize returning session error code
|
// prioritize returning session error code
|
||||||
if (session->is_terminate(ret)) {
|
if (session->is_terminate(ret)) {
|
||||||
|
|||||||
@ -731,8 +731,15 @@ void ObTxDesc::implicit_start_tx_()
|
|||||||
{
|
{
|
||||||
if (parts_.count() > 0 && state_ == ObTxDesc::State::IDLE) {
|
if (parts_.count() > 0 && state_ == ObTxDesc::State::IDLE) {
|
||||||
state_ = ObTxDesc::State::IMPLICIT_ACTIVE;
|
state_ = ObTxDesc::State::IMPLICIT_ACTIVE;
|
||||||
active_ts_ = ObClockGenerator::getClock();
|
if (expire_ts_ == INT64_MAX ) {
|
||||||
expire_ts_ = active_ts_ + timeout_us_;
|
/*
|
||||||
|
* To calculate transaction's execution time
|
||||||
|
* and determine whether transaction has timeout
|
||||||
|
* just set active_ts and expire_ts on stmt's first execution
|
||||||
|
*/
|
||||||
|
active_ts_ = ObClockGenerator::getClock();
|
||||||
|
expire_ts_ = active_ts_ + timeout_us_;
|
||||||
|
}
|
||||||
active_scn_ = get_tx_seq();
|
active_scn_ = get_tx_seq();
|
||||||
state_change_flags_.mark_all();
|
state_change_flags_.mark_all();
|
||||||
}
|
}
|
||||||
|
|||||||
@ -368,7 +368,8 @@ int rollback_to_local_implicit_savepoint_(ObTxDesc &tx,
|
|||||||
int rollback_to_global_implicit_savepoint_(ObTxDesc &tx,
|
int rollback_to_global_implicit_savepoint_(ObTxDesc &tx,
|
||||||
const ObTxSEQ savepoint,
|
const ObTxSEQ savepoint,
|
||||||
const int64_t expire_ts,
|
const int64_t expire_ts,
|
||||||
const share::ObLSArray *extra_touched_ls);
|
const share::ObLSArray *extra_touched_ls,
|
||||||
|
const int exec_errcode);
|
||||||
int ls_sync_rollback_savepoint__(ObPartTransCtx *part_ctx,
|
int ls_sync_rollback_savepoint__(ObPartTransCtx *part_ctx,
|
||||||
const ObTxSEQ savepoint,
|
const ObTxSEQ savepoint,
|
||||||
const int64_t op_sn,
|
const int64_t op_sn,
|
||||||
@ -376,6 +377,7 @@ int ls_sync_rollback_savepoint__(ObPartTransCtx *part_ctx,
|
|||||||
void tx_post_terminate_(ObTxDesc &tx);
|
void tx_post_terminate_(ObTxDesc &tx);
|
||||||
int start_epoch_(ObTxDesc &tx);
|
int start_epoch_(ObTxDesc &tx);
|
||||||
int tx_sanity_check_(ObTxDesc &tx);
|
int tx_sanity_check_(ObTxDesc &tx);
|
||||||
|
bool tx_need_reset_(const int error_code) const;
|
||||||
int get_tx_table_guard_(ObLS *ls,
|
int get_tx_table_guard_(ObLS *ls,
|
||||||
const share::ObLSID &ls_id,
|
const share::ObLSID &ls_id,
|
||||||
ObTxTableGuard &guard);
|
ObTxTableGuard &guard);
|
||||||
|
|||||||
@ -997,7 +997,8 @@ int ObTransService::create_global_implicit_savepoint_(ObTxDesc &tx,
|
|||||||
int ObTransService::rollback_to_implicit_savepoint(ObTxDesc &tx,
|
int ObTransService::rollback_to_implicit_savepoint(ObTxDesc &tx,
|
||||||
const ObTxSEQ savepoint,
|
const ObTxSEQ savepoint,
|
||||||
const int64_t expire_ts,
|
const int64_t expire_ts,
|
||||||
const share::ObLSArray *extra_touched_ls)
|
const share::ObLSArray *extra_touched_ls,
|
||||||
|
const int exec_errcode)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
ObSpinLockGuard guard(tx.lock_);
|
ObSpinLockGuard guard(tx.lock_);
|
||||||
@ -1014,7 +1015,8 @@ int ObTransService::rollback_to_implicit_savepoint(ObTxDesc &tx,
|
|||||||
ret = rollback_to_global_implicit_savepoint_(tx,
|
ret = rollback_to_global_implicit_savepoint_(tx,
|
||||||
savepoint,
|
savepoint,
|
||||||
expire_ts,
|
expire_ts,
|
||||||
extra_touched_ls);
|
extra_touched_ls,
|
||||||
|
exec_errcode);
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
@ -1067,12 +1069,13 @@ int ObTransService::rollback_to_local_implicit_savepoint_(ObTxDesc &tx,
|
|||||||
int ObTransService::rollback_to_global_implicit_savepoint_(ObTxDesc &tx,
|
int ObTransService::rollback_to_global_implicit_savepoint_(ObTxDesc &tx,
|
||||||
const ObTxSEQ savepoint,
|
const ObTxSEQ savepoint,
|
||||||
const int64_t expire_ts,
|
const int64_t expire_ts,
|
||||||
const share::ObLSArray *extra_touched_ls)
|
const share::ObLSArray *extra_touched_ls,
|
||||||
|
const int exec_errcode)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
int64_t start_ts = ObTimeUtility::current_time();
|
int64_t start_ts = ObTimeUtility::current_time();
|
||||||
tx.inc_op_sn();
|
tx.inc_op_sn();
|
||||||
bool reset_tx = false, normal_rollback = false;
|
bool reset_tx = false, normal_rollback = false, reset_tx_state = false;
|
||||||
// merge extra touched ls
|
// merge extra touched ls
|
||||||
if (OB_NOT_NULL(extra_touched_ls) && !extra_touched_ls->empty()) {
|
if (OB_NOT_NULL(extra_touched_ls) && !extra_touched_ls->empty()) {
|
||||||
if (OB_FAIL(tx.update_parts(*extra_touched_ls))) {
|
if (OB_FAIL(tx.update_parts(*extra_touched_ls))) {
|
||||||
@ -1099,12 +1102,15 @@ int ObTransService::rollback_to_global_implicit_savepoint_(ObTxDesc &tx,
|
|||||||
&& !tx.has_implicit_savepoint() // to first savepoint
|
&& !tx.has_implicit_savepoint() // to first savepoint
|
||||||
&& tx.active_scn_ >= savepoint // rollback all dirty state
|
&& tx.active_scn_ >= savepoint // rollback all dirty state
|
||||||
&& !tx.has_extra_state_()) { // hasn't explicit savepoint or serializable snapshot
|
&& !tx.has_extra_state_()) { // hasn't explicit savepoint or serializable snapshot
|
||||||
reset_tx = true;
|
|
||||||
/*
|
/*
|
||||||
* Avoid lock conflicts between first stmt retry and tx async abort(end first stmt caused)
|
* if sql execute error code don't need reset(abort) tx but need rollback stmt and retry
|
||||||
* Add a sync rollback process before async abort tx.
|
* e.g. "lock conflict error"
|
||||||
|
* to ensure next retry can still recognize it is first stmt in transaction
|
||||||
|
* we should reset tx state
|
||||||
*/
|
*/
|
||||||
normal_rollback = true;
|
reset_tx = tx_need_reset_(exec_errcode);
|
||||||
|
reset_tx_state = !reset_tx;
|
||||||
|
normal_rollback = !reset_tx;
|
||||||
} else {
|
} else {
|
||||||
normal_rollback = true;
|
normal_rollback = true;
|
||||||
}
|
}
|
||||||
@ -1140,7 +1146,13 @@ int ObTransService::rollback_to_global_implicit_savepoint_(ObTxDesc &tx,
|
|||||||
tx.inc_op_sn();
|
tx.inc_op_sn();
|
||||||
abort_tx_(tx, ObTxAbortCause::SAVEPOINT_ROLLBACK_FAIL);
|
abort_tx_(tx, ObTxAbortCause::SAVEPOINT_ROLLBACK_FAIL);
|
||||||
} else {
|
} else {
|
||||||
/*
|
if (reset_tx_state) {
|
||||||
|
// first stmt retry need reset tx state
|
||||||
|
tx.state_ = ObTxDesc::State::IDLE;
|
||||||
|
tx.parts_.reset();
|
||||||
|
tx.active_scn_.reset();
|
||||||
|
}
|
||||||
|
/*
|
||||||
* advance txn op_seqence to barrier duplicate rollback msg
|
* advance txn op_seqence to barrier duplicate rollback msg
|
||||||
* otherwise, rollback may erase following write
|
* otherwise, rollback may erase following write
|
||||||
*/
|
*/
|
||||||
@ -2001,6 +2013,24 @@ int ObTransService::sql_stmt_end_hook(const ObXATransID &xid, ObTxDesc &tx)
|
|||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// error code whether need reset tx when rollback fisrt stmt
|
||||||
|
// error codes do not need reset tx
|
||||||
|
// 1. lock conflict error code
|
||||||
|
bool ObTransService::tx_need_reset_(const int error_code) const
|
||||||
|
{
|
||||||
|
bool ret = true;
|
||||||
|
switch (error_code) {
|
||||||
|
case OB_TRANSACTION_SET_VIOLATION:
|
||||||
|
case OB_TRY_LOCK_ROW_CONFLICT:
|
||||||
|
ret = false;
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
ret = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
} // transaction
|
} // transaction
|
||||||
} // namespace
|
} // namespace
|
||||||
#undef TXN_API_SANITY_CHECK_FOR_TXN_FREE_ROUTE
|
#undef TXN_API_SANITY_CHECK_FOR_TXN_FREE_ROUTE
|
||||||
|
|||||||
@ -361,6 +361,7 @@ int create_explicit_savepoint(ObTxDesc &tx,
|
|||||||
* the savepoint but not sensed by this
|
* the savepoint but not sensed by this
|
||||||
* transaction for some reason
|
* transaction for some reason
|
||||||
* (eg. network partition, OutOfMemory)
|
* (eg. network partition, OutOfMemory)
|
||||||
|
* @exec_errcode: stmt execution error code
|
||||||
*
|
*
|
||||||
* Return:
|
* Return:
|
||||||
* OB_SUCCESS - OK
|
* OB_SUCCESS - OK
|
||||||
@ -372,7 +373,8 @@ int create_explicit_savepoint(ObTxDesc &tx,
|
|||||||
int rollback_to_implicit_savepoint(ObTxDesc &tx,
|
int rollback_to_implicit_savepoint(ObTxDesc &tx,
|
||||||
const ObTxSEQ savepoint,
|
const ObTxSEQ savepoint,
|
||||||
const int64_t expire_ts,
|
const int64_t expire_ts,
|
||||||
const share::ObLSArray *extra_touched_ls);
|
const share::ObLSArray *extra_touched_ls,
|
||||||
|
const int exec_errcode = OB_SUCCESS);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* rollback_to_explicit_savepoint - rollback to a explicit savepoint
|
* rollback_to_explicit_savepoint - rollback to a explicit savepoint
|
||||||
|
|||||||
@ -223,6 +223,36 @@ TEST_F(ObTestTx, rollback_savepoint_with_uncertain_participants)
|
|||||||
ASSERT_EQ(0, tx.parts_.count());
|
ASSERT_EQ(0, tx.parts_.count());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_F(ObTestTx, rollback_savepoint_with_need_retry_error)
|
||||||
|
{
|
||||||
|
START_ONE_TX_NODE(n1);
|
||||||
|
PREPARE_TX(n1, tx);
|
||||||
|
PREPARE_TX_PARAM(tx_param);
|
||||||
|
GET_READ_SNAPSHOT(n1, tx, tx_param, snapshot);
|
||||||
|
|
||||||
|
{
|
||||||
|
CREATE_IMPLICIT_SAVEPOINT(n1, tx, tx_param, sp);
|
||||||
|
ASSERT_TRUE(sp.is_valid());
|
||||||
|
ASSERT_EQ(OB_SUCCESS, n1->write(tx, snapshot, 100, 200));
|
||||||
|
ASSERT_EQ(OB_SUCCESS, n1->rollback_to_implicit_savepoint(tx, sp, n1->ts_after_ms(5), nullptr, OB_TRANSACTION_SET_VIOLATION));
|
||||||
|
ASSERT_EQ(ObTxDesc::State::IDLE, tx.state_);
|
||||||
|
ASSERT_EQ(0, tx.parts_.count());
|
||||||
|
ASSERT_EQ(ObTxSEQ::INVL(), tx.active_scn_);
|
||||||
|
ASSERT_EQ(OB_SUCCESS, n1->rollback_to_implicit_savepoint(tx, sp, n1->ts_after_ms(5), nullptr));
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
CREATE_IMPLICIT_SAVEPOINT(n1, tx, tx_param, sp);
|
||||||
|
ASSERT_TRUE(sp.is_valid());
|
||||||
|
ASSERT_EQ(OB_SUCCESS, n1->write(tx, snapshot, 100, 200));
|
||||||
|
ASSERT_EQ(OB_SUCCESS, n1->rollback_to_implicit_savepoint(tx, sp, n1->ts_after_ms(5), nullptr, OB_TRY_LOCK_ROW_CONFLICT));
|
||||||
|
ASSERT_EQ(ObTxDesc::State::IDLE, tx.state_);
|
||||||
|
ASSERT_EQ(0, tx.parts_.count());
|
||||||
|
ASSERT_EQ(ObTxSEQ::INVL(), tx.active_scn_);
|
||||||
|
ASSERT_EQ(OB_SUCCESS, n1->rollback_to_implicit_savepoint(tx, sp, n1->ts_after_ms(5), nullptr));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
TEST_F(ObTestTx, switch_to_follower_gracefully)
|
TEST_F(ObTestTx, switch_to_follower_gracefully)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
|
|||||||
Reference in New Issue
Block a user