diff --git a/src/rootserver/ob_heartbeat_service.cpp b/src/rootserver/ob_heartbeat_service.cpp index 39e9877646..3053e38cca 100644 --- a/src/rootserver/ob_heartbeat_service.cpp +++ b/src/rootserver/ob_heartbeat_service.cpp @@ -52,9 +52,7 @@ ObHeartbeatService::ObHeartbeatService() all_servers_info_in_table_(), inactive_zone_list_(), hb_responses_(), - need_process_hb_responses_(false), - need_update_server_tracer_(false), - is_rs_server_info_updated_(false) + need_process_hb_responses_(false) { } ObHeartbeatService::~ObHeartbeatService() @@ -101,8 +99,6 @@ int ObHeartbeatService::init() inactive_zone_list_.set_attr(attr); hb_responses_.set_attr(attr); set_epoch_id_(palf::INVALID_PROPOSAL_ID); - need_update_server_tracer_ = false; - is_rs_server_info_updated_ = false; is_inited_ = true; HBS_LOG_INFO("ObHeartbeatService is inited"); } @@ -140,8 +136,6 @@ void ObHeartbeatService::destroy() is_inited_ = false; sql_proxy_ = NULL; srv_rpc_proxy_ = NULL; - need_update_server_tracer_ = false; - is_rs_server_info_updated_ = false; set_epoch_id_(palf::INVALID_PROPOSAL_ID); all_servers_hb_info_.destroy(); HBS_LOG_INFO("ObHeartbeatService is destroyed"); @@ -389,23 +383,6 @@ int ObHeartbeatService::manage_heartbeat_() ret = OB_SUCC(ret) ? tmp_ret : ret; LOG_WARN("fail to prepare heartbeat response", KR(ret), KR(tmp_ret)); } - if (need_update_server_tracer_) { - if (OB_TMP_FAIL(SVR_TRACER.refresh())) { - LOG_WARN("fail to refresh server tracer", KR(ret), KR(tmp_ret)); - } else { - need_update_server_tracer_ = false; - } - } - if (is_rs_server_info_updated_) { - if (OB_ISNULL(GCTX.root_service_)) { - tmp_ret = OB_ERR_UNEXPECTED; - LOG_WARN("GCTX.root_service_ is null", KR(ret), KR(tmp_ret), KP(GCTX.root_service_)); - } else if (OB_TMP_FAIL(GCTX.root_service_->get_status_change_cb().on_server_status_change(GCTX.self_addr()))) { - LOG_WARN("fail to execute on_server_status_change", KR(ret), KR(tmp_ret), K(GCTX.self_addr())); - } else { - is_rs_server_info_updated_ = false; - } - } } return ret; } @@ -632,10 +609,6 @@ int ObHeartbeatService::check_server_without_hb_response_( K(server_hb_info)); } else if ((now - server_hb_info.get_last_hb_time() > GCONF.lease_time && 0 == server_info_in_table.get_last_offline_time())) { - need_update_server_tracer_ = true; - if (GCTX.self_addr() == server_info_in_table.get_server()) { - is_rs_server_info_updated_ = true; - } if (OB_FAIL(update_table_for_online_to_offline_server_( server_info_in_table, now, @@ -690,21 +663,46 @@ int ObHeartbeatService::update_table_for_online_to_offline_server_( K(server_info_in_table), K(now)); } } - if (OB_UNLIKELY(!trans.is_started())) { + int tmp_ret = OB_SUCCESS; + if (OB_TMP_FAIL(end_trans_and_refresh_server_(server_info_in_table.get_server(), + OB_SUCC(ret), trans))) { + LOG_WARN("failed to end trans", KR(ret), K(tmp_ret), K(server_info_in_table)); + ret = OB_SUCC(ret) ? tmp_ret : ret; + } + return ret; +} + +int ObHeartbeatService::end_trans_and_refresh_server_( + const ObAddr &server, + const bool commit, + common::ObMySQLTransaction &trans) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(!server.is_valid())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("server is invalid", KR(ret), K(server)); + } else if (!trans.is_started()) { LOG_WARN("the transaction is not started"); } else { int tmp_ret = OB_SUCCESS; - if (OB_TMP_FAIL(trans.end(OB_SUCC(ret)))) { - HBS_LOG_WARN("fail to commit the transaction", KR(ret), KR(tmp_ret), - K(server_info_in_table.get_server())); - ret = OB_SUCC(ret) ? tmp_ret : ret; + if (OB_FAIL(trans.end(commit))) { + HBS_LOG_WARN("fail to commit the transaction", KR(ret), + K(server), K(commit)); } - if (OB_FAIL(ret)) { - LOG_WARN("fail to update __all_server table", KR(ret)); + //ignore error of refresh and on server_status_change + if (OB_TMP_FAIL(SVR_TRACER.refresh())) { + LOG_WARN("fail to refresh server tracer", KR(ret), KR(tmp_ret)); + } + if (OB_ISNULL(GCTX.root_service_)) { + tmp_ret = OB_ERR_UNEXPECTED; + LOG_WARN("GCTX.root_service_ is null", KR(ret), KR(tmp_ret), KP(GCTX.root_service_)); + } else if (OB_TMP_FAIL(GCTX.root_service_->get_status_change_cb().on_server_status_change(server))) { + LOG_WARN("fail to execute on_server_status_change", KR(ret), KR(tmp_ret), K(server)); } } return ret; } + int ObHeartbeatService::init_server_hb_info_( const int64_t now, const share::ObServerInfoInTable &server_info_in_table, @@ -775,10 +773,6 @@ int ObHeartbeatService::check_server_with_hb_response_( || 0 != server_info_in_table.get_last_offline_time() || server_info_in_table.get_build_version() != hb_response.get_build_version() || server_info_in_table.get_start_service_time() != hb_response.get_start_service_time()) { - need_update_server_tracer_ = true; - if (GCTX.self_addr() == server_info_in_table.get_server()) { - is_rs_server_info_updated_ = true; - } if (OB_FAIL(update_table_for_server_with_hb_response_( hb_response, server_info_in_table, @@ -1064,18 +1058,11 @@ int ObHeartbeatService::update_table_for_server_with_hb_response_( } } } - if (OB_UNLIKELY(!trans.is_started())) { - LOG_WARN("the transaction is not started"); - } else { - int tmp_ret = OB_SUCCESS; - if (OB_TMP_FAIL(trans.end(OB_SUCC(ret)))) { - LOG_WARN("fail to commit the transaction", KR(ret), KR(tmp_ret), - K(server_info_in_table), K(hb_response)); - ret = OB_SUCC(ret) ? tmp_ret : ret; - } - if (OB_FAIL(ret)) { - LOG_WARN("fail to update __all_server table", KR(ret)); - } + int tmp_ret = OB_SUCCESS; + if (OB_TMP_FAIL(end_trans_and_refresh_server_(server_info_in_table.get_server(), + OB_SUCC(ret), trans))) { + LOG_WARN("failed to end trans", KR(ret), K(tmp_ret), K(server_info_in_table)); + ret = OB_SUCC(ret) ? tmp_ret : ret; } return ret; } @@ -1097,4 +1084,4 @@ bool ObHeartbeatService::has_server_exist_in_array_( return bret; } } -} \ No newline at end of file +} diff --git a/src/rootserver/ob_heartbeat_service.h b/src/rootserver/ob_heartbeat_service.h index 1fccc36db0..581568c3b3 100644 --- a/src/rootserver/ob_heartbeat_service.h +++ b/src/rootserver/ob_heartbeat_service.h @@ -24,6 +24,10 @@ #include "observer/ob_heartbeat_handler.h" // ObServerHealthStatus namespace oceanbase { +namespace common +{ +class ObMySQLTransaction; +}; namespace rootserver { class ObHeartbeatService : public ObTenantThreadHelper, @@ -122,6 +126,10 @@ private: const ObIArray &array, const common::ObAddr &server, int64_t &idx); + int end_trans_and_refresh_server_( + const ObAddr &server, + const bool commit, + common::ObMySQLTransaction &trans); bool is_inited_; common::ObMySQLProxy *sql_proxy_; obrpc::ObSrvRpcProxy *srv_rpc_proxy_; @@ -137,12 +145,10 @@ private: common::ObArray inactive_zone_list_; ObHBResponseArray hb_responses_; // send_heartbeat() write it and manage_heartbeat() read it bool need_process_hb_responses_; // true if send rpc, and will be reset if responses are processed - bool need_update_server_tracer_; - bool is_rs_server_info_updated_; static bool is_service_enabled_; private: DISALLOW_COPY_AND_ASSIGN(ObHeartbeatService); }; // end class ObHeartbeatService } // end namespace rootserver } // end namespace oceanbase -#endif \ No newline at end of file +#endif diff --git a/src/rootserver/ob_root_service.cpp b/src/rootserver/ob_root_service.cpp index f763267751..99b8235ff9 100644 --- a/src/rootserver/ob_root_service.cpp +++ b/src/rootserver/ob_root_service.cpp @@ -1385,7 +1385,7 @@ int ObRootService::submit_update_all_server_task(const ObAddr &server) } // FIXME: @wanhong.wwh: If self is RS and self status change, submit_update_rslist_task - if (OB_SUCC(ret) && server == self_addr_) { + if (OB_SUCC(ret)) { if (!in_service()) { LOG_INFO("self is not RS, need not submit update rslist task in update_all_server_task", K(server));