check server online when switchover
This commit is contained in:
@ -297,7 +297,8 @@ int ObHeartbeatService::set_hb_responses_(const int64_t whitelist_epoch_id, ObSe
|
||||
} else if (OB_UNLIKELY(!hb_response->is_valid())) {
|
||||
// if an observer does not reply the rpc, we will get an invalid hb_response.
|
||||
tmp_ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid hb_response", KR(ret), KR(tmp_ret), KPC(hb_response));
|
||||
LOG_WARN("there might be servers which haven't replied to heartbeat requests",
|
||||
KR(ret), KR(tmp_ret), KPC(hb_response));
|
||||
} else if (OB_FAIL(hb_responses_.push_back(*hb_response))) {
|
||||
LOG_WARN("fail to push an element into hb_responses_", KR(ret), KPC(hb_response));
|
||||
} else {
|
||||
|
||||
@ -107,6 +107,8 @@ int ObTenantRoleTransitionService::failover_to_primary()
|
||||
ObAllTenantInfo tenant_info;
|
||||
if (OB_FAIL(check_inner_stat())) {
|
||||
LOG_WARN("error unexpected", KR(ret), K(tenant_id_), KP(sql_proxy_), KP(rpc_proxy_));
|
||||
} else if (OB_FAIL(check_tenant_server_online_())) {
|
||||
LOG_WARN("fail to check whether the tenant's servers are online", KR(ret), K(tenant_id_));
|
||||
} else if (OB_FAIL(ObAllTenantInfoProxy::load_tenant_info(tenant_id_, sql_proxy_,
|
||||
false, tenant_info))) {
|
||||
LOG_WARN("failed to load tenant info", KR(ret), K(tenant_id_));
|
||||
@ -1397,7 +1399,6 @@ void ObTenantRoleTransitionService::broadcast_tenant_info(const char* const log_
|
||||
ObArray<int> return_code_array;
|
||||
|
||||
if (OB_FAIL(check_inner_stat())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("inner stat error", KR(ret));
|
||||
} else if (OB_ISNULL(GCTX.server_tracer_) || OB_ISNULL(GCTX.srv_rpc_proxy_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
@ -1468,5 +1469,73 @@ void ObTenantRoleTransitionService::broadcast_tenant_info(const char* const log_
|
||||
return ;
|
||||
}
|
||||
|
||||
int ObTenantRoleTransitionService::check_tenant_server_online_()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObSqlString sql;
|
||||
ObArray<ObAddr> inactive_servers;
|
||||
if (OB_FAIL(check_inner_stat())) {
|
||||
LOG_WARN("inner stat error", KR(ret));
|
||||
} else if (OB_FAIL(sql.append_fmt("select distinct svr_ip, svr_port from %s "
|
||||
"where (svr_ip, svr_port) in (select u.svr_ip, u.svr_port from %s as u "
|
||||
"join %s as r on r.resource_pool_id=u.resource_pool_id where tenant_id=%ld) and status != 'ACTIVE'",
|
||||
OB_ALL_SERVER_TNAME, OB_ALL_UNIT_TNAME, OB_ALL_RESOURCE_POOL_TNAME, tenant_id_))) {
|
||||
LOG_WARN("fail to append sql", KR(ret));
|
||||
} else {
|
||||
HEAP_VAR(ObMySQLProxy::ReadResult, res) {
|
||||
ObMySQLResult *result = NULL;
|
||||
if (OB_FAIL(sql_proxy_->read(res, OB_SYS_TENANT_ID, sql.ptr()))) {
|
||||
LOG_WARN("fail to read the tenant's online servers", KR(ret), K(sql), K(tenant_id_));
|
||||
} else if (NULL == (result = res.get_result())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("sql result is null", KR(ret), K(tenant_id_));
|
||||
} else if (OB_FAIL(construct_inactive_servers_(*result, inactive_servers))) {
|
||||
LOG_WARN("fail to construct inactive_servers", KR(ret), K(tenant_id_));
|
||||
}
|
||||
}
|
||||
}
|
||||
int64_t cnt = inactive_servers.count();
|
||||
if (OB_SUCC(ret) && inactive_servers.count() != 0) {
|
||||
ret = OB_OP_NOT_ALLOW;
|
||||
LOG_INFO("the tenant has inactive servers", KR(ret), K(inactive_servers));
|
||||
LOG_USER_ERROR(OB_OP_NOT_ALLOW, "the tenant has units on inactive servers, switch to primary");
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTenantRoleTransitionService::construct_inactive_servers_(
|
||||
common::sqlclient::ObMySQLResult &res,
|
||||
ObArray<ObAddr> &inactive_servers)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObAddr server;
|
||||
inactive_servers.reset();
|
||||
while (OB_SUCC(ret)) {
|
||||
server.reset();
|
||||
char svr_ip[OB_IP_STR_BUFF] = "";
|
||||
int64_t svr_port = 0;
|
||||
int64_t tmp_real_str_len = 0;
|
||||
if (OB_FAIL(res.next())) {
|
||||
if (OB_ITER_END != ret) {
|
||||
LOG_WARN("result next failed", KR(ret));
|
||||
} else {
|
||||
ret = OB_SUCCESS;
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
EXTRACT_STRBUF_FIELD_MYSQL(res, "svr_ip", svr_ip, OB_IP_STR_BUFF, tmp_real_str_len);
|
||||
EXTRACT_INT_FIELD_MYSQL(res, "svr_port", svr_port, int64_t);
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (OB_UNLIKELY(false == server.set_ip_addr(svr_ip, static_cast<int32_t>(svr_port)))) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("invalid server address", KR(ret), K(svr_ip), K(svr_port));
|
||||
} else if (OB_FAIL(inactive_servers.push_back(server))) {
|
||||
LOG_WARN("fail to push back server", KR(ret), K(server));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@ -230,6 +230,8 @@ private:
|
||||
const palf::AccessMode target_access_mode,
|
||||
const share::SCN &ref_scn,
|
||||
const share::SCN &sys_ls_sync_scn);
|
||||
int check_tenant_server_online_();
|
||||
int construct_inactive_servers_(common::sqlclient::ObMySQLResult &res, ObArray<ObAddr> &inactive_servers);
|
||||
|
||||
private:
|
||||
const static int64_t SEC_UNIT = 1000L * 1000L;
|
||||
|
||||
Reference in New Issue
Block a user