[bugfix] parallel replay branch savepoint insert tx-data-table lost data
Co-authored-by: ZenoWang <wzybuaasoft@163.com>
This commit is contained in:
		@ -69,6 +69,7 @@ void ObCtxTxData::reset()
 | 
				
			|||||||
  tx_data_guard_.reset();
 | 
					  tx_data_guard_.reset();
 | 
				
			||||||
  read_only_ = false;
 | 
					  read_only_ = false;
 | 
				
			||||||
  recovered_from_tx_table_ = false;
 | 
					  recovered_from_tx_table_ = false;
 | 
				
			||||||
 | 
					  max_replayed_rollback_scn_.set_min();
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
void ObCtxTxData::destroy()
 | 
					void ObCtxTxData::destroy()
 | 
				
			||||||
 | 
				
			|||||||
@ -38,6 +38,8 @@ public:
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
  bool is_read_only() const { return read_only_; }
 | 
					  bool is_read_only() const { return read_only_; }
 | 
				
			||||||
  bool has_recovered_from_tx_table() const { return recovered_from_tx_table_; }
 | 
					  bool has_recovered_from_tx_table() const { return recovered_from_tx_table_; }
 | 
				
			||||||
 | 
					  share::SCN get_max_replayed_rollback_scn() const { return max_replayed_rollback_scn_; }
 | 
				
			||||||
 | 
					  void set_max_replayed_rollback_scn(const share::SCN &scn) { max_replayed_rollback_scn_ = scn; }
 | 
				
			||||||
  int insert_into_tx_table();
 | 
					  int insert_into_tx_table();
 | 
				
			||||||
  int recover_tx_data(storage::ObTxData *tmp_tx_data);
 | 
					  int recover_tx_data(storage::ObTxData *tmp_tx_data);
 | 
				
			||||||
  int deep_copy_tx_data_out(storage::ObTxDataGuard &tmp_tx_data_guard);
 | 
					  int deep_copy_tx_data_out(storage::ObTxDataGuard &tmp_tx_data_guard);
 | 
				
			||||||
@ -109,6 +111,17 @@ private:
 | 
				
			|||||||
  storage::ObTxDataGuard tx_data_guard_;
 | 
					  storage::ObTxDataGuard tx_data_guard_;
 | 
				
			||||||
  bool read_only_;
 | 
					  bool read_only_;
 | 
				
			||||||
  bool recovered_from_tx_table_;
 | 
					  bool recovered_from_tx_table_;
 | 
				
			||||||
 | 
					  // record the max replayed rollback to end_scn
 | 
				
			||||||
 | 
					  // used in replay of RollbackToLog
 | 
				
			||||||
 | 
					  // when replay multiple RollbackToLog in parallell, tx-data inserted into
 | 
				
			||||||
 | 
					  // tx-data-table with end_scn out of order, in order to ensure the invariant
 | 
				
			||||||
 | 
					  // of tx-data with larger end_scn contains the tx-data with smaller end_scn
 | 
				
			||||||
 | 
					  // we rewrite the tx-data by delete and insert the tx-data with same end_scn
 | 
				
			||||||
 | 
					  //
 | 
				
			||||||
 | 
					  // this is a temporary solution for the problem, in the comming refine names as
 | 
				
			||||||
 | 
					  // `shared contents of tx-data`, which can ensure the tx-data has been inserted
 | 
				
			||||||
 | 
					  // into tx-data memtable was refresh with the latest content replayed out.
 | 
				
			||||||
 | 
					  share::SCN max_replayed_rollback_scn_;
 | 
				
			||||||
  // lock for tx_data_ pointer
 | 
					  // lock for tx_data_ pointer
 | 
				
			||||||
  RWLock lock_;
 | 
					  RWLock lock_;
 | 
				
			||||||
};
 | 
					};
 | 
				
			||||||
 | 
				
			|||||||
