diff --git a/src/rootserver/ob_heartbeat_service.cpp b/src/rootserver/ob_heartbeat_service.cpp index 1aff65089d..3fff683a1d 100644 --- a/src/rootserver/ob_heartbeat_service.cpp +++ b/src/rootserver/ob_heartbeat_service.cpp @@ -297,7 +297,8 @@ int ObHeartbeatService::set_hb_responses_(const int64_t whitelist_epoch_id, ObSe } else if (OB_UNLIKELY(!hb_response->is_valid())) { // if an observer does not reply the rpc, we will get an invalid hb_response. tmp_ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid hb_response", KR(ret), KR(tmp_ret), KPC(hb_response)); + LOG_WARN("there might be servers which haven't replied to heartbeat requests", + KR(ret), KR(tmp_ret), KPC(hb_response)); } else if (OB_FAIL(hb_responses_.push_back(*hb_response))) { LOG_WARN("fail to push an element into hb_responses_", KR(ret), KPC(hb_response)); } else { diff --git a/src/rootserver/ob_tenant_role_transition_service.cpp b/src/rootserver/ob_tenant_role_transition_service.cpp index e6c571946b..a6deecd678 100644 --- a/src/rootserver/ob_tenant_role_transition_service.cpp +++ b/src/rootserver/ob_tenant_role_transition_service.cpp @@ -107,6 +107,8 @@ int ObTenantRoleTransitionService::failover_to_primary() ObAllTenantInfo tenant_info; if (OB_FAIL(check_inner_stat())) { LOG_WARN("error unexpected", KR(ret), K(tenant_id_), KP(sql_proxy_), KP(rpc_proxy_)); + } else if (OB_FAIL(check_tenant_server_online_())) { + LOG_WARN("fail to check whether the tenant's servers are online", KR(ret), K(tenant_id_)); } else if (OB_FAIL(ObAllTenantInfoProxy::load_tenant_info(tenant_id_, sql_proxy_, false, tenant_info))) { LOG_WARN("failed to load tenant info", KR(ret), K(tenant_id_)); @@ -1397,7 +1399,6 @@ void ObTenantRoleTransitionService::broadcast_tenant_info(const char* const log_ ObArray return_code_array; if (OB_FAIL(check_inner_stat())) { - ret = OB_ERR_UNEXPECTED; LOG_WARN("inner stat error", KR(ret)); } else if (OB_ISNULL(GCTX.server_tracer_) || OB_ISNULL(GCTX.srv_rpc_proxy_)) { ret = OB_ERR_UNEXPECTED; @@ -1468,5 +1469,73 @@ void ObTenantRoleTransitionService::broadcast_tenant_info(const char* const log_ return ; } +int ObTenantRoleTransitionService::check_tenant_server_online_() +{ + int ret = OB_SUCCESS; + ObSqlString sql; + ObArray inactive_servers; + if (OB_FAIL(check_inner_stat())) { + LOG_WARN("inner stat error", KR(ret)); + } else if (OB_FAIL(sql.append_fmt("select distinct svr_ip, svr_port from %s " + "where (svr_ip, svr_port) in (select u.svr_ip, u.svr_port from %s as u " + "join %s as r on r.resource_pool_id=u.resource_pool_id where tenant_id=%ld) and status != 'ACTIVE'", + OB_ALL_SERVER_TNAME, OB_ALL_UNIT_TNAME, OB_ALL_RESOURCE_POOL_TNAME, tenant_id_))) { + LOG_WARN("fail to append sql", KR(ret)); + } else { + HEAP_VAR(ObMySQLProxy::ReadResult, res) { + ObMySQLResult *result = NULL; + if (OB_FAIL(sql_proxy_->read(res, OB_SYS_TENANT_ID, sql.ptr()))) { + LOG_WARN("fail to read the tenant's online servers", KR(ret), K(sql), K(tenant_id_)); + } else if (NULL == (result = res.get_result())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("sql result is null", KR(ret), K(tenant_id_)); + } else if (OB_FAIL(construct_inactive_servers_(*result, inactive_servers))) { + LOG_WARN("fail to construct inactive_servers", KR(ret), K(tenant_id_)); + } + } + } + int64_t cnt = inactive_servers.count(); + if (OB_SUCC(ret) && inactive_servers.count() != 0) { + ret = OB_OP_NOT_ALLOW; + LOG_INFO("the tenant has inactive servers", KR(ret), K(inactive_servers)); + LOG_USER_ERROR(OB_OP_NOT_ALLOW, "the tenant has units on inactive servers, switch to primary"); + } + return ret; +} + +int ObTenantRoleTransitionService::construct_inactive_servers_( + common::sqlclient::ObMySQLResult &res, + ObArray &inactive_servers) +{ + int ret = OB_SUCCESS; + ObAddr server; + inactive_servers.reset(); + while (OB_SUCC(ret)) { + server.reset(); + char svr_ip[OB_IP_STR_BUFF] = ""; + int64_t svr_port = 0; + int64_t tmp_real_str_len = 0; + if (OB_FAIL(res.next())) { + if (OB_ITER_END != ret) { + LOG_WARN("result next failed", KR(ret)); + } else { + ret = OB_SUCCESS; + break; + } + } else { + EXTRACT_STRBUF_FIELD_MYSQL(res, "svr_ip", svr_ip, OB_IP_STR_BUFF, tmp_real_str_len); + EXTRACT_INT_FIELD_MYSQL(res, "svr_port", svr_port, int64_t); + } + if (OB_FAIL(ret)) { + } else if (OB_UNLIKELY(false == server.set_ip_addr(svr_ip, static_cast(svr_port)))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("invalid server address", KR(ret), K(svr_ip), K(svr_port)); + } else if (OB_FAIL(inactive_servers.push_back(server))) { + LOG_WARN("fail to push back server", KR(ret), K(server)); + } + } + return ret; +} + } } diff --git a/src/rootserver/ob_tenant_role_transition_service.h b/src/rootserver/ob_tenant_role_transition_service.h index d1e2e11495..7d3d5d4b61 100644 --- a/src/rootserver/ob_tenant_role_transition_service.h +++ b/src/rootserver/ob_tenant_role_transition_service.h @@ -230,6 +230,8 @@ private: const palf::AccessMode target_access_mode, const share::SCN &ref_scn, const share::SCN &sys_ls_sync_scn); + int check_tenant_server_online_(); + int construct_inactive_servers_(common::sqlclient::ObMySQLResult &res, ObArray &inactive_servers); private: const static int64_t SEC_UNIT = 1000L * 1000L;