diff --git a/src/storage/backup/ob_backup_task.cpp b/src/storage/backup/ob_backup_task.cpp index b2ac22169..d0e72365b 100644 --- a/src/storage/backup/ob_backup_task.cpp +++ b/src/storage/backup/ob_backup_task.cpp @@ -99,6 +99,8 @@ static int advance_checkpoint_by_flush(const uint64_t tenant_id, const share::Ob // clog checkpoint ts has passed start log ts ret = OB_SUCCESS; break; + } else if (OB_EAGAIN == ret) { + ret = OB_SUCCESS; } else { LOG_WARN("failed to advance checkpoint by flush", K(ret), K(tenant_id), K(ls_id)); } diff --git a/src/storage/checkpoint/ob_checkpoint_executor.cpp b/src/storage/checkpoint/ob_checkpoint_executor.cpp index af3ddda91..72d3f6529 100644 --- a/src/storage/checkpoint/ob_checkpoint_executor.cpp +++ b/src/storage/checkpoint/ob_checkpoint_executor.cpp @@ -199,6 +199,7 @@ int ObCheckpointExecutor::advance_checkpoint_by_flush(SCN recycle_scn) { RLockGuard guard(rwlock_); if (update_checkpoint_enabled_) { int tmp_ret = OB_SUCCESS; + SCN max_decided_scn; // calcu recycle_scn according to clog disk situation if (!recycle_scn.is_valid()) { @@ -218,27 +219,38 @@ int ObCheckpointExecutor::advance_checkpoint_by_flush(SCN recycle_scn) { K(calcu_recycle_lsn), K(ls_->get_ls_id())); } } + } else { + if (recycle_scn.is_max()) { + } else if (OB_FAIL(loghandler_->get_max_decided_scn(max_decided_scn))) { + STORAGE_LOG(WARN, "failed to get_max_decided_scn", + K(recycle_scn), K(ls_->get_clog_checkpoint_scn()), K(ls_->get_ls_id())); + } else if (recycle_scn > max_decided_scn) { + ret = OB_EAGAIN; + STORAGE_LOG(WARN, "recycle_scn is more than max_decided_scn", + KR(ret), K(recycle_scn), K(ls_->get_clog_checkpoint_scn()), K(ls_->get_ls_id()), K(max_decided_scn)); + } } - if (OB_SUCC(ret)) { - if (recycle_scn < ls_->get_clog_checkpoint_scn()) { - ret = OB_NO_NEED_UPDATE; - STORAGE_LOG(WARN, "recycle_scn should not smaller than checkpoint_log_scn", - K(recycle_scn), K(ls_->get_clog_checkpoint_scn()), K(ls_->get_ls_id())); - } else { - STORAGE_LOG(INFO, "start flush", - K(recycle_scn), - K(ls_->get_clog_checkpoint_scn()), - K(ls_->get_ls_id())); - for (int i = 1; i < ObLogBaseType::MAX_LOG_BASE_TYPE; i++) { - if (OB_NOT_NULL(handlers_[i]) - && OB_SUCCESS != (tmp_ret = (handlers_[i]->flush(recycle_scn)))) { - STORAGE_LOG(WARN, "handler flush failed", K(recycle_scn), K(tmp_ret), - K(i), K(ls_->get_ls_id())); - } + if (OB_FAIL(ret)) { + } else if (recycle_scn < ls_->get_clog_checkpoint_scn()) { + ret = OB_NO_NEED_UPDATE; + STORAGE_LOG(WARN, "recycle_scn should not smaller than checkpoint_log_scn", + K(recycle_scn), K(ls_->get_clog_checkpoint_scn()), K(ls_->get_ls_id())); + } else { + STORAGE_LOG(INFO, "start flush", + K(recycle_scn), + K(ls_->get_clog_checkpoint_scn()), + K(ls_->get_ls_id())); + for (int i = 1; i < ObLogBaseType::MAX_LOG_BASE_TYPE; i++) { + if (OB_NOT_NULL(handlers_[i]) + && OB_SUCCESS != (tmp_ret = (handlers_[i]->flush(recycle_scn)))) { + STORAGE_LOG(WARN, "handler flush failed", K(recycle_scn), K(tmp_ret), + K(i), K(ls_->get_ls_id())); } } } + } else { + STORAGE_LOG(WARN, "update_checkpoint is not enabled", K(ls_->get_ls_id())); } return ret; @@ -285,6 +297,22 @@ int ObCheckpointExecutor::diagnose(CheckpointDiagnoseInfo &diagnose_info) const diagnose_info.log_type_ = log_type; return ret; } + +int ObCheckpointExecutor::traversal_flush() const +{ + int ret = OB_SUCCESS; + ObLSTxService *ls_tx_ser = nullptr; + if (!update_checkpoint_enabled_) { + STORAGE_LOG(WARN, "update_checkpoint is not enabled", K(ls_->get_ls_id())); + } else if (OB_ISNULL(ls_tx_ser = ls_->get_tx_svr())) { + ret = OB_ERR_UNEXPECTED; + STORAGE_LOG(WARN, "ls_tx_ser should not be null", K(ret), K(ls_->get_ls_id())); + } else if (OB_FAIL(ls_tx_ser->traversal_flush())) { + STORAGE_LOG(WARN, "ls_tx_ser flush failed", K(ret), K(ls_->get_ls_id())); + } + return ret; +} + } // namespace checkpoint } // namespace storage } // namespace oceanbase diff --git a/src/storage/checkpoint/ob_checkpoint_executor.h b/src/storage/checkpoint/ob_checkpoint_executor.h index ce81ddd53..be72d9714 100644 --- a/src/storage/checkpoint/ob_checkpoint_executor.h +++ b/src/storage/checkpoint/ob_checkpoint_executor.h @@ -86,6 +86,7 @@ public: int diagnose(CheckpointDiagnoseInfo &diagnose_info) const; + int traversal_flush() const; private: static const int64_t CLOG_GC_PERCENT = 60; diff --git a/src/storage/tx_storage/ob_checkpoint_service.cpp b/src/storage/tx_storage/ob_checkpoint_service.cpp index 8438d4893..56f215403 100644 --- a/src/storage/tx_storage/ob_checkpoint_service.cpp +++ b/src/storage/tx_storage/ob_checkpoint_service.cpp @@ -322,17 +322,16 @@ void ObCheckPointService::ObTraversalFlushTask::runTimerTask() int ls_cnt = 0; for (; OB_SUCC(ret) && OB_SUCC(iter->get_next(ls)); ++ls_cnt) { ObLSHandle ls_handle; - ObLSTxService *ls_tx_ser = nullptr; + ObCheckpointExecutor *checkpoint_executor = nullptr; if (OB_FAIL(ls_svr->get_ls(ls->get_ls_id(), ls_handle, ObLSGetMod::APPLY_MOD))) { STORAGE_LOG(WARN, "get log stream failed", K(ret), K(ls->get_ls_id())); } else if (OB_ISNULL(ls = ls_handle.get_ls())) { ret = OB_ERR_UNEXPECTED; STORAGE_LOG(WARN, "log stream not exist", K(ret), K(ls->get_ls_id())); - } else if (OB_ISNULL(ls_tx_ser = ls->get_tx_svr())) { - ret = OB_ERR_UNEXPECTED; - STORAGE_LOG(WARN, "ls_tx_ser should not be null", K(ret), K(ls->get_ls_id())); - } else if (OB_FAIL(ls_tx_ser->traversal_flush())) { - STORAGE_LOG(WARN, "ls_tx_ser flush failed", K(ret), K(ls->get_ls_id())); + } else if (OB_ISNULL(checkpoint_executor = ls->get_checkpoint_executor())) { + STORAGE_LOG(WARN, "checkpoint_executor should not be null", K(ls->get_ls_id())); + } else if (OB_FAIL(checkpoint_executor->traversal_flush())) { + STORAGE_LOG(WARN, "traversal_flush failed", K(ret), K(ls->get_ls_id())); } } if (ret == OB_ITER_END) {