[opt][PALF] optimize ObLogHandler::is_in_sync

This commit is contained in:
BinChenn
2023-07-14 08:42:18 +00:00
committed by ob-robot
parent e31eb905c6
commit 63858018e3
13 changed files with 10 additions and 238 deletions

View File

@ -924,7 +924,7 @@ PCODE_DEF(OB_LOG_LEARNER_REQ, 0x1515)
PCODE_DEF(OB_LOG_COMMITTED_INFO, 0x1516) PCODE_DEF(OB_LOG_COMMITTED_INFO, 0x1516)
PCODE_DEF(OB_LOG_CHANGE_MODE_META_REQ, 0x1517) PCODE_DEF(OB_LOG_CHANGE_MODE_META_REQ, 0x1517)
PCODE_DEF(OB_LOG_CHANGE_MODE_META_RESP, 0x1518) PCODE_DEF(OB_LOG_CHANGE_MODE_META_RESP, 0x1518)
PCODE_DEF(OB_LOG_GET_LEADER_MAX_SCN, 0x1519) PCODE_DEF(OB_LOG_GET_LEADER_MAX_SCN, 0x1519) // deprecated since version 4.2.0.0
PCODE_DEF(OB_LOG_ARB_PROBE_MSG, 0x151A) PCODE_DEF(OB_LOG_ARB_PROBE_MSG, 0x151A)
PCODE_DEF(OB_LOG_CHANGE_ACCESS_MODE_CMD, 0x151B) PCODE_DEF(OB_LOG_CHANGE_ACCESS_MODE_CMD, 0x151B)
PCODE_DEF(OB_LOG_FLASHBACK_CMD, 0x151C) PCODE_DEF(OB_LOG_FLASHBACK_CMD, 0x151C)

View File

