fix group log bug caused by concurrency

This commit is contained in:
obdev
2023-07-18 06:42:26 +00:00
committed by ob-robot
parent f49cb4aaeb
commit a776cb6076
2 changed files with 61 additions and 3 deletions

View File

@ -79,6 +79,7 @@ LogSlidingWindow::LogSlidingWindow()
plugins_(NULL), plugins_(NULL),
lsn_allocator_(), lsn_allocator_(),
group_buffer_(), group_buffer_(),
last_freeze_end_lsn_(PALF_INITIAL_LSN_VAL),
last_submit_info_lock_(common::ObLatchIds::PALF_SW_SUBMIT_INFO_LOCK), last_submit_info_lock_(common::ObLatchIds::PALF_SW_SUBMIT_INFO_LOCK),
last_submit_lsn_(), last_submit_lsn_(),
last_submit_end_lsn_(), last_submit_end_lsn_(),
@ -193,6 +194,9 @@ int LogSlidingWindow::flashback(const PalfBaseInfo &palf_base_info, const int64_
last_slide_log_accum_checksum_ = prev_log_info.accum_checksum_; last_slide_log_accum_checksum_ = prev_log_info.accum_checksum_;
committed_end_lsn_ = palf_base_info.curr_lsn_; committed_end_lsn_ = palf_base_info.curr_lsn_;
set_last_freeze_end_lsn_(last_submit_end_lsn_);
reset_match_lsn_map_(); reset_match_lsn_map_();
LogGroupEntryHeader group_header; LogGroupEntryHeader group_header;
@ -522,7 +526,9 @@ int LogSlidingWindow::submit_log(const char *buf,
LSN last_submit_end_lsn, max_flushed_end_lsn; LSN last_submit_end_lsn, max_flushed_end_lsn;
get_last_submit_end_lsn_(last_submit_end_lsn); get_last_submit_end_lsn_(last_submit_end_lsn);
get_max_flushed_end_lsn(max_flushed_end_lsn); get_max_flushed_end_lsn(max_flushed_end_lsn);
if (max_flushed_end_lsn >= last_submit_end_lsn) { LSN last_freeze_end_lsn;
get_last_freeze_end_lsn_(last_freeze_end_lsn);
if (max_flushed_end_lsn >= last_freeze_end_lsn) {
// all logs have been flushed, freeze last log in feedback mode // all logs have been flushed, freeze last log in feedback mode
(void) feedback_freeze_last_log_(); (void) feedback_freeze_last_log_();
} }
@ -670,6 +676,7 @@ int LogSlidingWindow::generate_new_group_log_(const LSN &lsn,
bool &is_need_handle) bool &is_need_handle)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
is_need_handle = false; is_need_handle = false;
LogTaskGuard guard(this); LogTaskGuard guard(this);
LogTask *log_task = NULL; LogTask *log_task = NULL;
@ -707,7 +714,13 @@ int LogSlidingWindow::generate_new_group_log_(const LSN &lsn,
PALF_LOG(WARN, "set_initial_header_info failed", K(ret), K_(palf_id), K_(self), K(log_id), KPC(log_task)); PALF_LOG(WARN, "set_initial_header_info failed", K(ret), K_(palf_id), K_(self), K(log_id), KPC(log_task));
} else { } else {
// The first log is responsible to try freezing self, if its end_lsn_ has been set by next log. // The first log is responsible to try freezing self, if its end_lsn_ has been set by next log.
log_task->try_freeze_by_myself(); if (OB_TMP_FAIL(log_task->try_freeze_by_myself())) {
PALF_LOG(WARN, "try_freeze_by_myself failed", K(tmp_ret), K_(palf_id), K_(self), K(log_id), KPC(log_task));
} else if (!log_task->is_freezed()) {
PALF_LOG(WARN, "log_task is not freezed", K_(palf_id), K_(self), K(log_id), KPC(log_task));
} else {
(void) inc_update_last_freeze_end_lsn_(header_info.end_lsn_);
}
} }
log_task->unlock(); log_task->unlock();
@ -1189,6 +1202,7 @@ int LogSlidingWindow::try_freeze_last_log_task_(const int64_t expected_log_id,
log_task->unlock(); log_task->unlock();
// check if this log_task can be submitted // check if this log_task can be submitted
if (log_task->is_freezed()) { if (log_task->is_freezed()) {
inc_update_last_freeze_end_lsn_(expected_end_lsn);
log_task->set_freeze_ts(ObTimeUtility::current_time()); log_task->set_freeze_ts(ObTimeUtility::current_time());
is_need_handle = (0 == log_task->get_ref_cnt()) ? true : false; is_need_handle = (0 == log_task->get_ref_cnt()) ? true : false;
} }
@ -3056,6 +3070,13 @@ int LogSlidingWindow::truncate(const TruncateLogInfo &truncate_log_info, const L
PALF_LOG(INFO, "truncate max_flushed_log_info_", K_(palf_id), K_(self), K(truncate_log_info), K(log_end_lsn), PALF_LOG(INFO, "truncate max_flushed_log_info_", K_(palf_id), K_(self), K(truncate_log_info), K(log_end_lsn),
"old flushed_end_lsn", max_flushed_end_lsn); "old flushed_end_lsn", max_flushed_end_lsn);
} }
LSN last_freeze_end_lsn;
get_last_freeze_end_lsn_(last_freeze_end_lsn);
if (last_freeze_end_lsn > truncate_begin_lsn) {
set_last_freeze_end_lsn_(truncate_begin_lsn);
PALF_LOG(INFO, "reset last_freeze_end_lsn", K_(palf_id), K_(self), K(truncate_log_info), K(log_end_lsn),
"last_freeze_end_lsn", last_freeze_end_lsn);
}
PALF_LOG(INFO, "truncate success", K(ret), K_(palf_id), K_(self), K(truncate_log_info), "max_log_id", get_max_log_id(), PALF_LOG(INFO, "truncate success", K(ret), K_(palf_id), K_(self), K(truncate_log_info), "max_log_id", get_max_log_id(),
K(log_begin_lsn), K(expected_prev_lsn), K(expected_prev_log_pid), K(prev_accum_checksum)); K(log_begin_lsn), K(expected_prev_lsn), K(expected_prev_log_pid), K(prev_accum_checksum));
} }
@ -4457,5 +4478,38 @@ int LogSlidingWindow::read_data_from_buffer(const LSN &read_begin_lsn,
return ret; return ret;
} }
void LogSlidingWindow::get_last_freeze_end_lsn_(LSN &end_lsn)
{
end_lsn.val_ = ATOMIC_LOAD(&last_freeze_end_lsn_.val_);
}
void LogSlidingWindow::set_last_freeze_end_lsn_(const LSN &end_lsn)
{
ATOMIC_STORE(&last_freeze_end_lsn_.val_, end_lsn.val_);
}
int LogSlidingWindow::inc_update_last_freeze_end_lsn_(const LSN &end_lsn)
{
int ret = OB_SUCCESS;
LSN old_last_freeze_end_lsn;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
} else if (!end_lsn.is_valid()) {
ret = OB_INVALID_ARGUMENT;
PALF_LOG(WARN, "invalid arguments", K_(palf_id), K_(self), K(end_lsn));
} else {
get_last_freeze_end_lsn_(old_last_freeze_end_lsn);
while (end_lsn > old_last_freeze_end_lsn) {
if (ATOMIC_BCAS(&last_freeze_end_lsn_.val_, old_last_freeze_end_lsn.val_, end_lsn.val_)) {
break;
} else {
get_last_freeze_end_lsn_(old_last_freeze_end_lsn);
}
}
PALF_LOG(TRACE, "inc_update_last_freeze_end_lsn_ finished", K_(palf_id), K_(self), K(old_last_freeze_end_lsn), K(end_lsn));
}
return ret;
}
} // namespace palf } // namespace palf
} // namespace oceanbase } // namespace oceanbase

