fix restore concurrent with clean issue
This commit is contained in:
parent
533802c26e
commit
f7a91cb0b1
@ -730,6 +730,26 @@ int ObRestoreScheduler::tenant_restore_finish(const ObPhysicalRestoreJob &job_in
|
||||
LOG_WARN("failed to reset restore concurrency", K(ret), K(job_info));
|
||||
} else if (share::PHYSICAL_RESTORE_SUCCESS == job_info.get_status()) {
|
||||
//restore success
|
||||
} else {
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
ObRestoreFailureChecker checker;
|
||||
bool is_concurrent_with_clean = false;
|
||||
if (OB_TMP_FAIL(checker.init(job_info))) {
|
||||
LOG_WARN("failed to init restore failure checker", K(tmp_ret), K(job_info));
|
||||
} else if (OB_TMP_FAIL(checker.check_is_concurrent_with_clean(is_concurrent_with_clean))) {
|
||||
LOG_WARN("failed to check is clean concurrency failure", K(tmp_ret));
|
||||
}
|
||||
if (OB_SUCC(ret) && is_concurrent_with_clean) {
|
||||
int64_t pos = 0;
|
||||
if (OB_FAIL(databuff_printf(history_info.comment_.ptr(), history_info.comment_.capacity(), pos,
|
||||
"%s;", "physical restore run concurrently with backup data clean, please check backup and archive jobs"))) {
|
||||
if (OB_SIZE_OVERFLOW == ret) {
|
||||
ret = OB_SUCCESS;
|
||||
} else {
|
||||
LOG_WARN("failed to databuff printf comment", K(ret));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (FAILEDx(ObRestoreUtil::recycle_restore_job(*sql_proxy_,
|
||||
|
@ -1122,4 +1122,162 @@ int ObRestoreUtil::get_backup_sys_time_zone_(
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
|
||||
ObRestoreFailureChecker::ObRestoreFailureChecker()
|
||||
: is_inited_(false),
|
||||
job_()
|
||||
{
|
||||
}
|
||||
|
||||
ObRestoreFailureChecker::~ObRestoreFailureChecker()
|
||||
{
|
||||
}
|
||||
|
||||
int ObRestoreFailureChecker::init(const share::ObPhysicalRestoreJob &job)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (IS_INIT) {
|
||||
ret = OB_INIT_TWICE;
|
||||
LOG_WARN("restore failure checker init twice", K(ret));
|
||||
} else if (!job.is_valid()) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("get invalid arg", K(ret), K(job));
|
||||
} else if (OB_FAIL(job_.assign(job))) {
|
||||
LOG_WARN("failed to assign job", K(ret), K(job));
|
||||
} else {
|
||||
is_inited_ = true;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObRestoreFailureChecker::check_is_concurrent_with_clean(bool &is_concurrent_with_clean)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
is_concurrent_with_clean = false;
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("[RESTORE_FAILURE_CHECKER]restore failure checker not do init", K(ret));
|
||||
} else if (OB_FAIL(loop_path_list_(job_, is_concurrent_with_clean))) {
|
||||
LOG_WARN("failed to loop path list", K(ret), K_(job));
|
||||
}
|
||||
FLOG_INFO("[RESTORE_FAILURE_CHECKER]check is concurrent with clean", K(ret), K(is_concurrent_with_clean), K_(job));
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObRestoreFailureChecker::loop_path_list_(const share::ObPhysicalRestoreJob &job, bool &has_been_cleaned)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
has_been_cleaned = false;
|
||||
ObBackupDest backup_tenant_dest;
|
||||
const ObPhysicalRestoreBackupDestList& list = job.get_multi_restore_path_list();
|
||||
const common::ObSArray<share::ObBackupSetPath> &backup_set_path_list = list.get_backup_set_path_list();
|
||||
const common::ObSArray<share::ObBackupPiecePath> &backup_piece_path_list = list.get_backup_piece_path_list();
|
||||
|
||||
ARRAY_FOREACH_X(backup_set_path_list, idx, cnt, OB_SUCC(ret) && !has_been_cleaned) {
|
||||
backup_tenant_dest.reset();
|
||||
const share::ObBackupSetPath &backup_set_path = backup_set_path_list.at(idx);
|
||||
bool is_exist = true;
|
||||
if (OB_FAIL(backup_tenant_dest.set(backup_set_path.ptr()))) {
|
||||
LOG_WARN("failed to set backup tenant dest", K(ret), K(backup_set_path));
|
||||
} else if (OB_FAIL(check_tenant_backup_set_infos_path_exist_(backup_tenant_dest, is_exist))) {
|
||||
LOG_WARN("failed to check tenant backup set infos path exist", K(ret), K(backup_tenant_dest));
|
||||
} else {
|
||||
has_been_cleaned = !is_exist;
|
||||
}
|
||||
}
|
||||
|
||||
ARRAY_FOREACH_X(backup_piece_path_list, idx, cnt, OB_SUCC(ret) && !has_been_cleaned) {
|
||||
backup_tenant_dest.reset();
|
||||
const share::ObBackupPiecePath &backup_piece_path = backup_piece_path_list.at(idx);
|
||||
bool is_exist = true;
|
||||
bool is_empty = false;
|
||||
if (OB_FAIL(backup_tenant_dest.set(backup_piece_path.ptr()))) {
|
||||
LOG_WARN("failed to set backup tenant dest", K(ret), K(backup_piece_path));
|
||||
} else if (OB_FAIL(check_tenant_archive_piece_infos_path_exist_(backup_tenant_dest, is_exist))) {
|
||||
LOG_WARN("failed to check archive piece infos path exist", K(ret), K(backup_tenant_dest));
|
||||
} else if (OB_FAIL(check_checkpoint_dir_emtpy_(backup_tenant_dest, is_empty))) {
|
||||
LOG_WARN("failed to check checkpoint dir empty", K(ret), K(backup_tenant_dest));
|
||||
} else {
|
||||
has_been_cleaned = !is_exist && is_empty;
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
// single_backup_set_info
|
||||
int ObRestoreFailureChecker::check_tenant_backup_set_infos_path_exist_(
|
||||
const share::ObBackupDest &backup_set_dest,
|
||||
bool &is_exist)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
is_exist = false;
|
||||
ObBackupPath backup_path;
|
||||
if (OB_FAIL(ObBackupPathUtil::get_backup_set_info_path(backup_set_dest, backup_path))) {
|
||||
LOG_WARN("failed to get backup set info path", K(ret), K(backup_set_dest));
|
||||
} else if (OB_FAIL(check_path_exist_(backup_path, backup_set_dest.get_storage_info(), is_exist))) {
|
||||
LOG_WARN("failed to check path exist", K(ret));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
// tenant_archive_piece_infos
|
||||
int ObRestoreFailureChecker::check_tenant_archive_piece_infos_path_exist_(
|
||||
const share::ObBackupDest &backup_set_dest,
|
||||
bool &is_exist)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
is_exist = false;
|
||||
ObBackupPath backup_path;
|
||||
if (OB_FAIL(ObArchivePathUtil::get_tenant_archive_piece_infos_file_path(backup_set_dest, backup_path))) {
|
||||
LOG_WARN("failed to get tenant archive piece infos file path", K(ret), K(backup_set_dest));
|
||||
} else if (OB_FAIL(check_path_exist_(backup_path, backup_set_dest.get_storage_info(), is_exist))) {
|
||||
LOG_WARN("failed to check path exist", K(ret));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObRestoreFailureChecker::check_checkpoint_dir_emtpy_(
|
||||
const share::ObBackupDest &backup_tenant_dest,
|
||||
bool &is_empty)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
is_empty = false;
|
||||
ObBackupPath backup_path;
|
||||
if (OB_FAIL(ObArchivePathUtil::get_piece_checkpoint_dir_path(backup_tenant_dest, backup_path))) {
|
||||
LOG_WARN("failed to get tenant archive piece infos file path", K(ret), K(backup_tenant_dest));
|
||||
} else if (OB_FAIL(check_dir_empty_(backup_path, backup_tenant_dest.get_storage_info(), is_empty))) {
|
||||
LOG_WARN("failed to check dir empty", K(ret));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObRestoreFailureChecker::check_path_exist_(
|
||||
const share::ObBackupPath &backup_path,
|
||||
const share::ObBackupStorageInfo *storage_info,
|
||||
bool &is_exist)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
is_exist = false;
|
||||
ObBackupIoAdapter util;
|
||||
if (OB_FAIL(util.is_exist(backup_path.get_ptr(), storage_info, is_exist))) {
|
||||
LOG_WARN("failed to check is exist", K(ret));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObRestoreFailureChecker::check_dir_empty_(
|
||||
const share::ObBackupPath &backup_path,
|
||||
const share::ObBackupStorageInfo *storage_info,
|
||||
bool &is_empty)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
is_empty = false;
|
||||
ObBackupIoAdapter util;
|
||||
if (OB_FAIL(util.is_empty_directory(backup_path.get_ptr(), storage_info, is_empty))) {
|
||||
LOG_WARN("fail to init store", K(ret), K(backup_path));
|
||||
} else {
|
||||
LOG_INFO("is empty dir", K(backup_path), K(is_empty));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
@ -46,7 +46,7 @@ public:
|
||||
const share::ObPhysicalRestoreJob &job_info);
|
||||
static int recycle_restore_job(common::ObMySQLProxy &sql_proxy,
|
||||
const share::ObPhysicalRestoreJob &job_info,
|
||||
const share::ObHisRestoreJobPersistInfo &history_info);
|
||||
const share::ObHisRestoreJobPersistInfo &history_info);
|
||||
static int check_has_physical_restore_job(
|
||||
common::ObISQLClient &sql_client,
|
||||
const common::ObString &tenant_name,
|
||||
@ -150,6 +150,40 @@ private:
|
||||
DISALLOW_COPY_AND_ASSIGN(ObRestoreUtil);
|
||||
};
|
||||
|
||||
class ObRestoreFailureChecker final
|
||||
{
|
||||
public:
|
||||
ObRestoreFailureChecker();
|
||||
~ObRestoreFailureChecker();
|
||||
int init(const share::ObPhysicalRestoreJob &job);
|
||||
int check_is_concurrent_with_clean(bool &is_clean_concurrency);
|
||||
|
||||
private:
|
||||
int loop_path_list_(const share::ObPhysicalRestoreJob &job, bool &has_been_cleaned);
|
||||
int check_tenant_backup_set_infos_path_exist_(
|
||||
const share::ObBackupDest &backup_tenant_dest,
|
||||
bool &is_exist);
|
||||
int check_tenant_archive_piece_infos_path_exist_(
|
||||
const share::ObBackupDest &backup_tenant_dest,
|
||||
bool &is_exist);
|
||||
int check_checkpoint_dir_emtpy_(
|
||||
const share::ObBackupDest &backup_tenant_dest,
|
||||
bool &is_empty);
|
||||
int check_path_exist_(
|
||||
const share::ObBackupPath &backup_path,
|
||||
const share::ObBackupStorageInfo *storage_info,
|
||||
bool &is_exist);
|
||||
int check_dir_empty_(
|
||||
const share::ObBackupPath &backup_path,
|
||||
const share::ObBackupStorageInfo *storage_info,
|
||||
bool &is_exist);
|
||||
|
||||
private:
|
||||
bool is_inited_;
|
||||
share::ObPhysicalRestoreJob job_;
|
||||
DISALLOW_COPY_AND_ASSIGN(ObRestoreFailureChecker);
|
||||
};
|
||||
|
||||
}
|
||||
}
|
||||
#endif /* __OB_RS_RESTORE_UTIL_H__ */
|
||||
|
@ -207,6 +207,20 @@ int ObArchivePathUtil::get_piece_checkpoint_dir_path(const ObBackupDest &dest, c
|
||||
return ret;
|
||||
}
|
||||
|
||||
// oss://archive/checkpoint
|
||||
int ObArchivePathUtil::get_piece_checkpoint_dir_path(
|
||||
const ObBackupDest &dest, ObBackupPath &path)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
path.reset();
|
||||
if (OB_FAIL(path.init(dest.get_root_path()))) {
|
||||
LOG_WARN("failed to assign dest path", K(ret), K(dest));
|
||||
} else if (OB_FAIL(path.join("checkpoint", ObBackupFileSuffix::NONE))) {
|
||||
LOG_WARN("failed to join checkpoint dir path", K(ret), K(path));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
// oss://archive/piece_d[dest_id]r[round_id]p[piece_id]/checkpoint/checkpoint_info.[file_id].obarc
|
||||
int ObArchivePathUtil::get_piece_checkpoint_file_path(const ObBackupDest &dest, const int64_t dest_id,
|
||||
const int64_t round_id, const int64_t piece_id, const int64_t file_id, ObBackupPath &path)
|
||||
@ -316,6 +330,19 @@ int ObArchivePathUtil::get_tenant_archive_piece_infos_file_path(const ObBackupDe
|
||||
return ret;
|
||||
}
|
||||
|
||||
// oss://archive/tenant_archive_piece_infos.obarc
|
||||
int ObArchivePathUtil::get_tenant_archive_piece_infos_file_path(const ObBackupDest &dest, ObBackupPath &path)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
path.reset();
|
||||
if (OB_FAIL(path.init(dest.get_root_path()))) {
|
||||
LOG_WARN("failed to assign dest path", K(ret), K(dest));
|
||||
} else if (OB_FAIL(path.join("tenant_archive_piece_infos", ObBackupFileSuffix::ARCHIVE))) {
|
||||
LOG_WARN("failed to join tenant_archive_piece_infos file", K(ret), K(path));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
// oss://archive/piece_d[dest_id]r[round_id]p[piece_id]/logstream_[%ld]/log/[file_id]
|
||||
int ObArchivePathUtil::get_ls_archive_file_path(const ObBackupDest &dest, const int64_t dest_id,
|
||||
const int64_t round_id, const int64_t piece_id, const share::ObLSID &ls_id, const int64_t file_id, ObBackupPath &path)
|
||||
|
@ -71,6 +71,9 @@ public:
|
||||
static int get_piece_checkpoint_dir_path(const ObBackupDest &dest, const int64_t dest_id,
|
||||
const int64_t round_id, const int64_t piece_id, ObBackupPath &path);
|
||||
|
||||
// oss://archive/checkpoint
|
||||
static int get_piece_checkpoint_dir_path(const ObBackupDest &dest, ObBackupPath &path);
|
||||
|
||||
// oss://archive/piece_d[dest_id]r[round_id]p[piece_id]/checkpoint/checkpoint_info.[file_id].obarc
|
||||
static int get_piece_checkpoint_file_path(const ObBackupDest &dest, const int64_t dest_id,
|
||||
const int64_t round_id, const int64_t piece_id, const int64_t file_id, ObBackupPath &path);
|
||||
@ -96,6 +99,8 @@ public:
|
||||
static int get_tenant_archive_piece_infos_file_path(const ObBackupDest &dest, const int64_t dest_id,
|
||||
const int64_t round_id, const int64_t piece_id, ObBackupPath &path);
|
||||
|
||||
static int get_tenant_archive_piece_infos_file_path(const ObBackupDest &dest, ObBackupPath &path);
|
||||
|
||||
// oss://archive/piece_d[dest_id]r[round_id]p[piece_id]/logstream_[%ld]/log/[file_id].obarc
|
||||
static int get_ls_archive_file_path(const ObBackupDest &dest, const int64_t dest_id,
|
||||
const int64_t round_id, const int64_t piece_id, const share::ObLSID &ls_id, const int64_t file_id, ObBackupPath &path);
|
||||
|
Loading…
x
Reference in New Issue
Block a user