[CP] fix log corrupt becuase raw write concurrently
This commit is contained in:
@ -1025,8 +1025,9 @@ int ObSimpleLogClusterTestEnv::raw_write(PalfHandleImplGuard &leader,
|
||||
do {
|
||||
usleep(10);
|
||||
ret = (leader.palf_handle_impl_)->submit_group_log(opts, lsn, buf, buf_len);
|
||||
if (OB_SUCC(ret)) {
|
||||
if (OB_SUCC(ret) || OB_ERR_OUT_OF_LOWER_BOUND == ret) {
|
||||
PALF_LOG(INFO, "raw_write success", KR(ret), K(lsn));
|
||||
ret = OB_SUCCESS;
|
||||
} else {
|
||||
if (REACH_TIME_INTERVAL(100 * 1000)) {
|
||||
PALF_LOG(WARN, "raw_write failed", KR(ret));
|
||||
@ -1167,6 +1168,10 @@ int ObSimpleLogClusterTestEnv::read_and_submit_group_log(PalfHandleImplGuard &le
|
||||
PALF_LOG(WARN, "iterator next failed", K(ret), K(iterator_raw_write));
|
||||
} else if (OB_FAIL(iterator_raw_write.get_entry(buffer, nbytes, scn, lsn, is_raw_write))) {
|
||||
PALF_LOG(WARN, "iterator get_entry failed", K(ret), K(iterator_raw_write), K(is_raw_write));
|
||||
} else if (lsn >= start_lsn && is_raw_write != true) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
PALF_LOG(ERROR, "iterator get_entry failed, is_raw_write must be true", K(ret), K(iterator_raw_write), K(is_raw_write),
|
||||
K(lsn), K(start_lsn));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1960,6 +1960,35 @@ TEST_F(TestObSimpleLogClusterSingleReplica, test_raw_read)
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(TestObSimpleLogClusterSingleReplica, test_raw_write_concurrent_lsn)
|
||||
{
|
||||
SET_CASE_LOG_FILE(TEST_NAME, "test_raw_write_concurrent_lsn");
|
||||
int64_t id = ATOMIC_AAF(&palf_id_, 1);
|
||||
OB_LOGGER.set_log_level("TRACE");
|
||||
int64_t leader_idx = 0;
|
||||
PalfHandleImplGuard leader;
|
||||
PalfHandleImplGuard raw_write_leader;
|
||||
EXPECT_EQ(OB_SUCCESS, create_paxos_group(id, leader_idx, leader));
|
||||
PalfHandleImpl *palf_handle_impl = leader.palf_handle_impl_;
|
||||
const int64_t id_raw_write = ATOMIC_AAF(&palf_id_, 1);
|
||||
EXPECT_EQ(OB_SUCCESS, create_paxos_group(id_raw_write, leader_idx, raw_write_leader));
|
||||
EXPECT_EQ(OB_SUCCESS, change_access_mode_to_raw_write(raw_write_leader));
|
||||
|
||||
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 100, leader_idx, MAX_LOG_BASE_TYPE));
|
||||
SCN max_scn1 = leader.palf_handle_impl_->get_max_scn();
|
||||
LSN end_pos_of_log1 = leader.palf_handle_impl_->get_max_lsn();
|
||||
EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, leader.palf_handle_impl_->get_max_lsn()));
|
||||
|
||||
std::thread submit_log_t1([&]() {
|
||||
EXPECT_EQ(OB_ITER_END, read_and_submit_group_log(leader, raw_write_leader));
|
||||
});
|
||||
std::thread submit_log_t2([&]() {
|
||||
EXPECT_EQ(OB_ITER_END, read_and_submit_group_log(leader, raw_write_leader));
|
||||
});
|
||||
submit_log_t1.join();
|
||||
submit_log_t2.join();
|
||||
}
|
||||
|
||||
} // namespace unittest
|
||||
} // namespace oceanbase
|
||||
|
||||
|
@ -3507,6 +3507,7 @@ int LogSlidingWindow::submit_group_log(const LSN &lsn,
|
||||
// get log_task success
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
log_task->lock();
|
||||
SCN min_scn;
|
||||
if (log_task->is_valid()) {
|
||||
if (lsn != log_task->get_begin_lsn()
|
||||
@ -3530,7 +3531,6 @@ int LogSlidingWindow::submit_group_log(const LSN &lsn,
|
||||
PALF_LOG(WARN, "try_update_max_lsn_ failed", K(ret), K_(palf_id), K_(self), K(lsn), K(group_entry_header));
|
||||
} else {
|
||||
// prev_log_proposal_id match or not exist, receive this log
|
||||
log_task->lock();
|
||||
if (log_task->is_valid()) {
|
||||
// log_task可能被其他线程并发收取了,预期内容与本线程一致.
|
||||
if (group_entry_header.get_log_proposal_id() != log_task->get_proposal_id()) {
|
||||
@ -3547,11 +3547,11 @@ int LogSlidingWindow::submit_group_log(const LSN &lsn,
|
||||
(void) log_task->set_freezed();
|
||||
log_task->set_freeze_ts(ObTimeUtility::current_time());
|
||||
}
|
||||
log_task->unlock();
|
||||
|
||||
PALF_LOG(TRACE, "submit_group_log", K(ret), K_(palf_id), K_(self), K(group_entry_header),
|
||||
K(log_id), KPC(log_task));
|
||||
}
|
||||
log_task->unlock();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user