[CP] [RS] Fix stop server problem when arb server is down
This commit is contained in:
@ -760,84 +760,14 @@ int ObAdminZoneExecutor::execute(ObExecContext &ctx, ObAdminZoneStmt &stmt)
|
||||
common_proxy->get_server());
|
||||
} else if (ObAdminZoneArg::STOP == stmt.get_op()
|
||||
|| ObAdminZoneArg::FORCE_STOP == stmt.get_op()) {
|
||||
// check whether all leaders are switched out
|
||||
ObMySQLProxy *sql_proxy = ctx.get_sql_proxy();
|
||||
const int64_t idx = 0;
|
||||
const int64_t retry_interval_us = 1000l * 1000l; // 1s
|
||||
bool stop = false;
|
||||
while (OB_SUCC(ret) && !stop) {
|
||||
ObSqlString sql;
|
||||
SMART_VAR(ObMySQLProxy::MySQLResult, res) {
|
||||
sqlclient::ObMySQLResult *result = NULL;
|
||||
const int64_t rpc_timeout = THIS_WORKER.get_timeout_remain();
|
||||
obrpc::Bool can_stop(true /* default value */);
|
||||
int64_t leader_cnt = 0;
|
||||
if (0 > THIS_WORKER.get_timeout_remain()) {
|
||||
ret = OB_WAIT_LEADER_SWITCH_TIMEOUT;
|
||||
LOG_WARN("wait switching out all leaders timeout", K(ret));
|
||||
} else if (OB_FAIL(THIS_WORKER.check_status())) {
|
||||
LOG_WARN("ctx check status failed", K(ret));
|
||||
}
|
||||
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (!can_stop) {
|
||||
} else if (OB_FAIL(sql.assign_fmt(
|
||||
"SELECT CAST(COUNT(*) AS SIGNED) FROM %s WHERE role = 'LEADER' AND zone = '%s'",
|
||||
share::OB_CDB_OB_LS_LOCATIONS_TNAME, arg.zone_.ptr()))) {
|
||||
LOG_WARN("assign_fmt failed", K(ret));
|
||||
} else if (OB_FAIL(sql_proxy->read(res, sql.ptr()))) {
|
||||
if (OB_RS_SHUTDOWN == ret || OB_RS_NOT_MASTER == ret) {
|
||||
// switching rs, sleep and retry
|
||||
ret = OB_SUCCESS;
|
||||
} else {
|
||||
LOG_WARN("execute sql failed", K(ret), K(sql));
|
||||
}
|
||||
} else if (OB_ISNULL(result = res.get_result())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("get result failed", K(ret));
|
||||
} else if (OB_FAIL(result->next())) {
|
||||
if (OB_ITER_END == ret) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("result is empty", K(ret));
|
||||
} else {
|
||||
LOG_WARN("get next result failed", K(ret));
|
||||
}
|
||||
} else if (OB_FAIL(result->get_int(idx, leader_cnt))) {
|
||||
if (OB_ERR_NULL_VALUE == ret) {
|
||||
ret = OB_SUCCESS;
|
||||
ObSqlString this_sql;
|
||||
SMART_VAR(ObMySQLProxy::MySQLResult, this_res) {
|
||||
sqlclient::ObMySQLResult *this_result = NULL;
|
||||
int64_t server_cnt = -1;
|
||||
if (OB_FAIL(this_sql.assign_fmt("select count(*) from %s where zone = '%s'",
|
||||
share::OB_ALL_SERVER_TNAME, arg.zone_.ptr()))) {
|
||||
LOG_WARN("fail to assign fmt", K(ret));
|
||||
} else if (OB_FAIL(sql_proxy->read(this_res, this_sql.ptr()))) {
|
||||
LOG_WARN("fail to execute sql", K(ret), K(this_sql));
|
||||
} else if (OB_ISNULL(this_result = this_res.get_result())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("fail to get result", K(ret));
|
||||
} else if (OB_FAIL(this_result->next())) {
|
||||
LOG_WARN("get result error", K(ret));
|
||||
} else if (OB_FAIL(this_result->get_int(0L, server_cnt))) {
|
||||
LOG_WARN("fail to get result", K(ret));
|
||||
} else if (0 == server_cnt) {
|
||||
// no server in this zone;
|
||||
stop = true;
|
||||
} else {
|
||||
// __all_virtual_server_stat is not ready, sleep and retry
|
||||
}
|
||||
}
|
||||
} else {
|
||||
LOG_WARN("get sum failed", K(ret));
|
||||
}
|
||||
} else if (0 == leader_cnt) {
|
||||
stop = true;
|
||||
} else {
|
||||
LOG_INFO("waiting switching leaders out", K(ret), "left count", leader_cnt);
|
||||
ob_usleep(retry_interval_us);
|
||||
}
|
||||
}
|
||||
obrpc::ObServerList server_list;
|
||||
if (OB_FAIL(construct_servers_in_zone_(*(ctx.get_sql_proxy()), arg, server_list))) {
|
||||
LOG_WARN("fail to construct servers in zone", KR(ret), K(arg));
|
||||
} else if (0 == server_list.count()) {
|
||||
// no need to wait leader election and arb-degration
|
||||
} else if (OB_FAIL(wait_leader_switch_out_(*(ctx.get_sql_proxy()), arg))) {
|
||||
// check whether all leaders are switched out
|
||||
LOG_WARN("fail to wait leader switch out", KR(ret), K(arg));
|
||||
}
|
||||
} else {} // force stop, no need to wait leader switch
|
||||
} else if (ObAdminZoneArg::MODIFY == stmt.get_op()) {
|
||||
@ -852,6 +782,112 @@ int ObAdminZoneExecutor::execute(ObExecContext &ctx, ObAdminZoneStmt &stmt)
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObAdminZoneExecutor::wait_leader_switch_out_(
|
||||
ObISQLClient &sql_proxy,
|
||||
const obrpc::ObAdminZoneArg &arg)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
const int64_t idx = 0;
|
||||
const int64_t retry_interval_us = 1000l * 1000l; // 1s
|
||||
bool stop = false;
|
||||
ObSqlString sql("AdminZoneExe");
|
||||
if (OB_UNLIKELY(!arg.is_valid())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", KR(ret), K(arg));
|
||||
} else if (OB_FAIL(construct_wait_leader_switch_sql_(arg, sql))) {
|
||||
LOG_WARN("fail to construct wait leader switch sql", KR(ret), K(arg));
|
||||
}
|
||||
|
||||
while (OB_SUCC(ret) && !stop) {
|
||||
SMART_VAR(ObMySQLProxy::MySQLResult, res) {
|
||||
sqlclient::ObMySQLResult *result = NULL;
|
||||
int64_t leader_cnt = 0;
|
||||
if (0 > THIS_WORKER.get_timeout_remain()) {
|
||||
ret = OB_WAIT_LEADER_SWITCH_TIMEOUT;
|
||||
LOG_WARN("wait switching out leaders from all servers timeout", KR(ret));
|
||||
} else if (OB_FAIL(THIS_WORKER.check_status())) {
|
||||
LOG_WARN("ctx check status failed", KR(ret));
|
||||
} else if (OB_FAIL(sql_proxy.read(res, sql.ptr()))) {
|
||||
if (OB_RS_SHUTDOWN == ret || OB_RS_NOT_MASTER == ret) {
|
||||
// switching rs, sleep and retry
|
||||
ret = OB_SUCCESS;
|
||||
} else {
|
||||
LOG_WARN("execute sql failed", KR(ret), K(sql));
|
||||
}
|
||||
} else if (OB_ISNULL(result = res.get_result())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("get result failed", KR(ret));
|
||||
} else if (OB_FAIL(result->next())) {
|
||||
if (OB_ITER_END == ret) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("result is empty", KR(ret));
|
||||
} else {
|
||||
LOG_WARN("get next result failed", KR(ret));
|
||||
}
|
||||
} else if (OB_FAIL(result->get_int(idx, leader_cnt))) {
|
||||
if (OB_ERR_NULL_VALUE == ret) {
|
||||
ret = OB_SUCCESS;
|
||||
// __all_virtual_server_stat is not ready, sleep and retry
|
||||
} else {
|
||||
LOG_WARN("get sum failed", KR(ret));
|
||||
}
|
||||
} else if (0 == leader_cnt) {
|
||||
stop = true;
|
||||
} else {
|
||||
LOG_INFO("waiting switching leaders out", KR(ret), "left count", leader_cnt);
|
||||
ob_usleep(retry_interval_us);
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObAdminZoneExecutor::construct_wait_leader_switch_sql_(
|
||||
const obrpc::ObAdminZoneArg &arg,
|
||||
ObSqlString &sql)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_UNLIKELY(!arg.is_valid())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", KR(ret), K(arg));
|
||||
} else if (OB_FAIL(sql.assign_fmt(
|
||||
"SELECT CAST(COUNT(*) AS SIGNED) FROM %s "
|
||||
"WHERE role = 'LEADER' AND zone = '%s'",
|
||||
share::OB_CDB_OB_LS_LOCATIONS_TNAME, arg.zone_.ptr()))) {
|
||||
LOG_WARN("assign_fmt failed", KR(ret), K(arg));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObAdminZoneExecutor::construct_servers_in_zone_(
|
||||
ObISQLClient &sql_proxy,
|
||||
const obrpc::ObAdminZoneArg &arg,
|
||||
obrpc::ObServerList &svr_list)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
svr_list.reset();
|
||||
share::ObServerTableOperator st_operator;
|
||||
ObArray<share::ObServerStatus> server_statuses;
|
||||
|
||||
if (OB_UNLIKELY(!arg.is_valid())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", KR(ret), K(arg));
|
||||
} else if (OB_FAIL(st_operator.init(&sql_proxy))) {
|
||||
LOG_WARN("fail to init ObServerTableOperator", KR(ret));
|
||||
} else if (OB_FAIL(st_operator.get(server_statuses))) {
|
||||
LOG_WARN("build server statused from __all_server failed", KR(ret));
|
||||
} else {
|
||||
for (int64_t idx = 0; OB_SUCC(ret) && idx < server_statuses.count(); ++idx) {
|
||||
if (arg.zone_ == server_statuses.at(idx).zone_) {
|
||||
if (OB_FAIL(svr_list.push_back(server_statuses.at(idx).server_))) {
|
||||
LOG_WARN("fail to add server to server_list", KR(ret), K(arg), K(server_statuses));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObSwitchReplicaRoleExecutor::execute(ObExecContext &ctx, ObSwitchReplicaRoleStmt &stmt)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
@ -38,8 +38,6 @@ class ObBootstrapStmt;
|
||||
DISALLOW_COPY_AND_ASSIGN(name##Executor); \
|
||||
}
|
||||
|
||||
DEF_SIMPLE_EXECUTOR(ObAdminZone);
|
||||
|
||||
DEF_SIMPLE_EXECUTOR(ObFreeze);
|
||||
|
||||
DEF_SIMPLE_EXECUTOR(ObFlushCache);
|
||||
@ -194,6 +192,36 @@ private:
|
||||
DISALLOW_COPY_AND_ASSIGN(ObAdminServerExecutor);
|
||||
};
|
||||
|
||||
class ObAdminZoneExecutor
|
||||
{
|
||||
public:
|
||||
ObAdminZoneExecutor() {}
|
||||
virtual ~ObAdminZoneExecutor() {}
|
||||
int execute(ObExecContext &ctx, ObAdminZoneStmt &stmt);
|
||||
private:
|
||||
// wait leader switch out
|
||||
// @params[in] sql_proxy, the proxy to use
|
||||
// @params[in] arg, which zone to stop
|
||||
int wait_leader_switch_out_(
|
||||
ObISQLClient &sql_proxy,
|
||||
const obrpc::ObAdminZoneArg &arg);
|
||||
// construct sql to check waitint-result
|
||||
// @params[in] arg, which zone to stop
|
||||
// @params[out] sql, the sql builded
|
||||
int construct_wait_leader_switch_sql_(
|
||||
const obrpc::ObAdminZoneArg &arg,
|
||||
ObSqlString &sql);
|
||||
// construct server infos in this zone
|
||||
// @params[in] sql_proxy, the proxy to use
|
||||
// @params[in] arg, which zone to stop
|
||||
// @params[out] svr_list, which servers to stop
|
||||
int construct_servers_in_zone_(
|
||||
ObISQLClient &sql_proxy,
|
||||
const obrpc::ObAdminZoneArg &arg,
|
||||
obrpc::ObServerList &svr_list);
|
||||
DISALLOW_COPY_AND_ASSIGN(ObAdminZoneExecutor);
|
||||
};
|
||||
|
||||
#undef DEF_SIMPLE_EXECUTOR
|
||||
|
||||
} // end namespace sql
|
||||
|
||||
Reference in New Issue
Block a user