[PALF] supports stricter condition for reconfiguration

This commit is contained in:
BinChenn
2023-05-09 19:10:00 +00:00
committed by ob-robot
parent 5f2370f1c7
commit e08d286866
6 changed files with 116 additions and 72 deletions

View File

@ -1354,12 +1354,12 @@ int LogConfigMgr::append_config_meta_(const int64_t curr_proposal_id,
} else if (has_arb_member && OB_FAIL(state_mgr_->set_changing_config_with_arb())) {
PALF_LOG(ERROR, "set_changing_config_with_arb failed", KR(ret), K_(palf_id), K_(self), K(args));
} else if (has_arb_member &&
OB_FAIL(check_follower_sync_status_(args, new_config_info.log_sync_memberlist_,
new_config_info.log_sync_replica_num_, true, unused_bool))) {
PALF_LOG(WARN, "check_follower_sync_status_ eagain", KR(ret), K_(palf_id), K_(self), K(args), K(new_config_info));
if (OB_LOG_NOT_SYNC == ret) {
state_mgr_->reset_changing_config_with_arb();
}
OB_FAIL(wait_log_barrier_(args, new_config_info.log_sync_memberlist_,
new_config_info.log_sync_replica_num_))) {
PALF_LOG(WARN, "check_follower_sync_status_ eagain", KR(ret), K_(palf_id), K_(self), K(args), K(new_config_info));
if (OB_LOG_NOT_SYNC == ret) {
state_mgr_->reset_changing_config_with_arb();
}
} else if (OB_FAIL(update_election_meta_(new_config_info))) {
if (OB_OP_NOT_ALLOW == ret) {
ret = OB_EAGAIN;
@ -1919,13 +1919,74 @@ int LogConfigMgr::check_follower_sync_status(const LogConfigChangeArgs &args,
{
int ret = OB_SUCCESS;
SpinLockGuard guard(lock_);
return check_follower_sync_status_(args, new_member_list, new_replica_num, false, added_member_has_new_version);
return check_follower_sync_status_(args, new_member_list, new_replica_num, added_member_has_new_version);
}
// Checks whether the committed end LSN reported for new_member_list has
// reached the log barrier LSN returned by get_log_barrier_().
//
// @param args             config change arguments; forwarded to
//                         sync_get_committed_end_lsn_() and inspected for
//                         args.type_ when applying the overall wait timeout.
// @param new_member_list  member list to query; must not be empty.
// @param new_replica_num  replica number forwarded to
//                         sync_get_committed_end_lsn_().
//
// @return
//   - OB_SUCCESS           the barrier has been reached, or
//                          mode_mgr_->need_skip_log_barrier() returned true.
//   - OB_EAGAIN            the barrier has not been reached yet.
//   - OB_LOG_NOT_SYNC      the committed end LSN was identical at two
//                          observations more than
//                          MAX_WAIT_BARRIER_TIME_US_FOR_STABLE_LOG apart, or
//                          the wait exceeded
//                          MAX_WAIT_BARRIER_TIME_US_FOR_RECONFIGURATION
//                          (the latter never applies to STARTWORKING).
//   - OB_INVALID_ARGUMENT  new_member_list is empty.
//   - otherwise the code left in ret by a failed
//     sync_get_committed_end_lsn_() or get_log_barrier_() call
//     (presumably OB_FAIL stores the callee's return value in ret -- macro
//     definition not visible here, confirm).
//
// NOTE(review): this method is declared const but assigns
// last_wait_committed_end_lsn_, last_wait_barrier_time_us_ and
// start_wait_barrier_time_us_; those members are presumably declared
// mutable -- confirm in the class declaration.
// NOTE(review): start_wait_barrier_time_us_ is not cleared on the success
// path inside this function; verify it is reset elsewhere.
int LogConfigMgr::wait_log_barrier_(const LogConfigChangeArgs &args,
const ObMemberList &new_member_list,
const int64_t new_replica_num) const
{
int ret = OB_SUCCESS;
// Committed end LSN computed by sync_get_committed_end_lsn_() for new_member_list.
LSN first_committed_end_lsn;
// The unused_* variables only satisfy out-parameters whose values are ignored here.
LSN unused_lsn;
int64_t unused_id = INT64_MAX;
bool unused_bool = false;
// Barrier LSN filled in by get_log_barrier_().
LSN prev_log_lsn;
constexpr int64_t conn_timeout_us = 3 * 1000 * 1000L; // 3s
// Fixed flags for the barrier wait: purge throttling is requested, while the
// explicit remote check is not.
constexpr bool need_purge_throttling = true;
constexpr bool need_remote_check = false;
const bool need_skip_log_barrier = mode_mgr_->need_skip_log_barrier();
if (new_member_list.get_member_number() == 0) {
ret = OB_INVALID_ARGUMENT;
} else if (OB_FAIL(sync_get_committed_end_lsn_(args, new_member_list, new_replica_num,
need_purge_throttling, need_remote_check, conn_timeout_us, first_committed_end_lsn, unused_bool, unused_lsn, unused_id))) {
PALF_LOG(WARN, "sync_get_committed_end_lsn failed", K(ret), K_(palf_id), K_(self),
K(new_member_list), K(new_replica_num));
} else if (need_skip_log_barrier) {
// The skip decision is evaluated only after sync_get_committed_end_lsn_()
// has already been called above.
ret = OB_SUCCESS;
PALF_LOG(INFO, "PALF is in FLASHBACK mode, skip log barrier", K(ret), K_(palf_id), K_(self), \
"accepted_mode_meta", mode_mgr_->get_accepted_mode_meta());
// Otherwise wait for the log barrier: the branches below compare
// first_committed_end_lsn against the barrier LSN.
} else if (OB_FAIL(get_log_barrier_(prev_log_lsn, unused_id))) {
PALF_LOG(WARN, "get_log_barrier_ failed", KR(ret), K_(palf_id), K_(self));
// Sets ret to OB_SUCCESS when the barrier is reached and to OB_EAGAIN
// otherwise. FALSE_IT presumably evaluates its argument and yields false so
// that the chain continues to the next branch -- confirm against the macro.
} else if (FALSE_IT(ret = (first_committed_end_lsn >= prev_log_lsn)? OB_SUCCESS: OB_EAGAIN)) {
} else if (OB_EAGAIN == ret) {
// Stall detection: if committed_end_lsn has not advanced for longer than
// MAX_WAIT_BARRIER_TIME_US_FOR_STABLE_LOG (2s according to the original
// comment; the constant's value is not visible here), give up on this
// reconfiguration attempt with OB_LOG_NOT_SYNC.
const int64_t curr_ts_us = common::ObTimeUtility::current_time();
if (OB_INVALID_TIMESTAMP == last_wait_barrier_time_us_) {
// First observation: record the sample to compare against later.
last_wait_committed_end_lsn_ = first_committed_end_lsn;
last_wait_barrier_time_us_ = curr_ts_us;
} else if (curr_ts_us - last_wait_barrier_time_us_ > MAX_WAIT_BARRIER_TIME_US_FOR_STABLE_LOG) {
if (last_wait_committed_end_lsn_ == first_committed_end_lsn) {
ret = OB_LOG_NOT_SYNC;
PALF_LOG(WARN, "waiting for log barrier failed, committed_end_lsn havn't been advanced", KR(ret),
K_(palf_id), K_(self), K_(last_wait_barrier_time_us), K_(last_wait_committed_end_lsn));
// Clear the sample so that the next call starts a fresh observation window.
last_wait_barrier_time_us_ = OB_INVALID_TIMESTAMP;
last_wait_committed_end_lsn_.reset();
} else {
// The LSN changed since the last sample: start a new observation window.
last_wait_committed_end_lsn_ = first_committed_end_lsn;
last_wait_barrier_time_us_ = curr_ts_us;
}
}
// Overall timeout, tracked independently of the stall detection above.
// This block also runs when ret was already set to OB_LOG_NOT_SYNC above.
if (OB_INVALID_TIMESTAMP == start_wait_barrier_time_us_) {
start_wait_barrier_time_us_ = curr_ts_us;
} else if (curr_ts_us - start_wait_barrier_time_us_ > MAX_WAIT_BARRIER_TIME_US_FOR_RECONFIGURATION &&
args.type_ != LogConfigChangeType::STARTWORKING) {
// STARTWORKING is excluded from this timeout by the condition above.
ret = OB_LOG_NOT_SYNC;
PALF_LOG(WARN, "waiting for log barrier timeout, skip", KR(ret), K_(palf_id), K_(self),
K_(start_wait_barrier_time_us), K(first_committed_end_lsn), K(prev_log_lsn));
// The timer restarts from the current time rather than being invalidated.
start_wait_barrier_time_us_ = curr_ts_us;
}
}
// Logged on every path, including the error and skip paths.
PALF_LOG(INFO, "waiting for log barrier", K(ret), K_(palf_id), K_(self), K(first_committed_end_lsn),
K(prev_log_lsn), K(new_member_list), K(new_replica_num));
return ret;
}
int LogConfigMgr::check_follower_sync_status_(const LogConfigChangeArgs &args,
const ObMemberList &new_member_list,
const int64_t new_replica_num,
const bool is_check_log_barrier,
bool &added_member_has_new_version) const
{
int ret = OB_SUCCESS;
@ -1937,7 +1998,8 @@ int LogConfigMgr::check_follower_sync_status_(const LogConfigChangeArgs &args,
LSN added_member_flushed_end_lsn;
int64_t added_member_last_slide_log_id = INT64_MAX;
int64_t leader_last_slide_log_id = sw_->get_last_slide_log_id();
const bool need_purge_throttling = (is_check_log_barrier || DEGRADE_ACCEPTOR_TO_LEARNER == args.type_);
const bool need_purge_throttling = (DEGRADE_ACCEPTOR_TO_LEARNER == args.type_);
const bool need_remote_check = true;
(void) sw_->get_committed_end_lsn(first_leader_committed_end_lsn);
const bool need_skip_log_barrier = mode_mgr_->need_skip_log_barrier();
@ -1946,7 +2008,7 @@ int LogConfigMgr::check_follower_sync_status_(const LogConfigChangeArgs &args,
} else if (new_member_list.get_member_number() == 1) {
ret = OB_SUCCESS;
} else if (OB_FAIL(sync_get_committed_end_lsn_(args, new_member_list, new_replica_num, need_purge_throttling,
conn_timeout_us, first_committed_end_lsn, added_member_has_new_version, added_member_flushed_end_lsn,
need_remote_check, conn_timeout_us, first_committed_end_lsn, added_member_has_new_version, added_member_flushed_end_lsn,
added_member_last_slide_log_id))) {
PALF_LOG(WARN, "sync_get_committed_end_lsn failed", K(ret), K_(palf_id), K_(self), K(new_member_list),
K(new_replica_num), K(added_member_has_new_version));
@ -1954,44 +2016,6 @@ int LogConfigMgr::check_follower_sync_status_(const LogConfigChangeArgs &args,
ret = OB_SUCCESS;
PALF_LOG(INFO, "PALF is in FLASHBACK mode, skip log barrier", K(ret), K_(palf_id), K_(self), \
"accepted_mode_meta", mode_mgr_->get_accepted_mode_meta());
} else if (is_check_log_barrier) {
// check log barrier during log appending are stopped
LSN prev_log_lsn;
int64_t unused_id;
if (OB_FAIL(get_log_barrier_(prev_log_lsn, unused_id))) {
PALF_LOG(WARN, "get_log_barrier_ failed", KR(ret), K_(palf_id), K_(self));
} else {
ret = (first_committed_end_lsn >= prev_log_lsn)? OB_SUCCESS: OB_EAGAIN;
// committed_end_lsn do not change during 2s, skip the reconfiguration
if (OB_EAGAIN == ret) {
const int64_t curr_ts_us = common::ObTimeUtility::current_time();
if (OB_INVALID_TIMESTAMP == last_wait_barrier_time_us_) {
last_wait_committed_end_lsn_ = first_committed_end_lsn;
last_wait_barrier_time_us_ = curr_ts_us;
} else if (curr_ts_us - last_wait_barrier_time_us_ > MAX_WAIT_BARRIER_TIME_US_FOR_STABLE_LOG) {
if (last_wait_committed_end_lsn_ == first_committed_end_lsn) {
ret = OB_LOG_NOT_SYNC;
PALF_LOG(WARN, "committed_end_lsn havn't been advanced for long time, exit", KR(ret), K_(palf_id), K_(self),
K_(last_wait_barrier_time_us), K_(last_wait_committed_end_lsn));
last_wait_barrier_time_us_ = OB_INVALID_TIMESTAMP;
last_wait_committed_end_lsn_.reset();
} else {
last_wait_committed_end_lsn_ = first_committed_end_lsn;
last_wait_barrier_time_us_ = curr_ts_us;
}
}
if (OB_INVALID_TIMESTAMP == start_wait_barrier_time_us_) {
start_wait_barrier_time_us_ = curr_ts_us;
} else if (curr_ts_us - start_wait_barrier_time_us_ > MAX_WAIT_BARRIER_TIME_US_FOR_RECONFIGURATION &&
args.type_ != LogConfigChangeType::STARTWORKING) {
ret = OB_LOG_NOT_SYNC;
PALF_LOG(WARN, "wait barrier timeout, skip", KR(ret), K_(palf_id), K_(self),
K_(start_wait_barrier_time_us), K(first_committed_end_lsn), K(prev_log_lsn));
start_wait_barrier_time_us_ = curr_ts_us;
}
}
PALF_LOG(INFO, "waiting for log barrier", K(ret), K_(palf_id), K_(self), K(first_committed_end_lsn), K(prev_log_lsn));
}
} else if (first_committed_end_lsn >= first_leader_committed_end_lsn) {
// if committed lsn of new majority do not retreat, then start config change
PALF_LOG(INFO, "majority of new_member_list are sync with leader, start config change", K(ret), K_(palf_id), K_(self),
@ -2025,8 +2049,9 @@ int LogConfigMgr::check_follower_sync_status_(const LogConfigChangeArgs &args,
int64_t expected_sync_time_s;
int64_t sync_speed_gap;
added_member_has_new_version = is_add_member_list(args.type_)? false: true;
if (OB_FAIL(sync_get_committed_end_lsn_(args, new_member_list, new_replica_num, false/*no need purge throttling*/, conn_timeout_us,
second_committed_end_lsn, added_member_has_new_version, added_member_flushed_end_lsn, added_member_last_slide_log_id))) {
if (OB_FAIL(sync_get_committed_end_lsn_(args, new_member_list, new_replica_num, false/*no need purge throttling*/,
need_remote_check, conn_timeout_us, second_committed_end_lsn, added_member_has_new_version,
added_member_flushed_end_lsn, added_member_last_slide_log_id))) {
PALF_LOG(WARN, "sync_get_committed_end_lsn failed", K(ret), K_(palf_id), K_(self), K(new_member_list),
K(new_replica_num), K(added_member_has_new_version));
} else if (second_committed_end_lsn >= second_leader_committed_end_lsn) {
@ -2076,7 +2101,9 @@ int LogConfigMgr::check_servers_lsn_and_version_(const common::ObAddr &server,
LOG_WARN_RET(tmp_ret, "submit_purge_throttling_task", K_(palf_id), K_(self));
}
}
} else if (false == force_remote_check && (OB_FAIL(sw_->get_server_ack_info(server, ack_info)) && OB_ENTRY_NOT_EXIST != ret)) {
} else if (false == force_remote_check &&
(OB_FAIL(sw_->get_server_ack_info(server, ack_info)) &&
OB_ENTRY_NOT_EXIST != ret)) {
PALF_LOG(WARN, "get_server_ack_info failed", KR(ret), K_(palf_id), K_(self), K(server));
} else if (false == force_remote_check &&
OB_SUCC(ret) &&
@ -2087,8 +2114,8 @@ int LogConfigMgr::check_servers_lsn_and_version_(const common::ObAddr &server,
get_from_local = true;
} else if (OB_FAIL(log_engine_->submit_config_change_pre_check_req(server, config_version, need_purge_throttling,
conn_timeout_us, resp))) {
PALF_LOG(WARN, "submit_config_change_pre_check_req failed", KR(ret), K_(palf_id), K_(self), K(server), K(need_purge_throttling),
K(config_version), K(conn_timeout_us), K(resp));
PALF_LOG(WARN, "submit_config_change_pre_check_req failed", KR(ret), K_(palf_id), K_(self),
K(server), K(need_purge_throttling), K(config_version), K(conn_timeout_us), K(resp));
has_same_version = false;
} else if (false == resp.is_normal_replica_) {
has_same_version = false;
@ -2112,6 +2139,7 @@ int LogConfigMgr::sync_get_committed_end_lsn_(const LogConfigChangeArgs &args,
const ObMemberList &new_member_list,
const int64_t new_replica_num,
const bool need_purge_throttling,
const bool need_remote_check,
const int64_t conn_timeout_us,
LSN &committed_end_lsn,
bool &added_member_has_new_version,
@ -2130,19 +2158,19 @@ int LogConfigMgr::sync_get_committed_end_lsn_(const LogConfigChangeArgs &args,
for (int64_t i = 0; i < new_member_list.get_member_number(); ++i) {
common::ObAddr server;
bool is_added_member = false;
bool force_remote_check = is_added_member;
bool force_remote_check = false;
LSN max_flushed_end_lsn;
bool has_same_version = false;
if (OB_SUCCESS != (tmp_ret = new_member_list.get_server_by_index(i, server))) {
PALF_LOG(ERROR, "get_server_by_index failed", KR(ret), K_(palf_id), K_(self), K(i),
"new_member_list size:", new_member_list.get_member_number());
} else if (FALSE_IT(is_added_member = is_add_member_list(args.type_) && (args.server_.get_server() == server))) {
} else if (FALSE_IT(force_remote_check = is_added_member || need_purge_throttling)) {
} else if (FALSE_IT(force_remote_check = is_added_member || need_purge_throttling || need_remote_check)) {
} else if (OB_SUCCESS != (tmp_ret = check_servers_lsn_and_version_(server, config_version,
conn_timeout_us, force_remote_check, need_purge_throttling, max_flushed_end_lsn, has_same_version,
added_member_last_slide_log_id))) {
PALF_LOG(WARN, "check_servers_lsn_and_version_ failed", K(ret), K(tmp_ret), K_(palf_id), K_(self), K(server),
K(config_version), K(conn_timeout_us), K(force_remote_check), K(max_flushed_end_lsn), K(has_same_version));
// PALF_LOG(WARN, "check_servers_lsn_and_version_ failed", K(ret), K(tmp_ret), K_(palf_id), K_(self), K(server),
// K(config_version), K(conn_timeout_us), K(force_remote_check), K(max_flushed_end_lsn), K(has_same_version));
} else {
lsn_array[resp_cnt++] = max_flushed_end_lsn;
}
@ -2177,8 +2205,9 @@ int LogConfigMgr::sync_get_committed_end_lsn_(const LogConfigChangeArgs &args,
committed_end_lsn = lsn_array[new_replica_num / 2];
}
PALF_LOG(INFO, "sync_get_committed_end_lsn_ finish", K(ret), K_(palf_id), K_(self), K(args),
K(new_member_list), K(new_replica_num), K(conn_timeout_us), K(committed_end_lsn),
K(added_member_has_new_version), K(added_member_flushed_end_lsn), K(added_member_last_slide_log_id),
K(new_member_list), K(new_replica_num), K(need_purge_throttling), K(need_remote_check),
K(conn_timeout_us), K(committed_end_lsn), K(added_member_has_new_version),
K(added_member_flushed_end_lsn), K(added_member_last_slide_log_id),
"lsn_array:", common::ObArrayWrap<LSN>(lsn_array, resp_cnt));
return ret;
}

View File

@ -325,6 +325,9 @@ public:
const ObMemberList &new_member_list,
const int64_t new_replica_num,
bool &added_member_has_new_version) const;
int wait_log_barrier_(const LogConfigChangeArgs &args,
const ObMemberList &new_member_list,
const int64_t new_replica_num) const;
int sync_meta_for_arb_election_leader();
bool need_sync_to_degraded_learners() const;
// ================ Config Change ==================
@ -434,6 +437,7 @@ private:
const ObMemberList &new_member_list,
const int64_t new_replica_num,
const bool need_purge_throttling,
const bool need_remote_check,
const int64_t conn_timeout_us,
LSN &committed_end_lsn,
bool &added_member_has_new_version,
@ -442,7 +446,6 @@ private:
int check_follower_sync_status_(const LogConfigChangeArgs &args,
const ObMemberList &new_member_list,
const int64_t new_replica_num,
const bool is_check_log_barrier,
bool &added_member_has_new_version) const;
int pre_sync_config_log_and_mode_meta_(const common::ObMember &server,
const int64_t proposal_id,

View File

@ -453,7 +453,7 @@ int LogRequestHandler::handle_sync_request<LogGetMCStReq, LogGetMCStResp>(
PALF_LOG(WARN, "check_disk_space_enough returns false", K(req), K(resp));
} else if (OB_FAIL(palf_env_impl_->get_palf_handle_impl(palf_id, guard))) {
PALF_LOG(WARN, "PalfEnvImpl get_palf_handle_impl failed", K(ret), K(palf_id));
} else if (OB_FAIL(guard.get_palf_handle_impl()->config_change_pre_check(server, req, resp))) {
} else if (OB_FAIL(guard.get_palf_handle_impl()->handle_config_change_pre_check(server, req, resp))) {
PALF_LOG(WARN, "PalfHandleImpl config_change_pre_check failed", K(ret), K(palf_id), K(server), K(req), KPC(palf_env_impl_));
} else {
PALF_LOG(INFO, "PalfHandleImpl config_change_pre_check success", K(ret), K(palf_id), K(server), K(req), K(resp), KPC(palf_env_impl_));

View File

@ -519,13 +519,15 @@ int PalfHandleImpl::get_election_leader(ObAddr &addr) const
return ret;
}
int PalfHandleImpl::config_change_pre_check(const ObAddr &server,
const LogGetMCStReq &req,
LogGetMCStResp &resp)
int PalfHandleImpl::handle_config_change_pre_check(const ObAddr &server,
const LogGetMCStReq &req,
LogGetMCStResp &resp)
{
int ret = OB_SUCCESS;
uint64_t tenant_data_version = 0;
int tmp_ret = common::OB_SUCCESS;
const bool is_vote_enabled = state_mgr_.is_allow_vote();
const bool is_sync_enabled = state_mgr_.is_sync_enabled();
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
PALF_LOG(ERROR, "PalfHandleImpl has not inited", K(ret), K_(palf_id));
@ -534,13 +536,17 @@ int PalfHandleImpl::config_change_pre_check(const ObAddr &server,
// to be added in the Paxos group. Check PalfHandleImpl only
resp.is_normal_replica_ = false;
PALF_LOG(WARN, "get tenant data version failed", K(tmp_ret), K(req), K(resp));
} else if (false == is_vote_enabled || false == is_sync_enabled) {
resp.is_normal_replica_ = false;
PALF_LOG(WARN, "replica has been disabled vote/sync", K(ret), K(req), K(resp),
K(is_vote_enabled), K(is_sync_enabled));
} else {
RLockGuard guard(lock_);
if (req.need_purge_throttling_) {
int tmp_ret = OB_SUCCESS;
const PurgeThrottlingType purge_type = PurgeThrottlingType::PURGE_BY_GET_MC_REQ;
if (OB_SUCCESS != (tmp_ret = log_engine_.submit_purge_throttling_task(purge_type))) {
PALF_LOG_RET(WARN, tmp_ret, "failed to submit_purge_throttling_task with config_change_pre_check", K_(palf_id));
PALF_LOG_RET(WARN, tmp_ret, "failed to submit_purge_throttling_task with handle_config_change_pre_check", K_(palf_id));
}
}
int64_t curr_proposal_id = state_mgr_.get_proposal_id();
@ -570,7 +576,7 @@ int PalfHandleImpl::config_change_pre_check(const ObAddr &server,
} else {
PALF_LOG(INFO, "try_fetch_log with ADD_MEMBER_PRE_CHECK success", KR(tmp_ret), KPC(this));
}
PALF_LOG(INFO, "config_change_pre_check success", K(ret), KPC(this), K(server),
PALF_LOG(INFO, "handle_config_change_pre_check success", K(ret), KPC(this), K(server),
K(req), K(resp), K(curr_config_version));
}
return ret;

View File

@ -585,9 +585,9 @@ public:
virtual int handle_notify_rebuild_req(const common::ObAddr &server,
const LSN &base_lsn,
const LogInfo &base_prev_log_info) = 0;
virtual int config_change_pre_check(const ObAddr &server,
const LogGetMCStReq &req,
LogGetMCStResp &resp) = 0;
virtual int handle_config_change_pre_check(const ObAddr &server,
const LogGetMCStReq &req,
LogGetMCStResp &resp) = 0;
virtual int handle_register_parent_req(const LogLearner &child,
const bool is_to_leader) = 0;
virtual int handle_register_parent_resp(const LogLearner &server,
@ -928,9 +928,9 @@ public:
const int64_t prev_log_id,
const int64_t &prev_log_proposal_id,
const LSN &committed_end_lsn) override final;
int config_change_pre_check(const ObAddr &server,
const LogGetMCStReq &req,
LogGetMCStResp &resp) override final;
int handle_config_change_pre_check(const ObAddr &server,
const LogGetMCStReq &req,
LogGetMCStResp &resp) override final;
int revoke_leader(const int64_t proposal_id) override final;
int stat(PalfStat &palf_stat) override final;
int handle_register_parent_req(const LogLearner &child,