patch backup &migrate code to open source

This commit is contained in:
mw0
2021-09-27 20:48:34 +08:00
committed by wangzelin.wzl
parent a2c22b06aa
commit 64b53b6a17
41 changed files with 4944 additions and 4232 deletions

View File

@ -11974,8 +11974,8 @@ void ObPartitionMigrationDataStatics::reset()
reuse_count_ = 0;
partition_count_ = 0;
finish_partition_count_ = 0;
// input_bytes_;
// output_bytes_;
input_bytes_ = 0;
output_bytes_ = 0;
}
ObRestoreInfo::ObRestoreInfo()
@ -14057,5 +14057,69 @@ int ObPartitionService::check_standby_cluster_schema_condition(const ObPartition
return ret;
}
int ObPartitionService::standby_cut_data_batch(const obrpc::ObStandbyCutDataBatchTaskArg& arg)
{
int ret = OB_SUCCESS;
ObArray<ObReplicaOpArg> task_list;
const ObReplicaOpType type = RESTORE_STANDBY_OP;
int64_t flashback_ts = 0;
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
STORAGE_LOG(WARN, "The ObPartitionService has not been inited, ", K(ret));
} else if (OB_UNLIKELY(!is_running_)) {
ret = OB_CANCELED;
STORAGE_LOG(WARN, "The service is not running, ", K(ret));
} else if (OB_UNLIKELY(!is_scan_disk_finished())) {
ret = OB_EAGAIN;
STORAGE_LOG(WARN, "rebooting, replica op not allow", K(ret));
} else if (!arg.is_valid() || arg.arg_array_.empty()) {
ret = OB_INVALID_ARGUMENT;
STORAGE_LOG(WARN, "failed args", K(ret), K(arg));
} else if (OB_FAIL(task_list.reserve(arg.arg_array_.count()))) {
STORAGE_LOG(WARN, "failed to reserve task list", K(ret));
} else if (FALSE_IT(flashback_ts = arg.flashback_ts_)) {
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < arg.arg_array_.count(); ++i) {
const obrpc::ObStandbyCutDataTaskArg& single_arg = arg.arg_array_.at(i);
ObReplicaOpArg replica_op_arg;
replica_op_arg.data_src_ = single_arg.dst_;
replica_op_arg.dst_ = single_arg.dst_;
replica_op_arg.key_ = single_arg.pkey_;
replica_op_arg.priority_ = ObReplicaOpPriority::PRIO_LOW;
replica_op_arg.cluster_id_ = GCONF.cluster_id;
replica_op_arg.type_ = type;
replica_op_arg.switch_epoch_ = GCTX.get_switch_epoch2();
replica_op_arg.phy_restore_arg_.restore_info_.restore_snapshot_version_ = flashback_ts;
ObIPartitionGroupGuard guard;
ObIPartitionGroup* partition = NULL;
ObReplicaRestoreStatus restore_status;
if (OB_FAIL(get_partition(single_arg.pkey_, guard))) {
STORAGE_LOG(WARN, "get partition failed", K(single_arg), K(ret));
} else if (OB_ISNULL(guard.get_partition_group())) {
ret = OB_ENTRY_NOT_EXIST;
STORAGE_LOG(WARN, "partition not exist, maybe migrate out", K(single_arg), K(ret));
} else if (OB_ISNULL(partition = guard.get_partition_group())) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "partition should not be NULL", K(ret), K(single_arg));
} else if (FALSE_IT(restore_status = partition->get_pg_storage().get_restore_status())) {
} else if (ObReplicaRestoreStatus::REPLICA_RESTORE_STANDBY_CUT != restore_status) {
LOG_INFO("partition restore status is not cut data, skip it", K(ret), K(single_arg));
} else if (OB_FAIL(task_list.push_back(replica_op_arg))) {
LOG_WARN("failed to push replica op arg into array", K(ret), K(replica_op_arg));
}
}
}
if (OB_SUCC(ret)) {
if (task_list.empty()) {
LOG_INFO("has no replica need cut data, skip", K(ret), K(arg));
} else if (OB_FAIL(ObPartGroupMigrator::get_instance().schedule(task_list, arg.trace_id_))) {
STORAGE_LOG(WARN, "fail to schedule migrate task.", K(arg), K(ret));
}
}
return ret;
}
} // end of namespace storage
} // end of namespace oceanbase