fix backup plus archivelog with newly created ls case issue

This commit is contained in:
oceanoverflow 2024-12-16 14:45:29 +00:00 committed by ob-robot
parent 9ee476ca39
commit cbdd5d00cd
8 changed files with 449 additions and 125 deletions

View File

@ -178,6 +178,13 @@ int ObBackupSetTaskMgr::process()
}
break;
}
case ObBackupStatus::Status::PREPARE_BACKUP_LOG: {
DEBUG_SYNC(BEFORE_BACKUP_COMPLEMENT_LOG);
if (OB_FAIL(prepare_backup_log_())) {
LOG_WARN("[DATA_BACKUP]failed to prepare backup log", K(ret), K(set_task_attr_));
}
break;
}
case ObBackupStatus::Status::BEFORE_BACKUP_LOG: {
if (OB_FAIL(before_backup_log_())) {
LOG_WARN("[DATA_BACKUP]failed to before backup log", K(ret), K(set_task_attr_));
@ -185,7 +192,6 @@ int ObBackupSetTaskMgr::process()
break;
}
case ObBackupStatus::Status::BACKUP_LOG: {
DEBUG_SYNC(BEFORE_BACKUP_COMPLEMENT_LOG);
if (OB_FAIL(backup_completing_log_())) {
LOG_WARN("[DATA_BACKUP]failed to backup completing log", K(ret), K(set_task_attr_));
}
@ -428,6 +434,7 @@ int ObBackupSetTaskMgr::calc_task_turn_(const ObBackupDataTaskType &type, int64_
if (type.is_backup_meta()) {
turn_id = set_task_attr_.meta_turn_id_;
}
break;
}
case ObBackupStatus::BACKUP_META_FINISH: {
if (type.is_backup_user()) {
@ -445,6 +452,10 @@ int ObBackupSetTaskMgr::calc_task_turn_(const ObBackupDataTaskType &type, int64_
turn_id = set_task_attr_.major_turn_id_;
break;
}
case ObBackupStatus::PREPARE_BACKUP_LOG: {
turn_id = 1;
break;
}
default: {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("generate ls task in unmatched status", K(set_task_attr_));
@ -1079,6 +1090,10 @@ int ObBackupSetTaskMgr::get_next_status_(const share::ObBackupStatus &cur_status
break;
}
case ObBackupStatus::Status::BACKUP_FUSE_TABLET_META: {
next_status = ObBackupStatus::Status::PREPARE_BACKUP_LOG;
break;
}
case ObBackupStatus::Status::PREPARE_BACKUP_LOG: {
next_status = ObBackupStatus::Status::BEFORE_BACKUP_LOG;
break;
}
@ -1962,7 +1977,7 @@ int ObBackupSetTaskMgr::backup_fuse_tablet_meta_()
}
if (OB_SUCC(ret) && ls_task.count() == finish_cnt) {
ObBackupStatus next_status = ObBackupStatus::BEFORE_BACKUP_LOG;
ObBackupStatus next_status = ObBackupStatus::PREPARE_BACKUP_LOG;
set_task_attr_.end_ts_ = ObTimeUtility::current_time();
if (OB_FAIL(trans_.start(sql_proxy_, meta_tenant_id_))) {
LOG_WARN("fail to start trans", K(ret), K(meta_tenant_id_));
@ -2017,6 +2032,155 @@ int ObBackupSetTaskMgr::do_backup_fuse_tablet_meta_(ObArray<ObBackupLSTaskAttr>
return ret;
}
int ObBackupSetTaskMgr::prepare_backup_log_()
{
int ret = OB_SUCCESS;
ObArray<ObBackupLSTaskAttr> ls_task;
ObTenantArchiveRoundAttr round_attr;
int64_t finish_cnt = 0;
bool can_prepare_backup_log = false;
if (OB_FAIL(ObBackupLSTaskOperator::get_ls_tasks(*sql_proxy_,
job_attr_->job_id_, job_attr_->tenant_id_, true/*update*/, ls_task))) {
LOG_WARN("[DATA_BACKUP]failed to get log stream tasks", K(ret), "job_id", job_attr_->job_id_, "tenant_id", job_attr_->tenant_id_);
} else if (ls_task.empty()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("[DATA_BACKUP]no logstream task", K(ret), K(ls_task));
} else if (job_attr_->plus_archivelog_) {
if (OB_FAIL(ObTenantArchiveMgr::get_tenant_current_round(job_attr_->tenant_id_, job_attr_->incarnation_id_, round_attr))) {
LOG_WARN("[DATA_BACKUP]failed to get tenant current round", K(ret));
} else if (!round_attr.is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("[DATA_BACKUP]invalid round attr", K(ret));
} else if (round_attr.checkpoint_scn_ < set_task_attr_.end_scn_) {
if (REACH_TIME_INTERVAL(5 * 1000 * 1000)) {
LOG_WARN("[DATA_BACKUP]log archive checkpoint is less than task's end scn, try later", "checkpoint_scn", round_attr.checkpoint_scn_,
"end_scn", set_task_attr_.end_scn_);
}
} else {
can_prepare_backup_log = true;
}
} else {
can_prepare_backup_log = true;
}
if (OB_SUCC(ret) && can_prepare_backup_log) {
if (OB_FAIL(inner_prepare_backup_log_())) {
LOG_WARN("failed to inner prepare backup log", K(ret));
}
}
return ret;
}
int ObBackupSetTaskMgr::get_active_round_dest_id_(const uint64_t tenant_id, int64_t &dest_id)
{
int ret = OB_SUCCESS;
ObArchivePersistHelper helper;
ObArray<ObTenantArchiveRoundAttr> rounds;
if (OB_FAIL(helper.init(tenant_id))) {
LOG_WARN("failed to init archive persist helper", K(ret), K(tenant_id));
} else if (OB_FAIL(helper.get_all_active_rounds(trans_, rounds))) {
LOG_WARN("failed to get all active rounds", K(ret), K(tenant_id), K(dest_id));
} else if (1 != rounds.count()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("round count is not one", K(ret), K(tenant_id));
} else {
dest_id = rounds.at(0).dest_id_;
}
return ret;
}
int ObBackupSetTaskMgr::get_newly_created_ls_in_piece_(const uint64_t tenant_id,
const share::SCN &start_scn, const share::SCN &end_scn, common::ObIArray<share::ObLSID> &ls_array)
{
int ret = OB_SUCCESS;
int64_t archive_dest_id = 0;
if (OB_FAIL(get_active_round_dest_id_(tenant_id, archive_dest_id))) {
LOG_WARN("failed to get active round dest id", K(ret), K(tenant_id));
} else if (OB_FAIL(inner_get_newly_created_ls_in_piece_(archive_dest_id, tenant_id, start_scn, end_scn, ls_array))) {
LOG_WARN("failed to get newly created ls in piece", K(ret));
}
return ret;
}
int ObBackupSetTaskMgr::inner_get_newly_created_ls_in_piece_(const int64_t dest_id, const uint64_t tenant_id,
const share::SCN &start_scn, const share::SCN &end_scn, common::ObIArray<share::ObLSID> &ls_array)
{
int ret = OB_SUCCESS;
ObArray<ObLSID> backup_ls_ids;
ObArray<ObLSID> archive_ls_ids;
const int64_t task_id = set_task_attr_.task_id_;
if (OB_FAIL(ObLSBackupOperator::get_all_backup_ls_id(tenant_id, task_id, backup_ls_ids, trans_))) {
LOG_WARN("failed to get all backup ls ids", K(ret), K(tenant_id), K(task_id));
} else if (OB_FAIL(ObLSBackupOperator::get_all_archive_ls_id(tenant_id, dest_id, start_scn, end_scn, archive_ls_ids, trans_))) {
LOG_WARN("failed to get all backup ls ids", K(ret), K(tenant_id), K(task_id));
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < archive_ls_ids.count(); ++i) {
const ObLSID &archive_ls_id = archive_ls_ids.at(i);
bool exist = false;
for (int64_t j = 0; OB_SUCC(ret) && j < backup_ls_ids.count(); ++j) {
const ObLSID &backup_ls_id = backup_ls_ids.at(j);
if (backup_ls_id == archive_ls_id) {
exist = true;
break;
}
}
if (OB_SUCC(ret) && !exist) {
if (OB_FAIL(ls_array.push_back(archive_ls_id))) {
LOG_WARN("failed to push back ls id", K(ret), K(archive_ls_id));
}
}
}
}
if (OB_SUCC(ret)) {
ROOTSERVICE_EVENT_ADD("backup_data", "complement_newly_created_ls_ids",
"tenant_id", tenant_id,
"task_id", task_id,
"start_scn", start_scn.get_val_for_inner_table_field(),
"end_scn", end_scn.get_val_for_inner_table_field(),
"ls_array", ls_array);
LOG_INFO("get newly created ls in piece", K(dest_id), K(tenant_id), K(start_scn), K(end_scn),
K(backup_ls_ids), K(archive_ls_ids), K(ls_array));
}
return ret;
}
int ObBackupSetTaskMgr::inner_prepare_backup_log_()
{
int ret = OB_SUCCESS;
if (OB_FAIL(trans_.start(sql_proxy_, meta_tenant_id_))) {
LOG_WARN("fail to start trans", K(ret), K(meta_tenant_id_));
} else {
ObBackupStatus next_status = ObBackupStatus::BEFORE_BACKUP_LOG;
share::ObBackupDataTaskType type(share::ObBackupDataTaskType::Type::BEFORE_PLUS_ARCHIVE_LOG);
ObArray<ObLSID> newly_created_ls_ids;
if (job_attr_->plus_archivelog_ && OB_FAIL(get_newly_created_ls_in_piece_(
job_attr_->tenant_id_, set_task_attr_.start_scn_, set_task_attr_.end_scn_, newly_created_ls_ids))) {
LOG_WARN("failed to get newly created ls in piece", K(ret), KPC_(job_attr), K_(set_task_attr));
} else if (OB_FAIL(advance_status_(trans_, next_status, OB_SUCCESS, set_task_attr_.end_scn_, set_task_attr_.end_ts_))) {
LOG_WARN("[DATA_BACKUP]failed to advance status to BACKUP_PLUS_ARCHIVE_LOG", K(ret), K(next_status));
} else if (OB_FAIL(generate_ls_tasks_(newly_created_ls_ids, type))) {
LOG_WARN("failed to generate ls tasks", K(ret), K(newly_created_ls_ids));
} else {
LOG_INFO("prepare ls task", K(newly_created_ls_ids), K_(set_task_attr));
}
if (OB_SUCC(ret)) {
if (OB_FAIL(trans_.end(true))) {
LOG_WARN("fail to commit trans", K(ret));
} else {
set_task_attr_.status_ = next_status;
ROOTSERVICE_EVENT_ADD("backup_data", "prepare backup completing log succeed", "tenant_id",
job_attr_->tenant_id_, "job_id", job_attr_->job_id_, "task_id", set_task_attr_.task_id_);
backup_service_->wakeup();
}
} else {
int tmp_ret = OB_SUCCESS;
if (OB_TMP_FAIL(trans_.end(false))) {
LOG_WARN("fail to rollback", K(ret), K(tmp_ret));
}
}
}
return ret;
}
int ObBackupSetTaskMgr::before_backup_log_()
{

View File

@ -115,6 +115,13 @@ private:
int update_inner_task_(const ObIArray<share::ObLSID> &new_ls_ids,
const ObIArray<const share::ObBackupLSTaskAttr *> &need_change_turn_ls_tasks);
int convert_task_type_(const ObIArray<share::ObBackupLSTaskAttr> &ls_task);
int prepare_backup_log_();
int inner_prepare_backup_log_();
int get_active_round_dest_id_(const uint64_t tenant_id, int64_t &dest_id);
int get_newly_created_ls_in_piece_(const uint64_t tenant_id,
const share::SCN &start_scn, const share::SCN &end_scn, common::ObIArray<share::ObLSID> &ls_array);
int inner_get_newly_created_ls_in_piece_(const int64_t dest_id, const uint64_t tenant_id,
const share::SCN &start_scn, const share::SCN &end_scn, common::ObIArray<share::ObLSID> &ls_array);
int before_backup_log_();
int stat_all_ls_backup_log_(ObMySQLTransaction &trans);
int backup_completing_log_();

View File

@ -2983,6 +2983,7 @@ const char* ObBackupStatus::get_str() const
"BEFORE_BACKUP_LOG",
"BACKUP_LOG",
"BACKUP_FUSE_TABLET_META",
"PREPARE_BACKUP_LOG",
};
STATIC_ASSERT(MAX_STATUS == ARRAYSIZEOF(status_strs), "status count mismatch");
@ -3013,6 +3014,7 @@ int ObBackupStatus::set_status(const char *str)
"BEFORE_BACKUP_LOG",
"BACKUP_LOG",
"BACKUP_FUSE_TABLET_META",
"PREPARE_BACKUP_LOG",
};
const int64_t count = ARRAYSIZEOF(status_strs);
if (s.empty()) {

View File

@ -1276,6 +1276,7 @@ public:
BEFORE_BACKUP_LOG = 11,
BACKUP_LOG = 12,
BACKUP_FUSE_TABLET_META = 13,
PREPARE_BACKUP_LOG = 14,
MAX_STATUS
};
ObBackupStatus(): status_(MAX_STATUS) {}

View File

@ -148,6 +148,8 @@ DAG_SCHEDULER_DAG_TYPE_DEF(DAG_TYPE_BACKUP_INDEX_REBUILD, ObDagPrio::DAG_PRIO_HA
false, 6, {"tenant_id", "backup_set_id", "backup_data_type", "ls_id", "turn_id", "retry_id"})
DAG_SCHEDULER_DAG_TYPE_DEF(DAG_TYPE_BACKUP_LS_LOG_GROUP, ObDagPrio::DAG_PRIO_HA_LOW, ObSysTaskType::BACKUP_TASK, "BACKUP_COMPLEMENT_LOG_LS_GROUP", "BACKUP",
false, 3, {"tenant_id", "backup_set_id", "ls_id"})
DAG_SCHEDULER_DAG_TYPE_DEF(DAG_TYPE_BACKUP_LS_LOG_GROUP_FINISH, ObDagPrio::DAG_PRIO_HA_LOW, ObSysTaskType::BACKUP_TASK, "BACKUP_COMPLEMENT_LOG_LS_GROUP", "BACKUP",
false, 3, {"tenant_id", "backup_set_id", "ls_id"})
DAG_SCHEDULER_DAG_TYPE_DEF(DAG_TYPE_BACKUP_LS_LOG, ObDagPrio::DAG_PRIO_HA_LOW, ObSysTaskType::BACKUP_TASK, "BACKUP_COMPLEMENT_LOG_LS", "BACKUP",
false, 3, {"tenant_id", "backup_set_id", "ls_id"})
DAG_SCHEDULER_DAG_TYPE_DEF(DAG_TYPE_INITIAL_BACKUP_FUSE, ObDagPrio::DAG_PRIO_HA_LOW, ObSysTaskType::BACKUP_TASK, "INITIAL_BACKUP_FUSE", "BACKUP",

View File

@ -238,6 +238,7 @@ public:
TASK_TYPE_BACKUP_LS_LOG = 84,
TASK_TYPE_BACKUP_LS_LOG_FILE = 85,
TASK_TYPE_BACKUP_LS_LOG_FINISH = 86,
TASK_TYPE_BACKUP_LS_LOG_GROUP_FINISH = 87,
TASK_TYPE_MAX,
};

View File

@ -26,6 +26,8 @@ static int deal_with_fo(ObBackupComplementLogCtx *ctx, const int64_t result)
LOG_WARN("group ctx should not be null", K(ret));
} else if (OB_FAIL(ctx->set_result(result, false/*need_retry*/))) {
LOG_WARN("failed to set result", K(ret));
} else {
LOG_INFO("deal with fo", K(ret), K(result));
}
return ret;
}
@ -260,6 +262,7 @@ int ObBackupComplementLogDagNet::start_running()
int tmp_ret = OB_SUCCESS;
MAKE_TENANT_SWITCH_SCOPE_GUARD(guard);
ObBackupLSLogGroupDag *complement_dag = NULL;
ObBackupLSLogGroupFinishDag *finish_dag = NULL;
ObTenantDagScheduler *dag_scheduler = NULL;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
@ -277,16 +280,47 @@ int ObBackupComplementLogDagNet::start_running()
LOG_WARN("failed to create first task for child dag", K(ret), KPC(complement_dag));
} else if (OB_FAIL(add_dag_into_dag_net(*complement_dag))) {
LOG_WARN("failed to add dag into dag_net", K(ret), KPC(complement_dag));
} else if (OB_FAIL(dag_scheduler->add_dag(complement_dag))) {
if (OB_EAGAIN != ret && OB_SIZE_OVERFLOW != ret) {
LOG_WARN("failed to add dag", K(ret), KPC(complement_dag));
} else if (OB_FAIL(dag_scheduler->alloc_dag(finish_dag))) {
LOG_WARN("failed to create dag", K(ret));
} else if (OB_FAIL(finish_dag->init(&ctx_))) {
LOG_WARN("failed to init finish dag", K(ret), K_(ctx));
} else if (OB_FAIL(finish_dag->create_first_task())) {
LOG_WARN("failed to create first task", K(ret));
} else if (OB_FAIL(complement_dag->add_child(*finish_dag))) {
LOG_WARN("failed to add child", K(ret), KPC(complement_dag), KPC(finish_dag));
} else {
bool add_finish_dag_success = false;
bool add_complement_dag_success = false;
if (OB_FAIL(dag_scheduler->add_dag(finish_dag))) {
LOG_WARN("failed to add dag into dag_scheduler", K(ret), KP(finish_dag));
} else {
LOG_WARN("may exist same dag", K(ret), KPC(complement_dag));
add_finish_dag_success = true;
LOG_INFO("success to add finish dag into dag_net", K(ret), KP(finish_dag));
}
if (FAILEDx(dag_scheduler->add_dag(complement_dag))) {
LOG_WARN("failed to add dag into dag_scheduler", K(ret), KP(complement_dag));
} else {
add_complement_dag_success = true;
LOG_INFO("success to add complement log dag into dag_net", K(ret), KP(complement_dag));
}
if (OB_FAIL(ret) && OB_NOT_NULL(dag_scheduler) && OB_NOT_NULL(finish_dag)) {
// add finish dag success and add complement log dag failed, need cancel finish dag
if (add_finish_dag_success && !add_complement_dag_success) {
if (OB_TMP_FAIL(dag_scheduler->cancel_dag(finish_dag))) {
LOG_ERROR("failed to cancel backup dag", K(tmp_ret), KP(dag_scheduler), KP(finish_dag));
} else {
finish_dag = NULL;
}
}
}
}
if (OB_FAIL(ret) && OB_NOT_NULL(dag_scheduler) && OB_NOT_NULL(complement_dag)) {
dag_scheduler->free_dag(*complement_dag);
}
if (OB_FAIL(ret) && OB_NOT_NULL(dag_scheduler) && OB_NOT_NULL(finish_dag)) {
dag_scheduler->free_dag(*finish_dag);
}
if (OB_FAIL(ret)) {
if (OB_TMP_FAIL(ObBackupUtils::report_task_result(ctx_.job_desc_.job_id_,
@ -454,12 +488,7 @@ bool ObBackupLSLogGroupDag::operator==(const ObIDag &other) const
bret = false;
} else {
const ObBackupLSLogGroupDag &other_dag = static_cast<const ObBackupLSLogGroupDag &>(other);
bret = ctx_->job_desc_ == other_dag.ctx_->job_desc_
&& ctx_->backup_dest_ == other_dag.ctx_->backup_dest_
&& ctx_->tenant_id_ == other_dag.ctx_->tenant_id_
&& ctx_->dest_id_ == other_dag.ctx_->dest_id_
&& ctx_->backup_set_desc_ == other_dag.ctx_->backup_set_desc_
&& ctx_->ls_id_ == other_dag.ctx_->ls_id_;
bret = ctx_ == other_dag.ctx_;
}
return bret;
}
@ -474,10 +503,7 @@ int64_t ObBackupLSLogGroupDag::hash() const
} else {
int64_t type = get_type();
hash_value = common::murmurhash(&type, sizeof(type), hash_value);
hash_value = common::murmurhash(&ctx_->job_desc_, sizeof(ctx_->job_desc_), hash_value);
hash_value = common::murmurhash(&ctx_->tenant_id_, sizeof(ctx_->tenant_id_), hash_value);
hash_value = common::murmurhash(&ctx_->backup_set_desc_, sizeof(ctx_->backup_set_desc_), hash_value);
hash_value = common::murmurhash(&ctx_->ls_id_, sizeof(ctx_->ls_id_), hash_value);
hash_value = ctx_->calc_hash(hash_value);
}
return hash_value;
}
@ -528,9 +554,6 @@ int ObBackupLSLogGroupTask::process()
LOG_WARN("ctx should not be null", K(ret));
} else if (ctx_->is_failed()) {
// do nothing
} else if (OB_FAIL(get_newly_created_ls_in_piece_(
ctx_->dest_id_, ctx_->tenant_id_, ctx_->compl_start_scn_, ctx_->compl_end_scn_))) {
LOG_WARN("failed to get newly created ls in piece", K(ret), KPC_(ctx));
} else if (OB_FAIL(generate_ls_dag_())) {
LOG_WARN("failed to generate ls backup dag", K(ret));
}
@ -545,84 +568,6 @@ int ObBackupLSLogGroupTask::process()
return ret;
}
int ObBackupLSLogGroupTask::get_active_round_dest_id_(const uint64_t tenant_id, int64_t &dest_id)
{
int ret = OB_SUCCESS;
ObArchivePersistHelper helper;
ObArray<ObTenantArchiveRoundAttr> rounds;
if (OB_FAIL(helper.init(tenant_id))) {
LOG_WARN("failed to init archive persist helper", K(ret), K(tenant_id));
} else if (OB_FAIL(helper.get_all_active_rounds(*ctx_->report_ctx_.sql_proxy_, rounds))) {
LOG_WARN("failed to get all active rounds", K(ret), K(tenant_id), K(dest_id));
} else if (1 != rounds.count()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("round count is not one", K(ret), K(tenant_id));
} else {
dest_id = rounds.at(0).dest_id_;
}
return ret;
}
int ObBackupLSLogGroupTask::get_newly_created_ls_in_piece_(const int64_t dest_id, const uint64_t tenant_id,
const share::SCN &start_scn, const share::SCN &end_scn)
{
int ret = OB_SUCCESS;
int64_t archive_dest_id = 0;
if (OB_FAIL(ls_ids_.push_back(ls_id_))) {
LOG_WARN("failed to push back", K(ret), K_(ls_id));
} else if (!ls_id_.is_sys_ls()) {
} else if (OB_FAIL(get_active_round_dest_id_(tenant_id, archive_dest_id))) {
LOG_WARN("failed to get active round dest id", K(ret), K(tenant_id));
} else {
ObArray<ObLSID> newly_created_ls_ids;
if (OB_FAIL(inner_get_newly_created_ls_in_piece_(archive_dest_id, tenant_id,
ctx_->compl_start_scn_, ctx_->compl_end_scn_, newly_created_ls_ids))) {
LOG_WARN("failed to get newly created ls in piece", K(ret));
} else if (OB_FAIL(append(ls_ids_, newly_created_ls_ids))) {
LOG_WARN("failed to append ls", K(ret));
}
}
return ret;
}
int ObBackupLSLogGroupTask::inner_get_newly_created_ls_in_piece_(const int64_t dest_id, const uint64_t tenant_id,
const share::SCN &start_scn, const share::SCN &end_scn, common::ObIArray<share::ObLSID> &ls_array)
{
int ret = OB_SUCCESS;
ObArray<ObLSID> backup_ls_ids;
ObArray<ObLSID> archive_ls_ids;
const int64_t task_id = ctx_->job_desc_.task_id_;
if (OB_FAIL(ObLSBackupOperator::get_all_backup_ls_id(
tenant_id, task_id, backup_ls_ids, *ctx_->report_ctx_.sql_proxy_))) {
LOG_WARN("failed to get all backup ls ids", K(ret), K(tenant_id), K(task_id));
} else if (OB_FAIL(ObLSBackupOperator::get_all_archive_ls_id(
tenant_id, dest_id, start_scn, end_scn, archive_ls_ids, *ctx_->report_ctx_.sql_proxy_))) {
LOG_WARN("failed to get all backup ls ids", K(ret), K(tenant_id), K(task_id));
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < archive_ls_ids.count(); ++i) {
const ObLSID &archive_ls_id = archive_ls_ids.at(i);
bool exist = false;
for (int64_t j = 0; OB_SUCC(ret) && j < backup_ls_ids.count(); ++j) {
const ObLSID &backup_ls_id = backup_ls_ids.at(j);
if (backup_ls_id == archive_ls_id) {
exist = true;
break;
}
}
if (OB_SUCC(ret) && !exist) {
if (OB_FAIL(ls_array.push_back(archive_ls_id))) {
LOG_WARN("failed to push back ls id", K(ret), K(archive_ls_id));
}
}
}
}
if (OB_SUCC(ret)) {
LOG_INFO("get newly created ls in piece", K(dest_id), K(tenant_id), K(start_scn), K(end_scn),
K(backup_ls_ids), K(archive_ls_ids), K(ls_array));
}
return ret;
}
int ObBackupLSLogGroupTask::get_next_ls_id_(share::ObLSID &ls_id)
{
int ret = OB_SUCCESS;
@ -646,6 +591,8 @@ int ObBackupLSLogGroupTask::generate_ls_dag_()
if (OB_ISNULL(scheduler = MTL(ObTenantDagScheduler*))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("failed to get ObTenantDagScheduler from MTL", K(ret));
} else if (OB_FAIL(ls_ids_.push_back(ls_id_))) {
LOG_WARN("failed to push back", K(ret), K_(ls_id));
}
while (OB_SUCC(ret)) {
share::ObLSID ls_id;
@ -672,7 +619,7 @@ int ObBackupLSLogGroupTask::generate_ls_dag_()
LOG_WARN("failed to init ls dag", K(ret), K(ls_id), KPC_(ctx));
} else if (OB_FAIL(dag_net->add_dag_into_dag_net(*ls_dag))) {
LOG_WARN("failed to add dag into dag net", K(ret), KPC_(ctx));
} else if (OB_FAIL(parent->add_child_without_inheritance(*ls_dag))) {
} else if (OB_FAIL(parent->add_child(*ls_dag))) {
LOG_WARN("failed to add child dag", K(ret), KPC_(ctx));
} else if (OB_FAIL(ls_dag->create_first_task())) {
LOG_WARN("failed to create first task", K(ret), KPC_(ctx));
@ -718,12 +665,13 @@ int ObBackupLSLogGroupTask::record_server_event_()
ret = OB_ERR_UNEXPECTED;
LOG_WARN("ctx should not be null", K(ret));
} else {
SERVER_EVENT_ADD("backup_complement_log", "backup_ls_group_task",
SERVER_EVENT_ADD("backup_complement_log", "backup_ls_log_group_task",
"tenant_id", ctx_->tenant_id_,
"backup_set_id", ctx_->backup_set_desc_.backup_set_id_,
"ls_id", ctx_->ls_id_.id(),
"turn_id", ctx_->turn_id_,
"retry_id", ctx_->retry_id_);
"retry_id", ctx_->retry_id_,
"is_only_calc_stat", ctx_->is_only_calc_stat_);
}
return ret;
}
@ -812,12 +760,7 @@ bool ObBackupLSLogDag::operator==(const ObIDag &other) const
bret = false;
} else {
const ObBackupLSLogDag &other_dag = static_cast<const ObBackupLSLogDag &>(other);
bret = ctx_->job_desc_ == other_dag.ctx_->job_desc_
&& ctx_->backup_dest_ == other_dag.ctx_->backup_dest_
&& ctx_->tenant_id_ == other_dag.ctx_->tenant_id_
&& ctx_->dest_id_ == other_dag.ctx_->dest_id_
&& ctx_->backup_set_desc_ == other_dag.ctx_->backup_set_desc_
&& ls_id_ == other_dag.ls_id_;
bret = ctx_ == other_dag.ctx_;
}
return bret;
}
@ -832,9 +775,7 @@ int64_t ObBackupLSLogDag::hash() const
} else {
int64_t type = get_type();
hash_value = common::murmurhash(&type, sizeof(type), hash_value);
hash_value = common::murmurhash(&ctx_->job_desc_, sizeof(ctx_->job_desc_), hash_value);
hash_value = common::murmurhash(&ctx_->tenant_id_, sizeof(ctx_->tenant_id_), hash_value);
hash_value = common::murmurhash(&ctx_->backup_set_desc_, sizeof(ctx_->backup_set_desc_), hash_value);
hash_value = ctx_->calc_hash(hash_value);
hash_value = common::murmurhash(&ls_id_, sizeof(ls_id_), hash_value);
}
return hash_value;
@ -1735,12 +1676,13 @@ int ObBackupLSLogTask::record_server_event_()
ret = OB_ERR_UNEXPECTED;
LOG_WARN("ctx should not be null", K(ret));
} else {
SERVER_EVENT_ADD("backup_complement_log", "backup_ls_task",
SERVER_EVENT_ADD("backup_complement_log", "backup_ls_log_task",
"tenant_id", ctx_->tenant_id_,
"backup_set_id", ctx_->backup_set_desc_.backup_set_id_,
"ls_id", ctx_->ls_id_.id(),
"turn_id", ctx_->turn_id_,
"retry_id", ctx_->retry_id_);
"retry_id", ctx_->retry_id_,
"is_only_calc_stat", ctx_->is_only_calc_stat_);
}
return ret;
}
@ -1798,8 +1740,11 @@ int ObBackupLSLogFileTask::process()
LOG_WARN("ctx should not be null", K(ret));
} else if (ctx_->is_failed()) {
ret = OB_CANCELED;
LOG_WARN("ctx already failed", K(ret));
} else if (OB_FAIL(inner_process_(backup_piece_file_))) {
LOG_WARN("failed to inner process", K(ret), K_(backup_piece_file));
} else {
LOG_INFO("inner process backup log file", K_(backup_piece_file));
}
if (OB_FAIL(ret)) {
if (OB_TMP_FAIL(deal_with_fo(ctx_, ret))) {
@ -1867,7 +1812,7 @@ int ObBackupLSLogFileTask::inner_process_(const ObBackupPieceFile &piece_file)
} else if (OB_FAIL(report_progress_())) {
LOG_WARN("failed to make progress", K(ret));
} else {
SERVER_EVENT_ADD("backup", "backup_complement_log_file",
SERVER_EVENT_ADD("backup", "backup_ls_log_file_task",
"tenant_id", ctx_->tenant_id_,
"backup_set_id", ctx_->backup_set_desc_.backup_set_id_,
"dest_id", piece_file.dest_id_,
@ -2072,7 +2017,8 @@ int ObBackupLSLogFileTask::record_server_event_()
"ls_id", ctx_->ls_id_.id(),
"round_id", backup_piece_file_.round_id_,
"piece_id", backup_piece_file_.piece_id_,
"file_id", backup_piece_file_.file_id_);
"is_only_calc_stat", ctx_->is_only_calc_stat_,
backup_piece_file_.file_id_);
}
return ret;
}
@ -2176,9 +2122,6 @@ int ObBackupLSLogFinishTask::process()
} else {
// do nothing
}
if (OB_TMP_FAIL(report_task_result_())) {
LOG_WARN("failed to report task result", K(tmp_ret), K(ret));
}
if (OB_TMP_FAIL(record_server_event_())) {
LOG_WARN("failed to record server event", K(tmp_ret), K(ret));
}
@ -2214,7 +2157,176 @@ int ObBackupLSLogFinishTask::get_copy_file_info(ObBackupPieceFile &piece_file)
return ret;
}
int ObBackupLSLogFinishTask::report_task_result_()
int ObBackupLSLogFinishTask::record_server_event_()
{
int ret = OB_SUCCESS;
if (OB_ISNULL(ctx_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("ctx should not be null", K(ret));
} else {
SERVER_EVENT_ADD("backup_complement_log", "backup_ls_log_finish_task",
"tenant_id", ctx_->tenant_id_,
"backup_set_id", ctx_->backup_set_desc_.backup_set_id_,
"ls_id", ctx_->ls_id_.id(),
"turn_id", ctx_->turn_id_,
"retry_id", ctx_->retry_id_,
"is_only_calc_stat", ctx_->is_only_calc_stat_);
}
return ret;
}
// ObBackupLSLogGroupFinishDag
ObBackupLSLogGroupFinishDag::ObBackupLSLogGroupFinishDag()
: share::ObIDag(ObDagType::DAG_TYPE_BACKUP_LS_LOG_GROUP_FINISH),
is_inited_(false),
ctx_(NULL)
{}
ObBackupLSLogGroupFinishDag::~ObBackupLSLogGroupFinishDag()
{}
int ObBackupLSLogGroupFinishDag::init(ObBackupComplementLogCtx *ctx)
{
int ret = OB_SUCCESS;
if (IS_INIT) {
ret = OB_INIT_TWICE;
LOG_WARN("ls backup complement log task init twice", K(ret));
} else if (OB_ISNULL(ctx)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("get invalid arg", K(ret), KP(ctx));
} else {
ctx_ = ctx;
is_inited_ = true;
}
return ret;
}
int ObBackupLSLogGroupFinishDag::create_first_task()
{
int ret = OB_SUCCESS;
ObBackupLSLogGroupFinishTask *task = NULL;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("backup ls group dag do not init", K(ret));
} else if (OB_FAIL(alloc_task(task))) {
LOG_WARN("failed to alloc task", K(ret));
} else if (OB_FAIL(task->init(ctx_))) {
LOG_WARN("failed to init task", K(ret), K_(ctx));
} else if (OB_FAIL(add_task(*task))) {
LOG_WARN("failed to add task", K(ret));
} else {
LOG_INFO("success to add backup ls log group finish task", K(ret), KPC(this), KPC(task));
}
return ret;
}
int ObBackupLSLogGroupFinishDag::fill_info_param(compaction::ObIBasicInfoParam *&out_param, ObIAllocator &allocator) const
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("ls backup complement log dag do not init", K(ret));
} else if (OB_FAIL(ADD_DAG_WARN_INFO_PARAM(out_param, allocator, get_type(),
static_cast<int64_t>(ctx_->tenant_id_), ctx_->backup_set_desc_.backup_set_id_,
ctx_->ls_id_.id()))){
LOG_WARN("failed to add dag warning info param", K(ret));
}
return ret;
}
int ObBackupLSLogGroupFinishDag::fill_dag_key(char *buf, const int64_t buf_len) const
{
int ret = OB_SUCCESS;
if (OB_FAIL(databuff_printf(buf, buf_len, "ls_id=%s", to_cstring(ctx_->ls_id_)))) {
LOG_WARN("failed to fill dag_key", K(ret), KPC_(ctx));
}
return ret;
}
bool ObBackupLSLogGroupFinishDag::operator==(const ObIDag &other) const
{
bool bret = false;
if (this == &other) {
bret = true;
} else if (get_type() != other.get_type()) {
bret = false;
} else {
const ObBackupLSLogGroupFinishDag &other_dag = static_cast<const ObBackupLSLogGroupFinishDag &>(other);
bret = ctx_->job_desc_ == other_dag.ctx_->job_desc_
&& ctx_->backup_dest_ == other_dag.ctx_->backup_dest_
&& ctx_->tenant_id_ == other_dag.ctx_->tenant_id_
&& ctx_->dest_id_ == other_dag.ctx_->dest_id_
&& ctx_->backup_set_desc_ == other_dag.ctx_->backup_set_desc_
&& ctx_->ls_id_ == other_dag.ctx_->ls_id_;
}
return bret;
}
int64_t ObBackupLSLogGroupFinishDag::hash() const
{
int ret = OB_SUCCESS;
int64_t hash_value = 0;
if (IS_NOT_INIT) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("backup ls group dag do not init", K(ret));
} else {
int64_t type = get_type();
hash_value = common::murmurhash(&type, sizeof(type), hash_value);
hash_value = ctx_->calc_hash(hash_value);
}
return hash_value;
}
// ObBackupLSLogGroupFinishTask
ObBackupLSLogGroupFinishTask::ObBackupLSLogGroupFinishTask()
: ObITask(ObITask::ObITaskType::TASK_TYPE_BACKUP_LS_LOG_GROUP_FINISH),
is_inited_(false),
ctx_(NULL)
{
}
ObBackupLSLogGroupFinishTask::~ObBackupLSLogGroupFinishTask()
{
}
int ObBackupLSLogGroupFinishTask::init(ObBackupComplementLogCtx *ctx)
{
int ret = OB_SUCCESS;
if (IS_INIT) {
ret = OB_INIT_TWICE;
LOG_WARN("backup ls log copy finish task init twice", K(ret));
} else if (OB_ISNULL(ctx)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("get invalid arg", K(ret), KP(ctx));
} else {
ctx_ = ctx;
is_inited_ = true;
}
return ret;
}
int ObBackupLSLogGroupFinishTask::process()
{
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("backup ls log copy finish task do not init", K(ret));
} else {
// do nothing
}
if (OB_TMP_FAIL(report_task_result_())) {
LOG_WARN("failed to report task result", K(tmp_ret), K(ret));
}
if (OB_TMP_FAIL(record_server_event_())) {
LOG_WARN("failed to record server event", K(tmp_ret), K(ret));
}
return ret;
}
int ObBackupLSLogGroupFinishTask::report_task_result_()
{
int ret = OB_SUCCESS;
int32_t result = OB_SUCCESS;
@ -2238,19 +2350,20 @@ int ObBackupLSLogFinishTask::report_task_result_()
return ret;
}
int ObBackupLSLogFinishTask::record_server_event_()
int ObBackupLSLogGroupFinishTask::record_server_event_()
{
int ret = OB_SUCCESS;
if (OB_ISNULL(ctx_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("ctx should not be null", K(ret));
} else {
SERVER_EVENT_ADD("backup_complement_log", "backup_ls_finish_task",
SERVER_EVENT_ADD("backup_complement_log", "backup_ls_log_group_finish_task",
"tenant_id", ctx_->tenant_id_,
"backup_set_id", ctx_->backup_set_desc_.backup_set_id_,
"ls_id", ctx_->ls_id_.id(),
"turn_id", ctx_->turn_id_,
"retry_id", ctx_->retry_id_);
"retry_id", ctx_->retry_id_,
"is_only_calc_stat", ctx_->is_only_calc_stat_);
}
return ret;
}

View File

@ -167,11 +167,6 @@ public:
virtual int process() override;
private:
int get_active_round_dest_id_(const uint64_t tenant_id, int64_t &dest_id);
int get_newly_created_ls_in_piece_(const int64_t dest_id, const uint64_t tenant_id,
const share::SCN &start_scn, const share::SCN &end_scn);
int inner_get_newly_created_ls_in_piece_(const int64_t dest_id, const uint64_t tenant_id,
const share::SCN &start_scn, const share::SCN &end_scn, common::ObIArray<share::ObLSID> &ls_array);
int get_next_ls_id_(share::ObLSID &ls_id);
int generate_ls_dag_();
int record_server_event_();
@ -341,6 +336,45 @@ private:
DISALLOW_COPY_AND_ASSIGN(ObBackupLSLogFinishTask);
};
class ObBackupLSLogGroupFinishDag : public share::ObIDag
{
public:
ObBackupLSLogGroupFinishDag();
virtual ~ObBackupLSLogGroupFinishDag();
int init(ObBackupComplementLogCtx *ctx);
virtual bool operator == (const share::ObIDag &other) const override;
virtual int64_t hash() const override;
virtual int fill_dag_key(char *buf, const int64_t buf_len) const override;
virtual int create_first_task() override;
virtual int fill_info_param(compaction::ObIBasicInfoParam *&out_param, ObIAllocator &allocator) const override;
virtual lib::Worker::CompatMode get_compat_mode() const { return lib::Worker::CompatMode::MYSQL; }
virtual uint64_t get_consumer_group_id() const override { return consumer_group_id_; }
virtual bool is_ha_dag() const override { return true; }
protected:
bool is_inited_;
ObBackupComplementLogCtx *ctx_;
DISALLOW_COPY_AND_ASSIGN(ObBackupLSLogGroupFinishDag);
};
class ObBackupLSLogGroupFinishTask : public share::ObITask
{
public:
ObBackupLSLogGroupFinishTask();
virtual ~ObBackupLSLogGroupFinishTask();
int init(ObBackupComplementLogCtx *ctx);
virtual int process() override;
private:
int report_task_result_();
int record_server_event_();
private:
bool is_inited_;
ObBackupComplementLogCtx *ctx_;
DISALLOW_COPY_AND_ASSIGN(ObBackupLSLogGroupFinishTask);
};
} // namespace backup
} // namespace oceanbase