diff --git a/src/sql/ob_sql_trans_control.cpp b/src/sql/ob_sql_trans_control.cpp index 7901fd2cf0..9869188bcc 100644 --- a/src/sql/ob_sql_trans_control.cpp +++ b/src/sql/ob_sql_trans_control.cpp @@ -560,7 +560,7 @@ int ObSqlTransControl::start_stmt(ObExecContext &exec_ctx) ar_snapshot.version_ = snapshot.core_.version_; ar_snapshot.tx_id_ = snapshot.core_.tx_id_.get_id(); ar_snapshot.scn_ = snapshot.core_.scn_.cast_to_int(); - (void)snapshot.generate_snapshot_source(audit_record.snapshot_source_, sizeof(audit_record.snapshot_source_)); + (void)snapshot.format_source_for_display(audit_record.snapshot_source_, sizeof(audit_record.snapshot_source_)); ar_snapshot.source_ = audit_record.snapshot_source_; } if (OB_SUCC(ret) && !session->has_start_stmt()) { diff --git a/src/storage/tx/ob_trans_define_v4.cpp b/src/storage/tx/ob_trans_define_v4.cpp index 3b61b238cd..327e549ae2 100644 --- a/src/storage/tx/ob_trans_define_v4.cpp +++ b/src/storage/tx/ob_trans_define_v4.cpp @@ -383,6 +383,7 @@ ObTxDesc::ObTxDesc() lock_(common::ObLatchIds::TX_DESC_LOCK), commit_cb_lock_(common::ObLatchIds::TX_DESC_COMMIT_LOCK), commit_cb_(NULL), + cb_tid_(-1), exec_info_reap_ts_(0), brpc_mask_set_(), rpc_cond_(), @@ -428,6 +429,7 @@ int ObTxDesc::switch_to_idle() abort_cause_ = 0; can_elr_ = false; commit_cb_ = NULL; + cb_tid_ = -1; exec_info_reap_ts_ = 0; commit_task_.reset(); state_ = State::IDLE; @@ -534,6 +536,7 @@ void ObTxDesc::reset() can_elr_ = false; commit_cb_ = NULL; + cb_tid_ = -1; exec_info_reap_ts_ = 0; brpc_mask_set_.reset(); rpc_cond_.reset(); @@ -925,15 +928,19 @@ bool ObTxDesc::execute_commit_cb() executed = true; cb = commit_cb_; commit_cb_ = NULL; + ATOMIC_STORE_REL(&cb_tid_, GETTID()); // NOTE: it is required add trace event before callback, // because txDesc may be released after callback called REC_TRANS_TRACE_EXT(&tlog_, exec_commit_cb, OB_ID(arg), (void*)cb, OB_ID(ref), get_ref(), OB_ID(thread_id), GETTID()); + commit_cb_lock_.unlock(); cb->callback(commit_out_); + ATOMIC_STORE_REL(&cb_tid_, -1); + } else { + commit_cb_lock_.unlock(); } - commit_cb_lock_.unlock(); } TRANS_LOG(TRACE, "execute_commit_cb", KP(this), K(tx_id), KP(cb), K(executed)); } @@ -1242,14 +1249,14 @@ const char* ObTxReadSnapshot::get_source_name() const } /* - * generate sql_audit's snapshot_source fileds + * format snapshot info for sql audit display * contains: src, ls_id, ls_role, parts * when shorter than 128 char like: * "src:GLOBAL;ls_id:1001;ls_role:LEADER;parts:[(id:1001,epoch:1111),(id:1002,epoch:12222)]" * when longer than 128 char, with "..." in the end * "src:GLOBAL;ls_id:1001;ls_role:LEADER;parts:[(lsid:1001,epoch:1111),(lsid:1002,epoch:122..." */ -int ObTxReadSnapshot::generate_snapshot_source(char *buf, const int64_t buf_len) const +int ObTxReadSnapshot::format_source_for_display(char *buf, const int64_t buf_len) const { int ret = OB_SUCCESS; int64_t pos = 0; diff --git a/src/storage/tx/ob_trans_define_v4.h b/src/storage/tx/ob_trans_define_v4.h index 217715b58f..c6af0a497b 100644 --- a/src/storage/tx/ob_trans_define_v4.h +++ b/src/storage/tx/ob_trans_define_v4.h @@ -275,7 +275,7 @@ struct ObTxReadSnapshot bool is_none_read() const { return SRC::NONE == source_; } bool is_special() const { return SRC::SPECIAL == source_; } bool is_ls_snapshot() const { return SRC::LS == source_; } - int generate_snapshot_source(char *buf, const int64_t buf_len) const; + int format_source_for_display(char *buf, const int64_t buf_len) const; void reset(); int assign(const ObTxReadSnapshot &); ObTxReadSnapshot(); @@ -560,6 +560,7 @@ private: mutable ObSpinLock lock_; ObSpinLock commit_cb_lock_; // protect commit_cb_ field ObITxCallback *commit_cb_; // async commit callback + int64_t cb_tid_; // commit callback thread id int64_t exec_info_reap_ts_; // the time reaping incremental tx exec info RollbackMaskSet brpc_mask_set_; // used in message driven savepoint rollback ObTransCond rpc_cond_; // used in message driven savepoint rollback diff --git a/src/storage/tx/ob_tx_api.cpp b/src/storage/tx/ob_tx_api.cpp index 3febc7a346..8c27265a7e 100644 --- a/src/storage/tx/ob_tx_api.cpp +++ b/src/storage/tx/ob_tx_api.cpp @@ -186,7 +186,8 @@ int ObTransService::reuse_tx(ObTxDesc &tx) // before we reuse it // if reuse come from commit_cb, assume current thread hold one reference - final_ref_cnt = tx.commit_cb_lock_.self_locked() ? 2 : 1; + int64_t cb_tid = ATOMIC_LOAD_ACQ(&tx.cb_tid_); + final_ref_cnt = cb_tid == GETTID() ? 2 : 1; while (tx.get_ref() > final_ref_cnt) { PAUSE(); if (++spin_cnt > 2000) {