[master] fix tx desc allocator traverse
This commit is contained in:
@ -707,14 +707,14 @@ inline bool ObTxDesc::acq_commit_cb_lock_if_need_()
|
||||
* shortcut and return.
|
||||
* for more detail, refer to 'acq_commit_cb_lock_if_need_' function.
|
||||
*/
|
||||
void ObTxDesc::execute_commit_cb()
|
||||
bool ObTxDesc::execute_commit_cb()
|
||||
{
|
||||
bool executed = false;
|
||||
if (is_tx_end() || is_xa_terminate_state_()) {
|
||||
auto tx_id = tx_id_;
|
||||
auto cb = commit_cb_;
|
||||
int ret = OB_SUCCESS;
|
||||
bool executed = false;
|
||||
if (OB_NOT_NULL(commit_cb_) && acq_commit_cb_lock_if_need_()) {
|
||||
if (OB_NOT_NULL(commit_cb_) && acq_commit_cb_lock_if_need_()) {
|
||||
if (OB_NOT_NULL(commit_cb_)) {
|
||||
executed = true;
|
||||
cb = commit_cb_;
|
||||
@ -731,6 +731,7 @@ void ObTxDesc::execute_commit_cb()
|
||||
}
|
||||
TRANS_LOG(TRACE, "execute_commit_cb", KP(this), K(tx_id), KP(cb), K(executed));
|
||||
}
|
||||
return executed;
|
||||
}
|
||||
|
||||
ObITxCallback *ObTxDesc::cancel_commit_cb()
|
||||
|
@ -469,7 +469,7 @@ private:
|
||||
int update_parts(const share::ObLSArray &parts);
|
||||
int switch_to_idle();
|
||||
int set_commit_cb(ObITxCallback *cb);
|
||||
void execute_commit_cb();
|
||||
bool execute_commit_cb();
|
||||
private:
|
||||
int update_part_(ObTxPart &p, bool append = true);
|
||||
int add_conflict_tx_(const ObTransIDAndAddr &conflict_tx);
|
||||
@ -706,6 +706,7 @@ private:
|
||||
int for_each(Function &fn)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObSpinLockGuard guard(lk_);
|
||||
auto n = list_.next_;
|
||||
while(n != &list_) {
|
||||
auto tx = CONTAINER_OF(n, ObTxDesc, alloc_link_);
|
||||
|
@ -328,6 +328,7 @@ int ObTransService::handle_tx_commit_timeout(ObTxDesc &tx, const int64_t delay)
|
||||
// in this function's following steps.
|
||||
auto tx_id = tx.tx_id_;
|
||||
int64_t now = ObClockGenerator::getClock();
|
||||
bool cb_executed = false;
|
||||
if (OB_FAIL(tx.lock_.lock(5000000))) {
|
||||
TRANS_LOG(WARN, "failed to acquire lock in specified time", K(tx));
|
||||
// FIXME: how to handle it without lock protection
|
||||
@ -365,13 +366,13 @@ int ObTransService::handle_tx_commit_timeout(ObTxDesc &tx, const int64_t delay)
|
||||
}
|
||||
}
|
||||
tx.lock_.unlock();
|
||||
tx.execute_commit_cb();
|
||||
cb_executed = tx.execute_commit_cb();
|
||||
}
|
||||
// NOTE:
|
||||
// it not safe and meaningless to access tx after commit_cb
|
||||
// has been called, the tx may has been reused or release
|
||||
// in the commit_cb
|
||||
TRANS_LOG(INFO, "handle tx commit timeout", K(ret), K(tx_id));
|
||||
TRANS_LOG(INFO, "handle tx commit timeout", K(ret), K(tx_id), K(cb_executed));
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -2417,6 +2418,8 @@ int ObTransService::handle_timeout_for_xa(ObTxDesc &tx, const int64_t delay)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t now = ObClockGenerator::getClock();
|
||||
bool cb_executed = false;
|
||||
auto tx_id = tx.tx_id_;
|
||||
if (OB_FAIL(tx.lock_.lock(5000000))) {
|
||||
TRANS_LOG(WARN, "failed to acquire lock in specified time", K(tx));
|
||||
// FIXME: how to handle it without lock protection
|
||||
@ -2438,9 +2441,9 @@ int ObTransService::handle_timeout_for_xa(ObTxDesc &tx, const int64_t delay)
|
||||
}
|
||||
}
|
||||
tx.lock_.unlock();
|
||||
tx.execute_commit_cb();
|
||||
cb_executed = tx.execute_commit_cb();
|
||||
}
|
||||
TRANS_LOG(INFO, "handle tx commit timeout", K(ret), K(tx));
|
||||
TRANS_LOG(INFO, "handle tx commit timeout", K(ret), K(tx_id), K(cb_executed));
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
@ -161,6 +161,8 @@ int ObTransService::release_tx(ObTxDesc &tx)
|
||||
int ObTransService::reuse_tx(ObTxDesc &tx)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int spin_cnt = 0;
|
||||
int final_ref_cnt = 0;
|
||||
ObTransID orig_tx_id = tx.tx_id_;
|
||||
if (tx.is_in_tx() && !tx.is_tx_end()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
@ -168,15 +170,18 @@ int ObTransService::reuse_tx(ObTxDesc &tx)
|
||||
} else if (OB_FAIL(finalize_tx_(tx))) {
|
||||
TRANS_LOG(WARN, "finalize tx fail", K(ret), K(tx.tx_id_));
|
||||
} else {
|
||||
int cnt = 0;
|
||||
int final_ref_cnt = tx.commit_cb_lock_.self_locked() ? 2 : 1;
|
||||
final_ref_cnt = tx.commit_cb_lock_.self_locked() ? 2 : 1;
|
||||
while (tx.get_ref() > final_ref_cnt) {
|
||||
PAUSE();
|
||||
if (++cnt > 100) { ob_usleep(500); }
|
||||
if (cnt > 2000 && TC_REACH_TIME_INTERVAL(2 * 1000 * 1000)) {
|
||||
TRANS_LOG_RET(WARN, OB_ERR_TOO_MUCH_TIME, "blocking to wait tx referent quiescent cost too much time",
|
||||
"tx_id", orig_tx_id, KP(&tx), K(final_ref_cnt), K(tx.get_ref()));
|
||||
if (++spin_cnt > 2000) {
|
||||
TRANS_LOG(WARN, "blocking to wait tx referent quiescent cost too much time",
|
||||
"tx_id", orig_tx_id, KP(&tx), K(final_ref_cnt), K(spin_cnt), K(tx.get_ref()));
|
||||
tx.print_trace();
|
||||
usleep(2000000); // 2s
|
||||
} else if (spin_cnt > 200) {
|
||||
usleep(2000); // 2ms
|
||||
} else if (spin_cnt > 100) {
|
||||
usleep(200); // 200us
|
||||
}
|
||||
}
|
||||
// it is safe to operate tx without lock when not shared
|
||||
@ -189,6 +194,9 @@ int ObTransService::reuse_tx(ObTxDesc &tx)
|
||||
REC_TRANS_TRACE_EXT(&tlog, reuse, OB_Y(ret),
|
||||
OB_ID(addr), (void*)&tx,
|
||||
OB_ID(txid), orig_tx_id,
|
||||
OB_ID(tag1), spin_cnt,
|
||||
OB_ID(tag2), final_ref_cnt,
|
||||
OB_ID(ref), tx.get_ref(),
|
||||
OB_ID(thread_id), GETTID());
|
||||
return ret;
|
||||
}
|
||||
|
Reference in New Issue
Block a user