From 74b76834489bc1e3feb32388c04decd7e5965f9a Mon Sep 17 00:00:00 2001 From: zhaoyiping0622 Date: Tue, 18 Jun 2024 12:06:13 +0000 Subject: [PATCH] [CP] add RPC to check server empty in executor --- .../engine/cmd/ob_alter_system_executor.cpp | 42 ++++++++++++++++++- src/sql/engine/cmd/ob_alter_system_executor.h | 1 + 2 files changed, 42 insertions(+), 1 deletion(-) diff --git a/src/sql/engine/cmd/ob_alter_system_executor.cpp b/src/sql/engine/cmd/ob_alter_system_executor.cpp index fec02d5f7..f18a985b6 100644 --- a/src/sql/engine/cmd/ob_alter_system_executor.cpp +++ b/src/sql/engine/cmd/ob_alter_system_executor.cpp @@ -666,7 +666,13 @@ int ObAdminServerExecutor::execute(ObExecContext &ctx, ObAdminServerStmt &stmt) if (OB_FAIL(ret)) { // nothing } else if (ObAdminServerArg::ADD == stmt.get_op()) { - if (OB_FAIL(common_proxy->add_server(arg))) { + ObSrvRpcProxy *rpc_proxy = NULL; + if (OB_ISNULL(rpc_proxy = task_exec_ctx->get_srv_rpc())) { + ret = OB_NOT_INIT; + LOG_WARN("get server rpc proxy failed", K(ret)); + } else if (OB_FAIL(check_server_empty_(*rpc_proxy, arg.servers_))) { + LOG_WARN("failed to check server empty", KR(ret), K(arg)); + } else if (OB_FAIL(common_proxy->add_server(arg))) { LOG_WARN("common rpc proxy add server failed", K(arg), K(ret)); } } else if (ObAdminServerArg::CANCEL_DELETE == stmt.get_op()) { @@ -718,6 +724,40 @@ int ObAdminServerExecutor::execute(ObExecContext &ctx, ObAdminServerStmt &stmt) return ret; } +int ObAdminServerExecutor::check_server_empty_(obrpc::ObSrvRpcProxy &rpc_proxy, const obrpc::ObServerList &servers) +{ + int ret = OB_SUCCESS; + ObTimeoutCtx ctx; + int64_t timeout = 0; + uint64_t sys_tenant_data_version = 0; + if (OB_FAIL(rootserver::ObRootUtils::get_rs_default_timeout_ctx(ctx))) { + LOG_WARN("fail to get timeout ctx", KR(ret), K(ctx)); + } else if (FALSE_IT(timeout = ctx.get_timeout())) { + } else if (OB_UNLIKELY(timeout <= 0)) { + ret = OB_TIMEOUT; + LOG_WARN("ctx time out", KR(ret), K(timeout)); + } else if (OB_FAIL(GET_MIN_DATA_VERSION(OB_SYS_TENANT_ID, sys_tenant_data_version))) { + LOG_WARN("fail to get sys tenant's min data version", KR(ret)); + } else { + Bool is_empty = false; + const ObCheckServerEmptyArg rpc_arg(ObCheckServerEmptyArg::ADD_SERVER, sys_tenant_data_version); + FOREACH_X(it, servers, OB_SUCC(ret)) { + const ObAddr &addr = *it; + is_empty = false; + if (OB_FAIL(rpc_proxy.to(addr) + .timeout(timeout) + .is_empty_server(rpc_arg, is_empty))) { + LOG_WARN("failed to check server empty", KR(ret)); + } else if (!is_empty) { + ret = OB_OP_NOT_ALLOW; + LOG_WARN("adding non-empty server is not allowed", KR(ret)); + LOG_USER_ERROR(OB_OP_NOT_ALLOW, "add non-empty server"); + } + } + } + return ret; +} + int ObAdminServerExecutor::wait_leader_switch_out_( ObISQLClient &sql_proxy, const obrpc::ObServerList &svr_list) diff --git a/src/sql/engine/cmd/ob_alter_system_executor.h b/src/sql/engine/cmd/ob_alter_system_executor.h index 4c8898c32..5005ca479 100644 --- a/src/sql/engine/cmd/ob_alter_system_executor.h +++ b/src/sql/engine/cmd/ob_alter_system_executor.h @@ -197,6 +197,7 @@ private: int construct_wait_leader_switch_sql_( const obrpc::ObServerList &svr_list, ObSqlString &sql); + int check_server_empty_(obrpc::ObSrvRpcProxy &rpc_proxy, const obrpc::ObServerList &servers); DISALLOW_COPY_AND_ASSIGN(ObAdminServerExecutor); };