fix obtest and on_server_status_change
This commit is contained in:
@ -52,9 +52,7 @@ ObHeartbeatService::ObHeartbeatService()
|
||||
all_servers_info_in_table_(),
|
||||
inactive_zone_list_(),
|
||||
hb_responses_(),
|
||||
need_process_hb_responses_(false),
|
||||
need_update_server_tracer_(false),
|
||||
is_rs_server_info_updated_(false)
|
||||
need_process_hb_responses_(false)
|
||||
{
|
||||
}
|
||||
ObHeartbeatService::~ObHeartbeatService()
|
||||
@ -101,8 +99,6 @@ int ObHeartbeatService::init()
|
||||
inactive_zone_list_.set_attr(attr);
|
||||
hb_responses_.set_attr(attr);
|
||||
set_epoch_id_(palf::INVALID_PROPOSAL_ID);
|
||||
need_update_server_tracer_ = false;
|
||||
is_rs_server_info_updated_ = false;
|
||||
is_inited_ = true;
|
||||
HBS_LOG_INFO("ObHeartbeatService is inited");
|
||||
}
|
||||
@ -140,8 +136,6 @@ void ObHeartbeatService::destroy()
|
||||
is_inited_ = false;
|
||||
sql_proxy_ = NULL;
|
||||
srv_rpc_proxy_ = NULL;
|
||||
need_update_server_tracer_ = false;
|
||||
is_rs_server_info_updated_ = false;
|
||||
set_epoch_id_(palf::INVALID_PROPOSAL_ID);
|
||||
all_servers_hb_info_.destroy();
|
||||
HBS_LOG_INFO("ObHeartbeatService is destroyed");
|
||||
@ -389,23 +383,6 @@ int ObHeartbeatService::manage_heartbeat_()
|
||||
ret = OB_SUCC(ret) ? tmp_ret : ret;
|
||||
LOG_WARN("fail to prepare heartbeat response", KR(ret), KR(tmp_ret));
|
||||
}
|
||||
if (need_update_server_tracer_) {
|
||||
if (OB_TMP_FAIL(SVR_TRACER.refresh())) {
|
||||
LOG_WARN("fail to refresh server tracer", KR(ret), KR(tmp_ret));
|
||||
} else {
|
||||
need_update_server_tracer_ = false;
|
||||
}
|
||||
}
|
||||
if (is_rs_server_info_updated_) {
|
||||
if (OB_ISNULL(GCTX.root_service_)) {
|
||||
tmp_ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("GCTX.root_service_ is null", KR(ret), KR(tmp_ret), KP(GCTX.root_service_));
|
||||
} else if (OB_TMP_FAIL(GCTX.root_service_->get_status_change_cb().on_server_status_change(GCTX.self_addr()))) {
|
||||
LOG_WARN("fail to execute on_server_status_change", KR(ret), KR(tmp_ret), K(GCTX.self_addr()));
|
||||
} else {
|
||||
is_rs_server_info_updated_ = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@ -632,10 +609,6 @@ int ObHeartbeatService::check_server_without_hb_response_(
|
||||
K(server_hb_info));
|
||||
} else if ((now - server_hb_info.get_last_hb_time() > GCONF.lease_time
|
||||
&& 0 == server_info_in_table.get_last_offline_time())) {
|
||||
need_update_server_tracer_ = true;
|
||||
if (GCTX.self_addr() == server_info_in_table.get_server()) {
|
||||
is_rs_server_info_updated_ = true;
|
||||
}
|
||||
if (OB_FAIL(update_table_for_online_to_offline_server_(
|
||||
server_info_in_table,
|
||||
now,
|
||||
@ -690,21 +663,46 @@ int ObHeartbeatService::update_table_for_online_to_offline_server_(
|
||||
K(server_info_in_table), K(now));
|
||||
}
|
||||
}
|
||||
if (OB_UNLIKELY(!trans.is_started())) {
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
if (OB_TMP_FAIL(end_trans_and_refresh_server_(server_info_in_table.get_server(),
|
||||
OB_SUCC(ret), trans))) {
|
||||
LOG_WARN("failed to end trans", KR(ret), K(tmp_ret), K(server_info_in_table));
|
||||
ret = OB_SUCC(ret) ? tmp_ret : ret;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObHeartbeatService::end_trans_and_refresh_server_(
|
||||
const ObAddr &server,
|
||||
const bool commit,
|
||||
common::ObMySQLTransaction &trans)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_UNLIKELY(!server.is_valid())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("server is invalid", KR(ret), K(server));
|
||||
} else if (!trans.is_started()) {
|
||||
LOG_WARN("the transaction is not started");
|
||||
} else {
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
if (OB_TMP_FAIL(trans.end(OB_SUCC(ret)))) {
|
||||
HBS_LOG_WARN("fail to commit the transaction", KR(ret), KR(tmp_ret),
|
||||
K(server_info_in_table.get_server()));
|
||||
ret = OB_SUCC(ret) ? tmp_ret : ret;
|
||||
if (OB_FAIL(trans.end(commit))) {
|
||||
HBS_LOG_WARN("fail to commit the transaction", KR(ret),
|
||||
K(server), K(commit));
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
LOG_WARN("fail to update __all_server table", KR(ret));
|
||||
//ignore error of refresh and on server_status_change
|
||||
if (OB_TMP_FAIL(SVR_TRACER.refresh())) {
|
||||
LOG_WARN("fail to refresh server tracer", KR(ret), KR(tmp_ret));
|
||||
}
|
||||
if (OB_ISNULL(GCTX.root_service_)) {
|
||||
tmp_ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("GCTX.root_service_ is null", KR(ret), KR(tmp_ret), KP(GCTX.root_service_));
|
||||
} else if (OB_TMP_FAIL(GCTX.root_service_->get_status_change_cb().on_server_status_change(server))) {
|
||||
LOG_WARN("fail to execute on_server_status_change", KR(ret), KR(tmp_ret), K(server));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObHeartbeatService::init_server_hb_info_(
|
||||
const int64_t now,
|
||||
const share::ObServerInfoInTable &server_info_in_table,
|
||||
@ -775,10 +773,6 @@ int ObHeartbeatService::check_server_with_hb_response_(
|
||||
|| 0 != server_info_in_table.get_last_offline_time()
|
||||
|| server_info_in_table.get_build_version() != hb_response.get_build_version()
|
||||
|| server_info_in_table.get_start_service_time() != hb_response.get_start_service_time()) {
|
||||
need_update_server_tracer_ = true;
|
||||
if (GCTX.self_addr() == server_info_in_table.get_server()) {
|
||||
is_rs_server_info_updated_ = true;
|
||||
}
|
||||
if (OB_FAIL(update_table_for_server_with_hb_response_(
|
||||
hb_response,
|
||||
server_info_in_table,
|
||||
@ -1064,19 +1058,12 @@ int ObHeartbeatService::update_table_for_server_with_hb_response_(
|
||||
}
|
||||
}
|
||||
}
|
||||
if (OB_UNLIKELY(!trans.is_started())) {
|
||||
LOG_WARN("the transaction is not started");
|
||||
} else {
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
if (OB_TMP_FAIL(trans.end(OB_SUCC(ret)))) {
|
||||
LOG_WARN("fail to commit the transaction", KR(ret), KR(tmp_ret),
|
||||
K(server_info_in_table), K(hb_response));
|
||||
if (OB_TMP_FAIL(end_trans_and_refresh_server_(server_info_in_table.get_server(),
|
||||
OB_SUCC(ret), trans))) {
|
||||
LOG_WARN("failed to end trans", KR(ret), K(tmp_ret), K(server_info_in_table));
|
||||
ret = OB_SUCC(ret) ? tmp_ret : ret;
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
LOG_WARN("fail to update __all_server table", KR(ret));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
template <typename T>
|
||||
|
||||
@ -24,6 +24,10 @@
|
||||
#include "observer/ob_heartbeat_handler.h" // ObServerHealthStatus
|
||||
namespace oceanbase
|
||||
{
|
||||
namespace common
|
||||
{
|
||||
class ObMySQLTransaction;
|
||||
};
|
||||
namespace rootserver
|
||||
{
|
||||
class ObHeartbeatService : public ObTenantThreadHelper,
|
||||
@ -122,6 +126,10 @@ private:
|
||||
const ObIArray<T> &array,
|
||||
const common::ObAddr &server,
|
||||
int64_t &idx);
|
||||
int end_trans_and_refresh_server_(
|
||||
const ObAddr &server,
|
||||
const bool commit,
|
||||
common::ObMySQLTransaction &trans);
|
||||
bool is_inited_;
|
||||
common::ObMySQLProxy *sql_proxy_;
|
||||
obrpc::ObSrvRpcProxy *srv_rpc_proxy_;
|
||||
@ -137,8 +145,6 @@ private:
|
||||
common::ObArray<common::ObZone> inactive_zone_list_;
|
||||
ObHBResponseArray hb_responses_; // send_heartbeat() write it and manage_heartbeat() read it
|
||||
bool need_process_hb_responses_; // true if send rpc, and will be reset if responses are processed
|
||||
bool need_update_server_tracer_;
|
||||
bool is_rs_server_info_updated_;
|
||||
static bool is_service_enabled_;
|
||||
private:
|
||||
DISALLOW_COPY_AND_ASSIGN(ObHeartbeatService);
|
||||
|
||||
@ -1385,7 +1385,7 @@ int ObRootService::submit_update_all_server_task(const ObAddr &server)
|
||||
}
|
||||
|
||||
// FIXME: @wanhong.wwh: If self is RS and self status change, submit_update_rslist_task
|
||||
if (OB_SUCC(ret) && server == self_addr_) {
|
||||
if (OB_SUCC(ret)) {
|
||||
if (!in_service()) {
|
||||
LOG_INFO("self is not RS, need not submit update rslist task in update_all_server_task",
|
||||
K(server));
|
||||
|
||||
Reference in New Issue
Block a user