[CP] [42x] support use ob_admin to do replica dr-tasks
This commit is contained in:
@ -73,6 +73,7 @@
|
||||
#endif
|
||||
#include "sql/plan_cache/ob_ps_cache.h"
|
||||
#include "pl/pl_cache/ob_pl_cache_mgr.h"
|
||||
#include "rootserver/ob_admin_drtask_util.h" // ObAdminDRTaskUtil
|
||||
#include "rootserver/ob_primary_ls_service.h" // for ObPrimaryLSService
|
||||
#include "rootserver/ob_root_utils.h"
|
||||
#include "sql/session/ob_sql_session_info.h"
|
||||
@ -188,52 +189,7 @@ int ObRpcLSMigrateReplicaP::process()
|
||||
|
||||
int ObRpcLSAddReplicaP::process()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
uint64_t tenant_id = arg_.tenant_id_;
|
||||
ObLSService *ls_service = nullptr;
|
||||
bool is_exist = false;
|
||||
ObMigrationOpArg migration_op_arg;
|
||||
|
||||
if (tenant_id != MTL_ID()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_ERROR("ObRpcLSAddReplicaP::process tenant not match", K(tenant_id), K(ret));
|
||||
}
|
||||
ObCurTraceId::set(arg_.task_id_);
|
||||
if (OB_SUCC(ret)) {
|
||||
SERVER_EVENT_ADD("storage_ha", "schedule_ls_add start", "tenant_id", arg_.tenant_id_, "ls_id", arg_.ls_id_.id(),
|
||||
"data_src", arg_.data_source_.get_server(), "dest", arg_.dst_.get_server());
|
||||
|
||||
ls_service = MTL(ObLSService*);
|
||||
if (OB_ISNULL(ls_service)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
COMMON_LOG(ERROR, "mtl ObLSService should not be null", K(ret));
|
||||
} else if (OB_FAIL(ls_service->check_ls_exist(arg_.ls_id_, is_exist))) {
|
||||
COMMON_LOG(WARN, "failed to check ls exist", K(ret), K(arg_));
|
||||
} else if (is_exist) {
|
||||
ret = OB_LS_EXIST;
|
||||
COMMON_LOG(WARN, "can not add ls which local ls is exist", K(ret), K(arg_), K(is_exist));
|
||||
} else {
|
||||
migration_op_arg.cluster_id_ = GCONF.cluster_id;
|
||||
migration_op_arg.data_src_ = arg_.data_source_;
|
||||
migration_op_arg.dst_ = arg_.dst_;
|
||||
migration_op_arg.ls_id_ = arg_.ls_id_;
|
||||
//TODO(muwei.ym) need check priority in 4.2 RC3
|
||||
migration_op_arg.priority_ = ObMigrationOpPriority::PRIO_HIGH;
|
||||
migration_op_arg.paxos_replica_number_ = arg_.new_paxos_replica_number_;
|
||||
migration_op_arg.src_ = arg_.data_source_;
|
||||
migration_op_arg.type_ = ObMigrationOpType::ADD_LS_OP;
|
||||
|
||||
if (OB_FAIL(ls_service->create_ls_for_ha(arg_.task_id_, migration_op_arg))) {
|
||||
COMMON_LOG(WARN, "failed to create ls for ha", K(ret), K(arg_), K(migration_op_arg));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_FAIL(ret)) {
|
||||
SERVER_EVENT_ADD("storage_ha", "schedule_ls_add failed", "tenant_id", arg_.tenant_id_,
|
||||
"ls_id", arg_.ls_id_, "result", ret);
|
||||
}
|
||||
return ret;
|
||||
return observer::ObService::do_add_ls_replica(arg_);
|
||||
}
|
||||
|
||||
int ObRpcLSTypeTransformP::process()
|
||||
@ -279,80 +235,12 @@ int ObRpcLSTypeTransformP::process()
|
||||
|
||||
int ObRpcLSRemovePaxosReplicaP::process()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
uint64_t tenant_id = arg_.tenant_id_;
|
||||
MAKE_TENANT_SWITCH_SCOPE_GUARD(guard);
|
||||
ObLSService *ls_service = nullptr;
|
||||
ObLSHandle ls_handle;
|
||||
ObLS *ls = nullptr;
|
||||
|
||||
if (tenant_id != MTL_ID()) {
|
||||
ret = guard.switch_to(tenant_id);
|
||||
}
|
||||
ObCurTraceId::set(arg_.task_id_);
|
||||
if (OB_SUCC(ret)) {
|
||||
SERVER_EVENT_ADD("storage_ha", "remove_ls_paxos_member start", "tenant_id", arg_.tenant_id_, "ls_id", arg_.ls_id_.id(),
|
||||
"dest", arg_.remove_member_.get_server());
|
||||
LOG_INFO("start do remove ls paxos member", K(arg_));
|
||||
|
||||
ls_service = MTL(ObLSService*);
|
||||
if (OB_ISNULL(ls_service)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
COMMON_LOG(ERROR, "mtl ObLSService should not be null", K(ret));
|
||||
} else if (OB_FAIL(ls_service->get_ls(arg_.ls_id_, ls_handle, ObLSGetMod::OBSERVER_MOD))) {
|
||||
LOG_WARN("failed to get ls", K(ret), K(arg_));
|
||||
} else if (OB_ISNULL(ls = ls_handle.get_ls())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("ls should not be NULL", K(ret), K(arg_));
|
||||
} else if (OB_FAIL(ls->get_ls_remove_member_handler()->remove_paxos_member(arg_))) {
|
||||
LOG_WARN("failed to remove paxos member", K(ret), K(arg_));
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_FAIL(ret)) {
|
||||
SERVER_EVENT_ADD("storage_ha", "remove_ls_paxos_member failed", "tenant_id",
|
||||
arg_.tenant_id_, "ls_id", arg_.ls_id_.id(), "result", ret);
|
||||
}
|
||||
return ret;
|
||||
return observer::ObService::do_remove_ls_paxos_replica(arg_);
|
||||
}
|
||||
|
||||
int ObRpcLSRemoveNonPaxosReplicaP::process()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
uint64_t tenant_id = arg_.tenant_id_;
|
||||
MAKE_TENANT_SWITCH_SCOPE_GUARD(guard);
|
||||
ObLSService *ls_service = nullptr;
|
||||
ObLSHandle ls_handle;
|
||||
ObLS *ls = nullptr;
|
||||
|
||||
if (tenant_id != MTL_ID()) {
|
||||
ret = guard.switch_to(tenant_id);
|
||||
}
|
||||
ObCurTraceId::set(arg_.task_id_);
|
||||
if (OB_SUCC(ret)) {
|
||||
SERVER_EVENT_ADD("storage_ha", "remove_ls_learner_member start", "tenant_id", arg_.tenant_id_, "ls_id", arg_.ls_id_.id(),
|
||||
"dest", arg_.remove_member_.get_server());
|
||||
LOG_INFO("start do remove ls learner member", K(arg_));
|
||||
|
||||
ls_service = MTL(ObLSService*);
|
||||
if (OB_ISNULL(ls_service)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
COMMON_LOG(ERROR, "mtl ObLSService should not be null", K(ret));
|
||||
} else if (OB_FAIL(ls_service->get_ls(arg_.ls_id_, ls_handle, ObLSGetMod::OBSERVER_MOD))) {
|
||||
LOG_WARN("failed to get ls", K(ret), K(arg_));
|
||||
} else if (OB_ISNULL(ls = ls_handle.get_ls())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("ls should not be NULL", K(ret), K(arg_));
|
||||
} else if (OB_FAIL(ls->get_ls_remove_member_handler()->remove_learner_member(arg_))) {
|
||||
LOG_WARN("failed to remove paxos member", K(ret), K(arg_));
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_FAIL(ret)) {
|
||||
SERVER_EVENT_ADD("storage_ha", "remove_ls_learner_member failed", "tenant_id",
|
||||
arg_.tenant_id_, "ls_id", arg_.ls_id_.id(), "result", ret);
|
||||
}
|
||||
return ret;
|
||||
return observer::ObService::do_remove_ls_nonpaxos_replica(arg_);
|
||||
}
|
||||
|
||||
int ObRpcLSModifyPaxosReplicaNumberP::process()
|
||||
@ -443,6 +331,21 @@ int ObRpcLSCheckDRTaskExistP::process()
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObAdminDRTaskP::process()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObCurTraceId::init(GCONF.self_addr_);
|
||||
LOG_INFO("start to handle ls replica task triggered by ob_admin", K_(arg));
|
||||
if (OB_UNLIKELY(!arg_.is_valid())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", KR(ret), K_(arg));
|
||||
} else if (OB_FAIL(ObAdminDRTaskUtil::handle_obadmin_command(arg_))) {
|
||||
LOG_WARN("fail to handle ob admin command", KR(ret), K_(arg));
|
||||
}
|
||||
LOG_INFO("finish handle ls replica task triggered by ob_admin", K_(arg));
|
||||
return ret;
|
||||
}
|
||||
|
||||
#ifdef OB_BUILD_ARBITRATION
|
||||
int ObRpcAddArbP::process()
|
||||
{
|
||||
|
||||
Reference in New Issue
Block a user