fix rs schedule backup state machine

This commit is contained in:
oceanoverflow
2023-12-19 20:48:09 +00:00
committed by ob-robot
parent 64b11adfb4
commit 1b6966fef1
8 changed files with 160 additions and 45 deletions

View File

@ -117,7 +117,15 @@ int ObBackupSetTaskMgr::advance_status_(
LOG_WARN("[DATA_BACKUP]failed to check leader", K(ret));
} else if (OB_FAIL(ObBackupTaskOperator::advance_task_status(trans, set_task_attr_, next_status, result, scn, end_ts))) {
LOG_WARN("[DATA_BACKUP]failed to advance set status", K(ret), K(set_task_attr_), K(next_status));
}
} else {
ROOTSERVICE_EVENT_ADD("backup_data", "advance_status",
"tenant_id", job_attr_->tenant_id_,
"job_id", job_attr_->job_id_,
"backup_set_id", job_attr_->backup_set_id_,
"curr_status", set_task_attr_.status_.get_str(),
"next_status", next_status.get_str(),
"result", result);
}
return ret;
}
@ -150,6 +158,12 @@ int ObBackupSetTaskMgr::process()
}
break;
}
case ObBackupStatus::Status::BACKUP_META_FINISH: {
if (OB_FAIL(backup_meta_finish_())) {
LOG_WARN("[DATA_BACKUP]failed to backup meta finish", K(ret), K(set_task_attr_));
}
break;
}
case ObBackupStatus::Status::BACKUP_DATA_MINOR:
case ObBackupStatus::Status::BACKUP_DATA_MAJOR: {
if (OB_FAIL(backup_data_())) {
@ -508,22 +522,17 @@ int ObBackupSetTaskMgr::backup_user_meta_()
if (OB_FAIL(change_meta_turn_(sys_ls_task))) {
LOG_WARN("failed to change meta turn", K(ret));
}
} else if (OB_FAIL(calc_consistent_scn_(ls_task, consistent_scn))) {
LOG_WARN("failed to calc consistent scn", K(ret), K(ls_task));
} else if (OB_FAIL(merge_ls_meta_infos_(ls_task))) {
LOG_WARN("fail to merge ls meta infos", K(ret), K(ls_task));
} else if (OB_FAIL(merge_tablet_to_ls_info_(consistent_scn, ls_task))) {
LOG_WARN("[DATA_BACKUP]failed to merge tablet to ls info", K(ret), K(ls_task));
} else if (OB_FALSE_IT(DEBUG_SYNC(BEFORE_BACKUP_DATA))) {
} else if (OB_FAIL(trans_.start(sql_proxy_, meta_tenant_id_))) {
LOG_WARN("fail to start trans", K(ret), K(meta_tenant_id_));
} else {
ObBackupStatus next_status = ObBackupStatus::BACKUP_DATA_MINOR;
ObBackupStatus next_status = ObBackupStatus::BACKUP_META_FINISH;
if (OB_FAIL(convert_task_type_(ls_task))) {
LOG_WARN("[DATA_BACKUP]fail to update task type to backup data", K(ret));
} else if (OB_FAIL(advance_status_(trans_, next_status))) {
LOG_WARN("[DATA_BACKUP]failed to advance status to BACKUP_DATA_MINOR", K(ret), K(next_status));
}
} else {
ROOTSERVICE_EVENT_ADD("backup_data", "after_backup_consistent_scn");
}
if (OB_SUCC(ret)) {
if (OB_FAIL(trans_.end(true))) {
@ -545,6 +554,58 @@ int ObBackupSetTaskMgr::backup_user_meta_()
return ret;
}
int ObBackupSetTaskMgr::backup_meta_finish_()
{
int ret = OB_SUCCESS;
ObArray<ObBackupLSTaskAttr> ls_task;
ObArray<ObLSID> ls_ids;
share::SCN consistent_scn;
DEBUG_SYNC(BEFORE_BACKUP_META_FINISH);
if (OB_FAIL(ObBackupLSTaskOperator::get_ls_tasks(*sql_proxy_, job_attr_->job_id_, job_attr_->tenant_id_, false/*update*/, ls_task))) {
LOG_WARN("[DATA_BACKUP]failed to get log stream tasks", K(ret), "job_id", job_attr_->job_id_, "tenant_id", job_attr_->tenant_id_);
} else if (ls_task.empty()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("[DATA_BACKUP]no logstream task", K(ret), "job_id", job_attr_->job_id_, "tenant_id", job_attr_->tenant_id_);
} else if (OB_FAIL(calc_consistent_scn_(ls_task, consistent_scn))) {
LOG_WARN("failed to calc consistent scn", K(ret), K(ls_task));
} else if (OB_FAIL(merge_ls_meta_infos_(ls_task))) {
LOG_WARN("fail to merge ls meta infos", K(ret), K(ls_task));
} else if (OB_FAIL(merge_tablet_to_ls_info_(consistent_scn, ls_task, ls_ids))) {
LOG_WARN("[DATA_BACKUP]failed to merge tablet to ls info", K(ret), K(ls_task));
} else if (OB_FALSE_IT(DEBUG_SYNC(BEFORE_BACKUP_DATA))) {
} else if (OB_FAIL(trans_.start(sql_proxy_, meta_tenant_id_))) {
LOG_WARN("fail to start trans", K(ret), K(meta_tenant_id_));
} else {
ObBackupStatus next_status = ObBackupStatus::BACKUP_DATA_MINOR;
share::ObBackupDataTaskType type(share::ObBackupDataTaskType::Type::BACKUP_DATA_MINOR);
if (OB_FAIL(convert_task_type_(ls_task))) {
LOG_WARN("[DATA_BACKUP]fail to update task type to backup data", K(ret));
} else if (OB_FAIL(advance_status_(trans_, next_status))) {
LOG_WARN("[DATA_BACKUP]failed to advance status to BACKUP_DATA_MINOR", K(ret), K(next_status));
} else if (OB_FAIL(generate_ls_tasks_(ls_ids, type))) {
LOG_WARN("failed to generate ls tasks", K(ret), K(ls_ids), K(type));
} else {
ROOTSERVICE_EVENT_ADD("backup_data", "after_backup_consistent_scn");
}
if (OB_SUCC(ret)) {
if (OB_FAIL(trans_.end(true))) {
LOG_WARN("failed to commit trans", KR(ret));
} else {
backup_service_->wakeup();
}
} else {
int tmp_ret = OB_SUCCESS;
if (OB_SUCCESS != (tmp_ret = trans_.end(false))) {
LOG_WARN("failed to rollback", KR(ret), K(tmp_ret));
}
}
}
return ret;
}
int ObBackupSetTaskMgr::calc_consistent_scn_(ObIArray<share::ObBackupLSTaskAttr> &ls_tasks, share::SCN &consistent_scn)
{
int ret = OB_SUCCESS;
@ -675,12 +736,13 @@ int ObBackupSetTaskMgr::merge_ls_meta_infos_(
return ret;
}
int ObBackupSetTaskMgr::merge_tablet_to_ls_info_(const share::SCN &consistent_scn, const ObIArray<ObBackupLSTaskAttr> &ls_tasks)
int ObBackupSetTaskMgr::merge_tablet_to_ls_info_(const share::SCN &consistent_scn,
const ObIArray<ObBackupLSTaskAttr> &ls_tasks, common::ObIArray<share::ObLSID> &ls_ids)
{
int ret = OB_SUCCESS;
ls_ids.reset();
ObHashMap<ObLSID, ObArray<ObTabletID>> latest_ls_tablet_map;
ObHashMap<ObLSID, const ObBackupLSTaskAttr *> backup_ls_map; // the ls task persisted in __all_backup_ls_task
ObArray<share::ObLSID> ls_ids;
const int64_t OB_BACKUP_MAX_LS_BUCKET = 1024;
SCN max_backup_scn;
if (ls_tasks.empty() || !consistent_scn.is_valid()) {
@ -738,29 +800,6 @@ int ObBackupSetTaskMgr::merge_tablet_to_ls_info_(const share::SCN &consistent_sc
}
}
}
share::ObBackupDataTaskType type(share::ObBackupDataTaskType::Type::BACKUP_DATA_MINOR);
if (OB_FAIL(ret)) {
} else if (OB_FAIL(trans_.start(sql_proxy_, meta_tenant_id_))) {
LOG_WARN("fail to start trans", K(ret), K(meta_tenant_id_));
} else if (OB_FAIL(generate_ls_tasks_(ls_ids, type))) {
LOG_WARN("failed to generate ls tasks", K(ret), K(ls_ids), K(type));
} else {
ROOTSERVICE_EVENT_ADD("backup_data",
"after_backup_consistent_scn",
"tenant_id",
job_attr_->tenant_id_,
"job_id",
job_attr_->job_id_,
"task_id",
set_task_attr_.task_id_);
}
if (trans_.is_started()) {
int tmp_ret = OB_SUCCESS;
if (OB_TMP_FAIL(trans_.end(OB_SUCC(ret)))) {
ret = OB_SUCC(ret) ? tmp_ret : ret;
LOG_WARN("failed to end trans", K(ret), K(tmp_ret));
}
}
return ret;
}
@ -977,6 +1016,10 @@ int ObBackupSetTaskMgr::get_next_status_(const share::ObBackupStatus &cur_status
break;
}
case ObBackupStatus::Status::BACKUP_USER_META: {
next_status = ObBackupStatus::Status::BACKUP_META_FINISH;
break;
}
case ObBackupStatus::Status::BACKUP_META_FINISH: {
next_status = ObBackupStatus::Status::BACKUP_DATA_MINOR;
break;
}
@ -1525,6 +1568,10 @@ int ObBackupSetTaskMgr::convert_task_type_(const ObIArray<ObBackupLSTaskAttr> &l
ObBackupDataTaskType type;
switch(set_task_attr_.status_.status_) {
case ObBackupStatus::Status::BACKUP_USER_META: {
type.type_ = ObBackupDataTaskType::Type::BACKUP_META_FINISH;
break;
}
case ObBackupStatus::Status::BACKUP_META_FINISH: {
type.type_ = ObBackupDataTaskType::Type::BACKUP_DATA_MINOR;
break;
}