callback sql level should not hold trans level lock
This commit is contained in:
@ -560,7 +560,7 @@ int ObSqlTransControl::start_stmt(ObExecContext &exec_ctx)
|
||||
ar_snapshot.version_ = snapshot.core_.version_;
|
||||
ar_snapshot.tx_id_ = snapshot.core_.tx_id_.get_id();
|
||||
ar_snapshot.scn_ = snapshot.core_.scn_.cast_to_int();
|
||||
(void)snapshot.generate_snapshot_source(audit_record.snapshot_source_, sizeof(audit_record.snapshot_source_));
|
||||
(void)snapshot.format_source_for_display(audit_record.snapshot_source_, sizeof(audit_record.snapshot_source_));
|
||||
ar_snapshot.source_ = audit_record.snapshot_source_;
|
||||
}
|
||||
if (OB_SUCC(ret) && !session->has_start_stmt()) {
|
||||
|
||||
@ -383,6 +383,7 @@ ObTxDesc::ObTxDesc()
|
||||
lock_(common::ObLatchIds::TX_DESC_LOCK),
|
||||
commit_cb_lock_(common::ObLatchIds::TX_DESC_COMMIT_LOCK),
|
||||
commit_cb_(NULL),
|
||||
cb_tid_(-1),
|
||||
exec_info_reap_ts_(0),
|
||||
brpc_mask_set_(),
|
||||
rpc_cond_(),
|
||||
@ -428,6 +429,7 @@ int ObTxDesc::switch_to_idle()
|
||||
abort_cause_ = 0;
|
||||
can_elr_ = false;
|
||||
commit_cb_ = NULL;
|
||||
cb_tid_ = -1;
|
||||
exec_info_reap_ts_ = 0;
|
||||
commit_task_.reset();
|
||||
state_ = State::IDLE;
|
||||
@ -534,6 +536,7 @@ void ObTxDesc::reset()
|
||||
can_elr_ = false;
|
||||
|
||||
commit_cb_ = NULL;
|
||||
cb_tid_ = -1;
|
||||
exec_info_reap_ts_ = 0;
|
||||
brpc_mask_set_.reset();
|
||||
rpc_cond_.reset();
|
||||
@ -925,15 +928,19 @@ bool ObTxDesc::execute_commit_cb()
|
||||
executed = true;
|
||||
cb = commit_cb_;
|
||||
commit_cb_ = NULL;
|
||||
ATOMIC_STORE_REL(&cb_tid_, GETTID());
|
||||
// NOTE: it is required add trace event before callback,
|
||||
// because txDesc may be released after callback called
|
||||
REC_TRANS_TRACE_EXT(&tlog_, exec_commit_cb,
|
||||
OB_ID(arg), (void*)cb,
|
||||
OB_ID(ref), get_ref(),
|
||||
OB_ID(thread_id), GETTID());
|
||||
cb->callback(commit_out_);
|
||||
}
|
||||
commit_cb_lock_.unlock();
|
||||
cb->callback(commit_out_);
|
||||
ATOMIC_STORE_REL(&cb_tid_, -1);
|
||||
} else {
|
||||
commit_cb_lock_.unlock();
|
||||
}
|
||||
}
|
||||
TRANS_LOG(TRACE, "execute_commit_cb", KP(this), K(tx_id), KP(cb), K(executed));
|
||||
}
|
||||
@ -1242,14 +1249,14 @@ const char* ObTxReadSnapshot::get_source_name() const
|
||||
}
|
||||
|
||||
/*
|
||||
* generate sql_audit's snapshot_source fileds
|
||||
* format snapshot info for sql audit display
|
||||
* contains: src, ls_id, ls_role, parts
|
||||
* when shorter than 128 char like:
|
||||
* "src:GLOBAL;ls_id:1001;ls_role:LEADER;parts:[(id:1001,epoch:1111),(id:1002,epoch:12222)]"
|
||||
* when longer than 128 char, with "..." in the end
|
||||
* "src:GLOBAL;ls_id:1001;ls_role:LEADER;parts:[(lsid:1001,epoch:1111),(lsid:1002,epoch:122..."
|
||||
*/
|
||||
int ObTxReadSnapshot::generate_snapshot_source(char *buf, const int64_t buf_len) const
|
||||
int ObTxReadSnapshot::format_source_for_display(char *buf, const int64_t buf_len) const
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t pos = 0;
|
||||
|
||||
@ -275,7 +275,7 @@ struct ObTxReadSnapshot
|
||||
bool is_none_read() const { return SRC::NONE == source_; }
|
||||
bool is_special() const { return SRC::SPECIAL == source_; }
|
||||
bool is_ls_snapshot() const { return SRC::LS == source_; }
|
||||
int generate_snapshot_source(char *buf, const int64_t buf_len) const;
|
||||
int format_source_for_display(char *buf, const int64_t buf_len) const;
|
||||
void reset();
|
||||
int assign(const ObTxReadSnapshot &);
|
||||
ObTxReadSnapshot();
|
||||
@ -560,6 +560,7 @@ private:
|
||||
mutable ObSpinLock lock_;
|
||||
ObSpinLock commit_cb_lock_; // protect commit_cb_ field
|
||||
ObITxCallback *commit_cb_; // async commit callback
|
||||
int64_t cb_tid_; // commit callback thread id
|
||||
int64_t exec_info_reap_ts_; // the time reaping incremental tx exec info
|
||||
RollbackMaskSet brpc_mask_set_; // used in message driven savepoint rollback
|
||||
ObTransCond rpc_cond_; // used in message driven savepoint rollback
|
||||
|
||||
@ -186,7 +186,8 @@ int ObTransService::reuse_tx(ObTxDesc &tx)
|
||||
// before we reuse it
|
||||
|
||||
// if reuse come from commit_cb, assume current thread hold one reference
|
||||
final_ref_cnt = tx.commit_cb_lock_.self_locked() ? 2 : 1;
|
||||
int64_t cb_tid = ATOMIC_LOAD_ACQ(&tx.cb_tid_);
|
||||
final_ref_cnt = cb_tid == GETTID() ? 2 : 1;
|
||||
while (tx.get_ref() > final_ref_cnt) {
|
||||
PAUSE();
|
||||
if (++spin_cnt > 2000) {
|
||||
|
||||
Reference in New Issue
Block a user