Files
oceanbase/mittest/mtlenv/storage/test_shared_block_reader_writer.cpp
2024-12-12 12:14:48 +00:00

576 lines
20 KiB
C++

// owner: gaishun.gs
// owner group: storage
/**
* Copyright (c) 2022 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#include <errno.h>
#include <gtest/gtest.h>
#define protected public
#define private public
#define OK(ass) ASSERT_EQ(OB_SUCCESS, (ass))
#include "storage/blockstore/ob_shared_object_reader_writer.h"
#include "share/io/ob_io_define.h"
#include "share/io/ob_io_manager.h"
#include "mittest/mtlenv/mock_tenant_module_env.h"
#include "storage/meta_mem/ob_storage_meta_cache.h"
#include "storage/blocksstable/ob_sstable.h"
#include "storage/tablet/ob_tablet_create_delete_helper.h"
#include "share/ob_simple_mem_limit_getter.h"
#include "storage/blocksstable/ob_storage_cache_suite.h"
#include "storage/tablet/ob_tablet.h"
#include "storage/blocksstable/ob_object_manager.h"
namespace oceanbase
{
using namespace common;
using namespace blocksstable;
using namespace storage;
static ObSimpleMemLimitGetter getter;
namespace unittest
{
class TestSharedBlockRWriter : public::testing::Test
{
public:
TestSharedBlockRWriter() {}
virtual ~TestSharedBlockRWriter() = default;
static void SetUpTestCase();
static void TearDownTestCase();
virtual void SetUp();
virtual void TearDown();
void create_empty_sstable(ObSSTable &empty_sstable);
ObArenaAllocator allocator_;
};
void TestSharedBlockRWriter::SetUpTestCase()
{
STORAGE_LOG(INFO, "SetUpTestCase");
EXPECT_EQ(OB_SUCCESS, MockTenantModuleEnv::get_instance().init());
}
void TestSharedBlockRWriter::TearDownTestCase()
{
MockTenantModuleEnv::get_instance().destroy();
}
void TestSharedBlockRWriter::SetUp()
{
int ret = OB_SUCCESS;
}
void TestSharedBlockRWriter::TearDown()
{
}
void TestSharedBlockRWriter::create_empty_sstable(ObSSTable &empty_sstable)
{
ObTabletCreateSSTableParam param;
param.set_init_value_for_column_store_();
param.encrypt_id_ = 0;
param.master_key_id_ = 0;
MEMSET(param.encrypt_key_, 0, share::OB_MAX_TABLESPACE_ENCRYPT_KEY_LENGTH);
param.table_key_.table_type_ = ObITable::TableType::MAJOR_SSTABLE;
param.table_key_.tablet_id_ = 10;
param.table_key_.version_range_.snapshot_version_ = 1;
param.max_merged_trans_version_ = 1;
param.schema_version_ = 1;
param.create_snapshot_version_ = 0;
param.progressive_merge_round_ = 0;
param.progressive_merge_step_ = 0;
ObTableMode table_mode;
param.table_mode_ = table_mode;
param.index_type_ = ObIndexType::INDEX_TYPE_IS_NOT;
param.rowkey_column_cnt_ = 1;
param.root_block_addr_.set_none_addr();
param.data_block_macro_meta_addr_.set_none_addr();
param.root_row_store_type_ = ObRowStoreType::FLAT_ROW_STORE;
param.latest_row_store_type_ = ObRowStoreType::FLAT_ROW_STORE;
param.data_index_tree_height_ = 0;
param.index_blocks_cnt_ = 0;
param.data_blocks_cnt_ = 0;
param.micro_block_cnt_ = 0;
param.use_old_macro_block_count_ = 0;
param.data_checksum_ = 0;
param.occupy_size_ = 0;
param.ddl_scn_.set_min();
param.filled_tx_scn_.set_min();
param.tx_data_recycle_scn_.set_min();
param.original_size_ = 0;
param.ddl_scn_.set_min();
param.compressor_type_ = ObCompressorType::NONE_COMPRESSOR;
param.column_cnt_ = 1;
param.table_backup_flag_.reset();
param.table_shared_flag_.reset();
param.sstable_logic_seq_ = 0;
param.row_count_ = 0;
param.recycle_version_ = 0;
param.root_macro_seq_ = 0;
param.nested_size_ = 0;
param.nested_offset_ = 0;
OK(ObSSTableMergeRes::fill_column_checksum_for_empty_major(param.column_cnt_, param.column_checksums_));
OK(empty_sstable.init(param, &allocator_));
}
TEST_F(TestSharedBlockRWriter, test_rwrite_easy_block)
{
ObSharedObjectReaderWriter rwriter;
OK(rwriter.init(true/*need align*/, false/*need cross*/));
ObMetaDiskAddr addr;
MacroBlockId macro_id;
ObMacroBlockCommonHeader common_header;
common_header.reset();
common_header.set_attr(ObMacroBlockCommonHeader::MacroBlockType::SharedMetaData);
const int64_t header_size = common_header.get_serialize_size();
ASSERT_EQ(rwriter.offset_, header_size);
ASSERT_TRUE(rwriter.hanging_);
ASSERT_EQ(rwriter.data_.pos_, header_size);
char s[10] = "";
int test_round = 10;
while (test_round--) {
ObSharedObjectWriteInfo write_info;
ObSharedObjectWriteHandle write_handle;
ObSharedObjectReadHandle read_handle(allocator_);
for (int i = 0; i < 10; ++i) {
s[i] = '0' + test_round;
}
write_info.buffer_ = s;
write_info.offset_ = 0;
write_info.size_ = 10;
write_info.io_desc_.set_wait_event(ObWaitEventIds::DB_FILE_COMPACT_WRITE);
blocksstable::ObStorageObjectOpt curr_opt;
curr_opt.set_private_object_opt();
OK(rwriter.async_write(write_info, curr_opt, write_handle));
ASSERT_TRUE(write_handle.is_valid());
ObSharedObjectsWriteCtx write_ctx;
ObSharedObjectReadInfo read_info;
OK(write_handle.get_write_ctx(write_ctx));
if (test_round == 9) {
macro_id = write_ctx.addr_.block_id();
}
read_info.addr_ = write_ctx.addr_;
ASSERT_TRUE(read_info.addr_.is_raw_block());
read_info.io_desc_.set_wait_event(ObWaitEventIds::DB_FILE_COMPACT_READ);
OK(rwriter.async_read(read_info, read_handle));
char *buf = nullptr;
int64_t buf_len = 0;
OK(read_handle.get_data(allocator_, buf, buf_len));
ASSERT_EQ(10, buf_len);
for (int64_t i = 0; i < buf_len; ++i) {
ASSERT_EQ(buf[i], '0' + test_round);
}
}
// test check shared blk
ObMacroBlockReadInfo read_info;
ObMacroBlockHandle macro_handle;
read_info.macro_block_id_ = macro_id;
read_info.offset_ = 0;
read_info.size_ = (2L << 20);
read_info.io_desc_.set_wait_event(ObWaitEventIds::DB_FILE_COMPACT_READ);
read_info.buf_ = static_cast<char *>(allocator_.alloc(read_info.size_));
OK(ObBlockManager::async_read_block(read_info, macro_handle));
OK(macro_handle.wait());
OK((ObSSTableMacroBlockChecker::check(macro_handle.get_buffer(),
macro_handle.get_data_size(), ObMacroBlockCheckLevel::CHECK_LEVEL_PHYSICAL)));
}
TEST_F(TestSharedBlockRWriter, test_batch_write_easy_block)
{
ObSharedObjectReaderWriter rwriter;
OK(rwriter.init(true/*need align*/, false/*need cross*/));
int test_round = 10;
ObSharedObjectWriteInfo write_info;
ObArray<ObSharedObjectWriteInfo> write_infos;
char s[10][100];
for (int i = 0; i < test_round; ++i) {
for (int j = 0; j <10; ++j) {
s[i][j] = '0' + i;
}
write_info.buffer_ = s[i];
write_info.offset_ = 0;
write_info.size_ = 10;
write_info.io_desc_.set_wait_event(ObWaitEventIds::DB_FILE_COMPACT_WRITE);
write_infos.push_back(write_info);
}
ObSharedObjectBatchHandle write_handle;
blocksstable::ObStorageObjectOpt curr_opt;
curr_opt.set_private_object_opt();
OK(rwriter.async_batch_write(write_infos, write_handle, curr_opt));
ASSERT_TRUE(write_handle.is_valid());
ObArray<ObSharedObjectsWriteCtx> write_ctxs;
OK(write_handle.batch_get_write_ctx(write_ctxs));
ASSERT_EQ(test_round, write_ctxs.count());
for (int i = 0; i < test_round; ++i) {
ObSharedObjectReadInfo read_info;
ObSharedObjectReadHandle read_handle(allocator_);
read_info.addr_ = write_ctxs.at(i).addr_;
read_info.io_desc_.set_wait_event(ObWaitEventIds::DB_FILE_COMPACT_READ);
OK(rwriter.async_read(read_info, read_handle));
char *buf = nullptr;
int64_t buf_len = 0;
OK(read_handle.get_data(allocator_, buf, buf_len));
ASSERT_EQ(10, buf_len);
for (int64_t j = 0; j < buf_len; ++j) {
ASSERT_EQ(buf[j], '0' + i);
}
}
}
TEST_F(TestSharedBlockRWriter, test_link_write)
{
ObSharedObjectReaderWriter rwriter;
OK(rwriter.init(true/*need align*/, false/*need cross*/));
int test_round = 10;
ObSharedObjectWriteInfo write_info;
ObSharedObjectLinkHandle write_handle;
char s[10];
for (int i = 0; i < test_round; ++i) {
for (int j = 0; j < 10; ++j) {
s[j] = '0' + i;
}
write_info.buffer_ = s;
write_info.offset_ = 0;
write_info.size_ = 10;
write_info.io_desc_.set_wait_event(ObWaitEventIds::DB_FILE_COMPACT_WRITE);
blocksstable::ObStorageObjectOpt curr_opt;
curr_opt.set_private_object_opt();
OK(rwriter.async_link_write(write_info, curr_opt, write_handle));
ASSERT_TRUE(write_handle.is_valid());
}
ObSharedObjectsWriteCtx write_ctx;
ObSharedObjectLinkIter iter;
OK(write_handle.get_write_ctx(write_ctx));
OK(iter.init(write_ctx.addr_));
char *buf = nullptr;
int64_t buf_len = 0;
for (int i = 0; i < test_round; ++i) {
OK(iter.get_next_block(allocator_, buf, buf_len));
ASSERT_EQ(10, buf_len);
for (int64_t j = 0; j < buf_len; ++j) {
ASSERT_EQ(buf[j], '0'+9-i);
}
}
ASSERT_EQ(OB_ITER_END, iter.get_next_block(allocator_, buf, buf_len));
}
TEST_F(TestSharedBlockRWriter, test_cb_single_write)
{
ObSharedObjectReaderWriter rwriter;
OK(rwriter.init(true/*need align*/, false/*need cross*/));
ObSSTable empty_sstable;
create_empty_sstable(empty_sstable);
const int64_t sstable_size = empty_sstable.get_serialize_size();
char *sstable_buf = static_cast<char *>(allocator_.alloc(sstable_size));
int64_t pos = 0;
OK(empty_sstable.serialize(sstable_buf, sstable_size, pos));
ObSharedObjectWriteInfo write_info;
ObSharedObjectWriteHandle write_handle;
write_info.buffer_ = sstable_buf;
write_info.offset_ = 0;
write_info.size_ = sstable_size;
write_info.io_desc_.set_wait_event(ObWaitEventIds::DB_FILE_COMPACT_WRITE);
blocksstable::ObStorageObjectOpt curr_opt;
curr_opt.set_private_object_opt();
OK(rwriter.async_write(write_info, curr_opt, write_handle));
ASSERT_TRUE(write_handle.is_valid());
ObSharedObjectsWriteCtx write_ctx;
OK(write_handle.get_write_ctx(write_ctx));
ObStorageMetaHandle meta_handle;
OK(meta_handle.cache_handle_.new_value(allocator_));
const ObStorageMetaValue::MetaType meta_type = ObStorageMetaValue::SSTABLE;
ObTablet tablet;
ObTablet *fake_tablet = &tablet;
uint64_t tenant_id = 1;
ObStorageMetaKey meta_key;
meta_key.tenant_id_ = tenant_id;
meta_key.phy_addr_ = write_ctx.addr_;
void *callback_buf = nullptr;
ObStorageMetaCache::ObStorageMetaIOCallback *cb = nullptr;
callback_buf = allocator_.alloc(sizeof(ObStorageMetaCache::ObStorageMetaIOCallback));
ASSERT_NE(nullptr, callback_buf);
cb = new (callback_buf) ObStorageMetaCache::ObStorageMetaIOCallback(&allocator_,
meta_type,
meta_key,
meta_handle.cache_handle_,
fake_tablet,
nullptr);
ObSharedObjectReadInfo read_info;
ObSharedObjectReadHandle read_handle;
read_info.addr_ = write_ctx.addr_;
read_info.io_desc_.set_wait_event(ObWaitEventIds::DB_FILE_COMPACT_READ);
read_info.io_callback_ = cb;
OK(rwriter.async_read(read_info, read_handle));
OK(read_handle.wait());
// test the buf of read handle
char *buf = nullptr;
int64_t buf_len = 0;
OK(read_handle.get_data(allocator_, buf, buf_len));
ASSERT_EQ(sstable_size, buf_len);
}
TEST_F(TestSharedBlockRWriter, test_cb_batch_write)
{
ObSharedObjectReaderWriter rwriter;
OK(rwriter.init(true/*need align*/, false/*need cross*/));
int test_round = 10;
ObSharedObjectWriteInfo write_info;
ObArray<ObSharedObjectWriteInfo> write_infos;
char s[10][100];
for (int i = 0; i < test_round; ++i) {
for (int j = 0; j <10; ++j) {
s[i][j] = '0' + i;
}
write_info.buffer_ = s[i];
write_info.offset_ = 0;
write_info.size_ = 10;
write_info.io_desc_.set_wait_event(ObWaitEventIds::DB_FILE_COMPACT_WRITE);
write_infos.push_back(write_info);
}
ObSSTable empty_sstable;
create_empty_sstable(empty_sstable);
const int64_t sstable_size = empty_sstable.get_serialize_size();
char *sstable_buf = static_cast<char *>(allocator_.alloc(sstable_size));
int64_t pos = 0;
OK(empty_sstable.serialize(sstable_buf, sstable_size, pos));
write_info.buffer_ = sstable_buf;
write_info.offset_ = 0;
write_info.size_ = sstable_size;
write_info.io_desc_.set_wait_event(ObWaitEventIds::DB_FILE_COMPACT_WRITE);
write_infos.push_back(write_info);
ObSharedObjectBatchHandle write_handle;
blocksstable::ObStorageObjectOpt curr_opt;
curr_opt.set_private_object_opt();
OK(rwriter.async_batch_write(write_infos, write_handle, curr_opt));
ASSERT_TRUE(write_handle.is_valid());
ObArray<ObSharedObjectsWriteCtx> write_ctxs;
OK(write_handle.batch_get_write_ctx(write_ctxs));
ASSERT_EQ(test_round + 1, write_ctxs.count());
ObStorageMetaHandle meta_handle;
OK(meta_handle.cache_handle_.new_value(allocator_));
const ObStorageMetaValue::MetaType meta_type = ObStorageMetaValue::SSTABLE;
ObTablet tablet;
ObTablet *fake_tablet = &tablet;
uint64_t tenant_id = 1;
ObStorageMetaKey meta_key;
meta_key.tenant_id_ = tenant_id;
meta_key.phy_addr_ = write_ctxs[test_round].addr_;
void *callback_buf = nullptr;
ObStorageMetaCache::ObStorageMetaIOCallback *cb = nullptr;
callback_buf = allocator_.alloc(sizeof(ObStorageMetaCache::ObStorageMetaIOCallback));
ASSERT_NE(nullptr, callback_buf);
cb = new (callback_buf) ObStorageMetaCache::ObStorageMetaIOCallback(&allocator_,
meta_type,
meta_key,
meta_handle.cache_handle_,
fake_tablet,
nullptr);
ObSharedObjectReadInfo read_info;
ObSharedObjectReadHandle read_handle;
read_info.addr_ = write_ctxs[test_round].addr_;
read_info.io_desc_.set_wait_event(ObWaitEventIds::DB_FILE_COMPACT_READ);
read_info.io_callback_ = cb;
OK(rwriter.async_read(read_info, read_handle));
OK(read_handle.wait());
// test the buf of read handle
char *buf = nullptr;
int64_t buf_len = 0;
OK(read_handle.get_data(allocator_, buf, buf_len));
ASSERT_EQ(sstable_size, buf_len);
}
TEST_F(TestSharedBlockRWriter, test_parse_data_from_object)
{
ObSharedObjectReaderWriter rwriter;
OK(rwriter.init(true/*need align*/, false/*need cross*/));
int test_round = 10;
ObSharedObjectWriteInfo write_info;
ObArray<ObSharedObjectWriteInfo> write_infos;
char s[10][20];
for (int i = 0; i < test_round; ++i) {
for (int j = 0; j < 20; ++j) {
s[i][j] = '0' + i;
}
write_info.buffer_ = s[i];
write_info.offset_ = 0;
write_info.size_ = 20;
write_info.io_desc_.set_wait_event(ObWaitEventIds::DB_FILE_COMPACT_WRITE);
write_infos.push_back(write_info);
}
ObSharedObjectBatchHandle write_handle;
blocksstable::ObStorageObjectOpt curr_opt;
curr_opt.set_private_object_opt();
OK(rwriter.async_batch_write(write_infos, write_handle, curr_opt));
ASSERT_TRUE(write_handle.is_valid());
ObArray<ObSharedObjectsWriteCtx> write_ctxs;
OK(write_handle.batch_get_write_ctx(write_ctxs));
ASSERT_EQ(test_round, write_ctxs.count());
MacroBlockId block_id = write_ctxs[0].addr_.block_id();
ObStorageObjectReadInfo read_info;
ObStorageObjectHandle object_handle;
const int64_t io_buf_size = OB_STORAGE_OBJECT_MGR.get_macro_object_size();
read_info.offset_ = 0;
read_info.size_ = io_buf_size;
read_info.io_desc_.set_mode(ObIOMode::READ);
read_info.io_desc_.set_wait_event(ObWaitEventIds::DB_FILE_COMPACT_READ);
read_info.io_desc_.set_sys_module_id(ObIOModule::SHARED_BLOCK_RW_IO);
read_info.macro_block_id_ = block_id;
read_info.buf_ = static_cast<char *>(allocator_.alloc(io_buf_size));
read_info.mtl_tenant_id_ = MTL_ID();
OK(ObObjectManager::read_object(read_info, object_handle));
char *buf = nullptr;
int64_t buf_len = 0;
for (int i = 0; i < test_round; ++i) {
OK(ObSharedObjectReaderWriter::parse_data_from_object(object_handle, write_ctxs[i].addr_, buf, buf_len));
ASSERT_EQ(20, buf_len);
ASSERT_EQ(0, MEMCMP(s[i], buf, 20));
}
}
TEST_F(TestSharedBlockRWriter, test_batch_write_switch_block)
{
// test switch block when batch write, which means hanging_=true
ObSharedObjectReaderWriter rwriter;
OK(rwriter.init(true/*need align*/, false/*need cross*/));
ObSharedObjectWriteInfo write_info;
ObArray<ObSharedObjectWriteInfo> write_infos;
int64_t data_size = 3L << 10; // 3K
char s[3L << 10] = "";
for (int i = 0; i < data_size; ++i) {
s[i] = '0' + (i % 10);
}
write_info.buffer_ = s;
write_info.offset_ = 0;
write_info.size_ = data_size;
write_info.io_desc_.set_wait_event(ObWaitEventIds::DB_FILE_COMPACT_WRITE);
OK(write_infos.push_back(write_info));
OK(write_infos.push_back(write_info));
ObMacroBlockCommonHeader common_header;
common_header.reset();
common_header.set_attr(ObMacroBlockCommonHeader::MacroBlockType::SharedMetaData);
const int64_t header_size = common_header.get_serialize_size();
ASSERT_EQ(rwriter.data_.pos_, header_size);
rwriter.offset_ = (2L << 20) - (4L << 10); // 2M - 4K
rwriter.align_offset_ = rwriter.offset_;
rwriter.data_.advance(rwriter.offset_ - header_size);
ObSharedObjectBatchHandle write_handle;
blocksstable::ObStorageObjectOpt curr_opt;
curr_opt.set_private_object_opt();
OK(rwriter.async_batch_write(write_infos, write_handle, curr_opt));
ASSERT_TRUE(write_handle.is_valid());
ObArray<ObSharedObjectsWriteCtx> write_ctxs;
OK(write_handle.batch_get_write_ctx(write_ctxs));
ASSERT_EQ(write_infos.count(), write_ctxs.count());
for (int i = 0; i < write_ctxs.count(); ++i) {
ObSharedObjectReadInfo read_info;
ObSharedObjectReadHandle read_handle(allocator_);
read_info.addr_ = write_ctxs.at(i).addr_;
read_info.io_desc_.set_wait_event(ObWaitEventIds::DB_FILE_COMPACT_READ);
OK(rwriter.async_read(read_info, read_handle));
char *buf = nullptr;
int64_t buf_len = 0;
OK(read_handle.get_data(allocator_, buf, buf_len));
ASSERT_EQ(data_size, buf_len);
for (int64_t j = 0; j < data_size; ++j) {
ASSERT_EQ(buf[j], '0' + (j % 10));
}
}
}
TEST_F(TestSharedBlockRWriter, test_batch_write_bug1)
{
/* batch_write_bug1:
The store_size of the last block in batch_write is not aligned to 4K
when hanging_ = true && we need switch_block.
This may result in 4016 read error of other blocks flushed by next async_write.
*/
ObSharedObjectReaderWriter rwriter;
OK(rwriter.init(true/*need align*/, false/*need cross*/));
rwriter.offset_ = (2L << 20) - 10; // nearly to 2M, ready to switch block
rwriter.align_offset_ = (2L << 20) - 4096;
rwriter.hanging_ = true;
ObSharedObjectWriteInfo write_info; // the last block
int64_t data_size = 1L << 10; // 1K
char s[1L << 10] = "";
for (int i = 0; i < data_size; ++i) {
s[i] = '0' + (i % 10);
}
write_info.buffer_ = s;
write_info.offset_ = 0;
write_info.size_ = data_size;
write_info.io_desc_.set_wait_event(ObWaitEventIds::DB_FILE_COMPACT_WRITE);
blocksstable::ObStorageObjectOpt curr_opt;
curr_opt.set_private_object_opt();
ObSharedObjectWriteHandle shared_obj_handle;
OK(rwriter.async_write(write_info, curr_opt, shared_obj_handle));
ObSharedObjectsWriteCtx write_ctx;
OK(shared_obj_handle.get_write_ctx(write_ctx));
const ObMetaDiskAddr &addr = write_ctx.addr_;
ObMacroBlockCommonHeader common_header;
common_header.reset();
common_header.set_attr(ObMacroBlockCommonHeader::MacroBlockType::SharedMetaData);
const int64_t header_size = common_header.get_serialize_size();
ASSERT_EQ(addr.offset_, 0 + header_size);
ASSERT_EQ(addr.size_, data_size);
ASSERT_EQ(rwriter.offset_, 4096);
ASSERT_EQ(rwriter.align_offset_, 4096);
}
}//end namespace unittest
}//end namespace oceanbase
int main(int argc, char **argv)
{
system("rm -f test_shared_block_reader_writer.log*");
oceanbase::common::ObLogger::get_logger().set_log_level("INFO");
OB_LOGGER.set_file_name("test_shared_block_reader_writer.log", true);
srand(time(NULL));
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}