Fix bug where ObUnitManager hung when creating a tenant.

LeonChaoHi
2023-11-30 02:41:59 +00:00
committed by ob-robot
parent 228cfb5382
commit 2af75a5522
16 changed files with 501 additions and 245 deletions


@ -37,6 +37,7 @@
#include "share/ob_rpc_struct.h"
#include "storage/ob_file_system_router.h"
#include "observer/ob_server_struct.h"
#include "observer/omt/ob_tenant_node_balancer.h"
#include "rootserver/ob_balance_info.h"
#include "rootserver/ob_zone_manager.h"
#include "rootserver/ob_rs_event_history_table_operator.h"
@ -190,7 +191,8 @@ int ObUnitManager::init(ObMySQLProxy &proxy,
ObServerConfig &server_config,
obrpc::ObSrvRpcProxy &srv_rpc_proxy,
share::schema::ObMultiVersionSchemaService &schema_service,
ObRootBalancer &root_balance)
ObRootBalancer &root_balance,
ObRootService &root_service)
{
int ret = OB_SUCCESS;
if (inited_) {
@ -245,6 +247,7 @@ int ObUnitManager::init(ObMySQLProxy &proxy,
proxy_ = &proxy;
server_config_ = &server_config;
srv_rpc_proxy_ = &srv_rpc_proxy;
root_service_ = &root_service;
schema_service_ = &schema_service;
root_balance_ = &root_balance;
loaded_ = false;
@ -627,7 +630,7 @@ int ObUnitManager::check_tenant_pools_in_shrinking(
} else if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id))) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret));
} else if (OB_FAIL(get_pools_by_tenant(tenant_id, pools))) {
} else if (OB_FAIL(get_pools_by_tenant_(tenant_id, pools))) {
LOG_WARN("fail to get pools by tenant", K(ret), K(tenant_id));
} else if (OB_UNLIKELY(NULL == pools)) {
ret = OB_ERR_UNEXPECTED;
@ -1919,7 +1922,7 @@ int ObUnitManager::get_to_be_deleted_unit_group(
if (OB_UNLIKELY(nullptr == pools.at(i))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("pool ptr is null", KR(ret), KP(pools.at(i)));
} else if (OB_FAIL(inner_get_unit_infos_of_pool(pools.at(i)->resource_pool_id_, pool_unit_infos))) {
} else if (OB_FAIL(inner_get_unit_infos_of_pool_(pools.at(i)->resource_pool_id_, pool_unit_infos))) {
LOG_WARN("inner_get_unit_infos_of_pool failed", KR(ret), "resource_pool_id", pools.at(i)->resource_pool_id_);
} else if (pool_unit_infos.count() <= 0) {
ret = OB_ERR_UNEXPECTED;
@ -2257,7 +2260,7 @@ int ObUnitManager::alter_resource_tenant(
} else if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || new_unit_num <= 0)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", KR(ret), K(tenant_id), K(new_unit_num));
} else if (OB_FAIL(get_pools_by_tenant(tenant_id, pools))) {
} else if (OB_FAIL(get_pools_by_tenant_(tenant_id, pools))) {
LOG_WARN("fail to get pools by tenant", KR(ret), K(tenant_id));
} else if (OB_UNLIKELY(nullptr == pools)) {
ret = OB_ERR_UNEXPECTED;
@ -3087,7 +3090,7 @@ int ObUnitManager::check_server_enough(const uint64_t tenant_id,
} else if (OB_ISNULL(pool)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get invalid pool", K(ret), K(pool));
} else if (OB_FAIL(inner_get_unit_infos_of_pool(pool->resource_pool_id_, unit_infos))) {
} else if (OB_FAIL(inner_get_unit_infos_of_pool_(pool->resource_pool_id_, unit_infos))) {
LOG_WARN("fail to get unit infos", K(ret), K(*pool));
} else {
for (int64_t j = 0; j < unit_infos.count() && OB_SUCC(ret); j++) {
@ -3101,7 +3104,7 @@ int ObUnitManager::check_server_enough(const uint64_t tenant_id,
} // end for
}
//Count the number of existing units
if (FAILEDx(get_pools_by_tenant(tenant_id, pools))) {
if (FAILEDx(get_pools_by_tenant_(tenant_id, pools))) {
if (OB_ENTRY_NOT_EXIST == ret) {
// a new tenant, without resource pool already granted
ret = OB_SUCCESS;
@ -3118,7 +3121,7 @@ int ObUnitManager::check_server_enough(const uint64_t tenant_id,
if (OB_UNLIKELY(NULL == pool)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("pool ptr is null", K(ret), KP(pool));
} else if (OB_FAIL(inner_get_unit_infos_of_pool(pool->resource_pool_id_, unit_infos))) {
} else if (OB_FAIL(inner_get_unit_infos_of_pool_(pool->resource_pool_id_, unit_infos))) {
LOG_WARN("fail to get unit infos", K(ret), K(*pool));
} else {
for (int64_t j = 0; j < unit_infos.count() && OB_SUCC(ret); j++) {
@ -3231,7 +3234,7 @@ int ObUnitManager::check_expand_zone_resource_allowed_by_old_unit_stat_(
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", KR(ret), K(tenant_id));
} else {
int tmp_ret = get_pools_by_tenant(tenant_id, cur_pool_array);
int tmp_ret = get_pools_by_tenant_(tenant_id, cur_pool_array);
if (OB_ENTRY_NOT_EXIST == tmp_ret) {
is_allowed = true;
} else if (OB_UNLIKELY(nullptr == cur_pool_array)) {
@ -3323,7 +3326,7 @@ int ObUnitManager::check_tenant_pools_unit_num_legal_(
} else {
unit_num_legal = true;
sample_unit_num = -1;
int tmp_ret = get_pools_by_tenant(tenant_id, cur_pool_array);
int tmp_ret = get_pools_by_tenant_(tenant_id, cur_pool_array);
if (OB_ENTRY_NOT_EXIST == tmp_ret) {
// when create tenant pools belong to this tenant is empty
} else if (OB_UNLIKELY(nullptr == cur_pool_array)) {
@ -3414,7 +3417,7 @@ int ObUnitManager::get_tenant_pool_unit_group_id_(
} else {
// when grant pools, an unit group id greater than 0 is needed for every unit
ObArray<share::ObResourcePool *> *tenant_pool_array = nullptr;
int tmp_ret = get_pools_by_tenant(tenant_id, tenant_pool_array);
int tmp_ret = get_pools_by_tenant_(tenant_id, tenant_pool_array);
if (OB_ENTRY_NOT_EXIST == tmp_ret) {
// need to fetch unit group from inner table, since this is invoked by create tenant
for (int64_t i = 0; OB_SUCC(ret) && i < unit_group_num; ++i) {
@ -3450,7 +3453,7 @@ int ObUnitManager::inner_get_all_unit_group_id(
LOG_WARN("invalid argument", KR(ret), K(tenant_id));
} else {
ObArray<share::ObResourcePool *> *cur_pool_array = nullptr;
if (OB_FAIL(get_pools_by_tenant(tenant_id, cur_pool_array))) {
if (OB_FAIL(get_pools_by_tenant_(tenant_id, cur_pool_array))) {
if (OB_ENTRY_NOT_EXIST == ret) {
// bypass
ret = OB_SUCCESS;
@ -3552,7 +3555,7 @@ int ObUnitManager::get_unit_group(
LOG_WARN("invalid argument", KR(ret), K(tenant_id), K(unit_group_id));
} else {
ObArray<share::ObResourcePool *> *cur_pool_array = nullptr;
ret = get_pools_by_tenant(tenant_id, cur_pool_array);
ret = get_pools_by_tenant_(tenant_id, cur_pool_array);
if (OB_ENTRY_NOT_EXIST == ret) {
// bypass
} else if (OB_SUCCESS != ret) {
@ -3724,7 +3727,7 @@ int ObUnitManager::inner_get_pool_ids_of_tenant(const uint64_t tenant_id,
} else {
pool_ids.reuse();
ObArray<share::ObResourcePool *> *pools = NULL;
if (OB_FAIL(get_pools_by_tenant(tenant_id, pools))) {
if (OB_FAIL(get_pools_by_tenant_(tenant_id, pools))) {
if (OB_ENTRY_NOT_EXIST != ret) {
LOG_DEBUG("get_pools_by_tenant failed", K(tenant_id), K(ret));
} else {
@ -3751,6 +3754,94 @@ int ObUnitManager::inner_get_pool_ids_of_tenant(const uint64_t tenant_id,
return ret;
}
int ObUnitManager::get_tenant_alive_servers_non_block(const uint64_t tenant_id,
common::ObIArray<ObAddr> &servers)
{
int ret = OB_SUCCESS;
if (!check_inner_stat()) {
ret = OB_INNER_STAT_ERROR;
LOG_WARN("check_inner_stat failed", KR(ret), K(inited_), K(loaded_));
} else if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id))) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", KR(ret), K(tenant_id));
} else {
bool get_succ = false;
ObArray<ObUnit> tenant_units;
uint64_t valid_tnt_id = is_meta_tenant(tenant_id) ? gen_user_tenant_id(tenant_id) : tenant_id;
// STEP 1: try to lock and get units from in-memory data
if (lock_.try_rdlock()) {
ObArray<share::ObResourcePool *> *pools = nullptr;
if (OB_FAIL(get_pools_by_tenant_(valid_tnt_id, pools))) {
LOG_WARN("failed to get pools by tenant", KR(ret), K(valid_tnt_id));
} else if (OB_ISNULL(pools)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("pools is nullptr", KR(ret), KP(pools));
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < pools->count(); ++i) {
const share::ObResourcePool *pool = pools->at(i);
ObArray<ObUnit *> *pool_units;
if (OB_ISNULL(pool)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("pool is nullptr", KR(ret), KP(pool));
} else if (OB_FAIL(get_units_by_pool(pool->resource_pool_id_, pool_units))) {
LOG_WARN("fail to get_units_by_pool", KR(ret),
"pool_id", pool->resource_pool_id_);
} else if (OB_ISNULL(pool_units)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("pool_units is nullptr", KR(ret), KP(pool_units));
} else {
ARRAY_FOREACH_X(*pool_units, idx, cnt, OB_SUCC(ret)) {
const ObUnit *unit = pool_units->at(idx);
if (OB_ISNULL(unit)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unit is nullptr", KR(ret), KP(unit));
} else if (OB_FAIL(tenant_units.push_back(*unit))) {
LOG_WARN("failed to push_back unit", KR(ret), KPC(unit));
}
}
}
}
}
get_succ = OB_SUCC(ret);
lock_.unlock();
}
// STEP 2: try to get units from inner_table
if (OB_SUCC(ret) && !get_succ) {
tenant_units.reuse();
if (OB_FAIL(ut_operator_.get_units_by_tenant(valid_tnt_id, tenant_units))) {
LOG_WARN("fail to get_units_by_tenant from inner_table",
KR(ret), K(tenant_id), K(valid_tnt_id));
}
}
// STEP 3: filter alive servers
if (OB_SUCC(ret)) {
servers.reuse();
FOREACH_X(unit, tenant_units, OB_SUCC(ret)) {
bool is_alive = false;
if (OB_FAIL(SVR_TRACER.check_server_alive(unit->server_, is_alive))) {
LOG_WARN("check_server_alive failed", KR(ret), K(unit->server_));
} else if (is_alive) {
if (OB_FAIL(servers.push_back(unit->server_))) {
LOG_WARN("push_back failed", KR(ret), K(unit->server_));
}
}
if (OB_FAIL(ret) || !unit->migrate_from_server_.is_valid()) {
// skip
} else if (OB_FAIL(SVR_TRACER.check_server_alive(unit->migrate_from_server_, is_alive))) {
LOG_WARN("check_server_alive failed", KR(ret), K(unit->migrate_from_server_));
} else if (is_alive) {
if (OB_FAIL(servers.push_back(unit->migrate_from_server_))) {
LOG_WARN("push_back failed", KR(ret), K(unit->migrate_from_server_));
}
}
}
}
}
return ret;
}
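The new get_tenant_alive_servers_non_block above avoids blocking on the unit manager lock: it tries a read lock and serves units from memory when that succeeds, otherwise it falls back to reading the inner table. Below is a minimal standalone C++ sketch of that try-lock-with-fallback pattern; the names (Cache, load_from_table) are hypothetical and are not OceanBase APIs.

// Sketch of the try-lock-with-fallback pattern (hypothetical names, C++17).
#include <mutex>
#include <shared_mutex>
#include <string>
#include <vector>
#include <iostream>

class Cache {
public:
  // Non-blocking read: if the lock cannot be taken immediately (e.g. it is
  // held by a long-running operation), fall back to the authoritative store
  // instead of waiting on it.
  std::vector<std::string> get_servers_non_block() {
    bool got = false;
    std::vector<std::string> servers;
    if (lock_.try_lock_shared()) {        // analogous to lock_.try_rdlock()
      servers = servers_;                 // read the in-memory data
      got = true;
      lock_.unlock_shared();
    }
    if (!got) {
      servers = load_from_table();        // analogous to reading the inner table
    }
    return servers;
  }

  void set_servers(std::vector<std::string> s) {
    std::unique_lock<std::shared_mutex> guard(lock_);
    servers_ = std::move(s);
  }

private:
  std::vector<std::string> load_from_table() {
    // Placeholder for the slower, lock-free path (e.g. an SQL query).
    return {"10.0.0.1:2882", "10.0.0.2:2882"};
  }

  std::shared_mutex lock_;
  std::vector<std::string> servers_;
};

int main() {
  Cache cache;
  cache.set_servers({"10.0.0.1:2882"});
  for (const auto &s : cache.get_servers_non_block()) {
    std::cout << s << "\n";
  }
  return 0;
}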
int ObUnitManager::get_pool_ids_of_tenant(const uint64_t tenant_id,
ObIArray<uint64_t> &pool_ids) const
{
@ -3781,7 +3872,7 @@ int ObUnitManager::get_pool_names_of_tenant(const uint64_t tenant_id,
LOG_WARN("invalid argument", K(tenant_id), K(ret));
} else {
ObArray<share::ObResourcePool *> *pools = NULL;
if (OB_FAIL(get_pools_by_tenant(tenant_id, pools))) {
if (OB_FAIL(get_pools_by_tenant_(tenant_id, pools))) {
if (OB_ENTRY_NOT_EXIST != ret) {
LOG_WARN("get_pools_by_tenant failed", K(tenant_id), K(ret));
} else {
@ -3936,7 +4027,7 @@ int ObUnitManager::inner_get_tenant_pool_zone_list(
} else if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id))) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret), K(tenant_id));
} else if (OB_FAIL(get_pools_by_tenant(tenant_id, pools))) {
} else if (OB_FAIL(get_pools_by_tenant_(tenant_id, pools))) {
LOG_WARN("fail to get pools by tenant", K(ret), K(tenant_id));
} else if (OB_UNLIKELY(NULL == pools)) {
ret = OB_ERR_UNEXPECTED;
@ -4037,7 +4128,7 @@ int ObUnitManager::finish_migrate_unit_not_in_tenant(
//ignore in tenant unit
} else {
ObArray<ObUnitInfo> unit_infos;
if (OB_FAIL(inner_get_unit_infos_of_pool(pool->resource_pool_id_, unit_infos))) {
if (OB_FAIL(inner_get_unit_infos_of_pool_(pool->resource_pool_id_, unit_infos))) {
LOG_WARN("fail to get units by pool", K(ret));
} else {
FOREACH_CNT_X(unit_info, unit_infos, OB_SUCC(ret)) {
@ -4155,7 +4246,7 @@ int ObUnitManager::inner_get_zone_alive_unit_infos_by_tenant(
} else if (OB_UNLIKELY(NULL == pool)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("pool is null", K(ret));
} else if (OB_FAIL(inner_get_unit_infos_of_pool(*pool, unit_array))) {
} else if (OB_FAIL(inner_get_unit_infos_of_pool_(*pool, unit_array))) {
LOG_WARN("fail to get unit infos of pool", K(ret));
} else if (unit_array.count() > 0) {
FOREACH_X(u, unit_array, OB_SUCCESS == ret) {
@ -4254,7 +4345,7 @@ int ObUnitManager::commit_shrink_tenant_resource_pool(const uint64_t tenant_id)
} else if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id))) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", KR(ret), K(tenant_id));
} else if (OB_FAIL(get_pools_by_tenant(tenant_id, pools))) {
} else if (OB_FAIL(get_pools_by_tenant_(tenant_id, pools))) {
LOG_WARN("failed to get pool by tenant", KR(ret), K(tenant_id));
} else if (OB_UNLIKELY(NULL == pools)) {
ret = OB_ERR_UNEXPECTED;
@ -4402,13 +4493,13 @@ int ObUnitManager::get_unit_infos_of_pool(const uint64_t resource_pool_id,
} else if (OB_INVALID_ID == resource_pool_id) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(resource_pool_id), K(ret));
} else if (OB_FAIL(inner_get_unit_infos_of_pool(resource_pool_id, unit_infos))) {
} else if (OB_FAIL(inner_get_unit_infos_of_pool_(resource_pool_id, unit_infos))) {
LOG_WARN("inner_get_unit_infos_of_pool failed", K(resource_pool_id), K(ret));
}
return ret;
}
int ObUnitManager::inner_get_unit_infos_of_pool(const uint64_t resource_pool_id,
int ObUnitManager::inner_get_unit_infos_of_pool_(const uint64_t resource_pool_id,
ObIArray<ObUnitInfo> &unit_infos) const
{
int ret = OB_SUCCESS;
@ -4454,6 +4545,8 @@ int ObUnitManager::inner_get_unit_infos_of_pool(const uint64_t resource_pool_id,
} else if (units->count() <= 0) {
ret = OB_ENTRY_NOT_EXIST;
LOG_WARN("units of resource pool not exist", K(resource_pool_id), K(ret));
} else if (OB_FAIL(unit_infos.reserve(units->count()))) {
LOG_WARN("failed to reserve for unit_infos", KR(ret));
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < units->count(); ++i) {
if (NULL == units->at(i)) {
@ -4685,11 +4778,18 @@ int ObUnitManager::allocate_new_pool_units_(
return ret;
}
int ObUnitManager::try_notify_tenant_server_unit_resource(
/* Notify ObServer to create or drop a unit.
* 1. Pass @is_delete as true when dropping a unit; in that case only @tenant_id and @unit are used.
* 2. This function only sends the RPC, so the caller must make sure the proxy is waited on afterwards.
* One exception: when @unit is on the server where the RS leader resides, the notification
* is executed locally without RPC, so there is no need to wait on the proxy.
*/
int ObUnitManager::try_notify_tenant_server_unit_resource_(
const uint64_t tenant_id,
const bool is_delete,
ObNotifyTenantServerResourceProxy &notify_proxy,
const share::ObResourcePool &new_pool,
const uint64_t unit_config_id,
const lib::Worker::CompatMode compat_mode,
const share::ObUnit &unit,
const bool if_not_grant,
@ -4700,69 +4800,150 @@ int ObUnitManager::try_notify_tenant_server_unit_resource(
if (!check_inner_stat()) {
ret = OB_INNER_STAT_ERROR;
LOG_WARN("check_inner_stat failed", K(ret), K(inited_), K(loaded_));
} else if (OB_ISNULL(srv_rpc_proxy_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("srv_rpc_proxy_ is null", KR(ret), KP(srv_rpc_proxy_));
} else if (!is_valid_tenant_id(tenant_id)) {
// do nothing, unit not granted
} else if (OB_FAIL(SVR_TRACER.check_server_alive(unit.server_, is_alive))) {
LOG_WARN("fail to get server_info", KR(ret), K(unit.server_));
} else if (!is_alive && (is_delete || skip_offline_server)) {
LOG_INFO("server not alive when delete unit, ignore", "server", unit.server_);
} else {
share::ObUnitConfig *unit_config = nullptr;
if (!is_valid_tenant_id(new_pool.tenant_id_) && !is_delete) {
// bypass
} else if (OB_FAIL(get_unit_config_by_id(new_pool.unit_config_id_, unit_config))) {
LOG_WARN("fail to get unit config by id", K(ret));
} else if (OB_UNLIKELY(nullptr == unit_config)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unit config is null", K(ret), "unit_config_id", new_pool.unit_config_id_);
} else if (!is_alive) {
if (is_delete || skip_offline_server) {
// do nothing
LOG_INFO("ignore not alive server when is_delete or skip_offline_server is true",
"server", unit.server_, K(is_delete), K(skip_offline_server));
} else {
obrpc::TenantServerUnitConfig tenant_unit_server_config;
int64_t start = ObTimeUtility::current_time();
int64_t rpc_timeout = NOTIFY_RESOURCE_RPC_TIMEOUT;
if (INT64_MAX != THIS_WORKER.get_timeout_ts()) {
rpc_timeout = max(rpc_timeout, THIS_WORKER.get_timeout_remain());
ret = OB_SERVER_NOT_ALIVE;
LOG_WARN("destination server not alive", KR(ret), K(unit.server_));
}
} else {
// STEP 1: Build and init the notify arg
obrpc::TenantServerUnitConfig tenant_unit_server_config;
if (!is_delete) {
if (OB_FAIL(build_notify_create_unit_resource_rpc_arg_(
tenant_id, unit, compat_mode, unit_config_id, if_not_grant,
tenant_unit_server_config))) {
LOG_WARN("fail to init tenant_unit_server_config", KR(ret), K(tenant_id), K(is_delete));
}
if (OB_FAIL(tenant_unit_server_config.init(
tenant_id, unit.unit_id_, compat_mode, *unit_config,
ObReplicaType::REPLICA_TYPE_FULL, if_not_grant, is_delete))) {
LOG_WARN("fail to init server config", KR(ret), K(tenant_id));
} else if (OB_FAIL(notify_proxy.call(unit.server_, rpc_timeout, tenant_unit_server_config))) {
LOG_WARN("fail to call notify resource to server",
K(ret), K(rpc_timeout), "dst", unit.server_);
if (OB_TENANT_EXIST == ret) {
ret = OB_TENANT_RESOURCE_UNIT_EXIST;
LOG_USER_ERROR(OB_TENANT_RESOURCE_UNIT_EXIST, tenant_id, to_cstring(unit.server_));
}
} else {
if (OB_FAIL(tenant_unit_server_config.init_for_dropping(tenant_id, is_delete))) {
LOG_WARN("fail to init tenant_unit_server_config", KR(ret), K(tenant_id), K(is_delete));
}
LOG_INFO("call notify resource to server", K(ret), "dst", unit.server_,
}
// STEP 2: Do notification
if (OB_FAIL(ret)) {
} else if (OB_FAIL(do_notify_unit_resource_(
unit.server_, tenant_unit_server_config, notify_proxy))) {
LOG_WARN("failed to do_notify_unit_resource", "dst", unit.server_, K(tenant_unit_server_config));
if (!is_delete && OB_TENANT_EXIST == ret) {
ret = OB_TENANT_RESOURCE_UNIT_EXIST;
LOG_USER_ERROR(OB_TENANT_RESOURCE_UNIT_EXIST, tenant_id, to_cstring(unit.server_));
}
}
}
return ret;
}
int ObUnitManager::build_notify_create_unit_resource_rpc_arg_(
const uint64_t tenant_id,
const share::ObUnit &unit,
const lib::Worker::CompatMode compat_mode,
const uint64_t unit_config_id,
const bool if_not_grant,
obrpc::TenantServerUnitConfig &rpc_arg) const
{
int ret = OB_SUCCESS;
// get unit_config
share::ObUnitConfig *unit_config = nullptr;
if (OB_FAIL(get_unit_config_by_id(unit_config_id, unit_config))) {
LOG_WARN("fail to get unit config by id", KR(ret));
} else if (OB_ISNULL(unit_config)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unit config is null", KR(ret), "unit_config_id", unit_config_id);
}
#ifdef OB_BUILD_TDE_SECURITY
// get root_key
obrpc::ObRootKeyResult root_key;
obrpc::ObRootKeyArg get_rootkey_arg;
if (OB_FAIL(ret)) {
// do nothing
} else if (OB_ISNULL(root_service_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("root_service_ is nullptr", KR(ret), KP(root_service_));
} else if (OB_FAIL(get_rootkey_arg.init_for_get(tenant_id))) {
LOG_WARN("failed to init get_root_key arg", KR(ret), K(tenant_id));
} else if (OB_FAIL(root_service_->handle_get_root_key(get_rootkey_arg, root_key))) {
LOG_WARN("fail to get root_key", KR(ret), K(get_rootkey_arg));
}
#endif
// init rpc_arg
const bool is_delete = false;
if (OB_FAIL(ret)) {
} else if (OB_FAIL(rpc_arg.init(tenant_id,
unit.unit_id_,
compat_mode,
#ifdef OB_BUILD_TDE_SECURITY
root_key,
#endif
*unit_config,
ObReplicaType::REPLICA_TYPE_FULL,
if_not_grant,
is_delete))) {
LOG_WARN("fail to init rpc_arg", KR(ret), K(tenant_id), K(is_delete));
}
return ret;
}
int ObUnitManager::do_notify_unit_resource_(
const common::ObAddr server,
const obrpc::TenantServerUnitConfig &notify_arg,
ObNotifyTenantServerResourceProxy &notify_proxy)
{
int ret = OB_SUCCESS;
if (GCONF.self_addr_ == server) {
// Directly call local interface without using RPC
if (OB_FAIL(omt::ObTenantNodeBalancer::get_instance().handle_notify_unit_resource(notify_arg))) {
LOG_WARN("fail to handle_notify_unit_resource", K(ret), K(notify_arg));
} else {
LOG_INFO("call notify resource to server (locally)", KR(ret), "dst", server, K(notify_arg));
}
} else {
int64_t start = ObTimeUtility::current_time();
int64_t rpc_timeout = NOTIFY_RESOURCE_RPC_TIMEOUT;
if (INT64_MAX != THIS_WORKER.get_timeout_ts()) {
rpc_timeout = max(rpc_timeout, THIS_WORKER.get_timeout_remain());
}
if (OB_FAIL(notify_proxy.call(server, rpc_timeout, notify_arg))) {
LOG_WARN("fail to call notify resource to server", KR(ret), K(rpc_timeout),
"dst", server, "cost", ObTimeUtility::current_time() - start);
} else {
LOG_INFO("call notify resource to server", KR(ret), "dst", server, K(notify_arg),
"cost", ObTimeUtility::current_time() - start);
}
}
return ret;
}
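The comment on try_notify_tenant_server_unit_resource_ above states that the function only sends the notification, so the caller must wait on the proxy afterwards, except that a notification targeting the local RS server is executed inline without RPC. The following rough sketch illustrates that send-then-wait contract; the names are hypothetical and std::async stands in for the real ObNotifyTenantServerResourceProxy.

// Sketch of the "send all notifications, then wait" contract (hypothetical names).
#include <future>
#include <iostream>
#include <string>
#include <vector>

// Stand-in for sending one notification (or executing it locally).
bool notify_one(const std::string &server, bool is_local) {
  if (is_local) {
    // Local fast path: executed inline, nothing to wait for later.
    return true;
  }
  // Pretend this is an asynchronous RPC that eventually returns a result.
  return true;
}

int main() {
  const std::string self = "10.0.0.1:2882";
  const std::vector<std::string> targets = {"10.0.0.1:2882", "10.0.0.2:2882"};

  // STEP 1: fire all notifications; remote ones run asynchronously.
  std::vector<std::future<bool>> pending;
  for (const auto &server : targets) {
    const bool is_local = (server == self);
    if (is_local) {
      notify_one(server, true);
    } else {
      pending.push_back(std::async(std::launch::async, notify_one, server, false));
    }
  }

  // STEP 2: the caller must wait on every outstanding call (cf. wait_all()),
  // otherwise results are lost.
  bool all_ok = true;
  for (auto &f : pending) {
    all_ok = all_ok && f.get();
  }
  std::cout << (all_ok ? "all notified" : "some failed") << "\n";
  return 0;
}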
int ObUnitManager::rollback_persistent_units(
int ObUnitManager::rollback_persistent_units_(
const common::ObArray<share::ObUnit> &units,
const share::ObResourcePool &pool,
const lib::Worker::CompatMode compat_mode,
const bool if_not_grant,
const bool skip_offline_server,
ObNotifyTenantServerResourceProxy &notify_proxy)
{
int ret = OB_SUCCESS;
bool is_delete = true;
int tmp_ret = OB_SUCCESS;
const bool is_delete = true;
ObArray<int> return_ret_array;
notify_proxy.reuse();
for (int64_t i = 0; i < units.count(); i++) {
const ObUnit & unit = units.at(i);
if (OB_TMP_FAIL(try_notify_tenant_server_unit_resource(
const lib::Worker::CompatMode dummy_mode = lib::Worker::CompatMode::INVALID;
if (OB_TMP_FAIL(try_notify_tenant_server_unit_resource_(
pool.tenant_id_, is_delete, notify_proxy,
pool, compat_mode, unit, if_not_grant, skip_offline_server))) {
pool.unit_config_id_, dummy_mode, unit,
false/*if_not_grant*/, false/*skip_offline_server*/))) {
ret = OB_SUCC(ret) ? tmp_ret : ret;
LOG_WARN("fail to try notify server unit resource", KR(ret), KR(tmp_ret),
K(pool), K(compat_mode), K(unit), K(if_not_grant), K(skip_offline_server));
K(is_delete), K(pool), K(dummy_mode), K(unit));
}
}
if (OB_TMP_FAIL(notify_proxy.wait_all(return_ret_array))) {
@ -4799,7 +4980,7 @@ int ObUnitManager::get_tenant_unit_servers(
LOG_WARN("invalid argument", K(ret), K(tenant_id));
} else {
server_array.reset();
int tmp_ret = get_pools_by_tenant(tenant_id, pools);
int tmp_ret = get_pools_by_tenant_(tenant_id, pools);
if (OB_ENTRY_NOT_EXIST == tmp_ret) {
// pass, and return empty server array
} else if (OB_SUCCESS != tmp_ret) {
@ -4949,9 +5130,9 @@ int ObUnitManager::allocate_pool_units_(
unit.replica_type_ = pool.replica_type_;
if (OB_FAIL(unit.zone_.assign(zone))) {
LOG_WARN("fail to assign zone", KR(ret), K(zone));
} else if (OB_FAIL(try_notify_tenant_server_unit_resource(
} else if (OB_FAIL(try_notify_tenant_server_unit_resource_(
pool.tenant_id_, is_delete, notify_proxy,
pool, compat_mode, unit, false/*if not grant*/,
pool.unit_config_id_, compat_mode, unit, false/*if not grant*/,
false/*skip offline server*/))) {
LOG_WARN("fail to try notify server unit resource", K(ret));
} else if (OB_FAIL(add_unit(client, unit))) {
@ -4976,13 +5157,7 @@ int ObUnitManager::allocate_pool_units_(
}
if (OB_FAIL(ret)) {
LOG_WARN("start to rollback unit persistence", KR(ret), K(units), K(pool));
if(OB_TMP_FAIL(rollback_persistent_units(
units,
pool,
compat_mode,
false/*if not grant*/,
false/*skip offline server*/,
notify_proxy))) {
if(OB_TMP_FAIL(rollback_persistent_units_(units, pool, notify_proxy))) {
LOG_WARN("fail to rollback unit persistence", KR(ret), KR(tmp_ret), K(units),
K(pool), K(compat_mode));
}
@ -6497,7 +6672,7 @@ int ObUnitManager::check_shrink_granted_pool_allowed_by_locality(
} else if (OB_UNLIKELY(NULL == tenant_schema)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("tenant schema is null", K(ret), KP(tenant_schema));
} else if (OB_FAIL(get_pools_by_tenant(tenant_schema->get_tenant_id(), pool_list))) {
} else if (OB_FAIL(get_pools_by_tenant_(tenant_schema->get_tenant_id(), pool_list))) {
LOG_WARN("fail to get pools by tenant", K(ret));
} else if (OB_UNLIKELY(nullptr == pool_list)) {
ret = OB_ERR_UNEXPECTED;
@ -6910,7 +7085,7 @@ int ObUnitManager::check_can_remove_pool_zone_list(
LOG_WARN("invalid argument", K(ret), KP(pool), K(to_be_removed_zones));
} else if (!pool->is_granted_to_tenant()) {
can_remove = true; // this pool do not grant to any tenant, can remove zone unit
} else if (OB_FAIL(get_pools_by_tenant(pool->tenant_id_, pool_list))) {
} else if (OB_FAIL(get_pools_by_tenant_(pool->tenant_id_, pool_list))) {
LOG_WARN("fail to get pools by tenant", K(ret));
} else if (OB_UNLIKELY(NULL == pool_list)) {
ret = OB_ERR_UNEXPECTED;
@ -7192,7 +7367,7 @@ int ObUnitManager::check_can_add_pool_zone_list_by_locality(
LOG_WARN("ivnalid argument", K(ret), KP(pool), K(to_be_add_zones));
} else if (! pool->is_granted_to_tenant()) {
can_add = true; // not in tenant, can add zone unit
} else if (OB_FAIL(get_pools_by_tenant(pool->tenant_id_, pool_list))) {
} else if (OB_FAIL(get_pools_by_tenant_(pool->tenant_id_, pool_list))) {
LOG_WARN("fail to get pools by tenant", K(ret));
} else if (OB_ISNULL(schema_service_)) {
ret = OB_NOT_INIT;
@ -7716,7 +7891,7 @@ int ObUnitManager::check_pool_intersect_(
} // end foreach pool
if (OB_FAIL(ret)) {
} else if (intersect) {
} else if (OB_FAIL(get_pools_by_tenant(tenant_id, pools))) {
} else if (OB_FAIL(get_pools_by_tenant_(tenant_id, pools))) {
if (OB_ENTRY_NOT_EXIST == ret) {
// a new tenant, without resource pool already granted
ret = OB_SUCCESS;
@ -7841,9 +8016,9 @@ int ObUnitManager::do_grant_pools_(
if (OB_ISNULL(unit)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unit ptr is null", KR(ret));
} else if (OB_FAIL(try_notify_tenant_server_unit_resource(
} else if (OB_FAIL(try_notify_tenant_server_unit_resource_(
tenant_id, false /*is_delete*/, notify_proxy,
new_pool, compat_mode, *unit,
new_pool.unit_config_id_, compat_mode, *unit,
false/*if_not_grant*/, false/*skip_offline_server*/))) {
LOG_WARN("fail to try notify server unit resource", KR(ret));
} else if (FALSE_IT(new_unit = *unit)) {
@ -7874,12 +8049,7 @@ int ObUnitManager::do_grant_pools_(
if (OB_FAIL(ret) && pools.count() == all_pool_units.count()) {
LOG_WARN("start to rollback unit persistence", KR(ret), K(pools), K(tenant_id));
for (int64_t i = 0; i < pools.count(); ++i) {
if (OB_TMP_FAIL(rollback_persistent_units(all_pool_units.at(i),
pools.at(i),
compat_mode,
false/*if_not_grant*/,
false/*skip_offline_server*/,
notify_proxy))) {
if (OB_TMP_FAIL(rollback_persistent_units_(all_pool_units.at(i), pools.at(i), notify_proxy))) {
LOG_WARN("fail to rollback unit persistence", KR(ret), KR(tmp_ret), K(all_pool_units.at(i)),
K(pools.at(i)), K(compat_mode));
}
@ -7946,9 +8116,9 @@ int ObUnitManager::do_revoke_pools_(
if (OB_ISNULL(unit)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unit ptr is null", KR(ret));
} else if (OB_FAIL(try_notify_tenant_server_unit_resource(
} else if (OB_FAIL(try_notify_tenant_server_unit_resource_(
tenant_id, true /*is_delete*/, notify_proxy,
new_pool, dummy_mode, *unit,
new_pool.unit_config_id_, dummy_mode, *unit,
false/*if_not_grant*/, false/*skip_offline_server*/))) {
LOG_WARN("fail to try notify server unit resource", KR(ret));
} else if (FALSE_IT(new_unit = *unit)) {
@ -7967,7 +8137,7 @@ int ObUnitManager::do_revoke_pools_(
LOG_WARN("failed to commit shrinking pools in revoking", KR(ret), K(tenant_id));
}
int tmp_ret = OB_SUCCESS;
if (OB_TMP_FAIL(notify_proxy.wait())) {
if (OB_TMP_FAIL(notify_proxy.wait()) && OB_TENANT_NOT_IN_SERVER != tmp_ret) {
LOG_WARN("fail to wait notify resource", KR(ret), K(tmp_ret));
ret = (OB_SUCCESS == ret) ? tmp_ret : ret;
} else if (OB_SUCC(ret)) {
@ -8107,7 +8277,7 @@ int ObUnitManager::get_zone_units(const ObArray<share::ObResourcePool *> &pools,
if (NULL == *pool) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("pool is null", "pool", OB_P(*pool), K(ret));
} else if (OB_FAIL(inner_get_unit_infos_of_pool((*pool)->resource_pool_id_, unit_infos))) {
} else if (OB_FAIL(inner_get_unit_infos_of_pool_((*pool)->resource_pool_id_, unit_infos))) {
LOG_WARN("inner_get_unit_infos_of_pool failed",
"pool id", (*pool)->resource_pool_id_, K(ret));
} else {
@ -8534,9 +8704,9 @@ int ObUnitManager::do_migrate_unit_notify_resource_(const share::ObResourcePool
&obrpc::ObSrvRpcProxy::notify_tenant_server_unit_resource);
// only notify new unit resource on dst server here.
// Old unit on src server will be deleted later when doing end_migrate
if (OB_FAIL(try_notify_tenant_server_unit_resource(
if (OB_FAIL(try_notify_tenant_server_unit_resource_(
pool.tenant_id_, false/*is_delete*/, notify_proxy, // is_delete is false when migrate unit
pool, compat_mode, new_unit, false/*if not grant*/,
pool.unit_config_id_, compat_mode, new_unit, false/*if not grant*/,
false/*skip offline server*/))) {
LOG_WARN("fail to try notify server unit resource", K(ret));
}
@ -8556,13 +8726,7 @@ int ObUnitManager::do_migrate_unit_notify_resource_(const share::ObResourcePool
ObArray<ObUnit> units;
if (OB_TMP_FAIL(units.push_back(new_unit))) {
LOG_WARN("fail to push an element into units", KR(ret), KR(tmp_ret), K(new_unit));
} else if (OB_TMP_FAIL(rollback_persistent_units(
units,
pool,
compat_mode,
false/*if not grant*/,
false/*skip offline server*/,
notify_proxy))) {
} else if (OB_TMP_FAIL(rollback_persistent_units_(units, pool, notify_proxy))) {
LOG_WARN("fail to rollback unit persistence", KR(ret), KR(tmp_ret),
K(units), K(pool), K(compat_mode));
}
@ -8740,9 +8904,7 @@ int ObUnitManager::inner_try_delete_migrate_unit_resource(
ObNotifyTenantServerResourceProxy notify_proxy(
*srv_rpc_proxy_,
&obrpc::ObSrvRpcProxy::notify_tenant_server_unit_resource);
if (OB_FAIL(tenant_unit_server_config.init(
pool->tenant_id_, unit->unit_id_, compat_mode, *unit_config,
unit->replica_type_, false/*if not grant*/, true/*delete*/))) {
if (OB_FAIL(tenant_unit_server_config.init_for_dropping(pool->tenant_id_, true/*delete*/))) {
LOG_WARN("fail to init tenant server unit config", K(ret), "tenant_id", pool->tenant_id_);
} else if (OB_FAIL(notify_proxy.call(
migrate_from_server, rpc_timeout, tenant_unit_server_config))) {
@ -10208,7 +10370,7 @@ int ObUnitManager::get_loads_by_server(const ObAddr &addr,
return ret;
}
int ObUnitManager::get_pools_by_tenant(const uint64_t tenant_id,
int ObUnitManager::get_pools_by_tenant_(const uint64_t tenant_id,
ObArray<share::ObResourcePool *> *&pools) const
{
int ret = OB_SUCCESS;
@ -10545,7 +10707,7 @@ int ObUnitManager::inner_get_tenant_zone_full_unit_num(
} else if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id))) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret), K(tenant_id));
} else if (OB_FAIL(get_pools_by_tenant(tenant_id, pools))) {
} else if (OB_FAIL(get_pools_by_tenant_(tenant_id, pools))) {
LOG_WARN("fail to get pools by tenant", K(ret), K(tenant_id));
} else if (OB_UNLIKELY(NULL == pools)) {
ret = OB_ERR_UNEXPECTED;
@ -10589,7 +10751,7 @@ int ObUnitManager::get_tenant_zone_unit_loads(
|| !ObReplicaTypeCheck::is_replica_type_valid(replica_type))) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret), K(tenant_id), K(zone), K(replica_type));
} else if (OB_FAIL(get_pools_by_tenant(tenant_id, pools))) {
} else if (OB_FAIL(get_pools_by_tenant_(tenant_id, pools))) {
if (OB_ENTRY_NOT_EXIST == ret) {
ret = OB_SUCCESS;
} else {
@ -10658,7 +10820,7 @@ int ObUnitManager::get_tenant_zone_all_unit_loads(
} else if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || zone.is_empty())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret), K(tenant_id), K(zone));
} else if (OB_FAIL(get_pools_by_tenant(tenant_id, pools))) {
} else if (OB_FAIL(get_pools_by_tenant_(tenant_id, pools))) {
if (OB_ENTRY_NOT_EXIST == ret) {
ret = OB_SUCCESS;
} else {
@ -10787,7 +10949,7 @@ int ObUnitManager::get_unit_infos(const common::ObIArray<share::ObResourcePoolNa
} else if (OB_ISNULL(pool)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get invalid pool", K(ret), K(pools));
} else if (OB_FAIL(inner_get_unit_infos_of_pool(pool->resource_pool_id_, pool_units))) {
} else if (OB_FAIL(inner_get_unit_infos_of_pool_(pool->resource_pool_id_, pool_units))) {
LOG_WARN("fail to get unit infos", K(ret), K(*pool));
} else {
for (int64_t j = 0; j < pool_units.count() && OB_SUCC(ret); j++) {
@ -10833,7 +10995,7 @@ int ObUnitManager::inner_get_active_unit_infos_of_tenant(const ObTenantSchema &t
LOG_WARN("check_inner_stat failed", K(inited_), K(loaded_), K(ret));
} else if (OB_FAIL(tenant_schema.get_zone_list(tenant_zone_list))) {
LOG_WARN("fail to get zone list", K(ret));
} else if (OB_FAIL(get_pools_by_tenant(tenant_id, pools))) {
} else if (OB_FAIL(get_pools_by_tenant_(tenant_id, pools))) {
if (OB_ENTRY_NOT_EXIST != ret) {
LOG_WARN("get_pools_by_tenant failed", K(tenant_id), K(ret));
} else {
@ -10859,7 +11021,7 @@ int ObUnitManager::inner_get_active_unit_infos_of_tenant(const ObTenantSchema &t
for (int64_t i = 0; i < pool_ids.count() && OB_SUCC(ret); i++) {
uint64_t pool_id = pool_ids.at(i);
unit_in_pool.reset();
if (OB_FAIL(inner_get_unit_infos_of_pool(pool_id, unit_in_pool))) {
if (OB_FAIL(inner_get_unit_infos_of_pool_(pool_id, unit_in_pool))) {
LOG_WARN("fail to inner get unit infos", K(ret), K(pool_id));
} else {
for (int64_t j = 0; j < unit_in_pool.count() && OB_SUCC(ret); j++) {