[PALF] fix wait_log_barrier condition

This commit is contained in:
BinChenn
2023-05-23 12:41:41 +00:00
committed by ob-robot
parent aa3c922117
commit 978d31c7e1
7 changed files with 51 additions and 16 deletions

View File

@ -236,6 +236,7 @@ TEST_F(TestObSimpleLogClusterArbService, test_2f1a_reconfirm_degrade_upgrade)
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 100, id));
sleep(2);
palf_list[leader_idx]->palf_handle_impl_->set_location_cache_cb(&loc_cb);
palf_list[another_f_idx]->palf_handle_impl_->set_location_cache_cb(&loc_cb);
// block net of old leader, new leader will be elected
// and degrade in RECONFIRM state
block_net(leader_idx, another_f_idx);

View File

@ -43,6 +43,7 @@ LogConfigMgr::LogConfigMgr()
self_(),
prev_log_proposal_id_(INVALID_PROPOSAL_ID),
prev_lsn_(),
prev_end_lsn_(PALF_INITIAL_LSN_VAL),
prev_mode_pid_(INVALID_PROPOSAL_ID),
state_(INIT),
last_submit_config_log_time_us_(OB_INVALID_TIMESTAMP),
@ -743,7 +744,7 @@ int LogConfigMgr::change_config(const LogConfigChangeArgs &args,
} else {
ret = change_config_(args, proposal_id, election_epoch, config_version);
PALF_LOG(INFO, "config_change stat", K_(palf_id), K_(self), K(args), K(proposal_id),
K_(prev_log_proposal_id), K_(prev_lsn), K_(prev_mode_pid), K_(state), K(config_version),
K_(prev_log_proposal_id), K_(prev_lsn), K_(prev_end_lsn), K_(prev_mode_pid), K_(state), K(config_version),
K_(persistent_config_version), K_(ms_ack_list), K_(resend_config_version),
K_(resend_log_list), K_(log_ms_meta), K_(last_submit_config_log_time_us));
}
@ -921,23 +922,26 @@ int LogConfigMgr::renew_config_change_barrier_()
int ret = OB_SUCCESS;
int64_t prev_log_proposal_id = INVALID_PROPOSAL_ID;
int64_t prev_mode_pid = INVALID_PROPOSAL_ID;
LSN prev_log_lsn;
if (OB_FAIL(get_log_barrier_(prev_log_lsn, prev_log_proposal_id))) {
LSN prev_log_lsn, prev_log_end_lsn;
if (OB_FAIL(get_log_barrier_(prev_log_lsn, prev_log_end_lsn, prev_log_proposal_id))) {
PALF_LOG(WARN, "get_log_barrier_ failed", KR(ret), K_(palf_id), K_(self));
} else {
prev_mode_pid_ = mode_mgr_->get_last_submit_mode_meta().proposal_id_;
prev_log_proposal_id_ = prev_log_proposal_id;
prev_lsn_ = prev_log_lsn;
prev_end_lsn_ = prev_log_end_lsn;
PALF_LOG(INFO, "renew_config_change_barrier_ success", KR(ret), K_(palf_id), K_(self),
K_(prev_lsn), K_(prev_end_lsn), K_(prev_log_proposal_id), K_(prev_mode_pid));
}
return ret;
}
int LogConfigMgr::get_log_barrier_(LSN &prev_log_lsn, int64_t &prev_log_proposal_id) const
int LogConfigMgr::get_log_barrier_(LSN &prev_log_lsn, LSN &prev_log_end_lsn, int64_t &prev_log_proposal_id) const
{
int ret = OB_SUCCESS;
int64_t unused_log_id;
// log barrier must be last_submit_log_info
if (OB_FAIL(sw_->get_last_submit_log_info(prev_log_lsn, unused_log_id, prev_log_proposal_id))) {
if (OB_FAIL(sw_->get_last_submit_log_info(prev_log_lsn, prev_log_end_lsn, unused_log_id, prev_log_proposal_id))) {
PALF_LOG(WARN, "get_last_submit_log_info failed", KR(ret), K_(palf_id), K_(self));
}
return ret;
@ -1947,12 +1951,12 @@ int LogConfigMgr::wait_log_barrier_(const LogConfigChangeArgs &args,
LSN unused_lsn;
int64_t unused_id = INT64_MAX;
bool unused_bool = false;
LSN prev_log_lsn;
constexpr int64_t conn_timeout_us = 3 * 1000 * 1000L; // 3s
constexpr bool need_purge_throttling = true;
constexpr bool need_remote_check = false;
const bool need_skip_log_barrier = mode_mgr_->need_skip_log_barrier();
LSN prev_log_end_lsn = prev_end_lsn_;
if (new_member_list.get_member_number() == 0) {
ret = OB_INVALID_ARGUMENT;
} else if (OB_FAIL(sync_get_committed_end_lsn_(args, new_member_list, new_replica_num,
@ -1963,10 +1967,7 @@ int LogConfigMgr::wait_log_barrier_(const LogConfigChangeArgs &args,
ret = OB_SUCCESS;
PALF_LOG(INFO, "PALF is in FLASHBACK mode, skip log barrier", K(ret), K_(palf_id), K_(self), \
"accepted_mode_meta", mode_mgr_->get_accepted_mode_meta());
// wait log barrier during log commiting are stopped
} else if (OB_FAIL(get_log_barrier_(prev_log_lsn, unused_id))) {
PALF_LOG(WARN, "get_log_barrier_ failed", KR(ret), K_(palf_id), K_(self));
} else if (FALSE_IT(ret = (first_committed_end_lsn >= prev_log_lsn)? OB_SUCCESS: OB_EAGAIN)) {
} else if (FALSE_IT(ret = (first_committed_end_lsn >= prev_log_end_lsn)? OB_SUCCESS: OB_EAGAIN)) {
} else if (OB_EAGAIN == ret) {
// committed_end_lsn do not change during 2s, skip the reconfiguration
const int64_t curr_ts_us = common::ObTimeUtility::current_time();
@ -1991,12 +1992,12 @@ int LogConfigMgr::wait_log_barrier_(const LogConfigChangeArgs &args,
args.type_ != LogConfigChangeType::STARTWORKING) {
ret = OB_LOG_NOT_SYNC;
PALF_LOG(WARN, "waiting for log barrier timeout, skip", KR(ret), K_(palf_id), K_(self),
K_(start_wait_barrier_time_us), K(first_committed_end_lsn), K(prev_log_lsn));
K_(start_wait_barrier_time_us), K(first_committed_end_lsn), K(prev_log_end_lsn));
start_wait_barrier_time_us_ = curr_ts_us;
}
}
PALF_LOG(INFO, "waiting for log barrier", K(ret), K_(palf_id), K_(self), K(first_committed_end_lsn),
K(prev_log_lsn), K(new_member_list), K(new_replica_num));
K(prev_log_end_lsn), K(new_member_list), K(new_replica_num));
return ret;
}

View File

@ -359,9 +359,7 @@ public:
J_OBJ_START();
J_KV(K_(palf_id), K_(self), K_(alive_paxos_memberlist), K_(alive_paxos_replica_num), \
K_(log_ms_meta), K_(prev_log_proposal_id), \
K_(applied_alive_paxos_memberlist), K_(applied_alive_paxos_replica_num), \
K_(applied_all_learnerlist), \
K_(prev_lsn), K_(prev_mode_pid), K_(state), K_(persistent_config_version), \
K_(prev_lsn), K_(prev_end_lsn), K_(prev_mode_pid), K_(state), K_(persistent_config_version), \
K_(ms_ack_list), K_(resend_config_version), K_(resend_log_list), \
K_(last_submit_config_log_time_us), K_(region), K_(paxos_member_region_map), \
K_(register_time_us), K_(parent), K_(parent_keepalive_time_us), \
@ -428,7 +426,7 @@ private:
int try_resend_config_log_(const int64_t proposal_id);
// broadcast leader info to global learners, only called in leader active
int submit_broadcast_leader_info_(const int64_t proposal_id) const;
int get_log_barrier_(LSN &prev_log_lsn, int64_t &prev_log_proposal_id) const;
int get_log_barrier_(LSN &prev_log_lsn, LSN &prev_log_end_lsn, int64_t &prev_log_proposal_id) const;
int check_servers_lsn_and_version_(const common::ObAddr &server,
const LogConfigVersion &config_version,
const int64_t conn_timeout_us,
@ -511,6 +509,7 @@ private:
int64_t prev_log_proposal_id_;
// previous lsn for barrier
LSN prev_lsn_;
LSN prev_end_lsn_;
// previous mode proposal_id for barrier
int64_t prev_mode_pid_;
ConfigChangeState state_;

View File

@ -1446,6 +1446,18 @@ int LogSlidingWindow::get_last_submit_log_info(LSN &last_submit_lsn,
return ret;
}
int LogSlidingWindow::get_last_submit_log_info(LSN &last_submit_lsn,
LSN &last_submit_end_lsn, int64_t &log_id, int64_t &log_proposal_id) const
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
} else {
get_last_submit_log_info_(last_submit_lsn, last_submit_end_lsn, log_id, log_proposal_id);
}
return ret;
}
int64_t LogSlidingWindow::get_last_submit_log_id_() const
{
ObSpinLockGuard guard(last_submit_info_lock_);

View File

@ -241,6 +241,8 @@ public:
virtual int64_t get_last_submit_log_id_() const;
virtual void get_last_submit_end_lsn_(LSN &end_lsn) const;
virtual int get_last_submit_log_info(LSN &last_submit_lsn, int64_t &log_id, int64_t &log_proposal_id) const;
virtual int get_last_submit_log_info(LSN &last_submit_lsn,
LSN &last_submit_end_lsn, int64_t &log_id, int64_t &log_proposal_id) const;
virtual int get_last_slide_end_lsn(LSN &out_end_lsn) const;
virtual const share::SCN get_last_slide_scn() const;
virtual int check_and_switch_freeze_mode();

View File

@ -295,6 +295,18 @@ public:
log_proposal_id = mock_last_submit_pid_;
return ret;
}
int get_last_submit_log_info(LSN &last_submit_lsn,
LSN &last_submit_end_lsn,
int64_t &log_id,
int64_t &log_proposal_id) const
{
int ret = OB_SUCCESS;
last_submit_lsn = mock_last_submit_lsn_;
last_submit_end_lsn = mock_last_submit_end_lsn_;
log_id = mock_last_submit_log_id_;
log_proposal_id = mock_last_submit_pid_;
return ret;
}
int get_last_slide_end_lsn(LSN &out_end_lsn) const
{
int ret = OB_SUCCESS;
@ -356,6 +368,7 @@ public:
int64_t mock_start_id_;
int64_t mock_last_submit_log_id_;
LSN mock_last_submit_lsn_;
LSN mock_last_submit_end_lsn_;
int64_t mock_last_submit_pid_;
LSN mock_max_flushed_lsn_;
LSN mock_max_flushed_end_lsn_;

View File

@ -81,6 +81,7 @@ public:
mock_election_->role_ = role;
mock_election_->leader_epoch_ = INIT_ELE_EPOCH;
mock_sw_->mock_last_submit_lsn_ = LSN(PALF_INITIAL_LSN_VAL);
mock_sw_->mock_last_submit_end_lsn_ = LSN(PALF_INITIAL_LSN_VAL);
mock_sw_->mock_last_submit_pid_ = INIT_PROPOSAL_ID;
mock_sw_->state_mgr_ = mock_state_mgr_;
EXPECT_TRUE(config_info.is_valid());
@ -755,6 +756,7 @@ TEST_F(TestLogConfigMgr, test_submit_start_working_log)
int64_t prev_log_proposal_id = INVALID_PROPOSAL_ID;
int64_t prev_mode_pid = 1;
mock_sw_->mock_last_submit_lsn_ = prev_lsn;
mock_sw_->mock_last_submit_end_lsn_ = prev_lsn;
mock_sw_->mock_last_submit_pid_ = prev_log_proposal_id;
mock_mode_mgr_->mock_last_submit_mode_meta_.proposal_id_ = prev_mode_pid;
mock_sw_->mock_max_flushed_lsn_ = prev_lsn;
@ -842,6 +844,7 @@ TEST_F(TestLogConfigMgr, test_submit_config_log)
args.server_ = common::ObMember(addr4, -1);
bool is_already_finished = false;
mock_sw_->mock_last_submit_lsn_ = prev_lsn;
mock_sw_->mock_last_submit_end_lsn_ = prev_lsn;
mock_sw_->mock_last_submit_pid_ = INVALID_PROPOSAL_ID;
mock_mode_mgr_->mock_last_submit_mode_meta_.proposal_id_ = prev_pid;
mock_sw_->mock_max_flushed_lsn_.val_ = PALF_INITIAL_LSN_VAL;
@ -871,6 +874,7 @@ TEST_F(TestLogConfigMgr, test_submit_config_log)
args.server_ = common::ObMember(addr4, -1);
bool is_already_finished = false;
mock_sw_->mock_last_submit_lsn_ = prev_lsn;
mock_sw_->mock_last_submit_end_lsn_ = prev_lsn;
mock_sw_->mock_last_submit_pid_ = INVALID_PROPOSAL_ID;
mock_mode_mgr_->mock_last_submit_mode_meta_.proposal_id_ = prev_pid;
mock_sw_->mock_max_flushed_lsn_.val_ = PALF_INITIAL_LSN_VAL;
@ -896,6 +900,7 @@ TEST_F(TestLogConfigMgr, test_submit_config_log)
args.server_ = common::ObMember(addr4, -1);
bool is_already_finished = false;
mock_sw_->mock_last_submit_lsn_ = prev_lsn;
mock_sw_->mock_last_submit_end_lsn_ = prev_lsn;
mock_sw_->mock_last_submit_pid_ = prev_pid;
mock_mode_mgr_->mock_last_submit_mode_meta_.proposal_id_ = prev_pid;
mock_sw_->mock_max_flushed_lsn_.val_ = 1000;
@ -926,6 +931,7 @@ TEST_F(TestLogConfigMgr, test_submit_config_log)
args.server_ = common::ObMember(addr4, -1);
bool is_already_finished = false;
mock_sw_->mock_last_submit_lsn_ = prev_lsn;
mock_sw_->mock_last_submit_end_lsn_ = prev_lsn;
mock_sw_->mock_last_submit_pid_ = prev_pid;
mock_mode_mgr_->mock_last_submit_mode_meta_.proposal_id_ = prev_pid;
mock_sw_->mock_max_flushed_lsn_.val_ = 11000;
@ -987,6 +993,7 @@ TEST_F(TestLogConfigMgr, test_degrade__upgrade_scenario)
int64_t prev_log_proposal_id = INVALID_PROPOSAL_ID;
int64_t prev_mode_pid = 1;
mock_sw_->mock_last_submit_lsn_ = prev_lsn;
mock_sw_->mock_last_submit_end_lsn_ = prev_lsn;
mock_sw_->mock_last_submit_pid_ = prev_log_proposal_id;
mock_mode_mgr_->mock_last_submit_mode_meta_.proposal_id_ = prev_mode_pid;
mock_sw_->mock_max_flushed_lsn_ = prev_lsn;