fix concurrency problem of px target monitor

obdev
2022-12-08 08:36:11 +00:00
committed by ob-robot
parent ea4e649833
commit b421987484
2 changed files with 17 additions and 5 deletions

View File

@@ -315,6 +315,11 @@ int ObPxTenantTargetMonitor::update_peer_target_used(const ObAddr &server, int64
 {
   int ret = OB_SUCCESS;
   ServerTargetUsage target_usage;
+  if (OB_FAIL(global_target_usage_.get_refactored(server, target_usage))) {
+    if (ret != OB_HASH_NOT_EXIST) {
+      LOG_WARN("get refactored failed", K(ret));
+    } else {
+      ObLockGuard<ObSpinLock> lock_guard(spin_lock_);
       if (OB_FAIL(global_target_usage_.get_refactored(server, target_usage))) {
         if (ret != OB_HASH_NOT_EXIST) {
           LOG_WARN("get refactored failed", K(ret));
@@ -324,6 +329,8 @@ int ObPxTenantTargetMonitor::update_peer_target_used(const ObAddr &server, int64
             LOG_WARN("set refactored failed", K(ret));
           }
         }
+      }
+    }
   } else {
     auto update_peer_used = [=](hash::HashMapPair<ObAddr, ServerTargetUsage> &entry) -> void {
       if (is_leader()) {
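
The pattern added here is check-lock-recheck (double-checked locking): the first get_refactored runs without the lock so the common case, an entry that already exists, stays cheap; only on OB_HASH_NOT_EXIST does the thread take spin_lock_ and look up again before inserting, so two concurrent reporters cannot both miss and race to create the entry. A minimal standalone sketch of the same idea, using std::shared_mutex and std::unordered_map in place of OceanBase's ObSpinLock and ObHashMap (all names below are illustrative, not from the patch; the sketch protects the unlocked-style fast path with a shared lock, whereas the real code presumably relies on ObHashMap's own internal synchronization for the first lookup):

    #include <atomic>
    #include <cstdint>
    #include <shared_mutex>
    #include <string>
    #include <unordered_map>

    // Stand-in for ServerTargetUsage: one counter per reporting server.
    struct TargetUsage {
      std::atomic<int64_t> peer_used{0};
    };

    class TargetMonitor {
    public:
      // Record `used` for `server`, creating the entry on first report.
      // Safe to call concurrently from many reporter threads.
      void update_peer_used(const std::string &server, int64_t used) {
        {
          // Fast path: a shared lock suffices when the entry already exists.
          std::shared_lock<std::shared_mutex> read_guard(lock_);
          auto it = usage_.find(server);
          if (it != usage_.end()) {
            it->second.peer_used.store(used, std::memory_order_relaxed);
            return;
          }
        }
        // Slow path: take the exclusive lock and re-check, because another
        // thread may have inserted the entry after we dropped read_guard.
        std::unique_lock<std::shared_mutex> write_guard(lock_);
        auto it = usage_.find(server);
        if (it == usage_.end()) {
          it = usage_.try_emplace(server).first;  // insert exactly once
        }
        it->second.peer_used.store(used, std::memory_order_relaxed);
      }

    private:
      std::shared_mutex lock_;
      std::unordered_map<std::string, TargetUsage> usage_;
    };
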
@@ -361,6 +368,7 @@ int ObPxTenantTargetMonitor::get_global_target_usage(const hash::ObHashMap<ObAdd
 int ObPxTenantTargetMonitor::reset_statistics(uint64_t version)
 {
   int ret = OB_SUCCESS;
+  ObLockGuard<ObSpinLock> lock_guard(spin_lock_);
   global_target_usage_.clear();
   if (OB_FAIL(global_target_usage_.set_refactored(server_, ServerTargetUsage()))) {
     LOG_WARN("set refactored failed", K(ret));

View File

@@ -593,7 +593,10 @@ int ObPxResourceAnalyzer::schedule_dfo(
   const int64_t group = 1;
   groups += group;
   ObHashSet<ObAddr> &addr_set = dfo.location_addr_;
-  if (OB_FAIL(update_parallel_map(current_thread_map, addr_set, dfo.get_dop()))) {
+  // We assume the same thread count should be allocated for each SQC in the DFO.
+  // This may not be true, but we cannot determine the real count per SQC; let it be for now.
+  const int64_t dop_per_addr = 0 == addr_set.size() ? dfo.get_dop() : (dfo.get_dop() + addr_set.size() - 1) / addr_set.size();
+  if (OB_FAIL(update_parallel_map(current_thread_map, addr_set, dop_per_addr))) {
     LOG_WARN("increase current thread map failed", K(ret));
   } else if (OB_FAIL(update_parallel_map(current_group_map, addr_set, group))) {
     LOG_WARN("increase current group map failed", K(ret));
@@ -616,7 +619,8 @@ int ObPxResourceAnalyzer::finish_dfo(
   const int64_t group = 1;
   groups -= group;
   ObHashSet<ObAddr> &addr_set = dfo.location_addr_;
-  if (OB_FAIL(update_parallel_map(current_thread_map, addr_set, -dfo.get_dop()))) {
+  const int64_t dop_per_addr = 0 == addr_set.size() ? dfo.get_dop() : (dfo.get_dop() + addr_set.size() - 1) / addr_set.size();
+  if (OB_FAIL(update_parallel_map(current_thread_map, addr_set, -dop_per_addr))) {
     LOG_WARN("decrease current thread map failed", K(ret));
   } else if (OB_FAIL(update_parallel_map(current_group_map, addr_set, -group))) {
     LOG_WARN("decrease current group map failed", K(ret));