|
|
|
@ -18,6 +18,7 @@
|
|
|
|
#include "share/schema/ob_schema_utils.h"
|
|
|
|
#include "share/schema/ob_schema_utils.h"
|
|
|
|
#include "storage/tx/ob_trans_service.h"
|
|
|
|
#include "storage/tx/ob_trans_service.h"
|
|
|
|
#include "logservice/ob_log_service.h"
|
|
|
|
#include "logservice/ob_log_service.h"
|
|
|
|
|
|
|
|
#include "lib/utility/ob_tracepoint.h"
|
|
|
|
|
|
|
|
|
|
|
|
namespace oceanbase
|
|
|
|
namespace oceanbase
|
|
|
|
{
|
|
|
|
{
|
|
|
|
@ -73,6 +74,7 @@ void ObPxTenantTargetMonitor::reset()
|
|
|
|
global_target_usage_.clear();
|
|
|
|
global_target_usage_.clear();
|
|
|
|
version_ = UINT64_MAX;
|
|
|
|
version_ = UINT64_MAX;
|
|
|
|
parallel_session_count_ = 0;
|
|
|
|
parallel_session_count_ = 0;
|
|
|
|
|
|
|
|
print_debug_log_ = false;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
void ObPxTenantTargetMonitor::set_parallel_servers_target(int64_t parallel_servers_target)
|
|
|
|
void ObPxTenantTargetMonitor::set_parallel_servers_target(int64_t parallel_servers_target)
|
|
|
|
@ -97,6 +99,7 @@ int ObPxTenantTargetMonitor::refresh_statistics(bool need_refresh_all)
|
|
|
|
if (OB_FAIL(get_dummy_leader(leader))) {
|
|
|
|
if (OB_FAIL(get_dummy_leader(leader))) {
|
|
|
|
LOG_WARN("get dummy leader fail", K(ret));
|
|
|
|
LOG_WARN("get dummy leader fail", K(ret));
|
|
|
|
} else if (server_ != leader) {
|
|
|
|
} else if (server_ != leader) {
|
|
|
|
|
|
|
|
LOG_INFO("follower refresh statistics", K(tenant_id_), K(server_), K(leader), K(role_), K(version_));
|
|
|
|
// 单机情况下,不走全局排队
|
|
|
|
// 单机情况下,不走全局排队
|
|
|
|
if (role_ == LEADER) {
|
|
|
|
if (role_ == LEADER) {
|
|
|
|
role_ = FOLLOWER;
|
|
|
|
role_ = FOLLOWER;
|
|
|
|
@ -110,15 +113,19 @@ int ObPxTenantTargetMonitor::refresh_statistics(bool need_refresh_all)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
} else {
|
|
|
|
// 只有leader能进,可以无主,但不能多主
|
|
|
|
// 只有leader能进,可以无主,但不能多主
|
|
|
|
|
|
|
|
LOG_INFO("leader refresh statistics", K(tenant_id_), K(server_), K(leader), K(role_), K(need_refresh_all), K(version_));
|
|
|
|
if (role_ == FOLLOWER || need_refresh_all) {
|
|
|
|
if (role_ == FOLLOWER || need_refresh_all) {
|
|
|
|
role_ = LEADER;
|
|
|
|
role_ = LEADER;
|
|
|
|
// from follower to leader or observer is not longer alive, refresh all the statistics
|
|
|
|
// from follower to leader or observer is not longer alive, refresh all the statistics
|
|
|
|
if (OB_FAIL(reset_statistics(version_ + 1))) {
|
|
|
|
if (OB_FAIL(reset_statistics(version_ + 1))) {
|
|
|
|
LOG_WARN("reset statisitcs failed", K(ret));
|
|
|
|
LOG_WARN("reset statisitcs failed", K(ret));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
LOG_INFO("refresh global_target_usage_", K(version_), K(server_));
|
|
|
|
LOG_INFO("refresh global_target_usage_", K(tenant_id_), K(version_), K(server_));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if (!print_debug_log_ && OB_SUCCESS != OB_E(EventTable::EN_PX_PRINT_TARGET_MONITOR_LOG) OB_SUCCESS) {
|
|
|
|
|
|
|
|
print_debug_log_ = true;
|
|
|
|
|
|
|
|
}
|
|
|
|
return ret;
|
|
|
|
return ret;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
@ -214,13 +221,13 @@ int ObPxTenantTargetMonitor::refresh_dummy_location()
|
|
|
|
} else if (OB_FAIL(location_adapter->nonblock_renew(cluster_id_, tenant_id_, SYS_LS))) {
|
|
|
|
} else if (OB_FAIL(location_adapter->nonblock_renew(cluster_id_, tenant_id_, SYS_LS))) {
|
|
|
|
LOG_WARN("nonblock renew failed", K(ret));
|
|
|
|
LOG_WARN("nonblock renew failed", K(ret));
|
|
|
|
} else {
|
|
|
|
} else {
|
|
|
|
LOG_INFO("refresh locatition cache for target_monitor", K(refresh_ctrl));
|
|
|
|
LOG_INFO("refresh locatition cache for target_monitor", K(tenant_id_), K(refresh_ctrl));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
} else {
|
|
|
|
LOG_WARN("switch to tenant failed", K(tenant_id_));
|
|
|
|
LOG_WARN("switch to tenant failed", K(tenant_id_));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
} else {
|
|
|
|
LOG_INFO("waiting for refresh locatition cache for target_monitor", K(refresh_ctrl));
|
|
|
|
LOG_INFO("waiting for refresh locatition cache for target_monitor", K(tenant_id_), K(refresh_ctrl));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return ret;
|
|
|
|
return ret;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
@ -243,6 +250,7 @@ int ObPxTenantTargetMonitor::query_statistics(ObAddr &leader)
|
|
|
|
if (OB_FAIL(args.push_local_target_usage(entry.first, entry.second.get_local_used() - entry.second.get_report_used()))) {
|
|
|
|
if (OB_FAIL(args.push_local_target_usage(entry.first, entry.second.get_local_used() - entry.second.get_report_used()))) {
|
|
|
|
LOG_WARN("push server and target_usage failed", K(ret));
|
|
|
|
LOG_WARN("push server and target_usage failed", K(ret));
|
|
|
|
} else {
|
|
|
|
} else {
|
|
|
|
|
|
|
|
LOG_INFO("report statistics to leader", K(tenant_id_), K(leader), K(version_), K(entry.first), K(entry.second));
|
|
|
|
entry.second.set_report_used(entry.second.get_local_used());
|
|
|
|
entry.second.set_report_used(entry.second.get_local_used());
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return ret;
|
|
|
|
return ret;
|
|
|
|
@ -271,8 +279,11 @@ int ObPxTenantTargetMonitor::query_statistics(ObAddr &leader)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
} else if (result.get_status() == MONITOR_NOT_MASTER) {
|
|
|
|
} else if (result.get_status() == MONITOR_NOT_MASTER) {
|
|
|
|
refresh_dummy_location();
|
|
|
|
refresh_dummy_location();
|
|
|
|
|
|
|
|
need_rollback = true;
|
|
|
|
|
|
|
|
LOG_INFO("report to not master", K(tenant_id_), K(leader), K(version_));
|
|
|
|
} else if (result.get_status() == MONITOR_VERSION_NOT_MATCH) {
|
|
|
|
} else if (result.get_status() == MONITOR_VERSION_NOT_MATCH) {
|
|
|
|
uint64_t leader_version = result.get_version();
|
|
|
|
uint64_t leader_version = result.get_version();
|
|
|
|
|
|
|
|
LOG_INFO("monitor version not match", K(tenant_id_), K(leader_version), K(version_));
|
|
|
|
if (OB_FAIL(reset_statistics(leader_version))) {
|
|
|
|
if (OB_FAIL(reset_statistics(leader_version))) {
|
|
|
|
LOG_WARN("reset statistics failed", K(ret));
|
|
|
|
LOG_WARN("reset statistics failed", K(ret));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
@ -315,16 +326,22 @@ int ObPxTenantTargetMonitor::update_peer_target_used(const ObAddr &server, int64
|
|
|
|
{
|
|
|
|
{
|
|
|
|
int ret = OB_SUCCESS;
|
|
|
|
int ret = OB_SUCCESS;
|
|
|
|
ServerTargetUsage target_usage;
|
|
|
|
ServerTargetUsage target_usage;
|
|
|
|
|
|
|
|
if (print_debug_log_) {
|
|
|
|
|
|
|
|
LOG_INFO("update_peer_target_used", K(tenant_id_), K(is_leader()), K(version_), K(server), K(peer_used));
|
|
|
|
|
|
|
|
}
|
|
|
|
auto update_peer_used = [=](hash::HashMapPair<ObAddr, ServerTargetUsage> &entry) -> void {
|
|
|
|
auto update_peer_used = [=](hash::HashMapPair<ObAddr, ServerTargetUsage> &entry) -> void {
|
|
|
|
if (is_leader()) {
|
|
|
|
if (is_leader()) {
|
|
|
|
entry.second.update_peer_used(peer_used);
|
|
|
|
entry.second.update_peer_used(peer_used);
|
|
|
|
|
|
|
|
if (OB_UNLIKELY(entry.second.get_peer_used() < 0)) {
|
|
|
|
|
|
|
|
LOG_ERROR("peer used negative", K(tenant_id_), K(version_), K(server), K(peer_used));
|
|
|
|
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
} else {
|
|
|
|
entry.second.set_peer_used(peer_used);
|
|
|
|
entry.second.set_peer_used(peer_used);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
};
|
|
|
|
};
|
|
|
|
if (OB_FAIL(global_target_usage_.get_refactored(server, target_usage))) {
|
|
|
|
if (OB_FAIL(global_target_usage_.get_refactored(server, target_usage))) {
|
|
|
|
|
|
|
|
LOG_WARN("get refactored failed", K(ret), K(tenant_id_), K(server), K(version_));
|
|
|
|
if (ret != OB_HASH_NOT_EXIST) {
|
|
|
|
if (ret != OB_HASH_NOT_EXIST) {
|
|
|
|
LOG_WARN("get refactored failed", K(ret));
|
|
|
|
|
|
|
|
} else {
|
|
|
|
} else {
|
|
|
|
ObLockGuard<ObSpinLock> lock_guard(spin_lock_);
|
|
|
|
ObLockGuard<ObSpinLock> lock_guard(spin_lock_);
|
|
|
|
if (OB_FAIL(global_target_usage_.get_refactored(server, target_usage))) {
|
|
|
|
if (OB_FAIL(global_target_usage_.get_refactored(server, target_usage))) {
|
|
|
|
@ -440,8 +457,8 @@ int ObPxTenantTargetMonitor::apply_target(hash::ObHashMap<ObAddr, int64_t> &work
|
|
|
|
};
|
|
|
|
};
|
|
|
|
if (OB_FAIL(global_target_usage_.atomic_refactored(server, apply_local_target))) {
|
|
|
|
if (OB_FAIL(global_target_usage_.atomic_refactored(server, apply_local_target))) {
|
|
|
|
LOG_WARN("atomic refactored, update_peer_used failed", K(ret));
|
|
|
|
LOG_WARN("atomic refactored, update_peer_used failed", K(ret));
|
|
|
|
} else {
|
|
|
|
} else if (print_debug_log_) {
|
|
|
|
LOG_TRACE("apply target success", K(server), K(acquired_cnt),
|
|
|
|
LOG_INFO("apply target success", K(tenant_id_), K(server), K(acquired_cnt), K(version), K(version_),
|
|
|
|
K(parallel_servers_target_));
|
|
|
|
K(parallel_servers_target_));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
@ -486,14 +503,14 @@ int ObPxTenantTargetMonitor::release_target(hash::ObHashMap<ObAddr, int64_t> &wo
|
|
|
|
};
|
|
|
|
};
|
|
|
|
if (OB_FAIL(global_target_usage_.atomic_refactored(server, release_local_target))) {
|
|
|
|
if (OB_FAIL(global_target_usage_.atomic_refactored(server, release_local_target))) {
|
|
|
|
LOG_WARN("atomic refactored, update_peer_used failed", K(ret));
|
|
|
|
LOG_WARN("atomic refactored, update_peer_used failed", K(ret));
|
|
|
|
} else {
|
|
|
|
} else if (print_debug_log_) {
|
|
|
|
LOG_TRACE("release target success", K(server), K(acquired_cnt),
|
|
|
|
LOG_INFO("release target success", K(tenant_id_), K(server), K(acquired_cnt), K(version_),
|
|
|
|
K(parallel_servers_target_));
|
|
|
|
K(parallel_servers_target_));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
target_cond_.notifyAll();
|
|
|
|
target_cond_.notifyAll();
|
|
|
|
} else {
|
|
|
|
} else {
|
|
|
|
LOG_INFO("version changed", K(version_), K(version));
|
|
|
|
LOG_INFO("version changed", K(tenant_id_), K(version_), K(version));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
parallel_session_count_--;
|
|
|
|
parallel_session_count_--;
|
|
|
|
return ret;
|
|
|
|
return ret;
|
|
|
|
|