support observer 4.2 add ls from observer 4.1
This commit is contained in:
parent
36f69ff4ab
commit
0ebc7ab3fd
@ -1215,6 +1215,8 @@ int ObStartCompleteMigrationTask::change_member_list_()
|
||||
const int64_t start_ts = ObTimeUtility::current_time();
|
||||
common::ObAddr leader_addr;
|
||||
share::SCN ls_transfer_scn;
|
||||
uint64_t cluster_version = 0;
|
||||
palf::LogConfigVersion fake_config_version;
|
||||
|
||||
if (!is_inited_) {
|
||||
ret = OB_NOT_INIT;
|
||||
@ -1225,14 +1227,22 @@ int ObStartCompleteMigrationTask::change_member_list_()
|
||||
} else if (ObMigrationOpType::ADD_LS_OP != ctx_->arg_.type_
|
||||
&& ObMigrationOpType::MIGRATE_LS_OP != ctx_->arg_.type_) {
|
||||
//do nothing
|
||||
} else if (FALSE_IT(cluster_version = GET_MIN_CLUSTER_VERSION())) {
|
||||
} else if (OB_FAIL(ObStorageHAUtils::get_ls_leader(ctx_->tenant_id_, ctx_->arg_.ls_id_, leader_addr))) {
|
||||
LOG_WARN("failed to get ls leader", K(ret), KPC(ctx_));
|
||||
} else if (OB_FAIL(OB_FAIL(get_ls_transfer_scn_(ls, ls_transfer_scn)))) {
|
||||
LOG_WARN("failed to get ls transfer scn", K(ret), KP(ls));
|
||||
} else if (OB_FAIL(fake_config_version.generate(0, 0))) {
|
||||
LOG_WARN("failed to generate config version", K(ret));
|
||||
} else {
|
||||
if (ObMigrationOpType::ADD_LS_OP == ctx_->arg_.type_) {
|
||||
const int64_t change_member_list_timeout_us = GCONF.sys_bkgd_migration_change_member_list_timeout;
|
||||
if (REPLICA_TYPE_FULL == ctx_->arg_.dst_.get_replica_type()) {
|
||||
if (cluster_version < CLUSTER_VERSION_4_2_0_0) {
|
||||
if (OB_FAIL(ls->get_log_handler()->add_member(ctx_->arg_.dst_, ctx_->arg_.paxos_replica_number_,
|
||||
fake_config_version, change_member_list_timeout_us))) {
|
||||
LOG_WARN("failed to add member", K(ret), KPC(ctx_));
|
||||
}
|
||||
} else if (REPLICA_TYPE_FULL == ctx_->arg_.dst_.get_replica_type()) {
|
||||
if (OB_FAIL(add_member_(leader_addr, ls_transfer_scn))) {
|
||||
LOG_WARN("failed to add member", K(ret), K(leader_addr), K(ls_transfer_scn));
|
||||
}
|
||||
@ -1244,7 +1254,12 @@ int ObStartCompleteMigrationTask::change_member_list_()
|
||||
}
|
||||
} else if (ObMigrationOpType::MIGRATE_LS_OP == ctx_->arg_.type_) {
|
||||
const int64_t change_member_list_timeout_us = GCONF.sys_bkgd_migration_change_member_list_timeout;
|
||||
if (REPLICA_TYPE_FULL == ctx_->arg_.dst_.get_replica_type()) {
|
||||
if (cluster_version < CLUSTER_VERSION_4_2_0_0) {
|
||||
if (OB_FAIL(ls->get_log_handler()->replace_member(ctx_->arg_.dst_, ctx_->arg_.src_,
|
||||
fake_config_version, change_member_list_timeout_us))) {
|
||||
LOG_WARN("failed to repalce member", K(ret), KPC(ctx_));
|
||||
}
|
||||
} else if (REPLICA_TYPE_FULL == ctx_->arg_.dst_.get_replica_type()) {
|
||||
if (OB_FAIL(replace_member_(leader_addr, ls_transfer_scn))) {
|
||||
LOG_WARN("failed to replace member", K(ret), K(leader_addr), K(ls_transfer_scn));
|
||||
}
|
||||
|
@ -929,6 +929,7 @@ int ObStartMigrationTask::init()
|
||||
bandwidth_throttle_ = migration_dag_net->get_bandwidth_throttle();
|
||||
svr_rpc_proxy_ = migration_dag_net->get_storage_rpc_proxy();
|
||||
storage_rpc_ = migration_dag_net->get_storage_rpc();
|
||||
ctx_->reuse();
|
||||
is_inited_ = true;
|
||||
LOG_INFO("succeed init start migration task", "ls id", ctx_->arg_.ls_id_,
|
||||
"dag_id", *ObCurTraceId::get_trace_id(), "dag_net_id", ctx_->task_id_);
|
||||
@ -1457,6 +1458,27 @@ int ObStartMigrationTask::check_before_ls_migrate_(const ObLSMeta &ls_meta)
|
||||
}
|
||||
|
||||
int ObStartMigrationTask::build_ls_()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (!is_inited_) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("start migration task do not init", K(ret));
|
||||
} else {
|
||||
if (OB_FAIL(inner_build_ls_())) {
|
||||
LOG_WARN("failed to do inner build ls", K(ret));
|
||||
}
|
||||
|
||||
if (OB_NOT_SUPPORTED == ret) {
|
||||
//build ls with old rpc, overwrite ret
|
||||
if (OB_FAIL(inner_build_ls_with_old_rpc_())) {
|
||||
LOG_WARN("failed to do inner build ls with old rpc", K(ret));
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObStartMigrationTask::inner_build_ls_()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
void *buf = nullptr;
|
||||
@ -1525,6 +1547,51 @@ int ObStartMigrationTask::create_all_tablets_(
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObStartMigrationTask::inner_build_ls_with_old_rpc_()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (!is_inited_) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("start migration task do not init", K(ret));
|
||||
} else if (OB_FAIL(update_ls_())) {
|
||||
LOG_WARN("failed to update local ls", K(ret), KPC(ctx_));
|
||||
} else if (OB_FAIL(create_all_tablets_with_4_1_rpc_())) {
|
||||
LOG_WARN("failed to create all tablets with 4_1 rpc", K(ret), KPC(ctx_));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObStartMigrationTask::create_all_tablets_with_4_1_rpc_()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObStorageHATabletsBuilder ha_tablets_builder;
|
||||
ObLSHandle ls_handle;
|
||||
ObLS *ls = nullptr;
|
||||
ObArray<ObTabletID> tablet_id_array;
|
||||
|
||||
if (!is_inited_) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("start migration task do not init", K(ret));
|
||||
} else if (OB_FAIL(append(tablet_id_array, ctx_->sys_tablet_id_array_))) {
|
||||
LOG_WARN("failed to append sys tablet id array", K(ret), KPC(ctx_));
|
||||
} else if (OB_FAIL(append(tablet_id_array, ctx_->data_tablet_id_array_))) {
|
||||
LOG_WARN("failed to append data tablet id array", K(ret), KPC(ctx_));
|
||||
} else if (OB_FAIL(ObStorageHADagUtils::get_ls(ctx_->arg_.ls_id_, ls_handle))) {
|
||||
LOG_WARN("failed to get ls", K(ret), KPC(ctx_));
|
||||
} else if (OB_ISNULL(ls = ls_handle.get_ls())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("ls should not be NULL", K(ret), KP(ls), KPC(ctx_));
|
||||
} else if (OB_FAIL(ObLSMigrationUtils::init_ha_tablets_builder(
|
||||
ctx_->tenant_id_, tablet_id_array, ctx_->minor_src_, ctx_->local_rebuild_seq_, ctx_->arg_.type_,
|
||||
ls, &ctx_->ha_table_info_mgr_, ha_tablets_builder))) {
|
||||
LOG_WARN("failed to init ha tablets builder", K(ret), KPC(ctx_));
|
||||
} else if (OB_FAIL(ha_tablets_builder.create_all_tablets_with_4_1_rpc(
|
||||
ctx_->tablet_simple_info_map_))) {
|
||||
LOG_WARN("failed to create all tablets", K(ret), KPC(ctx_));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObStartMigrationTask::record_server_event_()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -1703,7 +1770,8 @@ int ObSysTabletsMigrationTask::init()
|
||||
} else {
|
||||
is_inited_ = true;
|
||||
LOG_INFO("succeed init sys tablets migration task", "ls id", ctx_->arg_.ls_id_,
|
||||
"dag_id", *ObCurTraceId::get_trace_id(), "dag_net_id", ctx_->task_id_);
|
||||
"dag_id", *ObCurTraceId::get_trace_id(), "dag_net_id", ctx_->task_id_,
|
||||
"sys_tablet_list", ctx_->sys_tablet_id_array_);
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
@ -4206,7 +4274,6 @@ int ObMigrationFinishTask::process()
|
||||
if (OB_FAIL(ctx_->check_allow_retry(allow_retry))) {
|
||||
LOG_ERROR("failed to check allow retry", K(ret), K(*ctx_));
|
||||
} else if (allow_retry) {
|
||||
ctx_->reuse();
|
||||
if (OB_FAIL(generate_migration_init_dag_())) {
|
||||
LOG_WARN("failed to generate migration init dag", K(ret), KPC(ctx_));
|
||||
}
|
||||
|
@ -246,7 +246,11 @@ private:
|
||||
int check_ls_need_copy_data_(bool &need_copy);
|
||||
int check_before_ls_migrate_(const ObLSMeta &ls_meta);
|
||||
int build_ls_();
|
||||
int inner_build_ls_();
|
||||
int create_all_tablets_(ObCopyLSViewInfoObReader *ob_reader);
|
||||
int inner_build_ls_with_old_rpc_();
|
||||
int create_all_tablets_with_4_1_rpc_();
|
||||
|
||||
int record_server_event_();
|
||||
|
||||
private:
|
||||
|
@ -266,6 +266,54 @@ int ObStorageHATabletsBuilder::create_all_tablets(
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObStorageHATabletsBuilder::create_all_tablets_with_4_1_rpc(
|
||||
CopyTabletSimpleInfoMap &simple_info_map)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObLS *ls = nullptr;
|
||||
ObICopyTabletInfoReader *reader = nullptr;
|
||||
obrpc::ObCopyTabletInfo tablet_info;
|
||||
const int overwrite = 1;
|
||||
ObCopyTabletSimpleInfo tablet_simple_info;
|
||||
|
||||
if (!is_inited_) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("storage ha tablets builder do not init", K(ret));
|
||||
} else if (OB_ISNULL(ls = param_.ls_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("log stream should not be NULL", K(ret), KP(ls), K(param_));
|
||||
} else if (OB_FAIL(get_tablet_info_reader_(reader))) {
|
||||
LOG_WARN("failed to get tablet info reader", K(ret), K(param_));
|
||||
} else {
|
||||
while (OB_SUCC(ret)) {
|
||||
tablet_info.reset();
|
||||
if (OB_FAIL(reader->fetch_tablet_info(tablet_info))) {
|
||||
if (OB_ITER_END == ret) {
|
||||
ret = OB_SUCCESS;
|
||||
break;
|
||||
} else {
|
||||
LOG_WARN("failed to fetch tablet info", K(ret));
|
||||
}
|
||||
} else if (OB_FAIL(modified_tablet_info_(tablet_info))) {
|
||||
LOG_WARN("failed to modified tablet info", K(ret), K(tablet_info));
|
||||
} else if (OB_FAIL(create_or_update_tablet_(tablet_info, ls))) {
|
||||
LOG_WARN("failed to create or update tablet", K(ret), K(tablet_info));
|
||||
} else {
|
||||
tablet_simple_info.tablet_id_ = tablet_info.tablet_id_;
|
||||
tablet_simple_info.status_ = tablet_info.status_;
|
||||
tablet_simple_info.data_size_ = tablet_info.data_size_;
|
||||
if (OB_FAIL(simple_info_map.set_refactored(tablet_info.tablet_id_, tablet_simple_info, overwrite))) {
|
||||
LOG_WARN("failed to set tablet status info into map", K(ret), K(tablet_simple_info), K(tablet_info));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_NOT_NULL(reader)) {
|
||||
free_tablet_info_reader_(reader);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObStorageHATabletsBuilder::update_pending_tablets_with_remote()
|
||||
{
|
||||
|
@ -83,7 +83,8 @@ public:
|
||||
int update_pending_tablets_with_remote();
|
||||
int build_tablets_sstable_info();
|
||||
int update_local_tablets();
|
||||
|
||||
int create_all_tablets_with_4_1_rpc(
|
||||
CopyTabletSimpleInfoMap &simple_info_map);
|
||||
private:
|
||||
int get_tablet_info_reader_(ObICopyTabletInfoReader *&reader);
|
||||
int get_tablet_info_restore_reader_(ObICopyTabletInfoReader *&reader);
|
||||
|
Loading…
x
Reference in New Issue
Block a user