[CP] opt server & hb management

This commit is contained in:
linqiucen
2024-01-08 03:12:49 +00:00
committed by ob-robot
parent 2d1316649a
commit 82db679bae
8 changed files with 107 additions and 28 deletions

View File

@ -285,6 +285,10 @@ int ObHeartbeatService::set_hb_responses_(const int64_t whitelist_epoch_id, ObSe
} else if (OB_ISNULL(proxy)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("proxy is null", KR(ret), KP(proxy));
} else if (OB_UNLIKELY(proxy->get_dests().count() != proxy->get_results().count())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("dest addr count != result count", KR(ret), "dest addr count", proxy->get_dests().count(),
"result count", proxy->get_results().count());
} else {
int tmp_ret = OB_SUCCESS;
SpinWLockGuard guard_for_hb_responses(hb_responses_rwlock_);
@ -294,14 +298,15 @@ int ObHeartbeatService::set_hb_responses_(const int64_t whitelist_epoch_id, ObSe
// don't use arg/dest here because call() may has failue.
ARRAY_FOREACH_X(proxy->get_results(), idx, cnt, OB_SUCC(ret)) {
const ObHBResponse *hb_response = proxy->get_results().at(idx);
const ObAddr &dest_addr = proxy->get_dests().at(idx);
if (OB_ISNULL(hb_response)) {
tmp_ret = OB_ERR_UNEXPECTED;
LOG_WARN("hb_response is null", KR(ret), KR(tmp_ret), KP(hb_response));
} else if (OB_UNLIKELY(!hb_response->is_valid())) {
// if an observer does not reply the rpc, we will get an invalid hb_response.
tmp_ret = OB_INVALID_ARGUMENT;
LOG_WARN("there might be servers which haven't replied to heartbeat requests",
KR(ret), KR(tmp_ret), KPC(hb_response));
LOG_WARN("There exists a server not responding to the hb service",
KR(ret), KR(tmp_ret), KPC(hb_response), K(dest_addr));
} else if (OB_FAIL(hb_responses_.push_back(*hb_response))) {
LOG_WARN("fail to push an element into hb_responses_", KR(ret), KPC(hb_response));
} else {

View File

@ -186,7 +186,7 @@ int ObServerZoneOpService::delete_servers(
} else if (OB_FAIL(check_server_have_enough_resource_for_delete_server_(servers, zone))) {
LOG_WARN("not enough resource, cannot delete servers", KR(ret), K(servers), K(zone));
} else if (OB_FAIL(GCTX.root_service_->check_all_ls_has_leader("delete server"))) {
LOG_WARN("fail to check all ls has leader", KR(ret));
LOG_WARN("fail to check whether all ls has leader", KR(ret));
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < servers.count(); ++i) {
if (OB_FAIL(delete_server_(servers.at(i), zone))) {
@ -598,6 +598,9 @@ int ObServerZoneOpService::check_and_end_delete_server_(
}
} else {
LOG_WARN("failed to find job", KR(ret), K(server));
if (OB_ENTRY_NOT_EXIST == ret) {
ret = OB_SUCCESS;
}
}
}
return ret;

View File

@ -5532,9 +5532,9 @@ int ObUnitManager::allocate_pool_units_(
ret = OB_INVALID_ARGUMENT;
LOG_WARN("new unit group id array status not match",
KR(ret), K(increase_delta_unit_num), KP(unit_group_id_array));
} else if (OB_ISNULL(srv_rpc_proxy_)) {
} else if (OB_ISNULL(srv_rpc_proxy_) || OB_ISNULL(GCTX.sql_proxy_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("srv_rpc_proxy_ is null", KR(ret), KP(srv_rpc_proxy_));
LOG_WARN("srv_rpc_proxy_ or GCTX.sql_proxy_ is null", KR(ret), KP(srv_rpc_proxy_), KP(GCTX.sql_proxy_));
} else if (is_valid_tenant_id(pool.tenant_id_)
&& OB_FAIL(ObCompatModeGetter::get_tenant_mode(pool.tenant_id_, compat_mode))) {
LOG_WARN("fail to get tenant compat mode", KR(ret), K(pool.tenant_id_));
@ -5555,13 +5555,17 @@ int ObUnitManager::allocate_pool_units_(
excluded_servers.reuse();
active_servers_info_of_zone.reuse();
active_servers_resource_info_of_zone.reuse();
const bool ONLY_ACTIVE_SERVERS = true;
if (FAILEDx(get_excluded_servers(pool.resource_pool_id_, zone, module,
new_allocate_pool, excluded_servers))) {
LOG_WARN("get excluded servers fail", KR(ret), K(pool.resource_pool_id_), K(zone),
K(module), K(new_allocate_pool));
} else if (OB_FAIL(SVR_TRACER.get_active_servers_info(zone, active_servers_info_of_zone))) {
LOG_WARN("fail to get active_servers_info_of_zone", KR(ret), K(servers_info), K(zone));
} else if (OB_FAIL(ObServerTableOperator::get_servers_info_of_zone(
*GCTX.sql_proxy_,
zone,
ONLY_ACTIVE_SERVERS,
active_servers_info_of_zone))) {
LOG_WARN("fail to get servers info of zone", KR(ret), K(zone));
} else if (OB_FAIL(get_servers_resource_info_via_rpc(
active_servers_info_of_zone,
active_servers_resource_info_of_zone))) {
@ -6293,19 +6297,26 @@ int ObUnitManager::check_enough_resource_for_delete_server(
int ret = OB_SUCCESS;
// get_servers_of_zone
ObArray<obrpc::ObGetServerResourceInfoResult> report_servers_resource_info;
ObArray<ObServerInfoInTable> servers_info;
ObArray<ObServerInfoInTable> servers_info_of_zone;
bool empty = false;
if (OB_UNLIKELY(!server.is_valid() || zone.is_empty())) {
const bool ONLY_ACTIVE_SERVERS = true;
if (OB_ISNULL(GCTX.sql_proxy_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("GCTX.sql_proxy_ is null", KR(ret), KP(GCTX.sql_proxy_));
} else if (OB_UNLIKELY(!server.is_valid() || zone.is_empty())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid server or zone", KR(ret), K(server), K(zone));
} else if (OB_FAIL(check_server_empty(server, empty))) {
} else if (OB_FAIL(ut_operator_.check_server_empty(server, empty))) {
// the validity of the server is checked here
LOG_WARN("fail to check whether the server is empty", KR(ret));
} else if (empty) {
//nothing todo
} else if (OB_FAIL(SVR_TRACER.get_active_servers_info(zone, servers_info_of_zone))) {
LOG_WARN("fail to get servers_info_of_zone", KR(ret), K(servers_info), K(zone));
} else if (OB_FAIL(ObServerTableOperator::get_servers_info_of_zone(
*GCTX.sql_proxy_,
zone,
ONLY_ACTIVE_SERVERS,
servers_info_of_zone))) {
LOG_WARN("fail to get servers info of zone", KR(ret), K(zone));
} else if (OB_FAIL(get_servers_resource_info_via_rpc(servers_info_of_zone, report_servers_resource_info))) {
LOG_WARN("fail to get servers_resouce_info via rpc", KR(ret), K(servers_info_of_zone), K(report_servers_resource_info));
} else if (OB_FAIL(check_enough_resource_for_delete_server_(
@ -6328,9 +6339,11 @@ int ObUnitManager::get_servers_resource_info_via_rpc(
obrpc::ObGetServerResourceInfoArg arg;
ObArray<obrpc::ObGetServerResourceInfoResult> tmp_report_servers_resource_info;
report_servers_resource_info.reset();
if (OB_UNLIKELY(servers_info.count() <= 0)) {
if (OB_UNLIKELY(servers_info.count() < 0)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("servers_info.count() should be greater than zero", KR(ret), K(servers_info.count()));
LOG_WARN("servers_info.count() should be >= 0", KR(ret), K(servers_info.count()));
} else if (0 == servers_info.count()) {
// do nothing
} else if (!ObHeartbeatService::is_service_enabled()) { // old logic
ObServerResourceInfo resource_info;
obrpc::ObGetServerResourceInfoResult result;
@ -6480,7 +6493,7 @@ int ObUnitManager::check_enough_resource_for_delete_server_(
if (OB_UNLIKELY(zone.is_empty())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("zone is invalid", KR(ret), K(zone), K(server));
} else if (OB_FAIL(check_server_empty(server, empty))) {
} else if (OB_FAIL(ut_operator_.check_server_empty(server, empty))) {
LOG_WARN("fail to check server empty", K(ret));
} else if (empty) {
//nothing todo

View File

@ -566,6 +566,9 @@ int ObServerTraceMap::refresh()
LOG_WARN("GCTX.sql_proxy_ is null", KR(ret), KP(GCTX.sql_proxy_));
} else if (OB_FAIL(ObServerTableOperator::get(*GCTX.sql_proxy_, servers_info))) {
LOG_WARN("fail to get servers_info", KR(ret), KP(GCTX.sql_proxy_));
} else if (OB_UNLIKELY(servers_info.count() <= 0)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("read nothing in table", KR(ret), K(servers_info));
} else {
SpinWLockGuard guard(lock_);
// reuse memory
@ -579,6 +582,7 @@ int ObServerTraceMap::refresh()
}
if (OB_SUCC(ret) && !has_build_) {
has_build_ = true;
FLOG_INFO("server tracer has built", KR(ret), K(has_build_), K(server_info_arr_));
}
}
return ret;

View File

@ -236,19 +236,21 @@ int ObServerTableOperator::init(common::ObISQLClient *proxy)
int ObServerTableOperator::get(common::ObIArray<share::ObServerStatus> &server_statuses)
{
int ret = OB_SUCCESS;
ObZone empty_zone;
if (OB_UNLIKELY(!inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("not init", KR(ret), K(inited_));
} else if (OB_ISNULL(proxy_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("proxy_ is null", KR(ret), KP(proxy_));
} else if (OB_FAIL(get(*proxy_, server_statuses))) {
} else if (OB_FAIL(get(*proxy_, empty_zone, server_statuses))) {
LOG_WARN("fail to get", KR(ret), KP(proxy_));
}
return ret;
}
int ObServerTableOperator::get(
common::ObISQLClient &sql_proxy,
const ObZone &zone,
common::ObIArray<ObServerStatus> &server_statuses)
{
int ret = OB_SUCCESS;
@ -259,7 +261,9 @@ int ObServerTableOperator::get(
LOG_WARN("fail to get timeout ctx", KR(ret), K(ctx));
} else if (OB_FAIL(sql.assign_fmt("SELECT *, time_to_usec(gmt_modified) AS last_hb_time "
"FROM %s", OB_ALL_SERVER_TNAME))) {
LOG_WARN("sql assign_fmt failed", KR(ret));
LOG_WARN("sql assign_fmt failed", KR(ret), K(sql));
} else if (!zone.is_empty() && OB_FAIL(sql.append_fmt(" WHERE zone = '%s'", zone.str().ptr()))) {
LOG_WARN("fail to assign zone condition", KR(ret), K(sql));
} else {
SMART_VAR(ObMySQLProxy::MySQLResult, res) {
ObMySQLResult *result = NULL;
@ -665,25 +669,39 @@ int ObServerTableOperator::get_start_service_time(
int ObServerTableOperator::get(
common::ObISQLClient &sql_proxy,
common::ObIArray<ObServerInfoInTable> &all_servers_info_in_table)
{
all_servers_info_in_table.reset();
ObZone empty_zone;
const bool ONLY_ACTIVE_SERVERS = false;
return get_servers_info_of_zone(sql_proxy, empty_zone, ONLY_ACTIVE_SERVERS, all_servers_info_in_table);
}
int ObServerTableOperator::get_servers_info_of_zone(
common::ObISQLClient &sql_proxy,
const ObZone &zone,
const bool only_active_servers,
common::ObIArray<ObServerInfoInTable> &all_servers_info_in_zone)
{
int ret = OB_SUCCESS;
ObArray<ObServerStatus> server_statuses;
ObServerInfoInTable server_info_in_table;
all_servers_info_in_table.reset();
if (OB_FAIL(get(sql_proxy, server_statuses))) {
all_servers_info_in_zone.reset();
if (OB_FAIL(get(sql_proxy, zone, server_statuses))) {
LOG_WARN("fail to get server status", KR(ret));
} else {
ObServerInfoInTable server_info;
ARRAY_FOREACH_X(server_statuses, idx, cnt, OB_SUCC(ret)) {
server_info_in_table.reset();
if (OB_FAIL(server_info_in_table.build_server_info_in_table(server_statuses.at(idx)))) {
server_info.reset();
if (OB_FAIL(server_info.build_server_info_in_table(server_statuses.at(idx)))) {
LOG_WARN("fail to build server info in table", KR(ret), K(server_statuses.at(idx)));
} else if (OB_FAIL(all_servers_info_in_table.push_back(server_info_in_table))) {
LOG_WARN("fail to push element into all_servers_info_in_table", KR(ret), K(server_info_in_table));
} else {}
} else if (only_active_servers && !server_info.is_active()) {
// do nothing
} else if (OB_FAIL(all_servers_info_in_zone.push_back(server_info))) {
LOG_WARN("fail to push element into all_servers_info_in_zone", KR(ret), K(server_info));
}
}
}
return ret;
}
int ObServerTableOperator::get(
common::ObISQLClient &sql_proxy,
const common::ObAddr &server,

View File

@ -115,6 +115,7 @@ public:
virtual int get(common::ObIArray<share::ObServerStatus> &server_statuses);
static int get(
common::ObISQLClient &sql_proxy,
const ObZone &zone,
common::ObIArray<share::ObServerStatus> &server_statuses);
static int remove(const common::ObAddr &server, common::ObMySQLTransaction &trans);
virtual int update(const share::ObServerStatus &server_status);
@ -136,8 +137,23 @@ public:
// @ret other error code failure
static int get(
common::ObISQLClient &sql_proxy,
common::ObIArray<ObServerInfoInTable> &all_servers_info_in_table);
common::ObISQLClient &sql_proxy,
common::ObIArray<ObServerInfoInTable> &all_servers_info_in_table);
// read __all_server table and return servers' info in the given zone
//
// @param[in] zone the given zone
// @param[in] only_active_servers true: only return servers' info whose status is active
// @param[out] all_servers_info_in_zone an array, which represents all rows of the given zone in __all_server table
//
// @ret OB_SUCCESS get servers' info from __all_server table successfully
// @ret OB_TABLE_NOT_EXIST it occurs in the bootstrap period, we need to wait for some time.
// @ret other error code failure
static int get_servers_info_of_zone(
common::ObISQLClient &sql_proxy,
const ObZone &zone,
const bool only_active_servers,
common::ObIArray<ObServerInfoInTable> &all_servers_info_in_zone);
// read the given server's corresponding row in __all_server table
// this func can be called in version >= 4.2
//

View File

@ -154,6 +154,25 @@ int ObUnitTableOperator::get_units(const common::ObAddr &server,
return ret;
}
int ObUnitTableOperator::check_server_empty(const common::ObAddr &server, bool &is_empty)
{
int ret = OB_SUCCESS;
ObArray<ObUnit> units;
is_empty = true;
if (OB_UNLIKELY(!inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("not init", KR(ret), K(inited_));
} else if (OB_UNLIKELY(!server.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid server", KR(ret), K(server));
} else if (OB_FAIL(get_units(server, units))) {
LOG_WARN("fail to get units", KR(ret), K(server));
} else if (units.count() > 0) {
is_empty = false;
}
return ret;
}
int ObUnitTableOperator::get_units(const common::ObIArray<uint64_t> &pool_ids,
common::ObIArray<ObUnit> &units) const
{

View File

@ -89,6 +89,7 @@ public:
virtual int get_unit_stats(common::ObIArray<ObUnitStat> &unit_stats) const;
static int read_unit(const common::sqlclient::ObMySQLResult &result, ObUnit &unit);
virtual int check_server_empty(const common::ObAddr &server, bool &is_empty);
private:
static int zone_list2str(const common::ObIArray<common::ObZone> &zone_list,
char *str, const int64_t buf_size);