diff --git a/src/rootserver/ob_log_archive_scheduler.cpp b/src/rootserver/ob_log_archive_scheduler.cpp index bbafb165a9..5b3ca25e5b 100644 --- a/src/rootserver/ob_log_archive_scheduler.cpp +++ b/src/rootserver/ob_log_archive_scheduler.cpp @@ -1735,7 +1735,6 @@ int ObLogArchiveScheduler::try_update_checkpoit_ts_(share::ObLogArchiveBackupInf int64_t doing_tenant_count = 0; int64_t checkpoint_ts = last_update_tenant_ts_; share::ObLogArchiveBackupInfo sys_info = cur_sys_info; - ObLogArchiveBackupInfoMgr info_mgr; inactive_server_count = 0; min_piece_id = INT64_MAX; @@ -1824,15 +1823,10 @@ int ObLogArchiveScheduler::try_update_checkpoit_ts_(share::ObLogArchiveBackupInf K(checkpoint_ts), K(sys_info)); } else if (FALSE_IT(sys_info.status_.checkpoint_ts_ = checkpoint_ts)) { - } else if (OB_FAIL(update_sys_backup_info_(cur_sys_info, sys_info))) { - LOG_WARN("failed to update sys backup info", K(ret), K(sys_info), K(cur_sys_info)); + } else if (OB_FAIL(update_sys_log_archive_backup_process_(cur_sys_info, sys_info, sys_non_frozen_piece))) { + LOG_WARN("failed to update sys backup info", K(ret), K(sys_info), K(cur_sys_info), K(sys_non_frozen_piece)); } else { - FLOG_INFO("[LOG_ARCHIVE] succeed to commit update log archive process", K(sys_info), K(last_update_tenant_ts_)); - if (OB_FAIL(info_mgr.update_extern_log_archive_backup_info(sys_info, *backup_lease_service_))) { - LOG_WARN("failed to update update_extern_log_archive_backup_info", K(ret), K(sys_info)); - } else { - cur_sys_info = sys_info; - } + cur_sys_info = sys_info; } } @@ -1859,8 +1853,6 @@ int ObLogArchiveScheduler::try_update_backup_piece_(const share::ObLogArchiveBac if (!is_inited_) { ret = OB_NOT_INIT; LOG_ERROR("not inited", K(ret)); - } else if (OB_FAIL(update_active_piece_checkpoint_ts_(sys_info, log_archive_status_map, sys_piece_info))) { - LOG_WARN("failed to update cur piece checkpoint ts", K(ret)); } else if (OB_FAIL(backup_dest_opt.init(is_backup_backup))) { LOG_WARN("failed to get_backup_dest_opt", K(ret)); } else if (!sys_info.is_valid()) { @@ -1918,42 +1910,6 @@ int ObLogArchiveScheduler::try_update_backup_piece_(const share::ObLogArchiveBac return ret; } -int ObLogArchiveScheduler::update_active_piece_checkpoint_ts_(const share::ObLogArchiveBackupInfo &sys_info, - const TENANT_ARCHIVE_STATUS_MAP &log_archive_status_map, share::ObBackupPieceInfo &sys_piece_info) -{ - int ret = OB_SUCCESS; - share::ObBackupPieceInfoKey tenant_key; - - if (!is_inited_) { - ret = OB_NOT_INIT; - LOG_WARN("not inited", K(ret)); - } else if (!sys_info.is_valid() || !sys_piece_info.is_valid() || - ObBackupPieceStatus::BACKUP_PIECE_ACTIVE != sys_piece_info.status_) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid args", K(ret), K(sys_info), K(sys_piece_info)); - } else { - for (int64_t i = 0; OB_SUCC(ret) && i < tenant_ids_.count(); ++i) { - const uint64_t tenant_id = tenant_ids_.at(i); - share::ObServerTenantLogArchiveStatus *status = nullptr; - tenant_key = sys_piece_info.key_; - tenant_key.tenant_id_ = tenant_id; - if (OB_FAIL(log_archive_status_map.get_refactored(tenant_id, status))) { - LOG_WARN("failed to get log archive status", K(ret), K(tenant_id)); - } else if (OB_FAIL(update_active_piece_checkpoint_ts_(tenant_key, status->checkpoint_ts_))) { - LOG_WARN("failed to update checkpoint ts", K(ret), K(tenant_key), KPC(status)); - } - } - - if (OB_SUCC(ret)) { - if (OB_FAIL(update_sys_active_piece_checkpoint_ts_(sys_info.status_.checkpoint_ts_, sys_piece_info))) { - LOG_WARN("failed to update checkpoint ts", K(ret), K(sys_piece_info), K(sys_info)); - } - } - } - - return ret; -} - int ObLogArchiveScheduler::update_sys_active_piece_checkpoint_ts_( const int64_t checkpoint_ts, share::ObBackupPieceInfo &sys_piece_info) { @@ -1966,6 +1922,11 @@ int ObLogArchiveScheduler::update_sys_active_piece_checkpoint_ts_( if (!is_inited_) { ret = OB_NOT_INIT; LOG_WARN("not inited", K(ret)); + } else if (inner_table_version_ < OB_BACKUP_INNER_TABLE_V2) { + LOG_INFO("inner table version is old, skip update sys piece checkpoint ts", + K(ret), + K(inner_table_version_), + K(sys_piece_info)); } else if (!sys_piece_info.is_valid()) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", K(ret), K(sys_piece_info)); @@ -2013,6 +1974,11 @@ int ObLogArchiveScheduler::update_active_piece_checkpoint_ts_( if (!is_inited_) { ret = OB_NOT_INIT; LOG_WARN("not inited", K(ret)); + } else if (inner_table_version_ < OB_BACKUP_INNER_TABLE_V2) { + LOG_INFO("inner table version is old, skip update active piece checkpoint ts", + K(ret), + K(inner_table_version_), + K(piece_key)); } else if (!piece_key.is_valid()) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", K(ret), K(piece_key)); @@ -2247,6 +2213,32 @@ int ObLogArchiveScheduler::trigger_freeze_pieces_(const share::ObLogArchiveBacku return ret; } +int ObLogArchiveScheduler::update_sys_log_archive_backup_process_(const share::ObLogArchiveBackupInfo &cur_info, + share::ObLogArchiveBackupInfo &new_info, const share::ObNonFrozenBackupPieceInfo &sys_non_frozen_piece) +{ + int ret = OB_SUCCESS; + ObLogArchiveBackupInfoMgr info_mgr; + ObBackupPieceInfo sys_piece_info = sys_non_frozen_piece.cur_piece_info_; + if (!cur_info.is_valid() || !new_info.is_valid() || !sys_non_frozen_piece.is_valid()) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid args", K(ret), K(cur_info), K(new_info), K(sys_non_frozen_piece)); + } else if (OB_FAIL(update_sys_active_piece_checkpoint_ts_(new_info.status_.checkpoint_ts_, sys_piece_info))) { + LOG_WARN("failed to update checkpoint ts", K(ret), K(cur_info), K(new_info), K(sys_non_frozen_piece)); + } else if (OB_FAIL(update_sys_backup_info_(cur_info, new_info))) { + LOG_WARN("failed to update sys backup info", K(ret), K(cur_info), K(new_info), K(sys_non_frozen_piece)); + } else { + FLOG_INFO("[LOG_ARCHIVE] succeed to commit update log archive process", + K(cur_info), + K(new_info), + K(sys_non_frozen_piece)); + if (OB_FAIL(info_mgr.update_extern_log_archive_backup_info(new_info, *backup_lease_service_))) { + LOG_WARN("failed to update update_extern_log_archive_backup_info", K(ret), K(new_info)); + } + } + + return ret; +} + int ObLogArchiveScheduler::update_tenant_log_archive_backup_process_(const share::ObLogArchiveBackupInfo &sys_info, const share::ObNonFrozenBackupPieceInfo &sys_non_frozen_piece, const share::ObServerTenantLogArchiveStatus &tenant_status) @@ -2297,9 +2289,13 @@ int ObLogArchiveScheduler::update_tenant_log_archive_backup_process_(const share LOG_WARN("failed to set backup piece start ts", K(ret), K(tenant_piece_key), K(info)); } } else if (ObLogArchiveStatus::DOING == info.status_.status_) { + ObBackupPieceInfoKey active_piece_key = sys_non_frozen_piece.cur_piece_info_.key_; + active_piece_key.tenant_id_ = tenant_status.tenant_id_; if (info.status_.start_ts_ < tenant_status.start_ts_) { ret = OB_LOG_ARCHIVE_INTERRUPTED; LOG_ERROR("server start ts must not larger than prev one", K(ret), K(info), K(tenant_status)); + } else if (OB_FAIL(update_active_piece_checkpoint_ts_(active_piece_key, tenant_status.checkpoint_ts_))) { + LOG_WARN("failed to update active piece checkpoint ts", K(ret), K(active_piece_key), K(tenant_status)); } } else { ret = OB_ERR_UNEXPECTED; diff --git a/src/rootserver/ob_log_archive_scheduler.h b/src/rootserver/ob_log_archive_scheduler.h index 93e62f1f36..bbf6c4122e 100644 --- a/src/rootserver/ob_log_archive_scheduler.h +++ b/src/rootserver/ob_log_archive_scheduler.h @@ -132,14 +132,14 @@ private: int frozen_old_piece_(const bool force_stop, const share::ObNonFrozenBackupPieceInfo& sys_non_frozen_piece, const uint64_t tenant_id, const int64_t server_tenant_max_ts, int64_t& max_ts); int frozen_sys_old_piece_( - const bool force_stop, const int64_t max_ts, share::ObNonFrozenBackupPieceInfo& sys_non_frozen_piece); - int update_tenant_log_archive_backup_process_(const share::ObLogArchiveBackupInfo& sys_info, - const share::ObNonFrozenBackupPieceInfo& sys_non_frozen_piece, - const share::ObServerTenantLogArchiveStatus& tenant_status); - int update_active_piece_checkpoint_ts_(const share::ObLogArchiveBackupInfo& sys_info, - const TENANT_ARCHIVE_STATUS_MAP& log_archive_status_map, share::ObBackupPieceInfo& sys_piece_info); - int update_active_piece_checkpoint_ts_(const share::ObBackupPieceInfoKey& piece_key, const int64_t checkpoint_ts); - int update_sys_active_piece_checkpoint_ts_(const int64_t checkpoint_ts, share::ObBackupPieceInfo& sys_piece_info); + const bool force_stop, const int64_t max_ts, share::ObNonFrozenBackupPieceInfo &sys_non_frozen_piece); + int update_sys_log_archive_backup_process_(const share::ObLogArchiveBackupInfo &cur_info, + share::ObLogArchiveBackupInfo &new_info, const share::ObNonFrozenBackupPieceInfo &sys_non_frozen_piece); + int update_tenant_log_archive_backup_process_(const share::ObLogArchiveBackupInfo &sys_info, + const share::ObNonFrozenBackupPieceInfo &sys_non_frozen_piece, + const share::ObServerTenantLogArchiveStatus &tenant_status); + int update_active_piece_checkpoint_ts_(const share::ObBackupPieceInfoKey &piece_key, const int64_t checkpoint_ts); + int update_sys_active_piece_checkpoint_ts_(const int64_t checkpoint_ts, share::ObBackupPieceInfo &sys_piece_info); // for ObLogArchiveStatus::STOPPING status int stop_log_archive_backup_(const bool force_stop, share::ObLogArchiveBackupInfo& cur_sys_info,