fix tenant target monitor peer-used-negative bug

This commit is contained in:
sdc
2023-06-06 03:12:46 +00:00
committed by ob-robot
parent 81fe30e58a
commit e4279e8bee
5 changed files with 43 additions and 26 deletions

View File

@ -621,7 +621,7 @@ int ObPxTenantTargetMonitorP::process()
for (int i = 0; OB_SUCC(ret) && i < arg_.addr_target_array_.count(); i++) {
ObAddr &server = arg_.addr_target_array_.at(i).addr_;
int64_t peer_used_inc = arg_.addr_target_array_.at(i).target_;
if (OB_FAIL(OB_PX_TARGET_MGR.update_peer_target_used(tenant_id, server, peer_used_inc))) {
if (OB_FAIL(OB_PX_TARGET_MGR.update_peer_target_used(tenant_id, server, peer_used_inc, leader_version))) {
LOG_WARN("set thread count failed", K(ret), K(tenant_id), K(server), K(peer_used_inc));
}
}

View File

@ -297,13 +297,14 @@ int ObPxTargetMgr::get_version(uint64_t tenant_id, uint64_t &version)
return ret;
}
int ObPxTargetMgr::update_peer_target_used(uint64_t tenant_id, const ObAddr &server, int64_t peer_used)
int ObPxTargetMgr::update_peer_target_used(uint64_t tenant_id, const ObAddr &server,
int64_t peer_used, uint64_t version)
{
int ret = OB_SUCCESS;
// return OB_HASH_EXIST instead of replacing the element.
int flag = 0;
GET_TARGET_MONITOR(tenant_id, {
if (OB_FAIL(target_monitor->update_peer_target_used(server, peer_used))) {
if (OB_FAIL(target_monitor->update_peer_target_used(server, peer_used, version))) {
LOG_WARN("update peer target_used failed", K(ret), K(tenant_id), K(peer_used));
} else if (server_ != server && OB_FAIL(alive_server_set_.set_refactored(server, flag))) {
if (OB_HASH_EXIST == ret) {

View File

@ -118,7 +118,7 @@ public:
// for rpc
int is_leader(uint64_t tenant_id, bool &is_leader);
int get_version(uint64_t tenant_id, uint64_t &version);
int update_peer_target_used(uint64_t tenant_id, const ObAddr &server, int64_t peer_used);
int update_peer_target_used(uint64_t tenant_id, const ObAddr &server, int64_t peer_used, uint64_t version);
int gather_global_target_usage(uint64_t tenant_id, ObPxGlobalResGather &gather);
int reset_leader_statistics(uint64_t tenant_id);

View File

@ -277,7 +277,7 @@ int ObPxTenantTargetMonitor::query_statistics(ObAddr &leader)
for (int i = 0; OB_SUCC(ret) && i < result.addr_target_array_.count(); i++) {
ObAddr &server = result.addr_target_array_.at(i).addr_;
int64_t peer_used_full = result.addr_target_array_.at(i).target_;
if (OB_FAIL(update_peer_target_used(server, peer_used_full))) {
if (OB_FAIL(update_peer_target_used(server, peer_used_full, UINT64_MAX))) {
LOG_WARN("set thread count failed", K(ret), K(server), K(peer_used_full));
}
}
@ -308,7 +308,7 @@ uint64_t ObPxTenantTargetMonitor::get_version()
return version_;
}
int ObPxTenantTargetMonitor::update_peer_target_used(const ObAddr &server, int64_t peer_used)
int ObPxTenantTargetMonitor::update_peer_target_used(const ObAddr &server, int64_t peer_used, uint64_t version)
{
int ret = OB_SUCCESS;
ServerTargetUsage target_usage;
@ -319,30 +319,28 @@ int ObPxTenantTargetMonitor::update_peer_target_used(const ObAddr &server, int64
if (is_leader()) {
entry.second.update_peer_used(peer_used);
if (OB_UNLIKELY(entry.second.get_peer_used() < 0)) {
LOG_ERROR("peer used negative", K(tenant_id_), K(version_), K(server), K(peer_used));
LOG_ERROR("peer used negative", K(tenant_id_), K(version_), K(server), K(entry.second), K(peer_used));
}
} else {
entry.second.set_peer_used(peer_used);
}
};
if (OB_FAIL(global_target_usage_.get_refactored(server, target_usage))) {
SpinWLockGuard rlock_guard(spin_lock_);
if (OB_UNLIKELY(version != OB_INVALID_ID && version != version_)) {
// version mismatch, do nothing.
} else if (OB_FAIL(global_target_usage_.get_refactored(server, target_usage))) {
LOG_WARN("get refactored failed", K(ret), K(tenant_id_), K(server), K(version_));
if (ret != OB_HASH_NOT_EXIST) {
} else {
ObLockGuard<ObSpinLock> lock_guard(spin_lock_);
if (OB_FAIL(global_target_usage_.get_refactored(server, target_usage))) {
if (ret != OB_HASH_NOT_EXIST) {
LOG_WARN("get refactored failed", K(ret));
} else {
target_usage.set_peer_used(peer_used);
if (OB_FAIL(global_target_usage_.set_refactored(server, target_usage))) {
LOG_WARN("set refactored failed", K(ret));
}
}
} else if (OB_FAIL(global_target_usage_.atomic_refactored(server, update_peer_used))) {
if (OB_HASH_EXIST == ret
&& OB_FAIL(global_target_usage_.atomic_refactored(server, update_peer_used))) {
LOG_WARN("atomic refactored, update_peer_used failed", K(ret));
}
}
}
} else if (OB_FAIL(global_target_usage_.atomic_refactored(server, update_peer_used))) {
LOG_WARN("atomic refactored, update_peer_used failed", K(ret));
}
@ -359,7 +357,7 @@ int ObPxTenantTargetMonitor::get_global_target_usage(const hash::ObHashMap<ObAdd
int ObPxTenantTargetMonitor::reset_follower_statistics(uint64_t version)
{
int ret = OB_SUCCESS;
ObLockGuard<ObSpinLock> lock_guard(spin_lock_);
SpinWLockGuard wlock_guard(spin_lock_);
global_target_usage_.clear();
if (OB_FAIL(global_target_usage_.set_refactored(server_, ServerTargetUsage()))) {
LOG_WARN("set refactored failed", K(ret));
@ -373,7 +371,8 @@ int ObPxTenantTargetMonitor::reset_follower_statistics(uint64_t version)
int ObPxTenantTargetMonitor::reset_leader_statistics()
{
int ret = OB_SUCCESS;
ObLockGuard<ObSpinLock> lock_guard(spin_lock_);
// write lock before reset map and refresh version.
SpinWLockGuard wlock_guard(spin_lock_);
global_target_usage_.clear();
if (OB_FAIL(global_target_usage_.set_refactored(server_, ServerTargetUsage()))) {
LOG_WARN("set refactored failed", K(ret));
@ -393,7 +392,8 @@ int ObPxTenantTargetMonitor::apply_target(hash::ObHashMap<ObAddr, int64_t> &work
admit_version = UINT64_MAX;
bool need_wait = false;
if (OB_SUCC(ret)) {
ObLockGuard<ObSpinLock> lock_guard(spin_lock_); // Just for avoid multiple SQL applications at the same time
// read lock to avoid reset map.
SpinRLockGuard rlock_guard(spin_lock_); // Just for avoid multiple SQL applications at the same time
// for pmas
int64_t target = session_target;
uint64_t version = version_;
@ -412,10 +412,15 @@ int ObPxTenantTargetMonitor::apply_target(hash::ObHashMap<ObAddr, int64_t> &work
// but still can use local, so now, rebuild local
ret = OB_SUCCESS;
if (OB_FAIL(global_target_usage_.set_refactored(server, target_usage))) {
if (OB_HASH_EXIST == ret) {
// add an empty target_usage for this server; if OB_HASH_EXIST is returned,
// another thread has already inserted it, so just ignore and fall through to the update.
ret = OB_SUCCESS;
} else {
LOG_WARN("set refactored failed", K(ret));
}
}
}
}
if (OB_SUCC(ret)) {
// 计算当前 server 视角下,target_usage 对应 server 已经分配出去的资源数量:
// total_user = leader 反馈的 + 本地尚未同步给 leader 的
@ -476,7 +481,7 @@ int ObPxTenantTargetMonitor::apply_target(hash::ObHashMap<ObAddr, int64_t> &work
int ObPxTenantTargetMonitor::release_target(hash::ObHashMap<ObAddr, int64_t> &worker_map, uint64_t version)
{
int ret = OB_SUCCESS;
ObLockGuard<ObSpinLock> lock_guard(spin_lock_);
SpinRLockGuard rlock_guard(spin_lock_);
if (version == version_) {
for (hash::ObHashMap<ObAddr, int64_t>::iterator it = worker_map.begin();
OB_SUCC(ret) && it != worker_map.end(); it++) {

View File

@ -127,7 +127,7 @@ public:
int refresh_statistics(bool need_refresh_all);
bool is_leader();
uint64_t get_version();
int update_peer_target_used(const ObAddr &server, int64_t peer_used);
int update_peer_target_used(const ObAddr &server, int64_t peer_used, uint64_t version);
int get_global_target_usage(const hash::ObHashMap<ObAddr, ServerTargetUsage> *&global_target_usage);
// if role is follower and find that its version is different with leader's
// call this function to reset statistics, the param version is from the leader.
@ -169,7 +169,18 @@ private:
int64_t parallel_servers_target_; // equal in every server
uint64_t version_; // for refresh target_info
hash::ObHashMap<ObAddr, ServerTargetUsage> global_target_usage_; // include self
ObSpinLock spin_lock_;
// A lock that serializes concurrent access to version_ and the usage map.
// Take the write lock before resetting the map and refreshing the version;
// take the read lock before any other operation on version_ or the usage map.
// This means multiple threads may modify the map concurrently
// (insert/update operations only — never delete).
// The basic principles are:
// 1. When updating the map and the key does not exist, try to insert an entry first;
//    if the insert fails with OB_HASH_EXIST, someone else has already inserted it, so retry the update.
//    That update will always succeed, because we hold the read lock and the entry cannot be removed.
// 2. When inserting into the map and the insert fails with OB_HASH_EXIST, skip the insert if the
//    usage is empty; otherwise perform an update operation instead.
SpinRWLock spin_lock_;
int64_t parallel_session_count_;
ObPxTargetCond target_cond_;
bool print_debug_log_;