Optimize the implementation of get_max_applied_scn()

This commit is contained in:
yyy-hust 2024-02-04 13:12:03 +00:00 committed by ob-robot
parent a180396d9b
commit 999e6b5da9
4 changed files with 245 additions and 10 deletions

View File

@ -1292,6 +1292,22 @@ int ObSimpleLogClusterTestEnv::wait_until_has_committed(PalfHandleImplGuard &lea
return ret; return ret;
} }
int ObSimpleLogClusterTestEnv::wait_lsn_until_slide(const LSN &lsn, PalfHandleImplGuard &guard)
{
int ret = OB_SUCCESS;
int64_t print_log_time = OB_INVALID_TIMESTAMP;
LSN last_slide_end_lsn = guard.palf_handle_impl_->sw_.last_slide_end_lsn_;
while (lsn > last_slide_end_lsn) {
usleep(1*1000);
if (palf_reach_time_interval(1*1000*1000, print_log_time)) {
PALF_LOG(WARN, "wait_lsn_until_slide", K(last_slide_end_lsn), K(lsn));
}
last_slide_end_lsn = guard.palf_handle_impl_->sw_.last_slide_end_lsn_;
}
return ret;
}
int ObSimpleLogClusterTestEnv::wait_lsn_until_flushed(const LSN &lsn, PalfHandleImplGuard &guard) int ObSimpleLogClusterTestEnv::wait_lsn_until_flushed(const LSN &lsn, PalfHandleImplGuard &guard)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;

View File

@ -246,6 +246,7 @@ public:
virtual int advance_base_info(const int64_t id, const PalfBaseInfo &base_info); virtual int advance_base_info(const int64_t id, const PalfBaseInfo &base_info);
virtual int get_palf_env(const int64_t server_idx, PalfEnv *&palf_env); virtual int get_palf_env(const int64_t server_idx, PalfEnv *&palf_env);
virtual int wait_until_has_committed(PalfHandleImplGuard &leader, const LSN &lsn); virtual int wait_until_has_committed(PalfHandleImplGuard &leader, const LSN &lsn);
//wait until last_slide_end_lsn_ of the sliding window is not smaller than lsn
virtual int wait_lsn_until_slide(const LSN &lsn, PalfHandleImplGuard &guard);
virtual int wait_lsn_until_flushed(const LSN &lsn, PalfHandleImplGuard &guard); virtual int wait_lsn_until_flushed(const LSN &lsn, PalfHandleImplGuard &guard);
//wait until all log task pushed into queue of LogIOWorker //wait until all log task pushed into queue of LogIOWorker
virtual int wait_lsn_until_submitted(const LSN &lsn, PalfHandleImplGuard &guard); virtual int wait_lsn_until_submitted(const LSN &lsn, PalfHandleImplGuard &guard);

View File

