diff --git a/src/storage/backup/ob_backup_task.cpp b/src/storage/backup/ob_backup_task.cpp index 801f637065..3cdde11cbb 100644 --- a/src/storage/backup/ob_backup_task.cpp +++ b/src/storage/backup/ob_backup_task.cpp @@ -72,13 +72,9 @@ static int advance_checkpoint_by_flush(const uint64_t tenant_id, const share::Ob int ret = OB_SUCCESS; const int64_t advance_checkpoint_timeout = GCONF._advance_checkpoint_timeout; LOG_INFO("backup advance checkpoint timeout", K(tenant_id), K(advance_checkpoint_timeout)); - checkpoint::ObCheckpointExecutor *checkpoint_executor = NULL; if (start_scn < 0) { ret = OB_INVALID_ARGUMENT; LOG_WARN("get invalid args", K(ret), K(start_scn)); - } else if (OB_ISNULL(checkpoint_executor = ls->get_checkpoint_executor())) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("checkpoint executor should not be null", K(ret), KPC(ls)); } else { ObLSMetaPackage ls_meta_package; int64_t i = 0; @@ -88,7 +84,7 @@ static int advance_checkpoint_by_flush(const uint64_t tenant_id, const share::Ob if (cur_ts - start_ts > advance_checkpoint_timeout) { ret = OB_BACKUP_ADVANCE_CHECKPOINT_TIMEOUT; LOG_WARN("backup advance checkpoint by flush timeout", K(ret), K(tenant_id), K(ls_id), K(start_scn)); - } else if (OB_FAIL(checkpoint_executor->advance_checkpoint_by_flush(start_scn))) { + } else if (OB_FAIL(ls->advance_checkpoint_by_flush(start_scn))) { if (OB_NO_NEED_UPDATE == ret) { // clog checkpoint ts has passed start log ts ret = OB_SUCCESS; diff --git a/src/storage/checkpoint/ob_checkpoint_executor.cpp b/src/storage/checkpoint/ob_checkpoint_executor.cpp index 6bdca1f849..0c6fce78fc 100644 --- a/src/storage/checkpoint/ob_checkpoint_executor.cpp +++ b/src/storage/checkpoint/ob_checkpoint_executor.cpp @@ -200,48 +200,52 @@ int ObCheckpointExecutor::update_clog_checkpoint() int ObCheckpointExecutor::advance_checkpoint_by_flush(int64_t recycle_ts) { int ret = OB_SUCCESS; - int tmp_ret = OB_SUCCESS; - // calcu recycle_ts according to clog disk situation - if (recycle_ts == 0) { - LSN end_lsn; - int64_t calcu_recycle_ts = INT64_MAX; - if (OB_FAIL(loghandler_->get_end_lsn(end_lsn))) { - STORAGE_LOG(WARN, "get end lsn failed", K(ret), K(ls_->get_ls_id())); - } else { - LSN clog_checkpoint_lsn = ls_->get_clog_base_lsn(); - LSN calcu_recycle_lsn = clog_checkpoint_lsn - + ((end_lsn - clog_checkpoint_lsn) * CLOG_GC_PERCENT / 100); - if (OB_FAIL(loghandler_->locate_by_lsn_coarsely(calcu_recycle_lsn, recycle_ts))) { - STORAGE_LOG(WARN, "locate_by_lsn_coarsely failed", K(calcu_recycle_ts), K(calcu_recycle_lsn), - K(recycle_ts), K(ls_->get_ls_id())); + ObSpinLockGuard guard(lock_); + if (update_checkpoint_enabled_) { + int tmp_ret = OB_SUCCESS; + + // calcu recycle_ts according to clog disk situation + if (recycle_ts == 0) { + LSN end_lsn; + int64_t calcu_recycle_ts = INT64_MAX; + if (OB_FAIL(loghandler_->get_end_lsn(end_lsn))) { + STORAGE_LOG(WARN, "get end lsn failed", K(ret), K(ls_->get_ls_id())); } else { - STORAGE_LOG(INFO, "advance checkpoint by flush to avoid clog disk full", - K(recycle_ts), K(end_lsn), K(clog_checkpoint_lsn), - K(calcu_recycle_lsn), K(ls_->get_ls_id())); + LSN clog_checkpoint_lsn = ls_->get_clog_base_lsn(); + LSN calcu_recycle_lsn = clog_checkpoint_lsn + + ((end_lsn - clog_checkpoint_lsn) * CLOG_GC_PERCENT / 100); + if (OB_FAIL(loghandler_->locate_by_lsn_coarsely(calcu_recycle_lsn, recycle_ts))) { + STORAGE_LOG(WARN, "locate_by_lsn_coarsely failed", K(calcu_recycle_ts), K(calcu_recycle_lsn), + K(recycle_ts), K(ls_->get_ls_id())); + } else { + STORAGE_LOG(INFO, "advance checkpoint by flush to avoid clog disk full", + K(recycle_ts), K(end_lsn), K(clog_checkpoint_lsn), + K(calcu_recycle_lsn), K(ls_->get_ls_id())); + } + } + // the log of end_log_lsn and the log of clog_checkpoint_lsn may be in a block + if (recycle_ts < ls_->get_clog_checkpoint_ts()) { + recycle_ts = INT64_MAX; } } - // the log of end_log_lsn and the log of clog_checkpoint_lsn may be in a block - if (recycle_ts < ls_->get_clog_checkpoint_ts()) { - recycle_ts = INT64_MAX; - } - } - if (OB_SUCC(ret)) { - if (recycle_ts < ls_->get_clog_checkpoint_ts()) { - ret = OB_NO_NEED_UPDATE; - STORAGE_LOG(WARN, "recycle_ts should not smaller than checkpoint_log_ts", - K(recycle_ts), K(ls_->get_clog_checkpoint_ts()), K(ls_->get_ls_id())); - } else { - STORAGE_LOG(INFO, "start flush", - K(recycle_ts), - K(ls_->get_clog_checkpoint_ts()), - K(ls_->get_ls_id())); - for (int i = 1; i < ObLogBaseType::MAX_LOG_BASE_TYPE; i++) { - if (OB_NOT_NULL(handlers_[i]) - && OB_SUCCESS != (tmp_ret = (handlers_[i]->flush(recycle_ts)))) { - STORAGE_LOG(WARN, "handler flush failed", K(recycle_ts), K(tmp_ret), - K(i), K(ls_->get_ls_id())); + if (OB_SUCC(ret)) { + if (recycle_ts < ls_->get_clog_checkpoint_ts()) { + ret = OB_NO_NEED_UPDATE; + STORAGE_LOG(WARN, "recycle_ts should not smaller than checkpoint_log_ts", + K(recycle_ts), K(ls_->get_clog_checkpoint_ts()), K(ls_->get_ls_id())); + } else { + STORAGE_LOG(INFO, "start flush", + K(recycle_ts), + K(ls_->get_clog_checkpoint_ts()), + K(ls_->get_ls_id())); + for (int i = 1; i < ObLogBaseType::MAX_LOG_BASE_TYPE; i++) { + if (OB_NOT_NULL(handlers_[i]) + && OB_SUCCESS != (tmp_ret = (handlers_[i]->flush(recycle_ts)))) { + STORAGE_LOG(WARN, "handler flush failed", K(recycle_ts), K(tmp_ret), + K(i), K(ls_->get_ls_id())); + } } } } diff --git a/src/storage/high_availability/ob_ls_prepare_migration.cpp b/src/storage/high_availability/ob_ls_prepare_migration.cpp index 314777ba5b..b29dffc6c6 100644 --- a/src/storage/high_availability/ob_ls_prepare_migration.cpp +++ b/src/storage/high_availability/ob_ls_prepare_migration.cpp @@ -1037,9 +1037,6 @@ int ObStartPrepareMigrationTask::wait_ls_checkpoint_ts_push_() LOG_WARN("failed to get ls saved info", K(ret), KPC(ls), KPC(ctx_)); } else if (!saved_info.is_empty()) { LOG_INFO("saved info is not empty, no need wait ls checkpoint ts push", K(saved_info), KPC(ctx_)); - } else if (OB_ISNULL(checkpoint_executor = ls->get_checkpoint_executor())) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("checkpoint executor should not be NULL", K(ret), KPC(ctx_), KP(checkpoint_executor)); } else { const int64_t wait_checkpoint_push_start_ts = ObTimeUtility::current_time(); while (OB_SUCC(ret)) { @@ -1059,7 +1056,7 @@ int ObStartPrepareMigrationTask::wait_ls_checkpoint_ts_push_() const int64_t cost_ts = ObTimeUtility::current_time() - wait_checkpoint_push_start_ts; LOG_INFO("succeed wait clog checkpoint ts push", "cost", cost_ts, "ls_id", ctx_->arg_.ls_id_); break; - } else if (OB_FAIL(checkpoint_executor->advance_checkpoint_by_flush(ctx_->log_sync_scn_))) { + } else if (OB_FAIL(ls->advance_checkpoint_by_flush(ctx_->log_sync_scn_))) { if (OB_NO_NEED_UPDATE == ret) { ret = OB_SUCCESS; } else { diff --git a/src/storage/ls/ob_ls.cpp b/src/storage/ls/ob_ls.cpp index 749be9458e..3b543d8df5 100644 --- a/src/storage/ls/ob_ls.cpp +++ b/src/storage/ls/ob_ls.cpp @@ -1153,6 +1153,14 @@ int ObLS::force_tablet_freeze(const ObTabletID &tablet_id) return ret; } +int ObLS::advance_checkpoint_by_flush(int64_t recycle_ts) +{ + int64_t read_lock = LSLOCKALL; + int64_t write_lock = 0; + ObLSLockGuard lock_myself(lock_, read_lock, write_lock); + return checkpoint_executor_.advance_checkpoint_by_flush(recycle_ts); +} + int ObLS::get_ls_meta_package_and_tablet_ids(ObLSMetaPackage &meta_package, common::ObIArray &tablet_ids) { int ret = OB_SUCCESS; diff --git a/src/storage/ls/ob_ls.h b/src/storage/ls/ob_ls.h index 7dc1e56fb6..2f84ad4ba2 100644 --- a/src/storage/ls/ob_ls.h +++ b/src/storage/ls/ob_ls.h @@ -605,6 +605,7 @@ public: // ObCheckpointExecutor interface: DELEGATE_WITH_RET(checkpoint_executor_, get_checkpoint_info, int); + int advance_checkpoint_by_flush(int64_t recycle_ts); // ObDataCheckpoint interface: DELEGATE_WITH_RET(data_checkpoint_, get_freezecheckpoint_info, int); diff --git a/src/storage/tx/ob_tx_retain_ctx_mgr.cpp b/src/storage/tx/ob_tx_retain_ctx_mgr.cpp index e0da7ff803..9b32adc535 100644 --- a/src/storage/tx/ob_tx_retain_ctx_mgr.cpp +++ b/src/storage/tx/ob_tx_retain_ctx_mgr.cpp @@ -50,10 +50,9 @@ int ObAdvanceLSCkptTask::try_advance_ls_ckpt_ts() ret = OB_INVALID_ARGUMENT; } TRANS_LOG(WARN, "get ls faild", K(ret), K(MTL(ObLSService *))); - } else if (ls_handle.get_ls()->get_checkpoint_executor()->advance_checkpoint_by_flush( - target_ckpt_ts_)) { + } else if (ls_handle.get_ls()->advance_checkpoint_by_flush(target_ckpt_ts_)) { TRANS_LOG(WARN, "advance checkpoint ts failed", K(ret), K(ls_id_), K(target_ckpt_ts_)); - } + } if (OB_SUCC(ret)) { TRANS_LOG(INFO, "[RetainCtxMgr] advance ls checkpoint ts success", K(ret), K(ls_id_), K(target_ckpt_ts_)); diff --git a/src/storage/tx_storage/ob_checkpoint_service.cpp b/src/storage/tx_storage/ob_checkpoint_service.cpp index 0cb40b045e..9fba3c804f 100644 --- a/src/storage/tx_storage/ob_checkpoint_service.cpp +++ b/src/storage/tx_storage/ob_checkpoint_service.cpp @@ -355,10 +355,7 @@ int ObCheckPointService::do_minor_freeze() ObLS *ls = nullptr; int ls_cnt = 0; for (; OB_SUCC(iter->get_next(ls)); ++ls_cnt) { - ObCheckpointExecutor *checkpoint_executor = nullptr; - if (OB_ISNULL(checkpoint_executor = ls->get_checkpoint_executor())) { - STORAGE_LOG(WARN, "checkpoint_executor should not be null", K(ls->get_ls_id())); - } else if (OB_SUCCESS != (tmp_ret = (checkpoint_executor->advance_checkpoint_by_flush(INT64_MAX)))) { + if (OB_SUCCESS != (tmp_ret = (ls->advance_checkpoint_by_flush(INT64_MAX)))) { STORAGE_LOG(WARN, "advance_checkpoint_by_flush failed", K(tmp_ret), K(ls->get_ls_id())); } }