diff --git a/deps/easy/src/io/easy_negotiation.c b/deps/easy/src/io/easy_negotiation.c index e8052fb9c..9b43a41c7 100644 --- a/deps/easy/src/io/easy_negotiation.c +++ b/deps/easy/src/io/easy_negotiation.c @@ -19,9 +19,9 @@ const uint64_t g_support_eio_maigc[] = { 0x12567348667799aa, 0x1237734866785431, - 0x5933893228167181, - 0x6683221dd298cc23, -}; + 0x5933893228167181, + 0x6683221dd298cc23, +}; const int g_support_eio_maigc_num = sizeof(g_support_eio_maigc) / sizeof(g_support_eio_maigc[0]); @@ -209,9 +209,9 @@ int net_send_negotiate_message(uint8_t negotiation_enable, int fd, uint64_t magi ret = easy_encode_negotiation_msg(&ne_msg, buf, MAX_SEND_LEN, &encode_len); if (ret != EASY_OK) { easy_error_log("easy encode negotiation msg failed!ret:%d, fd:%d, addr: %s", ret, fd, addr_str); - return ret; - } - + return ret; + } + if ((ret = easy_socket_error(fd) ) != 0) { easy_info_log("retry! socket status is abnormal yet, fd:%d, addr:%s, err:%d.", fd, addr_str, ret); if (conn_has_error) *conn_has_error = 1; @@ -231,9 +231,9 @@ int net_send_negotiate_message(uint8_t negotiation_enable, int fd, uint64_t magi } else { easy_info_log("negotiation not enabled!(addr:%s)", addr_str); } - - return ret; -} + + return ret; +} void net_consume_negotiation_msg(int fd, uint64_t magic) { diff --git a/src/observer/ob_lease_state_mgr.cpp b/src/observer/ob_lease_state_mgr.cpp index dc55fa771..0a3c13288 100644 --- a/src/observer/ob_lease_state_mgr.cpp +++ b/src/observer/ob_lease_state_mgr.cpp @@ -55,7 +55,7 @@ void ObRefreshSchemaStatusTimerTask::runTimerTask() ObLeaseStateMgr::ObLeaseStateMgr() : inited_(false), stopped_(false), lease_response_(), lease_expire_time_(0), hb_timer_(), cluster_info_timer_(), merge_timer_(), rs_mgr_(NULL), rpc_proxy_(NULL), heartbeat_process_(NULL), - hb_(), renew_timeout_(RENEW_TIMEOUT), ob_service_(NULL), avg_calculator_(), + hb_(), renew_timeout_(RENEW_TIMEOUT), ob_service_(NULL), baseline_schema_version_(0), heartbeat_expire_time_(0) { } @@ -104,8 +104,6 @@ int ObLeaseStateMgr::init( LOG_WARN("cluster_info_timer_ init failed", KR(ret)); } else if (OB_FAIL(merge_timer_.init("MergeTimer"))) { LOG_WARN("merge_timer_ init failed", KR(ret)); - } else if (OB_FAIL(avg_calculator_.init(STORE_RTT_NUM))) { - LOG_WARN("avg calculator init fail", K(ret)); } else { rs_mgr_ = rs_mgr; rpc_proxy_ = rpc_proxy; @@ -314,7 +312,6 @@ int ObLeaseStateMgr::do_renew_lease() int ret = OB_SUCCESS; ObLeaseRequest lease_request; ObLeaseResponse lease_response; - double avg_round_trip_time = 0; ObAddr rs_addr; NG_TRACE(do_renew_lease_begin); if (!inited_) { @@ -324,16 +321,10 @@ int ObLeaseStateMgr::do_renew_lease() LOG_WARN("init lease request failed", K(ret)); } else if (OB_FAIL(rs_mgr_->get_master_root_server(rs_addr))) { LOG_WARN("get master root service failed", K(ret)); - } else if (OB_FAIL(avg_calculator_.get_avg(avg_round_trip_time))) { - LOG_WARN("get avg round_trip_time fail", K(ret)); } else { NG_TRACE(send_heartbeat_begin); - lease_request.round_trip_time_ = static_cast(avg_round_trip_time); - const int64_t begin = ObTimeUtility::current_time(); - lease_request.current_server_time_ = begin; ret = rpc_proxy_->to(rs_addr).timeout(renew_timeout_) .renew_lease(lease_request, lease_response); - const int64_t end = ObTimeUtility::current_time(); if (lease_response.lease_expire_time_ > 0) { // for compatible with old version lease_response.heartbeat_expire_time_ = lease_response.lease_expire_time_; @@ -361,8 +352,6 @@ int ObLeaseStateMgr::do_renew_lease() LOG_DEBUG("renew_lease from master_rs successfully", K(rs_addr)); if (OB_FAIL(set_lease_response(lease_response))) { LOG_WARN("fail to set lease response", K(ret)); - } else if (OB_FAIL(avg_calculator_.calc_avg(end - begin))) { - LOG_WARN("calculate avg round_trip_time fail", K(ret), "current_rtt", end - begin); } else if (OB_FAIL(heartbeat_process_->do_heartbeat_event(lease_response_))) { LOG_WARN("fail to process new lease info", K_(lease_response), K(ret)); } @@ -412,71 +401,5 @@ void ObLeaseStateMgr::HeartBeat::runTimerTask() } } -ObLeaseStateMgr::AvgCalculator::AvgCalculator() - : calc_buffer_(), inited_(false), - limit_num_(0), head_(0), avg_(0) -{ -} - -ObLeaseStateMgr::AvgCalculator::~AvgCalculator() -{ - calc_buffer_.destroy(); -} - -int ObLeaseStateMgr::AvgCalculator::init(int64_t limit_num) -{ - int ret = OB_SUCCESS; - if (inited_) { - ret = OB_INIT_TWICE; - LOG_WARN("init twice", K(ret)); - } else { - limit_num_ = limit_num; - inited_ = true; - head_ = 0; - avg_ = 0; - } - return ret; -} - -int ObLeaseStateMgr::AvgCalculator::get_avg(double &avg) -{ - int ret = OB_SUCCESS; - if (!inited_) { - ret = OB_NOT_INIT; - LOG_WARN("not init", K(ret)); - } else { - avg = avg_; - } - return ret; -} - -int ObLeaseStateMgr::AvgCalculator::calc_avg(int64_t new_value) -{ - int ret = OB_SUCCESS; - if (!inited_) { - ret = OB_NOT_INIT; - LOG_WARN("not init", K(ret)); - } else if (0 == limit_num_) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("limit num should not be zero", K(ret)); - } else if (calc_buffer_.count() > limit_num_) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("array size is out of limit", K(ret), K(limit_num_), "array_size", calc_buffer_.count()); - } else if (calc_buffer_.count() < limit_num_) { - if (OB_FAIL(calc_buffer_.push_back(new_value))) { - LOG_WARN("array push_back fail", K(ret)); - } else { - double num = static_cast(calc_buffer_.count()); - avg_ = (num - 1) / num * avg_ + static_cast(new_value) / num; - } - } else if (calc_buffer_.count() == limit_num_) { - int64_t old_value = calc_buffer_[head_]; - calc_buffer_.at(head_) = new_value; - avg_ += (static_cast(new_value - old_value) / static_cast(limit_num_)); - head_ = (head_ + 1) % limit_num_; - } - return ret; -} - }//end namespace observer }//end namespace oceanbase diff --git a/src/observer/ob_lease_state_mgr.h b/src/observer/ob_lease_state_mgr.h index 0f8398aa2..d00c86769 100644 --- a/src/observer/ob_lease_state_mgr.h +++ b/src/observer/ob_lease_state_mgr.h @@ -57,7 +57,6 @@ class ObLeaseStateMgr { public: friend class HeartBeat; - friend class AvgCalculator; ObLeaseStateMgr(); ~ObLeaseStateMgr(); @@ -73,9 +72,7 @@ public: inline void set_stop() { stopped_ = true; } inline bool is_inited() const { return inited_; } inline bool is_valid_heartbeat() const; - inline bool is_valid_lease() const; inline int64_t get_heartbeat_expire_time(); - inline int64_t get_lease_expire_time(); inline int set_lease_response(const share::ObLeaseResponse &lease_response); private: int try_report_sys_ls(); @@ -93,31 +90,10 @@ private: private: DISALLOW_COPY_AND_ASSIGN(HeartBeat); }; - - static const int64_t STORE_RTT_NUM = 10; static const int64_t DELAY_TIME = 2 * 1000 * 1000;//2s static const int64_t RENEW_TIMEOUT = 2 * 1000 * 1000; //2s static const int64_t REGISTER_TIME_SLEEP = 2 * 1000 * 1000; //5s - // maintain fixed-length buffer and calculate avarage value - class AvgCalculator - { - public: - AvgCalculator(); - ~AvgCalculator(); - - int init(int64_t limit_num); - int calc_avg(int64_t num); - int get_avg(double &avg); - private: - common::ObSEArray calc_buffer_; - bool inited_; - int64_t limit_num_; - int64_t head_; - double avg_; - private: - DISALLOW_COPY_AND_ASSIGN(AvgCalculator); - }; private: int start_heartbeat(); int do_renew_lease(); @@ -137,7 +113,6 @@ private: HeartBeat hb_; int64_t renew_timeout_; ObService *ob_service_; - AvgCalculator avg_calculator_; int64_t baseline_schema_version_; volatile int64_t heartbeat_expire_time_ CACHE_ALIGNED; private: @@ -149,11 +124,6 @@ bool ObLeaseStateMgr::is_valid_heartbeat() const return heartbeat_expire_time_ > common::ObTimeUtility::current_time(); } -bool ObLeaseStateMgr::is_valid_lease() const -{ - return lease_expire_time_ > common::ObTimeUtility::current_time(); -} - int ObLeaseStateMgr::set_lease_response(const share::ObLeaseResponse &lease_response) { if (lease_expire_time_ < lease_response.lease_expire_time_) { @@ -167,11 +137,6 @@ int64_t ObLeaseStateMgr::get_heartbeat_expire_time() { return heartbeat_expire_time_; } - -int64_t ObLeaseStateMgr::get_lease_expire_time() -{ - return lease_expire_time_; -} }//end namespace observer }//end namespace oceanbase #endif //OCEANBASE_OBSERVER_OB_LEASE_STATE_MGR_H_ diff --git a/src/observer/ob_service.cpp b/src/observer/ob_service.cpp index 32d003d88..fb510f879 100644 --- a/src/observer/ob_service.cpp +++ b/src/observer/ob_service.cpp @@ -1747,29 +1747,6 @@ int ObService::get_server_heartbeat_expire_time(int64_t &lease_expire_time) return ret; } -bool ObService::is_heartbeat_expired() const -{ - bool bret = false; // returns false on error - if (OB_UNLIKELY(!inited_)) { - LOG_WARN_RET(OB_NOT_INIT, "not init"); - } else { - bret = !lease_state_mgr_.is_valid_heartbeat(); - } - return bret; -} - -bool ObService::is_svr_lease_valid() const -{ - // Determine if local lease is valid in OFS mode - bool bret = false; - if (OB_UNLIKELY(!inited_)) { - LOG_WARN_RET(OB_NOT_INIT, "not init"); - } else { - bret = lease_state_mgr_.is_valid_lease(); - } - return bret; -} - int ObService::set_tracepoint(const obrpc::ObAdminSetTPArg &arg) { int ret = OB_SUCCESS; diff --git a/src/observer/ob_service.h b/src/observer/ob_service.h index 79ff699a6..e20d0ee2e 100644 --- a/src/observer/ob_service.h +++ b/src/observer/ob_service.h @@ -210,8 +210,6 @@ public: int set_tracepoint(const obrpc::ObAdminSetTPArg &arg); // for ObPartitionService::check_mc_allowed_by_server_lease int get_server_heartbeat_expire_time(int64_t &lease_expire_time); - bool is_heartbeat_expired() const; - bool is_svr_lease_valid() const; int cancel_sys_task(const share::ObTaskId &task_id); int refresh_memory_stat(); int wash_memory_fragmentation(); diff --git a/src/rootserver/ob_root_service.cpp b/src/rootserver/ob_root_service.cpp index 121fc3292..affe18031 100644 --- a/src/rootserver/ob_root_service.cpp +++ b/src/rootserver/ob_root_service.cpp @@ -1837,13 +1837,12 @@ int ObRootService::request_heartbeats() lease_request.reset(); int temp_ret = OB_SUCCESS; bool to_alive = false; - bool update_delay_time_flag = false; if (OB_SUCCESS != (temp_ret = rpc_proxy_.to(status->server_).timeout(rpc_timeout) .request_heartbeat(lease_request))) { LOG_WARN("request_heartbeat failed", "server", status->server_, K(rpc_timeout), K(temp_ret)); } else if (OB_SUCCESS != (temp_ret = server_manager_.receive_hb( - lease_request, server_id, to_alive, update_delay_time_flag))) { + lease_request, server_id, to_alive))) { LOG_WARN("receive hb failed", K(lease_request), K(temp_ret)); } ret = (OB_SUCCESS != ret) ? ret : temp_ret; @@ -2202,7 +2201,6 @@ int ObRootService::renew_lease(const ObLeaseRequest &lease_request, ObServerStatus server_stat; uint64_t server_id = OB_INVALID_ID; bool to_alive = false; - bool update_delay_time_flag = true; DEBUG_SYNC(HANG_HEART_BEAT_ON_RS); if (!inited_) { ret = OB_NOT_INIT; @@ -2210,7 +2208,7 @@ int ObRootService::renew_lease(const ObLeaseRequest &lease_request, } else if (!lease_request.is_valid()) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid lease_request", K(lease_request), K(ret)); - } else if (OB_FAIL(server_manager_.receive_hb(lease_request, server_id, to_alive, update_delay_time_flag))) { + } else if (OB_FAIL(server_manager_.receive_hb(lease_request, server_id, to_alive))) { LOG_WARN("server manager receive hb failed", K(lease_request), K(ret)); } else if (OB_ISNULL(schema_service_)) { ret = OB_ERR_UNEXPECTED; diff --git a/src/rootserver/ob_server_manager.cpp b/src/rootserver/ob_server_manager.cpp index 3ce1b9508..50206739d 100644 --- a/src/rootserver/ob_server_manager.cpp +++ b/src/rootserver/ob_server_manager.cpp @@ -822,8 +822,7 @@ int ObServerManager::try_modify_recovery_server_takenover_by_rs( int ObServerManager::receive_hb( const ObLeaseRequest &lease_request, uint64_t &server_id, - bool &to_alive, - bool &update_delay_time_flag) + bool &to_alive) { int ret = OB_SUCCESS; bool zone_exist = true; @@ -994,17 +993,6 @@ int ObServerManager::receive_hb( } } else { status_ptr->last_hb_time_ = now; - if (update_delay_time_flag) { - const int64_t current_rs_time = ObTimeUtility::current_time(); - int64_t server_behind_time = current_rs_time - (lease_request.current_server_time_ + lease_request.round_trip_time_ / 2); - if (std::abs(server_behind_time) > GCONF.rpc_timeout) { - LOG_WARN("clock between rs and server not sync", "ret", OB_ERR_UNEXPECTED, K(lease_request), K(current_rs_time)); - } - if (OB_FAIL(set_server_delay_time(server_behind_time, lease_request.round_trip_time_, *status_ptr))) { - LOG_WARN("set server delay time failed", K(ret), K(server_behind_time), - "round_trip_time", lease_request.round_trip_time_); - } - } } if (OB_SUCC(ret)) { if (ObServerStatus::OB_HEARTBEAT_ALIVE == status_ptr->hb_status_ @@ -1017,24 +1005,6 @@ int ObServerManager::receive_hb( return ret; } -int ObServerManager::set_server_delay_time(const int64_t server_behind_time, - const int64_t round_trip_time, - ObServerStatus &server_status) -{ - int ret = OB_SUCCESS; - if (!inited_) { - ret = OB_NOT_INIT; - LOG_WARN("not init", K(ret)); - } else if (round_trip_time < 0 ) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("round_trip_time is invalid", K(ret), K(round_trip_time)); - } else { - server_status.last_server_behind_time_ = server_behind_time; - server_status.last_round_trip_time_ = round_trip_time; - } - return ret; -} - int ObServerManager::process_report_status_change(const share::ObLeaseRequest &lease_request, share::ObServerStatus &server_status) { diff --git a/src/rootserver/ob_server_manager.h b/src/rootserver/ob_server_manager.h index 3e022b775..7dfa549a6 100644 --- a/src/rootserver/ob_server_manager.h +++ b/src/rootserver/ob_server_manager.h @@ -89,8 +89,7 @@ public: // server_id is OB_INVALID_ID before build server manager from __all_server int receive_hb(const share::ObLeaseRequest &lease_request, uint64_t &server_id, - bool &to_alive, - bool &update_delay_time_flag); + bool &to_alive); int get_server_id( const common::ObAddr &server, uint64_t &server_id) const; @@ -201,9 +200,6 @@ protected: const int64_t hb_timestamp, const bool with_rootserver, share::ObServerStatus &server_status); - int set_server_delay_time(const int64_t server_behind_time, - const int64_t round_trip_time, - share::ObServerStatus &server_status); int reset_existing_rootserver(); int try_delete_server_working_dir( const common::ObZone &zone, diff --git a/src/share/ob_lease_struct.cpp b/src/share/ob_lease_struct.cpp index b170345fc..bfb88d267 100644 --- a/src/share/ob_lease_struct.cpp +++ b/src/share/ob_lease_struct.cpp @@ -190,7 +190,7 @@ bool ObLeaseRequest::is_valid() const // No need to determine the value of server_status_ return version_ > 0 && !zone_.is_empty() && server_.is_valid() && sql_port_ > 0 && resource_info_.is_valid() - && start_service_time_ >= 0 && current_server_time_ >= 0 && round_trip_time_ >= 0; + && start_service_time_ >= 0; } OB_SERIALIZE_MEMBER(ObLeaseRequest, diff --git a/src/share/ob_server_status.cpp b/src/share/ob_server_status.cpp index d3f03b89e..5c0f590ef 100644 --- a/src/share/ob_server_status.cpp +++ b/src/share/ob_server_status.cpp @@ -84,7 +84,7 @@ bool ObServerStatus::is_valid() const { return OB_INVALID_ID != id_ && server_.is_valid() && is_status_valid() && register_time_ >= 0 && last_hb_time_ >= 0 && block_migrate_in_time_ >= 0 - && stop_time_ >= 0 && start_service_time_ >= 0 && last_round_trip_time_ >= 0; + && stop_time_ >= 0 && start_service_time_ >= 0; } static const char *g_server_display_status_str[] = { diff --git a/src/storage/compaction/ob_tenant_freeze_info_mgr.cpp b/src/storage/compaction/ob_tenant_freeze_info_mgr.cpp index 608f48e61..708e20770 100644 --- a/src/storage/compaction/ob_tenant_freeze_info_mgr.cpp +++ b/src/storage/compaction/ob_tenant_freeze_info_mgr.cpp @@ -26,7 +26,6 @@ #include "share/schema/ob_tenant_schema_service.h" #include "share/backup/ob_backup_info_mgr.h" #include "observer/ob_server_struct.h" -#include "observer/ob_service.h" #include "share/schema/ob_tenant_schema_service.h" #include "share/system_variable/ob_system_variable_alias.h" #include "share/ob_zone_merge_info.h" @@ -913,7 +912,6 @@ int ObTenantFreezeInfoMgr::ReloadTask::try_update_info() ObSEArray freeze_info; ObSEArray snapshots; bool gc_snapshot_ts_changed = false; - observer::ObService *ob_service = GCTX.ob_service_; int64_t min_major_snapshot = INT64_MAX; if (OB_FAIL(get_global_info(snapshot_gc_ts))) { @@ -935,7 +933,7 @@ int ObTenantFreezeInfoMgr::ReloadTask::try_update_info() STORAGE_LOG(WARN, "update info failed", K(ret), K(snapshot_gc_ts), K(freeze_info), K(snapshots)); } else { - if (gc_snapshot_ts_changed || ob_service->is_heartbeat_expired()) { + if (gc_snapshot_ts_changed) { last_change_ts_ = ObTimeUtility::current_time(); } else { const int64_t last_not_change_interval_us = ObTimeUtility::current_time() - last_change_ts_;