@ -249,6 +249,217 @@ TEST_F(TestObSimpleLogApplyFunc, apply)
ap_sv.destroy(); ap_sv.destroy();
CLOG_LOG(INFO, "test apply finish", K(id)); CLOG_LOG(INFO, "test apply finish", K(id));
} }
// Scenario: a replica submits `task_count` logs as leader, is switched to
// follower once every callback has been applied, and afterwards receives two
// more logs committed by a new leader. The test expects
// get_max_applied_scn() on the old leader's apply service to equal the max
// scn of the sliding window recorded before the role switch (max_scn).
TEST_F(TestObSimpleLogApplyFunc, get_max_decided_scn)
{
  const int64_t task_count = 10;
  const int64_t id = ATOMIC_AAF(&palf_id_, 1);
  ObLSID ls_id(id);
  int64_t leader_idx = 0;
  PalfHandleImplGuard leader;
  MockAppendCb *cb_array[task_count];
  CLOG_LOG(INFO, "test get_max_decided_scn begin", K(id));
  EXPECT_EQ(OB_SUCCESS, create_paxos_group(id, leader_idx, leader));
  MockLSAdapter ls_adapter;
  ls_adapter.init((ObLSService *)(0x1));
  ObLogApplyService ap_sv;
  // prepare one append callback per log that will be submitted
  for (int i = 0; i < task_count; i++)
  {
    cb_array[i] = new MockAppendCb();
    cb_array[i]->init(i + 1, &ls_adapter);
  }
  PalfEnv *palf_env = NULL;
  ObTenantEnv::set_tenant(get_cluster()[leader_idx]->get_tenant_base());
  EXPECT_EQ(OB_SUCCESS, get_palf_env(leader_idx, palf_env));
  EXPECT_EQ(OB_SUCCESS, ap_sv.init(palf_env, &ls_adapter));
  EXPECT_EQ(OB_SUCCESS, ap_sv.start());
  EXPECT_EQ(OB_SUCCESS, ap_sv.add_ls(ls_id));
  EXPECT_EQ(OB_SUCCESS, ap_sv.switch_to_leader(ls_id, 1));
  const int64_t idx_1 = (leader_idx + 1) % get_node_cnt();
  share::SCN ref_scn;
  int i = 0;
  LSN lsn;
  share::SCN scn;
  // The do/while(0) only provides a scope so that the apply status guard is
  // released before the role switch below.
  do {
    ObApplyStatus *apply_status = NULL;
    ObApplyStatusGuard guard;
    EXPECT_EQ(OB_SUCCESS, ap_sv.get_apply_status(ls_id, guard));
    apply_status = guard.get_apply_status();
    ASSERT_TRUE(NULL != apply_status);
    int ret = OB_SUCCESS;
    for (i = 0; i < task_count; i++)
    {
      {
        ls_adapter.critical_guard();
        CLOG_LOG(INFO, "submit log start", K(i));
        ref_scn.convert_for_logservice(i);
        ret = submit_log(leader, ref_scn, lsn, scn);
        if (OB_SUCC(ret)) {
          EXPECT_EQ(true, lsn.is_valid());
          cb_array[i]->__set_lsn(lsn);
          cb_array[i]->__set_scn(scn);
          EXPECT_EQ(OB_SUCCESS, apply_status->push_append_cb(cb_array[i]));
        } else {
          // stop submitting on the first failure, as before
          break;
        }
      }
      CLOG_LOG(INFO, "submit log finish", K(i), K(lsn), K(scn));
    }
  } while (0);
  LSN max_lsn = leader.palf_handle_impl_->sw_.get_max_lsn();
  SCN max_scn = leader.palf_handle_impl_->sw_.get_max_scn();
  // wait until the sliding window has slid past every submitted log
  EXPECT_EQ(OB_SUCCESS, wait_lsn_until_slide(max_lsn, leader));
  EXPECT_EQ(OB_SUCCESS, ap_sv.switch_to_follower(ls_id));
  // wait apply done
  LSN apply_end_lsn;
  bool is_apply_done = false;
  while (!is_apply_done)
  {
    ap_sv.is_apply_done(ls_id, is_apply_done, apply_end_lsn);
    usleep(100);
  }
  EXPECT_EQ(max_lsn, apply_end_lsn);
  // switch leader and submit two more logs through the new leader
  PalfHandleImplGuard new_leader;
  EXPECT_EQ(OB_SUCCESS, switch_leader(id, idx_1, new_leader));
  PalfHandleImplGuard new_get_leader;
  int64_t new_leader_idx = 0;
  EXPECT_EQ(OB_SUCCESS, get_leader(id, new_get_leader, new_leader_idx));
  EXPECT_NE(new_leader_idx, leader_idx);
  int ret = OB_SUCCESS;
  ++i;
  ref_scn.convert_for_logservice(i);
  ret = submit_log(new_leader, ref_scn, lsn, scn);
  EXPECT_EQ(OB_SUCCESS, ret);
  ++i;
  ref_scn.convert_for_logservice(i);
  ret = submit_log(new_leader, ref_scn, lsn, scn);
  EXPECT_EQ(OB_SUCCESS, ret);
  // wait until the old leader's end_lsn has moved beyond the logs it
  // submitted itself
  LSN end_lsn;
  do {
    end_lsn = leader.palf_handle_impl_->get_end_lsn();
    if (end_lsn <= max_lsn) {
      CLOG_LOG(INFO, "wait end_lsn", K(end_lsn), K(max_lsn));
      sleep (1);
    }
  } while (end_lsn <= max_lsn);
  share::SCN max_decided_scn;
  EXPECT_EQ(OB_SUCCESS, ap_sv.get_max_applied_scn(ls_id, max_decided_scn));
  CLOG_LOG(INFO, "check here", K(max_scn), K(max_decided_scn), K(end_lsn), K(max_lsn), K(apply_end_lsn));
  EXPECT_EQ(max_decided_scn, max_scn);
  // release the callbacks; a dedicated index avoids shadowing the outer `i`
  for (int cb_idx = 0; cb_idx < task_count; cb_idx++)
  {
    delete cb_array[cb_idx];
  }
  ObTenantEnv::set_tenant(get_cluster()[leader_idx]->get_tenant_base());
  EXPECT_EQ(OB_SUCCESS, ap_sv.remove_ls(ls_id));
  ap_sv.stop();
  ap_sv.wait();
  ap_sv.destroy();
  CLOG_LOG(INFO, "test get_max_decided_scn finish", K(id));
}
// Scenario: a replica submits `task_count` logs as leader and is switched to
// follower once the sliding window has slid past them; no further log is
// written afterwards. After all callbacks have been applied, the test expects
// get_max_applied_scn() to equal the max scn of the sliding window recorded
// before the role switch (max_scn).
TEST_F(TestObSimpleLogApplyFunc, get_max_decided_scn_no_more_log_after_switch_to_follower)
{
  const int64_t task_count = 10;
  const int64_t id = ATOMIC_AAF(&palf_id_, 1);
  ObLSID ls_id(id);
  int64_t leader_idx = 0;
  PalfHandleImplGuard leader;
  MockAppendCb *cb_array[task_count];
  CLOG_LOG(INFO, "test get_max_decided_scn_no_more_log_after_switch_to_follower begin", K(id));
  EXPECT_EQ(OB_SUCCESS, create_paxos_group(id, leader_idx, leader));
  MockLSAdapter ls_adapter;
  ls_adapter.init((ObLSService *)(0x1));
  ObLogApplyService ap_sv;
  // prepare one append callback per log that will be submitted
  for (int i = 0; i < task_count; i++)
  {
    cb_array[i] = new MockAppendCb();
    cb_array[i]->init(i + 1, &ls_adapter);
  }
  PalfEnv *palf_env = NULL;
  ObTenantEnv::set_tenant(get_cluster()[leader_idx]->get_tenant_base());
  EXPECT_EQ(OB_SUCCESS, get_palf_env(leader_idx, palf_env));
  EXPECT_EQ(OB_SUCCESS, ap_sv.init(palf_env, &ls_adapter));
  EXPECT_EQ(OB_SUCCESS, ap_sv.start());
  EXPECT_EQ(OB_SUCCESS, ap_sv.add_ls(ls_id));
  EXPECT_EQ(OB_SUCCESS, ap_sv.switch_to_leader(ls_id, 1));
  share::SCN ref_scn;
  int i = 0;
  LSN lsn;
  share::SCN scn;
  // The do/while(0) only provides a scope so that the apply status guard is
  // released before the role switch below.
  do {
    ObApplyStatus *apply_status = NULL;
    ObApplyStatusGuard guard;
    EXPECT_EQ(OB_SUCCESS, ap_sv.get_apply_status(ls_id, guard));
    apply_status = guard.get_apply_status();
    ASSERT_TRUE(NULL != apply_status);
    int ret = OB_SUCCESS;
    for (i = 0; i < task_count; i++)
    {
      {
        ls_adapter.critical_guard();
        CLOG_LOG(INFO, "submit log start", K(i));
        ref_scn.convert_for_logservice(i);
        ret = submit_log(leader, ref_scn, lsn, scn);
        if (OB_SUCC(ret)) {
          EXPECT_EQ(true, lsn.is_valid());
          cb_array[i]->__set_lsn(lsn);
          cb_array[i]->__set_scn(scn);
          EXPECT_EQ(OB_SUCCESS, apply_status->push_append_cb(cb_array[i]));
        } else {
          // stop submitting on the first failure, as before
          break;
        }
      }
      CLOG_LOG(INFO, "submit log finish", K(i), K(lsn), K(scn));
    }
  } while (0);
  LSN max_lsn = leader.palf_handle_impl_->sw_.get_max_lsn();
  SCN max_scn = leader.palf_handle_impl_->sw_.get_max_scn();
  // wait until the sliding window has slid past every submitted log
  EXPECT_EQ(OB_SUCCESS, wait_lsn_until_slide(max_lsn, leader));
  EXPECT_EQ(OB_SUCCESS, ap_sv.switch_to_follower(ls_id));
  // wait apply done
  LSN apply_end_lsn;
  bool is_apply_done = false;
  while (!is_apply_done)
  {
    ap_sv.is_apply_done(ls_id, is_apply_done, apply_end_lsn);
    usleep(100);
  }
  EXPECT_EQ(max_lsn, apply_end_lsn);
  share::SCN max_decided_scn;
  EXPECT_EQ(OB_SUCCESS, ap_sv.get_max_applied_scn(ls_id, max_decided_scn));
  CLOG_LOG(INFO, "check here", K(max_scn), K(max_decided_scn), K(max_lsn), K(apply_end_lsn));
  EXPECT_EQ(max_decided_scn, max_scn);
  ObTenantEnv::set_tenant(get_cluster()[leader_idx]->get_tenant_base());
  EXPECT_EQ(OB_SUCCESS, ap_sv.remove_ls(ls_id));
  ap_sv.stop();
  ap_sv.wait();
  ap_sv.destroy();
  // Release the callbacks allocated above (previously leaked). This is done
  // only after the apply service has been stopped and destroyed, so that no
  // service thread can still be referencing them.
  for (int cb_idx = 0; cb_idx < task_count; cb_idx++)
  {
    delete cb_array[cb_idx];
  }
  CLOG_LOG(INFO, "test get_max_decided_scn_no_more_log_after_switch_to_follower finish", K(id));
}
} // unitest } // unitest
} // oceanbase } // oceanbase

