From 5f5e1bc8c438ff8fd93375dbb51edf7b091ae6c5 Mon Sep 17 00:00:00 2001 From: chinaxing Date: Fri, 29 Nov 2024 17:45:44 +0000 Subject: [PATCH] pdml parallel logging race condition on callback_list.log_cursor_ --- .../memtable/mvcc/ob_tx_callback_list.cpp | 21 ++++++++++++------- .../memtable/mvcc/ob_tx_callback_list.h | 3 +++ 2 files changed, 17 insertions(+), 7 deletions(-) diff --git a/src/storage/memtable/mvcc/ob_tx_callback_list.cpp b/src/storage/memtable/mvcc/ob_tx_callback_list.cpp index 652c239bf..f0c90b3cf 100644 --- a/src/storage/memtable/mvcc/ob_tx_callback_list.cpp +++ b/src/storage/memtable/mvcc/ob_tx_callback_list.cpp @@ -27,6 +27,7 @@ ObTxCallbackList::ObTxCallbackList(ObTransCallbackMgr &callback_mgr, const int16 : id_(id), head_(), log_cursor_(&head_), + log_epoch_(INT64_MAX), parallel_start_pos_(NULL), length_(0), appended_(0), @@ -68,7 +69,7 @@ void ObTxCallbackList::reset() } head_.set_prev(&head_); head_.set_next(&head_); - log_cursor_ = &head_; + set_log_cursor_(&head_); checksum_ = 0; tmp_checksum_ = 0; checksum_scn_ = SCN::min_scn(); @@ -130,7 +131,7 @@ int ObTxCallbackList::append_callback(ObITransCallback *callback, int64_t data_size = callback->get_data_size(); data_size_ += data_size; if (repos_lc) { - log_cursor_ = get_tail(); + set_log_cursor_(get_tail()); } if (for_replay) { ++logged_; @@ -165,7 +166,7 @@ int64_t ObTxCallbackList::concat_callbacks(ObTxCallbackList &that) length_ += cnt; appended_ += cnt; if (log_cursor_ == &head_) { - log_cursor_ = that_head; + set_log_cursor_(that_head); } { // fake callback removement to pass sanity check when reset that.length_ = 0; @@ -261,7 +262,7 @@ int ObTxCallbackList::callback_(ObITxCallbackFunctor &functor, TRANS_LOG(ERROR, "remove callback failed", K(ret), KPC(iter), K(deleted)); } else { if (log_cursor_ == iter) { - log_cursor_ = next; + set_log_cursor_(next); } if (parallel_start_pos_ == iter) { parallel_start_pos_ = (is_reverse || next == &head_) ? NULL : next; @@ -518,9 +519,9 @@ int ObTxCallbackList::submit_log_succ(const ObCallbackScope &callbacks) if (next == &head_) { // next is un-stable, need serialize with append LockGuard guard(*this, LOCK_MODE::LOCK_APPEND); - ATOMIC_STORE(&log_cursor_, (*callbacks.end_)->get_next()); + set_log_cursor_((*callbacks.end_)->get_next()); } else { - ATOMIC_STORE(&log_cursor_, next); + set_log_cursor_(next); } ATOMIC_AAF(&logged_, (int64_t)callbacks.cnt_); ATOMIC_AAF(&logged_data_size_, callbacks.data_size_); @@ -873,7 +874,13 @@ bool ObTxCallbackList::check_all_redo_flushed(const bool quite) const __attribute__((noinline)) int64_t ObTxCallbackList::get_log_epoch() const { - return log_cursor_ == &head_ ? INT64_MAX : log_cursor_->get_epoch(); + return ATOMIC_LOAD(&log_epoch_); +} + +void ObTxCallbackList::set_log_cursor_(ObITransCallback* log_cursor) +{ + ATOMIC_STORE(&log_epoch_, (log_cursor == &head_) ? INT64_MAX : log_cursor->get_epoch()); + ATOMIC_STORE(&log_cursor_, log_cursor); } void ObTxCallbackList::inc_update_sync_scn(const share::SCN scn) diff --git a/src/storage/memtable/mvcc/ob_tx_callback_list.h b/src/storage/memtable/mvcc/ob_tx_callback_list.h index a0c455c72..4fdf20421 100644 --- a/src/storage/memtable/mvcc/ob_tx_callback_list.h +++ b/src/storage/memtable/mvcc/ob_tx_callback_list.h @@ -210,11 +210,14 @@ public: return logged_data_size_; } DECLARE_TO_STRING; +private: + void set_log_cursor_(ObITransCallback *log_cursor); private: const int16_t id_; // callback list sentinel ObITransCallback head_; ObITransCallback *log_cursor_; + int64_t log_epoch_; ObITransCallback *parallel_start_pos_; int64_t length_; // stats