fix parallel_start_pos_ adjust when parallel replay failed

This commit is contained in:
chinaxing 2024-07-04 04:21:03 +00:00 committed by ob-robot
parent a3168c3fbe
commit b5495c5a66
3 changed files with 85 additions and 3 deletions

View File

@ -65,10 +65,16 @@ public:
{
create_memtable();
init_mem_ctx(handle_);
created_cb_list_.reset();
LOG_INFO("set up success");
}
virtual void TearDown() override
{
//release all callbacks
ObOBJLockCallback *cb;
ARRAY_FOREACH_NORET(created_cb_list_, i) {
mt_ctx_.free_table_lock_callback(created_cb_list_[i]);
}
LOG_INFO("tear down success");
}
private:
@ -106,6 +112,7 @@ private:
ret = mt_key.encode(&rowkey);
ASSERT_EQ(OB_SUCCESS, ret);
cb->set(mt_key, lock_op_node);
ASSERT_EQ(OB_SUCCESS, created_cb_list_.push_back(cb));
}
void free_callback(ObOBJLockCallback *&cb)
{
@ -121,6 +128,7 @@ private:
ObFreezer freezer_;
ObMemtableCtx mt_ctx_;
ObSEArray<ObOBJLockCallback*, 16>created_cb_list_;
};
void TestLockTableCallback::SetUpTestCase()

View File

@ -264,7 +264,7 @@ int ObTxCallbackList::callback_(ObITxCallbackFunctor &functor,
log_cursor_ = next;
}
if (parallel_start_pos_ == iter) {
parallel_start_pos_ = (next == &head_) ? NULL : next;
parallel_start_pos_ = (is_reverse || next == &head_) ? NULL : next;
}
++removed_;
if (iter->need_submit_log()) {
@ -728,10 +728,21 @@ int ObTxCallbackList::replay_fail(const SCN scn, const bool serial_replay)
functor.scn_ = scn;
LockGuard guard(*this, LOCK_MODE::LOCK_ALL);
//
// for replay fail of serial log, if parallel replay has happened,
// must reverse traversal from parallel_start_pos_.prev_
ObITransCallback *start_pos = (serial_replay && parallel_start_pos_) ? parallel_start_pos_ : get_guard();
ObITransCallback *end_pos = get_guard();
//
// head_ --> ... -> parallel_start_pos_ -> ... -> head_
//
ObITransCallback *start_pos = NULL;
ObITransCallback *end_pos = NULL;
if (serial_replay) {
start_pos = parallel_start_pos_ ?: get_guard();
end_pos = get_guard();
} else {
start_pos = get_guard();
end_pos = parallel_start_pos_ ? parallel_start_pos_->get_prev() : get_guard();
}
if (OB_FAIL(callback_(functor, start_pos, end_pos, guard.state_))) {
TRANS_LOG(ERROR, "replay fail failed", K(ret), K(functor));
} else {

View File

@ -164,6 +164,36 @@ public:
cb->need_submit_log_ = need_submit_log;
return cb;
}
ObMockTxCallback * replay_callback(ObMemtable *mt,
transaction::ObTxSEQ seq_no,
int64_t scn_v,
bool parallel_replay,
bool serial_final)
{
share::SCN scn;
EXPECT_EQ(OB_SUCCESS, scn.convert_for_tx(scn_v));
ObMockTxCallback *cb = new ObMockTxCallback(mt,
false,
scn,
seq_no);
EXPECT_EQ(OB_SUCCESS, callback_list_.append_callback(cb, true, parallel_replay, serial_final));
return cb;
}
ObMockTxCallback * parallel_replay_callback(ObMemtable *mt,
transaction::ObTxSEQ seq_no,
int64_t scn_v,
bool serial_final)
{
return replay_callback(mt, seq_no, scn_v, true, serial_final);
}
ObMockTxCallback * serial_replay_callback(ObMemtable *mt,
transaction::ObTxSEQ seq_no,
int64_t scn_v,
bool serial_final)
{
return replay_callback(mt, seq_no, scn_v, false, serial_final);
}
ObMemtable *create_memtable()
{
@ -1272,6 +1302,39 @@ TEST_F(TestTxCallbackList, log_cursor) {
EXPECT_EQ(callback_list_.log_cursor_, cb_4->next_);
#undef APPEND_CB
}
TEST_F(TestTxCallbackList, parallel_replay_and_replay_fail_parallel_start_pos) {
using ObTxSEQ = transaction::ObTxSEQ;
using SCN = share::SCN;
TRANS_LOG(INFO, "CASE: parallel replay and replay fail");
int ret = 0;
ObMemtable *mt = create_memtable();
ObMockTxCallback *cb1 = parallel_replay_callback(mt, ObTxSEQ(10, 1), 100, false/*serial_final*/);
ObMockTxCallback *cb2 = parallel_replay_callback(mt, ObTxSEQ(20, 1), 100, false/*serial_final*/);
ObMockTxCallback *cb3 = parallel_replay_callback(mt, ObTxSEQ(30, 1), 100, false/*serial_final*/);
ASSERT_EQ(callback_list_.parallel_start_pos_, cb1);
ObMockTxCallback *cb4 = serial_replay_callback(mt, ObTxSEQ(1, 1), 10, false/*serial_final*/);
ObMockTxCallback *cb5 = serial_replay_callback(mt, ObTxSEQ(2, 1), 10, false/*serial_final*/);
ObMockTxCallback *cb6 = serial_replay_callback(mt, ObTxSEQ(3, 1), 10, false/*serial_final*/);
// check chain: serial -> parallel_start_pos -> parallel
ASSERT_EQ(cb6->next_, callback_list_.parallel_start_pos_);
ASSERT_EQ(callback_list_.parallel_start_pos_, cb1);
// replay fail, parallel start_pos
SCN scn;
scn.convert_for_tx(100);
ASSERT_EQ(OB_SUCCESS, callback_list_.replay_fail(scn, false /*serial replay*/));
ASSERT_EQ(NULL, callback_list_.parallel_start_pos_);
ObMockTxCallback *cb7 = serial_replay_callback(mt, ObTxSEQ(4, 1), 11, false/*serial_final*/);
ObMockTxCallback *cb8 = serial_replay_callback(mt, ObTxSEQ(5, 1), 11, false/*serial_final*/);
ObMockTxCallback *cb9 = serial_replay_callback(mt, ObTxSEQ(6, 1), 11, false/*serial_final*/);
// check chain order on serial part: head -> cb4 -> cb5 -> cb6 -> cb7 -> cb8 -> cb9 -> head
ASSERT_EQ(cb6->next_, cb7);
ASSERT_EQ(cb9->next_, &callback_list_.head_);
ASSERT_EQ(cb4->prev_, &callback_list_.head_);
ASSERT_EQ(callback_list_.head_.next_, cb4);
ASSERT_EQ(callback_list_.head_.prev_, cb9);
}
} // namespace unittest
namespace memtable