diff --git a/mittest/mtlenv/storage/test_memtable_v2.cpp b/mittest/mtlenv/storage/test_memtable_v2.cpp index bf85c106ee..5937b55229 100644 --- a/mittest/mtlenv/storage/test_memtable_v2.cpp +++ b/mittest/mtlenv/storage/test_memtable_v2.cpp @@ -616,7 +616,6 @@ public: share::SCN snapshot_scn; snapshot_scn.convert_for_tx(snapshot_version); EXPECT_EQ(OB_SUCCESS, row->row_compact(memtable, - for_replay, snapshot_scn, &allocator2_)); TRANS_LOG(INFO, "====================== end compact row =====================", diff --git a/src/storage/memtable/mvcc/ob_mvcc_engine.cpp b/src/storage/memtable/mvcc/ob_mvcc_engine.cpp index b283866dc1..dfaeca5584 100644 --- a/src/storage/memtable/mvcc/ob_mvcc_engine.cpp +++ b/src/storage/memtable/mvcc/ob_mvcc_engine.cpp @@ -94,7 +94,6 @@ int ObMvccEngine::try_compact_row_when_mvcc_read_(const SCN &snapshot_version, } else { ObRowLatchGuard guard(row.latch_); if (OB_FAIL(row.row_compact(memtable_, - true/*for_replay*/, snapshot_version, engine_allocator_))) { TRANS_LOG(WARN, "row compact error", K(ret), K(snapshot_version)); diff --git a/src/storage/memtable/mvcc/ob_mvcc_iterator.cpp b/src/storage/memtable/mvcc/ob_mvcc_iterator.cpp index 2319d82168..cfeb7a70db 100644 --- a/src/storage/memtable/mvcc/ob_mvcc_iterator.cpp +++ b/src/storage/memtable/mvcc/ob_mvcc_iterator.cpp @@ -95,11 +95,11 @@ int ObMvccValueIterator::lock_for_read_(const ObQueryFlag &flag) if (NULL != version_iter_) { if (ctx_->is_weak_read()) { version_iter_->set_safe_read_barrier(true); - version_iter_->set_snapshot_version_barrier(ctx_->snapshot_.version_); - } - if (!flag.is_prewarm() - && !version_iter_->is_elr()) { - version_iter_->set_snapshot_version_barrier(ctx_->snapshot_.version_); + version_iter_->set_snapshot_version_barrier(ctx_->snapshot_.version_, + ObMvccTransNode::WEAK_READ_BIT); + } else if (!flag.is_prewarm() && !version_iter_->is_elr()) { + version_iter_->set_snapshot_version_barrier(ctx_->snapshot_.version_, + ObMvccTransNode::NORMAL_READ_BIT); } } diff --git a/src/storage/memtable/mvcc/ob_mvcc_row.cpp b/src/storage/memtable/mvcc/ob_mvcc_row.cpp index c08d4e1d79..088e3ec6d0 100644 --- a/src/storage/memtable/mvcc/ob_mvcc_row.cpp +++ b/src/storage/memtable/mvcc/ob_mvcc_row.cpp @@ -131,9 +131,18 @@ bool ObMvccTransNode::is_safe_read_barrier() const || (flag & F_STRONG_CONSISTENT_READ_BARRIER)); } -void ObMvccTransNode::set_snapshot_version_barrier(const SCN version) +void ObMvccTransNode::set_snapshot_version_barrier(const SCN scn_version, + const int64_t flag) { - snapshot_version_barrier_ = version; + ATOMIC_STORE(&snapshot_version_barrier_, scn_version.get_val_for_tx() | flag); +} + +void ObMvccTransNode::get_snapshot_version_barrier(int64_t &version, + int64_t &flag) +{ + int64_t flaged_version = ATOMIC_LOAD(&snapshot_version_barrier_); + version = flaged_version & (~SNAPSHOT_VERSION_BARRIER_BIT); + flag = flaged_version & SNAPSHOT_VERSION_BARRIER_BIT; } void ObMvccTransNode::get_trans_id_and_seq_no(ObTransID &tx_id, @@ -205,7 +214,8 @@ int64_t ObMvccTransNode::to_string(char *buf, const int64_t buf_len) const "version=%ld " "type=%d " "flag=%d " - "snapshot_version_barrier=%s " + "snapshot_barrier=%ld " + "snapshot_barrier_flag=%ld " "mtd=%s " "seq_no=%ld", this, @@ -219,7 +229,9 @@ int64_t ObMvccTransNode::to_string(char *buf, const int64_t buf_len) const version_, type_, flag_, - to_cstring(snapshot_version_barrier_), + snapshot_version_barrier_ + & (~SNAPSHOT_VERSION_BARRIER_BIT), + snapshot_version_barrier_ >> 62, to_cstring(*mtd), seq_no_); return pos; @@ -487,7 +499,6 @@ bool ObMvccRow::need_compact(const bool for_read, const bool for_replay) } int ObMvccRow::row_compact(ObMemtable *memtable, - const bool for_replay, const SCN snapshot_version, ObIAllocator *node_alloc) { @@ -498,9 +509,10 @@ int ObMvccRow::row_compact(ObMemtable *memtable, KP(node_alloc), KP(memtable)); } else { ObMemtableRowCompactor row_compactor; - if (OB_FAIL(row_compactor.init(this, memtable, node_alloc, for_replay))) { + if (OB_FAIL(row_compactor.init(this, memtable, node_alloc))) { TRANS_LOG(WARN, "row compactor init error", K(ret)); - } else if (OB_FAIL(row_compactor.compact(snapshot_version))) { + } else if (OB_FAIL(row_compactor.compact(snapshot_version, + ObMvccTransNode::COMPACT_READ_BIT))) { TRANS_LOG(WARN, "row compact error", K(ret), K(snapshot_version)); } else { // do nothing @@ -732,13 +744,17 @@ int ObMvccRow::trans_commit(const SCN commit_version, ObMvccTransNode &node) } else { // Check safety condition for ELR if (NULL != node.prev_ && node.prev_->is_safe_read_barrier()) { - if (commit_version <= node.prev_->snapshot_version_barrier_) { - if ((node.is_elr() || node.is_delayed_cleanout()) && node.prev_->type_ == NDT_COMPACT) { + int64_t snapshot_version_barrier = 0; + int64_t flag = 0; + (void)node.prev_->get_snapshot_version_barrier(snapshot_version_barrier, flag); + if (commit_version.get_val_for_tx() <= snapshot_version_barrier) { + if ((node.is_elr() || node.is_delayed_cleanout()) && node.prev_->type_ == NDT_COMPACT) { // do nothing } else { // ignore ret - TRANS_LOG(ERROR, "unexpected commit version", K(commit_version), K(*this), - "cur_node", node, "prev_node", *(node.prev_)); + TRANS_LOG(ERROR, "unexpected commit version", K(snapshot_version_barrier), + "cur_node", node, "prev_node", *(node.prev_), K(flag), K(*this), + K(commit_version)); } } } diff --git a/src/storage/memtable/mvcc/ob_mvcc_row.h b/src/storage/memtable/mvcc/ob_mvcc_row.h index eb8e764233..3b94e4078b 100644 --- a/src/storage/memtable/mvcc/ob_mvcc_row.h +++ b/src/storage/memtable/mvcc/ob_mvcc_row.h @@ -72,7 +72,7 @@ public: modify_count_(0), acc_checksum_(0), version_(0), - snapshot_version_barrier_(share::SCN::min_scn()), + snapshot_version_barrier_(0), type_(NDT_NORMAL), flag_(0) {} @@ -88,7 +88,7 @@ public: uint32_t modify_count_; uint32_t acc_checksum_; int64_t version_; - share::SCN snapshot_version_barrier_; + int64_t snapshot_version_barrier_; uint8_t type_; uint8_t flag_; char buf_[0]; @@ -112,14 +112,17 @@ public: void remove_callback(); // ===================== ObMvccTransNode Tx Node Meta ===================== - // ObMvccRow records safe_read_barrier and snapshot_version_barrier to detect - // unexpected behaviors. The safe_read_barrier means the type of the last read - // operation performed on the row. And the snapshot_version_barrier means the - // version of the read operation, + // ObMvccRow records snapshot_version_barrier to detect unexpected concurrency + // control behaviors. The snapshot_version_barrier means the snapshot of the + // latest read operation, and if a commit version appears after the read + // operation with the commit version smaller than the snapshot version, we + // should report the unexpected bahavior. void set_safe_read_barrier(const bool is_weak_consistent_read); void clear_safe_read_barrier(); bool is_safe_read_barrier() const; - void set_snapshot_version_barrier(const share::SCN version); + void set_snapshot_version_barrier(const share::SCN version, + const int64_t flag); + void get_snapshot_version_barrier(int64_t &version, int64_t &flag); // ===================== ObMvccTransNode Flag Interface ===================== OB_INLINE void set_committed() @@ -182,6 +185,7 @@ public: share::SCN get_tx_end_scn() { return tx_end_scn_.atomic_load(); } private: + // the row flag of the mvcc tx node static const uint8_t F_INIT; static const uint8_t F_WEAK_CONSISTENT_READ_BARRIER; static const uint8_t F_STRONG_CONSISTENT_READ_BARRIER; @@ -190,6 +194,13 @@ private: static const uint8_t F_ABORTED; static const uint8_t F_DELAYED_CLEANOUT; static const uint8_t F_MUTEX; + +public: + // the snapshot flag of the snapshot version barrier + static const int64_t NORMAL_READ_BIT = 0x0L; + static const int64_t WEAK_READ_BIT = 0x1L << 62; + static const int64_t COMPACT_READ_BIT = 0x2L << 62; + static const int64_t SNAPSHOT_VERSION_BARRIER_BIT = 0x3L << 62; }; //////////////////////////////////////////////////////////////////////////////////////////////////// @@ -303,7 +314,6 @@ struct ObMvccRow // snapshot_version is the version for row compact // node_alloc is the allocator for compact node allocation int row_compact(ObMemtable *memtable, - const bool for_replay, const share::SCN snapshot_version, common::ObIAllocator *node_alloc); diff --git a/src/storage/memtable/mvcc/ob_mvcc_trans_ctx.cpp b/src/storage/memtable/mvcc/ob_mvcc_trans_ctx.cpp index 7ff55486bf..82e245020a 100644 --- a/src/storage/memtable/mvcc/ob_mvcc_trans_ctx.cpp +++ b/src/storage/memtable/mvcc/ob_mvcc_trans_ctx.cpp @@ -977,12 +977,18 @@ int ObMvccRowCallback::trans_commit() (void)ATOMIC_FAA(&value_.update_since_compact_, 1); if (value_.need_compact(for_read, ctx_.is_for_replay())) { if (ctx_.is_for_replay()) { - if (ctx_.get_replay_compact_version().is_valid_and_not_min() && SCN::max_scn() != ctx_.get_replay_compact_version()) { - memtable_->row_compact(&value_, ctx_.is_for_replay(), ctx_.get_replay_compact_version()); + if (ctx_.get_replay_compact_version().is_valid_and_not_min() + && SCN::max_scn() != ctx_.get_replay_compact_version()) { + memtable_->row_compact(&value_, + ctx_.get_replay_compact_version(), + ObMvccTransNode::WEAK_READ_BIT + | ObMvccTransNode::COMPACT_READ_BIT); } } else { SCN snapshot_version_for_compact = SCN::minus(SCN::max_scn(), 100); - memtable_->row_compact(&value_, ctx_.is_for_replay(), snapshot_version_for_compact); + memtable_->row_compact(&value_, + snapshot_version_for_compact, + ObMvccTransNode::NORMAL_READ_BIT); } } } diff --git a/src/storage/memtable/ob_memtable.cpp b/src/storage/memtable/ob_memtable.cpp index 37f6260495..40b937409a 100644 --- a/src/storage/memtable/ob_memtable.cpp +++ b/src/storage/memtable/ob_memtable.cpp @@ -1375,17 +1375,17 @@ ObDatumRange &ObMemtable::m_get_real_range(ObDatumRange &real_range, const ObDat } int ObMemtable::row_compact(ObMvccRow *row, - const bool for_replay, - const SCN snapshot_version) + const SCN snapshot_version, + const int64_t flag) { int ret = OB_SUCCESS; ObMemtableRowCompactor row_compactor; if (OB_ISNULL(row)) { ret = OB_INVALID_ARGUMENT; TRANS_LOG(WARN, "row is NULL"); - } else if (OB_FAIL(row_compactor.init(row, this, &local_allocator_, for_replay))) { + } else if (OB_FAIL(row_compactor.init(row, this, &local_allocator_))) { TRANS_LOG(WARN, "row compactor init error", K(ret)); - } else if (OB_FAIL(row_compactor.compact(snapshot_version))) { + } else if (OB_FAIL(row_compactor.compact(snapshot_version, flag))) { TRANS_LOG(WARN, "row_compact fail", K(ret), K(*row), K(snapshot_version)); } else { // do nothing diff --git a/src/storage/memtable/ob_memtable.h b/src/storage/memtable/ob_memtable.h index e12d398e58..ca0fc573a9 100644 --- a/src/storage/memtable/ob_memtable.h +++ b/src/storage/memtable/ob_memtable.h @@ -325,7 +325,9 @@ public: inline bool not_empty() const { return INT64_MAX != get_protection_clock(); }; void set_max_schema_version(const int64_t schema_version); virtual int64_t get_max_schema_version() const override; - int row_compact(ObMvccRow *value, const bool for_replay, const share::SCN snapshot_version); + int row_compact(ObMvccRow *value, + const share::SCN snapshot_version, + const int64_t flag); int64_t get_hash_item_count() const; int64_t get_hash_alloc_memory() const; int64_t get_btree_item_count() const; diff --git a/src/storage/memtable/ob_row_compactor.cpp b/src/storage/memtable/ob_row_compactor.cpp index 753f72252d..01004a209d 100644 --- a/src/storage/memtable/ob_row_compactor.cpp +++ b/src/storage/memtable/ob_row_compactor.cpp @@ -40,16 +40,14 @@ ObMemtableRowCompactor::ObMemtableRowCompactor() : is_inited_(false), row_(NULL), memtable_(NULL), - node_alloc_(NULL), - for_replay_(false) + node_alloc_(NULL) {} ObMemtableRowCompactor::~ObMemtableRowCompactor() {} int ObMemtableRowCompactor::init(ObMvccRow *row, ObMemtable *mt, - ObIAllocator *node_alloc, - const bool for_replay) + ObIAllocator *node_alloc) { int ret = OB_SUCCESS; if (is_inited_) { @@ -63,7 +61,6 @@ int ObMemtableRowCompactor::init(ObMvccRow *row, row_ = row; memtable_ = mt; node_alloc_ = node_alloc; - for_replay_ = for_replay; } return ret; } @@ -72,7 +69,8 @@ int ObMemtableRowCompactor::init(ObMvccRow *row, // So modification is guaranteed to be safety with another modification, // while we need pay attention to the concurrency between lock_for_read // and modification(such as compact) -int ObMemtableRowCompactor::compact(const SCN snapshot_version) +int ObMemtableRowCompactor::compact(const SCN snapshot_version, + const int64_t flag) { int ret = OB_SUCCESS; @@ -91,7 +89,7 @@ int ObMemtableRowCompactor::compact(const SCN snapshot_version) find_start_pos_(snapshot_version, start); tg.click(); - ObMvccTransNode *compact_node = construct_compact_node_(snapshot_version, start); + ObMvccTransNode *compact_node = construct_compact_node_(snapshot_version, flag, start); tg.click(); if (OB_NOT_NULL(compact_node)) { @@ -198,6 +196,7 @@ int ObMemtableRowCompactor::try_cleanout_tx_node_during_compact_(ObTxTableGuard } ObMvccTransNode *ObMemtableRowCompactor::construct_compact_node_(const SCN snapshot_version, + const int64_t flag, ObMvccTransNode *save) { int ret = OB_SUCCESS; @@ -350,7 +349,7 @@ ObMvccTransNode *ObMemtableRowCompactor::construct_compact_node_(const SCN snaps trans_node->type_ = NDT_COMPACT; trans_node->flag_ = save->flag_; trans_node->scn_ = save->scn_; - trans_node->set_snapshot_version_barrier(snapshot_version); + trans_node->set_snapshot_version_barrier(snapshot_version, flag); TRANS_LOG(DEBUG, "success to compact row, ", K(trans_node->tx_id_), K(dml_flag), K(compact_row_cnt), KPC(save)); } } diff --git a/src/storage/memtable/ob_row_compactor.h b/src/storage/memtable/ob_row_compactor.h index 8e851b8ea9..834a660ba7 100644 --- a/src/storage/memtable/ob_row_compactor.h +++ b/src/storage/memtable/ob_row_compactor.h @@ -52,14 +52,15 @@ private: public: int init(ObMvccRow *row, ObMemtable *mt, - common::ObIAllocator *node_alloc, - const bool for_replay); + common::ObIAllocator *node_alloc); // compact and refresh the update counter by snapshot version - int compact(const share::SCN snapshot_version); + int compact(const share::SCN snapshot_version, + const int64_t flag); private: void find_start_pos_(const share::SCN snapshot_version, ObMvccTransNode *&save); ObMvccTransNode *construct_compact_node_(const share::SCN snapshot_version, + const int64_t flag, ObMvccTransNode *save); int try_cleanout_tx_node_during_compact_(storage::ObTxTableGuard &tx_table_guard, ObMvccTransNode *tnode); @@ -70,7 +71,6 @@ private: ObMvccRow *row_; ObMemtable *memtable_; common::ObIAllocator *node_alloc_; - bool for_replay_; };