fix px target monitor core because of concurrent access of map

This commit is contained in:
obdev
2023-04-25 02:10:50 +00:00
committed by ob-robot
parent c4fb5cb985
commit c317071f7e
3 changed files with 23 additions and 11 deletions

View File

@ -634,16 +634,9 @@ int ObPxTenantTargetMonitorP::process()
LOG_INFO("reset statistics succeed", K(tenant_id), K(leader_version)); LOG_INFO("reset statistics succeed", K(tenant_id), K(leader_version));
} }
} else { } else {
const hash::ObHashMap<ObAddr, ServerTargetUsage> *global_target_usage = NULL; ObPxGlobalResGather gather(result_);
if (OB_FAIL(OB_PX_TARGET_MGR.get_global_target_usage(tenant_id, global_target_usage))) { if (OB_FAIL(OB_PX_TARGET_MGR.gather_global_target_usage(tenant_id, gather))) {
LOG_WARN("get global thread count failed", K(ret), K(tenant_id)); LOG_WARN("get global thread count failed", K(ret), K(tenant_id));
} else {
for (hash::ObHashMap<ObAddr, ServerTargetUsage>::const_iterator it = global_target_usage->begin();
OB_SUCC(ret) && it != global_target_usage->end(); ++it) {
if (OB_FAIL(result_.push_peer_target_usage(it->first, it->second.get_peer_used()))) {
COMMON_LOG(WARN, "push_back peer_used failed", K(ret));
}
}
} }
} }
} }

View File

@ -320,15 +320,18 @@ int ObPxTargetMgr::rollback_local_report_target_used(uint64_t tenant_id, const O
return ret; return ret;
} }
int ObPxTargetMgr::get_global_target_usage(uint64_t tenant_id, const hash::ObHashMap<ObAddr, ServerTargetUsage> *&global_target_usage) int ObPxTargetMgr::gather_global_target_usage(uint64_t tenant_id, ObPxGlobalResGather &gather)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
const hash::ObHashMap<ObAddr, ServerTargetUsage> *global_target_usage = NULL;
GET_TARGET_MONITOR(tenant_id, { GET_TARGET_MONITOR(tenant_id, {
if (OB_FAIL(target_monitor->get_global_target_usage(global_target_usage))) { if (OB_FAIL(target_monitor->get_global_target_usage(global_target_usage))) {
LOG_WARN("get global target usage failed", K(ret), K(tenant_id)); LOG_WARN("get global target usage failed", K(ret), K(tenant_id));
} else if (OB_ISNULL(global_target_usage)) { } else if (OB_ISNULL(global_target_usage)) {
ret = OB_ERR_UNEXPECTED; ret = OB_ERR_UNEXPECTED;
LOG_WARN("global_target_usage is null", K(ret), K(tenant_id)); LOG_WARN("global_target_usage is null", K(ret), K(tenant_id));
} else if (OB_FAIL(global_target_usage->foreach_refactored(gather))) {
LOG_WARN("gather global px resource usage failed", K(ret));
} }
}); });
return ret; return ret;

View File

@ -71,6 +71,22 @@ public:
typedef common::ObLinkHashMap<ObPxTenantInfo, ObPxResInfo, ObPxInfoAlloc> ObPxInfoMap; typedef common::ObLinkHashMap<ObPxTenantInfo, ObPxResInfo, ObPxInfoAlloc> ObPxInfoMap;
class ObPxGlobalResGather
{
public:
ObPxGlobalResGather(ObPxRpcFetchStatResponse &result) : result_(result) {}
~ObPxGlobalResGather() {}
int operator()(hash::HashMapPair<ObAddr, ServerTargetUsage> &entry)
{
int ret = common::OB_SUCCESS;
if (OB_FAIL(result_.push_peer_target_usage(entry.first, entry.second.get_peer_used()))) {
COMMON_LOG(WARN, "push_back peer_used failed", K(ret));
}
return ret;
}
ObPxRpcFetchStatResponse &result_;
};
class ObPxTargetMgr class ObPxTargetMgr
: public share::ObThreadPool : public share::ObThreadPool
{ {
@ -104,7 +120,7 @@ public:
int get_version(uint64_t tenant_id, uint64_t &version); int get_version(uint64_t tenant_id, uint64_t &version);
int update_peer_target_used(uint64_t tenant_id, const ObAddr &server, int64_t peer_used); int update_peer_target_used(uint64_t tenant_id, const ObAddr &server, int64_t peer_used);
int rollback_local_report_target_used(uint64_t tenant_id, const ObAddr &server, int64_t local_report); int rollback_local_report_target_used(uint64_t tenant_id, const ObAddr &server, int64_t local_report);
int get_global_target_usage(uint64_t tenant_id, const hash::ObHashMap<ObAddr, ServerTargetUsage> *&global_target_usage); int gather_global_target_usage(uint64_t tenant_id, ObPxGlobalResGather &gather);
int reset_leader_statistics(uint64_t tenant_id); int reset_leader_statistics(uint64_t tenant_id);
// for px_admission // for px_admission