From 42cc13dfea5d48ed4fd47fcd8fb401c4e210de5f Mon Sep 17 00:00:00 2001 From: zhjc1124 Date: Fri, 10 Nov 2023 07:09:23 +0000 Subject: [PATCH] fix divided_by_zero and errcode overlapped --- .../net/ob_ingress_bw_alloc_service.cpp | 78 ++++++++++--------- .../net/ob_ingress_bw_alloc_service.h | 11 ++- ...ob_net_endpoint_ingress_rpc_proccessor.cpp | 2 +- .../net/ob_net_endpoint_ingress_rpc_struct.h | 9 +-- .../net/test_ingress_bw_alloc_manager.cpp | 4 + 5 files changed, 55 insertions(+), 49 deletions(-) diff --git a/src/observer/net/ob_ingress_bw_alloc_service.cpp b/src/observer/net/ob_ingress_bw_alloc_service.cpp index 430b9a8ae..852ed5c9f 100644 --- a/src/observer/net/ob_ingress_bw_alloc_service.cpp +++ b/src/observer/net/ob_ingress_bw_alloc_service.cpp @@ -59,6 +59,7 @@ int ObNetEndpointIngressManager::register_endpoint(const ObNetEndpointKey &endpo } else if (OB_FAIL(ingress_plan_map_.get_refactored(endpoint_key, endpoint_value))) { if (OB_HASH_NOT_EXIST == ret) { // initialize + ret = OB_SUCCESS; endpoint_value = (ObNetEndpointValue *)ob_malloc(sizeof(ObNetEndpointValue), "INGRESS_SERVICE"); if (OB_ISNULL(endpoint_value)) { LOG_WARN("failed to alloc memory for objs"); @@ -66,7 +67,7 @@ int ObNetEndpointIngressManager::register_endpoint(const ObNetEndpointKey &endpo } else { endpoint_value->expire_time_ = expire_time; } - if (OB_FAIL(ingress_plan_map_.set_refactored(endpoint_key, endpoint_value))) { + if (OB_SUCC(ret) && OB_FAIL(ingress_plan_map_.set_refactored(endpoint_key, endpoint_value))) { ob_free(endpoint_value); LOG_WARN("endpoint register failed", K(ret), K(endpoint_key), K(expire_time)); } else { @@ -91,18 +92,19 @@ int ObNetEndpointIngressManager::collect_predict_bw(ObNetEndpointKVArray &update const int64_t current_time = ObTimeUtility::current_time(); { ObSpinLockGuard guard(lock_); + int tmp_ret = OB_SUCCESS; for (ObIngressPlanMap::iterator iter = ingress_plan_map_.begin(); iter != ingress_plan_map_.end(); ++iter) { const ObNetEndpointKey &endpoint_key = iter->first; ObNetEndpointValue *endpoint_value = iter->second; if (endpoint_value->expire_time_ < current_time) { LOG_INFO("endpoint expired", K(endpoint_key), K(endpoint_value->expire_time_), K(current_time)); - if (OB_FAIL(delete_keys.push_back(endpoint_key))) { + if (OB_TMP_FAIL(delete_keys.push_back(endpoint_key))) { LOG_WARN("fail to push back arrays", K(ret), K(endpoint_key)); } else { ob_free(endpoint_value); } } else { - if (OB_FAIL(update_kvs.push_back(ObNetEndpointKeyValue(endpoint_key, endpoint_value)))) { + if (OB_TMP_FAIL(update_kvs.push_back(ObNetEndpointKeyValue(endpoint_key, endpoint_value)))) { LOG_WARN("fail to push back arrays", K(ret), K(endpoint_key)); } else { endpoint_value->predicted_bw_ = -1; @@ -190,40 +192,42 @@ int ObNetEndpointIngressManager::update_ingress_plan(ObNetEndpointKVArray &updat predicted_bws[valid_count++] = endpoint_value->predicted_bw_; } } - - int64_t baseline_bw = 0; - int64_t extra_bw = 0; - if (remain_bw_limit <= 0) { - remain_bw_limit = 0; - } - std::sort(predicted_bws, predicted_bws + valid_count); - int64_t average = (int64_t)remain_bw_limit / valid_count; - for (int i = 0; i < valid_count; i++) { - average = (int64_t)remain_bw_limit / (valid_count - i); - if (average <= predicted_bws[i]) { + if (0 != valid_count) { + int64_t baseline_bw = 0; + int64_t extra_bw = 0; + if (remain_bw_limit <= 0) { remain_bw_limit = 0; - break; - } else { - remain_bw_limit -= predicted_bws[i]; } - } - baseline_bw = average; - extra_bw = (int64_t)remain_bw_limit / valid_count; - - for (int64_t i = 0; i < update_kvs.count(); i++) { - ObNetEndpointValue *endpoint_value = update_kvs[i].value_; - if (OB_UNLIKELY(endpoint_value->predicted_bw_ == -1)) { - // do nothing, remain old assigned_bw - } else { - int64_t predicted_bw = endpoint_value->predicted_bw_; - int64_t assigned_bw = -1; - if (predicted_bw > baseline_bw) { - assigned_bw = baseline_bw; + std::sort(predicted_bws, predicted_bws + valid_count); + int64_t average = remain_bw_limit / valid_count; + bool is_done = false; + for (int i = 0; i < valid_count && !is_done; i++) { + average = remain_bw_limit / (valid_count - i); + if (average <= predicted_bws[i]) { + remain_bw_limit = 0; + is_done = true; } else { - assigned_bw = predicted_bw; + remain_bw_limit -= predicted_bws[i]; + } + } + baseline_bw = average; + extra_bw = (int64_t)remain_bw_limit / valid_count; + + for (int64_t i = 0; i < update_kvs.count(); i++) { + ObNetEndpointValue *endpoint_value = update_kvs[i].value_; + if (OB_UNLIKELY(endpoint_value->predicted_bw_ == -1)) { + // do nothing, remain old assigned_bw + } else { + int64_t predicted_bw = endpoint_value->predicted_bw_; + int64_t assigned_bw = -1; + if (predicted_bw > baseline_bw) { + assigned_bw = baseline_bw; + } else { + assigned_bw = predicted_bw; + } + assigned_bw += extra_bw; + endpoint_value->assigned_bw_ = assigned_bw; } - assigned_bw += extra_bw; - endpoint_value->assigned_bw_ = assigned_bw; } } } @@ -454,25 +458,25 @@ void ObIngressBWAllocService::runTimerTask() void ObIngressBWAllocService::switch_to_follower_forcedly() { - ATOMIC_SET(&is_leader_, false); + ATOMIC_STORE(&is_leader_, false); } int ObIngressBWAllocService::switch_to_leader() { int ret = OB_SUCCESS; - ATOMIC_SET(&is_leader_, true); + ATOMIC_STORE(&is_leader_, true); return ret; } int ObIngressBWAllocService::switch_to_follower_gracefully() { int ret = OB_SUCCESS; - ATOMIC_SET(&is_leader_, false); + ATOMIC_STORE(&is_leader_, false); return ret; } int ObIngressBWAllocService::resume_leader() { int ret = OB_SUCCESS; if (!is_leader()) { - ATOMIC_SET(&is_leader_, true); + ATOMIC_STORE(&is_leader_, true); } return ret; } diff --git a/src/observer/net/ob_ingress_bw_alloc_service.h b/src/observer/net/ob_ingress_bw_alloc_service.h index 90cfa44f7..6ee6d3ba4 100644 --- a/src/observer/net/ob_ingress_bw_alloc_service.h +++ b/src/observer/net/ob_ingress_bw_alloc_service.h @@ -20,7 +20,6 @@ namespace oceanbase { -using namespace obrpc; namespace rootserver { class ObNetEndpointIngressManager @@ -32,15 +31,15 @@ public: int init(); void reset(); void destroy(); - int register_endpoint(const ObNetEndpointKey &endpoint_key, const int64_t expire_time); - int collect_predict_bw(ObNetEndpointKVArray &update_kvs); - int update_ingress_plan(ObNetEndpointKVArray &update_kvs); - int commit_bw_limit_plan(ObNetEndpointKVArray &update_kvs); + int register_endpoint(const obrpc::ObNetEndpointKey &endpoint_key, const int64_t expire_time); + int collect_predict_bw(obrpc::ObNetEndpointKVArray &update_kvs); + int update_ingress_plan(obrpc::ObNetEndpointKVArray &update_kvs); + int commit_bw_limit_plan(obrpc::ObNetEndpointKVArray &update_kvs); int set_total_bw_limit(int64_t total_bw_limit); int64_t get_map_size(); private: - typedef common::hash::ObHashMap ObIngressPlanMap; + typedef common::hash::ObHashMap ObIngressPlanMap; ObIngressPlanMap ingress_plan_map_; int64_t total_bw_limit_; ObSpinLock lock_; diff --git a/src/observer/net/ob_net_endpoint_ingress_rpc_proccessor.cpp b/src/observer/net/ob_net_endpoint_ingress_rpc_proccessor.cpp index a3b7fc990..1fbabca6e 100644 --- a/src/observer/net/ob_net_endpoint_ingress_rpc_proccessor.cpp +++ b/src/observer/net/ob_net_endpoint_ingress_rpc_proccessor.cpp @@ -27,7 +27,7 @@ int ObNetEndpointRegisterP::process() int ret = OB_SUCCESS; if (OB_ISNULL(gctx_.net_frame_)) { ret = OB_ERR_UNEXPECTED; - LOG_WARN("ob_service is null", KR(ret)); + LOG_WARN("net_frame in GCTX is null", KR(ret)); } else if (OB_FAIL(gctx_.net_frame_->net_endpoint_register(arg_.endpoint_key_, arg_.expire_time_))) { LOG_WARN("failed to net_endpoint_register", KR(ret), K(arg_)); } diff --git a/src/observer/net/ob_net_endpoint_ingress_rpc_struct.h b/src/observer/net/ob_net_endpoint_ingress_rpc_struct.h index 5addf245b..247f59196 100644 --- a/src/observer/net/ob_net_endpoint_ingress_rpc_struct.h +++ b/src/observer/net/ob_net_endpoint_ingress_rpc_struct.h @@ -26,7 +26,6 @@ namespace oceanbase { -using namespace common; namespace obrpc { @@ -35,9 +34,9 @@ class ObNetEndpointKey OB_UNIS_VERSION(1); public: - ObNetEndpointKey() : addr_(), group_id_(0) + ObNetEndpointKey() : addr_(), group_id_(OB_INVALID_ID) {} - ObNetEndpointKey(const ObAddr &addr) : addr_(addr), group_id_(0) + ObNetEndpointKey(const ObAddr &addr) : addr_(addr), group_id_(OB_INVALID_ID) {} ObNetEndpointKey(const ObNetEndpointKey &other) { @@ -59,7 +58,7 @@ public: void reset() { addr_.reset(); - group_id_ = 0; + group_id_ = OB_INVALID_ID; } uint64_t hash() const { @@ -99,7 +98,7 @@ public: TO_STRING_KV(K_(addr), K_(group_id)); ObAddr addr_; - int32_t group_id_; + int64_t group_id_; }; class ObNetEndpointValue diff --git a/unittest/observer/net/test_ingress_bw_alloc_manager.cpp b/unittest/observer/net/test_ingress_bw_alloc_manager.cpp index 620cb4ca4..09010ee18 100644 --- a/unittest/observer/net/test_ingress_bw_alloc_manager.cpp +++ b/unittest/observer/net/test_ingress_bw_alloc_manager.cpp @@ -48,6 +48,7 @@ TEST_F(TestEndpointIngressService, ingress_service) ObAddr addr1(1, 1); ObNetEndpointKey key1; key1.addr_ = addr1; + key1.group_id_ = 1; int64_t time = ObTimeUtility::current_time(); ret = ingress_manager_.register_endpoint(key1, time); @@ -72,6 +73,7 @@ TEST_F(TestEndpointIngressService, ingress_service) ObNetEndpointKey key2; ObNetEndpointValue *value2 = nullptr; key2.addr_ = addr2; + key2.group_id_ = 2; ret = ingress_manager_.register_endpoint(key2, time); ASSERT_EQ(ret, OB_SUCCESS); ret = ingress_manager_.ingress_plan_map_.get_refactored(key2, value2); @@ -83,6 +85,7 @@ TEST_F(TestEndpointIngressService, ingress_service) ObNetEndpointKey key3; ObNetEndpointValue *value3 = nullptr; key3.addr_ = addr3; + key3.group_id_ = 3; ret = ingress_manager_.register_endpoint(key3, time); ASSERT_EQ(ret, OB_SUCCESS); ret = ingress_manager_.ingress_plan_map_.get_refactored(key3, value3); @@ -128,6 +131,7 @@ TEST_F(TestEndpointIngressService, ingress_service) ObNetEndpointKey key4; ObNetEndpointValue *value4 = nullptr; key4.addr_ = addr4; + key4.group_id_ = 4; ret = ingress_manager_.register_endpoint(key4, time); ASSERT_EQ(ret, OB_SUCCESS); ret = ingress_manager_.ingress_plan_map_.get_refactored(key4, value4);