Large trx log callbacks wait for the tx ctx lock and hold the clog callback thread too long, affecting small trx latency.
parent 542516e369, commit 225d78f12e
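The fix adds a waiter counter to the transaction ctx lock and makes the long redo-submission path yield whenever someone is waiting. As a rough orientation before the diff, the following is a minimal standalone sketch of the counting half of that idea; it uses std::mutex and std::atomic in place of ObLatch and the ATOMIC_* macros, and the class and member names are illustrative, not OceanBase's.

// Sketch only: a lock wrapper that tracks how many threads are blocked trying to
// acquire it, mirroring the ATOMIC_INC/ATOMIC_DEC added around lock_.wrlock()
// in CtxLock::lock() below.
#include <atomic>
#include <cstdint>
#include <mutex>

class WaiterCountingLock {
public:
  WaiterCountingLock() : waiting_lock_cnt_(0) {}

  // Count ourselves as a waiter for the duration of the (possibly blocking) acquire.
  void lock()
  {
    waiting_lock_cnt_.fetch_add(1, std::memory_order_relaxed);
    lock_.lock();
    waiting_lock_cnt_.fetch_sub(1, std::memory_order_relaxed);
  }

  void unlock() { lock_.unlock(); }

  // The current lock holder polls this to decide whether it should back off,
  // like CtxLock::get_waiting_lock_cnt() in the diff.
  int64_t get_waiting_lock_cnt() const
  {
    return waiting_lock_cnt_.load(std::memory_order_relaxed);
  }

private:
  std::mutex lock_;
  std::atomic<int64_t> waiting_lock_cnt_;
};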
@@ -58,7 +58,9 @@ void CtxLock::after_unlock(CtxLockArg &arg)
 int CtxLock::lock()
 {
+  ATOMIC_INC(&waiting_lock_cnt_);
   int ret = lock_.wrlock(common::ObLatchIds::TRANS_CTX_LOCK);
+  ATOMIC_DEC(&waiting_lock_cnt_);
   lock_start_ts_ = ObClockGenerator::getClock();
   return ret;
 }
 
@@ -57,7 +57,7 @@ public:
 class CtxLock
 {
 public:
-  CtxLock() : lock_(), ctx_(NULL), lock_start_ts_(0) {}
+  CtxLock() : lock_(), ctx_(NULL), lock_start_ts_(0), waiting_lock_cnt_(0) {}
   ~CtxLock() {}
   int init(ObTransCtx *ctx);
   void reset();
@@ -69,6 +69,7 @@ public:
   void after_unlock(CtxLockArg &arg);
   ObTransCtx *get_ctx() { return ctx_; }
   bool is_locked_by_self() const { return lock_.is_wrlocked_by(); }
+  int64_t get_waiting_lock_cnt() const { return ATOMIC_LOAD(&waiting_lock_cnt_); }
private:
   static const int64_t WARN_LOCK_TS = 1 * 1000 * 1000;
   DISALLOW_COPY_AND_ASSIGN(CtxLock);
@@ -76,6 +77,7 @@ private:
   common::ObLatch lock_;
   ObTransCtx *ctx_;
   int64_t lock_start_ts_;
+  int64_t waiting_lock_cnt_;
 };
 
 class CtxLockGuard
@@ -1773,15 +1773,17 @@ int ObPartTransCtx::submit_redo_log(const bool is_freeze)
   if (is_freeze) {
     bool need_submit = !is_logging_blocked();
     if (need_submit) {
-      // spin lock
-      CtxLockGuard guard(lock_);
-      tg.click();
-      ret = submit_redo_log_for_freeze_(try_submit);
-      tg.click();
-      if (try_submit) {
-        REC_TRANS_TRACE_EXT2(tlog_, submit_instant_log, OB_Y(ret), OB_ID(arg2), is_freeze,
-                             OB_ID(used), tg.get_diff(), OB_ID(ref), get_ref());
-      }
+      do {
+        // spin lock
+        CtxLockGuard guard(lock_);
+        tg.click();
+        ret = submit_redo_log_for_freeze_(try_submit);
+        tg.click();
+        if (try_submit) {
+          REC_TRANS_TRACE_EXT2(tlog_, submit_instant_log, OB_Y(ret), OB_ID(arg2), is_freeze,
+                               OB_ID(used), tg.get_diff(), OB_ID(ref), get_ref());
+        }
+      } while (OB_EAGAIN == ret);
     }
   } else if (!mt_ctx_.pending_log_size_too_large()) {
   } else if (OB_FAIL(lock_.try_lock())) {
@@ -1792,15 +1794,19 @@ int ObPartTransCtx::submit_redo_log(const bool is_freeze)
       TRANS_LOG(ERROR, "try lock error, unexpected error", K(ret), K(*this));
     }
   } else {
-    CtxLockGuard guard(lock_, false);
-    tg.click();
-    ret = check_and_submit_redo_log_(try_submit);
-    tg.click();
-    if (try_submit) {
-      REC_TRANS_TRACE_EXT2(tlog_, submit_instant_log, OB_Y(ret), OB_ID(arg2), is_freeze,
-                           OB_ID(used), tg.get_diff(),
-                           OB_ID(ref), get_ref());
-    }
+    bool need_lock = false;
+    do {
+      CtxLockGuard guard(lock_, need_lock);
+      tg.click();
+      ret = check_and_submit_redo_log_(try_submit);
+      tg.click();
+      if (try_submit) {
+        REC_TRANS_TRACE_EXT2(tlog_, submit_instant_log, OB_Y(ret), OB_ID(arg2), is_freeze,
+                             OB_ID(used), tg.get_diff(),
+                             OB_ID(ref), get_ref());
+      }
+      need_lock = true;
+    } while (OB_EAGAIN == ret);
   }
   if (OB_BLOCK_FROZEN == ret) {
     ret = OB_SUCCESS;
@@ -2725,6 +2731,7 @@ int ObPartTransCtx::submit_redo_log_(ObTxLogBlock &log_block,
   bool need_submit_log = false;
   bool need_undo_log = false;
   ObTxLogCb *log_cb = NULL;
+  bool need_stop_submit_redo = false;
 
   while (OB_SUCC(ret) && need_continue) {
     ObTxRedoLog redo_log(get_redo_log_no_(), cluster_version_);
@@ -2762,6 +2769,13 @@ int ObPartTransCtx::submit_redo_log_(ObTxLogBlock &log_block,
       }
     } else if (OB_EAGAIN == ret) {
       has_redo = true;
+      /*
+       * Stop submitting redo logs when someone is waiting for the tx ctx lock,
+       * e.g. a large transaction that keeps submitting redo logs for a long time,
+       * to avoid making others wait for the ctx lock too long.
+       * The callbacks of the already submitted redo logs will drive submission of the remaining logs.
+       */
+      need_stop_submit_redo = lock_.get_waiting_lock_cnt() > 0;
       if (OB_FAIL(log_block.finish_mutator_buf(redo_log, mutator_size))) {
         TRANS_LOG(WARN, "finish mutator buf failed", KR(ret), K(*this));
       } else if (OB_FAIL(log_block.add_new_log(redo_log))) {
@@ -2833,6 +2847,13 @@ int ObPartTransCtx::submit_redo_log_(ObTxLogBlock &log_block,
     if (need_undo_log) {
       has_redo = false;
     }
+
+    if (OB_SUCC(ret) && need_continue && need_stop_submit_redo) {
+      ret = OB_EAGAIN;
+#ifndef NDEBUG
+      TRANS_LOG(INFO, "stop submitting redo log", KR(ret), K(*this));
+#endif
+    }
   }
 
   return ret;
@@ -2856,6 +2877,10 @@ int ObPartTransCtx::submit_redo_log_()
   // don't need to handle OB_BLOCK_FROZEN ret
   if (OB_BLOCK_FROZEN == ret) {
     TRANS_LOG(INFO, "submit log meets frozen memtable", KR(ret), K(*this));
+  } else if (OB_EAGAIN == ret) {
+#ifndef NDEBUG
+    TRANS_LOG(INFO, "stop submitting redo log", KR(ret), K(*this));
+#endif
   } else if (REACH_TIME_INTERVAL(100 * 1000)) {
     TRANS_LOG(WARN, "submit redo log failed", KR(ret), K(*this));
   }
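To make the new control flow concrete: the submitter checks the waiter count after each batch and bails out with OB_EAGAIN, and the caller's do/while drops and re-takes the ctx lock between rounds so that waiters (for example, log callbacks of small transactions) can get in. The following is a minimal standalone sketch of that cooperative back-off, again with std::mutex/std::atomic standing in for CtxLock/ObLatch and all names being illustrative rather than OceanBase's.

// Sketch only: a long-running writer that stops as soon as a waiter shows up,
// and a caller that releases the lock, then re-acquires it and resumes.
#include <atomic>
#include <cstdint>
#include <mutex>

enum class SubmitResult { DONE, AGAIN };

// Stands in for submit_redo_log_(): submit batch by batch, but stop early once the
// waiter counter (maintained by whoever acquires the ctx lock) becomes non-zero,
// analogous to setting need_stop_submit_redo and returning OB_EAGAIN.
SubmitResult submit_batches(const std::atomic<int64_t> &waiting_cnt,
                            int &next_batch, const int total_batches)
{
  while (next_batch < total_batches) {
    // ... fill and submit one redo batch here ...
    ++next_batch;
    if (waiting_cnt.load(std::memory_order_relaxed) > 0) {
      return SubmitResult::AGAIN;
    }
  }
  return SubmitResult::DONE;
}

// Stands in for the caller-side do { lock; submit; } while (OB_EAGAIN == ret) loops in
// submit_redo_log(): the lock is released at the end of every round, giving the blocked
// threads a chance to run before the next round re-acquires it.
void submit_all(std::mutex &ctx_lock, const std::atomic<int64_t> &waiting_cnt,
                const int total_batches)
{
  int next_batch = 0;
  SubmitResult r = SubmitResult::AGAIN;
  do {
    std::lock_guard<std::mutex> guard(ctx_lock);
    r = submit_batches(waiting_cnt, next_batch, total_batches);
  } while (SubmitResult::AGAIN == r);
}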