fix divided_by_zero and errcode overlapped

This commit is contained in:
zhjc1124 2023-11-10 07:09:23 +00:00 committed by ob-robot
parent 7c0bee2ec2
commit 42cc13dfea
5 changed files with 55 additions and 49 deletions

View File

@ -59,6 +59,7 @@ int ObNetEndpointIngressManager::register_endpoint(const ObNetEndpointKey &endpo
} else if (OB_FAIL(ingress_plan_map_.get_refactored(endpoint_key, endpoint_value))) {
if (OB_HASH_NOT_EXIST == ret) {
// initialize
ret = OB_SUCCESS;
endpoint_value = (ObNetEndpointValue *)ob_malloc(sizeof(ObNetEndpointValue), "INGRESS_SERVICE");
if (OB_ISNULL(endpoint_value)) {
LOG_WARN("failed to alloc memory for objs");
@ -66,7 +67,7 @@ int ObNetEndpointIngressManager::register_endpoint(const ObNetEndpointKey &endpo
} else {
endpoint_value->expire_time_ = expire_time;
}
if (OB_FAIL(ingress_plan_map_.set_refactored(endpoint_key, endpoint_value))) {
if (OB_SUCC(ret) && OB_FAIL(ingress_plan_map_.set_refactored(endpoint_key, endpoint_value))) {
ob_free(endpoint_value);
LOG_WARN("endpoint register failed", K(ret), K(endpoint_key), K(expire_time));
} else {
@ -91,18 +92,19 @@ int ObNetEndpointIngressManager::collect_predict_bw(ObNetEndpointKVArray &update
const int64_t current_time = ObTimeUtility::current_time();
{
ObSpinLockGuard guard(lock_);
int tmp_ret = OB_SUCCESS;
for (ObIngressPlanMap::iterator iter = ingress_plan_map_.begin(); iter != ingress_plan_map_.end(); ++iter) {
const ObNetEndpointKey &endpoint_key = iter->first;
ObNetEndpointValue *endpoint_value = iter->second;
if (endpoint_value->expire_time_ < current_time) {
LOG_INFO("endpoint expired", K(endpoint_key), K(endpoint_value->expire_time_), K(current_time));
if (OB_FAIL(delete_keys.push_back(endpoint_key))) {
if (OB_TMP_FAIL(delete_keys.push_back(endpoint_key))) {
LOG_WARN("fail to push back arrays", K(ret), K(endpoint_key));
} else {
ob_free(endpoint_value);
}
} else {
if (OB_FAIL(update_kvs.push_back(ObNetEndpointKeyValue(endpoint_key, endpoint_value)))) {
if (OB_TMP_FAIL(update_kvs.push_back(ObNetEndpointKeyValue(endpoint_key, endpoint_value)))) {
LOG_WARN("fail to push back arrays", K(ret), K(endpoint_key));
} else {
endpoint_value->predicted_bw_ = -1;
@ -190,40 +192,42 @@ int ObNetEndpointIngressManager::update_ingress_plan(ObNetEndpointKVArray &updat
predicted_bws[valid_count++] = endpoint_value->predicted_bw_;
}
}
int64_t baseline_bw = 0;
int64_t extra_bw = 0;
if (remain_bw_limit <= 0) {
remain_bw_limit = 0;
}
std::sort(predicted_bws, predicted_bws + valid_count);
int64_t average = (int64_t)remain_bw_limit / valid_count;
for (int i = 0; i < valid_count; i++) {
average = (int64_t)remain_bw_limit / (valid_count - i);
if (average <= predicted_bws[i]) {
if (0 != valid_count) {
int64_t baseline_bw = 0;
int64_t extra_bw = 0;
if (remain_bw_limit <= 0) {
remain_bw_limit = 0;
break;
} else {
remain_bw_limit -= predicted_bws[i];
}
}
baseline_bw = average;
extra_bw = (int64_t)remain_bw_limit / valid_count;
for (int64_t i = 0; i < update_kvs.count(); i++) {
ObNetEndpointValue *endpoint_value = update_kvs[i].value_;
if (OB_UNLIKELY(endpoint_value->predicted_bw_ == -1)) {
// do nothing, remain old assigned_bw
} else {
int64_t predicted_bw = endpoint_value->predicted_bw_;
int64_t assigned_bw = -1;
if (predicted_bw > baseline_bw) {
assigned_bw = baseline_bw;
std::sort(predicted_bws, predicted_bws + valid_count);
int64_t average = remain_bw_limit / valid_count;
bool is_done = false;
for (int i = 0; i < valid_count && !is_done; i++) {
average = remain_bw_limit / (valid_count - i);
if (average <= predicted_bws[i]) {
remain_bw_limit = 0;
is_done = true;
} else {
assigned_bw = predicted_bw;
remain_bw_limit -= predicted_bws[i];
}
}
baseline_bw = average;
extra_bw = (int64_t)remain_bw_limit / valid_count;
for (int64_t i = 0; i < update_kvs.count(); i++) {
ObNetEndpointValue *endpoint_value = update_kvs[i].value_;
if (OB_UNLIKELY(endpoint_value->predicted_bw_ == -1)) {
// do nothing, remain old assigned_bw
} else {
int64_t predicted_bw = endpoint_value->predicted_bw_;
int64_t assigned_bw = -1;
if (predicted_bw > baseline_bw) {
assigned_bw = baseline_bw;
} else {
assigned_bw = predicted_bw;
}
assigned_bw += extra_bw;
endpoint_value->assigned_bw_ = assigned_bw;
}
assigned_bw += extra_bw;
endpoint_value->assigned_bw_ = assigned_bw;
}
}
}
@ -454,25 +458,25 @@ void ObIngressBWAllocService::runTimerTask()
void ObIngressBWAllocService::switch_to_follower_forcedly()
{
ATOMIC_SET(&is_leader_, false);
ATOMIC_STORE(&is_leader_, false);
}
int ObIngressBWAllocService::switch_to_leader()
{
int ret = OB_SUCCESS;
ATOMIC_SET(&is_leader_, true);
ATOMIC_STORE(&is_leader_, true);
return ret;
}
int ObIngressBWAllocService::switch_to_follower_gracefully()
{
int ret = OB_SUCCESS;
ATOMIC_SET(&is_leader_, false);
ATOMIC_STORE(&is_leader_, false);
return ret;
}
int ObIngressBWAllocService::resume_leader()
{
int ret = OB_SUCCESS;
if (!is_leader()) {
ATOMIC_SET(&is_leader_, true);
ATOMIC_STORE(&is_leader_, true);
}
return ret;
}

View File

@ -20,7 +20,6 @@
namespace oceanbase
{
using namespace obrpc;
namespace rootserver
{
class ObNetEndpointIngressManager
@ -32,15 +31,15 @@ public:
int init();
void reset();
void destroy();
int register_endpoint(const ObNetEndpointKey &endpoint_key, const int64_t expire_time);
int collect_predict_bw(ObNetEndpointKVArray &update_kvs);
int update_ingress_plan(ObNetEndpointKVArray &update_kvs);
int commit_bw_limit_plan(ObNetEndpointKVArray &update_kvs);
int register_endpoint(const obrpc::ObNetEndpointKey &endpoint_key, const int64_t expire_time);
int collect_predict_bw(obrpc::ObNetEndpointKVArray &update_kvs);
int update_ingress_plan(obrpc::ObNetEndpointKVArray &update_kvs);
int commit_bw_limit_plan(obrpc::ObNetEndpointKVArray &update_kvs);
int set_total_bw_limit(int64_t total_bw_limit);
int64_t get_map_size();
private:
typedef common::hash::ObHashMap<ObNetEndpointKey, ObNetEndpointValue *> ObIngressPlanMap;
typedef common::hash::ObHashMap<obrpc::ObNetEndpointKey, obrpc::ObNetEndpointValue *> ObIngressPlanMap;
ObIngressPlanMap ingress_plan_map_;
int64_t total_bw_limit_;
ObSpinLock lock_;

View File

@ -27,7 +27,7 @@ int ObNetEndpointRegisterP::process()
int ret = OB_SUCCESS;
if (OB_ISNULL(gctx_.net_frame_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("ob_service is null", KR(ret));
LOG_WARN("net_frame in GCTX is null", KR(ret));
} else if (OB_FAIL(gctx_.net_frame_->net_endpoint_register(arg_.endpoint_key_, arg_.expire_time_))) {
LOG_WARN("failed to net_endpoint_register", KR(ret), K(arg_));
}

View File

@ -26,7 +26,6 @@
namespace oceanbase
{
using namespace common;
namespace obrpc
{
@ -35,9 +34,9 @@ class ObNetEndpointKey
OB_UNIS_VERSION(1);
public:
ObNetEndpointKey() : addr_(), group_id_(0)
ObNetEndpointKey() : addr_(), group_id_(OB_INVALID_ID)
{}
ObNetEndpointKey(const ObAddr &addr) : addr_(addr), group_id_(0)
ObNetEndpointKey(const ObAddr &addr) : addr_(addr), group_id_(OB_INVALID_ID)
{}
ObNetEndpointKey(const ObNetEndpointKey &other)
{
@ -59,7 +58,7 @@ public:
void reset()
{
addr_.reset();
group_id_ = 0;
group_id_ = OB_INVALID_ID;
}
uint64_t hash() const
{
@ -99,7 +98,7 @@ public:
TO_STRING_KV(K_(addr), K_(group_id));
ObAddr addr_;
int32_t group_id_;
int64_t group_id_;
};
class ObNetEndpointValue

View File

@ -48,6 +48,7 @@ TEST_F(TestEndpointIngressService, ingress_service)
ObAddr addr1(1, 1);
ObNetEndpointKey key1;
key1.addr_ = addr1;
key1.group_id_ = 1;
int64_t time = ObTimeUtility::current_time();
ret = ingress_manager_.register_endpoint(key1, time);
@ -72,6 +73,7 @@ TEST_F(TestEndpointIngressService, ingress_service)
ObNetEndpointKey key2;
ObNetEndpointValue *value2 = nullptr;
key2.addr_ = addr2;
key2.group_id_ = 2;
ret = ingress_manager_.register_endpoint(key2, time);
ASSERT_EQ(ret, OB_SUCCESS);
ret = ingress_manager_.ingress_plan_map_.get_refactored(key2, value2);
@ -83,6 +85,7 @@ TEST_F(TestEndpointIngressService, ingress_service)
ObNetEndpointKey key3;
ObNetEndpointValue *value3 = nullptr;
key3.addr_ = addr3;
key3.group_id_ = 3;
ret = ingress_manager_.register_endpoint(key3, time);
ASSERT_EQ(ret, OB_SUCCESS);
ret = ingress_manager_.ingress_plan_map_.get_refactored(key3, value3);
@ -128,6 +131,7 @@ TEST_F(TestEndpointIngressService, ingress_service)
ObNetEndpointKey key4;
ObNetEndpointValue *value4 = nullptr;
key4.addr_ = addr4;
key4.group_id_ = 4;
ret = ingress_manager_.register_endpoint(key4, time);
ASSERT_EQ(ret, OB_SUCCESS);
ret = ingress_manager_.ingress_plan_map_.get_refactored(key4, value4);