From a776cb60768aefa00090de9a11a9b83204736eea Mon Sep 17 00:00:00 2001 From: obdev Date: Tue, 18 Jul 2023 06:42:26 +0000 Subject: [PATCH] fix group log bug caused by concurrency --- src/logservice/palf/log_sliding_window.cpp | 58 +++++++++++++++++++++- src/logservice/palf/log_sliding_window.h | 6 ++- 2 files changed, 61 insertions(+), 3 deletions(-) diff --git a/src/logservice/palf/log_sliding_window.cpp b/src/logservice/palf/log_sliding_window.cpp index 1a8eb9baca..d3ef393016 100644 --- a/src/logservice/palf/log_sliding_window.cpp +++ b/src/logservice/palf/log_sliding_window.cpp @@ -79,6 +79,7 @@ LogSlidingWindow::LogSlidingWindow() plugins_(NULL), lsn_allocator_(), group_buffer_(), + last_freeze_end_lsn_(PALF_INITIAL_LSN_VAL), last_submit_info_lock_(common::ObLatchIds::PALF_SW_SUBMIT_INFO_LOCK), last_submit_lsn_(), last_submit_end_lsn_(), @@ -193,6 +194,9 @@ int LogSlidingWindow::flashback(const PalfBaseInfo &palf_base_info, const int64_ last_slide_log_accum_checksum_ = prev_log_info.accum_checksum_; committed_end_lsn_ = palf_base_info.curr_lsn_; + + set_last_freeze_end_lsn_(last_submit_end_lsn_); + reset_match_lsn_map_(); LogGroupEntryHeader group_header; @@ -522,7 +526,9 @@ int LogSlidingWindow::submit_log(const char *buf, LSN last_submit_end_lsn, max_flushed_end_lsn; get_last_submit_end_lsn_(last_submit_end_lsn); get_max_flushed_end_lsn(max_flushed_end_lsn); - if (max_flushed_end_lsn >= last_submit_end_lsn) { + LSN last_freeze_end_lsn; + get_last_freeze_end_lsn_(last_freeze_end_lsn); + if (max_flushed_end_lsn >= last_freeze_end_lsn) { // all logs have been flushed, freeze last log in feedback mode (void) feedback_freeze_last_log_(); } @@ -670,6 +676,7 @@ int LogSlidingWindow::generate_new_group_log_(const LSN &lsn, bool &is_need_handle) { int ret = OB_SUCCESS; + int tmp_ret = OB_SUCCESS; is_need_handle = false; LogTaskGuard guard(this); LogTask *log_task = NULL; @@ -707,7 +714,13 @@ int LogSlidingWindow::generate_new_group_log_(const LSN &lsn, PALF_LOG(WARN, "set_initial_header_info failed", K(ret), K_(palf_id), K_(self), K(log_id), KPC(log_task)); } else { // The first log is responsible to try freezing self, if its end_lsn_ has been set by next log. - log_task->try_freeze_by_myself(); + if (OB_TMP_FAIL(log_task->try_freeze_by_myself())) { + PALF_LOG(WARN, "try_freeze_by_myself failed", K(tmp_ret), K_(palf_id), K_(self), K(log_id), KPC(log_task)); + } else if (!log_task->is_freezed()) { + PALF_LOG(WARN, "log_task is not freezed", K_(palf_id), K_(self), K(log_id), KPC(log_task)); + } else { + (void) inc_update_last_freeze_end_lsn_(header_info.end_lsn_); + } } log_task->unlock(); @@ -1189,6 +1202,7 @@ int LogSlidingWindow::try_freeze_last_log_task_(const int64_t expected_log_id, log_task->unlock(); // check if this log_task can be submitted if (log_task->is_freezed()) { + inc_update_last_freeze_end_lsn_(expected_end_lsn); log_task->set_freeze_ts(ObTimeUtility::current_time()); is_need_handle = (0 == log_task->get_ref_cnt()) ? true : false; } @@ -3056,6 +3070,13 @@ int LogSlidingWindow::truncate(const TruncateLogInfo &truncate_log_info, const L PALF_LOG(INFO, "truncate max_flushed_log_info_", K_(palf_id), K_(self), K(truncate_log_info), K(log_end_lsn), "old flushed_end_lsn", max_flushed_end_lsn); } + LSN last_freeze_end_lsn; + get_last_freeze_end_lsn_(last_freeze_end_lsn); + if (last_freeze_end_lsn > truncate_begin_lsn) { + set_last_freeze_end_lsn_(truncate_begin_lsn); + PALF_LOG(INFO, "reset last_freeze_end_lsn", K_(palf_id), K_(self), K(truncate_log_info), K(log_end_lsn), + "last_freeze_end_lsn", last_freeze_end_lsn); + } PALF_LOG(INFO, "truncate success", K(ret), K_(palf_id), K_(self), K(truncate_log_info), "max_log_id", get_max_log_id(), K(log_begin_lsn), K(expected_prev_lsn), K(expected_prev_log_pid), K(prev_accum_checksum)); } @@ -4457,5 +4478,38 @@ int LogSlidingWindow::read_data_from_buffer(const LSN &read_begin_lsn, return ret; } +void LogSlidingWindow::get_last_freeze_end_lsn_(LSN &end_lsn) +{ + end_lsn.val_ = ATOMIC_LOAD(&last_freeze_end_lsn_.val_); +} + +void LogSlidingWindow::set_last_freeze_end_lsn_(const LSN &end_lsn) +{ + ATOMIC_STORE(&last_freeze_end_lsn_.val_, end_lsn.val_); +} + +int LogSlidingWindow::inc_update_last_freeze_end_lsn_(const LSN &end_lsn) +{ + int ret = OB_SUCCESS; + LSN old_last_freeze_end_lsn; + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + } else if (!end_lsn.is_valid()) { + ret = OB_INVALID_ARGUMENT; + PALF_LOG(WARN, "invalid arguments", K_(palf_id), K_(self), K(end_lsn)); + } else { + get_last_freeze_end_lsn_(old_last_freeze_end_lsn); + while (end_lsn > old_last_freeze_end_lsn) { + if (ATOMIC_BCAS(&last_freeze_end_lsn_.val_, old_last_freeze_end_lsn.val_, end_lsn.val_)) { + break; + } else { + get_last_freeze_end_lsn_(old_last_freeze_end_lsn); + } + } + PALF_LOG(TRACE, "inc_update_last_freeze_end_lsn_ finished", K_(palf_id), K_(self), K(old_last_freeze_end_lsn), K(end_lsn)); + } + return ret; +} + } // namespace palf } // namespace oceanbase diff --git a/src/logservice/palf/log_sliding_window.h b/src/logservice/palf/log_sliding_window.h index 7df71eb4d2..7603d02009 100755 --- a/src/logservice/palf/log_sliding_window.h +++ b/src/logservice/palf/log_sliding_window.h @@ -329,7 +329,7 @@ public: K_(last_slide_log_pid), K_(last_slide_log_accum_checksum), K_(last_fetch_end_lsn), \ K_(last_fetch_max_log_id), K_(last_fetch_committed_end_lsn), K_(last_truncate_lsn), \ K_(last_fetch_req_time), K_(is_truncating), K_(is_rebuilding), K_(last_rebuild_lsn), \ - "freeze_mode", freeze_mode_2_str(freeze_mode_), \ + K_(last_freeze_end_lsn), "freeze_mode", freeze_mode_2_str(freeze_mode_), \ "last_fetch_trigger_type", fetch_trigger_type_2_str(last_fetch_trigger_type_), KP(this)); private: int do_init_mem_(const int64_t palf_id, @@ -471,6 +471,9 @@ private: const LSN &lsn, const LogWriteBuf &log_write_buf); bool need_execute_fetch_(const FetchTriggerType &fetch_trigger_type); + void get_last_freeze_end_lsn_(LSN &end_lsn); + void set_last_freeze_end_lsn_(const LSN &end_lsn); + int inc_update_last_freeze_end_lsn_(const LSN &end_lsn); public: typedef common::ObLinearHashMap SvrMatchOffsetMap; static const int64_t TMP_HEADER_SER_BUF_LEN = 256; // log header序列化的临时buffer大小 @@ -516,6 +519,7 @@ private: LogGroupBuffer group_buffer_; // Record the last submit log info. // It is used to submit logs sequentially, for restarting, set it as last_replay_log_id. + LSN last_freeze_end_lsn_; mutable common::ObSpinLock last_submit_info_lock_; LSN last_submit_lsn_; LSN last_submit_end_lsn_;