From d5faf6c8c8cdcd5812bef57ebeadd9f9b75df0fc Mon Sep 17 00:00:00 2001 From: obdev Date: Thu, 6 Apr 2023 07:59:34 +0000 Subject: [PATCH] [tablelock] Fix the bug about checkpoint rollback due to disorder commit during replay --- .../mtlenv/storage/checkpoint/CMakeLists.txt | 1 + .../test_lock_memtable_checkpoint.cpp | 226 ++++++++++++++++++ src/storage/tablelock/ob_lock_memtable.cpp | 38 ++- 3 files changed, 258 insertions(+), 7 deletions(-) create mode 100644 mittest/mtlenv/storage/checkpoint/test_lock_memtable_checkpoint.cpp diff --git a/mittest/mtlenv/storage/checkpoint/CMakeLists.txt b/mittest/mtlenv/storage/checkpoint/CMakeLists.txt index a88c1addb..a396bbbba 100644 --- a/mittest/mtlenv/storage/checkpoint/CMakeLists.txt +++ b/mittest/mtlenv/storage/checkpoint/CMakeLists.txt @@ -1,2 +1,3 @@ storage_unittest(test_checkpoint_executor) storage_unittest(test_data_checkpoint) +storage_unittest(test_lock_memtable_checkpoint) diff --git a/mittest/mtlenv/storage/checkpoint/test_lock_memtable_checkpoint.cpp b/mittest/mtlenv/storage/checkpoint/test_lock_memtable_checkpoint.cpp new file mode 100644 index 000000000..a5bfe7c3a --- /dev/null +++ b/mittest/mtlenv/storage/checkpoint/test_lock_memtable_checkpoint.cpp @@ -0,0 +1,226 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ +#define USING_LOG_PREFIX TABLELOCK +#include +#define protected public +#define private public + +#include "storage/ls/ob_ls.h" +#include "mtlenv/mock_tenant_module_env.h" +#include "mtlenv/tablelock/table_lock_common_env.h" +#include "mtlenv/tablelock/table_lock_tx_common_env.h" +#include "storage/tx_storage/ob_ls_service.h" +#include "storage/init_basic_struct.h" +#include "storage/tablelock/ob_lock_memtable.h" +#include "observer/ob_safe_destroy_thread.h" + +namespace oceanbase +{ +namespace storage +{ +using namespace checkpoint; +} +namespace transaction +{ +namespace tablelock +{ +class TestLockMemtableCheckpoint : public ::testing::Test +{ +public: + TestLockMemtableCheckpoint() + : fake_t3m_(common::OB_SERVER_TENANT_ID), + ls_id_(ObLSID(100)), + ls_handle_() + { + LOG_INFO("construct TestLockMemtableCheckpoint"); + } + ~TestLockMemtableCheckpoint() = default; + + static void SetUpTestCase(); + static void TearDownTestCase(); + void SetUp() override + { + ObCreateLSArg arg; + ObLSService *ls_svr = MTL(ObLSService*); + + ASSERT_NE(nullptr, ls_svr); + ASSERT_EQ(OB_SUCCESS, + storage::gen_create_ls_arg(OB_SYS_TENANT_ID, ls_id_, arg)); + ASSERT_EQ(OB_SUCCESS, MTL(ObLSService *)->create_ls(arg)); + ASSERT_EQ(OB_SUCCESS, + ls_svr->get_ls(ls_id_, ls_handle_, ObLSGetMod::TABLELOCK_MOD)); + ASSERT_NE(nullptr, ls_ = ls_handle_.get_ls()); + ASSERT_EQ(OB_SUCCESS, ls_->get_lock_table()->get_lock_memtable(table_handle_)); + ASSERT_EQ(OB_SUCCESS, table_handle_.get_lock_memtable(memtable_)); + LOG_INFO("set up success"); + } + void TearDown() override + { + LOG_INFO("tear down success"); + } + +private: + ObLockMemtable *memtable_; + ObTableHandleV2 table_handle_; + ObTenantMetaMemMgr fake_t3m_; + ObFreezer freezer_; + ObLSID ls_id_; + ObLS *ls_; + ObLSHandle ls_handle_; + + ObArenaAllocator allocator_; +}; + +void TestLockMemtableCheckpoint::SetUpTestCase() +{ + LOG_INFO("SetUpTestCase"); + init_default_lock_test_value(); + EXPECT_EQ(OB_SUCCESS, MockTenantModuleEnv::get_instance().init()); + SAFE_DESTROY_INSTANCE.init(); + SAFE_DESTROY_INSTANCE.start(); + ObServerCheckpointSlogHandler::get_instance().is_started_ = true; +} + +void TestLockMemtableCheckpoint::TearDownTestCase() +{ + LOG_INFO("TearDownTestCase"); + SAFE_DESTROY_INSTANCE.stop(); + SAFE_DESTROY_INSTANCE.wait(); + SAFE_DESTROY_INSTANCE.destroy(); + MockTenantModuleEnv::get_instance().destroy(); +} + +TEST_F(TestLockMemtableCheckpoint, replay_disorder) +{ + LOG_INFO("TestLockMemtableCheckpoint::replay_disorder"); + EXPECT_EQ(OB_SYS_TENANT_ID, MTL_ID()); + int ret = OB_SUCCESS; + ObCreateLSArg arg; + + share::SCN commit_version; + share::SCN commit_scn; + commit_version.set_base(); + commit_scn.set_base(); + + // 1. get ls checkpoint + LOG_INFO("TestLockMemtableCheckpoint::replay_disorder 1"); + ObCheckpointExecutor *checkpoint_executor = ls_->get_checkpoint_executor(); + ObCommonCheckpoint *checkpoint = + dynamic_cast( + checkpoint_executor + ->handlers_[logservice::TRANS_SERVICE_LOG_BASE_TYPE]) + ->common_checkpoints_[ObCommonCheckpointType::LOCK_MEMTABLE_TYPE]; + + // 2.recover unlock op and lock op + LOG_INFO("TestLockMemtableCheckpoint::replay_disorder 2"); + ret = memtable_->recover_obj_lock(DEFAULT_OUT_TRANS_UNLOCK_OP); + ASSERT_EQ(OB_SUCCESS, ret); + ret = memtable_->recover_obj_lock(DEFAULT_OUT_TRANS_LOCK_OP); + ASSERT_EQ(OB_SUCCESS, ret); + + // 3. update lock status disorder + LOG_INFO("TestLockMemtableCheckpoint::replay_disorder 3"); + commit_version.val_ = 3; + commit_scn.val_ = 3; + ret = memtable_->update_lock_status(DEFAULT_OUT_TRANS_UNLOCK_OP, + commit_version, + commit_scn, + COMMIT_LOCK_OP_STATUS); + ASSERT_EQ(OB_SUCCESS, ret); + + commit_version.val_ = 2; + commit_scn.val_ = 2; + ret = memtable_->update_lock_status(DEFAULT_OUT_TRANS_LOCK_OP, + commit_version, + commit_scn, + COMMIT_LOCK_OP_STATUS); + ASSERT_EQ(OB_SUCCESS, ret); + + // 4. check checkpoint + // The rec_scn should be equal with the smaller commit_scn + LOG_INFO("TestLockMemtableCheckpoint::replay_disorder 4"); + ASSERT_EQ(commit_scn.val_, memtable_->get_rec_scn().val_); + share::SCN rec_scn = checkpoint->get_rec_scn(); + ASSERT_EQ(commit_scn.val_, rec_scn.val_); + + // 5. flush and get a previous commit log + // You will find the log about disordered replay in the log file. + LOG_INFO("TestLockMemtableCheckpoint::replay_disorder 5"); + ret = memtable_->recover_obj_lock(DEFAULT_OUT_TRANS_UNLOCK_OP); + ASSERT_EQ(OB_SUCCESS, ret); + ASSERT_EQ(OB_SUCCESS, checkpoint->flush(share::SCN::max_scn())); + commit_version.val_ = 1; + commit_scn.val_ = 1; + ret = memtable_->update_lock_status(DEFAULT_OUT_TRANS_UNLOCK_OP, + commit_version, + commit_scn, + COMMIT_LOCK_OP_STATUS); + ASSERT_EQ(OB_SUCCESS, ret); + + // 6. check checkpoint + // The rec_scn should be equal with the smaller commit_scn + // during flushing (i.e. it's get from pre_rec_scn) + LOG_INFO("TestLockMemtableCheckpoint::replay_disorder 6"); + ASSERT_EQ(commit_scn.val_, memtable_->get_rec_scn().val_); + rec_scn = checkpoint->get_rec_scn(); + ASSERT_EQ(commit_scn.val_, rec_scn.val_); + + // 7. get a commit log with a commit_scn which + // is larger than freeze_scn during flushing + LOG_INFO("TestLockMemtableCheckpoint::replay_disorder 7"); + ret = memtable_->recover_obj_lock(DEFAULT_OUT_TRANS_LOCK_OP); + ASSERT_EQ(OB_SUCCESS, ret); + commit_version.val_ = 4; + commit_scn.val_ = 4; + ret = memtable_->update_lock_status(DEFAULT_OUT_TRANS_LOCK_OP, + commit_version, + commit_scn, + COMMIT_LOCK_OP_STATUS); + + // 8. check checkpoint + // The rec_scn should still be equal with the smaller + // commit_scn during flushing (i.e. it's get from pre_rec_scn) + LOG_INFO("TestLockMemtableCheckpoint::replay_disorder 8"); + ASSERT_EQ(1, memtable_->get_rec_scn().val_); + rec_scn = checkpoint->get_rec_scn(); + ASSERT_EQ(1, rec_scn.val_); + + // 9. flush finish + LOG_INFO("TestLockMemtableCheckpoint::replay_disorder 9"); + ret = memtable_->on_memtable_flushed(); + ASSERT_EQ(OB_SUCCESS, ret); + + // 10. check checkpoint + // The rec_scn should be equal with the latest commit_scn + // which got during previous flushing (i.e. it's get from rec_scn) + LOG_INFO("TestLockMemtableCheckpoint::replay_disorder 10"); + ASSERT_EQ(commit_scn.val_, memtable_->get_rec_scn().val_); + rec_scn = checkpoint->get_rec_scn(); + ASSERT_EQ(commit_scn.val_, rec_scn.val_); + + // 11. clean up + LOG_INFO("TestLockMemtableCheckpoint::replay_disorder 11"); + table_handle_.reset(); + ls_handle_.reset(); + ASSERT_EQ(OB_SUCCESS, MTL(ObLSService*)->remove_ls(ls_id_, false)); +} +} // namespace tablelock +} // namespace transaction +} // namespace oceanbase + +int main(int argc, char **argv) +{ + oceanbase::common::ObLogger::get_logger().set_file_name("test_lock_memtable_checkpoint.log", true); + oceanbase::common::ObLogger::get_logger().set_log_level("INFO"); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/src/storage/tablelock/ob_lock_memtable.cpp b/src/storage/tablelock/ob_lock_memtable.cpp index d4d4c1217..7fba5aad9 100644 --- a/src/storage/tablelock/ob_lock_memtable.cpp +++ b/src/storage/tablelock/ob_lock_memtable.cpp @@ -49,7 +49,7 @@ ObLockMemtable::ObLockMemtable() freeze_scn_(SCN::min_scn()), flushed_scn_(SCN::min_scn()), rec_scn_(SCN::max_scn()), - pre_rec_scn_(), + pre_rec_scn_(SCN::max_scn()), max_committed_scn_(), is_frozen_(false), freezer_(nullptr), @@ -98,7 +98,7 @@ int ObLockMemtable::init( void ObLockMemtable::reset() { rec_scn_.set_max(); - pre_rec_scn_.reset(); + pre_rec_scn_.set_max(); max_committed_scn_.reset(); ls_id_.reset(); ObITable::reset(); @@ -519,7 +519,14 @@ int ObLockMemtable::update_lock_status( } else if ((OUT_TRANS_LOCK == op_info.op_type_ || OUT_TRANS_UNLOCK == op_info.op_type_) && LOCK_OP_COMPLETE == status) { RLockGuard guard(flush_lock_); - rec_scn_.dec_update(commit_scn); + if (commit_scn <= freeze_scn_) { + LOG_INFO("meet disordered replay, will dec_update pre_rec_scn_", K(ret), + K(op_info), K(commit_scn), K(rec_scn_), K(pre_rec_scn_), + K(freeze_scn_), K(ls_id_)); + pre_rec_scn_.dec_update(commit_scn); + } else { + rec_scn_.dec_update(commit_scn); + } max_committed_scn_.inc_update(commit_scn); LOG_INFO("out_trans update_lock_status", K(ret), K(op_info), K(commit_scn), K(status), K(rec_scn_), K(ls_id_)); } @@ -740,10 +747,27 @@ SCN ObLockMemtable::get_rec_scn() LOG_INFO("rec_scn of ObLockMemtable is ", K(rec_scn_), K(flushed_scn_), K(pre_rec_scn_), K(freeze_scn_), K(max_committed_scn_), K(is_frozen_), K(ls_id_)); - if (!pre_rec_scn_.is_valid()) { + // If pre_rec_scn_ is max, it means that previous memtable + // has already been flushed. In ohter words, it means that + // rec_scn_ is ready to work, so we can return rec_scn_. + // You can regard max_scn as an invalid value for pre_rec_scn_ here. + // (As a matter of fact, max_scn means pre_rec_scn_ or rec_scn_ + // will not block checkpoint advancing.) + // + // Specifically, if there's a commit_scn which is smaller + // than the freeze_scn_, the pre_rec_scn_ will be set to + // an valid value (i.e. not max_scn) again, it's a special + // case in disordered replay. + // You can see details about this case in update_lock_status. + if (pre_rec_scn_.is_max()) { return rec_scn_; } else { - return pre_rec_scn_; + if (pre_rec_scn_ > rec_scn_) { + LOG_INFO("prec_rec_scn_ is larger than rec_scn_!", K(pre_rec_scn_), + K(rec_scn_), K(flushed_scn_), K(freeze_scn_), + K(max_committed_scn_), K(is_frozen_), K(ls_id_)); + } + return share::SCN::min(pre_rec_scn_, rec_scn_); } } @@ -761,7 +785,7 @@ int ObLockMemtable::on_memtable_flushed() { int ret = OB_SUCCESS; WLockGuard guard(flush_lock_); - pre_rec_scn_.reset(); + pre_rec_scn_.set_max(); if (freeze_scn_ > flushed_scn_) { flushed_scn_ = freeze_scn_; } else { @@ -802,7 +826,7 @@ int ObLockMemtable::flush(SCN recycle_scn, LOG_INFO("lock memtable no need to flush", K(rec_scn), K(recycle_scn), K(is_frozen_), K(ls_id_)); } else if (is_active_memtable()) { - freeze_scn_ = max_committed_scn_; + freeze_scn_.inc_update(max_committed_scn_); if (flushed_scn_ >= freeze_scn_) { LOG_INFO("skip freeze because of flushed", K_(ls_id), K_(flushed_scn), K_(freeze_scn)); } else {