Files
oceanbase/unittest/storage/multi_data_source/test_mds_table.cpp

655 lines
27 KiB
C++

/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#define UNITTEST_DEBUG
#include "lib/utility/utility.h"
#include <gtest/gtest.h>
#define private public
#define protected public
#include "multi_data_source/example_user_data_define.h"
#include "share/ob_ls_id.h"
#include "storage/multi_data_source/mds_writer.h"
#include <thread>
#include <iostream>
#include <vector>
#include <chrono>
#include <exception>
#include "common_define.h"
#include "lib/ob_errno.h"
#include "share/ob_errno.h"
#include "storage/multi_data_source/adapter_define/mds_dump_node.h"
#include "lib/allocator/ob_malloc.h"
#include "storage/multi_data_source/mds_node.h"
#include "common/ob_clock_generator.h"
#include "storage/multi_data_source/mds_row.h"
#include "storage/multi_data_source/mds_unit.h"
#include "storage/multi_data_source/mds_table_handle.h"
#include "storage/multi_data_source/mds_table_handler.h"
#include "example_user_helper_define.cpp"
#include "storage/tx/ob_trans_define.h"
#include <algorithm>
#include <numeric>
#include "storage/multi_data_source/runtime_utility/mds_lock.h"
#include "storage/tablet/ob_tablet_meta.h"
namespace oceanbase {
namespace storage
{
namespace mds
{
/**
 * Unit-test definition of MdsAllocator::alloc.
 *
 * Requests `size` bytes from ob_malloc under the "MDS" label and logs the
 * resulting pointer together with the current backtrace.
 *
 * @param size number of bytes to allocate
 * @return whatever ob_malloc returned (may be nullptr)
 */
void *MdsAllocator::alloc(const int64_t size)
{
  void *ptr = ob_malloc(size, "MDS");
  if (nullptr != ptr) {
    // Count only allocations that really produced memory; main() compares
    // alloc_times_ against free_times_ to detect leaks, and a failed request
    // has nothing to free later.
    ATOMIC_INC(&alloc_times_);
  }
  MDS_LOG(DEBUG, "alloc obj", KP(ptr), K(size), K(lbt()));
  return ptr;
}
/**
 * Unit-test definition of MdsAllocator::free.
 *
 * Logs the pointer and the current backtrace, then releases the memory with
 * ob_free. A null pointer is logged but neither counted nor released, so that
 * free_times_ only reflects memory that was really handed back.
 *
 * @param ptr pointer to release (may be nullptr)
 */
void MdsAllocator::free(void *ptr) {
  MDS_LOG(DEBUG, "free obj", KP(ptr), K(lbt()));
  if (nullptr != ptr) {
    ATOMIC_INC(&free_times_);
    ob_free(ptr);
  }
}
}
}
namespace unittest {
using namespace common;
using namespace std;
using namespace storage;
using namespace mds;
using namespace transaction;
class TestMdsTable: public ::testing::Test
{
public:
TestMdsTable() {}
virtual ~TestMdsTable() {}
virtual void SetUp() {
}
virtual void TearDown() {
}
static void set();
static void replay();
static void get_latest();
static void get_snapshot();
static void get_snapshot_hung_1s();
static void get_by_writer();
static void insert_multi_row();
static void get_multi_row();
// static void for_each_scan();
static void standard_iterator();
static void OB_iterator();
static void test_flush();
static void test_is_locked_by_others();
static void test_multi_key_remove();
private:
// disallow copy
DISALLOW_COPY_AND_ASSIGN(TestMdsTable);
};
// Global handler shared by the TestMdsTable::* case bodies below. The state comments between
// those bodies track what each case leaves behind in the table for the next one.
ObMdsTableHandler mds_table_hanlder;
// Reference to the handle owned by the handler above; the shared cases reach the table through it.
MdsTableHandle &mds_table_ = mds_table_hanlder.mds_table_handle_;
/***********************************************Single Row*************************************************************/
// Small wrappers around one lock each. B is referenced by the sizeof() log line in set();
// A is not referenced anywhere in this file.
struct A { ObSpinLock lock_; };
struct B { MdsLock lock_; };
// Resolves the concrete MdsTableImpl<UnitTestMdsTable> behind a handle by going through the handle's
// control block (p_mds_table_base_.ctrl_ptr_->p_data_block_->data_). The result of the dynamic_cast
// is dereferenced without a null check, so the handle must really hold a UnitTestMdsTable.
#define GET_REAL_MDS_TABLE(mds_table) (*((MdsTableImpl<UnitTestMdsTable>*)(dynamic_cast<guard::LightDataBlock<MdsTableImpl<UnitTestMdsTable>>*>((mds_table.p_mds_table_base_.ctrl_ptr_->p_data_block_))->data_)))
// Initializes the shared mds table and performs the first single-row writes:
// writer 1 sets ExampleUserData1(1) and ExampleUserData2("123") and commits both at scn 10,
// writer 2 sets ExampleUserData1 once more and is aborted at scn 8.
void TestMdsTable::set()
{
  ASSERT_EQ(OB_SUCCESS,
            mds_table_.init<UnitTestMdsTable>(MdsAllocator::get_instance(),
                                              ObTabletID(1),
                                              share::ObLSID(1),
                                              (ObTabletPointer*)0x111));
  MDS_LOG(INFO, "test sizeof",
          K(sizeof(MdsTableImpl<UnitTestMdsTable>)),
          K(sizeof(B)),
          K(mds_table_.p_mds_table_base_.ctrl_ptr_->ref_));

  ExampleUserData1 data1(1);
  ExampleUserData2 data2;
  ASSERT_EQ(OB_SUCCESS, data2.assign(MdsAllocator::get_instance(), "123"));

  // writer 1: both writes go through redo/prepare/commit at scn 10
  MdsCtx ctx1(mds::MdsWriter(ObTransID(1)));
  DummyKey key;// not referenced by the rest of this case
  ASSERT_EQ(OB_SUCCESS, mds_table_.set(data1, ctx1));
  ASSERT_EQ(OB_SUCCESS, mds_table_.set(data2, ctx1));
  ctx1.on_redo(mock_scn(10));
  ctx1.before_prepare();
  ctx1.on_prepare(mock_scn(10));
  ctx1.on_commit(mock_scn(10), mock_scn(10));

  // writer 2: writes data1 (data3 is constructed but not referenced) and is aborted explicitly
  ExampleUserData1 data3(3);
  MdsCtx ctx2(mds::MdsWriter(ObTransID(2)));
  ASSERT_EQ(OB_SUCCESS, mds_table_.set(data1, ctx2));
  ctx2.on_abort(mock_scn(8));
}
// <DummyKey, ExampleUserData1>: (data:1, writer:1, ver:10)
// <DummyKey, ExampleUserData2>: (data:2, writer:1, ver:10)
// Replays ExampleUserData1(3) at scn 9 for writer 1, commits it at scn 9, then walks the
// <DummyKey, ExampleUserData1> row from tail to head and asserts that the redo scn of the
// non-aborted nodes is strictly increasing.
void TestMdsTable::replay()
{
  ExampleUserData1 data1(3);
  MdsCtx ctx1(mds::MdsWriter(ObTransID(1)));
  ASSERT_EQ(OB_SUCCESS, mds_table_.replay(data1, ctx1, mock_scn(9)));
  // ctx1.on_redo(mock_scn(9));// insert to tail on unit ExampleUserData1
  ctx1.before_prepare();
  ctx1.on_prepare(mock_scn(9));
  ctx1.on_commit(mock_scn(9), mock_scn(9));

  auto &unit = GET_REAL_MDS_TABLE(mds_table_).unit_tuple_.element<MdsUnit<DummyKey, ExampleUserData1>>();
  MDS_LOG(INFO, "xuwang test", K(unit.single_row_));

  // highest redo scn seen so far while walking from the tail
  share::SCN recorde_scn = mock_scn(0);
  unit.single_row_.v_.sorted_list_.for_each_node_from_tail_to_head_until_true(
    [&](const UserMdsNode<DummyKey, ExampleUserData1> &node) -> int {
      if (node.is_aborted_()) {
        // aborted nodes take no part in the ordering check
      } else {
        MDS_ASSERT(node.redo_scn_ > recorde_scn);
        recorde_scn = node.redo_scn_;
      }
      return OB_SUCCESS;
    });
}
// <DummyKey, ExampleUserData1>: (data:1, writer:1, ver:10) -> (data:3, writer:1, ver:9)
// <DummyKey, ExampleUserData2>: (data:2, writer:1, ver:10)
// Writer 1 sets ExampleUserData1(5) without committing; get_latest() is expected to hand
// that uncommitted value to the read operation. The write is aborted at the end.
void TestMdsTable::get_latest()
{
  ExampleUserData1 data1(5);
  MdsCtx ctx1(mds::MdsWriter(ObTransID(1)));
  ASSERT_EQ(OB_SUCCESS, mds_table_.set(data1, ctx1));

  int value = 0;
  bool unused_committed_flag = false;
  ASSERT_EQ(OB_SUCCESS,
            mds_table_.get_latest<ExampleUserData1>(
              [&value](const ExampleUserData1 &data) {
                value = data.value_;
                return OB_SUCCESS;
              },
              unused_committed_flag));
  ASSERT_EQ(5, value);// the uncommitted write is visible

  ctx1.on_abort(mock_scn(11));
}
// Reads ExampleUserData1 at snapshot scn 9 and expects the value 3 that replay() committed at scn 9.
void TestMdsTable::get_snapshot()
{
  int value = 0;
  ASSERT_EQ(OB_SUCCESS,
            mds_table_.get_snapshot<ExampleUserData1>(
              [&value](const ExampleUserData1 &data) {
                value = data.value_;
                return OB_SUCCESS;
              },
              mock_scn(9)));
  ASSERT_EQ(3, value);// value visible at the snapshot
}
void TestMdsTable::get_snapshot_hung_1s()
{
std::thread th1([&]() {
MdsCtx ctx(mds::MdsWriter(ObTransID(1)));// abort finally
ExampleUserData1 data(1);
ASSERT_EQ(OB_SUCCESS, mds_table_.set(data, ctx));
ctx.on_redo(mock_scn(3));
ctx.before_prepare();
this_thread::sleep_for(chrono::milliseconds(1200));
ctx.on_abort(mock_scn(12));
});
std::thread th2([&]() {
this_thread::sleep_for(chrono::milliseconds(100));
ExampleUserData1 data;
int64_t start_ts = ObClockGenerator::getRealClock();
ASSERT_EQ(OB_SUCCESS, mds_table_.get_snapshot<ExampleUserData1>([&data](const ExampleUserData1 &node_data) {
data = node_data;
return OB_SUCCESS;
}, share::SCN::max_scn(), 0, 2_s));
ASSERT_LE(1_s, ObClockGenerator::getRealClock() - start_ts);
ASSERT_EQ(1, data.value_);// read snapshot
});
th1.join();
th2.join();
}
// Writer 15 leaves ExampleUserData1(15) uncommitted while writer 20 commits ExampleUserData2("3456")
// at scn 20. Reading as writer 15 with snapshot 15 must return writer 15's own uncommitted value for
// ExampleUserData1 and, for ExampleUserData2, a string of length 3 (the value committed at scn 10).
void TestMdsTable::get_by_writer()
{
  // writer 15: uncommitted, aborted at the end of this case
  ExampleUserData1 data1(15);
  MdsCtx ctx1(mds::MdsWriter(ObTransID(15)));
  ASSERT_EQ(OB_SUCCESS, mds_table_.set(data1, ctx1));

  // writer 20: committed with version 20
  ExampleUserData2 data2;
  ASSERT_EQ(OB_SUCCESS, data2.assign(MdsAllocator::get_instance(), "3456"));
  MdsCtx ctx2(mds::MdsWriter(ObTransID(20)));
  ASSERT_EQ(OB_SUCCESS, mds_table_.set(data2, ctx2));
  ctx2.on_redo(mock_scn(20));
  ctx2.before_prepare();
  ctx2.on_prepare(mock_scn(20));
  ctx2.on_commit(mock_scn(20), mock_scn(20));

  int value = 0;
  // the writer reads its own uncommitted change
  ASSERT_EQ(OB_SUCCESS,
            mds_table_.get_by_writer<ExampleUserData1>(
              [&value](const ExampleUserData1 &data) {
                value = data.value_;
                return OB_SUCCESS;
              },
              mds::MdsWriter(ObTransID(15)), mock_scn(15)));
  ASSERT_EQ(15, value);

  // the writer reads what another writer committed
  ASSERT_EQ(OB_SUCCESS,
            mds_table_.get_by_writer<ExampleUserData2>(
              [&value](const ExampleUserData2 &data) {
                value = data.data_.length();
                return OB_SUCCESS;
              },
              mds::MdsWriter(ObTransID(15)), mock_scn(15)));
  ASSERT_EQ(3, value);// length of the last committed string visible at snapshot 15

  ctx1.on_abort(mock_scn(20));
}
// <DummyKey, ExampleUserData1>: (data:1, writer:1, ver:10) -> (data:3, writer:1, ver:9)
// <DummyKey, ExampleUserData2>: (data:20, writer:20, ver:20) -> (data:2, writer:1, ver:10)
/***********************************************Multi Row**************************************************************/
// Populates the <ExampleUserKey, ExampleUserData1> unit:
// writer 1 commits (key 1 -> data 1) at scn 1, writer 2 commits (key 2 -> data 2) and
// (key 1 -> data 2) at scn 2.
void TestMdsTable::insert_multi_row()
{
  // writer 1
  ExampleUserKey key(1);
  ExampleUserData1 data1(1);
  MdsCtx ctx(mds::MdsWriter(ObTransID(1)));
  ASSERT_EQ(OB_SUCCESS, mds_table_.set(key, data1, ctx));
  ctx.on_redo(mock_scn(1));
  ctx.before_prepare();
  ctx.on_prepare(mock_scn(1));
  ctx.on_commit(mock_scn(1), mock_scn(1));

  // writer 2
  ExampleUserKey key2(2);
  ExampleUserData1 data2(2);
  MdsCtx ctx2(mds::MdsWriter(ObTransID(2)));
  ASSERT_EQ(OB_SUCCESS, mds_table_.set(key2, data2, ctx2));
  // original note: only a single version is stored, so data1 can no longer be read
  ASSERT_EQ(OB_SUCCESS, mds_table_.set(key, data2, ctx2));
  ctx2.on_redo(mock_scn(2));
  ctx2.before_prepare();
  ctx2.on_prepare(mock_scn(2));
  ctx2.on_commit(mock_scn(2), mock_scn(2));
}
// <DummyKey, ExampleUserData1>: (data:1, writer:1, ver:10) -> (data:3, writer:1, ver:9)
// <DummyKey, ExampleUserData2>: (data:20, writer:20, ver:20) -> (data:2, writer:1, ver:10)
// <ExampleUserKey, ExampleUserData1> : <1> : (data:2, writer:2, ver:2) -> (data:1, writer:1, ver:1)
// <2> : (data:2, writer:2, ver:2)
// Reads key 1 of the multi-row unit three ways while writer 3 holds an uncommitted write of data 3:
// snapshot 2 -> 2, snapshot 1 -> 1, latest -> 3. Writer 3 is aborted at the end.
void TestMdsTable::get_multi_row()
{
  ExampleUserData1 read_data;
  ExampleUserKey key(1);
  ExampleUserData1 data3(3);
  MdsCtx ctx3(mds::MdsWriter(ObTransID(3)));
  ASSERT_EQ(OB_SUCCESS, mds_table_.set(key, data3, ctx3));

  // copies whatever the table hands out into read_data
  auto read_op = [&read_data](const ExampleUserData1 &data) { read_data = data; return OB_SUCCESS; };

  int ret = mds_table_.get_snapshot<ExampleUserKey, ExampleUserData1>(ExampleUserKey(1), read_op, mock_scn(2));
  ASSERT_EQ(OB_SUCCESS, ret);
  ASSERT_EQ(2, read_data.value_);

  // nothing has been dumped yet, so the older version is still kept
  ret = mds_table_.get_snapshot<ExampleUserKey, ExampleUserData1>(ExampleUserKey(1), read_op, mock_scn(1));
  ASSERT_EQ(OB_SUCCESS, ret);
  ASSERT_EQ(1, read_data.value_);

  bool unused_committed_flag = false;
  ret = mds_table_.get_latest<ExampleUserKey, ExampleUserData1>(ExampleUserKey(1), read_op, unused_committed_flag);
  ASSERT_EQ(OB_SUCCESS, ret);
  ASSERT_EQ(3, read_data.value_);

  ctx3.on_abort(mock_scn(3));
}
/**
 * Node-scan functor that counts the nodes it is called with.
 *
 * total_count_ counts every visited node, valid_count_ counts the visited
 * nodes that are not aborted. The call operator always returns OB_SUCCESS.
 */
struct ScanOp {
  ScanOp() : valid_count_(0), total_count_(0) {}
  int operator()(const MdsNode &node) {
    // every visit counts towards the total (previously this counter was never updated)
    ++total_count_;
    if (!node.is_aborted_()) {
      ++valid_count_;
    }
    return OB_SUCCESS;
  }
  int valid_count_;// visited nodes that are not aborted
  int total_count_;// all visited nodes
};
// void TestMdsTable::for_each_scan() {
// ScanOp op;
// ASSERT_EQ(OB_SUCCESS, mds_table_.for_each_scan_node(op));
// ASSERT_EQ(op.valid_count_, 7);
// }
// Exercises the standard-style iterators of MdsUnit and of its rows.
//  1. <ExampleUserKey, ExampleUserData1>: range-for over the rows and over the nodes of each row,
//     logging every node.
//  2. <DummyKey, ExampleUserData1>: reverse iteration over rows and nodes, counting committed nodes
//     by hand and comparing the result with std::count_if over the forward range.
// Each unit and each row is read-locked while it is iterated.
void TestMdsTable::standard_iterator() {
  // iter kv unit, range for
  MdsUnit<ExampleUserKey, ExampleUserData1> *p_mds_unit = nullptr;
  ASSERT_EQ(OB_SUCCESS, mds_table_.get_mds_unit(p_mds_unit));// need handle protect table life
  {
    MdsRLockGuard lg(p_mds_unit->lock_);// lock unit
    for (auto &kv_row : *p_mds_unit) {
      MdsRLockGuard lg(kv_row.v_.lock_);// lock row
      for (auto &mds_node : kv_row.v_) {
        MDS_LOG(INFO, "print iter mds node", K(mds_node));
      }
    }
  }
  // iter dummy key unit, reverse iter
  MdsUnit<DummyKey, ExampleUserData1> *p_mds_unit2 = nullptr;
  ASSERT_EQ(OB_SUCCESS, mds_table_.get_mds_unit(p_mds_unit2));// need handle protect table life
  {
    int64_t cnt_committed = 0;
    // using KvRowIter = MdsUnit<DummyKey, ExampleUserData2>::iterator;
    // using NodeIter = KvRowIter::row_type::iterator;
    using KvRowIter = MdsUnit<DummyKey, ExampleUserData1>::reverse_iterator;
    using NodeIter = KvRowIter::row_type::reverse_iterator;
    // Lock the unit that is iterated in this section (p_mds_unit2); the previous code took the
    // lock of p_mds_unit, i.e. of the unit already handled in the first section.
    MdsRLockGuard lg(p_mds_unit2->lock_);// lock unit
    for (KvRowIter iter1 = p_mds_unit2->rbegin(); iter1 != p_mds_unit2->rend(); ++iter1) {// there is actually only one
      MdsRLockGuard lg(iter1->v_.lock_);// lock row
      for (NodeIter iter2 = iter1->v_.rbegin(); iter2 != iter1->v_.rend(); ++iter2) {
        if (iter2->is_committed_()) {
          cnt_committed += 1;
        }
      }
      int64_t cnt = std::count_if(iter1->v_.begin(), iter1->v_.end(), [](UserMdsNode<DummyKey, ExampleUserData1> &node){ return node.is_committed_(); });
      ASSERT_EQ(cnt_committed, cnt);
    }
  }
}
// Exercises ObMdsUnitRowNodeScanIterator on the <ExampleUserKey, ExampleUserData1> unit.
// Expected sequence: (key 1, data 2, committed at 2), (key 1, data 1, committed at 1),
// one node for key 2, then OB_ITER_END.
void TestMdsTable::OB_iterator()
{
  ObMdsUnitRowNodeScanIterator<ExampleUserKey, ExampleUserData1> iter;
  ExampleUserKey key;
  UserMdsNode<ExampleUserKey, ExampleUserData1> *p_node = nullptr;
  ASSERT_EQ(OB_SUCCESS, iter.init(mds_table_));

  // first node
  ASSERT_EQ(OB_SUCCESS, iter.get_next(key, p_node));
  MDS_LOG(INFO, "print iter kv", K(key), K(*p_node));
  ASSERT_EQ(ExampleUserKey(1), key);
  {
    ASSERT_EQ(ExampleUserData1(2), p_node->user_data_);
    ASSERT_EQ(true, p_node->is_committed_());
    ASSERT_EQ(mock_scn(2), p_node->get_commit_version_());
  }

  // second node, same key
  ASSERT_EQ(OB_SUCCESS, iter.get_next(key, p_node));
  MDS_LOG(INFO, "print iter kv", K(key), K(*p_node));
  ASSERT_EQ(ExampleUserKey(1), key);
  {
    ASSERT_EQ(ExampleUserData1(1), p_node->user_data_);
    ASSERT_EQ(true, p_node->is_committed_());
    ASSERT_EQ(mock_scn(1), p_node->get_commit_version_());
  }

  // third node belongs to key 2; after it the iterator is exhausted
  ASSERT_EQ(OB_SUCCESS, iter.get_next(key, p_node));
  ASSERT_EQ(ExampleUserKey(2), key);
  ASSERT_EQ(OB_ITER_END, iter.get_next(key, p_node));
}
// <DummyKey, ExampleUserData1>: (data:1, writer:1, ver:10) -> (data:3, writer:1, ver:9)
// <DummyKey, ExampleUserData2>: (data:20, writer:20, ver:20) -> (data:2, writer:1, ver:10)
// <ExampleUserKey, ExampleUserData1> : <1> : (data:100, writer:100, ver:19001) -> (data:2, writer:2, ver:2) -> (data:1, writer:1, ver:1)
// <2> : (data:200, writer:200, ver:MAX)
// Walks the shared table through one flush cycle:
//  - writer 100 commits (key 1 -> data 100) with redo scn 19001 and commit scn 19002;
//  - writer 200 leaves (key 2 -> data 200) uncommitted with redo scn 200;
//  - flush(300) is requested, the dump scan is run, on_flush(199) is reported and rec_scn is checked;
//  - a replay at scn 100 is expected to pull rec_scn back to 100;
//  - finally try_recycle(200) is called and writer 200 is aborted.
void TestMdsTable::test_flush() {
ExampleUserKey key(1);
ExampleUserData1 data1(100);
MdsCtx ctx(mds::MdsWriter(ObTransID(100)));// commit finally
ASSERT_EQ(OB_SUCCESS, mds_table_.set(key, data1, ctx));
ctx.on_redo(mock_scn(19001));
ctx.before_prepare();
ctx.on_prepare(mock_scn(19001));
ctx.on_commit(mock_scn(19002), mock_scn(19002));
ExampleUserKey key2(2);
ExampleUserData1 data2(200);
MdsCtx ctx2(mds::MdsWriter(ObTransID(200)));// abort finally
ASSERT_EQ(OB_SUCCESS, mds_table_.set(key2, data2, ctx2));
ctx2.on_redo(mock_scn(200));
// NOTE: idx is captured by the dump lambda below but never modified, so the
// MDS_ASSERT(idx < 10) inside it always holds.
int idx = 0;
ASSERT_EQ(OB_SUCCESS, mds_table_.flush(mock_scn(300)));// 1. request a flush with scn 300
ASSERT_EQ(mock_scn(199), mds_table_.p_mds_table_base_->flushing_scn_);// 2. the flush is actually performed with scn 199
ASSERT_EQ(OB_SUCCESS, mds_table_.for_each_unit_from_small_key_to_big_from_old_node_to_new_to_dump(
[&idx](const MdsDumpKV &kv) -> int {// 3. scan the mds table as a dump would
MDS_ASSERT(kv.v_.end_scn_ < mock_scn(199));// commits at or above version 199 are not visible to the scan
MDS_ASSERT(idx < 10);
MDS_LOG(INFO, "print dump node kv", K(kv));
return OB_SUCCESS;
}, 0, true)
);
mds_table_.on_flush(mock_scn(199), OB_SUCCESS);// 4. push rec_scn forward to [at least] 200
share::SCN rec_scn;
ASSERT_EQ(OB_SUCCESS, mds_table_.get_rec_scn(rec_scn));
MDS_LOG(INFO, "print rec scn", K(rec_scn));
ASSERT_EQ(rec_scn, mock_scn(200));
MdsCtx ctx3(mds::MdsWriter(ObTransID(101)));// abort finally
ASSERT_EQ(OB_SUCCESS, mds_table_.replay(ExampleUserKey(111), ExampleUserData1(111), ctx3, mock_scn(100)));
ASSERT_EQ(OB_SUCCESS, mds_table_.get_rec_scn(rec_scn));
ASSERT_EQ(rec_scn, mock_scn(100));// the replay at scn 100 is expected to lower rec_scn to 100
ctx3.on_abort(mock_scn(100));
// NOTE: op is only needed by the commented-out scan below and is otherwise unused.
ScanOp op;
// not dumped yet: one decided node + one undecided node
MDS_LOG(INFO, "print free times", K(oceanbase::storage::mds::MdsAllocator::get_alloc_times()), K(oceanbase::storage::mds::MdsAllocator::get_free_times()));// counters before recycling the dumped nodes
ASSERT_EQ(OB_SUCCESS, mds_table_.try_recycle(mock_scn(200)));
// ASSERT_EQ(OB_SUCCESS, mds_table_.for_each_scan_node(op));
// ASSERT_EQ(op.valid_count_, 2);
ctx2.on_abort(mock_scn(200));
MDS_LOG(INFO, "print free times", K(oceanbase::storage::mds::MdsAllocator::get_free_times()));// counter after recycling the dumped nodes
}
// <ExampleUserKey, ExampleUserData1> : <1> : (data:100, writer:100, ver:19001)
// Checks is_locked_by_others() on key 1 of the multi-row unit:
//  - not locked before any new write;
//  - locked once writer 100 has an uncommitted write on the key;
//  - not locked when the query is issued on behalf of writer 100 itself.
// ctx is neither committed nor aborted explicitly in this case.
void TestMdsTable::test_is_locked_by_others()
{
  bool is_locked = false;
  int ret = mds_table_.is_locked_by_others<ExampleUserKey, ExampleUserData1>(ExampleUserKey(1), is_locked);
  ASSERT_EQ(OB_SUCCESS, ret);
  ASSERT_EQ(false, is_locked);

  ExampleUserData1 data1(300);
  MdsCtx ctx(mds::MdsWriter(ObTransID(100)));
  ASSERT_EQ(OB_SUCCESS, mds_table_.set(ExampleUserKey(1), data1, ctx));

  // queried without a writer argument
  ret = mds_table_.is_locked_by_others<ExampleUserKey, ExampleUserData1>(ExampleUserKey(1), is_locked);
  ASSERT_EQ(OB_SUCCESS, ret);
  ASSERT_EQ(true, is_locked);

  // queried as the writer that holds the uncommitted write
  ret = mds_table_.is_locked_by_others<ExampleUserKey, ExampleUserData1>(ExampleUserKey(1), is_locked, mds::MdsWriter(ObTransID(100)));
  ASSERT_EQ(OB_SUCCESS, ret);
  ASSERT_EQ(false, is_locked);
}
// <ExampleUserKey, ExampleUserData1> : <1> : (data:100, writer:100, ver:19001)
// Checks key visibility in the multi-row unit around flush/recycle/remove:
//  - key 1 is readable via get_latest();
//  - after flush(200) + try_recycle(200), key 2 is reported as OB_ENTRY_NOT_EXIST;
//  - after remove() of key 1 by writer 1, key 1 is reported as OB_ENTRY_NOT_EXIST as well.
void TestMdsTable::test_multi_key_remove()
{
  bool is_committed = false;
  int ret = mds_table_.get_latest<ExampleUserKey, ExampleUserData1>(
    ExampleUserKey(1),
    [](const ExampleUserData1 &data){
      return OB_SUCCESS;
    },
    is_committed);
  ASSERT_EQ(OB_SUCCESS, ret);

  ASSERT_EQ(OB_SUCCESS, mds_table_.flush(mock_scn(200)));
  ASSERT_EQ(OB_SUCCESS, mds_table_.try_recycle(mock_scn(200)));
  ret = mds_table_.get_latest<ExampleUserKey, ExampleUserData1>(
    ExampleUserKey(2),
    [](const ExampleUserData1 &data){
      return OB_SUCCESS;
    },
    is_committed);
  ASSERT_EQ(OB_ENTRY_NOT_EXIST, ret);

  MdsCtx ctx(mds::MdsWriter(ObTransID(1)));
  ret = mds_table_.remove<ExampleUserKey, ExampleUserData1>(ExampleUserKey(1), ctx);
  ASSERT_EQ(OB_SUCCESS, ret);
  ret = mds_table_.get_latest<ExampleUserKey, ExampleUserData1>(
    ExampleUserKey(1),
    [](const ExampleUserData1 &data){
      return OB_SUCCESS;
    },
    is_committed);
  ASSERT_EQ(OB_ENTRY_NOT_EXIST, ret);
}
// gtest registrations for the static case bodies of TestMdsTable. Each wrapper only forwards to
// the member of the (almost) same name; the bodies operate on the shared global table.
TEST_F(TestMdsTable, set)
{
  TestMdsTable::set();
}
TEST_F(TestMdsTable, replay)
{
  TestMdsTable::replay();
}
TEST_F(TestMdsTable, get_latest)
{
  TestMdsTable::get_latest();
}
TEST_F(TestMdsTable, get_snapshot)
{
  TestMdsTable::get_snapshot();
}
TEST_F(TestMdsTable, get_snapshot_hung_1s)
{
  TestMdsTable::get_snapshot_hung_1s();
}
TEST_F(TestMdsTable, get_by_writer)
{
  TestMdsTable::get_by_writer();
}
TEST_F(TestMdsTable, insert_multi_row)
{
  TestMdsTable::insert_multi_row();
}
TEST_F(TestMdsTable, get_multi_row)
{
  TestMdsTable::get_multi_row();
}
TEST_F(TestMdsTable, test_standard_style_iterator)
{
  TestMdsTable::standard_iterator();
}
TEST_F(TestMdsTable, test_OB_style_iterator)
{
  TestMdsTable::OB_iterator();
}
TEST_F(TestMdsTable, test_flush)
{
  TestMdsTable::test_flush();
}
TEST_F(TestMdsTable, test_is_locked_by_others)
{
  TestMdsTable::test_is_locked_by_others();
}
TEST_F(TestMdsTable, test_multi_key_remove)
{
  TestMdsTable::test_multi_key_remove();
}
// Usage example: writing through a transactional writer on a private table handle.
TEST_F(TestMdsTable, basic_trans_example)
{
  MdsTableHandle mth;
  // 1. initialize the handle as a UnitTestMdsTable
  ASSERT_EQ(OB_SUCCESS,
            mth.init<UnitTestMdsTable>(mds::DefaultAllocator::get_instance(),
                                       ObTabletID(1),
                                       share::ObLSID(1),
                                       nullptr));
  MdsTableHandle mth2 = mth;// two references to the table now
  // a write context bound to a multi-source transaction; such a ctx is created by the transaction layer
  MdsCtx ctx(mds::MdsWriter(ObTransID(123)));

  // 2. write data
  // the first data unit is written successfully
  ASSERT_EQ(OB_SUCCESS, mth.set(ExampleUserData1(1), ctx));
  // an int is not a data type registered in UnitTestMdsTable, so a type error is expected
  ASSERT_EQ(OB_OBJ_TYPE_ERROR, mth.set((int)54321, ctx));

  // 3. write the CLOG for the data and run two-phase commit; when attached to a multi-source
  //    transaction this flow is driven by the transaction layer and is invisible to the user
  ctx.on_redo(mock_scn(100));
  ctx.before_prepare();
  ctx.on_prepare(mock_scn(100));
  ctx.on_commit(mock_scn(100), mock_scn(100));

  // 4. read the latest committed data
  ASSERT_EQ(OB_SUCCESS,
            mth.get_snapshot<ExampleUserData1>([](const ExampleUserData1 &data) {
              return data.value_ != 1 ? OB_ERR_UNEXPECTED : OB_SUCCESS;
            }));
}// 5. the MdsTable itself is destructed when the last handle is destructed
// Usage example: writing through a non-transactional (AUTO_INC_SEQ) writer on a private table handle.
TEST_F(TestMdsTable, basic_non_trans_example)
{
  MdsTableHandle mth;
  // 1. initialize the handle as a UnitTestMdsTable
  ASSERT_EQ(OB_SUCCESS,
            mth.init<UnitTestMdsTable>(mds::DefaultAllocator::get_instance(),
                                       ObTabletID(1),
                                       share::ObLSID(1),
                                       nullptr));
  MdsTableHandle mth2 = mth;// two references to the table now
  // a write context that is not attached to a transaction; the writer logs by itself
  MdsCtx ctx(MdsWriter(WriterType::AUTO_INC_SEQ, 1));

  // 2. write data
  // the first data unit is written successfully
  ASSERT_EQ(OB_SUCCESS, mth.set(ExampleUserData1(1), ctx));
  // an int is not a data type registered in UnitTestMdsTable, so a type error is expected
  ASSERT_EQ(OB_OBJ_TYPE_ERROR, mth.set((int)54321, ctx));

  // 3. write the CLOG for the data and commit it with a single log
  ctx.single_log_commit(mock_scn(100), mock_scn(100));

  // 4. read the latest committed data
  ASSERT_EQ(OB_SUCCESS,
            mth.get_snapshot<ExampleUserData1>([](const ExampleUserData1 &data) {
              return data.value_ != 1 ? OB_ERR_UNEXPECTED : OB_SUCCESS;
            }));
}// 5. the MdsTable itself is destructed when the last handle is destructed
// Checks that nodes of the shared table can only be recycled after a flush has advanced rec_scn:
//  - a first try_recycle(20000) leaves one node behind;
//  - flush(20000) + dump scan + on_flush(20000) moves rec_scn to max_scn;
//  - a second try_recycle(20000) leaves no node behind.
TEST_F(TestMdsTable, test_recycle) {
  int64_t alloc_times = oceanbase::storage::mds::MdsAllocator::get_alloc_times();
  int64_t free_times = oceanbase::storage::mds::MdsAllocator::get_free_times();
  ASSERT_NE(alloc_times, free_times);// something is still allocated at this point
  ASSERT_EQ(OB_SUCCESS, mds_table_.try_recycle(mock_scn(20000)));
  int64_t valid_cnt = 0;
  ASSERT_EQ(OB_SUCCESS, mds_table_.get_node_cnt(valid_cnt));
  ASSERT_EQ(1, valid_cnt);// the committed node written with scn 19001 is still there because rec_scn has not been advanced
  ASSERT_EQ(OB_SUCCESS, mds_table_.flush(mock_scn(20000)));
  // The result of the dump scan used to be dropped silently; check it like test_flush does.
  ASSERT_EQ(OB_SUCCESS, mds_table_.for_each_unit_from_small_key_to_big_from_old_node_to_new_to_dump(
    [](const MdsDumpKV &){
      return OB_SUCCESS;
    }, 0, true)
  );
  mds_table_.on_flush(mock_scn(20000), OB_SUCCESS);
  share::SCN rec_scn;
  ASSERT_EQ(OB_SUCCESS, mds_table_.get_rec_scn(rec_scn));
  MDS_LOG(INFO, "print rec scn", K(rec_scn));
  ASSERT_EQ(share::SCN::max_scn(), rec_scn);
  ASSERT_EQ(OB_SUCCESS, mds_table_.try_recycle(mock_scn(20000)));
  ASSERT_EQ(OB_SUCCESS, mds_table_.get_node_cnt(valid_cnt));
  ASSERT_EQ(0, valid_cnt);// rec_scn has been advanced by the flush, so the last node has been recycled
}
// Checks which scn a flush really uses on a private table holding three committed writes of
// ExampleUserData1 with (redo scn, commit scn) = (1, 3), (5, 7) and (9, 11).
// flushing_scn_ and rec_scn_ are read directly from the table base after each step.
TEST_F(TestMdsTable, test_recalculate_flush_scn_op) {
MdsTableHandle mds_table;
ASSERT_EQ(OB_SUCCESS, mds_table.init<UnitTestMdsTable>(MdsAllocator::get_instance(), ObTabletID(1), share::ObLSID(1), (ObTabletPointer*)0x111));
MdsCtx ctx1(mds::MdsWriter(ObTransID(1)));
MdsCtx ctx2(mds::MdsWriter(ObTransID(2)));
MdsCtx ctx3(mds::MdsWriter(ObTransID(3)));
// writer 1: redo 1, commit 3
ASSERT_EQ(OB_SUCCESS, mds_table.set(ExampleUserData1(1), ctx1));
ctx1.on_redo(mock_scn(1));
ctx1.on_commit(mock_scn(3), mock_scn(3));
// writer 2: redo 5, commit 7
ASSERT_EQ(OB_SUCCESS, mds_table.set(ExampleUserData1(2), ctx2));
ctx2.on_redo(mock_scn(5));
ctx2.on_commit(mock_scn(7), mock_scn(7));
// writer 3: redo 9, commit 11
ASSERT_EQ(OB_SUCCESS, mds_table.set(ExampleUserData1(3), ctx3));
ctx3.on_redo(mock_scn(9));
ctx3.on_commit(mock_scn(11), mock_scn(11));
// flush(4) is expected to flush with scn 4
ASSERT_EQ(OB_SUCCESS, mds_table.flush(mock_scn(4)));
ASSERT_EQ(mock_scn(4), mds_table.p_mds_table_base_->flushing_scn_);
mds_table.on_flush(mock_scn(4), OB_SUCCESS);
// flush(5) and flush(6) are expected to leave flushing_scn_ invalid
ASSERT_EQ(OB_SUCCESS, mds_table.flush(mock_scn(5)));// no need do flush, directly advance rec_scn
ASSERT_EQ(false, mds_table.p_mds_table_base_->flushing_scn_.is_valid());
ASSERT_EQ(OB_SUCCESS, mds_table.flush(mock_scn(6)));// no need do flush, directly advance rec_scn
ASSERT_EQ(false, mds_table.p_mds_table_base_->flushing_scn_.is_valid());
// flush(7) is expected to flush with scn 7
ASSERT_EQ(OB_SUCCESS, mds_table.flush(mock_scn(7)));
ASSERT_EQ(mock_scn(7), mds_table.p_mds_table_base_->flushing_scn_);
mds_table.on_flush(mock_scn(7), OB_SUCCESS);
// flush(8) is expected to leave flushing_scn_ invalid and rec_scn_ at 9 (the redo scn of writer 3)
ASSERT_EQ(OB_SUCCESS, mds_table.flush(mock_scn(8)));
ASSERT_EQ(false, mds_table.p_mds_table_base_->flushing_scn_.is_valid());
ASSERT_EQ(mock_scn(9), mds_table.p_mds_table_base_->rec_scn_);
}
// TEST_F(TestMdsTable, test_node_commit_in_row) {
// MdsRow<DummyKey, ExampleUserData1> row;
// MdsCtx ctx1(mds::MdsWriter(WriterType::AUTO_INC_SEQ, 1));
// ASSERT_EQ(OB_SUCCESS, row.set(ExampleUserData1(1), ctx1, 0));
// ctx1.single_log_commit(mock_scn(1), mock_scn(1));
// MdsCtx ctx2(mds::MdsWriter(WriterType::AUTO_INC_SEQ, 2));
// ASSERT_EQ(OB_SUCCESS, row.set(ExampleUserData1(2), ctx2, 0));
// ctx1.single_log_commit(mock_scn(2), mock_scn(2));// won't release last committed node
// int node_cnt = 0;
// row.sorted_list_.for_each_node_from_head_to_tail_until_true([&node_cnt](const UserMdsNode<DummyKey, ExampleUserData1> &){ ++node_cnt; return false; });
// ASSERT_EQ(2, node_cnt);
// }
// Read/read: two threads each take a read guard on the same MdsLock and hold it for one second.
// The case has no assertions; it only has to run to completion.
TEST_F(TestMdsTable, test_rw_lock_rrlock)
{
  MdsLock lock;
  std::thread first_reader([&lock]() {
    MdsRLockGuard lg(lock);
    ob_usleep(1_s);
  });
  std::thread second_reader([&lock]() {
    MdsRLockGuard lg(lock);
    ob_usleep(1_s);
  });
  first_reader.join();
  second_reader.join();
}
// Write then read: one thread takes a write guard immediately and holds it for one second, the
// other thread waits 200ms before taking a read guard and holding it for one second.
// The case has no assertions; it only has to run to completion.
TEST_F(TestMdsTable, test_rw_lock_wrlock)
{
  MdsLock lock;
  std::thread writer([&lock]() {
    MdsWLockGuard lg(lock);
    ob_usleep(1_s);
  });
  std::thread late_reader([&lock]() {
    ob_usleep(200_ms);
    MdsRLockGuard lg(lock);
    ob_usleep(1_s);
  });
  writer.join();
  late_reader.join();
}
// Read then write: one thread takes a read guard immediately and holds it for one second, the
// other thread waits 200ms before taking a write guard and holding it for one second.
// The case has no assertions; it only has to run to completion.
TEST_F(TestMdsTable, test_rw_lock_rwlock)
{
  MdsLock lock;
  std::thread late_writer([&lock]() {
    ob_usleep(200_ms);
    MdsWLockGuard lg(lock);
    ob_usleep(1_s);
  });
  std::thread reader([&lock]() {
    MdsRLockGuard lg(lock);
    ob_usleep(1_s);
  });
  late_writer.join();
  reader.join();
}
// Write/write: two threads each take a write guard on the same MdsLock and hold it for one second.
// The case has no assertions; it only has to run to completion.
TEST_F(TestMdsTable, test_rw_lock_wwlock)
{
  MdsLock lock;
  std::thread first_writer([&lock]() {
    MdsWLockGuard lg(lock);
    ob_usleep(1_s);
  });
  std::thread second_writer([&lock]() {
    MdsWLockGuard lg(lock);
    ob_usleep(1_s);
  });
  first_writer.join();
  second_writer.join();
}
}
}
// Test entry point.
// Removes the previous log file, routes the OceanBase logger to test_mds_table.log at DEBUG level,
// runs all gtest cases and finally compares the allocator's alloc/free counters: a mismatch is
// logged as a possible leak and turns the exit code into -1.
int main(int argc, char **argv)
{
  system("rm -rf test_mds_table.log");

  oceanbase::common::ObLogger &logger = oceanbase::common::ObLogger::get_logger();
  logger.set_file_name("test_mds_table.log", false);
  logger.set_log_level(OB_LOG_LEVEL_DEBUG);

  testing::InitGoogleTest(&argc, argv);
  int ret = RUN_ALL_TESTS();

  // Destroy the shared handle explicitly so that what it still holds is released before the
  // counters are compared.
  // NOTE(review): mds_table_ refers to a member of a global object, so its destructor presumably
  // runs a second time at exit -- confirm that ~MdsTableHandle() tolerates that.
  oceanbase::unittest::mds_table_.~MdsTableHandle();

  int64_t alloc_times = oceanbase::storage::mds::MdsAllocator::get_alloc_times();
  int64_t free_times = oceanbase::storage::mds::MdsAllocator::get_free_times();
  if (alloc_times == free_times) {
    MDS_LOG(INFO, "all memory released", K(free_times), K(alloc_times));
  } else {
    MDS_LOG(ERROR, "memory may leak", K(free_times), K(alloc_times));
    ret = -1;
  }
  return ret;
}