From fa1909751c19b4f61700f04cdd4fc510ec40bf57 Mon Sep 17 00:00:00 2001 From: obdev Date: Thu, 2 Jan 2025 07:15:36 +0000 Subject: [PATCH] Reduce the scope of the lock --- .../ob_shared_storage_net_throt_service.cpp | 69 +++++++++++-------- 1 file changed, 39 insertions(+), 30 deletions(-) diff --git a/src/observer/net/ob_shared_storage_net_throt_service.cpp b/src/observer/net/ob_shared_storage_net_throt_service.cpp index 163df578b..b67558198 100644 --- a/src/observer/net/ob_shared_storage_net_throt_service.cpp +++ b/src/observer/net/ob_shared_storage_net_throt_service.cpp @@ -261,13 +261,15 @@ int ObSharedStorageNetThrotManager::collect_predict_resource(const int64_t &expi *GCTX.srv_rpc_proxy_, &obrpc::ObSrvRpcProxy::shared_storage_net_throt_predict); const int64_t timeout_ts = GCONF.rpc_timeout; const int64_t current_time = ObTimeUtility::current_time(); - ObSpinLockGuard guard(lock_); // RPC2.1: RS asks observer for the required quotas of all storages - obrpc::ObEndpointRegMap::iterator it = endpoint_infos_map_.begin(); - for (; OB_SUCC(ret) && it != endpoint_infos_map_.end(); ++it) { - ObSSNTEndpointArg arg(it->first, it->second.storage_keys_, it->second.expire_time_); - if (OB_FAIL(proxy_batch.call(arg.addr_, timeout_ts, arg))) { - LOG_WARN("SSNT:failed to async call rpc", K(ret)); + { + ObSpinLockGuard guard(lock_); + obrpc::ObEndpointRegMap::iterator it = endpoint_infos_map_.begin(); + for (; OB_SUCC(ret) && it != endpoint_infos_map_.end(); ++it) { + ObSSNTEndpointArg arg(it->first, it->second.storage_keys_, it->second.expire_time_); + if (OB_FAIL(proxy_batch.call(arg.addr_, timeout_ts, arg))) { + LOG_WARN("SSNT:failed to async call rpc", K(ret)); + } } } ObArray return_code_array; @@ -288,6 +290,7 @@ int ObSharedStorageNetThrotManager::collect_predict_resource(const int64_t &expi LOG_WARN( "cnt not match", KR(ret), "return_cnt", return_code_array.count(), "server_cnt", endpoint_infos_map_.size()); } else { + ObSpinLockGuard guard(lock_); for (int64_t i = 0; OB_SUCC(ret) && i < return_code_array.size(); ++i) { const ObAddr &addr = proxy_batch.get_dests().at(i); if (OB_SUCCESS != return_code_array.at(i)) { @@ -622,6 +625,7 @@ int ObSharedStorageNetThrotManager::commit_quota_plan() *GCTX.srv_rpc_proxy_, &obrpc::ObSrvRpcProxy::shared_storage_net_throt_set); const int64_t timeout_ts = GCONF.rpc_timeout; ObArray return_code_array; + ObSEArray, 7> send_rpc_arr; ObSpinLockGuard guard(lock_); { obrpc::ObEndpointRegMap::iterator iter = endpoint_infos_map_.begin(); @@ -652,33 +656,38 @@ int ObSharedStorageNetThrotManager::commit_quota_plan() } } } - if (OB_TMP_FAIL(proxy_batch.call(addr, timeout_ts, arg))) { - LOG_WARN("SSNT:fail to call async batch rpc", KR(ret), K(tmp_ret), K(addr), K(arg)); + if (OB_TMP_FAIL(send_rpc_arr.push_back(std::pair(addr, arg)))) { + LOG_WARN("SSNT:push back rpc info failed", K(ret), K(tmp_ret), K(addr), K(arg)); } } - + } + for (int64_t i = 0; i < send_rpc_arr.count(); ++i) { int tmp_ret = OB_SUCCESS; - if (OB_TMP_FAIL(proxy_batch.wait_all(return_code_array))) { - LOG_WARN("SSNT:fail to wait all async batch rpc", KR(ret)); - ret = OB_SUCC(ret) ? tmp_ret : ret; - } else if (OB_FAIL(ret)) { - } else if (return_code_array.count() != endpoint_infos_map_.size()) { - ret = OB_ERR_UNDEFINED; - LOG_WARN("SSNT:cnt not match", - K(ret), - "return_count", - return_code_array.count(), - "kv_count", - endpoint_infos_map_.size()); - } else if (OB_FAIL(proxy_batch.check_return_cnt(return_code_array.count()))) { - LOG_WARN( - "cnt not match", KR(ret), "return_cnt", return_code_array.count(), "server_cnt", endpoint_infos_map_.size()); - } else { - for (int64_t i = 0; i < return_code_array.count(); ++i) { - if (OB_SUCCESS != return_code_array.at(i)) { - const ObAddr &addr = proxy_batch.get_dests().at(i); - LOG_WARN("rpc execute failed", KR(ret), K(addr)); - } + if (OB_TMP_FAIL(proxy_batch.call(send_rpc_arr.at(i).first, timeout_ts, send_rpc_arr.at(i).second))) { + LOG_WARN("SSNT:fail to call async batch rpc", KR(ret), K(tmp_ret), "addr", send_rpc_arr.at(i).first, "arg", send_rpc_arr.at(i).second); + } + } + int tmp_ret = OB_SUCCESS; + if (OB_TMP_FAIL(proxy_batch.wait_all(return_code_array))) { + LOG_WARN("SSNT:fail to wait all async batch rpc", KR(ret)); + ret = OB_SUCC(ret) ? tmp_ret : ret; + } else if (OB_FAIL(ret)) { + } else if (return_code_array.count() != endpoint_infos_map_.size()) { + ret = OB_ERR_UNDEFINED; + LOG_WARN("SSNT:cnt not match", + K(ret), + "return_count", + return_code_array.count(), + "kv_count", + endpoint_infos_map_.size()); + } else if (OB_FAIL(proxy_batch.check_return_cnt(return_code_array.count()))) { + LOG_WARN( + "cnt not match", KR(ret), "return_cnt", return_code_array.count(), "server_cnt", endpoint_infos_map_.size()); + } else { + for (int64_t i = 0; i < return_code_array.count(); ++i) { + if (OB_SUCCESS != return_code_array.at(i)) { + const ObAddr &addr = proxy_batch.get_dests().at(i); + LOG_WARN("rpc execute failed", KR(ret), K(addr)); } } }