create savepoint not abort tx to avoid concurrent with stmt execute
This commit is contained in:
@ -389,7 +389,8 @@ int ls_sync_rollback_savepoint__(ObPartTransCtx *part_ctx,
|
|||||||
ObIArray<ObTxLSEpochPair> &downstream_parts);
|
ObIArray<ObTxLSEpochPair> &downstream_parts);
|
||||||
void tx_post_terminate_(ObTxDesc &tx);
|
void tx_post_terminate_(ObTxDesc &tx);
|
||||||
int start_epoch_(ObTxDesc &tx);
|
int start_epoch_(ObTxDesc &tx);
|
||||||
int tx_sanity_check_(ObTxDesc &tx);
|
// in_stmt means stmt is executing
|
||||||
|
int tx_sanity_check_(ObTxDesc &tx, const bool in_stmt = false);
|
||||||
bool tx_need_reset_(const int error_code) const;
|
bool tx_need_reset_(const int error_code) const;
|
||||||
int get_tx_table_guard_(ObLS *ls,
|
int get_tx_table_guard_(ObLS *ls,
|
||||||
const share::ObLSID &ls_id,
|
const share::ObLSID &ls_id,
|
||||||
|
|||||||
@ -935,7 +935,7 @@ int ObTransService::create_implicit_savepoint(ObTxDesc &tx,
|
|||||||
// TODO: rework this interface, allow skip pass tx_param if not required
|
// TODO: rework this interface, allow skip pass tx_param if not required
|
||||||
ret = OB_INVALID_ARGUMENT;
|
ret = OB_INVALID_ARGUMENT;
|
||||||
TRANS_LOG(WARN, "tx param invalid", K(ret), K(tx_param), K(tx));
|
TRANS_LOG(WARN, "tx param invalid", K(ret), K(tx_param), K(tx));
|
||||||
} else if (OB_FAIL(tx_sanity_check_(tx))) {
|
} else if (OB_FAIL(tx_sanity_check_(tx, !release))) {
|
||||||
} else if (tx.state_ >= ObTxDesc::State::IN_TERMINATE) {
|
} else if (tx.state_ >= ObTxDesc::State::IN_TERMINATE) {
|
||||||
ret = OB_TRANS_INVALID_STATE;
|
ret = OB_TRANS_INVALID_STATE;
|
||||||
TRANS_LOG(WARN, "create implicit savepoint but tx terminated", K(ret), K(tx));
|
TRANS_LOG(WARN, "create implicit savepoint but tx terminated", K(ret), K(tx));
|
||||||
@ -1038,7 +1038,7 @@ int ObTransService::rollback_to_implicit_savepoint(ObTxDesc &tx,
|
|||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
ObSpinLockGuard guard(tx.lock_);
|
ObSpinLockGuard guard(tx.lock_);
|
||||||
if (OB_FAIL(tx_sanity_check_(tx))) {
|
if (OB_FAIL(tx_sanity_check_(tx, true))) {
|
||||||
} else if (savepoint.get_branch() // NOTE: branch savepoint only support local rollback
|
} else if (savepoint.get_branch() // NOTE: branch savepoint only support local rollback
|
||||||
|| tx.flags_.SHADOW_) {
|
|| tx.flags_.SHADOW_) {
|
||||||
if (OB_NOT_NULL(extra_touched_ls)) {
|
if (OB_NOT_NULL(extra_touched_ls)) {
|
||||||
@ -1981,7 +1981,7 @@ int ObTransService::release_tx_ref(ObTxDesc &tx)
|
|||||||
return tx_desc_mgr_.release_tx_ref(&tx);
|
return tx_desc_mgr_.release_tx_ref(&tx);
|
||||||
}
|
}
|
||||||
|
|
||||||
OB_INLINE int ObTransService::tx_sanity_check_(ObTxDesc &tx)
|
OB_INLINE int ObTransService::tx_sanity_check_(ObTxDesc &tx, const bool in_stmt)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
if (tx.expire_ts_ <= ObClockGenerator::getClock()) {
|
if (tx.expire_ts_ <= ObClockGenerator::getClock()) {
|
||||||
@ -1996,8 +1996,14 @@ OB_INLINE int ObTransService::tx_sanity_check_(ObTxDesc &tx)
|
|||||||
case ObTxDesc::State::ACTIVE:
|
case ObTxDesc::State::ACTIVE:
|
||||||
case ObTxDesc::State::IMPLICIT_ACTIVE:
|
case ObTxDesc::State::IMPLICIT_ACTIVE:
|
||||||
if (tx.flags_.PART_ABORTED_) {
|
if (tx.flags_.PART_ABORTED_) {
|
||||||
|
if (!in_stmt) {
|
||||||
|
// not inside of stmt(stmt has not began to execute), abort tx now
|
||||||
TRANS_LOG(WARN, "some participant was aborted, abort tx now");
|
TRANS_LOG(WARN, "some participant was aborted, abort tx now");
|
||||||
abort_tx_(tx, tx.abort_cause_);
|
abort_tx_(tx, tx.abort_cause_);
|
||||||
|
} else {
|
||||||
|
// abort tx after stmt has executed
|
||||||
|
TRANS_LOG(WARN, "some participant was aborted, but inside stmt, not abort tx now");
|
||||||
|
}
|
||||||
// go through
|
// go through
|
||||||
} else {
|
} else {
|
||||||
break;
|
break;
|
||||||
|
|||||||
@ -337,6 +337,29 @@ TEST_F(ObTestTx, rollback_savepoint_with_need_retry_error)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_F(ObTestTx, create_savepoint_with_sanity_check_tx_abort)
|
||||||
|
{
|
||||||
|
START_ONE_TX_NODE(n1);
|
||||||
|
PREPARE_TX(n1, tx);
|
||||||
|
PREPARE_TX_PARAM(tx_param);
|
||||||
|
GET_READ_SNAPSHOT(n1, tx, tx_param, snapshot);
|
||||||
|
tx.flags_.PART_ABORTED_ = 1;
|
||||||
|
ObTxSEQ sp;
|
||||||
|
ASSERT_EQ(OB_TRANS_NEED_ROLLBACK, n1->create_implicit_savepoint(tx, tx_param, sp));
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(ObTestTx, rollback_savepoint_with_sanity_check_tx_abort)
|
||||||
|
{
|
||||||
|
START_ONE_TX_NODE(n1);
|
||||||
|
PREPARE_TX(n1, tx);
|
||||||
|
PREPARE_TX_PARAM(tx_param);
|
||||||
|
GET_READ_SNAPSHOT(n1, tx, tx_param, snapshot);
|
||||||
|
CREATE_IMPLICIT_SAVEPOINT(n1, tx, tx_param, sp);
|
||||||
|
tx.flags_.PART_ABORTED_ = 1;
|
||||||
|
ASSERT_EQ(OB_TRANS_NEED_ROLLBACK, n1->rollback_to_implicit_savepoint(tx, sp, n1->ts_after_ms(5), nullptr));
|
||||||
|
}
|
||||||
|
|
||||||
TEST_F(ObTestTx, switch_to_follower_gracefully)
|
TEST_F(ObTestTx, switch_to_follower_gracefully)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
|
|||||||
Reference in New Issue
Block a user