[FEAT MERGE] mds_mvs
Co-authored-by: xuhuleon <xuhuleon@qq.com> Co-authored-by: godyangfight <godyangfight@gmail.com> Co-authored-by: JiahuaChen <garfieldjia@qq.com>
This commit is contained in:
@ -14,6 +14,33 @@
|
||||
#include <gtest/gtest.h>
|
||||
#include "storage/schema_utils.h"
|
||||
#include "storage/test_tablet_helper.h"
|
||||
#include "storage/tablet/ob_mds_schema_helper.h"
|
||||
namespace oceanbase {
|
||||
namespace storage {
|
||||
namespace mds {
|
||||
void *DefaultAllocator::alloc(const int64_t size) {
|
||||
void *ptr = std::malloc(size);// ob_malloc(size, "MDS");
|
||||
ATOMIC_INC(&alloc_times_);
|
||||
MDS_LOG(DEBUG, "alloc obj", KP(ptr), K(size), K(lbt()));
|
||||
return ptr;
|
||||
}
|
||||
void DefaultAllocator::free(void *ptr) {
|
||||
ATOMIC_INC(&free_times_);
|
||||
MDS_LOG(DEBUG, "free obj", KP(ptr), K(lbt()));
|
||||
std::free(ptr);// ob_free(ptr);
|
||||
}
|
||||
void *MdsAllocator::alloc(const int64_t size) {
|
||||
void *ptr = std::malloc(size);// ob_malloc(size, "MDS");
|
||||
ATOMIC_INC(&alloc_times_);
|
||||
MDS_LOG(DEBUG, "alloc obj", KP(ptr), K(size), K(lbt()));
|
||||
return ptr;
|
||||
}
|
||||
void MdsAllocator::free(void *ptr) {
|
||||
ATOMIC_INC(&free_times_);
|
||||
MDS_LOG(DEBUG, "free obj", KP(ptr), K(lbt()));
|
||||
std::free(ptr);// ob_free(ptr);
|
||||
}
|
||||
}}}
|
||||
|
||||
#define USING_LOG_PREFIX STORAGE
|
||||
|
||||
@ -33,6 +60,7 @@ public:
|
||||
|
||||
static void SetUpTestCase()
|
||||
{
|
||||
ObMdsSchemaHelper::get_instance().init();
|
||||
}
|
||||
|
||||
static void TearDownTestCase()
|
||||
|
@ -19,6 +19,33 @@
|
||||
#include "common/ob_clock_generator.h"
|
||||
#include "storage/multi_data_source/mds_table_handle.h"
|
||||
#include "storage/multi_data_source/compile_utility/compile_mapper.h"
|
||||
#include "storage/tablet/ob_mds_schema_helper.h"
|
||||
namespace oceanbase {
|
||||
namespace storage {
|
||||
namespace mds {
|
||||
void *DefaultAllocator::alloc(const int64_t size) {
|
||||
void *ptr = std::malloc(size);// ob_malloc(size, "MDS");
|
||||
ATOMIC_INC(&alloc_times_);
|
||||
MDS_LOG(DEBUG, "alloc obj", KP(ptr), K(size), K(lbt()));
|
||||
return ptr;
|
||||
}
|
||||
void DefaultAllocator::free(void *ptr) {
|
||||
ATOMIC_INC(&free_times_);
|
||||
MDS_LOG(DEBUG, "free obj", KP(ptr), K(lbt()));
|
||||
std::free(ptr);// ob_free(ptr);
|
||||
}
|
||||
void *MdsAllocator::alloc(const int64_t size) {
|
||||
void *ptr = std::malloc(size);// ob_malloc(size, "MDS");
|
||||
ATOMIC_INC(&alloc_times_);
|
||||
MDS_LOG(DEBUG, "alloc obj", KP(ptr), K(size), K(lbt()));
|
||||
return ptr;
|
||||
}
|
||||
void MdsAllocator::free(void *ptr) {
|
||||
ATOMIC_INC(&free_times_);
|
||||
MDS_LOG(DEBUG, "free obj", KP(ptr), K(lbt()));
|
||||
std::free(ptr);// ob_free(ptr);
|
||||
}
|
||||
}}}
|
||||
namespace oceanbase {
|
||||
namespace unittest {
|
||||
|
||||
@ -28,7 +55,7 @@ using namespace std;
|
||||
class TestMdsCompile: public ::testing::Test
|
||||
{
|
||||
public:
|
||||
TestMdsCompile() {};
|
||||
TestMdsCompile() { ObMdsSchemaHelper::get_instance().init(); };
|
||||
virtual ~TestMdsCompile() {};
|
||||
virtual void SetUp() {
|
||||
};
|
||||
|
@ -16,6 +16,7 @@
|
||||
#include <gtest/gtest.h>
|
||||
#define private public
|
||||
#define protected public
|
||||
#include "storage/multi_data_source/mds_table_impl.h"
|
||||
#include "storage/multi_data_source/compile_utility/mds_dummy_key.h"
|
||||
#include "storage/multi_data_source/compile_utility/map_type_index_in_tuple.h"
|
||||
#include "storage/multi_data_source/adapter_define/mds_dump_node.h"
|
||||
@ -28,14 +29,23 @@
|
||||
#include "storage/multi_data_source/mds_node.h"
|
||||
#include "storage/multi_data_source/mds_table_handle.h"
|
||||
#include "storage/tablet/ob_tablet_meta.h"
|
||||
#include "storage/tablet/ob_mds_schema_helper.h"
|
||||
namespace oceanbase {
|
||||
namespace storage
|
||||
{
|
||||
namespace mds
|
||||
{
|
||||
void *MdsAllocator::alloc(const int64_t size)
|
||||
{
|
||||
void *ptr = ob_malloc(size, "MDS");
|
||||
namespace storage {
|
||||
namespace mds {
|
||||
void *DefaultAllocator::alloc(const int64_t size) {
|
||||
void *ptr = std::malloc(size);// ob_malloc(size, "MDS");
|
||||
ATOMIC_INC(&alloc_times_);
|
||||
MDS_LOG(DEBUG, "alloc obj", KP(ptr), K(size), K(lbt()));
|
||||
return ptr;
|
||||
}
|
||||
void DefaultAllocator::free(void *ptr) {
|
||||
ATOMIC_INC(&free_times_);
|
||||
MDS_LOG(DEBUG, "free obj", KP(ptr), K(lbt()));
|
||||
std::free(ptr);// ob_free(ptr);
|
||||
}
|
||||
void *MdsAllocator::alloc(const int64_t size) {
|
||||
void *ptr = std::malloc(size);// ob_malloc(size, "MDS");
|
||||
ATOMIC_INC(&alloc_times_);
|
||||
MDS_LOG(DEBUG, "alloc obj", KP(ptr), K(size), K(lbt()));
|
||||
return ptr;
|
||||
@ -43,10 +53,10 @@ void *MdsAllocator::alloc(const int64_t size)
|
||||
void MdsAllocator::free(void *ptr) {
|
||||
ATOMIC_INC(&free_times_);
|
||||
MDS_LOG(DEBUG, "free obj", KP(ptr), K(lbt()));
|
||||
ob_free(ptr);
|
||||
}
|
||||
}
|
||||
std::free(ptr);// ob_free(ptr);
|
||||
}
|
||||
}}}
|
||||
namespace oceanbase {
|
||||
namespace unittest {
|
||||
|
||||
using namespace common;
|
||||
@ -54,17 +64,117 @@ using namespace std;
|
||||
using namespace storage;
|
||||
using namespace mds;
|
||||
|
||||
struct OldMdsDumpNode
|
||||
{
|
||||
public:
|
||||
int serialize(char *buf, const int64_t buf_len, int64_t &pos) const;
|
||||
int deserialize(common::ObIAllocator &allocator, const char *buf, const int64_t data_len, int64_t &pos);
|
||||
int64_t get_serialize_size() const;
|
||||
public:
|
||||
static constexpr int64_t UNIS_VERSION = 1;
|
||||
|
||||
uint8_t mds_table_id_;
|
||||
uint8_t mds_unit_id_;
|
||||
uint32_t crc_check_number_;
|
||||
MdsNodeStatus status_;
|
||||
ObIAllocator *allocator_;// to release serialized buffer
|
||||
int64_t writer_id_; // mostly is tx id
|
||||
int64_t seq_no_;// if one writer write multi nodes in one same row, seq_no is used to order those writes
|
||||
share::SCN redo_scn_; // log scn of redo
|
||||
share::SCN end_scn_; // log scn of commit/abort
|
||||
share::SCN trans_version_; // read as prepare version if phase is not COMMIT, or read as commit version
|
||||
common::ObString user_data_;// different user data type serialized result(may be very large, but should be less than 1.5MB)
|
||||
};
|
||||
|
||||
int OldMdsDumpNode::serialize(char *buf, const int64_t buf_len, int64_t &pos) const
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
LST_DO_CODE(OB_UNIS_ENCODE,
|
||||
UNIS_VERSION,
|
||||
mds_table_id_,
|
||||
mds_unit_id_,
|
||||
writer_id_,
|
||||
seq_no_,
|
||||
redo_scn_,
|
||||
end_scn_,
|
||||
trans_version_,
|
||||
status_,
|
||||
crc_check_number_,
|
||||
user_data_);
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
int OldMdsDumpNode::deserialize(common::ObIAllocator &allocator, const char *buf, const int64_t data_len, int64_t &pos)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t version = 0;
|
||||
ObString user_data;
|
||||
|
||||
LST_DO_CODE(OB_UNIS_DECODE,
|
||||
version,
|
||||
mds_table_id_,
|
||||
mds_unit_id_,
|
||||
writer_id_,
|
||||
seq_no_,
|
||||
redo_scn_,
|
||||
end_scn_,
|
||||
trans_version_,
|
||||
status_,
|
||||
crc_check_number_,
|
||||
user_data);
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
allocator_ = &allocator;
|
||||
const int64_t len = user_data.length();
|
||||
char *buffer = nullptr;
|
||||
if (0 == len) {
|
||||
} else if (OB_ISNULL(buffer = static_cast<char*>(allocator_->alloc(len)))) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
MDS_LOG(WARN, "failed to allocate memory", K(ret), K(len));
|
||||
} else {
|
||||
MEMCPY(buffer, user_data.ptr(), len);
|
||||
user_data_.assign(buffer, len);
|
||||
}
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
int64_t OldMdsDumpNode::get_serialize_size() const
|
||||
{
|
||||
int64_t len = 0;
|
||||
|
||||
LST_DO_CODE(OB_UNIS_ADD_LEN,
|
||||
UNIS_VERSION,
|
||||
mds_table_id_,
|
||||
mds_unit_id_,
|
||||
writer_id_,
|
||||
seq_no_,
|
||||
redo_scn_,
|
||||
end_scn_,
|
||||
trans_version_,
|
||||
status_,
|
||||
crc_check_number_,
|
||||
user_data_);
|
||||
|
||||
return len;
|
||||
}
|
||||
|
||||
class TestMdsDumpKV: public ::testing::Test
|
||||
{
|
||||
public:
|
||||
TestMdsDumpKV() {};
|
||||
virtual ~TestMdsDumpKV() {};
|
||||
TestMdsDumpKV() { ObMdsSchemaHelper::get_instance().init(); }
|
||||
virtual ~TestMdsDumpKV() {}
|
||||
virtual void SetUp() {
|
||||
};
|
||||
}
|
||||
virtual void TearDown() {
|
||||
};
|
||||
}
|
||||
static void test_dump_node_convert_and_serialize_and_compare();
|
||||
static void test_dump_dummy_key();
|
||||
static void test_dump_kv_wrapper();
|
||||
static void test_dump_node_deserialize();
|
||||
private:
|
||||
// disallow copy
|
||||
DISALLOW_COPY_AND_ASSIGN(TestMdsDumpKV);
|
||||
@ -74,7 +184,7 @@ void TestMdsDumpKV::test_dump_node_convert_and_serialize_and_compare()
|
||||
{
|
||||
ExampleUserData2 first_data;
|
||||
ASSERT_EQ(OB_SUCCESS, first_data.assign(MdsAllocator::get_instance(), "123"));
|
||||
UserMdsNode<DummyKey, ExampleUserData2> user_mds_node(nullptr, MdsNodeType::SET, WriterType::TRANSACTION, 1);
|
||||
UserMdsNode<DummyKey, ExampleUserData2> user_mds_node(nullptr, MdsNodeType::SET, WriterType::TRANSACTION, 1, transaction::ObTxSEQ::mk_v0(100));
|
||||
ASSERT_EQ(OB_SUCCESS, meta::move_or_copy_or_assign(first_data, user_mds_node.user_data_, MdsAllocator::get_instance()));
|
||||
ASSERT_EQ(true, first_data.data_.ptr() != user_mds_node.user_data_.data_.ptr());
|
||||
ASSERT_EQ(0, memcmp(first_data.data_.ptr(), user_mds_node.user_data_.data_.ptr(), first_data.data_.length()));
|
||||
@ -122,9 +232,114 @@ void TestMdsDumpKV::test_dump_dummy_key()
|
||||
ASSERT_EQ(0, compare_result);
|
||||
}
|
||||
|
||||
void TestMdsDumpKV::test_dump_kv_wrapper()
|
||||
{
|
||||
MdsDumpKV dump_kv;
|
||||
// construct dump key
|
||||
DummyKey key;
|
||||
dump_kv.k_.init(GET_MDS_TABLE_ID<UnitTestMdsTable>::value, GET_MDS_UNIT_ID<UnitTestMdsTable, DummyKey, ExampleUserData2>::value, key, MdsAllocator::get_instance());
|
||||
|
||||
// construct dump node
|
||||
ExampleUserData2 first_data;
|
||||
ASSERT_EQ(OB_SUCCESS, first_data.assign(MdsAllocator::get_instance(), "123"));
|
||||
UserMdsNode<DummyKey, ExampleUserData2> user_mds_node(nullptr, MdsNodeType::SET, WriterType::TRANSACTION, 1, transaction::ObTxSEQ::mk_v0(1));
|
||||
ASSERT_EQ(OB_SUCCESS, meta::move_or_copy_or_assign(first_data, user_mds_node.user_data_, MdsAllocator::get_instance()));
|
||||
ASSERT_EQ(true, first_data.data_.ptr() != user_mds_node.user_data_.data_.ptr());
|
||||
ASSERT_EQ(0, memcmp(first_data.data_.ptr(), user_mds_node.user_data_.data_.ptr(), first_data.data_.length()));
|
||||
user_mds_node.try_on_redo(mock_scn(10));
|
||||
user_mds_node.try_before_prepare();
|
||||
user_mds_node.try_on_prepare(mock_scn(11));
|
||||
user_mds_node.try_on_commit(mock_scn(11), mock_scn(11));
|
||||
ASSERT_EQ(OB_SUCCESS, dump_kv.v_.init(GET_MDS_TABLE_ID<UnitTestMdsTable>::value, GET_MDS_UNIT_ID<UnitTestMdsTable, DummyKey, ExampleUserData2>::value, user_mds_node, mds::MdsAllocator::get_instance()));
|
||||
|
||||
// convert dump kv to adapter
|
||||
MdsDumpKVStorageAdapter adapter(dump_kv);
|
||||
|
||||
// convert from adapter
|
||||
MdsDumpKV dump_kv2;
|
||||
ASSERT_EQ(OB_SUCCESS, dump_kv2.convert_from_adapter(MdsAllocator::get_instance(), adapter));
|
||||
|
||||
// convert adapter to row;
|
||||
ObArenaAllocator allocator;
|
||||
blocksstable::ObDatumRow mds_row;
|
||||
ASSERT_EQ(OB_SUCCESS, mds_row.init(ObMdsSchemaHelper::MDS_MULTI_VERSION_ROW_COLUMN_CNT));
|
||||
ASSERT_EQ(OB_SUCCESS, adapter.convert_to_mds_row(allocator, mds_row));
|
||||
|
||||
// TODO(@luhaopeng.lhp): open this case later
|
||||
/*
|
||||
// convert adapter from row;
|
||||
MdsDumpKVStorageAdapter adapter2;
|
||||
// mds row should come from sstable, which does not offer multi version row(trans version+sql no)
|
||||
ASSERT_EQ(OB_SUCCESS, adapter2.convert_from_mds_row(mds_row));
|
||||
|
||||
|
||||
ASSERT_EQ(adapter.type_, adapter2.type_);
|
||||
ASSERT_EQ(0, adapter.key_.compare(adapter2.key_));
|
||||
ASSERT_EQ(adapter.meta_info_.trans_version_.val_, adapter2.meta_info_.trans_version_.val_);
|
||||
ASSERT_EQ(adapter.meta_info_.seq_no_, adapter2.meta_info_.seq_no_);
|
||||
ASSERT_EQ(adapter.meta_info_.mds_table_id_, adapter2.meta_info_.mds_table_id_);
|
||||
ASSERT_EQ(adapter.meta_info_.key_crc_check_number_, adapter2.meta_info_.key_crc_check_number_);
|
||||
ASSERT_EQ(adapter.meta_info_.data_crc_check_number_, adapter2.meta_info_.data_crc_check_number_);
|
||||
ASSERT_EQ(adapter.meta_info_.status_.union_.value_, adapter2.meta_info_.status_.union_.value_);
|
||||
ASSERT_EQ(adapter.meta_info_.writer_id_, adapter2.meta_info_.writer_id_);
|
||||
ASSERT_EQ(adapter.meta_info_.redo_scn_.val_, adapter2.meta_info_.redo_scn_.val_);
|
||||
ASSERT_EQ(adapter.meta_info_.end_scn_.val_, adapter2.meta_info_.end_scn_.val_);
|
||||
ASSERT_EQ(0, adapter.user_data_.compare(adapter2.user_data_));
|
||||
*/
|
||||
}
|
||||
|
||||
void TestMdsDumpKV::test_dump_node_deserialize()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
ObTabletCreateDeleteMdsUserData user_data;
|
||||
user_data.tablet_status_ = ObTabletStatus::NORMAL;
|
||||
user_data.data_type_ = ObTabletMdsUserDataType::CREATE_TABLET;
|
||||
user_data.create_commit_scn_ = share::SCN::plus(share::SCN::min_scn(), 10);
|
||||
user_data.create_commit_version_ = 10;
|
||||
|
||||
ObArenaAllocator allocator;
|
||||
int64_t size = user_data.get_serialize_size();
|
||||
char *buf = static_cast<char *>(allocator.alloc(size));
|
||||
ASSERT_NE(nullptr, buf);
|
||||
|
||||
int64_t pos = 0;
|
||||
ret = user_data.serialize(buf, size, pos);
|
||||
ASSERT_EQ(OB_SUCCESS, ret);
|
||||
|
||||
OldMdsDumpNode old_dump_node;
|
||||
old_dump_node.mds_table_id_ = 1;
|
||||
old_dump_node.mds_unit_id_ = 1;
|
||||
old_dump_node.crc_check_number_ = 0xabcd;
|
||||
old_dump_node.status_.union_.field_.node_type_ = MdsNodeType::SET;
|
||||
old_dump_node.status_.union_.field_.writer_type_ = WriterType::TRANSACTION;
|
||||
old_dump_node.status_.union_.field_.state_ = TwoPhaseCommitState::ON_COMMIT;
|
||||
old_dump_node.writer_id_ = 123;
|
||||
old_dump_node.seq_no_ = 0;
|
||||
old_dump_node.redo_scn_ = share::SCN::plus(share::SCN::min_scn(), 5);
|
||||
old_dump_node.end_scn_ = share::SCN::plus(share::SCN::min_scn(), 10);
|
||||
old_dump_node.trans_version_ = share::SCN::plus(share::SCN::min_scn(), 10);
|
||||
old_dump_node.user_data_.assign(buf, size);
|
||||
|
||||
size = old_dump_node.get_serialize_size();
|
||||
pos = 0;
|
||||
buf = static_cast<char *>(allocator.alloc(size));
|
||||
ASSERT_NE(nullptr, buf);
|
||||
ret = old_dump_node.serialize(buf, size, pos);
|
||||
ASSERT_EQ(OB_SUCCESS, ret);
|
||||
|
||||
// compat
|
||||
MdsDumpNode dump_node;
|
||||
pos = 0;
|
||||
ret = dump_node.deserialize(allocator, buf, size, pos);
|
||||
ASSERT_EQ(OB_SUCCESS, ret);
|
||||
ASSERT_EQ(transaction::ObTxSEQ::MIN_VAL(), dump_node.seq_no_);
|
||||
}
|
||||
|
||||
TEST_F(TestMdsDumpKV, test_convert_and_serialize_and_compare) { TestMdsDumpKV::test_dump_node_convert_and_serialize_and_compare(); }
|
||||
TEST_F(TestMdsDumpKV, test_dump_dummy_key) { TestMdsDumpKV::test_dump_dummy_key(); }
|
||||
TEST_F(TestMdsDumpKV, test_dump_kv_wrapper) { TestMdsDumpKV::test_dump_kv_wrapper(); }
|
||||
TEST_F(TestMdsDumpKV, test_dump_node_deserialize) { TestMdsDumpKV::test_dump_node_deserialize(); }
|
||||
|
||||
}
|
||||
}
|
||||
|
@ -27,14 +27,23 @@
|
||||
#include "storage/multi_data_source/mds_node.h"
|
||||
#include "common/meta_programming/ob_type_traits.h"
|
||||
#include "storage/multi_data_source/mds_row.h"
|
||||
#include "storage/tablet/ob_mds_schema_helper.h"
|
||||
namespace oceanbase {
|
||||
namespace storage
|
||||
{
|
||||
namespace mds
|
||||
{
|
||||
void *MdsAllocator::alloc(const int64_t size)
|
||||
{
|
||||
void *ptr = ob_malloc(size, "MDS");
|
||||
namespace storage {
|
||||
namespace mds {
|
||||
void *DefaultAllocator::alloc(const int64_t size) {
|
||||
void *ptr = std::malloc(size);// ob_malloc(size, "MDS");
|
||||
ATOMIC_INC(&alloc_times_);
|
||||
MDS_LOG(DEBUG, "alloc obj", KP(ptr), K(size), K(lbt()));
|
||||
return ptr;
|
||||
}
|
||||
void DefaultAllocator::free(void *ptr) {
|
||||
ATOMIC_INC(&free_times_);
|
||||
MDS_LOG(DEBUG, "free obj", KP(ptr), K(lbt()));
|
||||
std::free(ptr);// ob_free(ptr);
|
||||
}
|
||||
void *MdsAllocator::alloc(const int64_t size) {
|
||||
void *ptr = std::malloc(size);// ob_malloc(size, "MDS");
|
||||
ATOMIC_INC(&alloc_times_);
|
||||
MDS_LOG(DEBUG, "alloc obj", KP(ptr), K(size), K(lbt()));
|
||||
return ptr;
|
||||
@ -42,10 +51,10 @@ void *MdsAllocator::alloc(const int64_t size)
|
||||
void MdsAllocator::free(void *ptr) {
|
||||
ATOMIC_INC(&free_times_);
|
||||
MDS_LOG(DEBUG, "free obj", KP(ptr), K(lbt()));
|
||||
ob_free(ptr);
|
||||
}
|
||||
}
|
||||
std::free(ptr);// ob_free(ptr);
|
||||
}
|
||||
}}}
|
||||
namespace oceanbase {
|
||||
namespace unittest {
|
||||
|
||||
using namespace common;
|
||||
@ -56,7 +65,7 @@ using namespace mds;
|
||||
class TestMdsList: public ::testing::Test
|
||||
{
|
||||
public:
|
||||
TestMdsList() {};
|
||||
TestMdsList() { ObMdsSchemaHelper::get_instance().init(); };
|
||||
virtual ~TestMdsList() {};
|
||||
virtual void SetUp() {
|
||||
};
|
||||
|
@ -27,14 +27,23 @@
|
||||
#include "storage/multi_data_source/mds_node.h"
|
||||
#include "common/meta_programming/ob_type_traits.h"
|
||||
#include "storage/multi_data_source/mds_row.h"
|
||||
#include "storage/tablet/ob_mds_schema_helper.h"
|
||||
namespace oceanbase {
|
||||
namespace storage
|
||||
{
|
||||
namespace mds
|
||||
{
|
||||
void *MdsAllocator::alloc(const int64_t size)
|
||||
{
|
||||
void *ptr = ob_malloc(size, "MDS");
|
||||
namespace storage {
|
||||
namespace mds {
|
||||
void *DefaultAllocator::alloc(const int64_t size) {
|
||||
void *ptr = std::malloc(size);// ob_malloc(size, "MDS");
|
||||
ATOMIC_INC(&alloc_times_);
|
||||
MDS_LOG(DEBUG, "alloc obj", KP(ptr), K(size), K(lbt()));
|
||||
return ptr;
|
||||
}
|
||||
void DefaultAllocator::free(void *ptr) {
|
||||
ATOMIC_INC(&free_times_);
|
||||
MDS_LOG(DEBUG, "free obj", KP(ptr), K(lbt()));
|
||||
std::free(ptr);// ob_free(ptr);
|
||||
}
|
||||
void *MdsAllocator::alloc(const int64_t size) {
|
||||
void *ptr = std::malloc(size);// ob_malloc(size, "MDS");
|
||||
ATOMIC_INC(&alloc_times_);
|
||||
MDS_LOG(DEBUG, "alloc obj", KP(ptr), K(size), K(lbt()));
|
||||
return ptr;
|
||||
@ -42,10 +51,10 @@ void *MdsAllocator::alloc(const int64_t size)
|
||||
void MdsAllocator::free(void *ptr) {
|
||||
ATOMIC_INC(&free_times_);
|
||||
MDS_LOG(DEBUG, "free obj", KP(ptr), K(lbt()));
|
||||
ob_free(ptr);
|
||||
}
|
||||
}
|
||||
std::free(ptr);// ob_free(ptr);
|
||||
}
|
||||
}}}
|
||||
namespace oceanbase {
|
||||
namespace unittest {
|
||||
|
||||
using namespace common;
|
||||
@ -56,7 +65,7 @@ using namespace mds;
|
||||
class TestMdsNode: public ::testing::Test
|
||||
{
|
||||
public:
|
||||
TestMdsNode() {};
|
||||
TestMdsNode() { ObMdsSchemaHelper::get_instance().init(); };
|
||||
virtual ~TestMdsNode() {};
|
||||
virtual void SetUp() {
|
||||
};
|
||||
@ -99,7 +108,7 @@ struct UserDataWithCallBack
|
||||
|
||||
TEST_F(TestMdsNode, call_user_method) {
|
||||
MdsRow<DummyKey, UserDataWithCallBack> row;
|
||||
MdsCtx ctx(mds::MdsWriter(transaction::ObTransID(100)));// commit finally
|
||||
MdsCtx ctx(mds::MdsWriter(transaction::ObTransID(100)), transaction::ObTxSEQ::mk_v0(1));// commit finally
|
||||
ASSERT_EQ(OB_SUCCESS, row.set(UserDataWithCallBack(1), ctx, {share::ObLSID(0), 0}));
|
||||
ctx.on_redo(mock_scn(1));
|
||||
ctx.before_prepare();
|
||||
@ -113,7 +122,7 @@ TEST_F(TestMdsNode, call_user_method) {
|
||||
|
||||
TEST_F(TestMdsNode, release_node_while_node_in_ctx) {
|
||||
MdsRow<DummyKey, UserDataWithCallBack> row;
|
||||
MdsCtx ctx(mds::MdsWriter(transaction::ObTransID(100)));// commit finally
|
||||
MdsCtx ctx(mds::MdsWriter(transaction::ObTransID(100)), transaction::ObTxSEQ::mk_v0(1));// commit finally
|
||||
ASSERT_EQ(OB_SUCCESS, row.set(UserDataWithCallBack(1), ctx, {share::ObLSID(0), 0}));
|
||||
ctx.on_redo(mock_scn(1));
|
||||
ctx.before_prepare();
|
||||
@ -129,7 +138,7 @@ TEST_F(TestMdsNode, release_node_while_node_in_ctx_concurrent) {
|
||||
call_try_on_commit = 0;
|
||||
call_try_on_abort = 0;
|
||||
MdsRow<DummyKey, UserDataWithCallBack> row;
|
||||
MdsCtx ctx(mds::MdsWriter(transaction::ObTransID(100)));// commit finally
|
||||
MdsCtx ctx(mds::MdsWriter(transaction::ObTransID(100)), transaction::ObTxSEQ::mk_v0(1));// commit finally
|
||||
// 提交这些node将会耗时50ms
|
||||
ASSERT_EQ(OB_SUCCESS, row.set(UserDataWithCallBack(1), ctx, {share::ObLSID(0), 0}));
|
||||
ASSERT_EQ(OB_SUCCESS, row.set(UserDataWithCallBack(2), ctx, {share::ObLSID(0), 0}));
|
||||
@ -209,10 +218,11 @@ TEST_F(TestMdsNode, release_node_while_node_in_ctx_concurrent) {
|
||||
// }
|
||||
|
||||
TEST_F(TestMdsNode, test_node_print) {
|
||||
UserMdsNode<DummyKey, UserDataWithCallBack> node0(nullptr, MdsNodeType::SET, WriterType::TRANSACTION, 1);
|
||||
UserMdsNode<DummyKey, UserDataWithCallBack> node1(nullptr, MdsNodeType::SET, WriterType::TRANSACTION, 1);
|
||||
UserMdsNode<DummyKey, UserDataWithCallBack> node0(nullptr, MdsNodeType::SET, WriterType::TRANSACTION, 1, transaction::ObTxSEQ::mk_v0(100));
|
||||
UserMdsNode<DummyKey, UserDataWithCallBack> node1(nullptr, MdsNodeType::SET, WriterType::TRANSACTION, 1, transaction::ObTxSEQ::mk_v0(100));
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -25,6 +25,33 @@
|
||||
#include "common/ob_clock_generator.h"
|
||||
#include "storage/multi_data_source/mds_row.h"
|
||||
#include "example_user_helper_define.cpp"
|
||||
#include "storage/tablet/ob_mds_schema_helper.h"
|
||||
namespace oceanbase {
|
||||
namespace storage {
|
||||
namespace mds {
|
||||
void *DefaultAllocator::alloc(const int64_t size) {
|
||||
void *ptr = std::malloc(size);// ob_malloc(size, "MDS");
|
||||
ATOMIC_INC(&alloc_times_);
|
||||
MDS_LOG(DEBUG, "alloc obj", KP(ptr), K(size), K(lbt()));
|
||||
return ptr;
|
||||
}
|
||||
void DefaultAllocator::free(void *ptr) {
|
||||
ATOMIC_INC(&free_times_);
|
||||
MDS_LOG(DEBUG, "free obj", KP(ptr), K(lbt()));
|
||||
std::free(ptr);// ob_free(ptr);
|
||||
}
|
||||
void *MdsAllocator::alloc(const int64_t size) {
|
||||
void *ptr = std::malloc(size);// ob_malloc(size, "MDS");
|
||||
ATOMIC_INC(&alloc_times_);
|
||||
MDS_LOG(DEBUG, "alloc obj", KP(ptr), K(size), K(lbt()));
|
||||
return ptr;
|
||||
}
|
||||
void MdsAllocator::free(void *ptr) {
|
||||
ATOMIC_INC(&free_times_);
|
||||
MDS_LOG(DEBUG, "free obj", KP(ptr), K(lbt()));
|
||||
std::free(ptr);// ob_free(ptr);
|
||||
}
|
||||
}}}
|
||||
namespace oceanbase {
|
||||
namespace unittest {
|
||||
|
||||
@ -36,7 +63,7 @@ using namespace mds;
|
||||
class TestMdsRowAndMdsCtx: public ::testing::Test
|
||||
{
|
||||
public:
|
||||
TestMdsRowAndMdsCtx() {};
|
||||
TestMdsRowAndMdsCtx() { ObMdsSchemaHelper::get_instance().init(); };
|
||||
virtual ~TestMdsRowAndMdsCtx() {};
|
||||
virtual void SetUp() {
|
||||
};
|
||||
@ -61,11 +88,11 @@ void TestMdsRowAndMdsCtx::mds_row_set_element() {
|
||||
ExampleUserData2 data1(1);
|
||||
MdsCtx ctx1(1);
|
||||
ASSERT_EQ(OB_SUCCESS, row_.set(data1, ctx1, 0));// copy user data
|
||||
UserMdsNode<ExampleUserData2> *user_node = dynamic_cast<UserMdsNode<ExampleUserData2> *>(row_.sorted_list_.list_.list_head_);
|
||||
UserMdsNode<ExampleUserData2> *user_node = static_cast<UserMdsNode<ExampleUserData2> *>(row_.sorted_list_.list_.list_head_);
|
||||
ASSERT_NE(user_node->user_data_.data_.ptr(), data1.data_.ptr());
|
||||
auto p_data = data1.data_.ptr();
|
||||
ASSERT_EQ(OB_SUCCESS, row_.set(std::move(data1), ctx1, 0));// move user data
|
||||
user_node = dynamic_cast<UserMdsNode<ExampleUserData2> *>(row_.sorted_list_.list_.list_head_);
|
||||
user_node = static_cast<UserMdsNode<ExampleUserData2> *>(row_.sorted_list_.list_.list_head_);
|
||||
ASSERT_EQ(user_node->user_data_.data_.ptr(), p_data);
|
||||
MdsCtx ctx2(2);
|
||||
ExampleUserData2 data2(2);
|
||||
@ -130,14 +157,14 @@ void TestMdsRowAndMdsCtx::get_latest() {
|
||||
ASSERT_EQ(OB_SUCCESS, row_.get_snapshot([&data_size](const ExampleUserData2 &data) {
|
||||
data_size = data.data_.length();
|
||||
return OB_SUCCESS;
|
||||
}, share::SCN::max_scn(), 0, 0));
|
||||
}, share::SCN::max_scn(), 0));
|
||||
ExampleUserData2 data(13);
|
||||
MdsCtx ctx(7);
|
||||
ASSERT_EQ(OB_SUCCESS, row_.set(data, ctx, 0));
|
||||
ASSERT_EQ(OB_ERR_SHARED_LOCK_CONFLICT, row_.get_snapshot([&data_size](const ExampleUserData2 &data) {
|
||||
data_size = data.data_.length();
|
||||
return OB_SUCCESS;
|
||||
}, share::SCN::max_scn(), 0, 0));
|
||||
}, share::SCN::max_scn(), 0));
|
||||
ASSERT_EQ(12, data_size);
|
||||
}
|
||||
|
||||
@ -172,7 +199,7 @@ void TestMdsRowAndMdsCtx::get_snapshop_until_timeout() {
|
||||
ASSERT_EQ(OB_ERR_SHARED_LOCK_CONFLICT, row_.get_snapshot([&data_size](const ExampleUserData2 &data) {
|
||||
data_size = data.data_.length();
|
||||
return OB_SUCCESS;
|
||||
}, mock_scn(6), 0, 500_ms));// read snapshot timeout
|
||||
}, mock_scn(6), 500_ms));// read snapshot timeout
|
||||
|
||||
ctx.on_redo(mock_scn(20));
|
||||
ctx.on_prepare(mock_scn(20));
|
||||
@ -181,19 +208,19 @@ void TestMdsRowAndMdsCtx::get_snapshop_until_timeout() {
|
||||
ASSERT_EQ(OB_SUCCESS, row_.get_snapshot([&data_size](const ExampleUserData2 &data) {
|
||||
data_size = data.data_.length();
|
||||
return OB_SUCCESS;
|
||||
}, mock_scn(6), 0, 500_ms));// read snapshot
|
||||
}, mock_scn(6), 500_ms));// read snapshot
|
||||
ASSERT_EQ(6, data_size);
|
||||
ASSERT_EQ(OB_SUCCESS, row_.get_snapshot([&data_size](const ExampleUserData2 &data) {
|
||||
data_size = data.data_.length();
|
||||
return OB_SUCCESS;
|
||||
}, mock_scn(21), 0, 500_ms));// read snapshot
|
||||
}, mock_scn(21), 500_ms));// read snapshot
|
||||
ASSERT_EQ(20, data_size);
|
||||
}
|
||||
|
||||
void TestMdsRowAndMdsCtx::get_snapshop_disgard() {
|
||||
ASSERT_EQ(OB_SNAPSHOT_DISCARDED, row_.get_snapshot([](const ExampleUserData2 &data) {
|
||||
return OB_SUCCESS;
|
||||
}, mock_scn(1), 0, 500_ms));// read snapshot timeout
|
||||
}, mock_scn(1), 500_ms));// read snapshot timeout
|
||||
}
|
||||
|
||||
TEST_F(TestMdsRowAndMdsCtx, mds_row_set_element) { TestMdsRowAndMdsCtx::mds_row_set_element(); }
|
||||
|
@ -9,6 +9,7 @@
|
||||
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
|
||||
* See the Mulan PubL v2 for more details.
|
||||
*/
|
||||
#include "storage/multi_data_source/test/example_user_data_define.h"
|
||||
#define UNITTEST_DEBUG
|
||||
#include "lib/utility/utility.h"
|
||||
#include <gtest/gtest.h>
|
||||
@ -37,14 +38,23 @@
|
||||
#include "storage/multi_data_source/runtime_utility/mds_lock.h"
|
||||
#include "storage/tablet/ob_tablet_meta.h"
|
||||
#include "storage/multi_data_source/mds_table_iterator.h"
|
||||
#include "storage/tablet/ob_mds_schema_helper.h"
|
||||
namespace oceanbase {
|
||||
namespace storage
|
||||
{
|
||||
namespace mds
|
||||
{
|
||||
void *MdsAllocator::alloc(const int64_t size)
|
||||
{
|
||||
void *ptr = ob_malloc(size, "MDS");
|
||||
namespace storage {
|
||||
namespace mds {
|
||||
void *DefaultAllocator::alloc(const int64_t size) {
|
||||
void *ptr = std::malloc(size);// ob_malloc(size, "MDS");
|
||||
ATOMIC_INC(&alloc_times_);
|
||||
MDS_LOG(DEBUG, "alloc obj", KP(ptr), K(size), K(lbt()));
|
||||
return ptr;
|
||||
}
|
||||
void DefaultAllocator::free(void *ptr) {
|
||||
ATOMIC_INC(&free_times_);
|
||||
MDS_LOG(DEBUG, "free obj", KP(ptr), K(lbt()));
|
||||
std::free(ptr);// ob_free(ptr);
|
||||
}
|
||||
void *MdsAllocator::alloc(const int64_t size) {
|
||||
void *ptr = std::malloc(size);// ob_malloc(size, "MDS");
|
||||
ATOMIC_INC(&alloc_times_);
|
||||
MDS_LOG(DEBUG, "alloc obj", KP(ptr), K(size), K(lbt()));
|
||||
return ptr;
|
||||
@ -52,10 +62,10 @@ void *MdsAllocator::alloc(const int64_t size)
|
||||
void MdsAllocator::free(void *ptr) {
|
||||
ATOMIC_INC(&free_times_);
|
||||
MDS_LOG(DEBUG, "free obj", KP(ptr), K(lbt()));
|
||||
ob_free(ptr);
|
||||
}
|
||||
}
|
||||
std::free(ptr);// ob_free(ptr);
|
||||
}
|
||||
}}}
|
||||
namespace oceanbase {
|
||||
namespace unittest {
|
||||
|
||||
using namespace common;
|
||||
@ -67,12 +77,15 @@ using namespace transaction;
|
||||
class TestMdsTable: public ::testing::Test
|
||||
{
|
||||
public:
|
||||
TestMdsTable() {}
|
||||
TestMdsTable() {
|
||||
ObMdsSchemaHelper::get_instance().init();
|
||||
}
|
||||
virtual ~TestMdsTable() {}
|
||||
virtual void SetUp() {
|
||||
}
|
||||
virtual void TearDown() {
|
||||
}
|
||||
static void compare_binary_key();
|
||||
static void set();
|
||||
static void replay();
|
||||
static void get_latest();
|
||||
@ -99,7 +112,59 @@ MdsTableHandle &mds_table_ = mds_table_hanlder.mds_table_handle_;
|
||||
struct A { ObSpinLock lock_; };
|
||||
struct B { MdsLock lock_; };
|
||||
|
||||
#define GET_REAL_MDS_TABLE(mds_table) (*((MdsTableImpl<UnitTestMdsTable>*)(dynamic_cast<guard::LightDataBlock<MdsTableImpl<UnitTestMdsTable>>*>((mds_table.p_mds_table_base_.ctrl_ptr_->p_data_block_))->data_)))
|
||||
#define GET_REAL_MDS_TABLE(mds_table) (*((MdsTableImpl<UnitTestMdsTable>*)(static_cast<guard::LightDataBlock<MdsTableImpl<UnitTestMdsTable>>*>((mds_table.p_mds_table_base_.ctrl_ptr_->p_data_block_))->data_)))
|
||||
|
||||
static constexpr int64_t MAX_KEY_SERIALIZE_SIZE = 8;
|
||||
template <typename UnitKey>
|
||||
inline int test_compare_binary_key(const UnitKey &lhs, const UnitKey &rhs, int &compare_result) {
|
||||
int ret = OB_SUCCESS;
|
||||
uint8_t lhs_buffer[MAX_KEY_SERIALIZE_SIZE];
|
||||
uint8_t rhs_buffer[MAX_KEY_SERIALIZE_SIZE];
|
||||
memset(lhs_buffer, 0, MAX_KEY_SERIALIZE_SIZE);
|
||||
memset(rhs_buffer, 0, MAX_KEY_SERIALIZE_SIZE);
|
||||
int64_t pos1 = 0;
|
||||
int64_t pos2 = 0;
|
||||
if (OB_FAIL(lhs.mds_serialize((char *)lhs_buffer, MAX_KEY_SERIALIZE_SIZE, pos1))) {
|
||||
MDS_LOG(WARN, "fail to serialize lhs buffer", K(lhs), K(rhs));
|
||||
} else if (OB_FAIL(rhs.mds_serialize((char *)rhs_buffer, MAX_KEY_SERIALIZE_SIZE, pos2))) {
|
||||
MDS_LOG(WARN, "fail to serialize rhs buffer", K(lhs), K(rhs));
|
||||
} else {
|
||||
int64_t max_pos = std::max(pos1, pos2);
|
||||
int64_t idx = 0;
|
||||
for (; idx < max_pos && OB_SUCC(ret); ++idx) {
|
||||
if (lhs_buffer[idx] > rhs_buffer[idx]) {
|
||||
compare_result = 1;
|
||||
break;
|
||||
} else if (lhs_buffer[idx] < rhs_buffer[idx]) {
|
||||
compare_result = -1;
|
||||
break;
|
||||
} else {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
if (idx == max_pos) {
|
||||
compare_result = 0;
|
||||
}
|
||||
MDS_LOG(DEBUG, "compare binary key", K(lhs), K(rhs), K(compare_result), K(lhs_buffer), K(rhs_buffer));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
void TestMdsTable::compare_binary_key() {
|
||||
ExampleUserKey key1(0x1000000000000001);
|
||||
ExampleUserKey key2(0x1000000000000002);
|
||||
int comare_result = 0;
|
||||
ASSERT_EQ(OB_SUCCESS, unittest::test_compare_binary_key(key1, key2, comare_result));
|
||||
ASSERT_EQ(true, comare_result < 0);
|
||||
ASSERT_EQ(OB_SUCCESS, unittest::test_compare_binary_key(key2, key1, comare_result));
|
||||
ASSERT_EQ(true, comare_result > 0);
|
||||
|
||||
int64_t pos = 0;
|
||||
char buffer[8] = {0};
|
||||
key1.mds_serialize(buffer, 8, pos);
|
||||
pos = 0;
|
||||
key1.mds_deserialize(buffer, 8, pos);
|
||||
ASSERT_EQ(key1.value_, 0x1000000000000001);
|
||||
}
|
||||
|
||||
void TestMdsTable::set() {
|
||||
ASSERT_EQ(OB_SUCCESS, mds_table_.init<UnitTestMdsTable>(MdsAllocator::get_instance(), ObTabletID(1), share::ObLSID(1), (ObTabletPointer*)0x111));
|
||||
@ -190,7 +255,7 @@ void TestMdsTable::get_snapshot_hung_1s()
|
||||
ASSERT_EQ(OB_SUCCESS, mds_table_.get_snapshot<ExampleUserData1>([&data](const ExampleUserData1 &node_data) {
|
||||
data = node_data;
|
||||
return OB_SUCCESS;
|
||||
}, share::SCN::max_scn(), 0, 2_s));
|
||||
}, share::SCN::max_scn(), 2_s));
|
||||
ASSERT_LE(1_s, ObClockGenerator::getRealClock() - start_ts);
|
||||
ASSERT_EQ(1, data.value_);// read snapshot
|
||||
});
|
||||
@ -303,7 +368,7 @@ void TestMdsTable::standard_iterator() {
|
||||
for (auto &kv_row : *p_mds_unit) {
|
||||
MdsRLockGuard lg(kv_row.v_.lock_);// lock row
|
||||
for (auto &mds_node : kv_row.v_) {
|
||||
MDS_LOG(INFO, "print iter mds node", K(mds_node));
|
||||
MDS_LOG(INFO, "print iter mds node", K(kv_row.k_), K(mds_node));
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -337,7 +402,7 @@ void TestMdsTable::OB_iterator() {
|
||||
ASSERT_EQ(OB_SUCCESS, iter.init(mds_table_));
|
||||
ASSERT_EQ(OB_SUCCESS, iter.get_next(key, p_node));
|
||||
MDS_LOG(INFO, "print iter kv", K(key), K(*p_node));
|
||||
ASSERT_EQ(ExampleUserKey(1), key);
|
||||
ASSERT_EQ(ExampleUserKey(1).value_ == key.value_, true);
|
||||
{
|
||||
ASSERT_EQ(ExampleUserData1(2), p_node->user_data_);
|
||||
ASSERT_EQ(true, p_node->is_committed_());
|
||||
@ -345,14 +410,14 @@ void TestMdsTable::OB_iterator() {
|
||||
}
|
||||
ASSERT_EQ(OB_SUCCESS, iter.get_next(key, p_node));
|
||||
MDS_LOG(INFO, "print iter kv", K(key), K(*p_node));
|
||||
ASSERT_EQ(ExampleUserKey(1), key);
|
||||
ASSERT_EQ(ExampleUserKey(1).value_ == key.value_, true);
|
||||
{
|
||||
ASSERT_EQ(ExampleUserData1(1), p_node->user_data_);
|
||||
ASSERT_EQ(true, p_node->is_committed_());
|
||||
ASSERT_EQ(mock_scn(1), p_node->get_commit_version_());
|
||||
}
|
||||
ASSERT_EQ(OB_SUCCESS, iter.get_next(key, p_node));
|
||||
ASSERT_EQ(ExampleUserKey(2), key);
|
||||
ASSERT_EQ(ExampleUserKey(2).value_ == key.value_, true);
|
||||
ASSERT_EQ(OB_ITER_END, iter.get_next(key, p_node));
|
||||
}
|
||||
|
||||
@ -379,14 +444,14 @@ void TestMdsTable::test_flush() {
|
||||
int idx = 0;
|
||||
ASSERT_EQ(OB_SUCCESS, mds_table_.flush(mock_scn(300), mock_scn(500)));// 1. 以300为版本号进行flush动作
|
||||
ASSERT_EQ(mock_scn(199), mds_table_.p_mds_table_base_->flushing_scn_);// 2. 实际上以199为版本号进行flush动作
|
||||
ASSERT_EQ(OB_SUCCESS, mds_table_.for_each_unit_from_small_key_to_big_from_old_node_to_new_to_dump(
|
||||
ASSERT_EQ(OB_SUCCESS, (mds_table_.scan_all_nodes_to_dump<ScanRowOrder::ASC, ScanNodeOrder::FROM_OLD_TO_NEW>(
|
||||
[&idx](const MdsDumpKV &kv) -> int {// 2. 转储时扫描mds table
|
||||
MDS_ASSERT(kv.v_.end_scn_ < mock_scn(199));// 扫描时看不到199版本以上的提交
|
||||
MDS_ASSERT(idx < 10);
|
||||
MDS_LOG(INFO, "print dump node kv", K(kv));
|
||||
return OB_SUCCESS;
|
||||
}, 0, true)
|
||||
);
|
||||
));
|
||||
mds_table_.on_flush(mock_scn(199), OB_SUCCESS);// 3. 推大rec_scn【至少】到200
|
||||
share::SCN rec_scn;
|
||||
ASSERT_EQ(OB_SUCCESS, mds_table_.get_rec_scn(rec_scn));
|
||||
@ -448,6 +513,7 @@ void TestMdsTable::test_multi_key_remove() {
|
||||
ASSERT_EQ(OB_ENTRY_NOT_EXIST, ret);
|
||||
}
|
||||
|
||||
TEST_F(TestMdsTable, compare_binary_key) { TestMdsTable::compare_binary_key(); }
|
||||
TEST_F(TestMdsTable, set) { TestMdsTable::set(); }
|
||||
TEST_F(TestMdsTable, replay) { TestMdsTable::replay(); }
|
||||
TEST_F(TestMdsTable, get_latest) { TestMdsTable::get_latest(); }
|
||||
@ -514,7 +580,7 @@ TEST_F(TestMdsTable, test_recycle) {
|
||||
ASSERT_EQ(OB_SUCCESS, mds_table_.get_node_cnt(valid_cnt));
|
||||
ASSERT_EQ(1, valid_cnt);// 此时还有一个19001版本的已提交数据,因为rec_scn没有推上去
|
||||
ASSERT_EQ(OB_SUCCESS, mds_table_.flush(mock_scn(20000), mock_scn(40000)));
|
||||
mds_table_.for_each_unit_from_small_key_to_big_from_old_node_to_new_to_dump([](const MdsDumpKV &){
|
||||
mds_table_.scan_all_nodes_to_dump<ScanRowOrder::ASC, ScanNodeOrder::FROM_OLD_TO_NEW>([](const MdsDumpKV &){
|
||||
return OB_SUCCESS;
|
||||
}, 0, true);
|
||||
mds_table_.on_flush(mock_scn(40000), OB_SUCCESS);
|
||||
|
@ -42,6 +42,33 @@ static bool MDS_FLUSHER_ALLOW_ALLOC = true;
|
||||
#include "storage/ls/ob_ls.h"
|
||||
#include "storage/multi_data_source/mds_table_handle.h"
|
||||
#include "storage/multi_data_source/mds_table_order_flusher.h"
|
||||
#include "storage/tablet/ob_mds_schema_helper.h"
|
||||
namespace oceanbase {
|
||||
namespace storage {
|
||||
namespace mds {
|
||||
void *DefaultAllocator::alloc(const int64_t size) {
|
||||
void *ptr = std::malloc(size);// ob_malloc(size, "MDS");
|
||||
ATOMIC_INC(&alloc_times_);
|
||||
MDS_LOG(DEBUG, "alloc obj", KP(ptr), K(size), K(lbt()));
|
||||
return ptr;
|
||||
}
|
||||
void DefaultAllocator::free(void *ptr) {
|
||||
ATOMIC_INC(&free_times_);
|
||||
MDS_LOG(DEBUG, "free obj", KP(ptr), K(lbt()));
|
||||
std::free(ptr);// ob_free(ptr);
|
||||
}
|
||||
void *MdsAllocator::alloc(const int64_t size) {
|
||||
void *ptr = std::malloc(size);// ob_malloc(size, "MDS");
|
||||
ATOMIC_INC(&alloc_times_);
|
||||
MDS_LOG(DEBUG, "alloc obj", KP(ptr), K(size), K(lbt()));
|
||||
return ptr;
|
||||
}
|
||||
void MdsAllocator::free(void *ptr) {
|
||||
ATOMIC_INC(&free_times_);
|
||||
MDS_LOG(DEBUG, "free obj", KP(ptr), K(lbt()));
|
||||
std::free(ptr);// ob_free(ptr);
|
||||
}
|
||||
}}}
|
||||
namespace oceanbase {
|
||||
namespace storage
|
||||
{
|
||||
@ -50,18 +77,6 @@ share::SCN MOCK_MAX_CONSEQUENT_CALLBACKED_SCN;
|
||||
|
||||
namespace mds
|
||||
{
|
||||
void *MdsAllocator::alloc(const int64_t size)
|
||||
{
|
||||
void *ptr = ob_malloc(size, "MDS");
|
||||
ATOMIC_INC(&alloc_times_);
|
||||
MDS_LOG(DEBUG, "alloc obj", KP(ptr), K(size), K(lbt()));
|
||||
return ptr;
|
||||
}
|
||||
void MdsAllocator::free(void *ptr) {
|
||||
ATOMIC_INC(&free_times_);
|
||||
MDS_LOG(DEBUG, "free obj", KP(ptr), K(lbt()));
|
||||
ob_free(ptr);
|
||||
}
|
||||
|
||||
int MdsTableBase::get_ls_max_consequent_callbacked_scn_(share::SCN &max_consequent_callbacked_scn) const
|
||||
{
|
||||
@ -87,7 +102,7 @@ using namespace transaction;
|
||||
class TestMdsTableFlush: public ::testing::Test
|
||||
{
|
||||
public:
|
||||
TestMdsTableFlush() {}
|
||||
TestMdsTableFlush() { ObMdsSchemaHelper::get_instance().init(); }
|
||||
virtual ~TestMdsTableFlush() {}
|
||||
virtual void SetUp() {
|
||||
}
|
||||
@ -160,10 +175,10 @@ TEST_F(TestMdsTableFlush, normal_flush) {
|
||||
ASSERT_EQ(true, is_flusing);
|
||||
ASSERT_EQ(mock_scn(125), handle.p_mds_table_base_->flushing_scn_);
|
||||
int scan_cnt = 0;
|
||||
ASSERT_EQ(OB_SUCCESS, handle.for_each_unit_from_small_key_to_big_from_old_node_to_new_to_dump([&scan_cnt](const MdsDumpKV &kv) -> int {
|
||||
ASSERT_EQ(OB_SUCCESS, (handle.scan_all_nodes_to_dump<ScanRowOrder::ASC, ScanNodeOrder::FROM_OLD_TO_NEW>([&scan_cnt](const MdsDumpKV &kv) -> int {
|
||||
scan_cnt++;
|
||||
return OB_SUCCESS;
|
||||
}, 0, true));
|
||||
}, 0, true)));
|
||||
ASSERT_EQ(1, scan_cnt);
|
||||
handle.on_flush(mock_scn(125), OB_SUCCESS);
|
||||
ASSERT_EQ(OB_SUCCESS, handle.get_rec_scn(rec_scn));
|
||||
@ -184,10 +199,11 @@ TEST_F(TestMdsTableFlush, normal_flush) {
|
||||
ASSERT_EQ(true, is_flusing);
|
||||
ASSERT_EQ(mock_scn(249), handle.p_mds_table_base_->flushing_scn_);
|
||||
scan_cnt = 0;
|
||||
ASSERT_EQ(OB_SUCCESS, handle.for_each_unit_from_small_key_to_big_from_old_node_to_new_to_dump([&scan_cnt](const MdsDumpKV &kv) -> int {
|
||||
auto scan_op = [&scan_cnt](const MdsDumpKV &kv) -> int {
|
||||
scan_cnt++;
|
||||
return OB_SUCCESS;
|
||||
}, 0, true));
|
||||
};
|
||||
ASSERT_EQ(OB_SUCCESS, (handle.scan_all_nodes_to_dump<ScanRowOrder::ASC, ScanNodeOrder::FROM_OLD_TO_NEW>(scan_op, 0, true)));
|
||||
ASSERT_EQ(1, scan_cnt);
|
||||
handle.on_flush(mock_scn(249), OB_SUCCESS);
|
||||
ASSERT_EQ(OB_SUCCESS, handle.get_rec_scn(rec_scn));
|
||||
@ -200,10 +216,11 @@ TEST_F(TestMdsTableFlush, normal_flush) {
|
||||
ASSERT_EQ(true, is_flusing);
|
||||
ASSERT_EQ(mock_scn(499), handle.p_mds_table_base_->flushing_scn_);
|
||||
scan_cnt = 0;
|
||||
ASSERT_EQ(OB_SUCCESS, handle.for_each_unit_from_small_key_to_big_from_old_node_to_new_to_dump([&scan_cnt](const MdsDumpKV &kv) -> int {
|
||||
|
||||
ASSERT_EQ(OB_SUCCESS, (handle.scan_all_nodes_to_dump<ScanRowOrder::ASC, ScanNodeOrder::FROM_OLD_TO_NEW>([&scan_cnt](const MdsDumpKV &kv) -> int {
|
||||
scan_cnt++;
|
||||
return OB_SUCCESS;
|
||||
}, 0, true));
|
||||
}, 0, true)));
|
||||
ASSERT_EQ(2, scan_cnt);
|
||||
handle.on_flush(mock_scn(499), OB_SUCCESS);
|
||||
ASSERT_EQ(OB_SUCCESS, handle.get_rec_scn(rec_scn));
|
||||
|
@ -25,6 +25,33 @@
|
||||
#include "storage/ls/ob_ls.h"
|
||||
#include "storage/multi_data_source/mds_table_handle.h"
|
||||
#include "storage/multi_data_source/mds_table_order_flusher.h"
|
||||
#include "storage/tablet/ob_mds_schema_helper.h"
|
||||
namespace oceanbase {
|
||||
namespace storage {
|
||||
namespace mds {
|
||||
void *DefaultAllocator::alloc(const int64_t size) {
|
||||
void *ptr = std::malloc(size);// ob_malloc(size, "MDS");
|
||||
ATOMIC_INC(&alloc_times_);
|
||||
MDS_LOG(DEBUG, "alloc obj", KP(ptr), K(size), K(lbt()));
|
||||
return ptr;
|
||||
}
|
||||
void DefaultAllocator::free(void *ptr) {
|
||||
ATOMIC_INC(&free_times_);
|
||||
MDS_LOG(DEBUG, "free obj", KP(ptr), K(lbt()));
|
||||
std::free(ptr);// ob_free(ptr);
|
||||
}
|
||||
void *MdsAllocator::alloc(const int64_t size) {
|
||||
void *ptr = std::malloc(size);// ob_malloc(size, "MDS");
|
||||
ATOMIC_INC(&alloc_times_);
|
||||
MDS_LOG(DEBUG, "alloc obj", KP(ptr), K(size), K(lbt()));
|
||||
return ptr;
|
||||
}
|
||||
void MdsAllocator::free(void *ptr) {
|
||||
ATOMIC_INC(&free_times_);
|
||||
MDS_LOG(DEBUG, "free obj", KP(ptr), K(lbt()));
|
||||
std::free(ptr);// ob_free(ptr);
|
||||
}
|
||||
}}}
|
||||
using namespace std;
|
||||
|
||||
oceanbase::common::ObPromise<void> PROMISE1, PROMISE2;
|
||||
@ -51,18 +78,6 @@ int MdsTableImpl<UnitTestMdsTable>::flush(share::SCN need_advanced_rec_scn_lower
|
||||
V_ActualDoFlushKey.push_back({tablet_id_, rec_scn_});
|
||||
return OB_SUCCESS;
|
||||
}
|
||||
void *MdsAllocator::alloc(const int64_t size)
|
||||
{
|
||||
void *ptr = ob_malloc(size, "MDS");
|
||||
ATOMIC_INC(&alloc_times_);
|
||||
MDS_LOG(DEBUG, "alloc obj", KP(ptr), K(size), K(lbt()));
|
||||
return ptr;
|
||||
}
|
||||
void MdsAllocator::free(void *ptr) {
|
||||
ATOMIC_INC(&free_times_);
|
||||
MDS_LOG(DEBUG, "free obj", KP(ptr), K(lbt()));
|
||||
ob_free(ptr);
|
||||
}
|
||||
void *MdsFlusherModulePageAllocator::alloc(const int64_t size, const ObMemAttr &attr) {
|
||||
void *ret = nullptr;
|
||||
if (!NEED_ALLOC_FAIL) {
|
||||
@ -106,7 +121,7 @@ static constexpr int64_t TEST_ALL_SIZE = FLUSH_FOR_ALL_SIZE * 10;
|
||||
class TestMdsTableFlush: public ::testing::Test
|
||||
{
|
||||
public:
|
||||
TestMdsTableFlush() {}
|
||||
TestMdsTableFlush() { ObMdsSchemaHelper::get_instance().init(); }
|
||||
virtual ~TestMdsTableFlush() {}
|
||||
virtual void SetUp() { V_ActualDoFlushKey.clear(); NEED_ALLOC_FAIL = false; NEED_ALLOC_FAIL_AFTER_RESERVE = false; }
|
||||
virtual void TearDown() { }
|
||||
|
@ -20,31 +20,43 @@
|
||||
#include "lib/guard/ob_light_shared_gaurd.h"
|
||||
#include "storage/multi_data_source/mds_table_impl.h"
|
||||
#include "storage/tablet/ob_tablet_meta.h"
|
||||
#include "storage/tablet/ob_mds_schema_helper.h"
|
||||
namespace oceanbase {
|
||||
namespace storage {
|
||||
namespace mds {
|
||||
void *DefaultAllocator::alloc(const int64_t size) {
|
||||
void *ptr = std::malloc(size);// ob_malloc(size, "MDS");
|
||||
ATOMIC_INC(&alloc_times_);
|
||||
MDS_LOG(DEBUG, "alloc obj", KP(ptr), K(size), K(lbt()));
|
||||
return ptr;
|
||||
}
|
||||
void DefaultAllocator::free(void *ptr) {
|
||||
ATOMIC_INC(&free_times_);
|
||||
MDS_LOG(DEBUG, "free obj", KP(ptr), K(lbt()));
|
||||
std::free(ptr);// ob_free(ptr);
|
||||
}
|
||||
void *MdsAllocator::alloc(const int64_t size) {
|
||||
void *ptr = std::malloc(size);// ob_malloc(size, "MDS");
|
||||
ATOMIC_INC(&alloc_times_);
|
||||
MDS_LOG(DEBUG, "alloc obj", KP(ptr), K(size), K(lbt()));
|
||||
return ptr;
|
||||
}
|
||||
void MdsAllocator::free(void *ptr) {
|
||||
ATOMIC_INC(&free_times_);
|
||||
MDS_LOG(DEBUG, "free obj", KP(ptr), K(lbt()));
|
||||
std::free(ptr);// ob_free(ptr);
|
||||
}
|
||||
}}}
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
//using namespace share;
|
||||
namespace storage
|
||||
{
|
||||
namespace mds
|
||||
{
|
||||
void *MdsAllocator::alloc(const int64_t size)
|
||||
{
|
||||
void *ptr = ob_malloc(size, "MDS");
|
||||
ATOMIC_INC(&alloc_times_);
|
||||
MDS_LOG(DEBUG, "alloc obj", KP(ptr), K(lbt()));
|
||||
return ptr;
|
||||
}
|
||||
void MdsAllocator::free(void *ptr) {
|
||||
ATOMIC_INC(&free_times_);
|
||||
MDS_LOG(DEBUG, "free obj", KP(ptr), K(lbt()));
|
||||
ob_free(ptr);
|
||||
}
|
||||
}
|
||||
class TestMdsTableHandle : public ::testing::Test
|
||||
{
|
||||
public:
|
||||
TestMdsTableHandle() {}
|
||||
TestMdsTableHandle() { ObMdsSchemaHelper::get_instance().init(); }
|
||||
virtual ~TestMdsTableHandle() = default;
|
||||
virtual void SetUp() override {}
|
||||
virtual void TearDown() override {}
|
||||
|
@ -25,6 +25,33 @@
|
||||
#include "storage/multi_data_source/mds_row.h"
|
||||
#include "storage/multi_data_source/mds_unit.h"
|
||||
#include "example_user_helper_define.cpp"
|
||||
#include "storage/tablet/ob_mds_schema_helper.h"
|
||||
namespace oceanbase {
|
||||
namespace storage {
|
||||
namespace mds {
|
||||
void *DefaultAllocator::alloc(const int64_t size) {
|
||||
void *ptr = std::malloc(size);// ob_malloc(size, "MDS");
|
||||
ATOMIC_INC(&alloc_times_);
|
||||
MDS_LOG(DEBUG, "alloc obj", KP(ptr), K(size), K(lbt()));
|
||||
return ptr;
|
||||
}
|
||||
void DefaultAllocator::free(void *ptr) {
|
||||
ATOMIC_INC(&free_times_);
|
||||
MDS_LOG(DEBUG, "free obj", KP(ptr), K(lbt()));
|
||||
std::free(ptr);// ob_free(ptr);
|
||||
}
|
||||
void *MdsAllocator::alloc(const int64_t size) {
|
||||
void *ptr = std::malloc(size);// ob_malloc(size, "MDS");
|
||||
ATOMIC_INC(&alloc_times_);
|
||||
MDS_LOG(DEBUG, "alloc obj", KP(ptr), K(size), K(lbt()));
|
||||
return ptr;
|
||||
}
|
||||
void MdsAllocator::free(void *ptr) {
|
||||
ATOMIC_INC(&free_times_);
|
||||
MDS_LOG(DEBUG, "free obj", KP(ptr), K(lbt()));
|
||||
std::free(ptr);// ob_free(ptr);
|
||||
}
|
||||
}}}
|
||||
namespace oceanbase {
|
||||
namespace unittest {
|
||||
|
||||
@ -36,7 +63,7 @@ using namespace mds;
|
||||
class TestMdsUnit: public ::testing::Test
|
||||
{
|
||||
public:
|
||||
TestMdsUnit() {};
|
||||
TestMdsUnit() { ObMdsSchemaHelper::get_instance().init(); };
|
||||
virtual ~TestMdsUnit() {};
|
||||
virtual void SetUp() {
|
||||
};
|
||||
|
Reference in New Issue
Block a user