diff --git a/src/sql/engine/px/ob_px_rpc_processor.cpp b/src/sql/engine/px/ob_px_rpc_processor.cpp index 095f8326db..211483f83f 100644 --- a/src/sql/engine/px/ob_px_rpc_processor.cpp +++ b/src/sql/engine/px/ob_px_rpc_processor.cpp @@ -588,18 +588,32 @@ int ObPxTenantTargetMonitorP::process() ObTimeGuard timeguard("px_target_request", 100000); const uint64_t tenant_id = arg_.get_tenant_id(); const uint64_t follower_version = arg_.get_version(); + // server id of the leader that the follower sync with previously. + const uint64_t prev_leader_server_id = ObPxTenantTargetMonitor::get_server_id(follower_version); + const uint64_t leader_server_id = GCTX.server_id_; bool is_leader; uint64_t leader_version; + result_.set_tenant_id(tenant_id); if (OB_FAIL(OB_PX_TARGET_MGR.is_leader(tenant_id, is_leader))) { - LOG_WARN("get is_leader failed", K(ret), K(tenant_id)); + LOG_ERROR("get is_leader failed", K(ret), K(tenant_id)); + } else if (!is_leader) { + result_.set_status(MONITOR_NOT_MASTER); + } else if (arg_.need_refresh_all_ || prev_leader_server_id != leader_server_id) { + if (OB_FAIL(OB_PX_TARGET_MGR.reset_leader_statistics(tenant_id))) { + LOG_ERROR("reset leader statistics failed", K(ret)); + } else if (OB_FAIL(OB_PX_TARGET_MGR.get_version(tenant_id, leader_version))) { + LOG_WARN("get master_version failed", K(ret), K(tenant_id)); + } else { + result_.set_status(MONITOR_VERSION_NOT_MATCH); + result_.set_version(leader_version); + LOG_INFO("need refresh all", K(tenant_id), K(arg_.need_refresh_all_), + K(follower_version), K(prev_leader_server_id), K(leader_server_id)); + } } else if (OB_FAIL(OB_PX_TARGET_MGR.get_version(tenant_id, leader_version))) { LOG_WARN("get master_version failed", K(ret), K(tenant_id)); } else { - result_.set_tenant_id(tenant_id); result_.set_version(leader_version); - if (!is_leader) { - result_.set_status(MONITOR_NOT_MASTER); - } else if (follower_version != leader_version) { + if (follower_version != leader_version) { result_.set_status(MONITOR_VERSION_NOT_MATCH); } else { result_.set_status(MONITOR_READY); @@ -614,8 +628,8 @@ int ObPxTenantTargetMonitorP::process() // A simple and rude exception handling, re-statistics if (OB_FAIL(ret)) { int tem_ret = OB_SUCCESS; - if ((tem_ret = OB_PX_TARGET_MGR.reset_statistics(tenant_id, leader_version + 1)) != OB_SUCCESS) { - LOG_WARN("reset statistics failed", K(tem_ret), K(tenant_id), K(leader_version)); + if ((tem_ret = OB_PX_TARGET_MGR.reset_leader_statistics(tenant_id)) != OB_SUCCESS) { + LOG_ERROR("reset statistics failed", K(tem_ret), K(tenant_id), K(leader_version)); } else { LOG_INFO("reset statistics succeed", K(tenant_id), K(leader_version)); } diff --git a/src/sql/engine/px/ob_px_target_mgr.cpp b/src/sql/engine/px/ob_px_target_mgr.cpp index 570a1a425e..9b4ad5a9d0 100644 --- a/src/sql/engine/px/ob_px_target_mgr.cpp +++ b/src/sql/engine/px/ob_px_target_mgr.cpp @@ -334,12 +334,12 @@ int ObPxTargetMgr::get_global_target_usage(uint64_t tenant_id, const hash::ObHas return ret; } -int ObPxTargetMgr::reset_statistics(uint64_t tenant_id, uint64_t version) +int ObPxTargetMgr::reset_leader_statistics(uint64_t tenant_id) { int ret = OB_SUCCESS; GET_TARGET_MONITOR(tenant_id, { - if (OB_FAIL(target_monitor->reset_statistics(version))) { - LOG_WARN("reset statistics failed", K(ret), K(version)); + if (OB_FAIL(target_monitor->reset_leader_statistics())) { + LOG_WARN("reset statistics failed", K(ret)); } }); return ret; diff --git a/src/sql/engine/px/ob_px_target_mgr.h b/src/sql/engine/px/ob_px_target_mgr.h index 53d9f07b94..2f6e964886 100644 --- a/src/sql/engine/px/ob_px_target_mgr.h +++ b/src/sql/engine/px/ob_px_target_mgr.h @@ -105,7 +105,7 @@ public: int update_peer_target_used(uint64_t tenant_id, const ObAddr &server, int64_t peer_used); int rollback_local_report_target_used(uint64_t tenant_id, const ObAddr &server, int64_t local_report); int get_global_target_usage(uint64_t tenant_id, const hash::ObHashMap *&global_target_usage); - int reset_statistics(uint64_t tenant_id, uint64_t version); + int reset_leader_statistics(uint64_t tenant_id); // for px_admission int apply_target(uint64_t tenant_id, hash::ObHashMap &worker_map, @@ -128,7 +128,7 @@ private: class ObPxResRefreshFunctor { public: - ObPxResRefreshFunctor() {} + ObPxResRefreshFunctor() : need_refresh_all_(true) {} ~ObPxResRefreshFunctor() {} bool operator()(const ObPxTenantInfo &px_tenant_info, ObPxResInfo *px_res_info) { diff --git a/src/sql/engine/px/ob_px_target_monitor_rpc.cpp b/src/sql/engine/px/ob_px_target_monitor_rpc.cpp index 15ab093248..e121ae1466 100644 --- a/src/sql/engine/px/ob_px_target_monitor_rpc.cpp +++ b/src/sql/engine/px/ob_px_target_monitor_rpc.cpp @@ -19,7 +19,7 @@ namespace sql { OB_SERIALIZE_MEMBER(ObPxRpcAddrTarget, addr_, target_); -OB_SERIALIZE_MEMBER(ObPxRpcFetchStatArgs, tenant_id_, follower_version_, addr_target_array_); +OB_SERIALIZE_MEMBER(ObPxRpcFetchStatArgs, tenant_id_, follower_version_, addr_target_array_, need_refresh_all_); OB_SERIALIZE_MEMBER(ObPxRpcFetchStatResponse, status_, tenant_id_, leader_version_, addr_target_array_); } diff --git a/src/sql/engine/px/ob_px_target_monitor_rpc.h b/src/sql/engine/px/ob_px_target_monitor_rpc.h index 8c95f75cb5..52cdd6d47c 100644 --- a/src/sql/engine/px/ob_px_target_monitor_rpc.h +++ b/src/sql/engine/px/ob_px_target_monitor_rpc.h @@ -40,14 +40,20 @@ public: class ObPxRpcFetchStatArgs { OB_UNIS_VERSION(1); public: - ObPxRpcFetchStatArgs() : addr_target_array_() {} - ~ObPxRpcFetchStatArgs() { addr_target_array_.destroy(); } + ObPxRpcFetchStatArgs() : tenant_id_(OB_INVALID_ID), follower_version_(OB_INVALID_ID), + addr_target_array_(), need_refresh_all_(false) {} + ObPxRpcFetchStatArgs(uint64_t tenant_id, uint64_t ver, uint64_t refresh_all) : + tenant_id_(tenant_id), follower_version_(ver), + addr_target_array_(), need_refresh_all_(refresh_all) {} + ~ObPxRpcFetchStatArgs() { } void set_tenant_id(uint64_t tenant_id) { tenant_id_ = tenant_id; } uint64_t get_tenant_id() { return tenant_id_; } void set_version(uint64_t version) { follower_version_ = version; } uint64_t get_version() { return follower_version_; } + void set_need_refresh_all(bool v) { need_refresh_all_ = v; } + bool need_refresh_all() { return need_refresh_all_; } int push_local_target_usage(const ObAddr &server, int64_t local_usage) { @@ -69,6 +75,7 @@ public: // value represents the number of targets used since the last report // Because hashmap does not support serialization, so that it is transform to array ObSEArray addr_target_array_; + bool need_refresh_all_; }; @@ -76,7 +83,7 @@ class ObPxRpcFetchStatResponse { OB_UNIS_VERSION(1); public: ObPxRpcFetchStatResponse() : addr_target_array_() {} - ~ObPxRpcFetchStatResponse() { addr_target_array_.destroy(); } + ~ObPxRpcFetchStatResponse() { } void set_status(uint64_t status) { status_ = status; } uint64_t get_status() { return status_; } diff --git a/src/sql/engine/px/ob_px_tenant_target_monitor.cpp b/src/sql/engine/px/ob_px_tenant_target_monitor.cpp index acd12c29cd..25fd396b92 100644 --- a/src/sql/engine/px/ob_px_tenant_target_monitor.cpp +++ b/src/sql/engine/px/ob_px_tenant_target_monitor.cpp @@ -75,6 +75,7 @@ void ObPxTenantTargetMonitor::reset() version_ = UINT64_MAX; parallel_session_count_ = 0; print_debug_log_ = false; + need_send_refresh_all_ = true; } void ObPxTenantTargetMonitor::set_parallel_servers_target(int64_t parallel_servers_target) @@ -99,12 +100,13 @@ int ObPxTenantTargetMonitor::refresh_statistics(bool need_refresh_all) if (OB_FAIL(get_dummy_leader(leader))) { LOG_WARN("get dummy leader fail", K(ret)); } else if (server_ != leader) { - LOG_INFO("follower refresh statistics", K(tenant_id_), K(server_), K(leader), K(role_), K(version_)); + LOG_TRACE("follower refresh statistics", K(tenant_id_), K(server_), K(leader), K(role_), K(version_)); // 单机情况下,不走全局排队 if (role_ == LEADER) { + LOG_INFO("leader switch to follower", K(tenant_id_), K(server_), K(leader), K(version_)); role_ = FOLLOWER; // from leader to follower, refresh all the statistics - if (OB_FAIL(reset_statistics(-1))) { + if (OB_FAIL(reset_follower_statistics(-1))) { LOG_WARN("reset statisitcs failed", K(ret)); } } @@ -113,14 +115,14 @@ int ObPxTenantTargetMonitor::refresh_statistics(bool need_refresh_all) } } else { // 只有leader能进,可以无主,但不能多主 - LOG_INFO("leader refresh statistics", K(tenant_id_), K(server_), K(leader), K(role_), K(need_refresh_all), K(version_)); + LOG_TRACE("leader refresh statistics", K(tenant_id_), K(server_), K(leader), K(role_), K(need_refresh_all), K(version_)); if (role_ == FOLLOWER || need_refresh_all) { role_ = LEADER; // from follower to leader or observer is not longer alive, refresh all the statistics - if (OB_FAIL(reset_statistics(version_ + 1))) { + if (OB_FAIL(reset_leader_statistics())) { LOG_WARN("reset statisitcs failed", K(ret)); } - LOG_INFO("refresh global_target_usage_", K(tenant_id_), K(version_), K(server_)); + LOG_INFO("refresh global_target_usage_", K(tenant_id_), K(version_), K(server_), K(need_refresh_all)); } } if (!print_debug_log_ && OB_SUCCESS != OB_E(EventTable::EN_PX_PRINT_TARGET_MONITOR_LOG) OB_SUCCESS) { @@ -237,38 +239,37 @@ int ObPxTenantTargetMonitor::query_statistics(ObAddr &leader) int ret = OB_SUCCESS; // send once in 100ms, 1s timeout should be more appropriate than default static const int64_t OB_TARGET_MONITOR_RPC_TIMEOUT = 1000 * 1000; // 1s - ObPxRpcFetchStatArgs args; + ObPxRpcFetchStatArgs args(tenant_id_, version_, need_send_refresh_all_); SMART_VAR(ObPxRpcFetchStatResponse, result) { - args.set_tenant_id(tenant_id_); - args.set_version(version_); - for (hash::ObHashMap::iterator it = global_target_usage_.begin(); - OB_SUCC(ret) && it != global_target_usage_.end(); ++it) { - auto report_local_used = [&](hash::HashMapPair &entry) -> int { - int ret = OB_SUCCESS; - // 和上次汇报相比,本机又消耗了entry 机器几个资源,把这个数目汇报给 leader,leader 会把这个值加到全局统计中。 - // 为什么是汇报“增量”呢?因为 entry 机器的资源被多台机器使用,任何一个人都拿不到全量数据 - if (OB_FAIL(args.push_local_target_usage(entry.first, entry.second.get_local_used() - entry.second.get_report_used()))) { - LOG_WARN("push server and target_usage failed", K(ret)); - } else { - LOG_INFO("report statistics to leader", K(tenant_id_), K(leader), K(version_), K(entry.first), K(entry.second)); - entry.second.set_report_used(entry.second.get_local_used()); - } - return ret; - }; + need_send_refresh_all_ = false; + if (!args.need_refresh_all()) { + for (hash::ObHashMap::iterator it = global_target_usage_.begin(); + OB_SUCC(ret) && it != global_target_usage_.end(); ++it) { + auto report_local_used = [&](hash::HashMapPair &entry) -> int { + int ret = OB_SUCCESS; + // 和上次汇报相比,本机又消耗了entry 机器几个资源,把这个数目汇报给 leader,leader 会把这个值加到全局统计中。 + // 为什么是汇报“增量”呢?因为 entry 机器的资源被多台机器使用,任何一个人都拿不到全量数据 + if (OB_FAIL(args.push_local_target_usage(entry.first, entry.second.get_local_used() - entry.second.get_report_used()))) { + LOG_WARN("push server and target_usage failed", K(ret)); + } else { + entry.second.set_report_used(entry.second.get_local_used()); + } + return ret; + }; - if (OB_FAIL(global_target_usage_.atomic_refactored(it->first, report_local_used))) { - LOG_WARN("atomic refactored, report_local_used failed", K(ret)); + if (OB_FAIL(global_target_usage_.atomic_refactored(it->first, report_local_used))) { + LOG_WARN("atomic refactored, report_local_used failed", K(ret)); + } } } - bool need_rollback = false; if (OB_FAIL(rpc_proxy_ .to(leader) .by(tenant_id_) .timeout(OB_TARGET_MONITOR_RPC_TIMEOUT) .fetch_statistics(args, result))) { - need_rollback = true; - LOG_WARN("send rpc to query statistics failed", K(ret)); - ret = OB_SUCCESS; + // whether leader receive the rpc is unknown + need_send_refresh_all_ = true; + LOG_WARN("send rpc to query statistics failed, need send refresh all", K(ret)); } else if (result.get_status() == MONITOR_READY) { for (int i = 0; OB_SUCC(ret) && i < result.addr_target_array_.count(); i++) { ObAddr &server = result.addr_target_array_.at(i).addr_; @@ -279,30 +280,12 @@ int ObPxTenantTargetMonitor::query_statistics(ObAddr &leader) } } else if (result.get_status() == MONITOR_NOT_MASTER) { refresh_dummy_location(); - need_rollback = true; - LOG_INFO("report to not master", K(tenant_id_), K(leader), K(version_)); + need_send_refresh_all_ = true; + LOG_INFO("report to not master, need send refresh all", K(tenant_id_), K(leader), K(version_)); } else if (result.get_status() == MONITOR_VERSION_NOT_MATCH) { uint64_t leader_version = result.get_version(); LOG_INFO("monitor version not match", K(tenant_id_), K(leader_version), K(version_)); - if (OB_FAIL(reset_statistics(leader_version))) { - LOG_WARN("reset statistics failed", K(ret)); - } - } - - if (need_rollback) { - refresh_dummy_location(); - for (int i = 0; OB_SUCC(ret) && i < args.addr_target_array_.count(); i++) { - ObAddr &server = args.addr_target_array_.at(i).addr_; - int64_t report_target_usage = args.addr_target_array_.at(i).target_; - if (OB_FAIL(rollback_local_report_target_used(server, report_target_usage))) { - LOG_WARN("rollback local report target_used failed", K(ret), K(server), K(report_target_usage)); - } - } - } - - // A simple and rude exception handling, re-statistics - if (OB_FAIL(ret)) { - if (OB_FAIL(reset_statistics(0))) { + if (OB_FAIL(reset_follower_statistics(leader_version))) { LOG_WARN("reset statistics failed", K(ret)); } } @@ -382,7 +365,7 @@ int ObPxTenantTargetMonitor::get_global_target_usage(const hash::ObHashMap lock_guard(spin_lock_); @@ -392,6 +375,21 @@ int ObPxTenantTargetMonitor::reset_statistics(uint64_t version) } else { version_ = version; } + LOG_INFO("reset follower statistics", K(tenant_id_), K(ret), K(version)); + return ret; +} + +int ObPxTenantTargetMonitor::reset_leader_statistics() +{ + int ret = OB_SUCCESS; + ObLockGuard lock_guard(spin_lock_); + global_target_usage_.clear(); + if (OB_FAIL(global_target_usage_.set_refactored(server_, ServerTargetUsage()))) { + LOG_WARN("set refactored failed", K(ret)); + } else { + version_ = get_new_version(); + } + LOG_INFO("reset leader statistics", K(tenant_id_), K(ret), K(version_), K(GCTX.server_id_)); return ret; } @@ -544,6 +542,18 @@ int ObPxTenantTargetMonitor::get_all_target_info(common::ObIArray> SERVER_ID_SHIFT); +} + int ObPxTargetCond::wait(const int64_t wait_time_us) { int ret = OB_SUCCESS; diff --git a/src/sql/engine/px/ob_px_tenant_target_monitor.h b/src/sql/engine/px/ob_px_tenant_target_monitor.h index c9619b3636..87284eb9ec 100644 --- a/src/sql/engine/px/ob_px_tenant_target_monitor.h +++ b/src/sql/engine/px/ob_px_tenant_target_monitor.h @@ -130,7 +130,12 @@ public: int update_peer_target_used(const ObAddr &server, int64_t peer_used); int rollback_local_report_target_used(const ObAddr &server, int64_t local_report); int get_global_target_usage(const hash::ObHashMap *&global_target_usage); - int reset_statistics(uint64_t version); + // if role is follower and find that its version is different with leader's + // call this function to reset statistics, the param version is from the leader. + int reset_follower_statistics(uint64_t version); + // if role is leader and wants to start a new round of statistics, call this function. + // A new version is generated in this function and will be sync to all followers later. + int reset_leader_statistics(); // for px_admission int apply_target(hash::ObHashMap &worker_map, @@ -140,6 +145,7 @@ public: // for virtual_table iter int get_all_target_info(common::ObIArray &target_info_array); + static uint64_t get_server_id(uint64_t version); TO_STRING_KV(K_(is_init), K_(tenant_id), K_(server), K_(dummy_cache_leader), K_(role)); @@ -149,8 +155,10 @@ private: int get_role(ObRole &role); int refresh_dummy_location(); int query_statistics(ObAddr &leader); + uint64_t get_new_version(); private: + static const int64_t SERVER_ID_SHIFT = 48; bool is_init_; uint64_t tenant_id_; ObAddr server_; @@ -166,6 +174,7 @@ private: int64_t parallel_session_count_; ObPxTargetCond target_cond_; bool print_debug_log_; + bool need_send_refresh_all_; }; }