[CP] add RPC to check server empty in executor
This commit is contained in:
parent
dc49458623
commit
74b7683448
@ -666,7 +666,13 @@ int ObAdminServerExecutor::execute(ObExecContext &ctx, ObAdminServerStmt &stmt)
|
||||
if (OB_FAIL(ret)) {
|
||||
// nothing
|
||||
} else if (ObAdminServerArg::ADD == stmt.get_op()) {
|
||||
if (OB_FAIL(common_proxy->add_server(arg))) {
|
||||
ObSrvRpcProxy *rpc_proxy = NULL;
|
||||
if (OB_ISNULL(rpc_proxy = task_exec_ctx->get_srv_rpc())) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("get server rpc proxy failed", K(ret));
|
||||
} else if (OB_FAIL(check_server_empty_(*rpc_proxy, arg.servers_))) {
|
||||
LOG_WARN("failed to check server empty", KR(ret), K(arg));
|
||||
} else if (OB_FAIL(common_proxy->add_server(arg))) {
|
||||
LOG_WARN("common rpc proxy add server failed", K(arg), K(ret));
|
||||
}
|
||||
} else if (ObAdminServerArg::CANCEL_DELETE == stmt.get_op()) {
|
||||
@ -718,6 +724,40 @@ int ObAdminServerExecutor::execute(ObExecContext &ctx, ObAdminServerStmt &stmt)
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObAdminServerExecutor::check_server_empty_(obrpc::ObSrvRpcProxy &rpc_proxy, const obrpc::ObServerList &servers)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObTimeoutCtx ctx;
|
||||
int64_t timeout = 0;
|
||||
uint64_t sys_tenant_data_version = 0;
|
||||
if (OB_FAIL(rootserver::ObRootUtils::get_rs_default_timeout_ctx(ctx))) {
|
||||
LOG_WARN("fail to get timeout ctx", KR(ret), K(ctx));
|
||||
} else if (FALSE_IT(timeout = ctx.get_timeout())) {
|
||||
} else if (OB_UNLIKELY(timeout <= 0)) {
|
||||
ret = OB_TIMEOUT;
|
||||
LOG_WARN("ctx time out", KR(ret), K(timeout));
|
||||
} else if (OB_FAIL(GET_MIN_DATA_VERSION(OB_SYS_TENANT_ID, sys_tenant_data_version))) {
|
||||
LOG_WARN("fail to get sys tenant's min data version", KR(ret));
|
||||
} else {
|
||||
Bool is_empty = false;
|
||||
const ObCheckServerEmptyArg rpc_arg(ObCheckServerEmptyArg::ADD_SERVER, sys_tenant_data_version);
|
||||
FOREACH_X(it, servers, OB_SUCC(ret)) {
|
||||
const ObAddr &addr = *it;
|
||||
is_empty = false;
|
||||
if (OB_FAIL(rpc_proxy.to(addr)
|
||||
.timeout(timeout)
|
||||
.is_empty_server(rpc_arg, is_empty))) {
|
||||
LOG_WARN("failed to check server empty", KR(ret));
|
||||
} else if (!is_empty) {
|
||||
ret = OB_OP_NOT_ALLOW;
|
||||
LOG_WARN("adding non-empty server is not allowed", KR(ret));
|
||||
LOG_USER_ERROR(OB_OP_NOT_ALLOW, "add non-empty server");
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObAdminServerExecutor::wait_leader_switch_out_(
|
||||
ObISQLClient &sql_proxy,
|
||||
const obrpc::ObServerList &svr_list)
|
||||
|
@ -197,6 +197,7 @@ private:
|
||||
int construct_wait_leader_switch_sql_(
|
||||
const obrpc::ObServerList &svr_list,
|
||||
ObSqlString &sql);
|
||||
int check_server_empty_(obrpc::ObSrvRpcProxy &rpc_proxy, const obrpc::ObServerList &servers);
|
||||
DISALLOW_COPY_AND_ASSIGN(ObAdminServerExecutor);
|
||||
};
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user