[CP] persist piece checkpoint ts before round checkpoint ts
This commit is contained in:
@ -1735,7 +1735,6 @@ int ObLogArchiveScheduler::try_update_checkpoit_ts_(share::ObLogArchiveBackupInf
|
|||||||
int64_t doing_tenant_count = 0;
|
int64_t doing_tenant_count = 0;
|
||||||
int64_t checkpoint_ts = last_update_tenant_ts_;
|
int64_t checkpoint_ts = last_update_tenant_ts_;
|
||||||
share::ObLogArchiveBackupInfo sys_info = cur_sys_info;
|
share::ObLogArchiveBackupInfo sys_info = cur_sys_info;
|
||||||
ObLogArchiveBackupInfoMgr info_mgr;
|
|
||||||
inactive_server_count = 0;
|
inactive_server_count = 0;
|
||||||
min_piece_id = INT64_MAX;
|
min_piece_id = INT64_MAX;
|
||||||
|
|
||||||
@ -1824,17 +1823,12 @@ int ObLogArchiveScheduler::try_update_checkpoit_ts_(share::ObLogArchiveBackupInf
|
|||||||
K(checkpoint_ts),
|
K(checkpoint_ts),
|
||||||
K(sys_info));
|
K(sys_info));
|
||||||
} else if (FALSE_IT(sys_info.status_.checkpoint_ts_ = checkpoint_ts)) {
|
} else if (FALSE_IT(sys_info.status_.checkpoint_ts_ = checkpoint_ts)) {
|
||||||
} else if (OB_FAIL(update_sys_backup_info_(cur_sys_info, sys_info))) {
|
} else if (OB_FAIL(update_sys_log_archive_backup_process_(cur_sys_info, sys_info, sys_non_frozen_piece))) {
|
||||||
LOG_WARN("failed to update sys backup info", K(ret), K(sys_info), K(cur_sys_info));
|
LOG_WARN("failed to update sys backup info", K(ret), K(sys_info), K(cur_sys_info), K(sys_non_frozen_piece));
|
||||||
} else {
|
|
||||||
FLOG_INFO("[LOG_ARCHIVE] succeed to commit update log archive process", K(sys_info), K(last_update_tenant_ts_));
|
|
||||||
if (OB_FAIL(info_mgr.update_extern_log_archive_backup_info(sys_info, *backup_lease_service_))) {
|
|
||||||
LOG_WARN("failed to update update_extern_log_archive_backup_info", K(ret), K(sys_info));
|
|
||||||
} else {
|
} else {
|
||||||
cur_sys_info = sys_info;
|
cur_sys_info = sys_info;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
@ -1859,8 +1853,6 @@ int ObLogArchiveScheduler::try_update_backup_piece_(const share::ObLogArchiveBac
|
|||||||
if (!is_inited_) {
|
if (!is_inited_) {
|
||||||
ret = OB_NOT_INIT;
|
ret = OB_NOT_INIT;
|
||||||
LOG_ERROR("not inited", K(ret));
|
LOG_ERROR("not inited", K(ret));
|
||||||
} else if (OB_FAIL(update_active_piece_checkpoint_ts_(sys_info, log_archive_status_map, sys_piece_info))) {
|
|
||||||
LOG_WARN("failed to update cur piece checkpoint ts", K(ret));
|
|
||||||
} else if (OB_FAIL(backup_dest_opt.init(is_backup_backup))) {
|
} else if (OB_FAIL(backup_dest_opt.init(is_backup_backup))) {
|
||||||
LOG_WARN("failed to get_backup_dest_opt", K(ret));
|
LOG_WARN("failed to get_backup_dest_opt", K(ret));
|
||||||
} else if (!sys_info.is_valid()) {
|
} else if (!sys_info.is_valid()) {
|
||||||
@ -1918,42 +1910,6 @@ int ObLogArchiveScheduler::try_update_backup_piece_(const share::ObLogArchiveBac
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObLogArchiveScheduler::update_active_piece_checkpoint_ts_(const share::ObLogArchiveBackupInfo &sys_info,
|
|
||||||
const TENANT_ARCHIVE_STATUS_MAP &log_archive_status_map, share::ObBackupPieceInfo &sys_piece_info)
|
|
||||||
{
|
|
||||||
int ret = OB_SUCCESS;
|
|
||||||
share::ObBackupPieceInfoKey tenant_key;
|
|
||||||
|
|
||||||
if (!is_inited_) {
|
|
||||||
ret = OB_NOT_INIT;
|
|
||||||
LOG_WARN("not inited", K(ret));
|
|
||||||
} else if (!sys_info.is_valid() || !sys_piece_info.is_valid() ||
|
|
||||||
ObBackupPieceStatus::BACKUP_PIECE_ACTIVE != sys_piece_info.status_) {
|
|
||||||
ret = OB_INVALID_ARGUMENT;
|
|
||||||
LOG_WARN("invalid args", K(ret), K(sys_info), K(sys_piece_info));
|
|
||||||
} else {
|
|
||||||
for (int64_t i = 0; OB_SUCC(ret) && i < tenant_ids_.count(); ++i) {
|
|
||||||
const uint64_t tenant_id = tenant_ids_.at(i);
|
|
||||||
share::ObServerTenantLogArchiveStatus *status = nullptr;
|
|
||||||
tenant_key = sys_piece_info.key_;
|
|
||||||
tenant_key.tenant_id_ = tenant_id;
|
|
||||||
if (OB_FAIL(log_archive_status_map.get_refactored(tenant_id, status))) {
|
|
||||||
LOG_WARN("failed to get log archive status", K(ret), K(tenant_id));
|
|
||||||
} else if (OB_FAIL(update_active_piece_checkpoint_ts_(tenant_key, status->checkpoint_ts_))) {
|
|
||||||
LOG_WARN("failed to update checkpoint ts", K(ret), K(tenant_key), KPC(status));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (OB_SUCC(ret)) {
|
|
||||||
if (OB_FAIL(update_sys_active_piece_checkpoint_ts_(sys_info.status_.checkpoint_ts_, sys_piece_info))) {
|
|
||||||
LOG_WARN("failed to update checkpoint ts", K(ret), K(sys_piece_info), K(sys_info));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
|
|
||||||
int ObLogArchiveScheduler::update_sys_active_piece_checkpoint_ts_(
|
int ObLogArchiveScheduler::update_sys_active_piece_checkpoint_ts_(
|
||||||
const int64_t checkpoint_ts, share::ObBackupPieceInfo &sys_piece_info)
|
const int64_t checkpoint_ts, share::ObBackupPieceInfo &sys_piece_info)
|
||||||
{
|
{
|
||||||
@ -1966,6 +1922,11 @@ int ObLogArchiveScheduler::update_sys_active_piece_checkpoint_ts_(
|
|||||||
if (!is_inited_) {
|
if (!is_inited_) {
|
||||||
ret = OB_NOT_INIT;
|
ret = OB_NOT_INIT;
|
||||||
LOG_WARN("not inited", K(ret));
|
LOG_WARN("not inited", K(ret));
|
||||||
|
} else if (inner_table_version_ < OB_BACKUP_INNER_TABLE_V2) {
|
||||||
|
LOG_INFO("inner table version is old, skip update sys piece checkpoint ts",
|
||||||
|
K(ret),
|
||||||
|
K(inner_table_version_),
|
||||||
|
K(sys_piece_info));
|
||||||
} else if (!sys_piece_info.is_valid()) {
|
} else if (!sys_piece_info.is_valid()) {
|
||||||
ret = OB_INVALID_ARGUMENT;
|
ret = OB_INVALID_ARGUMENT;
|
||||||
LOG_WARN("invalid args", K(ret), K(sys_piece_info));
|
LOG_WARN("invalid args", K(ret), K(sys_piece_info));
|
||||||
@ -2013,6 +1974,11 @@ int ObLogArchiveScheduler::update_active_piece_checkpoint_ts_(
|
|||||||
if (!is_inited_) {
|
if (!is_inited_) {
|
||||||
ret = OB_NOT_INIT;
|
ret = OB_NOT_INIT;
|
||||||
LOG_WARN("not inited", K(ret));
|
LOG_WARN("not inited", K(ret));
|
||||||
|
} else if (inner_table_version_ < OB_BACKUP_INNER_TABLE_V2) {
|
||||||
|
LOG_INFO("inner table version is old, skip update active piece checkpoint ts",
|
||||||
|
K(ret),
|
||||||
|
K(inner_table_version_),
|
||||||
|
K(piece_key));
|
||||||
} else if (!piece_key.is_valid()) {
|
} else if (!piece_key.is_valid()) {
|
||||||
ret = OB_INVALID_ARGUMENT;
|
ret = OB_INVALID_ARGUMENT;
|
||||||
LOG_WARN("invalid args", K(ret), K(piece_key));
|
LOG_WARN("invalid args", K(ret), K(piece_key));
|
||||||
@ -2247,6 +2213,32 @@ int ObLogArchiveScheduler::trigger_freeze_pieces_(const share::ObLogArchiveBacku
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int ObLogArchiveScheduler::update_sys_log_archive_backup_process_(const share::ObLogArchiveBackupInfo &cur_info,
|
||||||
|
share::ObLogArchiveBackupInfo &new_info, const share::ObNonFrozenBackupPieceInfo &sys_non_frozen_piece)
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
ObLogArchiveBackupInfoMgr info_mgr;
|
||||||
|
ObBackupPieceInfo sys_piece_info = sys_non_frozen_piece.cur_piece_info_;
|
||||||
|
if (!cur_info.is_valid() || !new_info.is_valid() || !sys_non_frozen_piece.is_valid()) {
|
||||||
|
ret = OB_INVALID_ARGUMENT;
|
||||||
|
LOG_WARN("invalid args", K(ret), K(cur_info), K(new_info), K(sys_non_frozen_piece));
|
||||||
|
} else if (OB_FAIL(update_sys_active_piece_checkpoint_ts_(new_info.status_.checkpoint_ts_, sys_piece_info))) {
|
||||||
|
LOG_WARN("failed to update checkpoint ts", K(ret), K(cur_info), K(new_info), K(sys_non_frozen_piece));
|
||||||
|
} else if (OB_FAIL(update_sys_backup_info_(cur_info, new_info))) {
|
||||||
|
LOG_WARN("failed to update sys backup info", K(ret), K(cur_info), K(new_info), K(sys_non_frozen_piece));
|
||||||
|
} else {
|
||||||
|
FLOG_INFO("[LOG_ARCHIVE] succeed to commit update log archive process",
|
||||||
|
K(cur_info),
|
||||||
|
K(new_info),
|
||||||
|
K(sys_non_frozen_piece));
|
||||||
|
if (OB_FAIL(info_mgr.update_extern_log_archive_backup_info(new_info, *backup_lease_service_))) {
|
||||||
|
LOG_WARN("failed to update update_extern_log_archive_backup_info", K(ret), K(new_info));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
int ObLogArchiveScheduler::update_tenant_log_archive_backup_process_(const share::ObLogArchiveBackupInfo &sys_info,
|
int ObLogArchiveScheduler::update_tenant_log_archive_backup_process_(const share::ObLogArchiveBackupInfo &sys_info,
|
||||||
const share::ObNonFrozenBackupPieceInfo &sys_non_frozen_piece,
|
const share::ObNonFrozenBackupPieceInfo &sys_non_frozen_piece,
|
||||||
const share::ObServerTenantLogArchiveStatus &tenant_status)
|
const share::ObServerTenantLogArchiveStatus &tenant_status)
|
||||||
@ -2297,9 +2289,13 @@ int ObLogArchiveScheduler::update_tenant_log_archive_backup_process_(const share
|
|||||||
LOG_WARN("failed to set backup piece start ts", K(ret), K(tenant_piece_key), K(info));
|
LOG_WARN("failed to set backup piece start ts", K(ret), K(tenant_piece_key), K(info));
|
||||||
}
|
}
|
||||||
} else if (ObLogArchiveStatus::DOING == info.status_.status_) {
|
} else if (ObLogArchiveStatus::DOING == info.status_.status_) {
|
||||||
|
ObBackupPieceInfoKey active_piece_key = sys_non_frozen_piece.cur_piece_info_.key_;
|
||||||
|
active_piece_key.tenant_id_ = tenant_status.tenant_id_;
|
||||||
if (info.status_.start_ts_ < tenant_status.start_ts_) {
|
if (info.status_.start_ts_ < tenant_status.start_ts_) {
|
||||||
ret = OB_LOG_ARCHIVE_INTERRUPTED;
|
ret = OB_LOG_ARCHIVE_INTERRUPTED;
|
||||||
LOG_ERROR("server start ts must not larger than prev one", K(ret), K(info), K(tenant_status));
|
LOG_ERROR("server start ts must not larger than prev one", K(ret), K(info), K(tenant_status));
|
||||||
|
} else if (OB_FAIL(update_active_piece_checkpoint_ts_(active_piece_key, tenant_status.checkpoint_ts_))) {
|
||||||
|
LOG_WARN("failed to update active piece checkpoint ts", K(ret), K(active_piece_key), K(tenant_status));
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
|
|||||||
@ -132,14 +132,14 @@ private:
|
|||||||
int frozen_old_piece_(const bool force_stop, const share::ObNonFrozenBackupPieceInfo& sys_non_frozen_piece,
|
int frozen_old_piece_(const bool force_stop, const share::ObNonFrozenBackupPieceInfo& sys_non_frozen_piece,
|
||||||
const uint64_t tenant_id, const int64_t server_tenant_max_ts, int64_t& max_ts);
|
const uint64_t tenant_id, const int64_t server_tenant_max_ts, int64_t& max_ts);
|
||||||
int frozen_sys_old_piece_(
|
int frozen_sys_old_piece_(
|
||||||
const bool force_stop, const int64_t max_ts, share::ObNonFrozenBackupPieceInfo& sys_non_frozen_piece);
|
const bool force_stop, const int64_t max_ts, share::ObNonFrozenBackupPieceInfo &sys_non_frozen_piece);
|
||||||
int update_tenant_log_archive_backup_process_(const share::ObLogArchiveBackupInfo& sys_info,
|
int update_sys_log_archive_backup_process_(const share::ObLogArchiveBackupInfo &cur_info,
|
||||||
const share::ObNonFrozenBackupPieceInfo& sys_non_frozen_piece,
|
share::ObLogArchiveBackupInfo &new_info, const share::ObNonFrozenBackupPieceInfo &sys_non_frozen_piece);
|
||||||
const share::ObServerTenantLogArchiveStatus& tenant_status);
|
int update_tenant_log_archive_backup_process_(const share::ObLogArchiveBackupInfo &sys_info,
|
||||||
int update_active_piece_checkpoint_ts_(const share::ObLogArchiveBackupInfo& sys_info,
|
const share::ObNonFrozenBackupPieceInfo &sys_non_frozen_piece,
|
||||||
const TENANT_ARCHIVE_STATUS_MAP& log_archive_status_map, share::ObBackupPieceInfo& sys_piece_info);
|
const share::ObServerTenantLogArchiveStatus &tenant_status);
|
||||||
int update_active_piece_checkpoint_ts_(const share::ObBackupPieceInfoKey& piece_key, const int64_t checkpoint_ts);
|
int update_active_piece_checkpoint_ts_(const share::ObBackupPieceInfoKey &piece_key, const int64_t checkpoint_ts);
|
||||||
int update_sys_active_piece_checkpoint_ts_(const int64_t checkpoint_ts, share::ObBackupPieceInfo& sys_piece_info);
|
int update_sys_active_piece_checkpoint_ts_(const int64_t checkpoint_ts, share::ObBackupPieceInfo &sys_piece_info);
|
||||||
|
|
||||||
// for ObLogArchiveStatus::STOPPING status
|
// for ObLogArchiveStatus::STOPPING status
|
||||||
int stop_log_archive_backup_(const bool force_stop, share::ObLogArchiveBackupInfo& cur_sys_info,
|
int stop_log_archive_backup_(const bool force_stop, share::ObLogArchiveBackupInfo& cur_sys_info,
|
||||||
|
|||||||
Reference in New Issue
Block a user