View File

@ -692,23 +692,28 @@ int ObApplyStatus::get_max_applied_scn(SCN &scn)
} else if (OB_UNLIKELY(is_in_stop_state_)) { } else if (OB_UNLIKELY(is_in_stop_state_)) {
// stop后不会再上任, 始终返回上轮作为leader时缓存的值 // stop后不会再上任, 始终返回上轮作为leader时缓存的值
} else if (FOLLOWER == role_) { } else if (FOLLOWER == role_) {
palf::LSN palf_end_lsn; //The max_applied_cb_scn_ undergoes asynchronous updating, and under circumstances where a
//transiting to a follower role, there exists a possibility that its recorded value might underestimate the actual one.
//Upon a log stream replica's shift from the leader role to a follower role, it guarantees the
//application of every log entry that has been confirmed. Thus, while in the follower phase, the
//value of max_applied_cb_scn_ can be securely incremented to match palf_committed_end_scn_.
palf::LSN apply_end_lsn; palf::LSN apply_end_lsn;
SCN palf_end_scn;
bool is_done = false; bool is_done = false;
if (OB_FAIL(is_apply_done(is_done, apply_end_lsn))) { const SCN cur_palf_committed_end_scn = palf_committed_end_scn_.atomic_load();
if (max_applied_cb_scn_ > cur_palf_committed_end_scn) {
ret = OB_ERR_UNEXPECTED;
CLOG_LOG(ERROR, "invalid max_applied_cb_scn", KPC(this));
} else if (max_applied_cb_scn_ == cur_palf_committed_end_scn) {
//no need to push up
} else if (OB_FAIL(is_apply_done(is_done, apply_end_lsn))) {
CLOG_LOG(WARN, "check is_apply_done failed", K(ret), KPC(this)); CLOG_LOG(WARN, "check is_apply_done failed", K(ret), KPC(this));
} else if (!is_done) { } else if (!is_done) {
// follower期间cb未完全回调之前暂不做任何更新 // follower期间cb未完全回调之前暂不做任何更新
// 始终返回上轮作为leader时缓存的值 // 始终返回上轮作为leader时缓存的值
// 所有cb回调完成后, 尝试推进一次最大连续回调位点 // 所有cb回调完成后, 尝试推进一次最大连续回调位点
} else if (OB_FAIL(palf_handle_.get_end_scn(palf_end_scn))) { } else if (max_applied_cb_scn_ < cur_palf_committed_end_scn) {
CLOG_LOG(WARN, "get_end_scn failed", K(ret), KPC(this)); max_applied_cb_scn_ = cur_palf_committed_end_scn;
} else if (OB_FAIL(palf_handle_.get_end_lsn(palf_end_lsn))) { CLOG_LOG(INFO, "update max_applied_cb_scn_", K(cur_palf_committed_end_scn), KPC(this));
CLOG_LOG(WARN, "get_end_lsn failed", K(ret), KPC(this));
} else if (palf_end_lsn == apply_end_lsn) {
max_applied_cb_scn_ = palf_end_scn;
CLOG_LOG(INFO, "update max_applied_cb_scn_", K(ret), KPC(this));
} }
} else if ((!last_check_scn.is_valid()) || last_check_scn == max_applied_cb_scn_) { } else if ((!last_check_scn.is_valid()) || last_check_scn == max_applied_cb_scn_) {
if (OB_FAIL(update_last_check_scn_())) { if (OB_FAIL(update_last_check_scn_())) {
@ -800,7 +805,9 @@ void ObApplyStatus::reset_meta()
lib::ObMutexGuard guard(mutex_); lib::ObMutexGuard guard(mutex_);
last_check_scn_.reset(); last_check_scn_.reset();
max_applied_cb_scn_.reset(); max_applied_cb_scn_.reset();
// palf_committed_end_scn_ also should be reset along with palf_committed_end_lsn_.
palf_committed_end_lsn_.val_ = 0; palf_committed_end_lsn_.val_ = 0;
palf_committed_end_scn_.reset();
} }
int ObApplyStatus::submit_task_to_apply_service_(ObApplyServiceTask &task) int ObApplyStatus::submit_task_to_apply_service_(ObApplyServiceTask &task)