Fix tx missing abort at end of stmt, and add an abort check before commit

This commit is contained in:
felix-w15 2024-05-21 05:32:36 +00:00 committed by ob-robot
parent 430086798f
commit 3ef61117ef
4 changed files with 33 additions and 63 deletions

View File

@ -824,12 +824,6 @@ int ObPartTransCtx::commit(const ObTxCommitParts &parts,
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
CtxLockGuard guard(lock_);
if (tenant_id_ % 2 == 0) {
TRANS_LOG(DEBUG, "commit transaction now!", "trans_id", get_trans_id());
}
if (is_parallel_logging()) {
TRANS_LOG(INFO, "commit transaction now!", KPC(this));
}
if (IS_NOT_INIT) {
TRANS_LOG(WARN, "ObPartTransCtx not inited");
ret = OB_NOT_INIT;

View File

@ -386,7 +386,6 @@ int rollback_to_local_implicit_savepoint_(ObTxDesc &tx,
int rollback_to_global_implicit_savepoint_(ObTxDesc &tx,
const ObTxSEQ savepoint,
const int64_t expire_ts,
const share::ObLSArray *extra_touched_ls,
const int exec_errcode);
int ls_sync_rollback_savepoint__(ObPartTransCtx *part_ctx,
const ObTxSEQ savepoint,
@ -398,8 +397,7 @@ int ls_sync_rollback_savepoint__(ObPartTransCtx *part_ctx,
ObIArray<ObTxLSEpochPair> &downstream_parts);
void tx_post_terminate_(ObTxDesc &tx);
int start_epoch_(ObTxDesc &tx);
// in_stmt means stmt is executing
int tx_sanity_check_(ObTxDesc &tx, const bool in_stmt = false);
int tx_sanity_check_(ObTxDesc &tx);
bool tx_need_reset_(const int error_code) const;
int get_tx_table_guard_(ObLS *ls,
const share::ObLSID &ls_id,

View File

@ -509,6 +509,11 @@ int ObTransService::submit_commit_tx(ObTxDesc &tx,
abort_tx_(tx, ObTxAbortCause::PARTICIPANT_STATE_INCOMPLETE);
handle_tx_commit_result_(tx, OB_TRANS_ROLLBACKED);
ret = OB_TRANS_ROLLBACKED;
} else if (tx.flags_.PART_ABORTED_) {
TRANS_LOG(WARN, "txn participant aborted, can not commit", K(ret), K(tx));
abort_tx_(tx, OB_TRANS_ROLLBACKED);
handle_tx_commit_result_(tx, OB_TRANS_ROLLBACKED);
ret = OB_TRANS_ROLLBACKED;
} else {
int clean = true;
ARRAY_FOREACH_X(tx.parts_, i, cnt, clean) {
@ -963,7 +968,7 @@ int ObTransService::create_implicit_savepoint(ObTxDesc &tx,
// TODO: rework this interface, allow skip pass tx_param if not required
ret = OB_INVALID_ARGUMENT;
TRANS_LOG(WARN, "tx param invalid", K(ret), K(tx_param), K(tx));
} else if (OB_FAIL(tx_sanity_check_(tx, !release))) {
} else if (OB_FAIL(tx_sanity_check_(tx))) {
} else if (tx.state_ >= ObTxDesc::State::IN_TERMINATE) {
ret = OB_TRANS_INVALID_STATE;
TRANS_LOG(WARN, "create implicit savepoint but tx terminated", K(ret), K(tx));
@ -1067,10 +1072,11 @@ int ObTransService::rollback_to_implicit_savepoint(ObTxDesc &tx,
{
int ret = OB_SUCCESS;
ObSpinLockGuard guard(tx.lock_);
if (OB_FAIL(tx_sanity_check_(tx, true))) {
} else if (savepoint.get_branch() // NOTE: branch savepoint only support local rollback
if (savepoint.get_branch() // NOTE: branch savepoint only support local rollback
|| tx.flags_.SHADOW_) {
if (OB_NOT_NULL(extra_touched_ls)) {
if (OB_FAIL(tx_sanity_check_(tx))) {
} else if (OB_NOT_NULL(extra_touched_ls)) {
ret = OB_NOT_SUPPORTED;
TRANS_LOG(WARN, "rollback on remote only suport collected tx parts",
K(ret), K(savepoint), K(tx));
@ -1078,11 +1084,24 @@ int ObTransService::rollback_to_implicit_savepoint(ObTxDesc &tx,
ret = rollback_to_local_implicit_savepoint_(tx, savepoint, expire_ts);
}
} else {
ret = rollback_to_global_implicit_savepoint_(tx,
savepoint,
expire_ts,
extra_touched_ls,
exec_errcode);
if (tx.state_ < ObTxDesc::State::IN_TERMINATE) {
if (OB_NOT_NULL(extra_touched_ls) && !extra_touched_ls->empty()) {
if (OB_FAIL(tx.update_parts(*extra_touched_ls))) {
TRANS_LOG(WARN, "add tx part with extra_touched_ls fail", K(ret), K(tx), KPC(extra_touched_ls));
abort_tx_(tx, ret);
} else {
TRANS_LOG(INFO, "add tx part with extra_touched_ls", KPC(extra_touched_ls), K_(tx.tx_id));
}
}
}
if (OB_FAIL(tx_sanity_check_(tx))) {
} else {
ret = rollback_to_global_implicit_savepoint_(tx,
savepoint,
expire_ts,
exec_errcode);
}
}
return ret;
}
@ -1164,23 +1183,12 @@ int ObTransService::rollback_to_local_implicit_savepoint_(ObTxDesc &tx,
int ObTransService::rollback_to_global_implicit_savepoint_(ObTxDesc &tx,
const ObTxSEQ savepoint,
const int64_t expire_ts,
const share::ObLSArray *extra_touched_ls,
const int exec_errcode)
{
int ret = OB_SUCCESS;
int64_t start_ts = ObTimeUtility::current_time();
tx.inc_op_sn();
bool reset_tx = false, normal_rollback = false, reset_active_scn = false;
// merge extra touched ls
if (OB_NOT_NULL(extra_touched_ls) && !extra_touched_ls->empty()) {
if (OB_FAIL(tx.update_parts(*extra_touched_ls))) {
TRANS_LOG(WARN, "add tx part with extra_touched_ls fail", K(ret), K(tx), KPC(extra_touched_ls));
abort_tx_(tx, ret);
} else {
TRANS_LOG(INFO, "add tx part with extra_touched_ls", KPC(extra_touched_ls), K_(tx.tx_id));
}
}
if (OB_SUCC(ret)) {
switch(tx.state_) {
case ObTxDesc::State::IDLE:
@ -1236,7 +1244,7 @@ int ObTransService::rollback_to_global_implicit_savepoint_(ObTxDesc &tx,
if (reset_tx) {
} else if (OB_FAIL(ret)) {
TRANS_LOG(WARN, "rollback savepoint fail, abort tx",
K(ret), K(savepoint), KP(extra_touched_ls), K(parts), K(tx));
K(ret), K(savepoint), K(parts), K(tx));
// advance op_sequence to reject further rollback resp messages
tx.inc_op_sn();
abort_tx_(tx, ObTxAbortCause::SAVEPOINT_ROLLBACK_FAIL);
@ -1277,7 +1285,6 @@ int ObTransService::rollback_to_global_implicit_savepoint_(ObTxDesc &tx,
REC_TRANS_TRACE_EXT(&tlog, rollback_global_implicit_savepoint,
OB_Y(ret), OB_ID(savepoint), savepoint.cast_to_int(), OB_Y(expire_ts),
OB_ID(time_used), elapsed_us,
OB_ID(arg), (void*)extra_touched_ls,
OB_ID(tag1), reset_tx,
OB_ID(opid), tx.op_sn_,
OB_ID(ref), tx.get_ref(),
@ -2014,7 +2021,7 @@ int ObTransService::release_tx_ref(ObTxDesc &tx)
return tx_desc_mgr_.release_tx_ref(&tx);
}
OB_INLINE int ObTransService::tx_sanity_check_(ObTxDesc &tx, const bool in_stmt)
OB_INLINE int ObTransService::tx_sanity_check_(ObTxDesc &tx)
{
int ret = OB_SUCCESS;
if (tx.expire_ts_ <= ObClockGenerator::getClock()) {
@ -2029,14 +2036,8 @@ OB_INLINE int ObTransService::tx_sanity_check_(ObTxDesc &tx, const bool in_stmt)
case ObTxDesc::State::ACTIVE:
case ObTxDesc::State::IMPLICIT_ACTIVE:
if (tx.flags_.PART_ABORTED_) {
if (!in_stmt) {
// not inside of stmt(stmt has not began to execute), abort tx now
TRANS_LOG(WARN, "some participant was aborted, abort tx now");
abort_tx_(tx, tx.abort_cause_);
} else {
// abort tx after stmt has executed
TRANS_LOG(WARN, "some participant was aborted, but inside stmt, not abort tx now");
}
TRANS_LOG(WARN, "some participant was aborted, abort tx now");
abort_tx_(tx, tx.abort_cause_);
// go through
} else {
break;

View File

@ -340,29 +340,6 @@ TEST_F(ObTestTx, rollback_savepoint_with_need_retry_error)
}
}
// Creating an implicit savepoint on a tx whose PART_ABORTED_ flag is set
// must be rejected with OB_TRANS_NEED_ROLLBACK.
TEST_F(ObTestTx, create_savepoint_with_sanity_check_tx_abort)
{
  // Arrange: one tx node, a prepared tx with its params, and a read snapshot.
  START_ONE_TX_NODE(n1);
  PREPARE_TX(n1, tx);
  PREPARE_TX_PARAM(tx_param);
  GET_READ_SNAPSHOT(n1, tx, tx_param, snapshot);

  // Flag the tx as having an aborted participant before the savepoint call.
  tx.flags_.PART_ABORTED_ = 1;

  // Act: try to create the implicit savepoint.
  ObTxSEQ sp;
  const auto create_ret = n1->create_implicit_savepoint(tx, tx_param, sp);

  // Assert: the call reports that the tx needs to be rolled back.
  ASSERT_EQ(OB_TRANS_NEED_ROLLBACK, create_ret);
}
// Rolling back to an implicit savepoint on a tx whose PART_ABORTED_ flag is
// set must be rejected with OB_TRANS_NEED_ROLLBACK.
TEST_F(ObTestTx, rollback_savepoint_with_sanity_check_tx_abort)
{
  // Arrange: one tx node, a prepared tx, a read snapshot and a savepoint
  // created while the tx is still healthy.
  START_ONE_TX_NODE(n1);
  PREPARE_TX(n1, tx);
  PREPARE_TX_PARAM(tx_param);
  GET_READ_SNAPSHOT(n1, tx, tx_param, snapshot);
  CREATE_IMPLICIT_SAVEPOINT(n1, tx, tx_param, sp);

  // Flag the tx as having an aborted participant only after the savepoint exists.
  tx.flags_.PART_ABORTED_ = 1;

  // Act: roll back to the savepoint, with no extra touched LS list.
  const auto rollback_ret =
      n1->rollback_to_implicit_savepoint(tx, sp, n1->ts_after_ms(5), nullptr);

  // Assert: the call reports that the tx needs to be rolled back.
  ASSERT_EQ(OB_TRANS_NEED_ROLLBACK, rollback_ret);
}
TEST_F(ObTestTx, switch_to_follower_gracefully)
{
int ret = OB_SUCCESS;