reserve log callback for freezing

This commit is contained in:
obdev 2022-11-06 01:35:31 +00:00 committed by wangzelin.wzl
parent 5c15eeccd8
commit e7a71c9f57
2 changed files with 16 additions and 3 deletions

View File

@ -276,6 +276,7 @@ void ObPartTransCtx::default_init_()
mds_cache_.reset();
start_replay_ts_ = 0;
is_incomplete_replay_ctx_ = false;
is_submitting_redo_log_for_freeze_ = false;
start_working_log_ts_ = 0;
coord_prepare_info_arr_.reset();
reserve_allocator_.reset();
@ -1437,6 +1438,7 @@ int ObPartTransCtx::submit_redo_log(const bool is_freeze)
if (is_freeze) {
// spin lock
CtxLockGuard guard(lock_);
ATOMIC_STORE(&is_submitting_redo_log_for_freeze_, true);
is_tx_committing = ObTxState::INIT != get_downstream_state();
final_log_submitting = final_log_cb_.is_valid();
@ -1448,6 +1450,7 @@ int ObPartTransCtx::submit_redo_log(const bool is_freeze)
}
try_submit = true;
}
ATOMIC_STORE(&is_submitting_redo_log_for_freeze_, false);
if (try_submit) {
REC_TRANS_TRACE_EXT2(tlog_, submit_instant_log, Y(ret), OB_ID(arg2), is_freeze,
OB_ID(used), ObTimeUtility::fast_current_time() - start,
@ -2940,7 +2943,7 @@ int ObPartTransCtx::submit_record_log_()
if (OB_FAIL(log_block.init(replay_hint, log_block_header))) {
TRANS_LOG(WARN, "init log block failed", KR(ret), K(*this));
} else if (OB_FAIL(get_log_cb_(!NEED_FINAL_CB, log_cb))) {
} else if (OB_FAIL(prepare_log_cb_(!NEED_FINAL_CB, log_cb))) {
if (OB_UNLIKELY(OB_TX_NOLOGCB != ret)) {
TRANS_LOG(WARN, "get log cb failed", KR(ret), K(*this));
}
@ -3194,7 +3197,11 @@ int ObPartTransCtx::get_log_cb_(const bool need_final_cb, ObTxLogCb *&log_cb)
} else {
if (free_cbs_.is_empty()) {
ret = OB_TX_NOLOGCB;
//TRANS_LOG(INFO, "all log cbs are busy now, try again later", K(*this));
//TRANS_LOG(INFO, "all log cbs are busy now, try again later", K(ret), K(*this));
} else if (free_cbs_.get_size() == RESERVE_LOG_CALLBACK_COUNT_FOR_FREEZING &&
ATOMIC_LOAD(&is_submitting_redo_log_for_freeze_) == false) {
ret = OB_TX_NOLOGCB;
//TRANS_LOG(INFO, "reserve log callback for freezing, try again later", K(ret), K(*this));
} else if (OB_ISNULL(log_cb = free_cbs_.remove_first())) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "unexpected null log cb", KR(ret), K(*this));
@ -5705,7 +5712,7 @@ int ObPartTransCtx::submit_rollback_to_log_(const int64_t from_scn,
TRANS_LOG(WARN, "init log block fail", K(ret), KPC(this));
} else if (OB_FAIL(exec_info_.redo_lsns_.reserve(exec_info_.redo_lsns_.count() + 1))) {
TRANS_LOG(WARN, "reserve memory for redo lsn failed", K(ret));
} else if (OB_FAIL(get_log_cb_(!NEED_FINAL_CB, log_cb))) {
} else if (OB_FAIL(prepare_log_cb_(!NEED_FINAL_CB, log_cb))) {
TRANS_LOG(WARN, "get log_cb fail", K(ret), KPC(this));
} else if (OB_FAIL(log_block.add_new_log(log))) {
TRANS_LOG(WARN, "logblock add log fail", K(ret), KPC(this));

View File

@ -81,6 +81,7 @@ namespace transaction
{
const static int64_t OB_TX_MAX_LOG_CBS = 32;
const static int64_t RESERVE_LOG_CALLBACK_COUNT_FOR_FREEZING = 1;
// participant transaction context
class ObPartTransCtx : public ObTransCtx,
@ -738,6 +739,8 @@ private:
// | end_log_ts = n+10 | ----------------> | | -------------------> | | --> | (0<m<10) | --> | |
// +-------------------+ +---------------------------------+ +-------+ +-----------------+ +----------------------+
bool is_incomplete_replay_ctx_;
// set true when submitting redo log for freezing and reset after freezing
bool is_submitting_redo_log_for_freeze_;
int64_t start_replay_ts_; // replay debug
int64_t start_working_log_ts_;
@ -752,6 +755,9 @@ private:
// ========================================================
};
// reserve log callback for freezing and other two log callbacks for normal
STATIC_ASSERT(OB_TX_MAX_LOG_CBS >= RESERVE_LOG_CALLBACK_COUNT_FOR_FREEZING + 2, "log callback is not enough");
#if defined(__x86_64__)
/* uncomment this block to error messaging real size
template<int s> struct size_of_xxx_;