@ -840,10 +840,6 @@ int ObLogDeliver::handle_req_(rpc::ObRequest &req)
modify_pkt.set_tenant_id(node_id_); modify_pkt.set_tenant_id(node_id_);
PROCESS(LogFlashbackMsgP) PROCESS(LogFlashbackMsgP)
} }
case obrpc::OB_LOG_GET_LEADER_MAX_SCN: {
modify_pkt.set_tenant_id(node_id_);
PROCESS(LogGetLeaderMaxScnP)
}
case obrpc::OB_LOG_GET_STAT: { case obrpc::OB_LOG_GET_STAT: {
modify_pkt.set_tenant_id(node_id_); modify_pkt.set_tenant_id(node_id_);
PROCESS(LogGetStatP) PROCESS(LogGetStatP)

View File

@ -161,38 +161,6 @@ int LogRequestHandler::handle_sync_request<LogConfigChangeCmd, LogConfigChangeCm
return ret; return ret;
} }
template <>
int LogRequestHandler::handle_sync_request<LogGetLeaderMaxScnReq, LogGetLeaderMaxScnResp>(
const LogGetLeaderMaxScnReq &req,
LogGetLeaderMaxScnResp &resp)
{
int ret = common::OB_SUCCESS;
if (false == req.is_valid()) {
ret = OB_INVALID_ARGUMENT;
CLOG_LOG(ERROR, "Invalid argument!!!", K(ret), K(req));
} else {
palf::PalfHandleGuard palf_handle_guard;
const int64_t palf_id = req.palf_id_;
const common::ObAddr &server = req.src_;
common::ObRole role = FOLLOWER;
int64_t unused_pid;
bool is_pending_state = true;
if (OB_FAIL(get_palf_handle_guard_(palf_id, palf_handle_guard))) {
CLOG_LOG(WARN, "get_palf_handle_guard_ failed", K(ret), K(palf_id));
} else if (OB_FAIL(palf_handle_guard.get_role(role, unused_pid, is_pending_state))) {
CLOG_LOG(WARN, "palf_handle get_role failed", K(ret), K(palf_id), K(server));
} else if ((role != LEADER || true == is_pending_state)) {
ret = OB_NOT_MASTER;
CLOG_LOG(WARN, "i am not leader, failed", K(ret), K(req), K(role), K(is_pending_state));
} else if (OB_FAIL(palf_handle_guard.get_max_scn(resp.max_scn_))) {
CLOG_LOG(WARN, "get_max_scn from palf failed", K(ret), K(palf_id), K(server));
} else {
CLOG_LOG(TRACE, "get_max_scn from palf success", K(ret), K(palf_id), K(server), K(req), K(resp));
}
}
return ret;
}
template <> template <>
int LogRequestHandler::handle_sync_request<LogGetPalfStatReq, LogGetPalfStatResp>( int LogRequestHandler::handle_sync_request<LogGetPalfStatReq, LogGetPalfStatResp>(
const LogGetPalfStatReq &req, const LogGetPalfStatReq &req,

View File

@ -98,12 +98,6 @@ DEFINE_LOGSERVICE_SYNC_RPC_PROCESSOR(LogMembershipChangeP,
LogConfigChangeCmdResp, LogConfigChangeCmdResp,
obrpc::OB_LOG_CONFIG_CHANGE_CMD); obrpc::OB_LOG_CONFIG_CHANGE_CMD);
DEFINE_LOGSERVICE_SYNC_RPC_PROCESSOR(LogGetLeaderMaxScnP,
obrpc::ObLogServiceRpcProxy,
LogGetLeaderMaxScnReq,
LogGetLeaderMaxScnResp,
obrpc::OB_LOG_GET_LEADER_MAX_SCN);
DEFINE_LOGSERVICE_SYNC_RPC_PROCESSOR(LogGetPalfStatReqP, DEFINE_LOGSERVICE_SYNC_RPC_PROCESSOR(LogGetPalfStatReqP,
obrpc::ObLogServiceRpcProxy, obrpc::ObLogServiceRpcProxy,
LogGetPalfStatReq, LogGetPalfStatReq,

View File

@ -27,8 +27,6 @@ public:
DEFINE_TO(ObLogServiceRpcProxy); DEFINE_TO(ObLogServiceRpcProxy);
RPC_S(PR3 send_log_config_change_cmd, OB_LOG_CONFIG_CHANGE_CMD, RPC_S(PR3 send_log_config_change_cmd, OB_LOG_CONFIG_CHANGE_CMD,
(logservice::LogConfigChangeCmd), logservice::LogConfigChangeCmdResp); (logservice::LogConfigChangeCmd), logservice::LogConfigChangeCmdResp);
RPC_S(PR3 get_leader_max_scn, OB_LOG_GET_LEADER_MAX_SCN,
(logservice::LogGetLeaderMaxScnReq), logservice::LogGetLeaderMaxScnResp);
RPC_AP(PR3 send_server_probe_req, OB_LOG_ARB_PROBE_MSG, RPC_AP(PR3 send_server_probe_req, OB_LOG_ARB_PROBE_MSG,
(logservice::LogServerProbeMsg)); (logservice::LogServerProbeMsg));
RPC_AP(PR3 send_change_access_mode_cmd, OB_LOG_CHANGE_ACCESS_MODE_CMD, RPC_AP(PR3 send_change_access_mode_cmd, OB_LOG_CHANGE_ACCESS_MODE_CMD,

View File

@ -184,58 +184,6 @@ void LogConfigChangeCmdResp::reset()
OB_SERIALIZE_MEMBER(LogConfigChangeCmdResp, ret_, lock_owner_, is_locked_); OB_SERIALIZE_MEMBER(LogConfigChangeCmdResp, ret_, lock_owner_, is_locked_);
// ============= LogConfigChangeCmdResp end ============= // ============= LogConfigChangeCmdResp end =============
// ============= LogGetLeaderMaxScnReq begin ===========
LogGetLeaderMaxScnReq::LogGetLeaderMaxScnReq(const common::ObAddr &src,
const int64_t palf_id)
: src_(src), palf_id_(palf_id)
{
}
LogGetLeaderMaxScnReq::~LogGetLeaderMaxScnReq()
{
reset();
}
bool LogGetLeaderMaxScnReq::is_valid() const
{
return src_.is_valid() && palf::is_valid_palf_id(palf_id_);
}
void LogGetLeaderMaxScnReq::reset()
{
src_.reset();
palf_id_ = palf::INVALID_PALF_ID;
}
OB_SERIALIZE_MEMBER(LogGetLeaderMaxScnReq, src_, palf_id_);
// ============= LogGetLeaderMaxTsNSReq end =============
// ============= LogGetLeaderMaxTsNSResp begin =============
LogGetLeaderMaxScnResp::LogGetLeaderMaxScnResp(const share::SCN &max_scn)
{
max_scn_ = max_scn;
}
LogGetLeaderMaxScnResp::~LogGetLeaderMaxScnResp()
{
reset();
}
bool LogGetLeaderMaxScnResp::is_valid() const
{
return max_scn_.is_valid();
}
void LogGetLeaderMaxScnResp::reset()
{
max_scn_.reset();
}
OB_SERIALIZE_MEMBER(LogGetLeaderMaxScnResp, max_scn_);
// ============= LogGetLeaderMaxTsNSResp end ===============
//
// ============= LogGetPalfStatReq begin =========== // ============= LogGetPalfStatReq begin ===========
LogGetPalfStatReq::LogGetPalfStatReq( LogGetPalfStatReq::LogGetPalfStatReq(
const common::ObAddr &src_addr, const common::ObAddr &src_addr,

View File

@ -123,32 +123,6 @@ public:
bool is_locked_; bool is_locked_;
}; };
struct LogGetLeaderMaxScnReq {
OB_UNIS_VERSION(1);
public:
LogGetLeaderMaxScnReq(): src_(), palf_id_(palf::INVALID_PALF_ID) { }
LogGetLeaderMaxScnReq(const common::ObAddr &src,
const int64_t palf_id);
~LogGetLeaderMaxScnReq();
bool is_valid() const;
void reset();
TO_STRING_KV(K_(src), K_(palf_id));
common::ObAddr src_;
int64_t palf_id_;
};
struct LogGetLeaderMaxScnResp {
OB_UNIS_VERSION(1);
public:
LogGetLeaderMaxScnResp() : max_scn_() { }
LogGetLeaderMaxScnResp(const share::SCN &max_scn);
~LogGetLeaderMaxScnResp();
bool is_valid() const;
void reset();
TO_STRING_KV(K_(max_scn));
share::SCN max_scn_;
};
struct LogGetPalfStatReq { struct LogGetPalfStatReq {
OB_UNIS_VERSION(1); OB_UNIS_VERSION(1);
public: public:

View File

@ -46,9 +46,6 @@ ObLogHandler::ObLogHandler() : self_(),
lc_cb_(NULL), lc_cb_(NULL),
rpc_proxy_(NULL), rpc_proxy_(NULL),
append_cost_stat_("[PALF STAT APPEND COST]", 1 * 1000 * 1000), append_cost_stat_("[PALF STAT APPEND COST]", 1 * 1000 * 1000),
cached_is_log_sync_(false),
last_check_sync_ts_(OB_INVALID_TIMESTAMP),
last_renew_loc_ts_(OB_INVALID_TIMESTAMP),
is_offline_(false), is_offline_(false),
get_max_decided_scn_debug_time_(OB_INVALID_TIMESTAMP) get_max_decided_scn_debug_time_(OB_INVALID_TIMESTAMP)
{ {
@ -501,78 +498,20 @@ int ObLogHandler::is_in_sync(bool &is_log_sync,
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
is_log_sync = false; is_log_sync = false;
is_need_rebuild = false; is_need_rebuild = false;
LSN end_lsn; palf::PalfStat palf_stat;
LSN last_rebuild_lsn;
RLockGuard guard(lock_); RLockGuard guard(lock_);
if (IS_NOT_INIT) { if (IS_NOT_INIT) {
ret = OB_NOT_INIT; ret = OB_NOT_INIT;
} else if (is_in_stop_state_) { } else if (is_in_stop_state_) {
ret = OB_NOT_RUNNING; ret = OB_NOT_RUNNING;
} else if (OB_FAIL(palf_handle_.get_end_lsn(end_lsn)) || !end_lsn.is_valid()) { } else if (OB_FAIL(palf_handle_.stat(palf_stat))) {
CLOG_LOG(WARN, "get_end_lsn failed", K(ret), K_(id), K(end_lsn)); CLOG_LOG(WARN, "palf stat failed", K(ret), K_(id));
} else if (OB_FAIL(palf_handle_.get_last_rebuild_lsn(last_rebuild_lsn))) {
CLOG_LOG(WARN, "get_last_rebuild_lsn failed", K(ret), K_(id));
} else if (last_rebuild_lsn.is_valid() && end_lsn < last_rebuild_lsn) {
is_need_rebuild = true;
} else { } else {
// check is log sync is_log_sync = palf_stat.is_in_sync_;
is_need_rebuild = palf_stat.is_need_rebuild_;
} }
SCN local_max_scn; if (REACH_TIME_INTERVAL(2 * 1000 * 1000)) {
SCN leader_max_scn; CLOG_LOG(INFO, "is_in_sync", K(ret), K_(id), K(is_log_sync), K(is_need_rebuild));
if (OB_SUCC(ret)) {
static const int64_t SYNC_DELAY_TIME_THRESHOLD_US = 3 * 1000 * 1000L;
const int64_t SYNC_GET_LEADER_INFO_INTERVAL_US = SYNC_DELAY_TIME_THRESHOLD_US / 2;
bool unused_state = false;
int64_t unused_id;
common::ObRole role;
if (OB_FAIL(palf_handle_.get_role(role, unused_id, unused_state))) {
CLOG_LOG(WARN, "get_role failed", K(ret), K_(id));
} else if (LEADER == role) {
is_log_sync = true;
} else if (OB_FAIL(palf_handle_.get_max_scn(local_max_scn)) || !local_max_scn.is_valid()) {
CLOG_LOG(WARN, "get_max_scn failed", K(ret), K_(id), K(local_max_scn));
} else if (palf_reach_time_interval(SYNC_GET_LEADER_INFO_INTERVAL_US, last_check_sync_ts_)) {
// if reachs time interval, get max_scn of leader with sync RPC
if (OB_FAIL(get_leader_max_scn_(leader_max_scn))) {
CLOG_LOG(WARN, "get_palf_max_scn failed", K(ret), K_(id));
}
} else {
is_log_sync = cached_is_log_sync_;
}
if (OB_SUCC(ret) && leader_max_scn.is_valid()) {
is_log_sync = (leader_max_scn.convert_to_ts() - local_max_scn.convert_to_ts() <= SYNC_DELAY_TIME_THRESHOLD_US);
cached_is_log_sync_ = is_log_sync;
}
ret = OB_SUCCESS;
}
if (REACH_TIME_INTERVAL(500 * 1000)) {
CLOG_LOG(INFO, "is_in_sync", K(ret), K_(id), K(is_log_sync), K(leader_max_scn), K(local_max_scn),
K_(cached_is_log_sync), K(is_need_rebuild), K(end_lsn), K(last_rebuild_lsn));
}
return ret;
}
int ObLogHandler::get_leader_max_scn_(SCN &max_scn) const
{
int ret = OB_SUCCESS;
common::ObAddr leader;
max_scn.reset();
LogGetLeaderMaxScnReq req(self_, id_);
LogGetLeaderMaxScnResp resp;
bool need_renew_leader = false;
if (OB_FAIL(lc_cb_->nonblock_get_leader(id_, leader))) {
CLOG_LOG(WARN, "get_leader failed", K(ret), K_(id));
need_renew_leader = true;
} else if (OB_FAIL(rpc_proxy_->to(leader).timeout(500 * 1000).trace_time(true). \
by(MTL_ID()).get_leader_max_scn(req, resp))) {
CLOG_LOG(WARN, "get_palf_max_scn failed", K(ret), K_(id));
need_renew_leader = true;
} else {
max_scn = resp.max_scn_;
}
if (need_renew_leader && palf_reach_time_interval(500 * 1000, last_renew_loc_ts_)) {
(void) lc_cb_->nonblock_renew_leader(id_);
} }
return ret; return ret;
} }

View File

@ -590,7 +590,6 @@ private:
int submit_config_change_cmd_(const LogConfigChangeCmd &req); int submit_config_change_cmd_(const LogConfigChangeCmd &req);
int submit_config_change_cmd_(const LogConfigChangeCmd &req, int submit_config_change_cmd_(const LogConfigChangeCmd &req,
LogConfigChangeCmdResp &resp); LogConfigChangeCmdResp &resp);
int get_leader_max_scn_(share::SCN &max_scn) const;
DISALLOW_COPY_AND_ASSIGN(ObLogHandler); DISALLOW_COPY_AND_ASSIGN(ObLogHandler);
private: private:
common::ObAddr self_; common::ObAddr self_;
@ -604,9 +603,6 @@ private:
mutable obrpc::ObLogServiceRpcProxy *rpc_proxy_; mutable obrpc::ObLogServiceRpcProxy *rpc_proxy_;
common::ObQSync ls_qs_; common::ObQSync ls_qs_;
ObMiniStat::ObStatItem append_cost_stat_; ObMiniStat::ObStatItem append_cost_stat_;
mutable bool cached_is_log_sync_;
mutable int64_t last_check_sync_ts_;
mutable int64_t last_renew_loc_ts_;
bool is_offline_; bool is_offline_;
mutable int64_t get_max_decided_scn_debug_time_; mutable int64_t get_max_decided_scn_debug_time_;
}; };

View File

@ -4674,7 +4674,7 @@ int PalfHandleImpl::stat(PalfStat &palf_stat)
palf_stat.is_need_rebuild_ = (palf_stat.end_lsn_.is_valid() && palf_stat.is_need_rebuild_ = (palf_stat.end_lsn_.is_valid() &&
last_rebuild_lsn.is_valid() && last_rebuild_lsn.is_valid() &&
palf_stat.end_lsn_ < last_rebuild_lsn); palf_stat.end_lsn_ < last_rebuild_lsn);
palf_stat.is_in_sync_ = cached_is_in_sync_; palf_stat.is_in_sync_ = (LEADER == palf_stat.role_)? true: cached_is_in_sync_;
PALF_LOG(TRACE, "PalfHandleImpl stat", K(palf_stat)); PALF_LOG(TRACE, "PalfHandleImpl stat", K(palf_stat));
} }
return OB_SUCCESS; return OB_SUCCESS;

View File

@ -187,7 +187,6 @@ void oceanbase::observer::init_srv_xlator_for_logservice(ObSrvRpcXlator *xlator)
{ {
RPC_PROCESSOR(logservice::LogMembershipChangeP); RPC_PROCESSOR(logservice::LogMembershipChangeP);
RPC_PROCESSOR(logservice::LogGetPalfStatReqP); RPC_PROCESSOR(logservice::LogGetPalfStatReqP);
RPC_PROCESSOR(logservice::LogGetLeaderMaxScnP);
RPC_PROCESSOR(logservice::LogChangeAccessModeP); RPC_PROCESSOR(logservice::LogChangeAccessModeP);
RPC_PROCESSOR(logservice::LogFlashbackMsgP); RPC_PROCESSOR(logservice::LogFlashbackMsgP);
} }

View File

@ -43,7 +43,6 @@ int ObAllVirtualPalfStat::inner_get_next_row(common::ObNewRow *&row)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
if (false == start_to_read_) { if (false == start_to_read_) {
const bool is_cluster_already_4100 = GET_MIN_CLUSTER_VERSION() >= CLUSTER_VERSION_4_1_0_0;
auto func_iterate_palf = [&](const palf::PalfHandle &palf_handle) -> int { auto func_iterate_palf = [&](const palf::PalfHandle &palf_handle) -> int {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
logservice::ObLogStat log_stat; logservice::ObLogStat log_stat;
@ -51,9 +50,6 @@ int ObAllVirtualPalfStat::inner_get_next_row(common::ObNewRow *&row)
palf_handle.get_palf_id(palf_id); palf_handle.get_palf_id(palf_id);
if (OB_FAIL(palf_handle.stat(log_stat.palf_stat_))) { if (OB_FAIL(palf_handle.stat(log_stat.palf_stat_))) {
SERVER_LOG(WARN, "PalfHandle stat failed", K(ret), K(palf_id)); SERVER_LOG(WARN, "PalfHandle stat failed", K(ret), K(palf_id));
} else if (false == is_cluster_already_4100 &&
OB_FAIL(get_log_handler_stat_(log_stat.palf_stat_, log_stat))){
SERVER_LOG(WARN, "get_log_handler_stat_ failed", K(ret), K(palf_id), K(log_stat));
} else if (OB_FAIL(insert_log_stat_(log_stat, &cur_row_))){ } else if (OB_FAIL(insert_log_stat_(log_stat, &cur_row_))){
SERVER_LOG(WARN, "ObAllVirtualPalfStat insert_log_stat_ failed", K(ret), K(palf_id), K(log_stat)); SERVER_LOG(WARN, "ObAllVirtualPalfStat insert_log_stat_ failed", K(ret), K(palf_id), K(log_stat));
} else { } else {
@ -176,8 +172,7 @@ int ObAllVirtualPalfStat::insert_log_stat_(const logservice::ObLogStat &log_stat
break; break;
} }
case OB_APP_MIN_COLUMN_ID + 10: { case OB_APP_MIN_COLUMN_ID + 10: {
const bool is_cluster_already_4100 = GET_MIN_CLUSTER_VERSION() >= CLUSTER_VERSION_4_1_0_0; const bool is_in_sync = log_stat.palf_stat_.is_in_sync_;
const bool is_in_sync = (is_cluster_already_4100)? log_stat.palf_stat_.is_in_sync_: log_stat.in_sync_;
cur_row_.cells_[i].set_bool(is_in_sync); cur_row_.cells_[i].set_bool(is_in_sync);
break; break;
} }
@ -250,40 +245,6 @@ int ObAllVirtualPalfStat::insert_log_stat_(const logservice::ObLogStat &log_stat
return ret; return ret;
} }
int ObAllVirtualPalfStat::get_log_handler_stat_(const palf::PalfStat &palf_stat, logservice::ObLogStat &log_stat)
{
int ret = OB_SUCCESS;
const int64_t palf_id = palf_stat.palf_id_;
storage::ObLSHandle ls_handle;
storage::ObLS *ls = NULL;
storage::ObLSService *ls_service = MTL(storage::ObLSService*);
share::ObLSID ls_id(palf_id);
bool is_in_sync = false;
bool need_rebuild = false;
common::ObRole log_handler_role = INVALID_ROLE;
common::ObRole restore_handler_role = INVALID_ROLE;
common::ObRole unused_role = INVALID_ROLE;
int64_t unused_pid = -1, log_handler_pid = -1, restore_handler_pid = -1;
bool unused_bool = false;
if (false == ls_id.is_valid() || OB_ISNULL(ls_service)) {
ret = OB_INVALID_ARGUMENT;
SERVER_LOG(WARN, "invalid argument", KR(ret), K(ls_id), KP(ls_service));
} else if (OB_FAIL(ls_service->get_ls(ls_id, ls_handle, ObLSGetMod::LOG_MOD))
|| NULL == (ls = ls_handle.get_ls())) {
ret = OB_ENTRY_NOT_EXIST;
SERVER_LOG(WARN, "get log stream from ObLSService failed", K(ret), K(ls_id));
} else {
int tmp_ret = OB_SUCCESS;
if (OB_SUCCESS != (tmp_ret = ls->get_log_handler()->is_in_sync(is_in_sync, need_rebuild))) {
SERVER_LOG(WARN, "is_in_sync failed", K(tmp_ret), K(ls_id));
}
}
if (OB_SUCC(ret)) {
log_stat.in_sync_ = is_in_sync;
}
return ret;
}
int ObAllVirtualPalfStat::member_list_to_string_( int ObAllVirtualPalfStat::member_list_to_string_(
const common::ObMemberList &member_list) const common::ObMemberList &member_list)
{ {

View File

@ -37,7 +37,6 @@ public:
void destroy(); void destroy();
private: private:
int insert_log_stat_(const logservice::ObLogStat &log_stat, common::ObNewRow *row); int insert_log_stat_(const logservice::ObLogStat &log_stat, common::ObNewRow *row);
int get_log_handler_stat_(const palf::PalfStat &palf_stat, logservice::ObLogStat &log_stat);
int member_list_to_string_(const common::ObMemberList &member_list); int member_list_to_string_(const common::ObMemberList &member_list);
int learner_list_to_string_(const common::GlobalLearnerList &learner_list, char *output_buf); int learner_list_to_string_(const common::GlobalLearnerList &learner_list, char *output_buf);
private: private: