Fix PX target monitor statistics bug
This commit is contained in:
@ -588,18 +588,32 @@ int ObPxTenantTargetMonitorP::process()
|
||||
ObTimeGuard timeguard("px_target_request", 100000);
|
||||
const uint64_t tenant_id = arg_.get_tenant_id();
|
||||
const uint64_t follower_version = arg_.get_version();
|
||||
// server id of the leader that the follower sync with previously.
|
||||
const uint64_t prev_leader_server_id = ObPxTenantTargetMonitor::get_server_id(follower_version);
|
||||
const uint64_t leader_server_id = GCTX.server_id_;
|
||||
bool is_leader;
|
||||
uint64_t leader_version;
|
||||
result_.set_tenant_id(tenant_id);
|
||||
if (OB_FAIL(OB_PX_TARGET_MGR.is_leader(tenant_id, is_leader))) {
|
||||
LOG_WARN("get is_leader failed", K(ret), K(tenant_id));
|
||||
LOG_ERROR("get is_leader failed", K(ret), K(tenant_id));
|
||||
} else if (!is_leader) {
|
||||
result_.set_status(MONITOR_NOT_MASTER);
|
||||
} else if (arg_.need_refresh_all_ || prev_leader_server_id != leader_server_id) {
|
||||
if (OB_FAIL(OB_PX_TARGET_MGR.reset_leader_statistics(tenant_id))) {
|
||||
LOG_ERROR("reset leader statistics failed", K(ret));
|
||||
} else if (OB_FAIL(OB_PX_TARGET_MGR.get_version(tenant_id, leader_version))) {
|
||||
LOG_WARN("get master_version failed", K(ret), K(tenant_id));
|
||||
} else {
|
||||
result_.set_status(MONITOR_VERSION_NOT_MATCH);
|
||||
result_.set_version(leader_version);
|
||||
LOG_INFO("need refresh all", K(tenant_id), K(arg_.need_refresh_all_),
|
||||
K(follower_version), K(prev_leader_server_id), K(leader_server_id));
|
||||
}
|
||||
} else if (OB_FAIL(OB_PX_TARGET_MGR.get_version(tenant_id, leader_version))) {
|
||||
LOG_WARN("get master_version failed", K(ret), K(tenant_id));
|
||||
} else {
|
||||
result_.set_tenant_id(tenant_id);
|
||||
result_.set_version(leader_version);
|
||||
if (!is_leader) {
|
||||
result_.set_status(MONITOR_NOT_MASTER);
|
||||
} else if (follower_version != leader_version) {
|
||||
if (follower_version != leader_version) {
|
||||
result_.set_status(MONITOR_VERSION_NOT_MATCH);
|
||||
} else {
|
||||
result_.set_status(MONITOR_READY);
|
||||
@ -614,8 +628,8 @@ int ObPxTenantTargetMonitorP::process()
|
||||
// A simple and rude exception handling, re-statistics
|
||||
if (OB_FAIL(ret)) {
|
||||
int tem_ret = OB_SUCCESS;
|
||||
if ((tem_ret = OB_PX_TARGET_MGR.reset_statistics(tenant_id, leader_version + 1)) != OB_SUCCESS) {
|
||||
LOG_WARN("reset statistics failed", K(tem_ret), K(tenant_id), K(leader_version));
|
||||
if ((tem_ret = OB_PX_TARGET_MGR.reset_leader_statistics(tenant_id)) != OB_SUCCESS) {
|
||||
LOG_ERROR("reset statistics failed", K(tem_ret), K(tenant_id), K(leader_version));
|
||||
} else {
|
||||
LOG_INFO("reset statistics succeed", K(tenant_id), K(leader_version));
|
||||
}
|
||||
|
@ -334,12 +334,12 @@ int ObPxTargetMgr::get_global_target_usage(uint64_t tenant_id, const hash::ObHas
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObPxTargetMgr::reset_statistics(uint64_t tenant_id, uint64_t version)
|
||||
int ObPxTargetMgr::reset_leader_statistics(uint64_t tenant_id)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
GET_TARGET_MONITOR(tenant_id, {
|
||||
if (OB_FAIL(target_monitor->reset_statistics(version))) {
|
||||
LOG_WARN("reset statistics failed", K(ret), K(version));
|
||||
if (OB_FAIL(target_monitor->reset_leader_statistics())) {
|
||||
LOG_WARN("reset statistics failed", K(ret));
|
||||
}
|
||||
});
|
||||
return ret;
|
||||
|
@ -105,7 +105,7 @@ public:
|
||||
int update_peer_target_used(uint64_t tenant_id, const ObAddr &server, int64_t peer_used);
|
||||
int rollback_local_report_target_used(uint64_t tenant_id, const ObAddr &server, int64_t local_report);
|
||||
int get_global_target_usage(uint64_t tenant_id, const hash::ObHashMap<ObAddr, ServerTargetUsage> *&global_target_usage);
|
||||
int reset_statistics(uint64_t tenant_id, uint64_t version);
|
||||
int reset_leader_statistics(uint64_t tenant_id);
|
||||
|
||||
// for px_admission
|
||||
int apply_target(uint64_t tenant_id, hash::ObHashMap<ObAddr, int64_t> &worker_map,
|
||||
@ -128,7 +128,7 @@ private:
|
||||
class ObPxResRefreshFunctor
|
||||
{
|
||||
public:
|
||||
ObPxResRefreshFunctor() {}
|
||||
ObPxResRefreshFunctor() : need_refresh_all_(true) {}
|
||||
~ObPxResRefreshFunctor() {}
|
||||
bool operator()(const ObPxTenantInfo &px_tenant_info, ObPxResInfo *px_res_info)
|
||||
{
|
||||
|
@ -19,7 +19,7 @@ namespace sql
|
||||
{
|
||||
|
||||
OB_SERIALIZE_MEMBER(ObPxRpcAddrTarget, addr_, target_);
|
||||
OB_SERIALIZE_MEMBER(ObPxRpcFetchStatArgs, tenant_id_, follower_version_, addr_target_array_);
|
||||
OB_SERIALIZE_MEMBER(ObPxRpcFetchStatArgs, tenant_id_, follower_version_, addr_target_array_, need_refresh_all_);
|
||||
OB_SERIALIZE_MEMBER(ObPxRpcFetchStatResponse, status_, tenant_id_, leader_version_, addr_target_array_);
|
||||
|
||||
}
|
||||
|
@ -40,14 +40,20 @@ public:
|
||||
class ObPxRpcFetchStatArgs {
|
||||
OB_UNIS_VERSION(1);
|
||||
public:
|
||||
ObPxRpcFetchStatArgs() : addr_target_array_() {}
|
||||
~ObPxRpcFetchStatArgs() { addr_target_array_.destroy(); }
|
||||
ObPxRpcFetchStatArgs() : tenant_id_(OB_INVALID_ID), follower_version_(OB_INVALID_ID),
|
||||
addr_target_array_(), need_refresh_all_(false) {}
|
||||
ObPxRpcFetchStatArgs(uint64_t tenant_id, uint64_t ver, uint64_t refresh_all) :
|
||||
tenant_id_(tenant_id), follower_version_(ver),
|
||||
addr_target_array_(), need_refresh_all_(refresh_all) {}
|
||||
~ObPxRpcFetchStatArgs() { }
|
||||
|
||||
void set_tenant_id(uint64_t tenant_id) { tenant_id_ = tenant_id; }
|
||||
uint64_t get_tenant_id() { return tenant_id_; }
|
||||
|
||||
void set_version(uint64_t version) { follower_version_ = version; }
|
||||
uint64_t get_version() { return follower_version_; }
|
||||
void set_need_refresh_all(bool v) { need_refresh_all_ = v; }
|
||||
bool need_refresh_all() { return need_refresh_all_; }
|
||||
|
||||
int push_local_target_usage(const ObAddr &server, int64_t local_usage)
|
||||
{
|
||||
@ -69,6 +75,7 @@ public:
|
||||
// value represents the number of targets used since the last report
|
||||
// Because hashmap does not support serialization, so that it is transform to array
|
||||
ObSEArray<ObPxRpcAddrTarget, 100> addr_target_array_;
|
||||
bool need_refresh_all_;
|
||||
};
|
||||
|
||||
|
||||
@ -76,7 +83,7 @@ class ObPxRpcFetchStatResponse {
|
||||
OB_UNIS_VERSION(1);
|
||||
public:
|
||||
ObPxRpcFetchStatResponse() : addr_target_array_() {}
|
||||
~ObPxRpcFetchStatResponse() { addr_target_array_.destroy(); }
|
||||
~ObPxRpcFetchStatResponse() { }
|
||||
|
||||
void set_status(uint64_t status) { status_ = status; }
|
||||
uint64_t get_status() { return status_; }
|
||||
|
@ -75,6 +75,7 @@ void ObPxTenantTargetMonitor::reset()
|
||||
version_ = UINT64_MAX;
|
||||
parallel_session_count_ = 0;
|
||||
print_debug_log_ = false;
|
||||
need_send_refresh_all_ = true;
|
||||
}
|
||||
|
||||
void ObPxTenantTargetMonitor::set_parallel_servers_target(int64_t parallel_servers_target)
|
||||
@ -99,12 +100,13 @@ int ObPxTenantTargetMonitor::refresh_statistics(bool need_refresh_all)
|
||||
if (OB_FAIL(get_dummy_leader(leader))) {
|
||||
LOG_WARN("get dummy leader fail", K(ret));
|
||||
} else if (server_ != leader) {
|
||||
LOG_INFO("follower refresh statistics", K(tenant_id_), K(server_), K(leader), K(role_), K(version_));
|
||||
LOG_TRACE("follower refresh statistics", K(tenant_id_), K(server_), K(leader), K(role_), K(version_));
|
||||
// 单机情况下,不走全局排队
|
||||
if (role_ == LEADER) {
|
||||
LOG_INFO("leader switch to follower", K(tenant_id_), K(server_), K(leader), K(version_));
|
||||
role_ = FOLLOWER;
|
||||
// from leader to follower, refresh all the statistics
|
||||
if (OB_FAIL(reset_statistics(-1))) {
|
||||
if (OB_FAIL(reset_follower_statistics(-1))) {
|
||||
LOG_WARN("reset statisitcs failed", K(ret));
|
||||
}
|
||||
}
|
||||
@ -113,14 +115,14 @@ int ObPxTenantTargetMonitor::refresh_statistics(bool need_refresh_all)
|
||||
}
|
||||
} else {
|
||||
// 只有leader能进,可以无主,但不能多主
|
||||
LOG_INFO("leader refresh statistics", K(tenant_id_), K(server_), K(leader), K(role_), K(need_refresh_all), K(version_));
|
||||
LOG_TRACE("leader refresh statistics", K(tenant_id_), K(server_), K(leader), K(role_), K(need_refresh_all), K(version_));
|
||||
if (role_ == FOLLOWER || need_refresh_all) {
|
||||
role_ = LEADER;
|
||||
// from follower to leader or observer is not longer alive, refresh all the statistics
|
||||
if (OB_FAIL(reset_statistics(version_ + 1))) {
|
||||
if (OB_FAIL(reset_leader_statistics())) {
|
||||
LOG_WARN("reset statisitcs failed", K(ret));
|
||||
}
|
||||
LOG_INFO("refresh global_target_usage_", K(tenant_id_), K(version_), K(server_));
|
||||
LOG_INFO("refresh global_target_usage_", K(tenant_id_), K(version_), K(server_), K(need_refresh_all));
|
||||
}
|
||||
}
|
||||
if (!print_debug_log_ && OB_SUCCESS != OB_E(EventTable::EN_PX_PRINT_TARGET_MONITOR_LOG) OB_SUCCESS) {
|
||||
@ -237,38 +239,37 @@ int ObPxTenantTargetMonitor::query_statistics(ObAddr &leader)
|
||||
int ret = OB_SUCCESS;
|
||||
// send once in 100ms, 1s timeout should be more appropriate than default
|
||||
static const int64_t OB_TARGET_MONITOR_RPC_TIMEOUT = 1000 * 1000; // 1s
|
||||
ObPxRpcFetchStatArgs args;
|
||||
ObPxRpcFetchStatArgs args(tenant_id_, version_, need_send_refresh_all_);
|
||||
SMART_VAR(ObPxRpcFetchStatResponse, result) {
|
||||
args.set_tenant_id(tenant_id_);
|
||||
args.set_version(version_);
|
||||
for (hash::ObHashMap<ObAddr, ServerTargetUsage>::iterator it = global_target_usage_.begin();
|
||||
OB_SUCC(ret) && it != global_target_usage_.end(); ++it) {
|
||||
auto report_local_used = [&](hash::HashMapPair<ObAddr, ServerTargetUsage> &entry) -> int {
|
||||
int ret = OB_SUCCESS;
|
||||
// 和上次汇报相比,本机又消耗了entry 机器几个资源,把这个数目汇报给 leader,leader 会把这个值加到全局统计中。
|
||||
// 为什么是汇报“增量”呢?因为 entry 机器的资源被多台机器使用,任何一个人都拿不到全量数据
|
||||
if (OB_FAIL(args.push_local_target_usage(entry.first, entry.second.get_local_used() - entry.second.get_report_used()))) {
|
||||
LOG_WARN("push server and target_usage failed", K(ret));
|
||||
} else {
|
||||
LOG_INFO("report statistics to leader", K(tenant_id_), K(leader), K(version_), K(entry.first), K(entry.second));
|
||||
entry.second.set_report_used(entry.second.get_local_used());
|
||||
}
|
||||
return ret;
|
||||
};
|
||||
need_send_refresh_all_ = false;
|
||||
if (!args.need_refresh_all()) {
|
||||
for (hash::ObHashMap<ObAddr, ServerTargetUsage>::iterator it = global_target_usage_.begin();
|
||||
OB_SUCC(ret) && it != global_target_usage_.end(); ++it) {
|
||||
auto report_local_used = [&](hash::HashMapPair<ObAddr, ServerTargetUsage> &entry) -> int {
|
||||
int ret = OB_SUCCESS;
|
||||
// 和上次汇报相比,本机又消耗了entry 机器几个资源,把这个数目汇报给 leader,leader 会把这个值加到全局统计中。
|
||||
// 为什么是汇报“增量”呢?因为 entry 机器的资源被多台机器使用,任何一个人都拿不到全量数据
|
||||
if (OB_FAIL(args.push_local_target_usage(entry.first, entry.second.get_local_used() - entry.second.get_report_used()))) {
|
||||
LOG_WARN("push server and target_usage failed", K(ret));
|
||||
} else {
|
||||
entry.second.set_report_used(entry.second.get_local_used());
|
||||
}
|
||||
return ret;
|
||||
};
|
||||
|
||||
if (OB_FAIL(global_target_usage_.atomic_refactored(it->first, report_local_used))) {
|
||||
LOG_WARN("atomic refactored, report_local_used failed", K(ret));
|
||||
if (OB_FAIL(global_target_usage_.atomic_refactored(it->first, report_local_used))) {
|
||||
LOG_WARN("atomic refactored, report_local_used failed", K(ret));
|
||||
}
|
||||
}
|
||||
}
|
||||
bool need_rollback = false;
|
||||
if (OB_FAIL(rpc_proxy_
|
||||
.to(leader)
|
||||
.by(tenant_id_)
|
||||
.timeout(OB_TARGET_MONITOR_RPC_TIMEOUT)
|
||||
.fetch_statistics(args, result))) {
|
||||
need_rollback = true;
|
||||
LOG_WARN("send rpc to query statistics failed", K(ret));
|
||||
ret = OB_SUCCESS;
|
||||
// whether leader receive the rpc is unknown
|
||||
need_send_refresh_all_ = true;
|
||||
LOG_WARN("send rpc to query statistics failed, need send refresh all", K(ret));
|
||||
} else if (result.get_status() == MONITOR_READY) {
|
||||
for (int i = 0; OB_SUCC(ret) && i < result.addr_target_array_.count(); i++) {
|
||||
ObAddr &server = result.addr_target_array_.at(i).addr_;
|
||||
@ -279,30 +280,12 @@ int ObPxTenantTargetMonitor::query_statistics(ObAddr &leader)
|
||||
}
|
||||
} else if (result.get_status() == MONITOR_NOT_MASTER) {
|
||||
refresh_dummy_location();
|
||||
need_rollback = true;
|
||||
LOG_INFO("report to not master", K(tenant_id_), K(leader), K(version_));
|
||||
need_send_refresh_all_ = true;
|
||||
LOG_INFO("report to not master, need send refresh all", K(tenant_id_), K(leader), K(version_));
|
||||
} else if (result.get_status() == MONITOR_VERSION_NOT_MATCH) {
|
||||
uint64_t leader_version = result.get_version();
|
||||
LOG_INFO("monitor version not match", K(tenant_id_), K(leader_version), K(version_));
|
||||
if (OB_FAIL(reset_statistics(leader_version))) {
|
||||
LOG_WARN("reset statistics failed", K(ret));
|
||||
}
|
||||
}
|
||||
|
||||
if (need_rollback) {
|
||||
refresh_dummy_location();
|
||||
for (int i = 0; OB_SUCC(ret) && i < args.addr_target_array_.count(); i++) {
|
||||
ObAddr &server = args.addr_target_array_.at(i).addr_;
|
||||
int64_t report_target_usage = args.addr_target_array_.at(i).target_;
|
||||
if (OB_FAIL(rollback_local_report_target_used(server, report_target_usage))) {
|
||||
LOG_WARN("rollback local report target_used failed", K(ret), K(server), K(report_target_usage));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// A simple and rude exception handling, re-statistics
|
||||
if (OB_FAIL(ret)) {
|
||||
if (OB_FAIL(reset_statistics(0))) {
|
||||
if (OB_FAIL(reset_follower_statistics(leader_version))) {
|
||||
LOG_WARN("reset statistics failed", K(ret));
|
||||
}
|
||||
}
|
||||
@ -382,7 +365,7 @@ int ObPxTenantTargetMonitor::get_global_target_usage(const hash::ObHashMap<ObAdd
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObPxTenantTargetMonitor::reset_statistics(uint64_t version)
|
||||
int ObPxTenantTargetMonitor::reset_follower_statistics(uint64_t version)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObLockGuard<ObSpinLock> lock_guard(spin_lock_);
|
||||
@ -392,6 +375,21 @@ int ObPxTenantTargetMonitor::reset_statistics(uint64_t version)
|
||||
} else {
|
||||
version_ = version;
|
||||
}
|
||||
LOG_INFO("reset follower statistics", K(tenant_id_), K(ret), K(version));
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObPxTenantTargetMonitor::reset_leader_statistics()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObLockGuard<ObSpinLock> lock_guard(spin_lock_);
|
||||
global_target_usage_.clear();
|
||||
if (OB_FAIL(global_target_usage_.set_refactored(server_, ServerTargetUsage()))) {
|
||||
LOG_WARN("set refactored failed", K(ret));
|
||||
} else {
|
||||
version_ = get_new_version();
|
||||
}
|
||||
LOG_INFO("reset leader statistics", K(tenant_id_), K(ret), K(version_), K(GCTX.server_id_));
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -544,6 +542,18 @@ int ObPxTenantTargetMonitor::get_all_target_info(common::ObIArray<ObPxTargetInfo
|
||||
return ret;
|
||||
}
|
||||
|
||||
uint64_t ObPxTenantTargetMonitor::get_new_version()
|
||||
{
|
||||
uint64_t current_time = common::ObTimeUtility::current_time();
|
||||
uint64_t svr_id = GCTX.server_id_;
|
||||
uint64_t new_version = ((current_time & 0x0000FFFFFFFFFFFF) | (svr_id << SERVER_ID_SHIFT));
|
||||
return new_version;
|
||||
}
|
||||
|
||||
uint64_t ObPxTenantTargetMonitor::get_server_id(uint64_t version) {
|
||||
return (version >> SERVER_ID_SHIFT);
|
||||
}
|
||||
|
||||
int ObPxTargetCond::wait(const int64_t wait_time_us)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
@ -130,7 +130,12 @@ public:
|
||||
int update_peer_target_used(const ObAddr &server, int64_t peer_used);
|
||||
int rollback_local_report_target_used(const ObAddr &server, int64_t local_report);
|
||||
int get_global_target_usage(const hash::ObHashMap<ObAddr, ServerTargetUsage> *&global_target_usage);
|
||||
int reset_statistics(uint64_t version);
|
||||
// if role is follower and find that its version is different with leader's
|
||||
// call this function to reset statistics, the param version is from the leader.
|
||||
int reset_follower_statistics(uint64_t version);
|
||||
// if role is leader and wants to start a new round of statistics, call this function.
|
||||
// A new version is generated in this function and will be sync to all followers later.
|
||||
int reset_leader_statistics();
|
||||
|
||||
// for px_admission
|
||||
int apply_target(hash::ObHashMap<ObAddr, int64_t> &worker_map,
|
||||
@ -140,6 +145,7 @@ public:
|
||||
|
||||
// for virtual_table iter
|
||||
int get_all_target_info(common::ObIArray<ObPxTargetInfo> &target_info_array);
|
||||
static uint64_t get_server_id(uint64_t version);
|
||||
|
||||
TO_STRING_KV(K_(is_init), K_(tenant_id), K_(server), K_(dummy_cache_leader), K_(role));
|
||||
|
||||
@ -149,8 +155,10 @@ private:
|
||||
int get_role(ObRole &role);
|
||||
int refresh_dummy_location();
|
||||
int query_statistics(ObAddr &leader);
|
||||
uint64_t get_new_version();
|
||||
|
||||
private:
|
||||
static const int64_t SERVER_ID_SHIFT = 48;
|
||||
bool is_init_;
|
||||
uint64_t tenant_id_;
|
||||
ObAddr server_;
|
||||
@ -166,6 +174,7 @@ private:
|
||||
int64_t parallel_session_count_;
|
||||
ObPxTargetCond target_cond_;
|
||||
bool print_debug_log_;
|
||||
bool need_send_refresh_all_;
|
||||
};
|
||||
|
||||
}
|
||||
|
Reference in New Issue
Block a user