View File

@ -329,7 +329,7 @@ public:
K_(last_slide_log_pid), K_(last_slide_log_accum_checksum), K_(last_fetch_end_lsn), \ K_(last_slide_log_pid), K_(last_slide_log_accum_checksum), K_(last_fetch_end_lsn), \
K_(last_fetch_max_log_id), K_(last_fetch_committed_end_lsn), K_(last_truncate_lsn), \ K_(last_fetch_max_log_id), K_(last_fetch_committed_end_lsn), K_(last_truncate_lsn), \
K_(last_fetch_req_time), K_(is_truncating), K_(is_rebuilding), K_(last_rebuild_lsn), \ K_(last_fetch_req_time), K_(is_truncating), K_(is_rebuilding), K_(last_rebuild_lsn), \
"freeze_mode", freeze_mode_2_str(freeze_mode_), \ K_(last_freeze_end_lsn), "freeze_mode", freeze_mode_2_str(freeze_mode_), \
"last_fetch_trigger_type", fetch_trigger_type_2_str(last_fetch_trigger_type_), KP(this)); "last_fetch_trigger_type", fetch_trigger_type_2_str(last_fetch_trigger_type_), KP(this));
private: private:
int do_init_mem_(const int64_t palf_id, int do_init_mem_(const int64_t palf_id,
@ -471,6 +471,9 @@ private:
const LSN &lsn, const LSN &lsn,
const LogWriteBuf &log_write_buf); const LogWriteBuf &log_write_buf);
bool need_execute_fetch_(const FetchTriggerType &fetch_trigger_type); bool need_execute_fetch_(const FetchTriggerType &fetch_trigger_type);
void get_last_freeze_end_lsn_(LSN &end_lsn);
void set_last_freeze_end_lsn_(const LSN &end_lsn);
int inc_update_last_freeze_end_lsn_(const LSN &end_lsn);
public: public:
typedef common::ObLinearHashMap<common::ObAddr, LsnTsInfo> SvrMatchOffsetMap; typedef common::ObLinearHashMap<common::ObAddr, LsnTsInfo> SvrMatchOffsetMap;
static const int64_t TMP_HEADER_SER_BUF_LEN = 256; // log header序列化的临时buffer大小 static const int64_t TMP_HEADER_SER_BUF_LEN = 256; // log header序列化的临时buffer大小
@ -516,6 +519,7 @@ private:
LogGroupBuffer group_buffer_; LogGroupBuffer group_buffer_;
// Record the last submit log info. // Record the last submit log info.
// It is used to submit logs sequentially, for restarting, set it as last_replay_log_id. // It is used to submit logs sequentially, for restarting, set it as last_replay_log_id.
LSN last_freeze_end_lsn_;
mutable common::ObSpinLock last_submit_info_lock_; mutable common::ObSpinLock last_submit_info_lock_;
LSN last_submit_lsn_; LSN last_submit_lsn_;
LSN last_submit_end_lsn_; LSN last_submit_end_lsn_;