diff --git a/src/storage/memtable/mvcc/ob_mvcc_trans_ctx.cpp b/src/storage/memtable/mvcc/ob_mvcc_trans_ctx.cpp index d9984e5617..03ab81ab50 100644 --- a/src/storage/memtable/mvcc/ob_mvcc_trans_ctx.cpp +++ b/src/storage/memtable/mvcc/ob_mvcc_trans_ctx.cpp @@ -390,14 +390,14 @@ int ObTransCallbackMgr::remove_callbacks_for_fast_commit(bool &has_remove) return ret; } -int ObTransCallbackMgr::remove_callback_for_uncommited_txn(ObIMemtable *memtable) +int ObTransCallbackMgr::remove_callback_for_uncommited_txn(ObIMemtable *memtable, const share::SCN max_applied_scn) { int ret = OB_SUCCESS; if (OB_ISNULL(memtable)) { ret = OB_INVALID_ARGUMENT; TRANS_LOG(WARN, "memtable is null", K(ret)); - } else if (OB_FAIL(callback_list_.remove_callbacks_for_remove_memtable(memtable))) { + } else if (OB_FAIL(callback_list_.remove_callbacks_for_remove_memtable(memtable, max_applied_scn))) { TRANS_LOG(WARN, "fifo remove callback fail", K(ret), K(*memtable)); } diff --git a/src/storage/memtable/mvcc/ob_mvcc_trans_ctx.h b/src/storage/memtable/mvcc/ob_mvcc_trans_ctx.h index 6aa1820fe8..25580c38dd 100644 --- a/src/storage/memtable/mvcc/ob_mvcc_trans_ctx.h +++ b/src/storage/memtable/mvcc/ob_mvcc_trans_ctx.h @@ -209,7 +209,7 @@ public: void set_for_replay(const bool for_replay); bool is_for_replay() const { return ATOMIC_LOAD(&for_replay_); } int remove_callbacks_for_fast_commit(bool &has_remove); - int remove_callback_for_uncommited_txn(memtable::ObIMemtable *memtable); + int remove_callback_for_uncommited_txn(memtable::ObIMemtable *memtable, const share::SCN max_applied_scn); int get_memtable_key_arr(transaction::ObMemtableKeyArray &memtable_key_arr); void acquire_callback_list(); void revert_callback_list(); diff --git a/src/storage/memtable/mvcc/ob_tx_callback_list.cpp b/src/storage/memtable/mvcc/ob_tx_callback_list.cpp index bfb4fbe2ca..c6d2632ff8 100644 --- a/src/storage/memtable/mvcc/ob_tx_callback_list.cpp +++ b/src/storage/memtable/mvcc/ob_tx_callback_list.cpp @@ -163,7 +163,7 @@ int ObTxCallbackList::remove_callbacks_for_fast_commit(bool &has_remove) return ret; } -int ObTxCallbackList::remove_callbacks_for_remove_memtable(ObIMemtable *memtable_for_remove) +int ObTxCallbackList::remove_callbacks_for_remove_memtable(ObIMemtable *memtable_for_remove, const share::SCN max_applied_scn) { int ret = OB_SUCCESS; SpinLockGuard guard(latch_); @@ -177,8 +177,8 @@ int ObTxCallbackList::remove_callbacks_for_remove_memtable(ObIMemtable *memtable return false; } }, // condition for stop - [memtable_for_remove](ObITransCallback *callback) -> bool { - if (callback->get_scn() > memtable_for_remove->get_key().get_end_scn()) { + [max_applied_scn](ObITransCallback *callback) -> bool { + if (callback->get_scn() > max_applied_scn) { return true; } else { return false; diff --git a/src/storage/memtable/mvcc/ob_tx_callback_list.h b/src/storage/memtable/mvcc/ob_tx_callback_list.h index b7ecdb8a08..e0d543b6ae 100644 --- a/src/storage/memtable/mvcc/ob_tx_callback_list.h +++ b/src/storage/memtable/mvcc/ob_tx_callback_list.h @@ -62,7 +62,8 @@ public: // callback for the correctness. And user need guarantee all callbacks // belonged to the memtable must be synced before removing. What's more, it // will calculate checksum when removing. - int remove_callbacks_for_remove_memtable(ObIMemtable *memtable_for_remove); + int remove_callbacks_for_remove_memtable(ObIMemtable *memtable_for_remove, + const share::SCN max_applied_scn); // remove_callbacks_for_rollback_to will remove callbacks from back to front // until callbacks smaller or equal than the seq_no. It will remove both diff --git a/src/storage/memtable/ob_memtable.cpp b/src/storage/memtable/ob_memtable.cpp index 78db89e85a..f203ab1b13 100644 --- a/src/storage/memtable/ob_memtable.cpp +++ b/src/storage/memtable/ob_memtable.cpp @@ -203,7 +203,6 @@ int ObMemtable::remove_unused_callback_for_uncommited_txn_() transaction::ObTransService *txs_svr = MTL(transaction::ObTransService *); if (NULL != txs_svr - && share::ObScnRange::MAX_SCN != get_end_scn() && OB_FAIL(txs_svr->remove_callback_for_uncommited_txn(this))) { TRANS_LOG(WARN, "remove callback for uncommited txn failed", K(ret), K(*this)); } diff --git a/src/storage/memtable/ob_memtable_context.cpp b/src/storage/memtable/ob_memtable_context.cpp index 2886cb28c2..4f923ece6f 100644 --- a/src/storage/memtable/ob_memtable_context.cpp +++ b/src/storage/memtable/ob_memtable_context.cpp @@ -927,7 +927,7 @@ int ObMemtableCtx::remove_callbacks_for_fast_commit() return ret; } -int ObMemtableCtx::remove_callback_for_uncommited_txn(ObMemtable *mt) +int ObMemtableCtx::remove_callback_for_uncommited_txn(ObMemtable *mt, const share::SCN max_applied_scn) { int ret = OB_SUCCESS; ObByteLockGuard guard(lock_); @@ -937,7 +937,7 @@ int ObMemtableCtx::remove_callback_for_uncommited_txn(ObMemtable *mt) TRANS_LOG(WARN, "memtable is NULL", K(mt)); } else if (OB_FAIL(reuse_log_generator_())) { TRANS_LOG(ERROR, "fail to reset log generator", K(ret)); - } else if (OB_FAIL(trans_mgr_.remove_callback_for_uncommited_txn(mt))) { + } else if (OB_FAIL(trans_mgr_.remove_callback_for_uncommited_txn(mt, max_applied_scn))) { TRANS_LOG(WARN, "fail to remove callback for uncommitted txn", K(ret), K(mt)); } diff --git a/src/storage/memtable/ob_memtable_context.h b/src/storage/memtable/ob_memtable_context.h index fd34079709..7e52f3793c 100644 --- a/src/storage/memtable/ob_memtable_context.h +++ b/src/storage/memtable/ob_memtable_context.h @@ -399,7 +399,7 @@ public: uint64_t get_tenant_id() const; inline bool has_read_elr_data() const { return read_elr_data_; } int remove_callbacks_for_fast_commit(); - int remove_callback_for_uncommited_txn(memtable::ObMemtable* mt); + int remove_callback_for_uncommited_txn(memtable::ObMemtable* mt, const share::SCN max_applied_scn); int rollback(const int64_t seq_no, const int64_t from_seq_no); bool is_all_redo_submitted(); bool is_for_replay() const { return trans_mgr_.is_for_replay(); } diff --git a/src/storage/tx/ob_trans_part_ctx.cpp b/src/storage/tx/ob_trans_part_ctx.cpp index 51333c0644..660b8e1725 100644 --- a/src/storage/tx/ob_trans_part_ctx.cpp +++ b/src/storage/tx/ob_trans_part_ctx.cpp @@ -1504,8 +1504,8 @@ int ObPartTransCtx::remove_callback_for_uncommited_txn(ObMemtable *mt) ret = OB_INVALID_ARGUMENT; TRANS_LOG(WARN, "memtable is NULL", K(mt)); } else if (OB_UNLIKELY(is_exiting_)) { - } else if (OB_FAIL(mt_ctx_.remove_callback_for_uncommited_txn(mt))) { - TRANS_LOG(WARN, "fail to remove callback for uncommitted txn", K(ret), K(mt_ctx_)); + } else if (OB_FAIL(mt_ctx_.remove_callback_for_uncommited_txn(mt, exec_info_.max_applied_log_ts_))) { + TRANS_LOG(WARN, "fail to remove callback for uncommitted txn", K(ret), K(mt_ctx_), K(exec_info_.max_applied_log_ts_)); } return ret; diff --git a/unittest/storage/memtable/mvcc/test_mvcc_callback.cpp b/unittest/storage/memtable/mvcc/test_mvcc_callback.cpp index 313ae6f23b..3d4a56bb28 100644 --- a/unittest/storage/memtable/mvcc/test_mvcc_callback.cpp +++ b/unittest/storage/memtable/mvcc/test_mvcc_callback.cpp @@ -327,19 +327,19 @@ TEST_F(TestTxCallbackList, remove_callback_by_release_memtable) EXPECT_EQ(9, callback_list_.get_length()); EXPECT_EQ(OB_SUCCESS, - callback_list_.remove_callbacks_for_remove_memtable(memtable2)); + callback_list_.remove_callbacks_for_remove_memtable(memtable2, scn_1/*not used*/)); EXPECT_EQ(7, callback_list_.get_length()); EXPECT_EQ(2, mgr_.get_callback_remove_for_remove_memtable_count()); EXPECT_EQ(OB_SUCCESS, - callback_list_.remove_callbacks_for_remove_memtable(memtable1)); + callback_list_.remove_callbacks_for_remove_memtable(memtable1, scn_1/*not used*/)); EXPECT_EQ(5, callback_list_.get_length()); EXPECT_EQ(4, mgr_.get_callback_remove_for_remove_memtable_count()); EXPECT_EQ(OB_SUCCESS, - callback_list_.remove_callbacks_for_remove_memtable(memtable3)); + callback_list_.remove_callbacks_for_remove_memtable(memtable3, scn_1/*not used*/)); EXPECT_EQ(4, callback_list_.get_length()); EXPECT_EQ(5, mgr_.get_callback_remove_for_remove_memtable_count()); @@ -787,16 +787,16 @@ TEST_F(TestTxCallbackList, checksum_remove_memtable_and_tx_end) EXPECT_EQ(9, callback_list_.get_length()); - EXPECT_EQ(OB_SUCCESS, callback_list_.remove_callbacks_for_remove_memtable(memtable2)); + EXPECT_EQ(OB_SUCCESS, callback_list_.remove_callbacks_for_remove_memtable(memtable2, scn_1/*not used*/)); EXPECT_EQ(true, is_checksum_equal(5, checksum_)); EXPECT_EQ(scn_3, callback_list_.checksum_scn_); - EXPECT_EQ(OB_SUCCESS, callback_list_.remove_callbacks_for_remove_memtable(memtable3)); + EXPECT_EQ(OB_SUCCESS, callback_list_.remove_callbacks_for_remove_memtable(memtable3, scn_1/*not used*/)); EXPECT_EQ(true, is_checksum_equal(5, checksum_)); EXPECT_EQ(scn_3, callback_list_.checksum_scn_); - EXPECT_EQ(OB_SUCCESS, callback_list_.remove_callbacks_for_remove_memtable(memtable1)); + EXPECT_EQ(OB_SUCCESS, callback_list_.remove_callbacks_for_remove_memtable(memtable1, scn_1/*not used*/)); EXPECT_EQ(true, is_checksum_equal(5, checksum_)); EXPECT_EQ(scn_3, callback_list_.checksum_scn_); @@ -1010,6 +1010,8 @@ TEST_F(TestTxCallbackList, checksum_all_and_tx_end_test) { ObFunction remove_memtable_op = [&]() -> bool { ObMemtable *mt = mts[ObRandom::rand(0, mt_cnt-1)]; + share::SCN scn_1; + scn_1.convert_for_logservice(1); bool enable = false; for (ObITransCallback* it = need_submit_head->next_; it != &(callback_list_.head_); @@ -1024,7 +1026,7 @@ TEST_F(TestTxCallbackList, checksum_all_and_tx_end_test) { if (enable) { EXPECT_EQ(OB_SUCCESS, - callback_list_.remove_callbacks_for_remove_memtable(mt)); + callback_list_.remove_callbacks_for_remove_memtable(mt, scn_1/*not used*/)); } return enable; @@ -1139,7 +1141,8 @@ int ObTxCallbackList::remove_callbacks_for_fast_commit(bool &has_remove) return ret; } -int ObTxCallbackList::remove_callbacks_for_remove_memtable(ObIMemtable *memtable_for_remove) +int ObTxCallbackList::remove_callbacks_for_remove_memtable(ObIMemtable *memtable_for_remove, + const share::SCN) { int ret = OB_SUCCESS; SpinLockGuard guard(latch_);