Reduce the scope of the lock

This commit is contained in:
obdev 2025-01-02 07:15:36 +00:00 committed by ob-robot
parent 59d93cc7a2
commit fa1909751c

View File

@ -261,13 +261,15 @@ int ObSharedStorageNetThrotManager::collect_predict_resource(const int64_t &expi
*GCTX.srv_rpc_proxy_, &obrpc::ObSrvRpcProxy::shared_storage_net_throt_predict);
const int64_t timeout_ts = GCONF.rpc_timeout;
const int64_t current_time = ObTimeUtility::current_time();
ObSpinLockGuard guard(lock_);
// RPC2.1: RS asks observer for the required quotas of all storages
obrpc::ObEndpointRegMap::iterator it = endpoint_infos_map_.begin();
for (; OB_SUCC(ret) && it != endpoint_infos_map_.end(); ++it) {
ObSSNTEndpointArg arg(it->first, it->second.storage_keys_, it->second.expire_time_);
if (OB_FAIL(proxy_batch.call(arg.addr_, timeout_ts, arg))) {
LOG_WARN("SSNT:failed to async call rpc", K(ret));
{
ObSpinLockGuard guard(lock_);
obrpc::ObEndpointRegMap::iterator it = endpoint_infos_map_.begin();
for (; OB_SUCC(ret) && it != endpoint_infos_map_.end(); ++it) {
ObSSNTEndpointArg arg(it->first, it->second.storage_keys_, it->second.expire_time_);
if (OB_FAIL(proxy_batch.call(arg.addr_, timeout_ts, arg))) {
LOG_WARN("SSNT:failed to async call rpc", K(ret));
}
}
}
ObArray<int> return_code_array;
@ -288,6 +290,7 @@ int ObSharedStorageNetThrotManager::collect_predict_resource(const int64_t &expi
LOG_WARN(
"cnt not match", KR(ret), "return_cnt", return_code_array.count(), "server_cnt", endpoint_infos_map_.size());
} else {
ObSpinLockGuard guard(lock_);
for (int64_t i = 0; OB_SUCC(ret) && i < return_code_array.size(); ++i) {
const ObAddr &addr = proxy_batch.get_dests().at(i);
if (OB_SUCCESS != return_code_array.at(i)) {
@ -622,6 +625,7 @@ int ObSharedStorageNetThrotManager::commit_quota_plan()
*GCTX.srv_rpc_proxy_, &obrpc::ObSrvRpcProxy::shared_storage_net_throt_set);
const int64_t timeout_ts = GCONF.rpc_timeout;
ObArray<int> return_code_array;
ObSEArray<std::pair<ObAddr, obrpc::ObSharedDeviceResourceArray>, 7> send_rpc_arr;
ObSpinLockGuard guard(lock_);
{
obrpc::ObEndpointRegMap::iterator iter = endpoint_infos_map_.begin();
@ -652,33 +656,38 @@ int ObSharedStorageNetThrotManager::commit_quota_plan()
}
}
}
if (OB_TMP_FAIL(proxy_batch.call(addr, timeout_ts, arg))) {
LOG_WARN("SSNT:fail to call async batch rpc", KR(ret), K(tmp_ret), K(addr), K(arg));
if (OB_TMP_FAIL(send_rpc_arr.push_back(std::pair<ObAddr, obrpc::ObSharedDeviceResourceArray>(addr, arg)))) {
LOG_WARN("SSNT:push back rpc info failed", K(ret), K(tmp_ret), K(addr), K(arg));
}
}
}
for (int64_t i = 0; i < send_rpc_arr.count(); ++i) {
int tmp_ret = OB_SUCCESS;
if (OB_TMP_FAIL(proxy_batch.wait_all(return_code_array))) {
LOG_WARN("SSNT:fail to wait all async batch rpc", KR(ret));
ret = OB_SUCC(ret) ? tmp_ret : ret;
} else if (OB_FAIL(ret)) {
} else if (return_code_array.count() != endpoint_infos_map_.size()) {
ret = OB_ERR_UNDEFINED;
LOG_WARN("SSNT:cnt not match",
K(ret),
"return_count",
return_code_array.count(),
"kv_count",
endpoint_infos_map_.size());
} else if (OB_FAIL(proxy_batch.check_return_cnt(return_code_array.count()))) {
LOG_WARN(
"cnt not match", KR(ret), "return_cnt", return_code_array.count(), "server_cnt", endpoint_infos_map_.size());
} else {
for (int64_t i = 0; i < return_code_array.count(); ++i) {
if (OB_SUCCESS != return_code_array.at(i)) {
const ObAddr &addr = proxy_batch.get_dests().at(i);
LOG_WARN("rpc execute failed", KR(ret), K(addr));
}
if (OB_TMP_FAIL(proxy_batch.call(send_rpc_arr.at(i).first, timeout_ts, send_rpc_arr.at(i).second))) {
LOG_WARN("SSNT:fail to call async batch rpc", KR(ret), K(tmp_ret), "addr", send_rpc_arr.at(i).first, "arg", send_rpc_arr.at(i).second);
}
}
int tmp_ret = OB_SUCCESS;
if (OB_TMP_FAIL(proxy_batch.wait_all(return_code_array))) {
LOG_WARN("SSNT:fail to wait all async batch rpc", KR(ret));
ret = OB_SUCC(ret) ? tmp_ret : ret;
} else if (OB_FAIL(ret)) {
} else if (return_code_array.count() != endpoint_infos_map_.size()) {
ret = OB_ERR_UNDEFINED;
LOG_WARN("SSNT:cnt not match",
K(ret),
"return_count",
return_code_array.count(),
"kv_count",
endpoint_infos_map_.size());
} else if (OB_FAIL(proxy_batch.check_return_cnt(return_code_array.count()))) {
LOG_WARN(
"cnt not match", KR(ret), "return_cnt", return_code_array.count(), "server_cnt", endpoint_infos_map_.size());
} else {
for (int64_t i = 0; i < return_code_array.count(); ++i) {
if (OB_SUCCESS != return_code_array.at(i)) {
const ObAddr &addr = proxy_batch.get_dests().at(i);
LOG_WARN("rpc execute failed", KR(ret), K(addr));
}
}
}