diff --git a/mittest/mtlenv/tablelock/test_lock_table_callback.cpp b/mittest/mtlenv/tablelock/test_lock_table_callback.cpp index c7e993911..65f431af1 100644 --- a/mittest/mtlenv/tablelock/test_lock_table_callback.cpp +++ b/mittest/mtlenv/tablelock/test_lock_table_callback.cpp @@ -65,10 +65,16 @@ public: { create_memtable(); init_mem_ctx(handle_); + created_cb_list_.reset(); LOG_INFO("set up success"); } virtual void TearDown() override { + //release all callbacks + ObOBJLockCallback *cb; + ARRAY_FOREACH_NORET(created_cb_list_, i) { + mt_ctx_.free_table_lock_callback(created_cb_list_[i]); + } LOG_INFO("tear down success"); } private: @@ -106,6 +112,7 @@ private: ret = mt_key.encode(&rowkey); ASSERT_EQ(OB_SUCCESS, ret); cb->set(mt_key, lock_op_node); + ASSERT_EQ(OB_SUCCESS, created_cb_list_.push_back(cb)); } void free_callback(ObOBJLockCallback *&cb) { @@ -121,6 +128,7 @@ private: ObFreezer freezer_; ObMemtableCtx mt_ctx_; + ObSEArraycreated_cb_list_; }; void TestLockTableCallback::SetUpTestCase() diff --git a/src/storage/memtable/mvcc/ob_tx_callback_list.cpp b/src/storage/memtable/mvcc/ob_tx_callback_list.cpp index 256ea2835..773e4b683 100644 --- a/src/storage/memtable/mvcc/ob_tx_callback_list.cpp +++ b/src/storage/memtable/mvcc/ob_tx_callback_list.cpp @@ -264,7 +264,7 @@ int ObTxCallbackList::callback_(ObITxCallbackFunctor &functor, log_cursor_ = next; } if (parallel_start_pos_ == iter) { - parallel_start_pos_ = (next == &head_) ? NULL : next; + parallel_start_pos_ = (is_reverse || next == &head_) ? NULL : next; } ++removed_; if (iter->need_submit_log()) { @@ -728,10 +728,21 @@ int ObTxCallbackList::replay_fail(const SCN scn, const bool serial_replay) functor.scn_ = scn; LockGuard guard(*this, LOCK_MODE::LOCK_ALL); + // // for replay fail of serial log, if parallel replay has happened, // must reverse traversal from parallel_start_pos_.prev_ - ObITransCallback *start_pos = (serial_replay && parallel_start_pos_) ? parallel_start_pos_ : get_guard(); - ObITransCallback *end_pos = get_guard(); + // + // head_ --> ... -> parallel_start_pos_ -> ... -> head_ + // + ObITransCallback *start_pos = NULL; + ObITransCallback *end_pos = NULL; + if (serial_replay) { + start_pos = parallel_start_pos_ ?: get_guard(); + end_pos = get_guard(); + } else { + start_pos = get_guard(); + end_pos = parallel_start_pos_ ? parallel_start_pos_->get_prev() : get_guard(); + } if (OB_FAIL(callback_(functor, start_pos, end_pos, guard.state_))) { TRANS_LOG(ERROR, "replay fail failed", K(ret), K(functor)); } else { diff --git a/unittest/storage/memtable/mvcc/test_mvcc_callback.cpp b/unittest/storage/memtable/mvcc/test_mvcc_callback.cpp index d4cd11dd7..9ef62ab83 100644 --- a/unittest/storage/memtable/mvcc/test_mvcc_callback.cpp +++ b/unittest/storage/memtable/mvcc/test_mvcc_callback.cpp @@ -164,6 +164,36 @@ public: cb->need_submit_log_ = need_submit_log; return cb; } + ObMockTxCallback * replay_callback(ObMemtable *mt, + transaction::ObTxSEQ seq_no, + int64_t scn_v, + bool parallel_replay, + bool serial_final) + { + share::SCN scn; + EXPECT_EQ(OB_SUCCESS, scn.convert_for_tx(scn_v)); + ObMockTxCallback *cb = new ObMockTxCallback(mt, + false, + scn, + seq_no); + EXPECT_EQ(OB_SUCCESS, callback_list_.append_callback(cb, true, parallel_replay, serial_final)); + return cb; + } + ObMockTxCallback * parallel_replay_callback(ObMemtable *mt, + transaction::ObTxSEQ seq_no, + int64_t scn_v, + bool serial_final) + { + return replay_callback(mt, seq_no, scn_v, true, serial_final); + } + + ObMockTxCallback * serial_replay_callback(ObMemtable *mt, + transaction::ObTxSEQ seq_no, + int64_t scn_v, + bool serial_final) + { + return replay_callback(mt, seq_no, scn_v, false, serial_final); + } ObMemtable *create_memtable() { @@ -1272,6 +1302,39 @@ TEST_F(TestTxCallbackList, log_cursor) { EXPECT_EQ(callback_list_.log_cursor_, cb_4->next_); #undef APPEND_CB } + +TEST_F(TestTxCallbackList, parallel_replay_and_replay_fail_parallel_start_pos) { + using ObTxSEQ = transaction::ObTxSEQ; + using SCN = share::SCN; + TRANS_LOG(INFO, "CASE: parallel replay and replay fail"); + int ret = 0; + ObMemtable *mt = create_memtable(); + ObMockTxCallback *cb1 = parallel_replay_callback(mt, ObTxSEQ(10, 1), 100, false/*serial_final*/); + ObMockTxCallback *cb2 = parallel_replay_callback(mt, ObTxSEQ(20, 1), 100, false/*serial_final*/); + ObMockTxCallback *cb3 = parallel_replay_callback(mt, ObTxSEQ(30, 1), 100, false/*serial_final*/); + ASSERT_EQ(callback_list_.parallel_start_pos_, cb1); + ObMockTxCallback *cb4 = serial_replay_callback(mt, ObTxSEQ(1, 1), 10, false/*serial_final*/); + ObMockTxCallback *cb5 = serial_replay_callback(mt, ObTxSEQ(2, 1), 10, false/*serial_final*/); + ObMockTxCallback *cb6 = serial_replay_callback(mt, ObTxSEQ(3, 1), 10, false/*serial_final*/); + // check chain: serial -> parallel_start_pos -> parallel + ASSERT_EQ(cb6->next_, callback_list_.parallel_start_pos_); + ASSERT_EQ(callback_list_.parallel_start_pos_, cb1); + // replay fail, parallel start_pos + SCN scn; + scn.convert_for_tx(100); + ASSERT_EQ(OB_SUCCESS, callback_list_.replay_fail(scn, false /*serial replay*/)); + ASSERT_EQ(NULL, callback_list_.parallel_start_pos_); + ObMockTxCallback *cb7 = serial_replay_callback(mt, ObTxSEQ(4, 1), 11, false/*serial_final*/); + ObMockTxCallback *cb8 = serial_replay_callback(mt, ObTxSEQ(5, 1), 11, false/*serial_final*/); + ObMockTxCallback *cb9 = serial_replay_callback(mt, ObTxSEQ(6, 1), 11, false/*serial_final*/); + // check chain order on serial part: head -> cb4 -> cb5 -> cb6 -> cb7 -> cb8 -> cb9 -> head + ASSERT_EQ(cb6->next_, cb7); + ASSERT_EQ(cb9->next_, &callback_list_.head_); + ASSERT_EQ(cb4->prev_, &callback_list_.head_); + ASSERT_EQ(callback_list_.head_.next_, cb4); + ASSERT_EQ(callback_list_.head_.prev_, cb9); +} + } // namespace unittest namespace memtable