bug fix: remove round_trip_time related code and some unused code
This commit is contained in:
parent
378b220933
commit
2485b4b1be
18
deps/easy/src/io/easy_negotiation.c
vendored
18
deps/easy/src/io/easy_negotiation.c
vendored
@ -19,9 +19,9 @@
|
||||
const uint64_t g_support_eio_maigc[] = {
|
||||
0x12567348667799aa,
|
||||
0x1237734866785431,
|
||||
0x5933893228167181,
|
||||
0x6683221dd298cc23,
|
||||
};
|
||||
0x5933893228167181,
|
||||
0x6683221dd298cc23,
|
||||
};
|
||||
|
||||
const int g_support_eio_maigc_num = sizeof(g_support_eio_maigc) / sizeof(g_support_eio_maigc[0]);
|
||||
|
||||
@ -209,9 +209,9 @@ int net_send_negotiate_message(uint8_t negotiation_enable, int fd, uint64_t magi
|
||||
ret = easy_encode_negotiation_msg(&ne_msg, buf, MAX_SEND_LEN, &encode_len);
|
||||
if (ret != EASY_OK) {
|
||||
easy_error_log("easy encode negotiation msg failed!ret:%d, fd:%d, addr: %s", ret, fd, addr_str);
|
||||
return ret;
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
if ((ret = easy_socket_error(fd) ) != 0) {
|
||||
easy_info_log("retry! socket status is abnormal yet, fd:%d, addr:%s, err:%d.", fd, addr_str, ret);
|
||||
if (conn_has_error) *conn_has_error = 1;
|
||||
@ -231,9 +231,9 @@ int net_send_negotiate_message(uint8_t negotiation_enable, int fd, uint64_t magi
|
||||
} else {
|
||||
easy_info_log("negotiation not enabled!(addr:%s)", addr_str);
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
void net_consume_negotiation_msg(int fd, uint64_t magic)
|
||||
{
|
||||
|
@ -55,7 +55,7 @@ void ObRefreshSchemaStatusTimerTask::runTimerTask()
|
||||
ObLeaseStateMgr::ObLeaseStateMgr()
|
||||
: inited_(false), stopped_(false), lease_response_(), lease_expire_time_(0),
|
||||
hb_timer_(), cluster_info_timer_(), merge_timer_(), rs_mgr_(NULL), rpc_proxy_(NULL), heartbeat_process_(NULL),
|
||||
hb_(), renew_timeout_(RENEW_TIMEOUT), ob_service_(NULL), avg_calculator_(),
|
||||
hb_(), renew_timeout_(RENEW_TIMEOUT), ob_service_(NULL),
|
||||
baseline_schema_version_(0), heartbeat_expire_time_(0)
|
||||
{
|
||||
}
|
||||
@ -104,8 +104,6 @@ int ObLeaseStateMgr::init(
|
||||
LOG_WARN("cluster_info_timer_ init failed", KR(ret));
|
||||
} else if (OB_FAIL(merge_timer_.init("MergeTimer"))) {
|
||||
LOG_WARN("merge_timer_ init failed", KR(ret));
|
||||
} else if (OB_FAIL(avg_calculator_.init(STORE_RTT_NUM))) {
|
||||
LOG_WARN("avg calculator init fail", K(ret));
|
||||
} else {
|
||||
rs_mgr_ = rs_mgr;
|
||||
rpc_proxy_ = rpc_proxy;
|
||||
@ -314,7 +312,6 @@ int ObLeaseStateMgr::do_renew_lease()
|
||||
int ret = OB_SUCCESS;
|
||||
ObLeaseRequest lease_request;
|
||||
ObLeaseResponse lease_response;
|
||||
double avg_round_trip_time = 0;
|
||||
ObAddr rs_addr;
|
||||
NG_TRACE(do_renew_lease_begin);
|
||||
if (!inited_) {
|
||||
@ -324,16 +321,10 @@ int ObLeaseStateMgr::do_renew_lease()
|
||||
LOG_WARN("init lease request failed", K(ret));
|
||||
} else if (OB_FAIL(rs_mgr_->get_master_root_server(rs_addr))) {
|
||||
LOG_WARN("get master root service failed", K(ret));
|
||||
} else if (OB_FAIL(avg_calculator_.get_avg(avg_round_trip_time))) {
|
||||
LOG_WARN("get avg round_trip_time fail", K(ret));
|
||||
} else {
|
||||
NG_TRACE(send_heartbeat_begin);
|
||||
lease_request.round_trip_time_ = static_cast<int64_t>(avg_round_trip_time);
|
||||
const int64_t begin = ObTimeUtility::current_time();
|
||||
lease_request.current_server_time_ = begin;
|
||||
ret = rpc_proxy_->to(rs_addr).timeout(renew_timeout_)
|
||||
.renew_lease(lease_request, lease_response);
|
||||
const int64_t end = ObTimeUtility::current_time();
|
||||
if (lease_response.lease_expire_time_ > 0) {
|
||||
// for compatible with old version
|
||||
lease_response.heartbeat_expire_time_ = lease_response.lease_expire_time_;
|
||||
@ -361,8 +352,6 @@ int ObLeaseStateMgr::do_renew_lease()
|
||||
LOG_DEBUG("renew_lease from master_rs successfully", K(rs_addr));
|
||||
if (OB_FAIL(set_lease_response(lease_response))) {
|
||||
LOG_WARN("fail to set lease response", K(ret));
|
||||
} else if (OB_FAIL(avg_calculator_.calc_avg(end - begin))) {
|
||||
LOG_WARN("calculate avg round_trip_time fail", K(ret), "current_rtt", end - begin);
|
||||
} else if (OB_FAIL(heartbeat_process_->do_heartbeat_event(lease_response_))) {
|
||||
LOG_WARN("fail to process new lease info", K_(lease_response), K(ret));
|
||||
}
|
||||
@ -412,71 +401,5 @@ void ObLeaseStateMgr::HeartBeat::runTimerTask()
|
||||
}
|
||||
}
|
||||
|
||||
ObLeaseStateMgr::AvgCalculator::AvgCalculator()
|
||||
: calc_buffer_(), inited_(false),
|
||||
limit_num_(0), head_(0), avg_(0)
|
||||
{
|
||||
}
|
||||
|
||||
ObLeaseStateMgr::AvgCalculator::~AvgCalculator()
|
||||
{
|
||||
calc_buffer_.destroy();
|
||||
}
|
||||
|
||||
int ObLeaseStateMgr::AvgCalculator::init(int64_t limit_num)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (inited_) {
|
||||
ret = OB_INIT_TWICE;
|
||||
LOG_WARN("init twice", K(ret));
|
||||
} else {
|
||||
limit_num_ = limit_num;
|
||||
inited_ = true;
|
||||
head_ = 0;
|
||||
avg_ = 0;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObLeaseStateMgr::AvgCalculator::get_avg(double &avg)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (!inited_) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("not init", K(ret));
|
||||
} else {
|
||||
avg = avg_;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObLeaseStateMgr::AvgCalculator::calc_avg(int64_t new_value)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (!inited_) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("not init", K(ret));
|
||||
} else if (0 == limit_num_) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("limit num should not be zero", K(ret));
|
||||
} else if (calc_buffer_.count() > limit_num_) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("array size is out of limit", K(ret), K(limit_num_), "array_size", calc_buffer_.count());
|
||||
} else if (calc_buffer_.count() < limit_num_) {
|
||||
if (OB_FAIL(calc_buffer_.push_back(new_value))) {
|
||||
LOG_WARN("array push_back fail", K(ret));
|
||||
} else {
|
||||
double num = static_cast<double>(calc_buffer_.count());
|
||||
avg_ = (num - 1) / num * avg_ + static_cast<double>(new_value) / num;
|
||||
}
|
||||
} else if (calc_buffer_.count() == limit_num_) {
|
||||
int64_t old_value = calc_buffer_[head_];
|
||||
calc_buffer_.at(head_) = new_value;
|
||||
avg_ += (static_cast<double>(new_value - old_value) / static_cast<double>(limit_num_));
|
||||
head_ = (head_ + 1) % limit_num_;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
}//end namespace observer
|
||||
}//end namespace oceanbase
|
||||
|
@ -57,7 +57,6 @@ class ObLeaseStateMgr
|
||||
{
|
||||
public:
|
||||
friend class HeartBeat;
|
||||
friend class AvgCalculator;
|
||||
ObLeaseStateMgr();
|
||||
~ObLeaseStateMgr();
|
||||
|
||||
@ -73,9 +72,7 @@ public:
|
||||
inline void set_stop() { stopped_ = true; }
|
||||
inline bool is_inited() const { return inited_; }
|
||||
inline bool is_valid_heartbeat() const;
|
||||
inline bool is_valid_lease() const;
|
||||
inline int64_t get_heartbeat_expire_time();
|
||||
inline int64_t get_lease_expire_time();
|
||||
inline int set_lease_response(const share::ObLeaseResponse &lease_response);
|
||||
private:
|
||||
int try_report_sys_ls();
|
||||
@ -93,31 +90,10 @@ private:
|
||||
private:
|
||||
DISALLOW_COPY_AND_ASSIGN(HeartBeat);
|
||||
};
|
||||
|
||||
static const int64_t STORE_RTT_NUM = 10;
|
||||
static const int64_t DELAY_TIME = 2 * 1000 * 1000;//2s
|
||||
static const int64_t RENEW_TIMEOUT = 2 * 1000 * 1000; //2s
|
||||
static const int64_t REGISTER_TIME_SLEEP = 2 * 1000 * 1000; //5s
|
||||
|
||||
// maintain fixed-length buffer and calculate avarage value
|
||||
class AvgCalculator
|
||||
{
|
||||
public:
|
||||
AvgCalculator();
|
||||
~AvgCalculator();
|
||||
|
||||
int init(int64_t limit_num);
|
||||
int calc_avg(int64_t num);
|
||||
int get_avg(double &avg);
|
||||
private:
|
||||
common::ObSEArray<int64_t, STORE_RTT_NUM> calc_buffer_;
|
||||
bool inited_;
|
||||
int64_t limit_num_;
|
||||
int64_t head_;
|
||||
double avg_;
|
||||
private:
|
||||
DISALLOW_COPY_AND_ASSIGN(AvgCalculator);
|
||||
};
|
||||
private:
|
||||
int start_heartbeat();
|
||||
int do_renew_lease();
|
||||
@ -137,7 +113,6 @@ private:
|
||||
HeartBeat hb_;
|
||||
int64_t renew_timeout_;
|
||||
ObService *ob_service_;
|
||||
AvgCalculator avg_calculator_;
|
||||
int64_t baseline_schema_version_;
|
||||
volatile int64_t heartbeat_expire_time_ CACHE_ALIGNED;
|
||||
private:
|
||||
@ -149,11 +124,6 @@ bool ObLeaseStateMgr::is_valid_heartbeat() const
|
||||
return heartbeat_expire_time_ > common::ObTimeUtility::current_time();
|
||||
}
|
||||
|
||||
bool ObLeaseStateMgr::is_valid_lease() const
|
||||
{
|
||||
return lease_expire_time_ > common::ObTimeUtility::current_time();
|
||||
}
|
||||
|
||||
int ObLeaseStateMgr::set_lease_response(const share::ObLeaseResponse &lease_response)
|
||||
{
|
||||
if (lease_expire_time_ < lease_response.lease_expire_time_) {
|
||||
@ -167,11 +137,6 @@ int64_t ObLeaseStateMgr::get_heartbeat_expire_time()
|
||||
{
|
||||
return heartbeat_expire_time_;
|
||||
}
|
||||
|
||||
int64_t ObLeaseStateMgr::get_lease_expire_time()
|
||||
{
|
||||
return lease_expire_time_;
|
||||
}
|
||||
}//end namespace observer
|
||||
}//end namespace oceanbase
|
||||
#endif //OCEANBASE_OBSERVER_OB_LEASE_STATE_MGR_H_
|
||||
|
@ -1747,29 +1747,6 @@ int ObService::get_server_heartbeat_expire_time(int64_t &lease_expire_time)
|
||||
return ret;
|
||||
}
|
||||
|
||||
bool ObService::is_heartbeat_expired() const
|
||||
{
|
||||
bool bret = false; // returns false on error
|
||||
if (OB_UNLIKELY(!inited_)) {
|
||||
LOG_WARN_RET(OB_NOT_INIT, "not init");
|
||||
} else {
|
||||
bret = !lease_state_mgr_.is_valid_heartbeat();
|
||||
}
|
||||
return bret;
|
||||
}
|
||||
|
||||
bool ObService::is_svr_lease_valid() const
|
||||
{
|
||||
// Determine if local lease is valid in OFS mode
|
||||
bool bret = false;
|
||||
if (OB_UNLIKELY(!inited_)) {
|
||||
LOG_WARN_RET(OB_NOT_INIT, "not init");
|
||||
} else {
|
||||
bret = lease_state_mgr_.is_valid_lease();
|
||||
}
|
||||
return bret;
|
||||
}
|
||||
|
||||
int ObService::set_tracepoint(const obrpc::ObAdminSetTPArg &arg)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
@ -210,8 +210,6 @@ public:
|
||||
int set_tracepoint(const obrpc::ObAdminSetTPArg &arg);
|
||||
// for ObPartitionService::check_mc_allowed_by_server_lease
|
||||
int get_server_heartbeat_expire_time(int64_t &lease_expire_time);
|
||||
bool is_heartbeat_expired() const;
|
||||
bool is_svr_lease_valid() const;
|
||||
int cancel_sys_task(const share::ObTaskId &task_id);
|
||||
int refresh_memory_stat();
|
||||
int wash_memory_fragmentation();
|
||||
|
@ -1837,13 +1837,12 @@ int ObRootService::request_heartbeats()
|
||||
lease_request.reset();
|
||||
int temp_ret = OB_SUCCESS;
|
||||
bool to_alive = false;
|
||||
bool update_delay_time_flag = false;
|
||||
if (OB_SUCCESS != (temp_ret = rpc_proxy_.to(status->server_).timeout(rpc_timeout)
|
||||
.request_heartbeat(lease_request))) {
|
||||
LOG_WARN("request_heartbeat failed", "server", status->server_,
|
||||
K(rpc_timeout), K(temp_ret));
|
||||
} else if (OB_SUCCESS != (temp_ret = server_manager_.receive_hb(
|
||||
lease_request, server_id, to_alive, update_delay_time_flag))) {
|
||||
lease_request, server_id, to_alive))) {
|
||||
LOG_WARN("receive hb failed", K(lease_request), K(temp_ret));
|
||||
}
|
||||
ret = (OB_SUCCESS != ret) ? ret : temp_ret;
|
||||
@ -2202,7 +2201,6 @@ int ObRootService::renew_lease(const ObLeaseRequest &lease_request,
|
||||
ObServerStatus server_stat;
|
||||
uint64_t server_id = OB_INVALID_ID;
|
||||
bool to_alive = false;
|
||||
bool update_delay_time_flag = true;
|
||||
DEBUG_SYNC(HANG_HEART_BEAT_ON_RS);
|
||||
if (!inited_) {
|
||||
ret = OB_NOT_INIT;
|
||||
@ -2210,7 +2208,7 @@ int ObRootService::renew_lease(const ObLeaseRequest &lease_request,
|
||||
} else if (!lease_request.is_valid()) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid lease_request", K(lease_request), K(ret));
|
||||
} else if (OB_FAIL(server_manager_.receive_hb(lease_request, server_id, to_alive, update_delay_time_flag))) {
|
||||
} else if (OB_FAIL(server_manager_.receive_hb(lease_request, server_id, to_alive))) {
|
||||
LOG_WARN("server manager receive hb failed", K(lease_request), K(ret));
|
||||
} else if (OB_ISNULL(schema_service_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
|
@ -822,8 +822,7 @@ int ObServerManager::try_modify_recovery_server_takenover_by_rs(
|
||||
int ObServerManager::receive_hb(
|
||||
const ObLeaseRequest &lease_request,
|
||||
uint64_t &server_id,
|
||||
bool &to_alive,
|
||||
bool &update_delay_time_flag)
|
||||
bool &to_alive)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
bool zone_exist = true;
|
||||
@ -994,17 +993,6 @@ int ObServerManager::receive_hb(
|
||||
}
|
||||
} else {
|
||||
status_ptr->last_hb_time_ = now;
|
||||
if (update_delay_time_flag) {
|
||||
const int64_t current_rs_time = ObTimeUtility::current_time();
|
||||
int64_t server_behind_time = current_rs_time - (lease_request.current_server_time_ + lease_request.round_trip_time_ / 2);
|
||||
if (std::abs(server_behind_time) > GCONF.rpc_timeout) {
|
||||
LOG_WARN("clock between rs and server not sync", "ret", OB_ERR_UNEXPECTED, K(lease_request), K(current_rs_time));
|
||||
}
|
||||
if (OB_FAIL(set_server_delay_time(server_behind_time, lease_request.round_trip_time_, *status_ptr))) {
|
||||
LOG_WARN("set server delay time failed", K(ret), K(server_behind_time),
|
||||
"round_trip_time", lease_request.round_trip_time_);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
if (ObServerStatus::OB_HEARTBEAT_ALIVE == status_ptr->hb_status_
|
||||
@ -1017,24 +1005,6 @@ int ObServerManager::receive_hb(
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObServerManager::set_server_delay_time(const int64_t server_behind_time,
|
||||
const int64_t round_trip_time,
|
||||
ObServerStatus &server_status)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (!inited_) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("not init", K(ret));
|
||||
} else if (round_trip_time < 0 ) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("round_trip_time is invalid", K(ret), K(round_trip_time));
|
||||
} else {
|
||||
server_status.last_server_behind_time_ = server_behind_time;
|
||||
server_status.last_round_trip_time_ = round_trip_time;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObServerManager::process_report_status_change(const share::ObLeaseRequest &lease_request,
|
||||
share::ObServerStatus &server_status)
|
||||
{
|
||||
|
@ -89,8 +89,7 @@ public:
|
||||
// server_id is OB_INVALID_ID before build server manager from __all_server
|
||||
int receive_hb(const share::ObLeaseRequest &lease_request,
|
||||
uint64_t &server_id,
|
||||
bool &to_alive,
|
||||
bool &update_delay_time_flag);
|
||||
bool &to_alive);
|
||||
int get_server_id(
|
||||
const common::ObAddr &server,
|
||||
uint64_t &server_id) const;
|
||||
@ -201,9 +200,6 @@ protected:
|
||||
const int64_t hb_timestamp,
|
||||
const bool with_rootserver,
|
||||
share::ObServerStatus &server_status);
|
||||
int set_server_delay_time(const int64_t server_behind_time,
|
||||
const int64_t round_trip_time,
|
||||
share::ObServerStatus &server_status);
|
||||
int reset_existing_rootserver();
|
||||
int try_delete_server_working_dir(
|
||||
const common::ObZone &zone,
|
||||
|
@ -190,7 +190,7 @@ bool ObLeaseRequest::is_valid() const
|
||||
// No need to determine the value of server_status_
|
||||
return version_ > 0 && !zone_.is_empty() && server_.is_valid()
|
||||
&& sql_port_ > 0 && resource_info_.is_valid()
|
||||
&& start_service_time_ >= 0 && current_server_time_ >= 0 && round_trip_time_ >= 0;
|
||||
&& start_service_time_ >= 0;
|
||||
}
|
||||
|
||||
OB_SERIALIZE_MEMBER(ObLeaseRequest,
|
||||
|
@ -84,7 +84,7 @@ bool ObServerStatus::is_valid() const
|
||||
{
|
||||
return OB_INVALID_ID != id_ && server_.is_valid() && is_status_valid()
|
||||
&& register_time_ >= 0 && last_hb_time_ >= 0 && block_migrate_in_time_ >= 0
|
||||
&& stop_time_ >= 0 && start_service_time_ >= 0 && last_round_trip_time_ >= 0;
|
||||
&& stop_time_ >= 0 && start_service_time_ >= 0;
|
||||
}
|
||||
|
||||
static const char *g_server_display_status_str[] = {
|
||||
|
@ -26,7 +26,6 @@
|
||||
#include "share/schema/ob_tenant_schema_service.h"
|
||||
#include "share/backup/ob_backup_info_mgr.h"
|
||||
#include "observer/ob_server_struct.h"
|
||||
#include "observer/ob_service.h"
|
||||
#include "share/schema/ob_tenant_schema_service.h"
|
||||
#include "share/system_variable/ob_system_variable_alias.h"
|
||||
#include "share/ob_zone_merge_info.h"
|
||||
@ -913,7 +912,6 @@ int ObTenantFreezeInfoMgr::ReloadTask::try_update_info()
|
||||
ObSEArray<FreezeInfo, 4> freeze_info;
|
||||
ObSEArray<ObSnapshotInfo, 4> snapshots;
|
||||
bool gc_snapshot_ts_changed = false;
|
||||
observer::ObService *ob_service = GCTX.ob_service_;
|
||||
int64_t min_major_snapshot = INT64_MAX;
|
||||
|
||||
if (OB_FAIL(get_global_info(snapshot_gc_ts))) {
|
||||
@ -935,7 +933,7 @@ int ObTenantFreezeInfoMgr::ReloadTask::try_update_info()
|
||||
STORAGE_LOG(WARN, "update info failed",
|
||||
K(ret), K(snapshot_gc_ts), K(freeze_info), K(snapshots));
|
||||
} else {
|
||||
if (gc_snapshot_ts_changed || ob_service->is_heartbeat_expired()) {
|
||||
if (gc_snapshot_ts_changed) {
|
||||
last_change_ts_ = ObTimeUtility::current_time();
|
||||
} else {
|
||||
const int64_t last_not_change_interval_us = ObTimeUtility::current_time() - last_change_ts_;
|
||||
|
Loading…
x
Reference in New Issue
Block a user