From f5fa404be165217050ad46415a6c681f4115e40b Mon Sep 17 00:00:00 2001 From: Handora Date: Fri, 11 Nov 2022 10:07:09 +0000 Subject: [PATCH] [BUG] change from normal read to atomic read --- .../memtable/mvcc/ob_mvcc_iterator.cpp | 30 ++++--- src/storage/memtable/mvcc/ob_mvcc_row.cpp | 16 ++-- src/storage/memtable/mvcc/ob_mvcc_row.h | 11 ++- src/storage/tx/ob_ctx_tx_data.cpp | 16 ++-- src/storage/tx/ob_tx_data_functor.cpp | 78 ++++++++++++------- 5 files changed, 92 insertions(+), 59 deletions(-) diff --git a/src/storage/memtable/mvcc/ob_mvcc_iterator.cpp b/src/storage/memtable/mvcc/ob_mvcc_iterator.cpp index b4157da89..45d368f38 100644 --- a/src/storage/memtable/mvcc/ob_mvcc_iterator.cpp +++ b/src/storage/memtable/mvcc/ob_mvcc_iterator.cpp @@ -143,29 +143,39 @@ int ObMvccValueIterator::lock_for_read_inner_(const ObQueryFlag &flag, // reader_tx_id. const ObTransID &snapshot_tx_id = ctx_->snapshot_.tx_id_; const ObTransID &reader_tx_id = ctx_->tx_id_; - const ObTransID &data_tx_id = iter->get_tx_id(); - - const int64_t data_seq_no = iter->get_seq_no(); const int64_t snapshot_seq_no = ctx_->snapshot_.scn_; - const int64_t snapshot_version = ctx_->snapshot_.version_; const int64_t read_epoch = ctx_->get_tx_table_guard().epoch(); - const bool read_latest = flag.is_read_latest(); ObTxTable *tx_table = ctx_->get_tx_table_guard().get_tx_table(); + const bool read_latest = flag.is_read_latest(); + + const ObTransID &data_tx_id = iter->get_tx_id(); + const int64_t data_seq_no = iter->get_seq_no(); + + // NB: We need pay much attention to the order of the reads to the different + // variables. Although we update the version before the state for the tnodes + // and read the state before the version. It may appear that the compiled code + // execution may rearrange its order and fail to obey its origin logic(You can + // read the Dependency Definiation of the ARM architecture book to understand + // it). So the synchronization primitive below is much important. + const bool is_committed = iter->is_committed(); + const bool is_aborted = iter->is_aborted(); + const bool is_elr = iter->is_elr(); + const bool is_delayed_cleanout = iter->is_delayed_cleanout(); // Opt1: data is decided - if ((iter->is_committed() || iter->is_aborted() || iter->is_elr()) + if ((is_committed || is_aborted || is_elr) // Opt2: data is not decided while we donot need cleanout - || (!iter->is_delayed_cleanout() + || (!is_delayed_cleanout && (// Opt2.1: snapshot reads the data written by snapshot data_tx_id == snapshot_tx_id || // Opt2.2: read reader's latest is matched (read_latest && data_tx_id == reader_tx_id)))) { // Case 1: Cleanout can be skipped // because inner tx read only care whether tx node rollbacked - if (iter->is_committed() || iter->is_elr()) { + if (is_committed || is_elr) { // Case 2: Data is committed, so the state is decided - const int64_t data_version = iter->trans_version_; + const int64_t data_version = ATOMIC_LOAD(&iter->trans_version_); if (snapshot_version >= data_version) { // Case 2.1 Read the version if it is smaller than read version version_iter_ = iter; @@ -173,7 +183,7 @@ int ObMvccValueIterator::lock_for_read_inner_(const ObQueryFlag &flag, // Case 2.2: Otherwise, skip to the next version iter = iter->prev_; } - } else if (iter->is_aborted()) { + } else if (is_aborted) { // Case 3: Data is aborted, so the state is decided. So we skip aborted data // version iter = iter->prev_; diff --git a/src/storage/memtable/mvcc/ob_mvcc_row.cpp b/src/storage/memtable/mvcc/ob_mvcc_row.cpp index 4166dad89..6d83e452f 100644 --- a/src/storage/memtable/mvcc/ob_mvcc_row.cpp +++ b/src/storage/memtable/mvcc/ob_mvcc_row.cpp @@ -206,13 +206,13 @@ bool ObMvccTransNode::is_delayed_cleanout() const int ObMvccTransNode::fill_trans_version(const int64_t version) { - trans_version_ = version; + ATOMIC_STORE(&trans_version_, version); return OB_SUCCESS; } int ObMvccTransNode::fill_log_timestamp(const int64_t log_timestamp) { - log_timestamp_ = log_timestamp; + ATOMIC_STORE(&log_timestamp_, log_timestamp); return OB_SUCCESS; } @@ -228,7 +228,6 @@ void ObMvccTransNode::trans_abort(const int64_t tx_end_log_ts) { set_aborted(); set_tx_end_log_ts(tx_end_log_ts); - } void ObMvccTransNode::remove_callback() @@ -695,8 +694,8 @@ int ObMvccRow::insert_trans_node(ObIMvccCtx &ctx, bool ObMvccRow::is_transaction_set_violation(const int64_t snapshot_version) { - return max_trans_version_ > snapshot_version - || max_elr_trans_version_ > snapshot_version; + return ATOMIC_LOAD(&max_trans_version_) > snapshot_version + || ATOMIC_LOAD(&max_elr_trans_version_) > snapshot_version; } int ObMvccRow::elr(const ObTransID &tx_id, @@ -921,6 +920,7 @@ int ObMvccRow::mvcc_write_(ObIMemtableCtx &ctx, // on the lock state of the node even the node is not delayed cleanout for // read operation.(If you are intereted in it, read ObMvccRow::mvcc_write) ObTransID data_tx_id = iter->get_tx_id(); + if (iter->is_delayed_cleanout() && !(iter->is_committed() || iter->is_aborted()) && OB_FAIL(tx_table->cleanout_tx_node(data_tx_id, @@ -1055,7 +1055,8 @@ int ObMvccRow::mvcc_write(ObIMemtableCtx &ctx, int ret = OB_SUCCESS; lock_begin(ctx); - if (max_trans_version_ > snapshot_version || max_elr_trans_version_ > snapshot_version) { + if (ATOMIC_LOAD(&max_trans_version_) > snapshot_version + || ATOMIC_LOAD(&max_elr_trans_version_) > snapshot_version) { // Case 3. successfully locked while tsc ret = OB_TRANSACTION_SET_VIOLATION; TRANS_LOG(WARN, "transaction set violation", K(ret), @@ -1067,7 +1068,8 @@ int ObMvccRow::mvcc_write(ObIMemtableCtx &ctx, // Case1: Cannot insert because of write-write conflict ret = OB_TRY_LOCK_ROW_CONFLICT; TRANS_LOG(WARN, "mvcc write conflict", K(ret), K(ctx), K(node), K(res), K(*this)); - } else if (max_trans_version_ > snapshot_version || max_elr_trans_version_ > snapshot_version) { + } else if (ATOMIC_LOAD(&max_trans_version_) > snapshot_version + || ATOMIC_LOAD(&max_elr_trans_version_) > snapshot_version) { // Case 3. successfully locked while tsc ret = OB_TRANSACTION_SET_VIOLATION; TRANS_LOG(WARN, "transaction set violation", K(ret), K(ctx), K(node), K(*this)); diff --git a/src/storage/memtable/mvcc/ob_mvcc_row.h b/src/storage/memtable/mvcc/ob_mvcc_row.h index 55977eec3..d4611716e 100644 --- a/src/storage/memtable/mvcc/ob_mvcc_row.h +++ b/src/storage/memtable/mvcc/ob_mvcc_row.h @@ -105,13 +105,12 @@ public: // ===================== ObMvccTransNode Flag Interface ===================== void set_committed(); - bool is_committed() const { return flag_ & F_COMMITTED; } - bool is_locked() const { return flag_ & F_MUTEX; } + bool is_committed() const { return ATOMIC_LOAD(&flag_) & F_COMMITTED; } void set_elr(); - bool is_elr() const { return flag_ & F_ELR; } + bool is_elr() const { return ATOMIC_LOAD(&flag_) & F_ELR; } void set_aborted(); void clear_aborted(); - bool is_aborted() const { return (flag_ & F_ABORTED); } + bool is_aborted() const { return ATOMIC_LOAD(&flag_) & F_ABORTED; } void set_delayed_cleanout(const bool delayed_cleanout); bool is_delayed_cleanout() const; @@ -128,10 +127,10 @@ public: void set_tx_end_log_ts(const int64_t tx_end_log_ts) { if (INT64_MAX != tx_end_log_ts) { - tx_end_log_ts_ = tx_end_log_ts; + ATOMIC_STORE(&tx_end_log_ts_, tx_end_log_ts); } } - int64_t get_tx_end_log_ts() { return tx_end_log_ts_; } + int64_t get_tx_end_log_ts() { return ATOMIC_LOAD(&tx_end_log_ts_); } private: static const uint8_t F_INIT; diff --git a/src/storage/tx/ob_ctx_tx_data.cpp b/src/storage/tx/ob_ctx_tx_data.cpp index 39ce29c8e..90d29cc3d 100644 --- a/src/storage/tx/ob_ctx_tx_data.cpp +++ b/src/storage/tx/ob_ctx_tx_data.cpp @@ -265,7 +265,7 @@ int ObCtxTxData::set_state(int32_t state) if (OB_FAIL(check_tx_data_writable_())) { TRANS_LOG(WARN, "tx data is not writeable", K(ret), K(*this)); } else { - tx_data_->state_ = state; + ATOMIC_STORE(&tx_data_->state_, state); } return ret; @@ -279,7 +279,7 @@ int ObCtxTxData::set_commit_version(int64_t commit_version) if (OB_FAIL(check_tx_data_writable_())) { TRANS_LOG(WARN, "tx data is not writeable", K(ret), K(*this)); } else { - tx_data_->commit_version_ = commit_version; + ATOMIC_STORE(&tx_data_->commit_version_, commit_version); } return ret; @@ -294,7 +294,7 @@ int ObCtxTxData::set_start_log_ts(int64_t start_ts) if (OB_FAIL(check_tx_data_writable_())) { TRANS_LOG(WARN, "tx data is not writeable", K(ret), K(*this)); } else { - tx_data_->start_log_ts_ = tmp_start_ts; + ATOMIC_STORE(&tx_data_->start_log_ts_, tmp_start_ts); } return ret; @@ -308,7 +308,7 @@ int ObCtxTxData::set_end_log_ts(int64_t end_ts) if (OB_FAIL(check_tx_data_writable_())) { TRANS_LOG(WARN, "tx data is not writeable", K(ret), K(*this)); } else { - tx_data_->end_log_ts_ = end_ts; + ATOMIC_STORE(&tx_data_->end_log_ts_, end_ts); } return ret; @@ -317,19 +317,19 @@ int ObCtxTxData::set_end_log_ts(int64_t end_ts) int32_t ObCtxTxData::get_state() const { RLockGuard guard(lock_); - return (NULL != tx_data_ ? tx_data_->state_: tx_commit_data_.state_); + return (NULL != tx_data_ ? ATOMIC_LOAD(&tx_data_->state_): ATOMIC_LOAD(&tx_commit_data_.state_)); } int64_t ObCtxTxData::get_commit_version() const { RLockGuard guard(lock_); - return (NULL != tx_data_ ? tx_data_->commit_version_ : tx_commit_data_.commit_version_); + return (NULL != tx_data_ ? ATOMIC_LOAD(&tx_data_->commit_version_) : ATOMIC_LOAD(&tx_commit_data_.commit_version_)); } int64_t ObCtxTxData::get_start_log_ts() const { RLockGuard guard(lock_); - int64_t ctx_log_ts = (NULL != tx_data_ ? tx_data_->start_log_ts_ : tx_commit_data_.start_log_ts_); + int64_t ctx_log_ts = (NULL != tx_data_ ? ATOMIC_LOAD(&tx_data_->start_log_ts_) : ATOMIC_LOAD(&tx_commit_data_.start_log_ts_)); if (INT64_MAX == ctx_log_ts) { ctx_log_ts = OB_INVALID_TIMESTAMP; } @@ -339,7 +339,7 @@ int64_t ObCtxTxData::get_start_log_ts() const int64_t ObCtxTxData::get_end_log_ts() const { RLockGuard guard(lock_); - int64_t ctx_log_ts = (NULL != tx_data_ ? tx_data_->end_log_ts_ : tx_commit_data_.end_log_ts_); + int64_t ctx_log_ts = (NULL != tx_data_ ? ATOMIC_LOAD(&tx_data_->end_log_ts_) : ATOMIC_LOAD(&tx_commit_data_.end_log_ts_)); if (INT64_MAX == ctx_log_ts) { ctx_log_ts = OB_INVALID_TIMESTAMP; } diff --git a/src/storage/tx/ob_tx_data_functor.cpp b/src/storage/tx/ob_tx_data_functor.cpp index c3cea1458..b71b7e549 100644 --- a/src/storage/tx/ob_tx_data_functor.cpp +++ b/src/storage/tx/ob_tx_data_functor.cpp @@ -62,9 +62,10 @@ namespace storage int CheckSqlSequenceCanReadFunctor::operator() (const ObTxData &tx_data, ObTxCCCtx *tx_cc_ctx) { UNUSED(tx_cc_ctx); int ret = OB_SUCCESS; + const int32_t state = ATOMIC_LOAD(&tx_data.state_); // NB: The functor is only used during minor merge - if (ObTxData::ABORT == tx_data.state_) { + if (ObTxData::ABORT == state) { // Case 1: data is aborted, so we donot need it during merge can_read_ = false; } else if (tx_data.undo_status_list_.is_contain(sql_sequence_)) { @@ -82,13 +83,15 @@ int CheckRowLockedFunctor::operator() (const ObTxData &tx_data, ObTxCCCtx *tx_cc { UNUSED(tx_cc_ctx); int ret = OB_SUCCESS; + const int32_t state = ATOMIC_LOAD(&tx_data.state_); + const int64_t commit_version = ATOMIC_LOAD(&tx_data.commit_version_); - switch (tx_data.state_) { + switch (state) { case ObTxData::COMMIT: { // Case 1: data is committed, so the lock is locked by the data and we // also need return the commit version for tsc check lock_state_.is_locked_ = false; - lock_state_.trans_version_ = tx_data.commit_version_; + lock_state_.trans_version_ = commit_version; break; } case ObTxData::RUNNING: { @@ -132,26 +135,29 @@ int GetTxStateWithLogTSFunctor::operator()(const ObTxData &tx_data, ObTxCCCtx *t { UNUSED(tx_cc_ctx); int ret = OB_SUCCESS; + const int32_t state = ATOMIC_LOAD(&tx_data.state_); + const int64_t commit_version = ATOMIC_LOAD(&tx_data.commit_version_); + const int64_t end_log_ts = ATOMIC_LOAD(&tx_data.end_log_ts_); // return the transaction state_ according to the merge log ts. // the detailed document is available as follows. // https://yuque.antfin-inc.com/docs/share/a3160d5e-6e1a-4980-a12e-4af653c6cf57?# - if (ObTxData::RUNNING == tx_data.state_) { + if (ObTxData::RUNNING == state) { // Case 1: data is during execution, so we return the running state with // INT64_MAX as version state_ = ObTxData::RUNNING; trans_version_ = INT64_MAX; - } else if (log_ts_ < tx_data.end_log_ts_) { + } else if (log_ts_ < end_log_ts) { // Case 2: data is decided while the required state is before the merge log // ts, so we return the running state with INT64_MAX as txn version state_ = ObTxData::RUNNING; trans_version_ = INT64_MAX; - } else if (ObTxData::COMMIT == tx_data.state_) { + } else if (ObTxData::COMMIT == state) { // Case 3: data is committed and the required state is after the merge log // ts, so we return the commit state with commit version as txn version state_ = ObTxData::COMMIT; - trans_version_ = tx_data.commit_version_; - } else if (ObTxData::ABORT == tx_data.state_) { + trans_version_ = commit_version; + } else if (ObTxData::ABORT == state) { // Case 4: data is aborted and the required state is after the merge log // ts, so we return the abort state with 0 as txn version state_ = ObTxData::ABORT; @@ -168,31 +174,42 @@ int GetTxStateWithLogTSFunctor::operator()(const ObTxData &tx_data, ObTxCCCtx *t int LockForReadFunctor::inner_lock_for_read(const ObTxData &tx_data, ObTxCCCtx *tx_cc_ctx) { int ret = OB_SUCCESS; + const transaction::ObTxSnapshot &snapshot = lock_for_read_arg_.mvcc_acc_ctx_.snapshot_; + const int64_t snapshot_version = snapshot.version_; + const transaction::ObTransID snapshot_tx_id = snapshot.tx_id_; + const int64_t snapshot_sql_sequence = snapshot.scn_; + + const transaction::ObTransID data_tx_id = lock_for_read_arg_.data_trans_id_; + const int64_t data_sql_sequence = lock_for_read_arg_.data_sql_sequence_; + const bool read_latest = lock_for_read_arg_.read_latest_; + const transaction::ObTransID reader_tx_id = lock_for_read_arg_.mvcc_acc_ctx_.tx_id_; + + // NB: We need pay much attention to the order of the reads to the different + // variables. Although we update the version before the state for the tnodes + // and read the state before the version. It may appear that the compiled code + // execution may rearrange its order and fail to obey its origin logic(You can + // read the Dependency Definiation of the ARM architecture book to understand + // it). So the synchronization primitive below is much important. + const int32_t state = ATOMIC_LOAD(&tx_data.state_); + const int64_t commit_version = ATOMIC_LOAD(&tx_data.commit_version_); + can_read_ = false; trans_version_ = OB_INVALID_VERSION; is_determined_state_ = false; - auto &snapshot = lock_for_read_arg_.mvcc_acc_ctx_.snapshot_; - auto snapshot_version = snapshot.version_; - auto snapshot_tx_id = snapshot.tx_id_; - auto data_tx_id = lock_for_read_arg_.data_trans_id_; - auto snapshot_sql_sequence = snapshot.scn_; - auto data_sql_sequence = lock_for_read_arg_.data_sql_sequence_; - bool read_latest = lock_for_read_arg_.read_latest_; - auto reader_tx_id = lock_for_read_arg_.mvcc_acc_ctx_.tx_id_; - switch (tx_data.state_) { + switch (state) { case ObTxData::COMMIT: { // Case 1: data is committed, so the state is decided and whether we can read // depends on whether undo status contains the data. Then we return the commit // version as data version. can_read_ = !tx_data.undo_status_list_.is_contain(data_sql_sequence); - trans_version_ = tx_data.commit_version_; + trans_version_ = commit_version; is_determined_state_ = true; break; } case ObTxData::ELR_COMMIT: { can_read_ = !tx_data.undo_status_list_.is_contain(data_sql_sequence); - trans_version_ = tx_data.commit_version_; + trans_version_ = commit_version; is_determined_state_ = false; break; } @@ -287,7 +304,9 @@ int LockForReadFunctor::operator()(const ObTxData &tx_data, ObTxCCCtx *tx_cc_ctx auto &acc_ctx = lock_for_read_arg_.mvcc_acc_ctx_; auto lock_expire_ts = acc_ctx.eval_lock_expire_ts(); - if (OB_ISNULL(tx_cc_ctx) && (ObTxData::RUNNING == tx_data.state_)) { + const int32_t state = ATOMIC_LOAD(&tx_data.state_); + + if (OB_ISNULL(tx_cc_ctx) && (ObTxData::RUNNING == state)) { ret = OB_ERR_UNEXPECTED; STORAGE_LOG(WARN, "lock for read functor need prepare version.", KR(ret)); } else { @@ -348,8 +367,11 @@ bool ObReCheckNothingOperation::operator()() int ObCleanoutTxNodeOperation::operator()(const ObTxData &tx_data, ObTxCCCtx *tx_cc_ctx) { int ret = OB_SUCCESS; + const int32_t state = ATOMIC_LOAD(&tx_data.state_); + const int64_t commit_version = ATOMIC_LOAD(&tx_data.commit_version_); + const int64_t end_log_ts = ATOMIC_LOAD(&tx_data.end_log_ts_); - if (ObTxData::RUNNING == tx_data.state_ + if (ObTxData::RUNNING == state && !tx_data.undo_status_list_.is_contain(tnode_.seq_no_) // NB: we need pay attention to the choice condition when issuing the // lock_for_read, we cannot only treat state in exec_info as judgement @@ -373,27 +395,27 @@ int ObCleanoutTxNodeOperation::operator()(const ObTxData &tx_data, ObTxCCCtx *tx if (OB_FAIL(value_.unlink_trans_node(tnode_))) { TRANS_LOG(WARN, "mvcc trans ctx trans commit error", K(ret), K(value_), K(tnode_)); } else { - (void)tnode_.trans_abort(tx_data.end_log_ts_); + (void)tnode_.trans_abort(end_log_ts); } - } else if (ObTxData::RUNNING == tx_data.state_) { + } else if (ObTxData::RUNNING == state) { if (INT64_MAX != tx_cc_ctx->prepare_version_) { // Case 3: data is prepared, we also donot write back the prepare state } - } else if (ObTxData::COMMIT == tx_data.state_) { + } else if (ObTxData::COMMIT == state) { // Case 4: data is committed, so we should write back the commit state - if (OB_FAIL(value_.trans_commit(tx_data.commit_version_, tnode_))) { + if (OB_FAIL(value_.trans_commit(commit_version, tnode_))) { TRANS_LOG(WARN, "mvcc trans ctx trans commit error", K(ret), K(value_), K(tnode_)); - } else if (FALSE_IT(tnode_.trans_commit(tx_data.commit_version_, tx_data.end_log_ts_))) { + } else if (FALSE_IT(tnode_.trans_commit(commit_version, end_log_ts))) { } else if (blocksstable::ObDmlFlag::DF_LOCK == tnode_.get_dml_flag() && OB_FAIL(value_.unlink_trans_node(tnode_))) { TRANS_LOG(WARN, "unlink lock node failed", K(ret), K(value_), K(tnode_)); } - } else if (ObTxData::ABORT == tx_data.state_) { + } else if (ObTxData::ABORT == state) { // Case 6: data is aborted, so we write back the abort state if (OB_FAIL(value_.unlink_trans_node(tnode_))) { TRANS_LOG(WARN, "mvcc trans ctx trans commit error", K(ret), K(value_), K(tnode_)); } else { - (void)tnode_.trans_abort(tx_data.end_log_ts_); + (void)tnode_.trans_abort(end_log_ts); } } else { ret = OB_ERR_UNEXPECTED;