From 5954384755bcab244fcfcb177bc844bce3c02b48 Mon Sep 17 00:00:00 2001 From: chinaxing Date: Tue, 2 Jan 2024 08:47:46 +0000 Subject: [PATCH] [bugfix] parallel replay branch savepoint insert tx-data-table lost data Co-authored-by: ZenoWang --- src/storage/tx/ob_ctx_tx_data.cpp | 1 + src/storage/tx/ob_ctx_tx_data.h | 13 ++++ src/storage/tx/ob_trans_part_ctx.cpp | 66 +++++++++++++++++-- src/storage/tx/ob_tx_data_define.h | 16 ++++- src/storage/tx_table/ob_tx_data_hash_map.cpp | 16 ++++- .../storage/tx/mock_utils/basic_fake_define.h | 2 - 6 files changed, 104 insertions(+), 10 deletions(-) diff --git a/src/storage/tx/ob_ctx_tx_data.cpp b/src/storage/tx/ob_ctx_tx_data.cpp index 6761ee615..962b05413 100644 --- a/src/storage/tx/ob_ctx_tx_data.cpp +++ b/src/storage/tx/ob_ctx_tx_data.cpp @@ -69,6 +69,7 @@ void ObCtxTxData::reset() tx_data_guard_.reset(); read_only_ = false; recovered_from_tx_table_ = false; + max_replayed_rollback_scn_.set_min(); } void ObCtxTxData::destroy() diff --git a/src/storage/tx/ob_ctx_tx_data.h b/src/storage/tx/ob_ctx_tx_data.h index d3318795e..89b7c437b 100644 --- a/src/storage/tx/ob_ctx_tx_data.h +++ b/src/storage/tx/ob_ctx_tx_data.h @@ -38,6 +38,8 @@ public: bool is_read_only() const { return read_only_; } bool has_recovered_from_tx_table() const { return recovered_from_tx_table_; } + share::SCN get_max_replayed_rollback_scn() const { return max_replayed_rollback_scn_; } + void set_max_replayed_rollback_scn(const share::SCN &scn) { max_replayed_rollback_scn_ = scn; } int insert_into_tx_table(); int recover_tx_data(storage::ObTxData *tmp_tx_data); int deep_copy_tx_data_out(storage::ObTxDataGuard &tmp_tx_data_guard); @@ -109,6 +111,17 @@ private: storage::ObTxDataGuard tx_data_guard_; bool read_only_; bool recovered_from_tx_table_; + // record the max replayed rollback to end_scn + // used in replay of RollbackToLog + // when replay multiple RollbackToLog in parallell, tx-data inserted into + // tx-data-table with end_scn out of order, in order to 
ensure the invariant + // that tx-data with a larger end_scn contains the tx-data with a smaller end_scn, + // we rewrite the tx-data by deleting and re-inserting the tx-data with the same end_scn + // + // this is a temporary solution for the problem; the upcoming refactor named + // `shared contents of tx-data` will ensure that tx-data already inserted + // into the tx-data memtable is refreshed with the latest replayed content. + share::SCN max_replayed_rollback_scn_; // lock for tx_data_ pointer RWLock lock_; }; diff --git a/src/storage/tx/ob_trans_part_ctx.cpp b/src/storage/tx/ob_trans_part_ctx.cpp index bf5451346..2d9393d82 100644 --- a/src/storage/tx/ob_trans_part_ctx.cpp +++ b/src/storage/tx/ob_trans_part_ctx.cpp @@ -7834,14 +7834,17 @@ int ObPartTransCtx::rollback_to_savepoint_(const ObTxSEQ from_scn, /* * Follower: * 1. add UndoAction into tx_ctx's tx_data - * 2. insert UndoAction into tx_data_table + * 2. insert tx-data into tx_data_table * Leader: * 1. submit 'RollbackToLog' * 2. add UndoAction into tx_ctx's tx_data - * 3. 
insert tx-data into tx_data_table after log sync success */ + bool need_update_tx_data = false; ObTxDataGuard tmp_tx_data_guard; + ObTxDataGuard update_tx_data_guard; tmp_tx_data_guard.reset(); + update_tx_data_guard.reset(); if (is_follower_()) { /* Follower */ ObUndoAction undo_action(from_scn, to_scn); // _NOTICE_ must load Undo(s) from TxDataTable before overwriten @@ -7853,9 +7856,62 @@ int ObPartTransCtx::rollback_to_savepoint_(const ObTxSEQ from_scn, TRANS_LOG(WARN, "recrod undo info fail", K(ret), K(from_scn), K(to_scn), KPC(this)); } else if (OB_FAIL(ctx_tx_data_.deep_copy_tx_data_out(tmp_tx_data_guard))) { TRANS_LOG(WARN, "deep copy tx data failed", KR(ret), K(*this)); - } else if (FALSE_IT(tmp_tx_data_guard.tx_data()->end_scn_ = share::SCN::max(replay_scn, exec_info_.max_applying_log_ts_))) { - } else if (OB_FAIL(ctx_tx_data_.insert_tmp_tx_data(tmp_tx_data_guard.tx_data()))) { - TRANS_LOG(WARN, "insert to tx table failed", KR(ret), K(*this)); + } + + // + // when multiple branch-level savepoints were replayed out of order, to ensure + // tx-data with larger end_scn include all undo-actions of others before + // + // we do deleting in frozen memtable and updating (which with largest end_scn) in active memtable + // because distinguish frozen/active memtable is not easy, just always do those two actions. 
+ // + // following is an illustration of this strategy: + // + // assume rollback to logs with scn of: 80 90 110 + // + // and frozen scn is 100 + // + // case 1: replay order: 110, 90, 80 + // case 2: replay order: 110, 80, 90 + // case 3: replay order: 90, 110, 80 + // case 4: replay order: 90, 80, 110 + // + // the operations of each case: + // case 1: insert 110 -> [insert 90, update 110] -> [insert 80, delete 90, update 110] + // case 2: insert 110 -> [insert 80, update 110] -> [insert 90, delete 80, update 110] + // case 3: insert 90 -> insert 110 -> [insert 80, delete 90, update 110] + // case 4: insert 90 -> [insert 80, update 90] -> insert 110 + // + + if (OB_SUCC(ret)) { + need_update_tx_data = ctx_tx_data_.get_max_replayed_rollback_scn() > replay_scn; + if (need_update_tx_data && OB_FAIL(ctx_tx_data_.deep_copy_tx_data_out(update_tx_data_guard))) { + TRANS_LOG(WARN, "deep copy tx data failed", KR(ret), K(*this)); + } + } + // prepare end_scn for tx-data items + if (OB_SUCC(ret)) { + tmp_tx_data_guard.tx_data()->end_scn_ = replay_scn; + if (need_update_tx_data) { + // if the tx-data will be inserted into frozen tx-data-memtable, and it may be not the one with largest end_scn + // we must delete the others to ensure this one is the valid one with the largest end_scn + tmp_tx_data_guard.tx_data()->exclusive_flag_ = ObTxData::ExclusiveType::EXCLUSIVE; + // for update tx-data, use the same end_scn_ + update_tx_data_guard.tx_data()->end_scn_ = ctx_tx_data_.get_max_replayed_rollback_scn(); + update_tx_data_guard.tx_data()->exclusive_flag_ = ObTxData::ExclusiveType::EXCLUSIVE; + } + } + // prepare done, do the final step to insert tx-data-table, this should not fail + if (OB_SUCC(ret)) { + if (OB_FAIL(ctx_tx_data_.insert_tmp_tx_data(tmp_tx_data_guard.tx_data()))) { + TRANS_LOG(WARN, "insert to tx table failed", KR(ret), K(*this)); + } else if (need_update_tx_data && OB_FAIL(ctx_tx_data_.insert_tmp_tx_data(update_tx_data_guard.tx_data()))) { + TRANS_LOG(WARN, "insert 
to tx table failed", KR(ret), K(*this)); + } + } + // if this is the largest scn replayed, remember it + if (OB_SUCC(ret) && !need_update_tx_data) { + ctx_tx_data_.set_max_replayed_rollback_scn(replay_scn); } } else if (OB_UNLIKELY(exec_info_.max_submitted_seq_no_ > to_scn)) { /* Leader */ ObUndoAction undo_action(from_scn, to_scn); diff --git a/src/storage/tx/ob_tx_data_define.h b/src/storage/tx/ob_tx_data_define.h index 6059353b7..0ceefb22a 100644 --- a/src/storage/tx/ob_tx_data_define.h +++ b/src/storage/tx/ob_tx_data_define.h @@ -235,10 +235,22 @@ public: // DONT : Modify this definition class ObTxData : public ObTxCommitData, public ObTxDataLink { +public: + enum ExclusiveType { + NORMAL = 0, + EXCLUSIVE, + DELETED + }; private: const static int64_t UNIS_VERSION = 1; public: - ObTxData() : ObTxCommitData(), ObTxDataLink(), tx_data_allocator_(nullptr), ref_cnt_(0), undo_status_list_(), flag_(0) {} + ObTxData() + : ObTxCommitData(), + ObTxDataLink(), + tx_data_allocator_(nullptr), + ref_cnt_(0), + undo_status_list_(), + exclusive_flag_(ExclusiveType::NORMAL) {} ObTxData(const ObTxData &rhs); ObTxData &operator=(const ObTxData &rhs); ObTxData &operator=(const ObTxCommitData &rhs); @@ -328,7 +340,7 @@ public: share::ObTenantTxDataAllocator *tx_data_allocator_; int64_t ref_cnt_; ObUndoStatusList undo_status_list_; - int64_t flag_; + ExclusiveType exclusive_flag_; }; class ObTxDataGuard diff --git a/src/storage/tx_table/ob_tx_data_hash_map.cpp b/src/storage/tx_table/ob_tx_data_hash_map.cpp index d2d38107d..1d3b76fdb 100644 --- a/src/storage/tx_table/ob_tx_data_hash_map.cpp +++ b/src/storage/tx_table/ob_tx_data_hash_map.cpp @@ -64,6 +64,20 @@ int ObTxDataHashMap::insert(const transaction::ObTransID &key, ObTxData *value) } else { int64_t pos = get_pos(key); + if (OB_UNLIKELY(ObTxData::ExclusiveType::NORMAL != value->exclusive_flag_)) { + if (ObTxData::ExclusiveType::EXCLUSIVE != value->exclusive_flag_) { + STORAGE_LOG(ERROR, "invalid exclusive flag", KPC(value)); + } 
else { + ObTxData *iter = buckets_[pos].next_; + while (OB_NOT_NULL(iter)) { + if (iter->contain(key)) { + iter->exclusive_flag_ = ObTxData::ExclusiveType::DELETED; + } + iter = iter->hash_node_.next_; + } + } + } + // atomic insert this value while (true) { ObTxData *next_value = ATOMIC_LOAD(&buckets_[pos].next_); @@ -133,7 +147,7 @@ int ObTxDataHashMap::Iterator::get_next(ObTxDataGuard &guard) while (++bucket_idx_ < tx_data_map_.BUCKETS_CNT) { val_ = tx_data_map_.buckets_[bucket_idx_].next_; - if (OB_NOT_NULL(val_)) { + if (OB_NOT_NULL(val_) && (ObTxData::ExclusiveType::DELETED != val_->exclusive_flag_)) { break; } } diff --git a/unittest/storage/tx/mock_utils/basic_fake_define.h b/unittest/storage/tx/mock_utils/basic_fake_define.h index 8e5b6e5ec..25b1b2ebf 100644 --- a/unittest/storage/tx/mock_utils/basic_fake_define.h +++ b/unittest/storage/tx/mock_utils/basic_fake_define.h @@ -70,7 +70,6 @@ public: ObTxData *tx_data = new (ptr) ObTxData(); tx_data->ref_cnt_ = 100; tx_data->tx_data_allocator_ = FAKE_ALLOCATOR; - tx_data->flag_ = 269381; tx_data_guard.init(tx_data); return OB_ISNULL(tx_data) ? OB_ALLOCATE_MEMORY_FAILED : OB_SUCCESS; } @@ -82,7 +81,6 @@ public: ObTxData *from = (ObTxData*)from_guard.tx_data(); to->ref_cnt_ = 100; to->tx_data_allocator_ = FAKE_ALLOCATOR; - to->flag_ = 269381; to_guard.init(to); OX (*to = *from); OZ (deep_copy_undo_status_list_(from->undo_status_list_, to->undo_status_list_));