[tablelock] Fix the bug about checkpoint rollback due to disorder commit during replay
This commit is contained in:
@ -1,2 +1,3 @@
|
|||||||
storage_unittest(test_checkpoint_executor)
|
storage_unittest(test_checkpoint_executor)
|
||||||
storage_unittest(test_data_checkpoint)
|
storage_unittest(test_data_checkpoint)
|
||||||
|
storage_unittest(test_lock_memtable_checkpoint)
|
||||||
|
|||||||
@ -0,0 +1,226 @@
|
|||||||
|
/**
|
||||||
|
* Copyright (c) 2021 OceanBase
|
||||||
|
* OceanBase CE is licensed under Mulan PubL v2.
|
||||||
|
* You can use this software according to the terms and conditions of the Mulan PubL v2.
|
||||||
|
* You may obtain a copy of Mulan PubL v2 at:
|
||||||
|
* http://license.coscl.org.cn/MulanPubL-2.0
|
||||||
|
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
|
||||||
|
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
|
||||||
|
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
|
||||||
|
* See the Mulan PubL v2 for more details.
|
||||||
|
*/
|
||||||
|
#define USING_LOG_PREFIX TABLELOCK
|
||||||
|
#include <gtest/gtest.h>
|
||||||
|
#define protected public
|
||||||
|
#define private public
|
||||||
|
|
||||||
|
#include "storage/ls/ob_ls.h"
|
||||||
|
#include "mtlenv/mock_tenant_module_env.h"
|
||||||
|
#include "mtlenv/tablelock/table_lock_common_env.h"
|
||||||
|
#include "mtlenv/tablelock/table_lock_tx_common_env.h"
|
||||||
|
#include "storage/tx_storage/ob_ls_service.h"
|
||||||
|
#include "storage/init_basic_struct.h"
|
||||||
|
#include "storage/tablelock/ob_lock_memtable.h"
|
||||||
|
#include "observer/ob_safe_destroy_thread.h"
|
||||||
|
|
||||||
|
namespace oceanbase
|
||||||
|
{
|
||||||
|
namespace storage
|
||||||
|
{
|
||||||
|
using namespace checkpoint;
|
||||||
|
}
|
||||||
|
namespace transaction
|
||||||
|
{
|
||||||
|
namespace tablelock
|
||||||
|
{
|
||||||
|
class TestLockMemtableCheckpoint : public ::testing::Test
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
TestLockMemtableCheckpoint()
|
||||||
|
: fake_t3m_(common::OB_SERVER_TENANT_ID),
|
||||||
|
ls_id_(ObLSID(100)),
|
||||||
|
ls_handle_()
|
||||||
|
{
|
||||||
|
LOG_INFO("construct TestLockMemtableCheckpoint");
|
||||||
|
}
|
||||||
|
~TestLockMemtableCheckpoint() = default;
|
||||||
|
|
||||||
|
static void SetUpTestCase();
|
||||||
|
static void TearDownTestCase();
|
||||||
|
void SetUp() override
|
||||||
|
{
|
||||||
|
ObCreateLSArg arg;
|
||||||
|
ObLSService *ls_svr = MTL(ObLSService*);
|
||||||
|
|
||||||
|
ASSERT_NE(nullptr, ls_svr);
|
||||||
|
ASSERT_EQ(OB_SUCCESS,
|
||||||
|
storage::gen_create_ls_arg(OB_SYS_TENANT_ID, ls_id_, arg));
|
||||||
|
ASSERT_EQ(OB_SUCCESS, MTL(ObLSService *)->create_ls(arg));
|
||||||
|
ASSERT_EQ(OB_SUCCESS,
|
||||||
|
ls_svr->get_ls(ls_id_, ls_handle_, ObLSGetMod::TABLELOCK_MOD));
|
||||||
|
ASSERT_NE(nullptr, ls_ = ls_handle_.get_ls());
|
||||||
|
ASSERT_EQ(OB_SUCCESS, ls_->get_lock_table()->get_lock_memtable(table_handle_));
|
||||||
|
ASSERT_EQ(OB_SUCCESS, table_handle_.get_lock_memtable(memtable_));
|
||||||
|
LOG_INFO("set up success");
|
||||||
|
}
|
||||||
|
void TearDown() override
|
||||||
|
{
|
||||||
|
LOG_INFO("tear down success");
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
ObLockMemtable *memtable_;
|
||||||
|
ObTableHandleV2 table_handle_;
|
||||||
|
ObTenantMetaMemMgr fake_t3m_;
|
||||||
|
ObFreezer freezer_;
|
||||||
|
ObLSID ls_id_;
|
||||||
|
ObLS *ls_;
|
||||||
|
ObLSHandle ls_handle_;
|
||||||
|
|
||||||
|
ObArenaAllocator allocator_;
|
||||||
|
};
|
||||||
|
|
||||||
|
void TestLockMemtableCheckpoint::SetUpTestCase()
|
||||||
|
{
|
||||||
|
LOG_INFO("SetUpTestCase");
|
||||||
|
init_default_lock_test_value();
|
||||||
|
EXPECT_EQ(OB_SUCCESS, MockTenantModuleEnv::get_instance().init());
|
||||||
|
SAFE_DESTROY_INSTANCE.init();
|
||||||
|
SAFE_DESTROY_INSTANCE.start();
|
||||||
|
ObServerCheckpointSlogHandler::get_instance().is_started_ = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
void TestLockMemtableCheckpoint::TearDownTestCase()
|
||||||
|
{
|
||||||
|
LOG_INFO("TearDownTestCase");
|
||||||
|
SAFE_DESTROY_INSTANCE.stop();
|
||||||
|
SAFE_DESTROY_INSTANCE.wait();
|
||||||
|
SAFE_DESTROY_INSTANCE.destroy();
|
||||||
|
MockTenantModuleEnv::get_instance().destroy();
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(TestLockMemtableCheckpoint, replay_disorder)
|
||||||
|
{
|
||||||
|
LOG_INFO("TestLockMemtableCheckpoint::replay_disorder");
|
||||||
|
EXPECT_EQ(OB_SYS_TENANT_ID, MTL_ID());
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
ObCreateLSArg arg;
|
||||||
|
|
||||||
|
share::SCN commit_version;
|
||||||
|
share::SCN commit_scn;
|
||||||
|
commit_version.set_base();
|
||||||
|
commit_scn.set_base();
|
||||||
|
|
||||||
|
// 1. get ls checkpoint
|
||||||
|
LOG_INFO("TestLockMemtableCheckpoint::replay_disorder 1");
|
||||||
|
ObCheckpointExecutor *checkpoint_executor = ls_->get_checkpoint_executor();
|
||||||
|
ObCommonCheckpoint *checkpoint =
|
||||||
|
dynamic_cast<ObLSTxService *>(
|
||||||
|
checkpoint_executor
|
||||||
|
->handlers_[logservice::TRANS_SERVICE_LOG_BASE_TYPE])
|
||||||
|
->common_checkpoints_[ObCommonCheckpointType::LOCK_MEMTABLE_TYPE];
|
||||||
|
|
||||||
|
// 2.recover unlock op and lock op
|
||||||
|
LOG_INFO("TestLockMemtableCheckpoint::replay_disorder 2");
|
||||||
|
ret = memtable_->recover_obj_lock(DEFAULT_OUT_TRANS_UNLOCK_OP);
|
||||||
|
ASSERT_EQ(OB_SUCCESS, ret);
|
||||||
|
ret = memtable_->recover_obj_lock(DEFAULT_OUT_TRANS_LOCK_OP);
|
||||||
|
ASSERT_EQ(OB_SUCCESS, ret);
|
||||||
|
|
||||||
|
// 3. update lock status disorder
|
||||||
|
LOG_INFO("TestLockMemtableCheckpoint::replay_disorder 3");
|
||||||
|
commit_version.val_ = 3;
|
||||||
|
commit_scn.val_ = 3;
|
||||||
|
ret = memtable_->update_lock_status(DEFAULT_OUT_TRANS_UNLOCK_OP,
|
||||||
|
commit_version,
|
||||||
|
commit_scn,
|
||||||
|
COMMIT_LOCK_OP_STATUS);
|
||||||
|
ASSERT_EQ(OB_SUCCESS, ret);
|
||||||
|
|
||||||
|
commit_version.val_ = 2;
|
||||||
|
commit_scn.val_ = 2;
|
||||||
|
ret = memtable_->update_lock_status(DEFAULT_OUT_TRANS_LOCK_OP,
|
||||||
|
commit_version,
|
||||||
|
commit_scn,
|
||||||
|
COMMIT_LOCK_OP_STATUS);
|
||||||
|
ASSERT_EQ(OB_SUCCESS, ret);
|
||||||
|
|
||||||
|
// 4. check checkpoint
|
||||||
|
// The rec_scn should be equal with the smaller commit_scn
|
||||||
|
LOG_INFO("TestLockMemtableCheckpoint::replay_disorder 4");
|
||||||
|
ASSERT_EQ(commit_scn.val_, memtable_->get_rec_scn().val_);
|
||||||
|
share::SCN rec_scn = checkpoint->get_rec_scn();
|
||||||
|
ASSERT_EQ(commit_scn.val_, rec_scn.val_);
|
||||||
|
|
||||||
|
// 5. flush and get a previous commit log
|
||||||
|
// You will find the log about disordered replay in the log file.
|
||||||
|
LOG_INFO("TestLockMemtableCheckpoint::replay_disorder 5");
|
||||||
|
ret = memtable_->recover_obj_lock(DEFAULT_OUT_TRANS_UNLOCK_OP);
|
||||||
|
ASSERT_EQ(OB_SUCCESS, ret);
|
||||||
|
ASSERT_EQ(OB_SUCCESS, checkpoint->flush(share::SCN::max_scn()));
|
||||||
|
commit_version.val_ = 1;
|
||||||
|
commit_scn.val_ = 1;
|
||||||
|
ret = memtable_->update_lock_status(DEFAULT_OUT_TRANS_UNLOCK_OP,
|
||||||
|
commit_version,
|
||||||
|
commit_scn,
|
||||||
|
COMMIT_LOCK_OP_STATUS);
|
||||||
|
ASSERT_EQ(OB_SUCCESS, ret);
|
||||||
|
|
||||||
|
// 6. check checkpoint
|
||||||
|
// The rec_scn should be equal with the smaller commit_scn
|
||||||
|
// during flushing (i.e. it's get from pre_rec_scn)
|
||||||
|
LOG_INFO("TestLockMemtableCheckpoint::replay_disorder 6");
|
||||||
|
ASSERT_EQ(commit_scn.val_, memtable_->get_rec_scn().val_);
|
||||||
|
rec_scn = checkpoint->get_rec_scn();
|
||||||
|
ASSERT_EQ(commit_scn.val_, rec_scn.val_);
|
||||||
|
|
||||||
|
// 7. get a commit log with a commit_scn which
|
||||||
|
// is larger than freeze_scn during flushing
|
||||||
|
LOG_INFO("TestLockMemtableCheckpoint::replay_disorder 7");
|
||||||
|
ret = memtable_->recover_obj_lock(DEFAULT_OUT_TRANS_LOCK_OP);
|
||||||
|
ASSERT_EQ(OB_SUCCESS, ret);
|
||||||
|
commit_version.val_ = 4;
|
||||||
|
commit_scn.val_ = 4;
|
||||||
|
ret = memtable_->update_lock_status(DEFAULT_OUT_TRANS_LOCK_OP,
|
||||||
|
commit_version,
|
||||||
|
commit_scn,
|
||||||
|
COMMIT_LOCK_OP_STATUS);
|
||||||
|
|
||||||
|
// 8. check checkpoint
|
||||||
|
// The rec_scn should still be equal with the smaller
|
||||||
|
// commit_scn during flushing (i.e. it's get from pre_rec_scn)
|
||||||
|
LOG_INFO("TestLockMemtableCheckpoint::replay_disorder 8");
|
||||||
|
ASSERT_EQ(1, memtable_->get_rec_scn().val_);
|
||||||
|
rec_scn = checkpoint->get_rec_scn();
|
||||||
|
ASSERT_EQ(1, rec_scn.val_);
|
||||||
|
|
||||||
|
// 9. flush finish
|
||||||
|
LOG_INFO("TestLockMemtableCheckpoint::replay_disorder 9");
|
||||||
|
ret = memtable_->on_memtable_flushed();
|
||||||
|
ASSERT_EQ(OB_SUCCESS, ret);
|
||||||
|
|
||||||
|
// 10. check checkpoint
|
||||||
|
// The rec_scn should be equal with the latest commit_scn
|
||||||
|
// which got during previous flushing (i.e. it's get from rec_scn)
|
||||||
|
LOG_INFO("TestLockMemtableCheckpoint::replay_disorder 10");
|
||||||
|
ASSERT_EQ(commit_scn.val_, memtable_->get_rec_scn().val_);
|
||||||
|
rec_scn = checkpoint->get_rec_scn();
|
||||||
|
ASSERT_EQ(commit_scn.val_, rec_scn.val_);
|
||||||
|
|
||||||
|
// 11. clean up
|
||||||
|
LOG_INFO("TestLockMemtableCheckpoint::replay_disorder 11");
|
||||||
|
table_handle_.reset();
|
||||||
|
ls_handle_.reset();
|
||||||
|
ASSERT_EQ(OB_SUCCESS, MTL(ObLSService*)->remove_ls(ls_id_, false));
|
||||||
|
}
|
||||||
|
} // namespace tablelock
|
||||||
|
} // namespace transaction
|
||||||
|
} // namespace oceanbase
|
||||||
|
|
||||||
|
int main(int argc, char **argv)
|
||||||
|
{
|
||||||
|
oceanbase::common::ObLogger::get_logger().set_file_name("test_lock_memtable_checkpoint.log", true);
|
||||||
|
oceanbase::common::ObLogger::get_logger().set_log_level("INFO");
|
||||||
|
::testing::InitGoogleTest(&argc, argv);
|
||||||
|
return RUN_ALL_TESTS();
|
||||||
|
}
|
||||||
@ -49,7 +49,7 @@ ObLockMemtable::ObLockMemtable()
|
|||||||
freeze_scn_(SCN::min_scn()),
|
freeze_scn_(SCN::min_scn()),
|
||||||
flushed_scn_(SCN::min_scn()),
|
flushed_scn_(SCN::min_scn()),
|
||||||
rec_scn_(SCN::max_scn()),
|
rec_scn_(SCN::max_scn()),
|
||||||
pre_rec_scn_(),
|
pre_rec_scn_(SCN::max_scn()),
|
||||||
max_committed_scn_(),
|
max_committed_scn_(),
|
||||||
is_frozen_(false),
|
is_frozen_(false),
|
||||||
freezer_(nullptr),
|
freezer_(nullptr),
|
||||||
@ -98,7 +98,7 @@ int ObLockMemtable::init(
|
|||||||
void ObLockMemtable::reset()
|
void ObLockMemtable::reset()
|
||||||
{
|
{
|
||||||
rec_scn_.set_max();
|
rec_scn_.set_max();
|
||||||
pre_rec_scn_.reset();
|
pre_rec_scn_.set_max();
|
||||||
max_committed_scn_.reset();
|
max_committed_scn_.reset();
|
||||||
ls_id_.reset();
|
ls_id_.reset();
|
||||||
ObITable::reset();
|
ObITable::reset();
|
||||||
@ -519,7 +519,14 @@ int ObLockMemtable::update_lock_status(
|
|||||||
} else if ((OUT_TRANS_LOCK == op_info.op_type_ || OUT_TRANS_UNLOCK == op_info.op_type_)
|
} else if ((OUT_TRANS_LOCK == op_info.op_type_ || OUT_TRANS_UNLOCK == op_info.op_type_)
|
||||||
&& LOCK_OP_COMPLETE == status) {
|
&& LOCK_OP_COMPLETE == status) {
|
||||||
RLockGuard guard(flush_lock_);
|
RLockGuard guard(flush_lock_);
|
||||||
|
if (commit_scn <= freeze_scn_) {
|
||||||
|
LOG_INFO("meet disordered replay, will dec_update pre_rec_scn_", K(ret),
|
||||||
|
K(op_info), K(commit_scn), K(rec_scn_), K(pre_rec_scn_),
|
||||||
|
K(freeze_scn_), K(ls_id_));
|
||||||
|
pre_rec_scn_.dec_update(commit_scn);
|
||||||
|
} else {
|
||||||
rec_scn_.dec_update(commit_scn);
|
rec_scn_.dec_update(commit_scn);
|
||||||
|
}
|
||||||
max_committed_scn_.inc_update(commit_scn);
|
max_committed_scn_.inc_update(commit_scn);
|
||||||
LOG_INFO("out_trans update_lock_status", K(ret), K(op_info), K(commit_scn), K(status), K(rec_scn_), K(ls_id_));
|
LOG_INFO("out_trans update_lock_status", K(ret), K(op_info), K(commit_scn), K(status), K(rec_scn_), K(ls_id_));
|
||||||
}
|
}
|
||||||
@ -740,10 +747,27 @@ SCN ObLockMemtable::get_rec_scn()
|
|||||||
LOG_INFO("rec_scn of ObLockMemtable is ",
|
LOG_INFO("rec_scn of ObLockMemtable is ",
|
||||||
K(rec_scn_), K(flushed_scn_), K(pre_rec_scn_),
|
K(rec_scn_), K(flushed_scn_), K(pre_rec_scn_),
|
||||||
K(freeze_scn_), K(max_committed_scn_), K(is_frozen_), K(ls_id_));
|
K(freeze_scn_), K(max_committed_scn_), K(is_frozen_), K(ls_id_));
|
||||||
if (!pre_rec_scn_.is_valid()) {
|
// If pre_rec_scn_ is max, it means that previous memtable
|
||||||
|
// has already been flushed. In ohter words, it means that
|
||||||
|
// rec_scn_ is ready to work, so we can return rec_scn_.
|
||||||
|
// You can regard max_scn as an invalid value for pre_rec_scn_ here.
|
||||||
|
// (As a matter of fact, max_scn means pre_rec_scn_ or rec_scn_
|
||||||
|
// will not block checkpoint advancing.)
|
||||||
|
//
|
||||||
|
// Specifically, if there's a commit_scn which is smaller
|
||||||
|
// than the freeze_scn_, the pre_rec_scn_ will be set to
|
||||||
|
// an valid value (i.e. not max_scn) again, it's a special
|
||||||
|
// case in disordered replay.
|
||||||
|
// You can see details about this case in update_lock_status.
|
||||||
|
if (pre_rec_scn_.is_max()) {
|
||||||
return rec_scn_;
|
return rec_scn_;
|
||||||
} else {
|
} else {
|
||||||
return pre_rec_scn_;
|
if (pre_rec_scn_ > rec_scn_) {
|
||||||
|
LOG_INFO("prec_rec_scn_ is larger than rec_scn_!", K(pre_rec_scn_),
|
||||||
|
K(rec_scn_), K(flushed_scn_), K(freeze_scn_),
|
||||||
|
K(max_committed_scn_), K(is_frozen_), K(ls_id_));
|
||||||
|
}
|
||||||
|
return share::SCN::min(pre_rec_scn_, rec_scn_);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -761,7 +785,7 @@ int ObLockMemtable::on_memtable_flushed()
|
|||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
WLockGuard guard(flush_lock_);
|
WLockGuard guard(flush_lock_);
|
||||||
pre_rec_scn_.reset();
|
pre_rec_scn_.set_max();
|
||||||
if (freeze_scn_ > flushed_scn_) {
|
if (freeze_scn_ > flushed_scn_) {
|
||||||
flushed_scn_ = freeze_scn_;
|
flushed_scn_ = freeze_scn_;
|
||||||
} else {
|
} else {
|
||||||
@ -802,7 +826,7 @@ int ObLockMemtable::flush(SCN recycle_scn,
|
|||||||
LOG_INFO("lock memtable no need to flush", K(rec_scn), K(recycle_scn),
|
LOG_INFO("lock memtable no need to flush", K(rec_scn), K(recycle_scn),
|
||||||
K(is_frozen_), K(ls_id_));
|
K(is_frozen_), K(ls_id_));
|
||||||
} else if (is_active_memtable()) {
|
} else if (is_active_memtable()) {
|
||||||
freeze_scn_ = max_committed_scn_;
|
freeze_scn_.inc_update(max_committed_scn_);
|
||||||
if (flushed_scn_ >= freeze_scn_) {
|
if (flushed_scn_ >= freeze_scn_) {
|
||||||
LOG_INFO("skip freeze because of flushed", K_(ls_id), K_(flushed_scn), K_(freeze_scn));
|
LOG_INFO("skip freeze because of flushed", K_(ls_id), K_(flushed_scn), K_(freeze_scn));
|
||||||
} else {
|
} else {
|
||||||
|
|||||||
Reference in New Issue
Block a user