[BUG] donnot reset pending count when resume

This commit is contained in:
Handora
2022-11-21 07:27:20 +00:00
committed by wangzelin.wzl
parent d9d65a329f
commit cd2a883c13
4 changed files with 10 additions and 10 deletions

View File

@ -590,21 +590,21 @@ int ObMemtableCtx::trans_replay_end(const bool commit,
}
//leader takeover actions
int ObMemtableCtx::replay_to_commit()
int ObMemtableCtx::replay_to_commit(const bool is_resume)
{
int ret = OB_SUCCESS;
ATOMIC_STORE(&is_master_, true);
ObByteLockGuard guard(lock_);
trans_mgr_.set_for_replay(false);
if (!is_resume) {
trans_mgr_.clear_pending_log_size();
}
if (OB_FAIL(reuse_log_generator_())) {
TRANS_LOG(ERROR, "fail to reset log generator", K(ret));
} else {
// do nothing
}
if (OB_SUCCESS == ret) {
TRANS_LOG(INFO, "replay to commit success", K(this));
} else {
if (OB_FAIL(ret)) {
TRANS_LOG(ERROR, "replay to commit failed", K(ret), K(this));
}
return ret;

View File

@ -366,7 +366,7 @@ public:
const uint64_t log_cluster_version = 0,
const uint64_t checksum = 0);
//method called when leader takeover
virtual int replay_to_commit();
virtual int replay_to_commit(const bool is_resume);
//method called when leader revoke
virtual int commit_to_replay();
virtual int fill_redo_log(char *buf,

View File

@ -72,7 +72,7 @@ public:
const uint64_t checksum = 0) = 0;
virtual void print_callbacks() = 0;
//method called when leader takeover
virtual int replay_to_commit() = 0;
virtual int replay_to_commit(const bool is_resume) = 0;
//method called when leader revoke
virtual int commit_to_replay() = 0;
virtual void set_trans_ctx(transaction::ObPartTransCtx *ctx) = 0;

View File

@ -4481,7 +4481,7 @@ int ObPartTransCtx::switch_to_leader(int64_t start_working_ts)
const bool contain_table_lock = is_contain_mds_type_(ObTxDataSourceType::TABLE_LOCK);
if (ObTxState::INIT == exec_info_.state_) {
if (exec_info_.data_complete_ && !contain_table_lock) {
if (OB_FAIL(mt_ctx_.replay_to_commit())) {
if (OB_FAIL(mt_ctx_.replay_to_commit(false /*is_resume*/))) {
TRANS_LOG(WARN, "replay to commit failed", KR(ret), K(*this));
}
} else {
@ -4495,7 +4495,7 @@ int ObPartTransCtx::switch_to_leader(int64_t start_working_ts)
}
}
} else {
if (OB_FAIL(mt_ctx_.replay_to_commit())) {
if (OB_FAIL(mt_ctx_.replay_to_commit(false /*is_resume*/))) {
TRANS_LOG(WARN, "replay to commit failed", KR(ret), K(*this));
}
}
@ -4769,7 +4769,7 @@ int ObPartTransCtx::resume_leader(int64_t start_working_ts)
} else if (OB_FAIL(state_helper.switch_state(TxCtxOps::RESUME))) {
TRANS_LOG(WARN, "switch role state error", KR(ret), K(*this));
} else {
if (OB_FAIL(mt_ctx_.replay_to_commit())) {
if (OB_FAIL(mt_ctx_.replay_to_commit(true /*is_resume*/))) {
TRANS_LOG(WARN, "replay to commit failed", KR(ret), K(*this));
} else {
if (ObTxState::INIT == exec_info_.state_) {