[4.1] fix some bugs in checkpoint

This commit is contained in:
obdev 2023-04-13 05:45:34 +00:00 committed by ob-robot
parent 01399a3774
commit 1feabe8bc9
4 changed files with 52 additions and 22 deletions

View File

@ -99,6 +99,8 @@ static int advance_checkpoint_by_flush(const uint64_t tenant_id, const share::Ob
// clog checkpoint ts has passed start log ts
ret = OB_SUCCESS;
break;
} else if (OB_EAGAIN == ret) {
ret = OB_SUCCESS;
} else {
LOG_WARN("failed to advance checkpoint by flush", K(ret), K(tenant_id), K(ls_id));
}

View File

@ -199,6 +199,7 @@ int ObCheckpointExecutor::advance_checkpoint_by_flush(SCN recycle_scn) {
RLockGuard guard(rwlock_);
if (update_checkpoint_enabled_) {
int tmp_ret = OB_SUCCESS;
SCN max_decided_scn;
// calcu recycle_scn according to clog disk situation
if (!recycle_scn.is_valid()) {
@ -218,27 +219,38 @@ int ObCheckpointExecutor::advance_checkpoint_by_flush(SCN recycle_scn) {
K(calcu_recycle_lsn), K(ls_->get_ls_id()));
}
}
} else {
if (recycle_scn.is_max()) {
} else if (OB_FAIL(loghandler_->get_max_decided_scn(max_decided_scn))) {
STORAGE_LOG(WARN, "failed to get_max_decided_scn",
K(recycle_scn), K(ls_->get_clog_checkpoint_scn()), K(ls_->get_ls_id()));
} else if (recycle_scn > max_decided_scn) {
ret = OB_EAGAIN;
STORAGE_LOG(WARN, "recycle_scn is more than max_decided_scn",
KR(ret), K(recycle_scn), K(ls_->get_clog_checkpoint_scn()), K(ls_->get_ls_id()), K(max_decided_scn));
}
}
if (OB_SUCC(ret)) {
if (recycle_scn < ls_->get_clog_checkpoint_scn()) {
ret = OB_NO_NEED_UPDATE;
STORAGE_LOG(WARN, "recycle_scn should not smaller than checkpoint_log_scn",
K(recycle_scn), K(ls_->get_clog_checkpoint_scn()), K(ls_->get_ls_id()));
} else {
STORAGE_LOG(INFO, "start flush",
K(recycle_scn),
K(ls_->get_clog_checkpoint_scn()),
K(ls_->get_ls_id()));
for (int i = 1; i < ObLogBaseType::MAX_LOG_BASE_TYPE; i++) {
if (OB_NOT_NULL(handlers_[i])
&& OB_SUCCESS != (tmp_ret = (handlers_[i]->flush(recycle_scn)))) {
STORAGE_LOG(WARN, "handler flush failed", K(recycle_scn), K(tmp_ret),
K(i), K(ls_->get_ls_id()));
}
if (OB_FAIL(ret)) {
} else if (recycle_scn < ls_->get_clog_checkpoint_scn()) {
ret = OB_NO_NEED_UPDATE;
STORAGE_LOG(WARN, "recycle_scn should not smaller than checkpoint_log_scn",
K(recycle_scn), K(ls_->get_clog_checkpoint_scn()), K(ls_->get_ls_id()));
} else {
STORAGE_LOG(INFO, "start flush",
K(recycle_scn),
K(ls_->get_clog_checkpoint_scn()),
K(ls_->get_ls_id()));
for (int i = 1; i < ObLogBaseType::MAX_LOG_BASE_TYPE; i++) {
if (OB_NOT_NULL(handlers_[i])
&& OB_SUCCESS != (tmp_ret = (handlers_[i]->flush(recycle_scn)))) {
STORAGE_LOG(WARN, "handler flush failed", K(recycle_scn), K(tmp_ret),
K(i), K(ls_->get_ls_id()));
}
}
}
} else {
STORAGE_LOG(WARN, "update_checkpoint is not enabled", K(ls_->get_ls_id()));
}
return ret;
@ -285,6 +297,22 @@ int ObCheckpointExecutor::diagnose(CheckpointDiagnoseInfo &diagnose_info) const
diagnose_info.log_type_ = log_type;
return ret;
}
int ObCheckpointExecutor::traversal_flush() const
{
int ret = OB_SUCCESS;
ObLSTxService *ls_tx_ser = nullptr;
if (!update_checkpoint_enabled_) {
STORAGE_LOG(WARN, "update_checkpoint is not enabled", K(ls_->get_ls_id()));
} else if (OB_ISNULL(ls_tx_ser = ls_->get_tx_svr())) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "ls_tx_ser should not be null", K(ret), K(ls_->get_ls_id()));
} else if (OB_FAIL(ls_tx_ser->traversal_flush())) {
STORAGE_LOG(WARN, "ls_tx_ser flush failed", K(ret), K(ls_->get_ls_id()));
}
return ret;
}
} // namespace checkpoint
} // namespace storage
} // namespace oceanbase

View File

@ -86,6 +86,7 @@ public:
int diagnose(CheckpointDiagnoseInfo &diagnose_info) const;
int traversal_flush() const;
private:
static const int64_t CLOG_GC_PERCENT = 60;

View File

@ -322,17 +322,16 @@ void ObCheckPointService::ObTraversalFlushTask::runTimerTask()
int ls_cnt = 0;
for (; OB_SUCC(ret) && OB_SUCC(iter->get_next(ls)); ++ls_cnt) {
ObLSHandle ls_handle;
ObLSTxService *ls_tx_ser = nullptr;
ObCheckpointExecutor *checkpoint_executor = nullptr;
if (OB_FAIL(ls_svr->get_ls(ls->get_ls_id(), ls_handle, ObLSGetMod::APPLY_MOD))) {
STORAGE_LOG(WARN, "get log stream failed", K(ret), K(ls->get_ls_id()));
} else if (OB_ISNULL(ls = ls_handle.get_ls())) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "log stream not exist", K(ret), K(ls->get_ls_id()));
} else if (OB_ISNULL(ls_tx_ser = ls->get_tx_svr())) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "ls_tx_ser should not be null", K(ret), K(ls->get_ls_id()));
} else if (OB_FAIL(ls_tx_ser->traversal_flush())) {
STORAGE_LOG(WARN, "ls_tx_ser flush failed", K(ret), K(ls->get_ls_id()));
} else if (OB_ISNULL(checkpoint_executor = ls->get_checkpoint_executor())) {
STORAGE_LOG(WARN, "checkpoint_executor should not be null", K(ls->get_ls_id()));
} else if (OB_FAIL(checkpoint_executor->traversal_flush())) {
STORAGE_LOG(WARN, "traversal_flush failed", K(ret), K(ls->get_ls_id()));
}
}
if (ret == OB_ITER_END) {