diff --git a/src/sql/engine/cmd/ob_alter_system_executor.cpp b/src/sql/engine/cmd/ob_alter_system_executor.cpp index 5380fd9dca..36da8868cd 100644 --- a/src/sql/engine/cmd/ob_alter_system_executor.cpp +++ b/src/sql/engine/cmd/ob_alter_system_executor.cpp @@ -760,84 +760,14 @@ int ObAdminZoneExecutor::execute(ObExecContext &ctx, ObAdminZoneStmt &stmt) common_proxy->get_server()); } else if (ObAdminZoneArg::STOP == stmt.get_op() || ObAdminZoneArg::FORCE_STOP == stmt.get_op()) { - // check whether all leaders are switched out - ObMySQLProxy *sql_proxy = ctx.get_sql_proxy(); - const int64_t idx = 0; - const int64_t retry_interval_us = 1000l * 1000l; // 1s - bool stop = false; - while (OB_SUCC(ret) && !stop) { - ObSqlString sql; - SMART_VAR(ObMySQLProxy::MySQLResult, res) { - sqlclient::ObMySQLResult *result = NULL; - const int64_t rpc_timeout = THIS_WORKER.get_timeout_remain(); - obrpc::Bool can_stop(true /* default value */); - int64_t leader_cnt = 0; - if (0 > THIS_WORKER.get_timeout_remain()) { - ret = OB_WAIT_LEADER_SWITCH_TIMEOUT; - LOG_WARN("wait switching out all leaders timeout", K(ret)); - } else if (OB_FAIL(THIS_WORKER.check_status())) { - LOG_WARN("ctx check status failed", K(ret)); - } - - if (OB_FAIL(ret)) { - } else if (!can_stop) { - } else if (OB_FAIL(sql.assign_fmt( - "SELECT CAST(COUNT(*) AS SIGNED) FROM %s WHERE role = 'LEADER' AND zone = '%s'", - share::OB_CDB_OB_LS_LOCATIONS_TNAME, arg.zone_.ptr()))) { - LOG_WARN("assign_fmt failed", K(ret)); - } else if (OB_FAIL(sql_proxy->read(res, sql.ptr()))) { - if (OB_RS_SHUTDOWN == ret || OB_RS_NOT_MASTER == ret) { - // switching rs, sleep and retry - ret = OB_SUCCESS; - } else { - LOG_WARN("execute sql failed", K(ret), K(sql)); - } - } else if (OB_ISNULL(result = res.get_result())) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("get result failed", K(ret)); - } else if (OB_FAIL(result->next())) { - if (OB_ITER_END == ret) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("result is empty", K(ret)); - } else { - LOG_WARN("get next result failed", K(ret)); - } - } else if (OB_FAIL(result->get_int(idx, leader_cnt))) { - if (OB_ERR_NULL_VALUE == ret) { - ret = OB_SUCCESS; - ObSqlString this_sql; - SMART_VAR(ObMySQLProxy::MySQLResult, this_res) { - sqlclient::ObMySQLResult *this_result = NULL; - int64_t server_cnt = -1; - if (OB_FAIL(this_sql.assign_fmt("select count(*) from %s where zone = '%s'", - share::OB_ALL_SERVER_TNAME, arg.zone_.ptr()))) { - LOG_WARN("fail to assign fmt", K(ret)); - } else if (OB_FAIL(sql_proxy->read(this_res, this_sql.ptr()))) { - LOG_WARN("fail to execute sql", K(ret), K(this_sql)); - } else if (OB_ISNULL(this_result = this_res.get_result())) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("fail to get result", K(ret)); - } else if (OB_FAIL(this_result->next())) { - LOG_WARN("get result error", K(ret)); - } else if (OB_FAIL(this_result->get_int(0L, server_cnt))) { - LOG_WARN("fail to get result", K(ret)); - } else if (0 == server_cnt) { - // no server in this zone; - stop = true; - } else { - // __all_virtual_server_stat is not ready, sleep and retry - } - } - } else { - LOG_WARN("get sum failed", K(ret)); - } - } else if (0 == leader_cnt) { - stop = true; - } else { - LOG_INFO("waiting switching leaders out", K(ret), "left count", leader_cnt); - ob_usleep(retry_interval_us); - } - } + obrpc::ObServerList server_list; + if (OB_FAIL(construct_servers_in_zone_(*(ctx.get_sql_proxy()), arg, server_list))) { + LOG_WARN("fail to construct servers in zone", KR(ret), K(arg)); + } else if (0 == server_list.count()) { + // no need to wait leader election and arb-degration + } else if (OB_FAIL(wait_leader_switch_out_(*(ctx.get_sql_proxy()), arg))) { + // check whether all leaders are switched out + LOG_WARN("fail to wait leader switch out", KR(ret), K(arg)); } } else {} // force stop, no need to wait leader switch } else if (ObAdminZoneArg::MODIFY == stmt.get_op()) { @@ -852,6 +782,112 @@ int ObAdminZoneExecutor::execute(ObExecContext &ctx, ObAdminZoneStmt &stmt) return ret; } +int ObAdminZoneExecutor::wait_leader_switch_out_( + ObISQLClient &sql_proxy, + const obrpc::ObAdminZoneArg &arg) +{ + int ret = OB_SUCCESS; + const int64_t idx = 0; + const int64_t retry_interval_us = 1000l * 1000l; // 1s + bool stop = false; + ObSqlString sql("AdminZoneExe"); + if (OB_UNLIKELY(!arg.is_valid())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), K(arg)); + } else if (OB_FAIL(construct_wait_leader_switch_sql_(arg, sql))) { + LOG_WARN("fail to construct wait leader switch sql", KR(ret), K(arg)); + } + + while (OB_SUCC(ret) && !stop) { + SMART_VAR(ObMySQLProxy::MySQLResult, res) { + sqlclient::ObMySQLResult *result = NULL; + int64_t leader_cnt = 0; + if (0 > THIS_WORKER.get_timeout_remain()) { + ret = OB_WAIT_LEADER_SWITCH_TIMEOUT; + LOG_WARN("wait switching out leaders from all servers timeout", KR(ret)); + } else if (OB_FAIL(THIS_WORKER.check_status())) { + LOG_WARN("ctx check status failed", KR(ret)); + } else if (OB_FAIL(sql_proxy.read(res, sql.ptr()))) { + if (OB_RS_SHUTDOWN == ret || OB_RS_NOT_MASTER == ret) { + // switching rs, sleep and retry + ret = OB_SUCCESS; + } else { + LOG_WARN("execute sql failed", KR(ret), K(sql)); + } + } else if (OB_ISNULL(result = res.get_result())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("get result failed", KR(ret)); + } else if (OB_FAIL(result->next())) { + if (OB_ITER_END == ret) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("result is empty", KR(ret)); + } else { + LOG_WARN("get next result failed", KR(ret)); + } + } else if (OB_FAIL(result->get_int(idx, leader_cnt))) { + if (OB_ERR_NULL_VALUE == ret) { + ret = OB_SUCCESS; + // __all_virtual_server_stat is not ready, sleep and retry + } else { + LOG_WARN("get sum failed", KR(ret)); + } + } else if (0 == leader_cnt) { + stop = true; + } else { + LOG_INFO("waiting switching leaders out", KR(ret), "left count", leader_cnt); + ob_usleep(retry_interval_us); + } + } + } + return ret; +} + +int ObAdminZoneExecutor::construct_wait_leader_switch_sql_( + const obrpc::ObAdminZoneArg &arg, + ObSqlString &sql) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(!arg.is_valid())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), K(arg)); + } else if (OB_FAIL(sql.assign_fmt( + "SELECT CAST(COUNT(*) AS SIGNED) FROM %s " + "WHERE role = 'LEADER' AND zone = '%s'", + share::OB_CDB_OB_LS_LOCATIONS_TNAME, arg.zone_.ptr()))) { + LOG_WARN("assign_fmt failed", KR(ret), K(arg)); + } + return ret; +} + +int ObAdminZoneExecutor::construct_servers_in_zone_( + ObISQLClient &sql_proxy, + const obrpc::ObAdminZoneArg &arg, + obrpc::ObServerList &svr_list) +{ + int ret = OB_SUCCESS; + svr_list.reset(); + share::ObServerTableOperator st_operator; + ObArray server_statuses; + + if (OB_UNLIKELY(!arg.is_valid())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), K(arg)); + } else if (OB_FAIL(st_operator.init(&sql_proxy))) { + LOG_WARN("fail to init ObServerTableOperator", KR(ret)); + } else if (OB_FAIL(st_operator.get(server_statuses))) { + LOG_WARN("build server statused from __all_server failed", KR(ret)); + } else { + for (int64_t idx = 0; OB_SUCC(ret) && idx < server_statuses.count(); ++idx) { + if (arg.zone_ == server_statuses.at(idx).zone_) { + if (OB_FAIL(svr_list.push_back(server_statuses.at(idx).server_))) { + LOG_WARN("fail to add server to server_list", KR(ret), K(arg), K(server_statuses)); + } + } + } + } + return ret; +} + int ObSwitchReplicaRoleExecutor::execute(ObExecContext &ctx, ObSwitchReplicaRoleStmt &stmt) { int ret = OB_SUCCESS; diff --git a/src/sql/engine/cmd/ob_alter_system_executor.h b/src/sql/engine/cmd/ob_alter_system_executor.h index 2ba4f7f6d9..b000bf9bbe 100644 --- a/src/sql/engine/cmd/ob_alter_system_executor.h +++ b/src/sql/engine/cmd/ob_alter_system_executor.h @@ -38,8 +38,6 @@ class ObBootstrapStmt; DISALLOW_COPY_AND_ASSIGN(name##Executor); \ } -DEF_SIMPLE_EXECUTOR(ObAdminZone); - DEF_SIMPLE_EXECUTOR(ObFreeze); DEF_SIMPLE_EXECUTOR(ObFlushCache); @@ -194,6 +192,36 @@ private: DISALLOW_COPY_AND_ASSIGN(ObAdminServerExecutor); }; +class ObAdminZoneExecutor +{ +public: + ObAdminZoneExecutor() {} + virtual ~ObAdminZoneExecutor() {} + int execute(ObExecContext &ctx, ObAdminZoneStmt &stmt); +private: + // wait leader switch out + // @params[in] sql_proxy, the proxy to use + // @params[in] arg, which zone to stop + int wait_leader_switch_out_( + ObISQLClient &sql_proxy, + const obrpc::ObAdminZoneArg &arg); + // construct sql to check waitint-result + // @params[in] arg, which zone to stop + // @params[out] sql, the sql builded + int construct_wait_leader_switch_sql_( + const obrpc::ObAdminZoneArg &arg, + ObSqlString &sql); + // construct server infos in this zone + // @params[in] sql_proxy, the proxy to use + // @params[in] arg, which zone to stop + // @params[out] svr_list, which servers to stop + int construct_servers_in_zone_( + ObISQLClient &sql_proxy, + const obrpc::ObAdminZoneArg &arg, + obrpc::ObServerList &svr_list); + DISALLOW_COPY_AND_ASSIGN(ObAdminZoneExecutor); +}; + #undef DEF_SIMPLE_EXECUTOR } // end namespace sql