Replace manual tx.lock_.lock() / tx.lock_.unlock() calls with scoped lock guards (ObSpinLockGuard, ObSpinLockGuardWithTimeout)

This commit is contained in:
felix-w15 2023-11-13 06:09:29 +00:00 committed by ob-robot
parent d5ccf4d28c
commit 57e368c905
3 changed files with 202 additions and 184 deletions

View File

@ -917,6 +917,8 @@ private:
bool is_switching_;
};
typedef lib::ObLockGuardWithTimeout<ObSpinLock> ObSpinLockGuardWithTimeout;
#define REC_TRANS_TRACE(recorder_ptr, trace_event) do { \
if (NULL != recorder_ptr) { \
REC_TRACE(*recorder_ptr, trace_event); \

View File

@ -353,32 +353,34 @@ int ObTransService::handle_tx_commit_timeout(ObTxDesc &tx, const int64_t delay)
{
int ret = OB_SUCCESS;
int32_t ref_cnt = 0;
// remember tx_id because tx maybe cleanout and reused
// in this function's following steps.
tx.lock_.lock();
ObTransID tx_id = tx.tx_id_;
int64_t now = ObClockGenerator::getClock();
ObTransID tx_id;
bool cb_executed = false;
if (!tx.commit_task_.is_registered()){
TRANS_LOG(INFO, "task canceled", K(tx));
} else if (FALSE_IT(tx.commit_task_.set_registered(false))) {
} else if (tx.flags_.RELEASED_) {
TRANS_LOG(INFO, "tx released, cancel commit retry", K(tx));
} else if (tx.state_ != ObTxDesc::State::IN_TERMINATE) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "unexpect tx state", K(ret), K_(tx.state), K(tx));
} else if (tx.expire_ts_ <= now) {
TRANS_LOG(WARN, "tx has timeout", K_(tx.expire_ts), K(tx));
handle_tx_commit_result_(tx, OB_TRANS_TIMEOUT);
} else if (tx.commit_expire_ts_ <= now) {
TRANS_LOG(WARN, "tx commit timeout", K_(tx.commit_expire_ts), K(tx));
handle_tx_commit_result_(tx, OB_TRANS_STMT_TIMEOUT);
} else if (OB_FAIL(do_commit_tx_slowpath_(tx))) {
TRANS_LOG(WARN, "retry do commit tx failed", K(ret), K(tx));
handle_tx_commit_result_(tx, ret);
{
// remember tx_id because tx maybe cleanout and reused
// in this function's following steps.
ObSpinLockGuard guard(tx.lock_);
tx_id = tx.tx_id_;
int64_t now = ObClockGenerator::getClock();
if (!tx.commit_task_.is_registered()){
TRANS_LOG(INFO, "task canceled", K(tx));
} else if (FALSE_IT(tx.commit_task_.set_registered(false))) {
} else if (tx.flags_.RELEASED_) {
TRANS_LOG(INFO, "tx released, cancel commit retry", K(tx));
} else if (tx.state_ != ObTxDesc::State::IN_TERMINATE) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "unexpect tx state", K(ret), K_(tx.state), K(tx));
} else if (tx.expire_ts_ <= now) {
TRANS_LOG(WARN, "tx has timeout", K_(tx.expire_ts), K(tx));
handle_tx_commit_result_(tx, OB_TRANS_TIMEOUT);
} else if (tx.commit_expire_ts_ <= now) {
TRANS_LOG(WARN, "tx commit timeout", K_(tx.commit_expire_ts), K(tx));
handle_tx_commit_result_(tx, OB_TRANS_STMT_TIMEOUT);
} else if (OB_FAIL(do_commit_tx_slowpath_(tx))) {
TRANS_LOG(WARN, "retry do commit tx failed", K(ret), K(tx));
handle_tx_commit_result_(tx, ret);
}
ref_cnt = tx.get_ref();
}
ref_cnt = tx.get_ref();
tx.lock_.unlock();
cb_executed = tx.execute_commit_cb();
// NOTE:
// it not safe and meaningless to access tx after commit_cb
@ -673,7 +675,7 @@ int ObTransService::decide_tx_commit_info_(ObTxDesc &tx, ObTxPart *&coord)
int ObTransService::prepare_tx_coord(ObTxDesc &tx, share::ObLSID &coord_id)
{
int ret = OB_SUCCESS;
tx.lock_.lock();
ObSpinLockGuard guard(tx.lock_);
ObTxPart *coord = NULL;
if (OB_FAIL(decide_tx_commit_info_(tx, coord))) {
TRANS_LOG(WARN, "fail to decide tx coordinator, tx will abort", K(ret), K(tx));
@ -686,7 +688,6 @@ int ObTransService::prepare_tx_coord(ObTxDesc &tx, share::ObLSID &coord_id)
coord_id = coord->id_;
}
TRANS_LOG(INFO, "generate tx coord", K(ret), K(tx), K(coord_id));
tx.lock_.unlock();
return ret;
}
@ -702,7 +703,7 @@ int ObTransService::prepare_tx(ObTxDesc &tx,
{
int ret = OB_SUCCESS;
int64_t now = ObClockGenerator::getClock();
tx.lock_.lock();
ObSpinLockGuard guard(tx.lock_);
tx.set_commit_cb(&cb);
tx.commit_expire_ts_ = now + timeout_us;
tx.state_ = ObTxDesc::State::SUB_PREPARING;
@ -719,7 +720,6 @@ int ObTransService::prepare_tx(ObTxDesc &tx,
ret = OB_SUCCESS;
}
TRANS_LOG(INFO, "prepare tx", K(ret), K(tx), KP(&cb));
tx.lock_.unlock();
return ret;
}
@ -2655,7 +2655,7 @@ int ObTransService::recover_tx(const ObTxInfo &tx_info, ObTxDesc *&tx)
int ObTransService::get_tx_info(ObTxDesc &tx, ObTxInfo &tx_info)
{
int ret = OB_SUCCESS;
tx.lock_.lock();
ObSpinLockGuard guard(tx.lock_);
if (OB_FAIL(tx_info.parts_.assign(tx.parts_))) {
TRANS_LOG(WARN, "assgin parts fail", K(ret), K(tx));
} else if (OB_FAIL(assign_user_savepoint_(tx, tx_info.savepoints_))) {
@ -2679,7 +2679,6 @@ int ObTransService::get_tx_info(ObTxDesc &tx, ObTxInfo &tx_info)
tx_info.active_scn_ = tx.active_scn_;
tx_info.session_id_ = tx.sess_id_;
}
tx.lock_.unlock();
return ret;
}
@ -2733,7 +2732,7 @@ int ObTransService::update_user_savepoint(ObTxDesc &tx, const ObTxSavePointList
int ObTransService::get_tx_stmt_info(ObTxDesc &tx, ObTxStmtInfo &stmt_info)
{
int ret = OB_SUCCESS;
tx.lock_.lock();
ObSpinLockGuard guard(tx.lock_);
if (OB_FAIL(stmt_info.parts_.assign(tx.parts_))) {
TRANS_LOG(WARN, "assgin parts fail", K(ret), K(tx));
} else if (OB_FAIL(assign_user_savepoint_(tx, stmt_info.savepoints_))) {
@ -2743,7 +2742,6 @@ int ObTransService::get_tx_stmt_info(ObTxDesc &tx, ObTxStmtInfo &stmt_info)
stmt_info.op_sn_ = tx.op_sn_;
stmt_info.state_ = tx.state_;
}
tx.lock_.unlock();
return ret;
}
@ -2789,28 +2787,36 @@ int ObTransService::handle_timeout_for_xa(ObTxDesc &tx, const int64_t delay)
int ret = OB_SUCCESS;
int64_t now = ObClockGenerator::getClock();
bool cb_executed = false;
ObTransID tx_id = tx.tx_id_;
if (OB_FAIL(tx.lock_.lock(5000000))) {
TRANS_LOG(WARN, "failed to acquire lock in specified time", K(tx));
// FIXME: how to handle it without lock protection
// according to handle_tx_commit_timeout
} else {
if (!tx.commit_task_.is_registered()){
TRANS_LOG(INFO, "task canceled", K(tx));
} else if (tx.flags_.RELEASED_) {
TRANS_LOG(INFO, "tx released, cancel commit retry", K(tx));
} else if (FALSE_IT(tx.commit_task_.set_registered(false))) {
ObTransID tx_id;
bool need_cb = false;
{
ObSpinLockGuardWithTimeout guard(tx.lock_, 5000000);
if (OB_FAIL(guard.get_ret())) {
TRANS_LOG(WARN, "failed to acquire lock in specified time", K(tx));
// FIXME: how to handle it without lock protection
// according to handle_tx_commit_timeout
need_cb = false;
} else {
if (ObTxDesc::State::SUB_PREPARING == tx.state_) {
ret = handle_sub_prepare_timeout_(tx, delay);
} else if (ObTxDesc::State::SUB_COMMITTING == tx.state_) {
ret = handle_sub_commit_timeout_(tx, delay);
} else if (ObTxDesc::State::SUB_ROLLBACKING == tx.state_) {
ret = handle_sub_rollback_timeout_(tx, delay);
tx_id = tx.tx_id_;
if (!tx.commit_task_.is_registered()){
TRANS_LOG(INFO, "task canceled", K(tx));
} else if (tx.flags_.RELEASED_) {
TRANS_LOG(INFO, "tx released, cancel commit retry", K(tx));
} else if (FALSE_IT(tx.commit_task_.set_registered(false))) {
} else {
if (ObTxDesc::State::SUB_PREPARING == tx.state_) {
ret = handle_sub_prepare_timeout_(tx, delay);
} else if (ObTxDesc::State::SUB_COMMITTING == tx.state_) {
ret = handle_sub_commit_timeout_(tx, delay);
} else if (ObTxDesc::State::SUB_ROLLBACKING == tx.state_) {
ret = handle_sub_rollback_timeout_(tx, delay);
} else {
}
}
need_cb = true;
}
tx.lock_.unlock();
}
if (need_cb) {
cb_executed = tx.execute_commit_cb();
}
TRANS_LOG(INFO, "handle tx commit timeout", K(ret), K(tx_id), K(cb_executed));

View File

@ -213,24 +213,30 @@ int ObTransService::reuse_tx(ObTxDesc &tx)
int ObTransService::stop_tx(ObTxDesc &tx)
{
int ret = OB_SUCCESS;
tx.lock_.lock();
TRANS_LOG(INFO, "stop_tx, print its trace as following", K(tx));
tx.print_trace_();
if (tx.addr_ != self_) {
// either on txn temp node or xa temp node
// depends on session cleanup to quit
TRANS_LOG(INFO, "this is not txn start node.");
} else {
if (tx.state_ < ObTxDesc::State::IN_TERMINATE) {
abort_tx_(tx, ObTxAbortCause::STOP, true);
} else if (!tx.is_terminated()) {
unregister_commit_retry_task_(tx);
// arm callback arguments
tx.commit_out_ = OB_TRANS_UNKNOWN;
tx.state_ = ObTxDesc::State::COMMIT_UNKNOWN;
bool need_cb = false;
{
ObSpinLockGuard guard(tx.lock_);
TRANS_LOG(INFO, "stop_tx, print its trace as following", K(tx));
tx.print_trace_();
if (tx.addr_ != self_) {
// either on txn temp node or xa temp node
// depends on session cleanup to quit
TRANS_LOG(INFO, "this is not txn start node.");
need_cb = false;
} else {
if (tx.state_ < ObTxDesc::State::IN_TERMINATE) {
abort_tx_(tx, ObTxAbortCause::STOP, true);
} else if (!tx.is_terminated()) {
unregister_commit_retry_task_(tx);
// arm callback arguments
tx.commit_out_ = OB_TRANS_UNKNOWN;
tx.state_ = ObTxDesc::State::COMMIT_UNKNOWN;
}
need_cb = true;
}
tx.lock_.unlock();
// run callback after unlock
}
// run callback after unlock
if (need_cb) {
tx.execute_commit_cb();
}
return ret;
@ -427,134 +433,138 @@ int ObTransService::submit_commit_tx(ObTxDesc &tx,
{
TXN_API_SANITY_CHECK_FOR_TXN_FREE_ROUTE(true)
int ret = OB_SUCCESS;
tx.lock_.lock();
if (tx.commit_ts_ <= 0) {
tx.commit_ts_ = ObClockGenerator::getClock();
}
tx.inc_op_sn();
switch(tx.state_) {
case ObTxDesc::State::IDLE:
TRANS_LOG(TRACE, "commit a dummy tx", K(tx), KP(&cb));
tx.set_commit_cb(&cb);
handle_tx_commit_result_(tx, OB_SUCCESS);
ret = OB_SUCCESS;
break;
case ObTxDesc::State::ABORTED:
handle_tx_commit_result_(tx, OB_TRANS_ROLLBACKED);
ret = OB_TRANS_ROLLBACKED;
break;
case ObTxDesc::State::ROLLED_BACK:
ret = OB_TRANS_ROLLBACKED;
TRANS_LOG(WARN, "insane tx action", K(ret), K(tx));
break;
case ObTxDesc::State::COMMITTED:
ret = OB_TRANS_COMMITED;
TRANS_LOG(WARN, "insane tx action", K(ret), K(tx));
break;
case ObTxDesc::State::IN_TERMINATE:
case ObTxDesc::State::COMMIT_TIMEOUT:
case ObTxDesc::State::COMMIT_UNKNOWN:
ret = OB_TRANS_HAS_DECIDED;
TRANS_LOG(WARN, "insane tx action", K(ret), K(tx));
break;
case ObTxDesc::State::ACTIVE:
case ObTxDesc::State::IMPLICIT_ACTIVE:
if (tx.expire_ts_ <= ObClockGenerator::getClock()) {
TX_STAT_TIMEOUT_INC
TRANS_LOG(WARN, "tx has timeout, it has rollbacked internally", K_(tx.expire_ts), K(tx));
ret = OB_TRANS_ROLLBACKED;
handle_tx_commit_result_(tx, OB_TRANS_ROLLBACKED);
} else if (tx.flags_.PARTS_INCOMPLETE_) {
TRANS_LOG(WARN, "txn participants set incomplete, can not commit", K(ret), K(tx));
abort_tx_(tx, ObTxAbortCause::PARTICIPANTS_SET_INCOMPLETE);
bool need_cb = false;
{
ObSpinLockGuard guard(tx.lock_);
if (tx.commit_ts_ <= 0) {
tx.commit_ts_ = ObClockGenerator::getClock();
}
tx.inc_op_sn();
switch(tx.state_) {
case ObTxDesc::State::IDLE:
TRANS_LOG(TRACE, "commit a dummy tx", K(tx), KP(&cb));
tx.set_commit_cb(&cb);
handle_tx_commit_result_(tx, OB_SUCCESS);
ret = OB_SUCCESS;
break;
case ObTxDesc::State::ABORTED:
handle_tx_commit_result_(tx, OB_TRANS_ROLLBACKED);
ret = OB_TRANS_ROLLBACKED;
} else if (tx.flags_.PART_EPOCH_MISMATCH_) {
TRANS_LOG(WARN, "txn participant state incomplete, can not commit", K(ret), K(tx));
abort_tx_(tx, ObTxAbortCause::PARTICIPANT_STATE_INCOMPLETE);
handle_tx_commit_result_(tx, OB_TRANS_ROLLBACKED);
break;
case ObTxDesc::State::ROLLED_BACK:
ret = OB_TRANS_ROLLBACKED;
} else {
int clean = true;
ARRAY_FOREACH_X(tx.parts_, i, cnt, clean) {
clean = tx.parts_[i].is_without_ctx() || tx.parts_[i].is_clean();
TRANS_LOG(WARN, "insane tx action", K(ret), K(tx));
break;
case ObTxDesc::State::COMMITTED:
ret = OB_TRANS_COMMITED;
TRANS_LOG(WARN, "insane tx action", K(ret), K(tx));
break;
case ObTxDesc::State::IN_TERMINATE:
case ObTxDesc::State::COMMIT_TIMEOUT:
case ObTxDesc::State::COMMIT_UNKNOWN:
ret = OB_TRANS_HAS_DECIDED;
TRANS_LOG(WARN, "insane tx action", K(ret), K(tx));
break;
case ObTxDesc::State::ACTIVE:
case ObTxDesc::State::IMPLICIT_ACTIVE:
if (tx.expire_ts_ <= ObClockGenerator::getClock()) {
TX_STAT_TIMEOUT_INC
TRANS_LOG(WARN, "tx has timeout, it has rollbacked internally", K_(tx.expire_ts), K(tx));
ret = OB_TRANS_ROLLBACKED;
handle_tx_commit_result_(tx, OB_TRANS_ROLLBACKED);
} else if (tx.flags_.PARTS_INCOMPLETE_) {
TRANS_LOG(WARN, "txn participants set incomplete, can not commit", K(ret), K(tx));
abort_tx_(tx, ObTxAbortCause::PARTICIPANTS_SET_INCOMPLETE);
handle_tx_commit_result_(tx, OB_TRANS_ROLLBACKED);
ret = OB_TRANS_ROLLBACKED;
} else if (tx.flags_.PART_EPOCH_MISMATCH_) {
TRANS_LOG(WARN, "txn participant state incomplete, can not commit", K(ret), K(tx));
abort_tx_(tx, ObTxAbortCause::PARTICIPANT_STATE_INCOMPLETE);
handle_tx_commit_result_(tx, OB_TRANS_ROLLBACKED);
ret = OB_TRANS_ROLLBACKED;
} else {
int clean = true;
ARRAY_FOREACH_X(tx.parts_, i, cnt, clean) {
clean = tx.parts_[i].is_without_ctx() || tx.parts_[i].is_clean();
}
if (clean) {
// explicit savepoint rollback cause empty valid-part-set
tx.set_commit_cb(&cb);
abort_participants_(tx); // let part ctx quit
handle_tx_commit_result_(tx, OB_SUCCESS); // commit success
ret = OB_SUCCESS;
}
}
if (clean) {
// explicit savepoint rollback cause empty valid-part-set
tx.set_commit_cb(&cb);
abort_participants_(tx); // let part ctx quit
handle_tx_commit_result_(tx, OB_SUCCESS); // commit success
ret = OB_SUCCESS;
break;
default:
TRANS_LOG(WARN, "anormaly tx state", K(tx));
abort_tx_(tx, ObTxAbortCause::IN_CONSIST_STATE);
handle_tx_commit_result_(tx, OB_TRANS_ROLLBACKED);
ret = OB_TRANS_ROLLBACKED;
}
// normal path, commit cont.
if (OB_SUCC(ret) && (
tx.state_ == ObTxDesc::State::ACTIVE ||
tx.state_ == ObTxDesc::State::IMPLICIT_ACTIVE)) {
ObTxDesc::State state0 = tx.state_;
tx.state_ = ObTxDesc::State::IN_TERMINATE;
// record trace_info
if (OB_NOT_NULL(trace_info) &&
OB_FAIL(tx.trace_info_.set_app_trace_info(*trace_info))) {
TRANS_LOG(WARN, "set trace_info failed", K(ret), KPC(trace_info));
}
SCN commit_version;
if (OB_SUCC(ret) &&
OB_FAIL(do_commit_tx_(tx, expire_ts, cb, commit_version))) {
TRANS_LOG(WARN, "try to commit tx fail, tx will be aborted",
K(ret), K(expire_ts), K(tx), KP(&cb));
// the error may caused by txn has terminated
handle_tx_commit_result_(tx, ret, commit_version);
}
// if txn not terminated, it can be choice to abort
if (OB_FAIL(ret) && tx.state_ == ObTxDesc::State::IN_TERMINATE) {
tx.state_ = state0;
abort_tx_(tx, ret);
handle_tx_commit_result_(tx, OB_TRANS_ROLLBACKED);
ret = OB_TRANS_ROLLBACKED;
}
}
break;
default:
TRANS_LOG(WARN, "anormaly tx state", K(tx));
abort_tx_(tx, ObTxAbortCause::IN_CONSIST_STATE);
handle_tx_commit_result_(tx, OB_TRANS_ROLLBACKED);
ret = OB_TRANS_ROLLBACKED;
}
// normal path, commit cont.
if (OB_SUCC(ret) && (
tx.state_ == ObTxDesc::State::ACTIVE ||
tx.state_ == ObTxDesc::State::IMPLICIT_ACTIVE)) {
ObTxDesc::State state0 = tx.state_;
tx.state_ = ObTxDesc::State::IN_TERMINATE;
// record trace_info
if (OB_NOT_NULL(trace_info) &&
OB_FAIL(tx.trace_info_.set_app_trace_info(*trace_info))) {
TRANS_LOG(WARN, "set trace_info failed", K(ret), KPC(trace_info));
/* NOTE:
* to prevent potential deadlock, distinguish the commit
* completed by current thread from other cases
*/
bool committed = tx.state_ == ObTxDesc::State::COMMITTED;
// if tx committed, we should callback immediately
//
// NOTE: this must defer to final current function
// in order to assure there is no access to tx, because
// after calling the commit_cb, the tx object may be
// released or reused
if (OB_SUCC(ret) && committed) {
need_cb = true;
}
SCN commit_version;
if (OB_SUCC(ret) &&
OB_FAIL(do_commit_tx_(tx, expire_ts, cb, commit_version))) {
TRANS_LOG(WARN, "try to commit tx fail, tx will be aborted",
K(ret), K(expire_ts), K(tx), KP(&cb));
// the error may caused by txn has terminated
handle_tx_commit_result_(tx, ret, commit_version);
}
// if txn not terminated, it can be choice to abort
if (OB_FAIL(ret) && tx.state_ == ObTxDesc::State::IN_TERMINATE) {
tx.state_ = state0;
abort_tx_(tx, ret);
handle_tx_commit_result_(tx, OB_TRANS_ROLLBACKED);
ret = OB_TRANS_ROLLBACKED;
#ifndef NDEBUG
TRANS_LOG(INFO, "submit commit tx", K(ret),
K(committed), KPC(this), K(tx), K(expire_ts), KP(&cb));
#else
if (OB_FAIL(ret)) {
TRANS_LOG(INFO, "submit commit tx fail", K(ret),
K(committed), KPC(this), K(tx), K(expire_ts), KP(&cb));
}
#endif
ObTransTraceLog &tlog = tx.get_tlog();
const char *trace_info_str = (trace_info == NULL ? NULL : trace_info->ptr());
REC_TRANS_TRACE_EXT(&tlog, submit_commit_tx, OB_Y(ret), OB_Y(expire_ts),
OB_ID(tag1), committed,
OB_ID(tag2), trace_info_str,
OB_ID(ref), tx.get_ref(),
OB_ID(thread_id), GETTID());
}
/* NOTE:
* to prevent potential deadlock, distinguish the commit
* completed by current thread from other cases
*/
bool committed = tx.state_ == ObTxDesc::State::COMMITTED;
// if tx committed, we should callback immediately
//
// NOTE: this must defer to final current function
// in order to assure there is no access to tx, because
// after calling the commit_cb, the tx object may be
// released or reused
DEFER({
tx.lock_.unlock();
if (OB_SUCC(ret) && committed) {
direct_execute_commit_cb_(tx);
}
});
#ifndef NDEBUG
TRANS_LOG(INFO, "submit commit tx", K(ret),
K(committed), KPC(this), K(tx), K(expire_ts), KP(&cb));
#else
if (OB_FAIL(ret)) {
TRANS_LOG(INFO, "submit commit tx fail", K(ret),
K(committed), KPC(this), K(tx), K(expire_ts), KP(&cb));
if (need_cb){
direct_execute_commit_cb_(tx);
}
#endif
ObTransTraceLog &tlog = tx.get_tlog();
const char *trace_info_str = (trace_info == NULL ? NULL : trace_info->ptr());
REC_TRANS_TRACE_EXT(&tlog, submit_commit_tx, OB_Y(ret), OB_Y(expire_ts),
OB_ID(tag1), committed,
OB_ID(tag2), trace_info_str,
OB_ID(ref), tx.get_ref(),
OB_ID(thread_id), GETTID());
return ret;
}