pdml parallel logging race condition on callback_list.log_cursor_
This commit is contained in:
parent
57ecb8a20e
commit
5f5e1bc8c4
@ -27,6 +27,7 @@ ObTxCallbackList::ObTxCallbackList(ObTransCallbackMgr &callback_mgr, const int16
|
||||
: id_(id),
|
||||
head_(),
|
||||
log_cursor_(&head_),
|
||||
log_epoch_(INT64_MAX),
|
||||
parallel_start_pos_(NULL),
|
||||
length_(0),
|
||||
appended_(0),
|
||||
@ -68,7 +69,7 @@ void ObTxCallbackList::reset()
|
||||
}
|
||||
head_.set_prev(&head_);
|
||||
head_.set_next(&head_);
|
||||
log_cursor_ = &head_;
|
||||
set_log_cursor_(&head_);
|
||||
checksum_ = 0;
|
||||
tmp_checksum_ = 0;
|
||||
checksum_scn_ = SCN::min_scn();
|
||||
@ -130,7 +131,7 @@ int ObTxCallbackList::append_callback(ObITransCallback *callback,
|
||||
int64_t data_size = callback->get_data_size();
|
||||
data_size_ += data_size;
|
||||
if (repos_lc) {
|
||||
log_cursor_ = get_tail();
|
||||
set_log_cursor_(get_tail());
|
||||
}
|
||||
if (for_replay) {
|
||||
++logged_;
|
||||
@ -165,7 +166,7 @@ int64_t ObTxCallbackList::concat_callbacks(ObTxCallbackList &that)
|
||||
length_ += cnt;
|
||||
appended_ += cnt;
|
||||
if (log_cursor_ == &head_) {
|
||||
log_cursor_ = that_head;
|
||||
set_log_cursor_(that_head);
|
||||
}
|
||||
{ // fake callback removement to pass sanity check when reset
|
||||
that.length_ = 0;
|
||||
@ -261,7 +262,7 @@ int ObTxCallbackList::callback_(ObITxCallbackFunctor &functor,
|
||||
TRANS_LOG(ERROR, "remove callback failed", K(ret), KPC(iter), K(deleted));
|
||||
} else {
|
||||
if (log_cursor_ == iter) {
|
||||
log_cursor_ = next;
|
||||
set_log_cursor_(next);
|
||||
}
|
||||
if (parallel_start_pos_ == iter) {
|
||||
parallel_start_pos_ = (is_reverse || next == &head_) ? NULL : next;
|
||||
@ -518,9 +519,9 @@ int ObTxCallbackList::submit_log_succ(const ObCallbackScope &callbacks)
|
||||
if (next == &head_) {
|
||||
// next is un-stable, need serialize with append
|
||||
LockGuard guard(*this, LOCK_MODE::LOCK_APPEND);
|
||||
ATOMIC_STORE(&log_cursor_, (*callbacks.end_)->get_next());
|
||||
set_log_cursor_((*callbacks.end_)->get_next());
|
||||
} else {
|
||||
ATOMIC_STORE(&log_cursor_, next);
|
||||
set_log_cursor_(next);
|
||||
}
|
||||
ATOMIC_AAF(&logged_, (int64_t)callbacks.cnt_);
|
||||
ATOMIC_AAF(&logged_data_size_, callbacks.data_size_);
|
||||
@ -873,7 +874,13 @@ bool ObTxCallbackList::check_all_redo_flushed(const bool quite) const
|
||||
__attribute__((noinline))
|
||||
int64_t ObTxCallbackList::get_log_epoch() const
|
||||
{
|
||||
return log_cursor_ == &head_ ? INT64_MAX : log_cursor_->get_epoch();
|
||||
return ATOMIC_LOAD(&log_epoch_);
|
||||
}
|
||||
|
||||
void ObTxCallbackList::set_log_cursor_(ObITransCallback* log_cursor)
|
||||
{
|
||||
ATOMIC_STORE(&log_epoch_, (log_cursor == &head_) ? INT64_MAX : log_cursor->get_epoch());
|
||||
ATOMIC_STORE(&log_cursor_, log_cursor);
|
||||
}
|
||||
|
||||
void ObTxCallbackList::inc_update_sync_scn(const share::SCN scn)
|
||||
|
@ -210,11 +210,14 @@ public:
|
||||
return logged_data_size_;
|
||||
}
|
||||
DECLARE_TO_STRING;
|
||||
private:
|
||||
void set_log_cursor_(ObITransCallback *log_cursor);
|
||||
private:
|
||||
const int16_t id_;
|
||||
// callback list sentinel
|
||||
ObITransCallback head_;
|
||||
ObITransCallback *log_cursor_;
|
||||
int64_t log_epoch_;
|
||||
ObITransCallback *parallel_start_pos_;
|
||||
int64_t length_;
|
||||
// stats
|
||||
|
Loading…
x
Reference in New Issue
Block a user