924 lines
40 KiB
C++
924 lines
40 KiB
C++
/**
|
|
* Copyright (c) 2021 OceanBase
|
|
* OceanBase CE is licensed under Mulan PubL v2.
|
|
* You can use this software according to the terms and conditions of the Mulan PubL v2.
|
|
* You may obtain a copy of Mulan PubL v2 at:
|
|
* http://license.coscl.org.cn/MulanPubL-2.0
|
|
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
|
|
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
|
|
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
|
|
* See the Mulan PubL v2 for more details.
|
|
*/
|
|
#define USING_LOG_PREFIX STORAGETEST
|
|
|
|
#define protected public
|
|
#define private public
|
|
#include "test_ss_common_util.h"
|
|
#include "mittest/mtlenv/mock_tenant_module_env.h"
|
|
#include "mittest/shared_storage/clean_residual_data.h"
|
|
#undef private
|
|
#undef protected
|
|
|
|
namespace oceanbase
|
|
{
|
|
namespace storage
|
|
{
|
|
using namespace oceanbase::blocksstable;
|
|
using namespace oceanbase::common;
|
|
using namespace oceanbase::storage;
|
|
|
|
class TestSSReaderWriter : public ::testing::Test
|
|
{
|
|
public:
|
|
class TestIOCallback : public ObIOCallback
|
|
{
|
|
public:
|
|
TestIOCallback()
|
|
: ObIOCallback(ObIOCallbackType::TEST_CALLBACK), number_(nullptr), allocator_(nullptr),
|
|
help_buf_(nullptr)
|
|
{
|
|
}
|
|
virtual ~TestIOCallback()
|
|
{
|
|
if (nullptr != number_) {
|
|
*number_ -= 90;
|
|
number_ = nullptr;
|
|
}
|
|
if (nullptr != allocator_) {
|
|
if (nullptr != help_buf_) {
|
|
allocator_->free(help_buf_);
|
|
help_buf_ = nullptr;
|
|
}
|
|
allocator_->free(this);
|
|
LOG_INFO("success reset callback when out_rec_cnt = 0");
|
|
}
|
|
}
|
|
|
|
virtual const char *get_data() override { return (char *)help_buf_; }
|
|
virtual int64_t size() const override { return sizeof(TestIOCallback); }
|
|
virtual int alloc_data_buf(const char *io_data_buffer, const int64_t data_size) override
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
if (OB_ISNULL(allocator_)) {
|
|
ret = OB_INVALID_DATA;
|
|
LOG_WARN("Invalid data, the allocator is NULL, ", K(ret));
|
|
} else if (OB_UNLIKELY(data_size <= 0)) {
|
|
ret = OB_INVALID_ARGUMENT;
|
|
STORAGE_LOG(WARN, "invalid data buffer size", K(ret), K(data_size));
|
|
} else if (OB_ISNULL(help_buf_ = static_cast<char *>(allocator_->alloc(data_size)))) {
|
|
ret = OB_ALLOCATE_MEMORY_FAILED;
|
|
LOG_WARN("Failed to allocate help buf", K(ret), K(data_size), KP(help_buf_));
|
|
} else {
|
|
memset(help_buf_, 0, data_size);
|
|
MEMCPY(help_buf_, io_data_buffer, data_size);
|
|
}
|
|
return OB_SUCCESS;
|
|
}
|
|
virtual int inner_process(const char *data_buffer, const int64_t size) override
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
if (OB_ISNULL(allocator_)) {
|
|
// for test, ignore
|
|
} else if (OB_FAIL(alloc_data_buf(data_buffer, size))) {
|
|
LOG_WARN("Fail to allocate memory, ", K(ret), K(size));
|
|
}
|
|
if (nullptr != number_) {
|
|
*number_ += 100;
|
|
}
|
|
return OB_SUCCESS;
|
|
}
|
|
virtual ObIAllocator *get_allocator() override { return allocator_; }
|
|
TO_STRING_KV(KP(number_), KP(allocator_), KP(help_buf_));
|
|
|
|
public:
|
|
int64_t *number_;
|
|
ObIAllocator *allocator_;
|
|
char *help_buf_;
|
|
};
|
|
|
|
TestSSReaderWriter() : write_info_(), read_info_(), write_buf_(), read_buf_() {}
|
|
virtual ~TestSSReaderWriter() = default;
|
|
static void SetUpTestCase();
|
|
static void TearDownTestCase();
|
|
virtual void SetUp();
|
|
virtual void TearDown();
|
|
void exhaust_tmp_file_write_free_disk_size(int64_t &avail_size);
|
|
void alloc_tmp_file_write_disk_size(const int64_t disk_size);
|
|
void release_tmp_file_write_free_disk_size(const int64_t avail_size);
|
|
void write_tmp_file_data(const MacroBlockId ¯o_id,
|
|
const int64_t offset,
|
|
const int64_t size,
|
|
const int64_t valid_length,
|
|
const bool is_sealed,
|
|
const char *buffer);
|
|
void read_and_compare_tmp_file_data(const MacroBlockId ¯o_id,
|
|
const int64_t offset,
|
|
const int64_t size);
|
|
void check_tmp_file_seg_meta(const MacroBlockId ¯o_id,
|
|
const bool is_meta_exist,
|
|
const bool is_in_local = false,
|
|
const int64_t valid_length = 0);
|
|
|
|
public:
|
|
static const int64_t WRITE_IO_SIZE = DIO_READ_ALIGN_SIZE * 768; // 3MB
|
|
ObStorageObjectWriteInfo write_info_;
|
|
ObStorageObjectReadInfo read_info_;
|
|
char write_buf_[WRITE_IO_SIZE];
|
|
char read_buf_[WRITE_IO_SIZE];
|
|
};
|
|
|
|
void TestSSReaderWriter::SetUpTestCase()
|
|
{
|
|
GCTX.startup_mode_ = observer::ObServerMode::SHARED_STORAGE_MODE;
|
|
EXPECT_EQ(OB_SUCCESS, MockTenantModuleEnv::get_instance().init());
|
|
MTL(tmp_file::ObTenantTmpFileManager *)->stop();
|
|
MTL(tmp_file::ObTenantTmpFileManager *)->wait();
|
|
MTL(tmp_file::ObTenantTmpFileManager *)->destroy();
|
|
}
|
|
|
|
void TestSSReaderWriter::TearDownTestCase()
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
if (OB_FAIL(ResidualDataCleanerHelper::clean_in_mock_env())) {
|
|
LOG_WARN("failed to clean residual data", KR(ret));
|
|
}
|
|
MockTenantModuleEnv::get_instance().destroy();
|
|
}
|
|
|
|
void TestSSReaderWriter::SetUp()
|
|
{
|
|
// construct write info
|
|
write_buf_[0] = '\0';
|
|
const int64_t mid_offset = WRITE_IO_SIZE / 2;
|
|
memset(write_buf_, 'a', mid_offset);
|
|
memset(write_buf_ + mid_offset, 'b', WRITE_IO_SIZE - mid_offset);
|
|
write_info_.io_desc_.set_wait_event(1);
|
|
write_info_.buffer_ = write_buf_;
|
|
write_info_.offset_ = 0;
|
|
write_info_.size_ = WRITE_IO_SIZE;
|
|
write_info_.io_timeout_ms_ = DEFAULT_IO_WAIT_TIME_MS;
|
|
write_info_.mtl_tenant_id_ = MTL_ID();
|
|
|
|
// construct read info
|
|
read_buf_[0] = '\0';
|
|
read_info_.io_desc_.set_wait_event(1);
|
|
read_info_.buf_ = read_buf_;
|
|
read_info_.offset_ = 0;
|
|
read_info_.size_ = WRITE_IO_SIZE;
|
|
read_info_.io_timeout_ms_ = DEFAULT_IO_WAIT_TIME_MS;
|
|
read_info_.mtl_tenant_id_ = MTL_ID();
|
|
}
|
|
|
|
void TestSSReaderWriter::TearDown()
|
|
{
|
|
write_buf_[0] = '\0';
|
|
read_buf_[0] = '\0';
|
|
}
|
|
|
|
void TestSSReaderWriter::exhaust_tmp_file_write_free_disk_size(int64_t &avail_size)
|
|
{
|
|
static int64_t call_times = 0;
|
|
call_times++;
|
|
ObTenantDiskSpaceManager *disk_space_manager = MTL(ObTenantDiskSpaceManager *);
|
|
ASSERT_NE(nullptr, disk_space_manager) << "call_times: " << call_times;
|
|
avail_size = disk_space_manager->get_tmp_file_write_free_disk_size();
|
|
ASSERT_EQ(OB_SUCCESS, disk_space_manager->alloc_file_size(avail_size,
|
|
ObStorageObjectType::TMP_FILE, false/*is_tmp_file_read_cache*/)) << "call_times: " << call_times;
|
|
ASSERT_EQ(0, disk_space_manager->get_tmp_file_write_free_disk_size()) << "call_times: " << call_times;
|
|
ASSERT_EQ(OB_SERVER_OUTOF_DISK_SPACE, disk_space_manager->alloc_file_size(8192,
|
|
ObStorageObjectType::TMP_FILE, false/*is_tmp_file_read_cache*/)) << "call_times: " << call_times;
|
|
}
|
|
|
|
void TestSSReaderWriter::alloc_tmp_file_write_disk_size(const int64_t disk_size)
|
|
{
|
|
static int64_t call_times = 0;
|
|
call_times++;
|
|
ObTenantDiskSpaceManager *disk_space_manager = MTL(ObTenantDiskSpaceManager *);
|
|
ASSERT_NE(nullptr, disk_space_manager) << "call_times: " << call_times;
|
|
ASSERT_EQ(OB_SUCCESS, disk_space_manager->alloc_file_size(disk_size,
|
|
ObStorageObjectType::TMP_FILE, false/*is_tmp_file_read_cache*/)) << "call_times: " << call_times;
|
|
ASSERT_EQ(0, disk_space_manager->get_tmp_file_write_free_disk_size()) << "call_times: " << call_times;
|
|
ASSERT_EQ(OB_SERVER_OUTOF_DISK_SPACE, disk_space_manager->alloc_file_size(8192,
|
|
ObStorageObjectType::TMP_FILE, false/*is_tmp_file_read_cache*/)) << "call_times: " << call_times;
|
|
}
|
|
|
|
void TestSSReaderWriter::release_tmp_file_write_free_disk_size(const int64_t avail_size)
|
|
{
|
|
static int64_t call_times = 0;
|
|
call_times++;
|
|
ObTenantDiskSpaceManager *disk_space_manager = MTL(ObTenantDiskSpaceManager *);
|
|
ASSERT_NE(nullptr, disk_space_manager) << "call_times: " << call_times;
|
|
ASSERT_EQ(OB_SUCCESS, disk_space_manager->free_file_size(avail_size,
|
|
ObStorageObjectType::TMP_FILE, false/*is_tmp_file_read_cache*/)) << "call_times: " << call_times;
|
|
}
|
|
|
|
void TestSSReaderWriter::write_tmp_file_data(
|
|
const MacroBlockId ¯o_id,
|
|
const int64_t offset,
|
|
const int64_t size,
|
|
const int64_t valid_length,
|
|
const bool is_sealed,
|
|
const char *buffer)
|
|
{
|
|
static int64_t call_times = 0;
|
|
call_times++;
|
|
ObStorageObjectHandle write_object_handle;
|
|
ASSERT_TRUE(macro_id.is_valid()) << "call_times: " << call_times;
|
|
ASSERT_EQ(OB_SUCCESS, write_object_handle.set_macro_block_id(macro_id)) << "call_times: " << call_times;
|
|
write_info_.offset_ = offset;
|
|
write_info_.size_ = size;
|
|
write_info_.tmp_file_valid_length_ = valid_length;
|
|
if (is_sealed) {
|
|
write_info_.io_desc_.set_sealed();
|
|
} else {
|
|
write_info_.io_desc_.set_unsealed();
|
|
}
|
|
write_info_.buffer_ = buffer;
|
|
ObTenantFileManager *file_manager = MTL(ObTenantFileManager *);
|
|
ASSERT_NE(nullptr, file_manager) << "call_times: " << call_times;
|
|
ASSERT_EQ(OB_SUCCESS, file_manager->async_append_file(write_info_, write_object_handle)) << "call_times: " << call_times;
|
|
ASSERT_EQ(OB_SUCCESS, write_object_handle.wait()) << "call_times: " << call_times;
|
|
}
|
|
|
|
void TestSSReaderWriter::read_and_compare_tmp_file_data(
|
|
const MacroBlockId ¯o_id,
|
|
const int64_t offset,
|
|
const int64_t size)
|
|
{
|
|
static int64_t call_times = 0;
|
|
call_times++;
|
|
ObStorageObjectHandle read_object_handle;
|
|
ObSSTmpFileReader tmp_file_reader;
|
|
read_info_.macro_block_id_ = macro_id;
|
|
read_info_.offset_ = offset;
|
|
read_info_.size_ = size;
|
|
ASSERT_EQ(OB_SUCCESS, tmp_file_reader.aio_read(read_info_, read_object_handle)) << "call_times: " << call_times;
|
|
ASSERT_EQ(OB_SUCCESS, read_object_handle.wait()) << "call_times: " << call_times;
|
|
ASSERT_NE(nullptr, read_object_handle.get_buffer()) << "call_times: " << call_times;
|
|
ASSERT_EQ(read_info_.size_, read_object_handle.get_data_size()) << "call_times: " << call_times;
|
|
ASSERT_EQ(0, memcmp(write_buf_ + offset, read_object_handle.get_buffer(), size)) << "call_times: " << call_times;
|
|
memset(read_buf_, 0, WRITE_IO_SIZE);
|
|
}
|
|
|
|
void TestSSReaderWriter::check_tmp_file_seg_meta(
|
|
const MacroBlockId ¯o_id,
|
|
const bool is_meta_exist,
|
|
const bool is_in_local,
|
|
const int64_t valid_length)
|
|
{
|
|
static int64_t call_times = 0;
|
|
call_times++;
|
|
TmpFileSegId seg_id(macro_id.second_id(), macro_id.third_id());
|
|
TmpFileMetaHandle meta_handle;
|
|
bool exist = false;
|
|
ObTenantFileManager *file_manager = MTL(ObTenantFileManager *);
|
|
ASSERT_NE(nullptr, file_manager) << "call_times: " << call_times;
|
|
ASSERT_EQ(OB_SUCCESS, file_manager->get_segment_file_mgr().try_get_seg_meta(seg_id, meta_handle, exist)) << "call_times: " << call_times;
|
|
ASSERT_EQ(is_meta_exist, exist) << "call_times: " << call_times;
|
|
if (is_meta_exist) {
|
|
ASSERT_TRUE(meta_handle.is_valid());
|
|
ASSERT_TRUE(meta_handle.get_tmpfile_meta()->is_valid());
|
|
ASSERT_EQ(is_in_local, meta_handle.is_in_local()) << "call_times: " << call_times;
|
|
ASSERT_EQ(valid_length, meta_handle.get_valid_length()) << "call_times: " << call_times;
|
|
}
|
|
}
|
|
|
|
TEST_F(TestSSReaderWriter, object_storage_reader_writer)
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
|
|
uint64_t tablet_id = 500;
|
|
|
|
// 1. write
|
|
MacroBlockId macro_id;
|
|
macro_id.set_id_mode((uint64_t)ObMacroBlockIdMode::ID_MODE_SHARE);
|
|
macro_id.set_storage_object_type((uint64_t)ObStorageObjectType::SHARED_MAJOR_TABLET_META);
|
|
macro_id.set_second_id(tablet_id); // tablet_id
|
|
macro_id.set_third_id(1000000); // meta_version
|
|
ASSERT_TRUE(macro_id.is_valid());
|
|
ObStorageObjectHandle write_object_handle;
|
|
ASSERT_EQ(OB_SUCCESS, write_object_handle.set_macro_block_id(macro_id));
|
|
|
|
ObSSObjectStorageWriter object_storage_writer;
|
|
ASSERT_EQ(OB_SUCCESS, object_storage_writer.aio_write(write_info_, write_object_handle));
|
|
ASSERT_EQ(OB_SUCCESS, write_object_handle.wait());
|
|
|
|
// 2. read and compare the read data with the written data
|
|
ObArenaAllocator allocator;
|
|
int64_t tmp_number = 0;
|
|
TestIOCallback callback;
|
|
callback.number_ = &tmp_number;
|
|
callback.allocator_ = &allocator;
|
|
read_info_.io_callback_ = &callback;
|
|
read_info_.buf_ = nullptr;
|
|
read_info_.macro_block_id_ = macro_id;
|
|
ObStorageObjectHandle read_object_handle;
|
|
ObSSObjectStorageReader object_storage_reader;
|
|
ASSERT_EQ(OB_SUCCESS, object_storage_reader.aio_read(read_info_, read_object_handle));
|
|
ASSERT_EQ(OB_SUCCESS, read_object_handle.wait());
|
|
ASSERT_NE(nullptr, read_object_handle.get_buffer());
|
|
ASSERT_EQ(read_info_.size_, read_object_handle.get_data_size());
|
|
ASSERT_EQ(0, memcmp(write_buf_, read_object_handle.get_buffer(), WRITE_IO_SIZE));
|
|
ASSERT_EQ(100, tmp_number); // callback process called
|
|
}
|
|
|
|
TEST_F(TestSSReaderWriter, local_cache_reader_writer)
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
|
|
uint64_t ls_id = 1;
|
|
uint64_t ls_epoch_id = 1;
|
|
uint64_t tablet_id = 800;
|
|
|
|
ASSERT_EQ(OB_SUCCESS, OB_DIR_MGR.create_ls_id_dir(MTL_ID(), MTL_EPOCH_ID(), ls_id, ls_epoch_id));
|
|
ASSERT_EQ(OB_SUCCESS, OB_DIR_MGR.create_tablet_meta_tablet_id_dir(MTL_ID(), MTL_EPOCH_ID(), ls_id, ls_epoch_id, tablet_id));
|
|
|
|
// 1. write
|
|
MacroBlockId macro_id;
|
|
macro_id.set_id_mode((uint64_t)ObMacroBlockIdMode::ID_MODE_SHARE);
|
|
macro_id.set_storage_object_type((uint64_t)ObStorageObjectType::PRIVATE_TABLET_META);
|
|
macro_id.set_second_id(ls_id); // ls_id
|
|
macro_id.set_third_id(tablet_id); // tablet_id
|
|
macro_id.set_macro_transfer_seq(0); // transfer_seq
|
|
macro_id.set_tenant_seq(2000000); //tenant_seq
|
|
ASSERT_TRUE(macro_id.is_valid());
|
|
ObStorageObjectHandle write_object_handle;
|
|
ASSERT_EQ(OB_SUCCESS, write_object_handle.set_macro_block_id(macro_id));
|
|
|
|
write_info_.ls_epoch_id_ = ls_epoch_id;
|
|
|
|
ObSSLocalCacheWriter local_cache_writer;
|
|
ASSERT_EQ(OB_SUCCESS, local_cache_writer.aio_write(write_info_, write_object_handle));
|
|
ASSERT_EQ(OB_SUCCESS, write_object_handle.wait());
|
|
|
|
// 2. read and compare the read data with the written data
|
|
read_info_.macro_block_id_ = macro_id;
|
|
read_info_.ls_epoch_id_ = ls_epoch_id;
|
|
ObStorageObjectHandle read_object_handle;
|
|
ObSSLocalCacheReader local_cache_reader;
|
|
ASSERT_EQ(OB_SUCCESS, local_cache_reader.aio_read(read_info_, read_object_handle));
|
|
ASSERT_EQ(OB_SUCCESS, read_object_handle.wait());
|
|
ASSERT_NE(nullptr, read_object_handle.get_buffer());
|
|
ASSERT_EQ(read_info_.size_, read_object_handle.get_data_size());
|
|
ASSERT_EQ(0, memcmp(write_buf_, read_object_handle.get_buffer(), WRITE_IO_SIZE));
|
|
read_object_handle.reset();
|
|
|
|
// 3. try read DATA_OUT_OF_RANGE, expect io layer atomically adjust read size and read successfully
|
|
read_info_.macro_block_id_ = macro_id;
|
|
read_info_.ls_epoch_id_ = ls_epoch_id;
|
|
read_info_.offset_ = 10000;
|
|
read_info_.size_ = WRITE_IO_SIZE;
|
|
ASSERT_EQ(OB_SUCCESS, local_cache_reader.aio_read(read_info_, read_object_handle));
|
|
ASSERT_EQ(OB_SUCCESS, read_object_handle.wait());
|
|
ASSERT_NE(nullptr, read_object_handle.get_buffer());
|
|
ASSERT_EQ(read_info_.size_ - read_info_.offset_, read_object_handle.get_data_size());
|
|
ASSERT_EQ(0, memcmp(write_buf_ + read_info_.offset_, read_object_handle.get_buffer(),
|
|
read_info_.size_ - read_info_.offset_));
|
|
read_object_handle.reset();
|
|
|
|
// 4. try read offset exceeds acutal file length, expect OB_DATA_OUT_OF_RANGE errno
|
|
read_info_.macro_block_id_ = macro_id;
|
|
read_info_.ls_epoch_id_ = ls_epoch_id;
|
|
read_info_.offset_ = WRITE_IO_SIZE;
|
|
read_info_.size_ = 1;
|
|
ASSERT_EQ(OB_DATA_OUT_OF_RANGE, local_cache_reader.aio_read(read_info_, read_object_handle));
|
|
read_object_handle.reset();
|
|
}
|
|
|
|
TEST_F(TestSSReaderWriter, private_macro_reader_writer)
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
|
|
uint64_t tablet_id = 100;
|
|
uint64_t server_id = 1;
|
|
|
|
ASSERT_EQ(OB_SUCCESS, OB_DIR_MGR.create_tablet_data_tablet_id_transfer_seq_dir(MTL_ID(), MTL_EPOCH_ID(), tablet_id, 0/*trasfer_seq*/));
|
|
|
|
// 1. write
|
|
MacroBlockId macro_id;
|
|
macro_id.set_id_mode((uint64_t)ObMacroBlockIdMode::ID_MODE_SHARE);
|
|
macro_id.set_storage_object_type((uint64_t)ObStorageObjectType::PRIVATE_DATA_MACRO);
|
|
macro_id.set_second_id(tablet_id); // tablet_id
|
|
macro_id.set_third_id(100); // seq_id
|
|
macro_id.set_macro_transfer_seq(0); // transfer_seq
|
|
macro_id.set_tenant_seq(server_id); //tenant_seq
|
|
ASSERT_TRUE(macro_id.is_valid());
|
|
ObStorageObjectHandle write_object_handle;
|
|
ASSERT_EQ(OB_SUCCESS, write_object_handle.set_macro_block_id(macro_id));
|
|
|
|
ObSSPrivateMacroWriter private_macro_writer;
|
|
ASSERT_EQ(OB_SUCCESS, private_macro_writer.aio_write(write_info_, write_object_handle));
|
|
ASSERT_EQ(OB_SUCCESS, write_object_handle.wait());
|
|
|
|
// 2. read and compare the read data with the written data
|
|
read_info_.macro_block_id_ = macro_id;
|
|
read_info_.offset_ = 1;
|
|
read_info_.size_ = WRITE_IO_SIZE / 2;
|
|
ObStorageObjectHandle read_object_handle;
|
|
ObSSPrivateMacroReader private_macro_reader;
|
|
ASSERT_EQ(OB_SUCCESS, private_macro_reader.aio_read(read_info_, read_object_handle));
|
|
ASSERT_EQ(OB_SUCCESS, read_object_handle.wait());
|
|
ASSERT_NE(nullptr, read_object_handle.get_buffer());
|
|
ASSERT_EQ(read_info_.size_, read_object_handle.get_data_size());
|
|
ASSERT_EQ(0, memcmp(write_buf_ + read_info_.offset_, read_object_handle.get_buffer(), read_info_.size_));
|
|
read_object_handle.reset();
|
|
}
|
|
|
|
TEST_F(TestSSReaderWriter, share_macro_reader_writer)
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
|
|
uint64_t tablet_id = 200;
|
|
uint64_t data_seq = 15;
|
|
|
|
// 1. write
|
|
MacroBlockId macro_id;
|
|
macro_id.set_id_mode((uint64_t)ObMacroBlockIdMode::ID_MODE_SHARE);
|
|
macro_id.set_storage_object_type((uint64_t)ObStorageObjectType::SHARED_MAJOR_DATA_MACRO);
|
|
macro_id.set_second_id(tablet_id); // tablet_id
|
|
macro_id.set_third_id(data_seq); // data_seq
|
|
ASSERT_TRUE(macro_id.is_valid());
|
|
ObStorageObjectHandle write_object_handle;
|
|
ASSERT_EQ(OB_SUCCESS, write_object_handle.set_macro_block_id(macro_id));
|
|
|
|
ObSSShareMacroWriter share_macro_writer;
|
|
ASSERT_EQ(OB_SUCCESS, share_macro_writer.aio_write(write_info_, write_object_handle));
|
|
ASSERT_EQ(OB_SUCCESS, write_object_handle.wait());
|
|
|
|
ObLogicMicroBlockId logic_micro_id_1;
|
|
logic_micro_id_1.version_ = ObLogicMicroBlockId::LOGIC_MICRO_ID_VERSION_V1;
|
|
logic_micro_id_1.offset_ = 100;
|
|
logic_micro_id_1.logic_macro_id_.data_seq_.macro_data_seq_ = 1;
|
|
logic_micro_id_1.logic_macro_id_.logic_version_ = 100;
|
|
logic_micro_id_1.logic_macro_id_.tablet_id_ = tablet_id;
|
|
|
|
ObLogicMicroBlockId logic_micro_id_2;
|
|
logic_micro_id_2 = logic_micro_id_1;
|
|
logic_micro_id_2.offset_ = 1;
|
|
|
|
// 2. read <1, WRITE_IO_SIZE / 2>, expect cache miss and load cache
|
|
read_info_.macro_block_id_ = macro_id;
|
|
read_info_.offset_ = 1;
|
|
read_info_.size_ = WRITE_IO_SIZE / 2;
|
|
read_info_.logic_micro_id_ = logic_micro_id_1;
|
|
read_info_.micro_crc_ = 100;
|
|
ObStorageObjectHandle cache_miss_read_object_handle;
|
|
ObSSShareMacroReader share_macro_reader;
|
|
ASSERT_EQ(OB_SUCCESS, share_macro_reader.aio_read(read_info_, cache_miss_read_object_handle));
|
|
ASSERT_EQ(OB_SUCCESS, cache_miss_read_object_handle.wait());
|
|
ASSERT_NE(nullptr, cache_miss_read_object_handle.get_buffer());
|
|
ASSERT_EQ(read_info_.size_, cache_miss_read_object_handle.get_data_size());
|
|
ASSERT_EQ(0, memcmp(write_buf_ + read_info_.offset_, cache_miss_read_object_handle.get_buffer(), read_info_.size_));
|
|
cache_miss_read_object_handle.reset();
|
|
|
|
// 3. read <1, WRITE_IO_SIZE / 2>, expect hit memory
|
|
read_info_.macro_block_id_ = macro_id;
|
|
read_info_.offset_ = 1;
|
|
read_info_.size_ = WRITE_IO_SIZE / 2;
|
|
read_info_.logic_micro_id_ = logic_micro_id_1;
|
|
read_info_.micro_crc_ = 100;
|
|
ObStorageObjectHandle hit_memory_read_object_handle;
|
|
ASSERT_EQ(OB_SUCCESS, share_macro_reader.aio_read(read_info_, hit_memory_read_object_handle));
|
|
ASSERT_EQ(OB_SUCCESS, hit_memory_read_object_handle.wait());
|
|
ASSERT_NE(nullptr, hit_memory_read_object_handle.get_buffer());
|
|
ASSERT_EQ(read_info_.size_, hit_memory_read_object_handle.get_data_size());
|
|
ASSERT_EQ(0, memcmp(write_buf_ + read_info_.offset_, hit_memory_read_object_handle.get_buffer(), read_info_.size_));
|
|
hit_memory_read_object_handle.reset();
|
|
|
|
// 4. read <WRITE_IO_SIZE / 2, WRITE_IO_SIZE>, expect cache miss and load cache
|
|
read_info_.macro_block_id_ = macro_id;
|
|
read_info_.offset_ = WRITE_IO_SIZE / 2;
|
|
read_info_.size_ = WRITE_IO_SIZE / 2;
|
|
read_info_.logic_micro_id_ = logic_micro_id_2;
|
|
read_info_.micro_crc_ = 200;
|
|
ASSERT_EQ(OB_SUCCESS, share_macro_reader.aio_read(read_info_, cache_miss_read_object_handle));
|
|
ASSERT_EQ(OB_SUCCESS, cache_miss_read_object_handle.wait());
|
|
ASSERT_NE(nullptr, cache_miss_read_object_handle.get_buffer());
|
|
ASSERT_EQ(read_info_.size_, cache_miss_read_object_handle.get_data_size());
|
|
ASSERT_EQ(0, memcmp(write_buf_ + read_info_.offset_, cache_miss_read_object_handle.get_buffer(), read_info_.size_));
|
|
cache_miss_read_object_handle.reset();
|
|
|
|
// 5. wait <1, WRITE_IO_SIZE / 2> flush from memory to disk
|
|
ASSERT_EQ(OB_SUCCESS, TestSSCommonUtil::wait_for_persist_task());
|
|
|
|
// 6. read <1, WRITE_IO_SIZE / 2>, expect hit disk
|
|
read_info_.macro_block_id_ = macro_id;
|
|
read_info_.offset_ = 1;
|
|
read_info_.size_ = WRITE_IO_SIZE / 2;
|
|
read_info_.logic_micro_id_ = logic_micro_id_1;
|
|
read_info_.micro_crc_ = 100;
|
|
ObStorageObjectHandle hit_disk_read_object_handle;
|
|
ASSERT_EQ(OB_SUCCESS, share_macro_reader.aio_read(read_info_, hit_disk_read_object_handle));
|
|
ASSERT_EQ(OB_SUCCESS, hit_disk_read_object_handle.wait());
|
|
ASSERT_NE(nullptr, hit_disk_read_object_handle.get_buffer());
|
|
ASSERT_EQ(read_info_.size_, hit_disk_read_object_handle.get_data_size());
|
|
ASSERT_EQ(0, memcmp(write_buf_ + read_info_.offset_, hit_disk_read_object_handle.get_buffer(), read_info_.size_));
|
|
hit_disk_read_object_handle.reset();
|
|
}
|
|
|
|
TEST_F(TestSSReaderWriter, tmp_file_reader_writer)
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
|
|
ObTenantFileManager *file_manager = MTL(ObTenantFileManager *);
|
|
ASSERT_NE(nullptr, file_manager);
|
|
ObTenantDiskSpaceManager *disk_space_manager = MTL(ObTenantDiskSpaceManager *);
|
|
ASSERT_NE(nullptr, disk_space_manager);
|
|
|
|
// to avoid affecting tmp file seg_meta_map and tmp file tmp_file_write_free_disk_size
|
|
// disable tmp_file_flush_task, preread_task_, calibrate_disk_space_task and gc_unsealed_tmp_file_task
|
|
// preread_task_ will affect local disk size, so preread_task_ need to disble
|
|
file_manager->preread_cache_mgr_.preread_task_.is_inited_ = false;
|
|
file_manager->tmp_file_flush_task_.is_inited_ = false;
|
|
file_manager->calibrate_disk_space_task_.is_inited_ = false;
|
|
file_manager->segment_file_mgr_.gc_segment_file_task_.is_inited_ = false;
|
|
sleep(3);
|
|
|
|
uint64_t tmp_file_id = 100;
|
|
|
|
ASSERT_EQ(OB_SUCCESS, OB_DIR_MGR.create_tmp_file_dir(MTL_ID(), MTL_EPOCH_ID(), tmp_file_id));
|
|
|
|
// 1.1 disk space enough, write local, write one new unsealed segment with offset = 0
|
|
ASSERT_LT(8192, disk_space_manager->get_tmp_file_write_free_disk_size());
|
|
MacroBlockId macro_id;
|
|
macro_id.set_id_mode((uint64_t)ObMacroBlockIdMode::ID_MODE_SHARE);
|
|
macro_id.set_storage_object_type((uint64_t)ObStorageObjectType::TMP_FILE);
|
|
macro_id.set_second_id(tmp_file_id); // tmp_file_id
|
|
macro_id.set_third_id(10); // segment_id
|
|
write_tmp_file_data(macro_id, 0/*offset*/, 8192/*size*/, 8192/*valid_length*/, false/*is_sealed*/, write_buf_);
|
|
read_and_compare_tmp_file_data(macro_id, 0/*offset*/, 8192/*size*/);
|
|
check_tmp_file_seg_meta(macro_id, true/*is_meta_exist*/, true/*is_in_local*/, 8192/*valid_length*/);
|
|
|
|
// 1.2 write another two unsealed local seg (seg_id: 11, 12), for append test case below
|
|
for (int64_t i = 1; i <= 2; ++i) {
|
|
ASSERT_LT(8192, disk_space_manager->get_tmp_file_write_free_disk_size());
|
|
macro_id.set_third_id(10 + i); // segment_id
|
|
write_tmp_file_data(macro_id, 0/*offset*/, 8192/*size*/, 8192/*valid_length*/, false/*is_sealed*/, write_buf_);
|
|
read_and_compare_tmp_file_data(macro_id, 0/*offset*/, 8192/*size*/);
|
|
check_tmp_file_seg_meta(macro_id, true/*is_meta_exist*/, true/*is_in_local*/, 8192/*valid_length*/);
|
|
}
|
|
|
|
// 2. disk space enough, write local, write one new sealed segment with offset = 0
|
|
ASSERT_LT(8192, disk_space_manager->get_tmp_file_write_free_disk_size());
|
|
macro_id.set_third_id(20); // segment_id
|
|
write_tmp_file_data(macro_id, 0/*offset*/, 8192/*size*/, 8192/*valid_length*/, true/*is_sealed*/, write_buf_);
|
|
read_and_compare_tmp_file_data(macro_id, 0/*offset*/, 8192/*size*/);
|
|
check_tmp_file_seg_meta(macro_id, true/*is_meta_exist*/, true/*is_in_local*/, 8192/*valid_length*/);
|
|
|
|
// 3. write one new unsealed segment with offset > 0
|
|
macro_id.set_third_id(30); // segment_id
|
|
ASSERT_TRUE(macro_id.is_valid());
|
|
write_tmp_file_data(macro_id, 8192/*offset*/, 8192/*size*/, 8192 * 2/*valid_length*/, false/*is_sealed*/, write_buf_);
|
|
read_and_compare_tmp_file_data(macro_id, 8192/*offset*/, 8192/*size*/);
|
|
check_tmp_file_seg_meta(macro_id, true/*is_meta_exist*/, true/*is_in_local*/, 8192 * 2/*valid_length*/);
|
|
|
|
// 4.1 disk space not enough, write through, write one new unsealed segment with offset = 0
|
|
int64_t avail_size = 0;
|
|
exhaust_tmp_file_write_free_disk_size(avail_size);
|
|
|
|
macro_id.set_third_id(40); // segment_id
|
|
write_tmp_file_data(macro_id, 0/*offset*/, 8192/*size*/, 8192/*valid_length*/, false/*is_sealed*/, write_buf_);
|
|
read_and_compare_tmp_file_data(macro_id, 0/*offset*/, 8192/*size*/);
|
|
check_tmp_file_seg_meta(macro_id, true/*is_meta_exist*/, false/*is_in_local*/, 8192/*valid_length*/);
|
|
|
|
// 4.2 write another four unsealed remote seg (seg_id: 41, 42, 43, 44), for append test case below
|
|
for (int64_t i = 1; i <= 4; ++i) {
|
|
macro_id.set_third_id(40 + i); // segment_id
|
|
write_tmp_file_data(macro_id, 0/*offset*/, 8192/*size*/, 8192/*valid_length*/, false/*is_sealed*/, write_buf_);
|
|
read_and_compare_tmp_file_data(macro_id, 0/*offset*/, 8192/*size*/);
|
|
check_tmp_file_seg_meta(macro_id, true/*is_meta_exist*/, false/*is_in_local*/, 8192/*valid_length*/);
|
|
}
|
|
|
|
// 5. disk space not enough, write through, write one new sealed segment with offset = 0
|
|
macro_id.set_third_id(50); // segment_id
|
|
write_tmp_file_data(macro_id, 0/*offset*/, 8192/*size*/, 8192/*valid_length*/, true/*is_sealed*/, write_buf_);
|
|
read_and_compare_tmp_file_data(macro_id, 0/*offset*/, 8192/*size*/);
|
|
check_tmp_file_seg_meta(macro_id, false/*is_meta_exist*/);
|
|
|
|
// 6. seg meta already exist, io.valid_len <= seg_meta.valid_len. no need write io and simulate io result
|
|
macro_id.set_third_id(60); // segment_id
|
|
write_tmp_file_data(macro_id, 0/*offset*/, 8192/*size*/, 8192/*valid_length*/, true/*is_sealed*/, write_buf_);
|
|
|
|
// 7. local seg meta already exist, io.valid_len > seg_meta.valid_len.
|
|
// disk space enough, write local, append unsealed segment
|
|
release_tmp_file_write_free_disk_size(avail_size);
|
|
ASSERT_LT(8192, disk_space_manager->get_tmp_file_write_free_disk_size());
|
|
|
|
macro_id.set_third_id(10); // segment_id
|
|
write_tmp_file_data(macro_id, 8192/*offset*/, 8192/*size*/, 8192 * 2/*valid_length*/, false/*is_sealed*/, write_buf_ + 8192);
|
|
read_and_compare_tmp_file_data(macro_id, 0/*offset*/, 8192 * 2/*size*/);
|
|
check_tmp_file_seg_meta(macro_id, true/*is_meta_exist*/, true/*is_in_local*/, 8192 * 2/*valid_length*/);
|
|
|
|
// 8. local seg meta already exist, io.valid_len > seg_meta.valid_len.
|
|
// disk space enough, write local, append sealed segment
|
|
ASSERT_LT(8192, disk_space_manager->get_tmp_file_write_free_disk_size());
|
|
|
|
macro_id.set_third_id(10); // segment_id
|
|
write_tmp_file_data(macro_id, 8192 * 2/*offset*/, 8192/*size*/, 8192 * 3/*valid_length*/, true/*is_sealed*/, write_buf_ + (8192 * 2));
|
|
read_and_compare_tmp_file_data(macro_id, 0/*offset*/, 8192 * 3/*size*/);
|
|
check_tmp_file_seg_meta(macro_id, true/*is_meta_exist*/, true/*is_in_local*/, 8192 * 3/*valid_length*/);
|
|
|
|
// 9. local seg meta already exist, io.valid_len > seg_meta.valid_len.
|
|
// disk space not enough, write through, append unsealed segment
|
|
exhaust_tmp_file_write_free_disk_size(avail_size);
|
|
|
|
macro_id.set_third_id(11); // segment_id
|
|
write_tmp_file_data(macro_id, 8192/*offset*/, 8192/*size*/, 8192 * 2/*valid_length*/, false/*is_sealed*/, write_buf_ + 8192);
|
|
read_and_compare_tmp_file_data(macro_id, 0/*offset*/, 8192 * 2/*size*/);
|
|
check_tmp_file_seg_meta(macro_id, true/*is_meta_exist*/, false/*is_in_local*/, 8192 * 2/*valid_length*/);
|
|
|
|
// 10. local seg meta already exist, io.valid_len > seg_meta.valid_len.
|
|
// disk space not enough, write through, append sealed segment
|
|
// because preread_task_ has been disabled, aio_read segment_id of No.10 will push segment_id of No.11 to queue, segment_id of No.11 is FAKE node
|
|
// when deleting segment_id of No.11, it will free tmp_file_read_cache_size, there free_disk_size is 0
|
|
int64_t free_disk_size = disk_space_manager->get_tmp_file_write_free_disk_size();
|
|
if (free_disk_size > 0) {
|
|
alloc_tmp_file_write_disk_size(free_disk_size);
|
|
}
|
|
|
|
macro_id.set_third_id(12); // segment_id
|
|
write_tmp_file_data(macro_id, 8192/*offset*/, 8192/*size*/, 8192 * 2/*valid_length*/, true/*is_sealed*/, write_buf_ + 8192);
|
|
read_and_compare_tmp_file_data(macro_id, 0/*offset*/, 8192 * 2/*size*/);
|
|
check_tmp_file_seg_meta(macro_id, false/*is_meta_exist*/);
|
|
|
|
// 11. remote seg meta already exist, io.valid_len > seg_meta.valid_len.
|
|
// disk space enough, write local, append unsealed segment
|
|
release_tmp_file_write_free_disk_size(avail_size + free_disk_size);
|
|
ASSERT_LT(8192, disk_space_manager->get_tmp_file_write_free_disk_size());
|
|
|
|
macro_id.set_third_id(41); // segment_id
|
|
write_tmp_file_data(macro_id, 8192/*offset*/, 8192/*size*/, 8192 * 2/*valid_length*/, false/*is_sealed*/, write_buf_ + 8192);
|
|
read_and_compare_tmp_file_data(macro_id, 0/*offset*/, 8192 * 2/*size*/);
|
|
check_tmp_file_seg_meta(macro_id, true/*is_meta_exist*/, true/*is_in_local*/, 8192 * 2/*valid_length*/);
|
|
|
|
// 12. remote seg meta already exist, io.valid_len > seg_meta.valid_len.
|
|
// disk space enough, append sealed segment, write remote
|
|
ASSERT_LT(8192, disk_space_manager->get_tmp_file_write_free_disk_size());
|
|
macro_id.set_third_id(42); // segment_id
|
|
write_tmp_file_data(macro_id, 8192/*offset*/, 8192/*size*/, 8192 * 2/*valid_length*/, true/*is_sealed*/, write_buf_ + 8192);
|
|
read_and_compare_tmp_file_data(macro_id, 0/*offset*/, 8192 * 2/*size*/);
|
|
check_tmp_file_seg_meta(macro_id, false/*is_meta_exist*/);
|
|
|
|
// 13. remote seg meta already exist, io.valid_len > seg_meta.valid_len.
|
|
// disk space not enough, write remote, append unsealed segment
|
|
exhaust_tmp_file_write_free_disk_size(avail_size);
|
|
|
|
macro_id.set_third_id(43); // segment_id
|
|
write_tmp_file_data(macro_id, 8192/*offset*/, 8192/*size*/, 8192 * 2/*valid_length*/, false/*is_sealed*/, write_buf_ + 8192);
|
|
read_and_compare_tmp_file_data(macro_id, 0/*offset*/, 8192 * 2/*size*/);
|
|
check_tmp_file_seg_meta(macro_id, true/*is_meta_exist*/, false/*is_in_local*/, 8192 * 2/*valid_length*/);
|
|
|
|
// 14. remote seg meta already exist, io.valid_len > seg_meta.valid_len.
|
|
// disk space not enough, write remote, append sealed segment
|
|
macro_id.set_third_id(44); // segment_id
|
|
write_tmp_file_data(macro_id, 8192/*offset*/, 8192/*size*/, 8192 * 2/*valid_length*/, true/*is_sealed*/, write_buf_ + 8192);
|
|
read_and_compare_tmp_file_data(macro_id, 0/*offset*/, 8192 * 2/*size*/);
|
|
check_tmp_file_seg_meta(macro_id, false/*is_meta_exist*/);
|
|
}
|
|
|
|
TEST_F(TestSSReaderWriter, private_macro_write_less_read_more)
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
|
|
uint64_t tablet_id = 900;
|
|
uint64_t server_id = 1;
|
|
|
|
ASSERT_EQ(OB_SUCCESS, OB_DIR_MGR.create_tablet_data_tablet_id_transfer_seq_dir(MTL_ID(), MTL_EPOCH_ID(), tablet_id, 0/*trasfer_seq*/));
|
|
|
|
// 1. write 4KB
|
|
MacroBlockId macro_id;
|
|
macro_id.set_id_mode((uint64_t)ObMacroBlockIdMode::ID_MODE_SHARE);
|
|
macro_id.set_storage_object_type((uint64_t)ObStorageObjectType::PRIVATE_DATA_MACRO);
|
|
macro_id.set_second_id(tablet_id); // tablet_id
|
|
macro_id.set_third_id(900); // seq_id
|
|
macro_id.set_macro_transfer_seq(0); // transfer_seq
|
|
macro_id.set_tenant_seq(server_id); //tenant_seq
|
|
ASSERT_TRUE(macro_id.is_valid());
|
|
ObStorageObjectHandle write_object_handle;
|
|
ASSERT_EQ(OB_SUCCESS, write_object_handle.set_macro_block_id(macro_id));
|
|
|
|
ObSSPrivateMacroWriter private_macro_writer;
|
|
write_info_.offset_ = 0;
|
|
write_info_.size_ = DIO_READ_ALIGN_SIZE; // 4KB
|
|
ASSERT_EQ(OB_SUCCESS, private_macro_writer.aio_write(write_info_, write_object_handle));
|
|
ASSERT_EQ(OB_SUCCESS, write_object_handle.wait());
|
|
|
|
// 2. try read 2MB, expect real read 4KB
|
|
read_info_.macro_block_id_ = macro_id;
|
|
ObStorageObjectHandle read_object_handle;
|
|
ObSSPrivateMacroReader private_macro_reader;
|
|
read_info_.offset_ = 0;
|
|
read_info_.size_ = OB_STORAGE_OBJECT_MGR.get_macro_block_size(); // 2MB
|
|
ASSERT_EQ(OB_SUCCESS, private_macro_reader.aio_read(read_info_, read_object_handle));
|
|
ASSERT_EQ(OB_SUCCESS, read_object_handle.wait());
|
|
ASSERT_NE(nullptr, read_object_handle.get_buffer());
|
|
ASSERT_EQ(DIO_READ_ALIGN_SIZE, read_object_handle.get_data_size());
|
|
ASSERT_EQ(0, memcmp(write_buf_, read_object_handle.get_buffer(), DIO_READ_ALIGN_SIZE));
|
|
}
|
|
|
|
TEST_F(TestSSReaderWriter, local_overwrite)
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
|
|
uint64_t ls_id = 1;
|
|
uint64_t ls_epoch_id = 1;
|
|
uint64_t tablet_id = 800;
|
|
|
|
ASSERT_EQ(OB_SUCCESS, OB_DIR_MGR.create_ls_id_dir(MTL_ID(), MTL_EPOCH_ID(), ls_id, ls_epoch_id));
|
|
ASSERT_EQ(OB_SUCCESS, OB_DIR_MGR.create_tablet_meta_tablet_id_dir(MTL_ID(), MTL_EPOCH_ID(), ls_id, ls_epoch_id, tablet_id));
|
|
|
|
// 1. write
|
|
MacroBlockId macro_id;
|
|
macro_id.set_id_mode((uint64_t)ObMacroBlockIdMode::ID_MODE_SHARE);
|
|
macro_id.set_storage_object_type((uint64_t)ObStorageObjectType::PRIVATE_TABLET_CURRENT_VERSION);
|
|
macro_id.set_second_id(ls_id); // ls_id
|
|
macro_id.set_third_id(tablet_id); // tablet_id
|
|
ASSERT_TRUE(macro_id.is_valid());
|
|
ObStorageObjectHandle write_object_handle;
|
|
ASSERT_EQ(OB_SUCCESS, write_object_handle.set_macro_block_id(macro_id));
|
|
|
|
write_info_.ls_epoch_id_ = ls_epoch_id;
|
|
|
|
ObSSLocalCacheWriter local_cache_writer;
|
|
ASSERT_EQ(OB_SUCCESS, local_cache_writer.aio_write(write_info_, write_object_handle));
|
|
ASSERT_EQ(OB_SUCCESS, write_object_handle.wait());
|
|
write_object_handle.reset();
|
|
|
|
// 2. read and compare the read data with the written data
|
|
read_info_.macro_block_id_ = macro_id;
|
|
read_info_.ls_epoch_id_ = ls_epoch_id;
|
|
ObStorageObjectHandle read_object_handle;
|
|
ObSSLocalCacheReader local_cache_reader;
|
|
ASSERT_EQ(OB_SUCCESS, local_cache_reader.aio_read(read_info_, read_object_handle));
|
|
ASSERT_EQ(OB_SUCCESS, read_object_handle.wait());
|
|
ASSERT_NE(nullptr, read_object_handle.get_buffer());
|
|
ASSERT_EQ(read_info_.size_, read_object_handle.get_data_size());
|
|
ASSERT_EQ(0, memcmp(write_buf_, read_object_handle.get_buffer(), WRITE_IO_SIZE));
|
|
read_object_handle.reset();
|
|
|
|
// 3. overwrite
|
|
char overwrite_buf[WRITE_IO_SIZE];
|
|
for (int64_t i = 0; i < WRITE_IO_SIZE; ++i) {
|
|
overwrite_buf[i] = write_buf_[i] + 1;
|
|
}
|
|
write_info_.buffer_ = overwrite_buf;
|
|
write_object_handle.set_macro_block_id(macro_id);
|
|
ASSERT_EQ(OB_SUCCESS, local_cache_writer.aio_write(write_info_, write_object_handle));
|
|
ASSERT_EQ(OB_SUCCESS, write_object_handle.wait());
|
|
write_object_handle.reset();
|
|
|
|
// 4. read again, and compare the read data with the overwriten data
|
|
ASSERT_EQ(OB_SUCCESS, local_cache_reader.aio_read(read_info_, read_object_handle));
|
|
ASSERT_EQ(OB_SUCCESS, read_object_handle.wait());
|
|
ASSERT_NE(nullptr, read_object_handle.get_buffer());
|
|
ASSERT_EQ(read_info_.size_, read_object_handle.get_data_size());
|
|
ASSERT_EQ(0, memcmp(overwrite_buf, read_object_handle.get_buffer(), WRITE_IO_SIZE));
|
|
ASSERT_NE(0, memcmp(write_buf_, read_object_handle.get_buffer(), WRITE_IO_SIZE));
|
|
read_object_handle.reset();
|
|
}
|
|
|
|
TEST_F(TestSSReaderWriter, read_when_disable_micro_cache)
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
|
|
uint64_t tablet_id = 200001;
|
|
uint64_t data_seq = 15;
|
|
|
|
// 1. write
|
|
MacroBlockId macro_id;
|
|
macro_id.set_id_mode((uint64_t)ObMacroBlockIdMode::ID_MODE_SHARE);
|
|
macro_id.set_storage_object_type((uint64_t)ObStorageObjectType::SHARED_MAJOR_DATA_MACRO);
|
|
macro_id.set_second_id(tablet_id); // tablet_id
|
|
macro_id.set_third_id(data_seq); // data_seq
|
|
ASSERT_TRUE(macro_id.is_valid());
|
|
ObStorageObjectHandle write_object_handle;
|
|
ASSERT_EQ(OB_SUCCESS, write_object_handle.set_macro_block_id(macro_id));
|
|
|
|
ObSSShareMacroWriter share_macro_writer;
|
|
ASSERT_EQ(OB_SUCCESS, share_macro_writer.aio_write(write_info_, write_object_handle));
|
|
ASSERT_EQ(OB_SUCCESS, write_object_handle.wait());
|
|
|
|
|
|
// 2. disable micro cache, and then read <1, WRITE_IO_SIZE / 2>
|
|
// expect bypass micro cache and read data from object storage successfully
|
|
ObSSMicroCache *micro_cache = MTL(ObSSMicroCache *);
|
|
ASSERT_NE(nullptr, micro_cache);
|
|
micro_cache->disable_cache();
|
|
|
|
ObLogicMicroBlockId logic_micro_id;
|
|
logic_micro_id.version_ = ObLogicMicroBlockId::LOGIC_MICRO_ID_VERSION_V1;
|
|
logic_micro_id.offset_ = 100;
|
|
logic_micro_id.logic_macro_id_.data_seq_.macro_data_seq_ = 1;
|
|
logic_micro_id.logic_macro_id_.logic_version_ = 100;
|
|
logic_micro_id.logic_macro_id_.tablet_id_ = tablet_id;
|
|
|
|
read_info_.macro_block_id_ = macro_id;
|
|
read_info_.offset_ = 1;
|
|
read_info_.size_ = WRITE_IO_SIZE / 2;
|
|
read_info_.logic_micro_id_ = logic_micro_id;
|
|
read_info_.micro_crc_ = 100;
|
|
ObStorageObjectHandle object_handle;
|
|
ObSSShareMacroReader share_macro_reader;
|
|
ASSERT_EQ(OB_SUCCESS, share_macro_reader.aio_read(read_info_, object_handle));
|
|
ASSERT_EQ(OB_SUCCESS, object_handle.wait());
|
|
ASSERT_NE(nullptr, object_handle.get_buffer());
|
|
ASSERT_EQ(read_info_.size_, object_handle.get_data_size());
|
|
ASSERT_EQ(0, memcmp(write_buf_ + read_info_.offset_, object_handle.get_buffer(), read_info_.size_));
|
|
object_handle.reset();
|
|
}
|
|
|
|
static void get_random_io_info(ObIOInfo &io_info)
|
|
{
|
|
io_info.tenant_id_ = MTL_ID();
|
|
io_info.fd_.first_id_ = ObIOFd::NORMAL_FILE_ID; // first_id is not used in shared storage mode
|
|
io_info.fd_.second_id_ = MTL(ObTenantFileManager*)->get_micro_cache_file_fd();
|
|
io_info.fd_.device_handle_ = &LOCAL_DEVICE_INSTANCE;
|
|
io_info.flag_.set_wait_event(ObWaitEventIds::DB_FILE_COMPACT_READ);
|
|
io_info.timeout_us_ = DEFAULT_IO_WAIT_TIME_US;
|
|
io_info.offset_ = ObRandom::rand(1, 1000L * 1000L);
|
|
io_info.size_ = ObRandom::rand(1, 1000L);
|
|
io_info.flag_.set_read();
|
|
}
|
|
|
|
TEST_F(TestSSReaderWriter, IOFaultDetector)
|
|
{
|
|
ObIOFaultDetector &detector = OB_IO_MANAGER.get_device_health_detector();
|
|
ObIOConfig &io_config = (ObIOConfig &)detector.io_config_;
|
|
|
|
// test get device health
|
|
ObDeviceHealthStatus dhs = DEVICE_HEALTH_NORMAL;
|
|
int64_t disk_abnormal_time = 0;
|
|
ASSERT_EQ(OB_SUCCESS, detector.get_device_health_status(dhs, disk_abnormal_time));
|
|
ASSERT_TRUE(DEVICE_HEALTH_NORMAL == dhs);
|
|
ASSERT_TRUE(0 == disk_abnormal_time);
|
|
|
|
// test read failure detection
|
|
ObIOInfo io_info;
|
|
get_random_io_info(io_info);
|
|
char read_buf[1000] = { 0 };
|
|
io_info.user_data_buf_ = read_buf;
|
|
ObSSPhysicalBlock phy_block;
|
|
ObSSPhysicalBlockHandle phy_block_handle;
|
|
phy_block_handle.set_ptr(&phy_block);
|
|
ASSERT_EQ(OB_SUCCESS, io_info.phy_block_handle_.assign(phy_block_handle));
|
|
|
|
ObIOResult result;
|
|
ObIORequest req;
|
|
req.inc_ref();
|
|
result.inc_ref();
|
|
ASSERT_EQ(OB_SUCCESS, result.basic_init());
|
|
ASSERT_EQ(OB_SUCCESS, result.init(io_info));
|
|
ASSERT_EQ(OB_SUCCESS, req.init(io_info, &result));
|
|
|
|
detector.reset_device_health();
|
|
ASSERT_EQ(OB_SUCCESS, detector.get_device_health_status(dhs, disk_abnormal_time));
|
|
ASSERT_TRUE(DEVICE_HEALTH_NORMAL == dhs);
|
|
ASSERT_TRUE(0 == disk_abnormal_time);
|
|
result.flag_.set_mode(ObIOMode::READ);
|
|
io_config.data_storage_warning_tolerance_time_ = 1000L * 1000L;
|
|
io_config.data_storage_error_tolerance_time_ = 3000L * 1000L;
|
|
|
|
detector.record_io_error(result, req);
|
|
usleep(2000L * 1000L);
|
|
ASSERT_FALSE(detector.is_device_warning_);
|
|
ASSERT_FALSE(detector.is_device_error_);
|
|
|
|
detector.record_io_timeout(result, req);
|
|
usleep(2000L * 1000L);
|
|
ASSERT_FALSE(detector.is_device_warning_);
|
|
ASSERT_FALSE(detector.is_device_error_);
|
|
|
|
// test auto clean device warning, but not clean device error
|
|
detector.reset_device_health();
|
|
ASSERT_EQ(OB_SUCCESS, detector.get_device_health_status(dhs, disk_abnormal_time));
|
|
ASSERT_TRUE(DEVICE_HEALTH_NORMAL == dhs);
|
|
ASSERT_TRUE(0 == disk_abnormal_time);
|
|
io_config.read_failure_black_list_interval_ = 1000L * 100L; // 100ms
|
|
detector.set_device_warning();
|
|
ASSERT_EQ(OB_SUCCESS, detector.get_device_health_status(dhs, disk_abnormal_time));
|
|
ASSERT_TRUE(DEVICE_HEALTH_WARNING == dhs);
|
|
ASSERT_TRUE(disk_abnormal_time > 0);
|
|
usleep(io_config.read_failure_black_list_interval_ * 2);
|
|
ASSERT_EQ(OB_SUCCESS, detector.get_device_health_status(dhs, disk_abnormal_time));
|
|
ASSERT_TRUE(DEVICE_HEALTH_NORMAL == dhs);
|
|
ASSERT_TRUE(0 == disk_abnormal_time);
|
|
detector.set_device_error();
|
|
usleep(io_config.read_failure_black_list_interval_ * 2);
|
|
ASSERT_EQ(OB_SUCCESS, detector.get_device_health_status(dhs, disk_abnormal_time));
|
|
ASSERT_TRUE(DEVICE_HEALTH_ERROR == dhs);
|
|
ASSERT_TRUE(disk_abnormal_time > 0);
|
|
}
|
|
|
|
} // namespace storage
|
|
} // namespace oceanbase
|
|
|
|
int main(int argc, char **argv)
|
|
{
|
|
int ret = 0;
|
|
system("rm -f ./test_ss_reader_writer.log*");
|
|
OB_LOGGER.set_file_name("test_ss_reader_writer.log", true);
|
|
OB_LOGGER.set_log_level("INFO");
|
|
testing::InitGoogleTest(&argc, argv);
|
|
return RUN_ALL_TESTS();
|
|
}
|