[bugfix] parallel replay of branch savepoints lost data inserted into tx-data-table
Co-authored-by: ZenoWang <wzybuaasoft@163.com>
parent 4eab635517
commit 5954384755
@@ -69,6 +69,7 @@ void ObCtxTxData::reset()
   tx_data_guard_.reset();
   read_only_ = false;
   recovered_from_tx_table_ = false;
+  max_replayed_rollback_scn_.set_min();
 }
 
 void ObCtxTxData::destroy()
@@ -38,6 +38,8 @@ public:
 
   bool is_read_only() const { return read_only_; }
   bool has_recovered_from_tx_table() const { return recovered_from_tx_table_; }
+  share::SCN get_max_replayed_rollback_scn() const { return max_replayed_rollback_scn_; }
+  void set_max_replayed_rollback_scn(const share::SCN &scn) { max_replayed_rollback_scn_ = scn; }
   int insert_into_tx_table();
   int recover_tx_data(storage::ObTxData *tmp_tx_data);
   int deep_copy_tx_data_out(storage::ObTxDataGuard &tmp_tx_data_guard);
@@ -109,6 +111,17 @@ private:
   storage::ObTxDataGuard tx_data_guard_;
   bool read_only_;
   bool recovered_from_tx_table_;
+  // records the max replayed rollback-to end_scn;
+  // used in the replay of RollbackToLog
+  // when multiple RollbackToLogs are replayed in parallel, tx-data is inserted
+  // into the tx-data-table with end_scn out of order; to keep the invariant
+  // that tx-data with a larger end_scn contains the tx-data with a smaller end_scn,
+  // we rewrite the tx-data by deleting and re-inserting tx-data with the same end_scn
+  //
+  // this is a temporary solution; a coming refinement, named
+  // `shared contents of tx-data`, will ensure the tx-data already inserted
+  // into the tx-data memtable is refreshed with the latest replayed content.
+  share::SCN max_replayed_rollback_scn_;
   // lock for tx_data_ pointer
   RWLock lock_;
 };
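To make the invariant above concrete, here is a minimal self-contained sketch (illustrative types only, not the real ObTxData): for two versions of one transaction's tx-data, the version with the larger end_scn must carry every undo action the other carries.

```cpp
#include <algorithm>
#include <cstdint>
#include <vector>

// Hypothetical, simplified stand-ins for illustration only.
struct UndoAction { int64_t from; int64_t to; };
struct TxDataLite {
  int64_t end_scn;
  std::vector<UndoAction> undo_actions;
};

// Invariant: the tx-data with the larger end_scn must contain all undo
// actions of the one with the smaller end_scn. Parallel replay of
// RollbackToLog can insert a smaller-end_scn entry after a larger one and
// break this, which is why the commit rewrites tx-data on late replays.
bool holds_invariant(const TxDataLite &a, const TxDataLite &b) {
  const TxDataLite &larger  = a.end_scn >= b.end_scn ? a : b;
  const TxDataLite &smaller = a.end_scn >= b.end_scn ? b : a;
  return std::all_of(smaller.undo_actions.begin(), smaller.undo_actions.end(),
                     [&](const UndoAction &u) {
                       return std::any_of(
                           larger.undo_actions.begin(), larger.undo_actions.end(),
                           [&](const UndoAction &v) { return v.from == u.from && v.to == u.to; });
                     });
}
```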
@@ -7834,14 +7834,17 @@ int ObPartTransCtx::rollback_to_savepoint_(const ObTxSEQ from_scn,
   /*
    * Follower:
    * 1. add UndoAction into tx_ctx's tx_data
-   * 2. insert UndoAction into tx_data_table
+   * 2. insert tx-data into tx_data_table
    * Leader:
    * 1. submit 'RollbackToLog'
    * 2. add UndoAction into tx_ctx's tx_data
-   * 3. insert UndoAction into tx_data_table after log sync success
+   * 3. insert tx-data into tx_data_table after log sync success
    */
+  bool need_update_tx_data = false;
   ObTxDataGuard tmp_tx_data_guard;
+  ObTxDataGuard update_tx_data_guard;
   tmp_tx_data_guard.reset();
+  update_tx_data_guard.reset();
   if (is_follower_()) { /* Follower */
     ObUndoAction undo_action(from_scn, to_scn);
     // _NOTICE_ must load Undo(s) from TxDataTable before overwritten
@@ -7853,9 +7856,62 @@
       TRANS_LOG(WARN, "record undo info fail", K(ret), K(from_scn), K(to_scn), KPC(this));
     } else if (OB_FAIL(ctx_tx_data_.deep_copy_tx_data_out(tmp_tx_data_guard))) {
       TRANS_LOG(WARN, "deep copy tx data failed", KR(ret), K(*this));
-    } else if (FALSE_IT(tmp_tx_data_guard.tx_data()->end_scn_ = share::SCN::max(replay_scn, exec_info_.max_applying_log_ts_))) {
-    } else if (OB_FAIL(ctx_tx_data_.insert_tmp_tx_data(tmp_tx_data_guard.tx_data()))) {
-      TRANS_LOG(WARN, "insert to tx table failed", KR(ret), K(*this));
     }
+
+    //
+    // when multiple branch-level savepoints are replayed out of order, we must
+    // ensure the tx-data with the larger end_scn includes the undo-actions of
+    // all those before it
+    //
+    // we delete in the frozen memtable and update (the one with the largest end_scn)
+    // in the active memtable; because distinguishing frozen from active memtable
+    // is not easy, we always do both actions.
+    //
+    // the following illustrates this strategy:
+    //
+    // assume rollback to logs with scn of: 80 90 110
+    //
+    // and frozen scn is 100
+    //
+    // case 1: replay order: 110, 90, 80
+    // case 2: replay order: 110, 80, 90
+    // case 3: replay order: 90, 110, 80
+    // case 4: replay order: 90, 80, 110
+    //
+    // the operations of each case:
+    // case 1: insert 110 -> [insert 90, update 110] -> [insert 80, delete 90, update 110]
+    // case 2: insert 110 -> [insert 80, update 110] -> [insert 90, delete 80, update 110]
+    // case 3: insert 90 -> insert 110 -> [insert 80, delete 90, update 110]
+    // case 4: insert 90 -> [insert 80, update 90] -> insert 110
+    //
+    if (OB_SUCC(ret)) {
+      need_update_tx_data = ctx_tx_data_.get_max_replayed_rollback_scn() > replay_scn;
+      if (need_update_tx_data && OB_FAIL(ctx_tx_data_.deep_copy_tx_data_out(update_tx_data_guard))) {
+        TRANS_LOG(WARN, "deep copy tx data failed", KR(ret), K(*this));
+      }
+    }
+    // prepare end_scn for tx-data items
+    if (OB_SUCC(ret)) {
+      tmp_tx_data_guard.tx_data()->end_scn_ = replay_scn;
+      if (need_update_tx_data) {
+        // this tx-data may be inserted into a frozen tx-data-memtable, and it may
+        // not be the one with the largest end_scn; we must delete the others to
+        // ensure the one with the largest end_scn is the valid one
+        tmp_tx_data_guard.tx_data()->exclusive_flag_ = ObTxData::ExclusiveType::EXCLUSIVE;
+        // for the update tx-data, use the same end_scn_
+        update_tx_data_guard.tx_data()->end_scn_ = ctx_tx_data_.get_max_replayed_rollback_scn();
+        update_tx_data_guard.tx_data()->exclusive_flag_ = ObTxData::ExclusiveType::EXCLUSIVE;
+      }
+    }
+    // prepare done, do the final step of inserting into the tx-data-table; this should not fail
+    if (OB_SUCC(ret)) {
+      if (OB_FAIL(ctx_tx_data_.insert_tmp_tx_data(tmp_tx_data_guard.tx_data()))) {
+        TRANS_LOG(WARN, "insert to tx table failed", KR(ret), K(*this));
+      } else if (need_update_tx_data && OB_FAIL(ctx_tx_data_.insert_tmp_tx_data(update_tx_data_guard.tx_data()))) {
+        TRANS_LOG(WARN, "insert to tx table failed", KR(ret), K(*this));
+      }
+    }
+    // if this is the largest scn replayed, remember it
+    if (OB_SUCC(ret) && !need_update_tx_data) {
+      ctx_tx_data_.set_max_replayed_rollback_scn(replay_scn);
+    }
   } else if (OB_UNLIKELY(exec_info_.max_submitted_seq_no_ > to_scn)) { /* Leader */
     ObUndoAction undo_action(from_scn, to_scn);
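As a cross-check of the case table above, here is a small self-contained simulation (hypothetical names, a sketch rather than OceanBase code) of the replay rule: a log whose scn is below the max replayed rollback scn triggers the exclusive insert-plus-update rewrite; otherwise it is a plain insert that advances the maximum. The "delete" steps in the case table fall out of the EXCLUSIVE flag at insert time.

```cpp
#include <cstdio>
#include <vector>

// Simulates which tx-data-table operations each replayed RollbackToLog
// performs under the strategy above (illustrative sketch only).
static void simulate(const std::vector<long long> &replay_order) {
  long long max_replayed = -1;  // stand-in for SCN::min()
  for (long long scn : replay_order) {
    const bool need_update = max_replayed > scn;  // an older log replayed late
    if (!need_update) {
      std::printf("insert %lld\n", scn);  // largest so far: plain insert
      max_replayed = scn;                 // set_max_replayed_rollback_scn()
    } else {
      // late replay: the exclusive insert of this scn soft-deletes stale
      // entries, and the tx-data holding the largest end_scn is rewritten
      std::printf("insert %lld (EXCLUSIVE), update %lld (EXCLUSIVE)\n", scn, max_replayed);
    }
  }
}

int main() {
  simulate({110, 90, 80});  // case 1
  simulate({90, 80, 110});  // case 4
  return 0;
}
```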
@@ -235,10 +235,22 @@ public:
 // DONT : Modify this definition
 class ObTxData : public ObTxCommitData, public ObTxDataLink
 {
 public:
+  enum ExclusiveType {
+    NORMAL = 0,
+    EXCLUSIVE,
+    DELETED
+  };
 private:
   const static int64_t UNIS_VERSION = 1;
 public:
-  ObTxData() : ObTxCommitData(), ObTxDataLink(), tx_data_allocator_(nullptr), ref_cnt_(0), undo_status_list_(), flag_(0) {}
+  ObTxData()
+      : ObTxCommitData(),
+        ObTxDataLink(),
+        tx_data_allocator_(nullptr),
+        ref_cnt_(0),
+        undo_status_list_(),
+        exclusive_flag_(ExclusiveType::NORMAL) {}
   ObTxData(const ObTxData &rhs);
   ObTxData &operator=(const ObTxData &rhs);
   ObTxData &operator=(const ObTxCommitData &rhs);
@@ -328,7 +340,7 @@ public:
   share::ObTenantTxDataAllocator *tx_data_allocator_;
   int64_t ref_cnt_;
   ObUndoStatusList undo_status_list_;
-  int64_t flag_;
+  ExclusiveType exclusive_flag_;
 };
 
 class ObTxDataGuard
@@ -64,6 +64,20 @@ int ObTxDataHashMap::insert(const transaction::ObTransID &key, ObTxData *value)
   } else {
     int64_t pos = get_pos(key);
 
+    if (OB_UNLIKELY(ObTxData::ExclusiveType::NORMAL != value->exclusive_flag_)) {
+      if (ObTxData::ExclusiveType::EXCLUSIVE != value->exclusive_flag_) {
+        STORAGE_LOG(ERROR, "invalid exclusive flag", KPC(value));
+      } else {
+        ObTxData *iter = buckets_[pos].next_;
+        while (OB_NOT_NULL(iter)) {
+          if (iter->contain(key)) {
+            iter->exclusive_flag_ = ObTxData::ExclusiveType::DELETED;
+          }
+          iter = iter->hash_node_.next_;
+        }
+      }
+    }
+
     // atomic insert this value
     while (true) {
       ObTxData *next_value = ATOMIC_LOAD(&buckets_[pos].next_);
@@ -133,7 +147,7 @@ int ObTxDataHashMap::Iterator::get_next(ObTxDataGuard &guard)
   while (++bucket_idx_ < tx_data_map_.BUCKETS_CNT) {
     val_ = tx_data_map_.buckets_[bucket_idx_].next_;
 
-    if (OB_NOT_NULL(val_)) {
+    if (OB_NOT_NULL(val_) && (ObTxData::ExclusiveType::DELETED != val_->exclusive_flag_)) {
       break;
     }
   }
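Taken together, the two hunks above implement "delete" as a soft flag: an EXCLUSIVE insert marks older entries of the same transaction as DELETED, and iteration skips them. A toy single-bucket model of that behavior (hypothetical names, no atomics or locking, unlike the real ObTxDataHashMap):

```cpp
#include <cstdint>
#include <vector>

enum class Flag { NORMAL, EXCLUSIVE, DELETED };

struct Node {
  int64_t tx_id;
  Flag flag;
  Node *next;
};

// Insert at the bucket head; an EXCLUSIVE insert first soft-deletes every
// existing entry of the same transaction, mirroring ObTxDataHashMap::insert.
void bucket_insert(Node *&head, Node *n) {
  if (n->flag == Flag::EXCLUSIVE) {
    for (Node *it = head; it != nullptr; it = it->next) {
      if (it->tx_id == n->tx_id) {
        it->flag = Flag::DELETED;
      }
    }
  }
  n->next = head;
  head = n;
}

// Collect live entries, skipping soft-deleted ones, mirroring the
// Iterator::get_next change above.
std::vector<Node *> live_entries(Node *head) {
  std::vector<Node *> out;
  for (Node *it = head; it != nullptr; it = it->next) {
    if (it->flag != Flag::DELETED) {
      out.push_back(it);
    }
  }
  return out;
}
```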
@@ -70,7 +70,6 @@ public:
     ObTxData *tx_data = new (ptr) ObTxData();
     tx_data->ref_cnt_ = 100;
     tx_data->tx_data_allocator_ = FAKE_ALLOCATOR;
-    tx_data->flag_ = 269381;
     tx_data_guard.init(tx_data);
     return OB_ISNULL(tx_data) ? OB_ALLOCATE_MEMORY_FAILED : OB_SUCCESS;
   }
@@ -82,7 +81,6 @@ public:
     ObTxData *from = (ObTxData*)from_guard.tx_data();
     to->ref_cnt_ = 100;
     to->tx_data_allocator_ = FAKE_ALLOCATOR;
-    to->flag_ = 269381;
     to_guard.init(to);
     OX (*to = *from);
     OZ (deep_copy_undo_status_list_(from->undo_status_list_, to->undo_status_list_));