[CP] Standardize usage of ObAsyncRpcProxy
This commit is contained in:
@ -4768,15 +4768,18 @@ int ObUnitManager::rollback_persistent_units(
|
||||
if (OB_TMP_FAIL(notify_proxy.wait_all(return_ret_array))) {
|
||||
LOG_WARN("fail to wait notify resource", KR(ret), K(tmp_ret));
|
||||
ret = OB_SUCC(ret) ? tmp_ret : ret;
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
} else if (OB_FAIL(ret)) {
|
||||
} else {
|
||||
// don't use arg/dest/result here because call() may has failure.
|
||||
ObAddr invalid_addr;
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < return_ret_array.count(); i++) {
|
||||
const int ret_i = return_ret_array.at(i);
|
||||
const ObAddr &addr = return_ret_array.count() != notify_proxy.get_dests().count() ?
|
||||
invalid_addr : notify_proxy.get_dests().at(i);
|
||||
// if (OB_SUCCESS != ret_i && OB_TENANT_NOT_IN_SERVER != ret_i) {
|
||||
if (OB_SUCCESS != ret_i && OB_TENANT_NOT_IN_SERVER != ret_i) {
|
||||
ret = ret_i;
|
||||
LOG_WARN("fail to mark tenant removed", KR(ret), KR(ret_i),
|
||||
K(notify_proxy.get_dests().at(i)));
|
||||
LOG_WARN("fail to mark tenant removed", KR(ret), KR(ret_i), K(addr));
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -4965,6 +4968,8 @@ int ObUnitManager::allocate_pool_units_(
|
||||
if (OB_TMP_FAIL(notify_proxy.wait())) {
|
||||
LOG_WARN("fail to wait notify resource", K(ret), K(tmp_ret));
|
||||
ret = (OB_SUCCESS == ret) ? tmp_ret : ret;
|
||||
} else if (OB_SUCC(ret)) {
|
||||
// arg/dest/result can be used here.
|
||||
}
|
||||
if (is_valid_tenant_id(pool.tenant_id_)) {
|
||||
ret = ERRSIM_UNIT_PERSISTENCE_ERROR ? : ret;
|
||||
@ -5585,18 +5590,18 @@ int ObUnitManager::get_servers_resource_info_via_rpc(
|
||||
if (OB_TMP_FAIL(proxy.wait())) {
|
||||
LOG_WARN("fail to wait all batch result", KR(ret), KR(tmp_ret));
|
||||
ret = OB_SUCC(ret) ? tmp_ret : ret;
|
||||
}
|
||||
tmp_report_servers_resource_info.reset();
|
||||
ARRAY_FOREACH_X(proxy.get_results(), idx, cnt, OB_SUCC(ret)) {
|
||||
const obrpc::ObGetServerResourceInfoResult *rpc_result = proxy.get_results().at(idx);
|
||||
if (OB_ISNULL(rpc_result)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("rpc_result is null", KR(ret), KP(rpc_result));
|
||||
} else if (OB_UNLIKELY(!rpc_result->is_valid())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("rpc_result is invalid", KR(ret), KPC(rpc_result));
|
||||
} else if (OB_FAIL(tmp_report_servers_resource_info.push_back(*rpc_result))) {
|
||||
LOG_WARN("fail to push an element into tmp_report_servers_resource_info", KR(ret), KPC(rpc_result));
|
||||
} else if (OB_SUCC(ret)) {
|
||||
ARRAY_FOREACH_X(proxy.get_results(), idx, cnt, OB_SUCC(ret)) {
|
||||
const obrpc::ObGetServerResourceInfoResult *rpc_result = proxy.get_results().at(idx);
|
||||
if (OB_ISNULL(rpc_result)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("rpc_result is null", KR(ret), KP(rpc_result));
|
||||
} else if (OB_UNLIKELY(!rpc_result->is_valid())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("rpc_result is invalid", KR(ret), KPC(rpc_result));
|
||||
} else if (OB_FAIL(tmp_report_servers_resource_info.push_back(*rpc_result))) {
|
||||
LOG_WARN("fail to push an element into tmp_report_servers_resource_info", KR(ret), KPC(rpc_result));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -7859,9 +7864,11 @@ int ObUnitManager::do_grant_pools_(
|
||||
}
|
||||
}
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
if (OB_SUCCESS != (tmp_ret = notify_proxy.wait())) {
|
||||
if (OB_TMP_FAIL(notify_proxy.wait())) {
|
||||
LOG_WARN("fail to wait notify resource", KR(ret), K(tmp_ret));
|
||||
ret = (OB_SUCCESS == ret) ? tmp_ret : ret;
|
||||
} else if (OB_SUCC(ret)) {
|
||||
// arg/dest/result can be used here.
|
||||
}
|
||||
ret = ERRSIM_UNIT_PERSISTENCE_ERROR ? : ret;
|
||||
if (OB_FAIL(ret) && pools.count() == all_pool_units.count()) {
|
||||
@ -7960,9 +7967,11 @@ int ObUnitManager::do_revoke_pools_(
|
||||
LOG_WARN("failed to commit shrinking pools in revoking", KR(ret), K(tenant_id));
|
||||
}
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
if (OB_SUCCESS != (tmp_ret = notify_proxy.wait())) {
|
||||
if (OB_TMP_FAIL(notify_proxy.wait())) {
|
||||
LOG_WARN("fail to wait notify resource", KR(ret), K(tmp_ret));
|
||||
ret = (OB_SUCCESS == ret) ? tmp_ret : ret;
|
||||
} else if (OB_SUCC(ret)) {
|
||||
// arg/dest/result can be used here.
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
@ -8532,9 +8541,11 @@ int ObUnitManager::do_migrate_unit_notify_resource_(const share::ObResourcePool
|
||||
LOG_WARN("fail to try notify server unit resource", K(ret));
|
||||
}
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
if (OB_SUCCESS != (tmp_ret = notify_proxy.wait())) {
|
||||
if (OB_TMP_FAIL(notify_proxy.wait())) {
|
||||
LOG_WARN("fail to wait notify resource", K(ret), K(tmp_ret));
|
||||
ret = (OB_SUCCESS == ret) ? tmp_ret : ret;
|
||||
} else if (OB_SUCC(ret)) {
|
||||
// arg/dest/result can be used here.
|
||||
}
|
||||
|
||||
// Rollback persistent unit if persistence failed
|
||||
@ -8739,11 +8750,12 @@ int ObUnitManager::inner_try_delete_migrate_unit_resource(
|
||||
K(ret), K(rpc_timeout), "unit", *unit, "dest", migrate_from_server);
|
||||
}
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
if (OB_SUCCESS != (tmp_ret = notify_proxy.wait())) {
|
||||
if (OB_TMP_FAIL(notify_proxy.wait())) {
|
||||
LOG_WARN("fail to wait notify resource", K(ret), K(tmp_ret));
|
||||
ret = (OB_SUCCESS == ret) ? tmp_ret : ret;
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
// arg/dest/result can be used here.
|
||||
LOG_INFO("notify resource to server succeed", "unit", *unit, "dest", migrate_from_server);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user