@ -7834,14 +7834,17 @@ int ObPartTransCtx::rollback_to_savepoint_(const ObTxSEQ from_scn,
 | 
				
			|||||||
  /*
 | 
					  /*
 | 
				
			||||||
   * Follower:
 | 
					   * Follower:
 | 
				
			||||||
   *  1. add UndoAction into tx_ctx's tx_data
 | 
					   *  1. add UndoAction into tx_ctx's tx_data
 | 
				
			||||||
   *  2. insert UndoAction into tx_data_table
 | 
					   *  2. insert tx-data into tx_data_table
 | 
				
			||||||
   * Leader:
 | 
					   * Leader:
 | 
				
			||||||
   *  1. submit 'RollbackToLog'
 | 
					   *  1. submit 'RollbackToLog'
 | 
				
			||||||
   *  2. add UndoAction into tx_ctx's tx_data
 | 
					   *  2. add UndoAction into tx_ctx's tx_data
 | 
				
			||||||
   *  3. insert UndoAction into tx_data_table after log sync success
 | 
					   *  3. insert tx-data into tx_data_table after log sync success
 | 
				
			||||||
   */
 | 
					   */
 | 
				
			||||||
 | 
					  bool need_update_tx_data = false;
 | 
				
			||||||
  ObTxDataGuard tmp_tx_data_guard;
 | 
					  ObTxDataGuard tmp_tx_data_guard;
 | 
				
			||||||
 | 
					  ObTxDataGuard update_tx_data_guard;
 | 
				
			||||||
  tmp_tx_data_guard.reset();
 | 
					  tmp_tx_data_guard.reset();
 | 
				
			||||||
 | 
					  update_tx_data_guard.reset();
 | 
				
			||||||
  if (is_follower_()) { /* Follower */
 | 
					  if (is_follower_()) { /* Follower */
 | 
				
			||||||
    ObUndoAction undo_action(from_scn, to_scn);
 | 
					    ObUndoAction undo_action(from_scn, to_scn);
 | 
				
			||||||
    // _NOTICE_ must load Undo(s) from TxDataTable before overwriten
 | 
					    // _NOTICE_ must load Undo(s) from TxDataTable before overwriten
 | 
				
			||||||
@ -7853,9 +7856,62 @@ int ObPartTransCtx::rollback_to_savepoint_(const ObTxSEQ from_scn,
 | 
				
			|||||||
      TRANS_LOG(WARN, "recrod undo info fail", K(ret), K(from_scn), K(to_scn), KPC(this));
 | 
					      TRANS_LOG(WARN, "recrod undo info fail", K(ret), K(from_scn), K(to_scn), KPC(this));
 | 
				
			||||||
    } else if (OB_FAIL(ctx_tx_data_.deep_copy_tx_data_out(tmp_tx_data_guard))) {
 | 
					    } else if (OB_FAIL(ctx_tx_data_.deep_copy_tx_data_out(tmp_tx_data_guard))) {
 | 
				
			||||||
      TRANS_LOG(WARN, "deep copy tx data failed", KR(ret), K(*this));
 | 
					      TRANS_LOG(WARN, "deep copy tx data failed", KR(ret), K(*this));
 | 
				
			||||||
    } else if (FALSE_IT(tmp_tx_data_guard.tx_data()->end_scn_ = share::SCN::max(replay_scn, exec_info_.max_applying_log_ts_))) {
 | 
					    }
 | 
				
			||||||
    } else if (OB_FAIL(ctx_tx_data_.insert_tmp_tx_data(tmp_tx_data_guard.tx_data()))) {
 | 
					
 | 
				
			||||||
      TRANS_LOG(WARN, "insert to tx table failed", KR(ret), K(*this));
 | 
					    //
 | 
				
			||||||
 | 
					    // when multiple branch-level savepoints were replayed out of order, to ensure
 | 
				
			||||||
 | 
					    // tx-data with larger end_scn include all undo-actions of others before
 | 
				
			||||||
 | 
					    //
 | 
				
			||||||
 | 
					    // we do deleting in frozen memtable and updating (which with largest end_scn) in active memtable
 | 
				
			||||||
 | 
					    // because distinguish frozen/active memtable is not easy, just always do those two actions.
 | 
				
			||||||
 | 
					    //
 | 
				
			||||||
 | 
					    // following is an illusion of this strategy:
 | 
				
			||||||
 | 
					    //
 | 
				
			||||||
 | 
					    // assume rollback to logs with scn of: 80 90 110
 | 
				
			||||||
 | 
					    //
 | 
				
			||||||
 | 
					    // and frozen scn is 100
 | 
				
			||||||
 | 
					    //
 | 
				
			||||||
 | 
					    // case 1: replay order: 110, 90,  80
 | 
				
			||||||
 | 
					    // case 2: replay order: 110, 80,  90
 | 
				
			||||||
 | 
					    // case 3: replay order: 90,  110, 80
 | 
				
			||||||
 | 
					    // case 4: replay order: 90,  80,  110
 | 
				
			||||||
 | 
					    //
 | 
				
			||||||
 | 
					    // the operations of each case:
 | 
				
			||||||
 | 
					    // case 1: insert 110 -> [insert 90, update 110] -> [insert 80, delete 90, udpate 110]
 | 
				
			||||||
 | 
					    // case 2: insert 110 -> [insert 80  update 110] -> [insert 90, delete 80, update 110]
 | 
				
			||||||
 | 
					    // case 3: insert 90  -> insert 110 -> [insert 80, delete 90, update 110]
 | 
				
			||||||
 | 
					    // case 4: insert 90  -> [insert 80, update 90]  ->  insert 100
 | 
				
			||||||
 | 
					    //
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    if (OB_SUCC(ret)) {
 | 
				
			||||||
 | 
					      need_update_tx_data = ctx_tx_data_.get_max_replayed_rollback_scn() > replay_scn;
 | 
				
			||||||
 | 
					      if (need_update_tx_data && OB_FAIL(ctx_tx_data_.deep_copy_tx_data_out(update_tx_data_guard))) {
 | 
				
			||||||
 | 
					        TRANS_LOG(WARN, "deep copy tx data failed", KR(ret), K(*this));
 | 
				
			||||||
 | 
					      }
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					    // prepare end_scn for tx-data items
 | 
				
			||||||
 | 
					    if (OB_SUCC(ret)) {
 | 
				
			||||||
 | 
					      tmp_tx_data_guard.tx_data()->end_scn_ = replay_scn;
 | 
				
			||||||
 | 
					      if (need_update_tx_data) {
 | 
				
			||||||
 | 
					        // if the tx-data will be inserted into frozen tx-data-memtable, and it may be not the one with largest end_scn
 | 
				
			||||||
 | 
					        // we must delete others in order to ensure ourself is the valid one with largest end_scn
 | 
				
			||||||
 | 
					        tmp_tx_data_guard.tx_data()->exclusive_flag_ = ObTxData::ExclusiveType::EXCLUSIVE;
 | 
				
			||||||
 | 
					        // for update tx-data, use the same end_scn_
 | 
				
			||||||
 | 
					        update_tx_data_guard.tx_data()->end_scn_ = ctx_tx_data_.get_max_replayed_rollback_scn();
 | 
				
			||||||
 | 
					        update_tx_data_guard.tx_data()->exclusive_flag_ = ObTxData::ExclusiveType::EXCLUSIVE;
 | 
				
			||||||
 | 
					      }
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					    // prepare done, do the final step to insert tx-data-table, this should not fail
 | 
				
			||||||
 | 
					    if (OB_SUCC(ret)) {
 | 
				
			||||||
 | 
					      if (OB_FAIL(ctx_tx_data_.insert_tmp_tx_data(tmp_tx_data_guard.tx_data()))) {
 | 
				
			||||||
 | 
					        TRANS_LOG(WARN, "insert to tx table failed", KR(ret), K(*this));
 | 
				
			||||||
 | 
					      } else if (need_update_tx_data && OB_FAIL(ctx_tx_data_.insert_tmp_tx_data(update_tx_data_guard.tx_data()))) {
 | 
				
			||||||
 | 
					        TRANS_LOG(WARN, "insert to tx table failed", KR(ret), K(*this));
 | 
				
			||||||
 | 
					      }
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					    // if this is the largest scn replayed, remember it
 | 
				
			||||||
 | 
					    if (OB_SUCC(ret) && !need_update_tx_data) {
 | 
				
			||||||
 | 
					        ctx_tx_data_.set_max_replayed_rollback_scn(replay_scn);
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
  } else if (OB_UNLIKELY(exec_info_.max_submitted_seq_no_ > to_scn)) { /* Leader */
 | 
					  } else if (OB_UNLIKELY(exec_info_.max_submitted_seq_no_ > to_scn)) { /* Leader */
 | 
				
			||||||
    ObUndoAction undo_action(from_scn, to_scn);
 | 
					    ObUndoAction undo_action(from_scn, to_scn);
 | 
				
			||||||
 | 
				
			|||||||
@ -235,10 +235,22 @@ public:
 | 
				
			|||||||
// DONT : Modify this definition
 | 
					// DONT : Modify this definition
 | 
				
			||||||
class ObTxData : public ObTxCommitData, public ObTxDataLink
 | 
					class ObTxData : public ObTxCommitData, public ObTxDataLink
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
 | 
					public:
 | 
				
			||||||
 | 
					  enum ExclusiveType {
 | 
				
			||||||
 | 
					    NORMAL = 0,
 | 
				
			||||||
 | 
					    EXCLUSIVE,
 | 
				
			||||||
 | 
					    DELETED
 | 
				
			||||||
 | 
					  };
 | 
				
			||||||
private:
 | 
					private:
 | 
				
			||||||
  const static int64_t UNIS_VERSION = 1;
 | 
					  const static int64_t UNIS_VERSION = 1;
 | 
				
			||||||
public:
 | 
					public:
 | 
				
			||||||
  ObTxData() : ObTxCommitData(), ObTxDataLink(), tx_data_allocator_(nullptr), ref_cnt_(0), undo_status_list_(), flag_(0) {}
 | 
					  ObTxData()
 | 
				
			||||||
 | 
					      : ObTxCommitData(),
 | 
				
			||||||
 | 
					        ObTxDataLink(),
 | 
				
			||||||
 | 
					        tx_data_allocator_(nullptr),
 | 
				
			||||||
 | 
					        ref_cnt_(0),
 | 
				
			||||||
 | 
					        undo_status_list_(),
 | 
				
			||||||
 | 
					        exclusive_flag_(ExclusiveType::NORMAL) {}
 | 
				
			||||||
  ObTxData(const ObTxData &rhs);
 | 
					  ObTxData(const ObTxData &rhs);
 | 
				
			||||||
  ObTxData &operator=(const ObTxData &rhs);
 | 
					  ObTxData &operator=(const ObTxData &rhs);
 | 
				
			||||||
  ObTxData &operator=(const ObTxCommitData &rhs);
 | 
					  ObTxData &operator=(const ObTxCommitData &rhs);
 | 
				
			||||||
@ -328,7 +340,7 @@ public:
 | 
				
			|||||||
  share::ObTenantTxDataAllocator *tx_data_allocator_;
 | 
					  share::ObTenantTxDataAllocator *tx_data_allocator_;
 | 
				
			||||||
  int64_t ref_cnt_;
 | 
					  int64_t ref_cnt_;
 | 
				
			||||||
  ObUndoStatusList undo_status_list_;
 | 
					  ObUndoStatusList undo_status_list_;
 | 
				
			||||||
  int64_t flag_;
 | 
					  ExclusiveType exclusive_flag_;
 | 
				
			||||||
};
 | 
					};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
class ObTxDataGuard
 | 
					class ObTxDataGuard
 | 
				
			||||||
 | 
				
			|||||||
@ -64,6 +64,20 @@ int ObTxDataHashMap::insert(const transaction::ObTransID &key, ObTxData *value)
 | 
				
			|||||||
  } else {
 | 
					  } else {
 | 
				
			||||||
    int64_t pos = get_pos(key);
 | 
					    int64_t pos = get_pos(key);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    if (OB_UNLIKELY(ObTxData::ExclusiveType::NORMAL != value->exclusive_flag_)) {
 | 
				
			||||||
 | 
					      if (ObTxData::ExclusiveType::EXCLUSIVE != value->exclusive_flag_) {
 | 
				
			||||||
 | 
					        STORAGE_LOG(ERROR, "invalid exclusive flag", KPC(value));
 | 
				
			||||||
 | 
					      } else {
 | 
				
			||||||
 | 
					        ObTxData *iter = buckets_[pos].next_;
 | 
				
			||||||
 | 
					        while (OB_NOT_NULL(iter)) {
 | 
				
			||||||
 | 
					          if (iter->contain(key)) {
 | 
				
			||||||
 | 
					            iter->exclusive_flag_ = ObTxData::ExclusiveType::DELETED;
 | 
				
			||||||
 | 
					          }
 | 
				
			||||||
 | 
					          iter = iter->hash_node_.next_;
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					      }
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    // atomic insert this value
 | 
					    // atomic insert this value
 | 
				
			||||||
    while (true) {
 | 
					    while (true) {
 | 
				
			||||||
      ObTxData *next_value = ATOMIC_LOAD(&buckets_[pos].next_);
 | 
					      ObTxData *next_value = ATOMIC_LOAD(&buckets_[pos].next_);
 | 
				
			||||||
@ -133,7 +147,7 @@ int ObTxDataHashMap::Iterator::get_next(ObTxDataGuard &guard)
 | 
				
			|||||||
      while (++bucket_idx_ < tx_data_map_.BUCKETS_CNT) {
 | 
					      while (++bucket_idx_ < tx_data_map_.BUCKETS_CNT) {
 | 
				
			||||||
        val_ = tx_data_map_.buckets_[bucket_idx_].next_;
 | 
					        val_ = tx_data_map_.buckets_[bucket_idx_].next_;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        if (OB_NOT_NULL(val_)) {
 | 
					        if (OB_NOT_NULL(val_) && (ObTxData::ExclusiveType::DELETED != val_->exclusive_flag_)) {
 | 
				
			||||||
          break;
 | 
					          break;
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
      }
 | 
					      }
 | 
				
			||||||
 | 
				
			|||||||
@ -70,7 +70,6 @@ public:
 | 
				
			|||||||
    ObTxData *tx_data = new (ptr) ObTxData();
 | 
					    ObTxData *tx_data = new (ptr) ObTxData();
 | 
				
			||||||
    tx_data->ref_cnt_ = 100;
 | 
					    tx_data->ref_cnt_ = 100;
 | 
				
			||||||
    tx_data->tx_data_allocator_ = FAKE_ALLOCATOR;
 | 
					    tx_data->tx_data_allocator_ = FAKE_ALLOCATOR;
 | 
				
			||||||
    tx_data->flag_ = 269381;
 | 
					 | 
				
			||||||
    tx_data_guard.init(tx_data);
 | 
					    tx_data_guard.init(tx_data);
 | 
				
			||||||
    return OB_ISNULL(tx_data) ? OB_ALLOCATE_MEMORY_FAILED : OB_SUCCESS;
 | 
					    return OB_ISNULL(tx_data) ? OB_ALLOCATE_MEMORY_FAILED : OB_SUCCESS;
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
@ -82,7 +81,6 @@ public:
 | 
				
			|||||||
    ObTxData *from = (ObTxData*)from_guard.tx_data();
 | 
					    ObTxData *from = (ObTxData*)from_guard.tx_data();
 | 
				
			||||||
    to->ref_cnt_ = 100;
 | 
					    to->ref_cnt_ = 100;
 | 
				
			||||||
    to->tx_data_allocator_ = FAKE_ALLOCATOR;
 | 
					    to->tx_data_allocator_ = FAKE_ALLOCATOR;
 | 
				
			||||||
    to->flag_ = 269381;
 | 
					 | 
				
			||||||
    to_guard.init(to);
 | 
					    to_guard.init(to);
 | 
				
			||||||
    OX (*to = *from);
 | 
					    OX (*to = *from);
 | 
				
			||||||
    OZ (deep_copy_undo_status_list_(from->undo_status_list_, to->undo_status_list_));
 | 
					    OZ (deep_copy_undo_status_list_(from->undo_status_list_, to->undo_status_list_));
 | 
				
			||||||
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user