/** * Copyright (c) 2021 OceanBase * OceanBase CE is licensed under Mulan PubL v2. * You can use this software according to the terms and conditions of the Mulan PubL v2. * You may obtain a copy of Mulan PubL v2 at: * http://license.coscl.org.cn/MulanPubL-2.0 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. * See the Mulan PubL v2 for more details. */ #define USING_LOG_PREFIX STORAGETEST #define protected public #define private public #include "test_ss_common_util.h" #include "mittest/mtlenv/mock_tenant_module_env.h" #include "mittest/shared_storage/clean_residual_data.h" #undef private #undef protected namespace oceanbase { namespace storage { using namespace oceanbase::blocksstable; using namespace oceanbase::common; using namespace oceanbase::storage; class TestSSReaderWriter : public ::testing::Test { public: class TestIOCallback : public ObIOCallback { public: TestIOCallback() : ObIOCallback(ObIOCallbackType::TEST_CALLBACK), number_(nullptr), allocator_(nullptr), help_buf_(nullptr) { } virtual ~TestIOCallback() { if (nullptr != number_) { *number_ -= 90; number_ = nullptr; } if (nullptr != allocator_) { if (nullptr != help_buf_) { allocator_->free(help_buf_); help_buf_ = nullptr; } allocator_->free(this); LOG_INFO("success reset callback when out_rec_cnt = 0"); } } virtual const char *get_data() override { return (char *)help_buf_; } virtual int64_t size() const override { return sizeof(TestIOCallback); } virtual int alloc_data_buf(const char *io_data_buffer, const int64_t data_size) override { int ret = OB_SUCCESS; if (OB_ISNULL(allocator_)) { ret = OB_INVALID_DATA; LOG_WARN("Invalid data, the allocator is NULL, ", K(ret)); } else if (OB_UNLIKELY(data_size <= 0)) { ret = OB_INVALID_ARGUMENT; STORAGE_LOG(WARN, "invalid data buffer size", K(ret), K(data_size)); } else if (OB_ISNULL(help_buf_ = static_cast(allocator_->alloc(data_size)))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("Failed to allocate help buf", K(ret), K(data_size), KP(help_buf_)); } else { memset(help_buf_, 0, data_size); MEMCPY(help_buf_, io_data_buffer, data_size); } return OB_SUCCESS; } virtual int inner_process(const char *data_buffer, const int64_t size) override { int ret = OB_SUCCESS; if (OB_ISNULL(allocator_)) { // for test, ignore } else if (OB_FAIL(alloc_data_buf(data_buffer, size))) { LOG_WARN("Fail to allocate memory, ", K(ret), K(size)); } if (nullptr != number_) { *number_ += 100; } return OB_SUCCESS; } virtual ObIAllocator *get_allocator() override { return allocator_; } TO_STRING_KV(KP(number_), KP(allocator_), KP(help_buf_)); public: int64_t *number_; ObIAllocator *allocator_; char *help_buf_; }; TestSSReaderWriter() : write_info_(), read_info_(), write_buf_(), read_buf_() {} virtual ~TestSSReaderWriter() = default; static void SetUpTestCase(); static void TearDownTestCase(); virtual void SetUp(); virtual void TearDown(); void exhaust_tmp_file_write_free_disk_size(int64_t &avail_size); void alloc_tmp_file_write_disk_size(const int64_t disk_size); void release_tmp_file_write_free_disk_size(const int64_t avail_size); void write_tmp_file_data(const MacroBlockId ¯o_id, const int64_t offset, const int64_t size, const int64_t valid_length, const bool is_sealed, const char *buffer); void read_and_compare_tmp_file_data(const MacroBlockId ¯o_id, const int64_t offset, const int64_t size); void check_tmp_file_seg_meta(const MacroBlockId ¯o_id, const bool is_meta_exist, const bool is_in_local = false, const int64_t valid_length = 0); public: static const int64_t WRITE_IO_SIZE = DIO_READ_ALIGN_SIZE * 768; // 3MB ObStorageObjectWriteInfo write_info_; ObStorageObjectReadInfo read_info_; char write_buf_[WRITE_IO_SIZE]; char read_buf_[WRITE_IO_SIZE]; }; void TestSSReaderWriter::SetUpTestCase() { GCTX.startup_mode_ = observer::ObServerMode::SHARED_STORAGE_MODE; EXPECT_EQ(OB_SUCCESS, MockTenantModuleEnv::get_instance().init()); MTL(tmp_file::ObTenantTmpFileManager *)->stop(); MTL(tmp_file::ObTenantTmpFileManager *)->wait(); MTL(tmp_file::ObTenantTmpFileManager *)->destroy(); } void TestSSReaderWriter::TearDownTestCase() { int ret = OB_SUCCESS; if (OB_FAIL(ResidualDataCleanerHelper::clean_in_mock_env())) { LOG_WARN("failed to clean residual data", KR(ret)); } MockTenantModuleEnv::get_instance().destroy(); } void TestSSReaderWriter::SetUp() { // construct write info write_buf_[0] = '\0'; const int64_t mid_offset = WRITE_IO_SIZE / 2; memset(write_buf_, 'a', mid_offset); memset(write_buf_ + mid_offset, 'b', WRITE_IO_SIZE - mid_offset); write_info_.io_desc_.set_wait_event(1); write_info_.buffer_ = write_buf_; write_info_.offset_ = 0; write_info_.size_ = WRITE_IO_SIZE; write_info_.io_timeout_ms_ = DEFAULT_IO_WAIT_TIME_MS; write_info_.mtl_tenant_id_ = MTL_ID(); // construct read info read_buf_[0] = '\0'; read_info_.io_desc_.set_wait_event(1); read_info_.buf_ = read_buf_; read_info_.offset_ = 0; read_info_.size_ = WRITE_IO_SIZE; read_info_.io_timeout_ms_ = DEFAULT_IO_WAIT_TIME_MS; read_info_.mtl_tenant_id_ = MTL_ID(); } void TestSSReaderWriter::TearDown() { write_buf_[0] = '\0'; read_buf_[0] = '\0'; } void TestSSReaderWriter::exhaust_tmp_file_write_free_disk_size(int64_t &avail_size) { static int64_t call_times = 0; call_times++; ObTenantDiskSpaceManager *disk_space_manager = MTL(ObTenantDiskSpaceManager *); ASSERT_NE(nullptr, disk_space_manager) << "call_times: " << call_times; avail_size = disk_space_manager->get_tmp_file_write_free_disk_size(); ASSERT_EQ(OB_SUCCESS, disk_space_manager->alloc_file_size(avail_size, ObStorageObjectType::TMP_FILE, false/*is_tmp_file_read_cache*/)) << "call_times: " << call_times; ASSERT_EQ(0, disk_space_manager->get_tmp_file_write_free_disk_size()) << "call_times: " << call_times; ASSERT_EQ(OB_SERVER_OUTOF_DISK_SPACE, disk_space_manager->alloc_file_size(8192, ObStorageObjectType::TMP_FILE, false/*is_tmp_file_read_cache*/)) << "call_times: " << call_times; } void TestSSReaderWriter::alloc_tmp_file_write_disk_size(const int64_t disk_size) { static int64_t call_times = 0; call_times++; ObTenantDiskSpaceManager *disk_space_manager = MTL(ObTenantDiskSpaceManager *); ASSERT_NE(nullptr, disk_space_manager) << "call_times: " << call_times; ASSERT_EQ(OB_SUCCESS, disk_space_manager->alloc_file_size(disk_size, ObStorageObjectType::TMP_FILE, false/*is_tmp_file_read_cache*/)) << "call_times: " << call_times; ASSERT_EQ(0, disk_space_manager->get_tmp_file_write_free_disk_size()) << "call_times: " << call_times; ASSERT_EQ(OB_SERVER_OUTOF_DISK_SPACE, disk_space_manager->alloc_file_size(8192, ObStorageObjectType::TMP_FILE, false/*is_tmp_file_read_cache*/)) << "call_times: " << call_times; } void TestSSReaderWriter::release_tmp_file_write_free_disk_size(const int64_t avail_size) { static int64_t call_times = 0; call_times++; ObTenantDiskSpaceManager *disk_space_manager = MTL(ObTenantDiskSpaceManager *); ASSERT_NE(nullptr, disk_space_manager) << "call_times: " << call_times; ASSERT_EQ(OB_SUCCESS, disk_space_manager->free_file_size(avail_size, ObStorageObjectType::TMP_FILE, false/*is_tmp_file_read_cache*/)) << "call_times: " << call_times; } void TestSSReaderWriter::write_tmp_file_data( const MacroBlockId ¯o_id, const int64_t offset, const int64_t size, const int64_t valid_length, const bool is_sealed, const char *buffer) { static int64_t call_times = 0; call_times++; ObStorageObjectHandle write_object_handle; ASSERT_TRUE(macro_id.is_valid()) << "call_times: " << call_times; ASSERT_EQ(OB_SUCCESS, write_object_handle.set_macro_block_id(macro_id)) << "call_times: " << call_times; write_info_.offset_ = offset; write_info_.size_ = size; write_info_.tmp_file_valid_length_ = valid_length; if (is_sealed) { write_info_.io_desc_.set_sealed(); } else { write_info_.io_desc_.set_unsealed(); } write_info_.buffer_ = buffer; ObTenantFileManager *file_manager = MTL(ObTenantFileManager *); ASSERT_NE(nullptr, file_manager) << "call_times: " << call_times; ASSERT_EQ(OB_SUCCESS, file_manager->async_append_file(write_info_, write_object_handle)) << "call_times: " << call_times; ASSERT_EQ(OB_SUCCESS, write_object_handle.wait()) << "call_times: " << call_times; } void TestSSReaderWriter::read_and_compare_tmp_file_data( const MacroBlockId ¯o_id, const int64_t offset, const int64_t size) { static int64_t call_times = 0; call_times++; ObStorageObjectHandle read_object_handle; ObSSTmpFileReader tmp_file_reader; read_info_.macro_block_id_ = macro_id; read_info_.offset_ = offset; read_info_.size_ = size; ASSERT_EQ(OB_SUCCESS, tmp_file_reader.aio_read(read_info_, read_object_handle)) << "call_times: " << call_times; ASSERT_EQ(OB_SUCCESS, read_object_handle.wait()) << "call_times: " << call_times; ASSERT_NE(nullptr, read_object_handle.get_buffer()) << "call_times: " << call_times; ASSERT_EQ(read_info_.size_, read_object_handle.get_data_size()) << "call_times: " << call_times; ASSERT_EQ(0, memcmp(write_buf_ + offset, read_object_handle.get_buffer(), size)) << "call_times: " << call_times; memset(read_buf_, 0, WRITE_IO_SIZE); } void TestSSReaderWriter::check_tmp_file_seg_meta( const MacroBlockId ¯o_id, const bool is_meta_exist, const bool is_in_local, const int64_t valid_length) { static int64_t call_times = 0; call_times++; TmpFileSegId seg_id(macro_id.second_id(), macro_id.third_id()); TmpFileMetaHandle meta_handle; bool exist = false; ObTenantFileManager *file_manager = MTL(ObTenantFileManager *); ASSERT_NE(nullptr, file_manager) << "call_times: " << call_times; ASSERT_EQ(OB_SUCCESS, file_manager->get_segment_file_mgr().try_get_seg_meta(seg_id, meta_handle, exist)) << "call_times: " << call_times; ASSERT_EQ(is_meta_exist, exist) << "call_times: " << call_times; if (is_meta_exist) { ASSERT_TRUE(meta_handle.is_valid()); ASSERT_TRUE(meta_handle.get_tmpfile_meta()->is_valid()); ASSERT_EQ(is_in_local, meta_handle.is_in_local()) << "call_times: " << call_times; ASSERT_EQ(valid_length, meta_handle.get_valid_length()) << "call_times: " << call_times; } } TEST_F(TestSSReaderWriter, object_storage_reader_writer) { int ret = OB_SUCCESS; uint64_t tablet_id = 500; // 1. write MacroBlockId macro_id; macro_id.set_id_mode((uint64_t)ObMacroBlockIdMode::ID_MODE_SHARE); macro_id.set_storage_object_type((uint64_t)ObStorageObjectType::SHARED_MAJOR_TABLET_META); macro_id.set_second_id(tablet_id); // tablet_id macro_id.set_third_id(1000000); // meta_version ASSERT_TRUE(macro_id.is_valid()); ObStorageObjectHandle write_object_handle; ASSERT_EQ(OB_SUCCESS, write_object_handle.set_macro_block_id(macro_id)); ObSSObjectStorageWriter object_storage_writer; ASSERT_EQ(OB_SUCCESS, object_storage_writer.aio_write(write_info_, write_object_handle)); ASSERT_EQ(OB_SUCCESS, write_object_handle.wait()); // 2. read and compare the read data with the written data ObArenaAllocator allocator; int64_t tmp_number = 0; TestIOCallback callback; callback.number_ = &tmp_number; callback.allocator_ = &allocator; read_info_.io_callback_ = &callback; read_info_.buf_ = nullptr; read_info_.macro_block_id_ = macro_id; ObStorageObjectHandle read_object_handle; ObSSObjectStorageReader object_storage_reader; ASSERT_EQ(OB_SUCCESS, object_storage_reader.aio_read(read_info_, read_object_handle)); ASSERT_EQ(OB_SUCCESS, read_object_handle.wait()); ASSERT_NE(nullptr, read_object_handle.get_buffer()); ASSERT_EQ(read_info_.size_, read_object_handle.get_data_size()); ASSERT_EQ(0, memcmp(write_buf_, read_object_handle.get_buffer(), WRITE_IO_SIZE)); ASSERT_EQ(100, tmp_number); // callback process called } TEST_F(TestSSReaderWriter, local_cache_reader_writer) { int ret = OB_SUCCESS; uint64_t ls_id = 1; uint64_t ls_epoch_id = 1; uint64_t tablet_id = 800; ASSERT_EQ(OB_SUCCESS, OB_DIR_MGR.create_ls_id_dir(MTL_ID(), MTL_EPOCH_ID(), ls_id, ls_epoch_id)); ASSERT_EQ(OB_SUCCESS, OB_DIR_MGR.create_tablet_meta_tablet_id_dir(MTL_ID(), MTL_EPOCH_ID(), ls_id, ls_epoch_id, tablet_id)); // 1. write MacroBlockId macro_id; macro_id.set_id_mode((uint64_t)ObMacroBlockIdMode::ID_MODE_SHARE); macro_id.set_storage_object_type((uint64_t)ObStorageObjectType::PRIVATE_TABLET_META); macro_id.set_second_id(ls_id); // ls_id macro_id.set_third_id(tablet_id); // tablet_id macro_id.set_macro_transfer_seq(0); // transfer_seq macro_id.set_tenant_seq(2000000); //tenant_seq ASSERT_TRUE(macro_id.is_valid()); ObStorageObjectHandle write_object_handle; ASSERT_EQ(OB_SUCCESS, write_object_handle.set_macro_block_id(macro_id)); write_info_.ls_epoch_id_ = ls_epoch_id; ObSSLocalCacheWriter local_cache_writer; ASSERT_EQ(OB_SUCCESS, local_cache_writer.aio_write(write_info_, write_object_handle)); ASSERT_EQ(OB_SUCCESS, write_object_handle.wait()); // 2. read and compare the read data with the written data read_info_.macro_block_id_ = macro_id; read_info_.ls_epoch_id_ = ls_epoch_id; ObStorageObjectHandle read_object_handle; ObSSLocalCacheReader local_cache_reader; ASSERT_EQ(OB_SUCCESS, local_cache_reader.aio_read(read_info_, read_object_handle)); ASSERT_EQ(OB_SUCCESS, read_object_handle.wait()); ASSERT_NE(nullptr, read_object_handle.get_buffer()); ASSERT_EQ(read_info_.size_, read_object_handle.get_data_size()); ASSERT_EQ(0, memcmp(write_buf_, read_object_handle.get_buffer(), WRITE_IO_SIZE)); read_object_handle.reset(); // 3. try read DATA_OUT_OF_RANGE, expect io layer atomically adjust read size and read successfully read_info_.macro_block_id_ = macro_id; read_info_.ls_epoch_id_ = ls_epoch_id; read_info_.offset_ = 10000; read_info_.size_ = WRITE_IO_SIZE; ASSERT_EQ(OB_SUCCESS, local_cache_reader.aio_read(read_info_, read_object_handle)); ASSERT_EQ(OB_SUCCESS, read_object_handle.wait()); ASSERT_NE(nullptr, read_object_handle.get_buffer()); ASSERT_EQ(read_info_.size_ - read_info_.offset_, read_object_handle.get_data_size()); ASSERT_EQ(0, memcmp(write_buf_ + read_info_.offset_, read_object_handle.get_buffer(), read_info_.size_ - read_info_.offset_)); read_object_handle.reset(); // 4. try read offset exceeds acutal file length, expect OB_DATA_OUT_OF_RANGE errno read_info_.macro_block_id_ = macro_id; read_info_.ls_epoch_id_ = ls_epoch_id; read_info_.offset_ = WRITE_IO_SIZE; read_info_.size_ = 1; ASSERT_EQ(OB_DATA_OUT_OF_RANGE, local_cache_reader.aio_read(read_info_, read_object_handle)); read_object_handle.reset(); } TEST_F(TestSSReaderWriter, private_macro_reader_writer) { int ret = OB_SUCCESS; uint64_t tablet_id = 100; uint64_t server_id = 1; ASSERT_EQ(OB_SUCCESS, OB_DIR_MGR.create_tablet_data_tablet_id_transfer_seq_dir(MTL_ID(), MTL_EPOCH_ID(), tablet_id, 0/*trasfer_seq*/)); // 1. write MacroBlockId macro_id; macro_id.set_id_mode((uint64_t)ObMacroBlockIdMode::ID_MODE_SHARE); macro_id.set_storage_object_type((uint64_t)ObStorageObjectType::PRIVATE_DATA_MACRO); macro_id.set_second_id(tablet_id); // tablet_id macro_id.set_third_id(100); // seq_id macro_id.set_macro_transfer_seq(0); // transfer_seq macro_id.set_tenant_seq(server_id); //tenant_seq ASSERT_TRUE(macro_id.is_valid()); ObStorageObjectHandle write_object_handle; ASSERT_EQ(OB_SUCCESS, write_object_handle.set_macro_block_id(macro_id)); ObSSPrivateMacroWriter private_macro_writer; ASSERT_EQ(OB_SUCCESS, private_macro_writer.aio_write(write_info_, write_object_handle)); ASSERT_EQ(OB_SUCCESS, write_object_handle.wait()); // 2. read and compare the read data with the written data read_info_.macro_block_id_ = macro_id; read_info_.offset_ = 1; read_info_.size_ = WRITE_IO_SIZE / 2; ObStorageObjectHandle read_object_handle; ObSSPrivateMacroReader private_macro_reader; ASSERT_EQ(OB_SUCCESS, private_macro_reader.aio_read(read_info_, read_object_handle)); ASSERT_EQ(OB_SUCCESS, read_object_handle.wait()); ASSERT_NE(nullptr, read_object_handle.get_buffer()); ASSERT_EQ(read_info_.size_, read_object_handle.get_data_size()); ASSERT_EQ(0, memcmp(write_buf_ + read_info_.offset_, read_object_handle.get_buffer(), read_info_.size_)); read_object_handle.reset(); } TEST_F(TestSSReaderWriter, share_macro_reader_writer) { int ret = OB_SUCCESS; uint64_t tablet_id = 200; uint64_t data_seq = 15; // 1. write MacroBlockId macro_id; macro_id.set_id_mode((uint64_t)ObMacroBlockIdMode::ID_MODE_SHARE); macro_id.set_storage_object_type((uint64_t)ObStorageObjectType::SHARED_MAJOR_DATA_MACRO); macro_id.set_second_id(tablet_id); // tablet_id macro_id.set_third_id(data_seq); // data_seq ASSERT_TRUE(macro_id.is_valid()); ObStorageObjectHandle write_object_handle; ASSERT_EQ(OB_SUCCESS, write_object_handle.set_macro_block_id(macro_id)); ObSSShareMacroWriter share_macro_writer; ASSERT_EQ(OB_SUCCESS, share_macro_writer.aio_write(write_info_, write_object_handle)); ASSERT_EQ(OB_SUCCESS, write_object_handle.wait()); ObLogicMicroBlockId logic_micro_id_1; logic_micro_id_1.version_ = ObLogicMicroBlockId::LOGIC_MICRO_ID_VERSION_V1; logic_micro_id_1.offset_ = 100; logic_micro_id_1.logic_macro_id_.data_seq_.macro_data_seq_ = 1; logic_micro_id_1.logic_macro_id_.logic_version_ = 100; logic_micro_id_1.logic_macro_id_.tablet_id_ = tablet_id; ObLogicMicroBlockId logic_micro_id_2; logic_micro_id_2 = logic_micro_id_1; logic_micro_id_2.offset_ = 1; // 2. read <1, WRITE_IO_SIZE / 2>, expect cache miss and load cache read_info_.macro_block_id_ = macro_id; read_info_.offset_ = 1; read_info_.size_ = WRITE_IO_SIZE / 2; read_info_.logic_micro_id_ = logic_micro_id_1; read_info_.micro_crc_ = 100; ObStorageObjectHandle cache_miss_read_object_handle; ObSSShareMacroReader share_macro_reader; ASSERT_EQ(OB_SUCCESS, share_macro_reader.aio_read(read_info_, cache_miss_read_object_handle)); ASSERT_EQ(OB_SUCCESS, cache_miss_read_object_handle.wait()); ASSERT_NE(nullptr, cache_miss_read_object_handle.get_buffer()); ASSERT_EQ(read_info_.size_, cache_miss_read_object_handle.get_data_size()); ASSERT_EQ(0, memcmp(write_buf_ + read_info_.offset_, cache_miss_read_object_handle.get_buffer(), read_info_.size_)); cache_miss_read_object_handle.reset(); // 3. read <1, WRITE_IO_SIZE / 2>, expect hit memory read_info_.macro_block_id_ = macro_id; read_info_.offset_ = 1; read_info_.size_ = WRITE_IO_SIZE / 2; read_info_.logic_micro_id_ = logic_micro_id_1; read_info_.micro_crc_ = 100; ObStorageObjectHandle hit_memory_read_object_handle; ASSERT_EQ(OB_SUCCESS, share_macro_reader.aio_read(read_info_, hit_memory_read_object_handle)); ASSERT_EQ(OB_SUCCESS, hit_memory_read_object_handle.wait()); ASSERT_NE(nullptr, hit_memory_read_object_handle.get_buffer()); ASSERT_EQ(read_info_.size_, hit_memory_read_object_handle.get_data_size()); ASSERT_EQ(0, memcmp(write_buf_ + read_info_.offset_, hit_memory_read_object_handle.get_buffer(), read_info_.size_)); hit_memory_read_object_handle.reset(); // 4. read , expect cache miss and load cache read_info_.macro_block_id_ = macro_id; read_info_.offset_ = WRITE_IO_SIZE / 2; read_info_.size_ = WRITE_IO_SIZE / 2; read_info_.logic_micro_id_ = logic_micro_id_2; read_info_.micro_crc_ = 200; ASSERT_EQ(OB_SUCCESS, share_macro_reader.aio_read(read_info_, cache_miss_read_object_handle)); ASSERT_EQ(OB_SUCCESS, cache_miss_read_object_handle.wait()); ASSERT_NE(nullptr, cache_miss_read_object_handle.get_buffer()); ASSERT_EQ(read_info_.size_, cache_miss_read_object_handle.get_data_size()); ASSERT_EQ(0, memcmp(write_buf_ + read_info_.offset_, cache_miss_read_object_handle.get_buffer(), read_info_.size_)); cache_miss_read_object_handle.reset(); // 5. wait <1, WRITE_IO_SIZE / 2> flush from memory to disk ASSERT_EQ(OB_SUCCESS, TestSSCommonUtil::wait_for_persist_task()); // 6. read <1, WRITE_IO_SIZE / 2>, expect hit disk read_info_.macro_block_id_ = macro_id; read_info_.offset_ = 1; read_info_.size_ = WRITE_IO_SIZE / 2; read_info_.logic_micro_id_ = logic_micro_id_1; read_info_.micro_crc_ = 100; ObStorageObjectHandle hit_disk_read_object_handle; ASSERT_EQ(OB_SUCCESS, share_macro_reader.aio_read(read_info_, hit_disk_read_object_handle)); ASSERT_EQ(OB_SUCCESS, hit_disk_read_object_handle.wait()); ASSERT_NE(nullptr, hit_disk_read_object_handle.get_buffer()); ASSERT_EQ(read_info_.size_, hit_disk_read_object_handle.get_data_size()); ASSERT_EQ(0, memcmp(write_buf_ + read_info_.offset_, hit_disk_read_object_handle.get_buffer(), read_info_.size_)); hit_disk_read_object_handle.reset(); } TEST_F(TestSSReaderWriter, tmp_file_reader_writer) { int ret = OB_SUCCESS; ObTenantFileManager *file_manager = MTL(ObTenantFileManager *); ASSERT_NE(nullptr, file_manager); ObTenantDiskSpaceManager *disk_space_manager = MTL(ObTenantDiskSpaceManager *); ASSERT_NE(nullptr, disk_space_manager); // to avoid affecting tmp file seg_meta_map and tmp file tmp_file_write_free_disk_size // disable tmp_file_flush_task, preread_task_, calibrate_disk_space_task and gc_unsealed_tmp_file_task // preread_task_ will affect local disk size, so preread_task_ need to disble file_manager->preread_cache_mgr_.preread_task_.is_inited_ = false; file_manager->tmp_file_flush_task_.is_inited_ = false; file_manager->calibrate_disk_space_task_.is_inited_ = false; file_manager->segment_file_mgr_.gc_segment_file_task_.is_inited_ = false; sleep(3); uint64_t tmp_file_id = 100; ASSERT_EQ(OB_SUCCESS, OB_DIR_MGR.create_tmp_file_dir(MTL_ID(), MTL_EPOCH_ID(), tmp_file_id)); // 1.1 disk space enough, write local, write one new unsealed segment with offset = 0 ASSERT_LT(8192, disk_space_manager->get_tmp_file_write_free_disk_size()); MacroBlockId macro_id; macro_id.set_id_mode((uint64_t)ObMacroBlockIdMode::ID_MODE_SHARE); macro_id.set_storage_object_type((uint64_t)ObStorageObjectType::TMP_FILE); macro_id.set_second_id(tmp_file_id); // tmp_file_id macro_id.set_third_id(10); // segment_id write_tmp_file_data(macro_id, 0/*offset*/, 8192/*size*/, 8192/*valid_length*/, false/*is_sealed*/, write_buf_); read_and_compare_tmp_file_data(macro_id, 0/*offset*/, 8192/*size*/); check_tmp_file_seg_meta(macro_id, true/*is_meta_exist*/, true/*is_in_local*/, 8192/*valid_length*/); // 1.2 write another two unsealed local seg (seg_id: 11, 12), for append test case below for (int64_t i = 1; i <= 2; ++i) { ASSERT_LT(8192, disk_space_manager->get_tmp_file_write_free_disk_size()); macro_id.set_third_id(10 + i); // segment_id write_tmp_file_data(macro_id, 0/*offset*/, 8192/*size*/, 8192/*valid_length*/, false/*is_sealed*/, write_buf_); read_and_compare_tmp_file_data(macro_id, 0/*offset*/, 8192/*size*/); check_tmp_file_seg_meta(macro_id, true/*is_meta_exist*/, true/*is_in_local*/, 8192/*valid_length*/); } // 2. disk space enough, write local, write one new sealed segment with offset = 0 ASSERT_LT(8192, disk_space_manager->get_tmp_file_write_free_disk_size()); macro_id.set_third_id(20); // segment_id write_tmp_file_data(macro_id, 0/*offset*/, 8192/*size*/, 8192/*valid_length*/, true/*is_sealed*/, write_buf_); read_and_compare_tmp_file_data(macro_id, 0/*offset*/, 8192/*size*/); check_tmp_file_seg_meta(macro_id, true/*is_meta_exist*/, true/*is_in_local*/, 8192/*valid_length*/); // 3. write one new unsealed segment with offset > 0 macro_id.set_third_id(30); // segment_id ASSERT_TRUE(macro_id.is_valid()); write_tmp_file_data(macro_id, 8192/*offset*/, 8192/*size*/, 8192 * 2/*valid_length*/, false/*is_sealed*/, write_buf_); read_and_compare_tmp_file_data(macro_id, 8192/*offset*/, 8192/*size*/); check_tmp_file_seg_meta(macro_id, true/*is_meta_exist*/, true/*is_in_local*/, 8192 * 2/*valid_length*/); // 4.1 disk space not enough, write through, write one new unsealed segment with offset = 0 int64_t avail_size = 0; exhaust_tmp_file_write_free_disk_size(avail_size); macro_id.set_third_id(40); // segment_id write_tmp_file_data(macro_id, 0/*offset*/, 8192/*size*/, 8192/*valid_length*/, false/*is_sealed*/, write_buf_); read_and_compare_tmp_file_data(macro_id, 0/*offset*/, 8192/*size*/); check_tmp_file_seg_meta(macro_id, true/*is_meta_exist*/, false/*is_in_local*/, 8192/*valid_length*/); // 4.2 write another four unsealed remote seg (seg_id: 41, 42, 43, 44), for append test case below for (int64_t i = 1; i <= 4; ++i) { macro_id.set_third_id(40 + i); // segment_id write_tmp_file_data(macro_id, 0/*offset*/, 8192/*size*/, 8192/*valid_length*/, false/*is_sealed*/, write_buf_); read_and_compare_tmp_file_data(macro_id, 0/*offset*/, 8192/*size*/); check_tmp_file_seg_meta(macro_id, true/*is_meta_exist*/, false/*is_in_local*/, 8192/*valid_length*/); } // 5. disk space not enough, write through, write one new sealed segment with offset = 0 macro_id.set_third_id(50); // segment_id write_tmp_file_data(macro_id, 0/*offset*/, 8192/*size*/, 8192/*valid_length*/, true/*is_sealed*/, write_buf_); read_and_compare_tmp_file_data(macro_id, 0/*offset*/, 8192/*size*/); check_tmp_file_seg_meta(macro_id, false/*is_meta_exist*/); // 6. seg meta already exist, io.valid_len <= seg_meta.valid_len. no need write io and simulate io result macro_id.set_third_id(60); // segment_id write_tmp_file_data(macro_id, 0/*offset*/, 8192/*size*/, 8192/*valid_length*/, true/*is_sealed*/, write_buf_); // 7. local seg meta already exist, io.valid_len > seg_meta.valid_len. // disk space enough, write local, append unsealed segment release_tmp_file_write_free_disk_size(avail_size); ASSERT_LT(8192, disk_space_manager->get_tmp_file_write_free_disk_size()); macro_id.set_third_id(10); // segment_id write_tmp_file_data(macro_id, 8192/*offset*/, 8192/*size*/, 8192 * 2/*valid_length*/, false/*is_sealed*/, write_buf_ + 8192); read_and_compare_tmp_file_data(macro_id, 0/*offset*/, 8192 * 2/*size*/); check_tmp_file_seg_meta(macro_id, true/*is_meta_exist*/, true/*is_in_local*/, 8192 * 2/*valid_length*/); // 8. local seg meta already exist, io.valid_len > seg_meta.valid_len. // disk space enough, write local, append sealed segment ASSERT_LT(8192, disk_space_manager->get_tmp_file_write_free_disk_size()); macro_id.set_third_id(10); // segment_id write_tmp_file_data(macro_id, 8192 * 2/*offset*/, 8192/*size*/, 8192 * 3/*valid_length*/, true/*is_sealed*/, write_buf_ + (8192 * 2)); read_and_compare_tmp_file_data(macro_id, 0/*offset*/, 8192 * 3/*size*/); check_tmp_file_seg_meta(macro_id, true/*is_meta_exist*/, true/*is_in_local*/, 8192 * 3/*valid_length*/); // 9. local seg meta already exist, io.valid_len > seg_meta.valid_len. // disk space not enough, write through, append unsealed segment exhaust_tmp_file_write_free_disk_size(avail_size); macro_id.set_third_id(11); // segment_id write_tmp_file_data(macro_id, 8192/*offset*/, 8192/*size*/, 8192 * 2/*valid_length*/, false/*is_sealed*/, write_buf_ + 8192); read_and_compare_tmp_file_data(macro_id, 0/*offset*/, 8192 * 2/*size*/); check_tmp_file_seg_meta(macro_id, true/*is_meta_exist*/, false/*is_in_local*/, 8192 * 2/*valid_length*/); // 10. local seg meta already exist, io.valid_len > seg_meta.valid_len. // disk space not enough, write through, append sealed segment // because preread_task_ has been disabled, aio_read segment_id of No.10 will push segment_id of No.11 to queue, segment_id of No.11 is FAKE node // when deleting segment_id of No.11, it will free tmp_file_read_cache_size, there free_disk_size is 0 int64_t free_disk_size = disk_space_manager->get_tmp_file_write_free_disk_size(); if (free_disk_size > 0) { alloc_tmp_file_write_disk_size(free_disk_size); } macro_id.set_third_id(12); // segment_id write_tmp_file_data(macro_id, 8192/*offset*/, 8192/*size*/, 8192 * 2/*valid_length*/, true/*is_sealed*/, write_buf_ + 8192); read_and_compare_tmp_file_data(macro_id, 0/*offset*/, 8192 * 2/*size*/); check_tmp_file_seg_meta(macro_id, false/*is_meta_exist*/); // 11. remote seg meta already exist, io.valid_len > seg_meta.valid_len. // disk space enough, write local, append unsealed segment release_tmp_file_write_free_disk_size(avail_size + free_disk_size); ASSERT_LT(8192, disk_space_manager->get_tmp_file_write_free_disk_size()); macro_id.set_third_id(41); // segment_id write_tmp_file_data(macro_id, 8192/*offset*/, 8192/*size*/, 8192 * 2/*valid_length*/, false/*is_sealed*/, write_buf_ + 8192); read_and_compare_tmp_file_data(macro_id, 0/*offset*/, 8192 * 2/*size*/); check_tmp_file_seg_meta(macro_id, true/*is_meta_exist*/, true/*is_in_local*/, 8192 * 2/*valid_length*/); // 12. remote seg meta already exist, io.valid_len > seg_meta.valid_len. // disk space enough, append sealed segment, write remote ASSERT_LT(8192, disk_space_manager->get_tmp_file_write_free_disk_size()); macro_id.set_third_id(42); // segment_id write_tmp_file_data(macro_id, 8192/*offset*/, 8192/*size*/, 8192 * 2/*valid_length*/, true/*is_sealed*/, write_buf_ + 8192); read_and_compare_tmp_file_data(macro_id, 0/*offset*/, 8192 * 2/*size*/); check_tmp_file_seg_meta(macro_id, false/*is_meta_exist*/); // 13. remote seg meta already exist, io.valid_len > seg_meta.valid_len. // disk space not enough, write remote, append unsealed segment exhaust_tmp_file_write_free_disk_size(avail_size); macro_id.set_third_id(43); // segment_id write_tmp_file_data(macro_id, 8192/*offset*/, 8192/*size*/, 8192 * 2/*valid_length*/, false/*is_sealed*/, write_buf_ + 8192); read_and_compare_tmp_file_data(macro_id, 0/*offset*/, 8192 * 2/*size*/); check_tmp_file_seg_meta(macro_id, true/*is_meta_exist*/, false/*is_in_local*/, 8192 * 2/*valid_length*/); // 14. remote seg meta already exist, io.valid_len > seg_meta.valid_len. // disk space not enough, write remote, append sealed segment macro_id.set_third_id(44); // segment_id write_tmp_file_data(macro_id, 8192/*offset*/, 8192/*size*/, 8192 * 2/*valid_length*/, true/*is_sealed*/, write_buf_ + 8192); read_and_compare_tmp_file_data(macro_id, 0/*offset*/, 8192 * 2/*size*/); check_tmp_file_seg_meta(macro_id, false/*is_meta_exist*/); } TEST_F(TestSSReaderWriter, private_macro_write_less_read_more) { int ret = OB_SUCCESS; uint64_t tablet_id = 900; uint64_t server_id = 1; ASSERT_EQ(OB_SUCCESS, OB_DIR_MGR.create_tablet_data_tablet_id_transfer_seq_dir(MTL_ID(), MTL_EPOCH_ID(), tablet_id, 0/*trasfer_seq*/)); // 1. write 4KB MacroBlockId macro_id; macro_id.set_id_mode((uint64_t)ObMacroBlockIdMode::ID_MODE_SHARE); macro_id.set_storage_object_type((uint64_t)ObStorageObjectType::PRIVATE_DATA_MACRO); macro_id.set_second_id(tablet_id); // tablet_id macro_id.set_third_id(900); // seq_id macro_id.set_macro_transfer_seq(0); // transfer_seq macro_id.set_tenant_seq(server_id); //tenant_seq ASSERT_TRUE(macro_id.is_valid()); ObStorageObjectHandle write_object_handle; ASSERT_EQ(OB_SUCCESS, write_object_handle.set_macro_block_id(macro_id)); ObSSPrivateMacroWriter private_macro_writer; write_info_.offset_ = 0; write_info_.size_ = DIO_READ_ALIGN_SIZE; // 4KB ASSERT_EQ(OB_SUCCESS, private_macro_writer.aio_write(write_info_, write_object_handle)); ASSERT_EQ(OB_SUCCESS, write_object_handle.wait()); // 2. try read 2MB, expect real read 4KB read_info_.macro_block_id_ = macro_id; ObStorageObjectHandle read_object_handle; ObSSPrivateMacroReader private_macro_reader; read_info_.offset_ = 0; read_info_.size_ = OB_STORAGE_OBJECT_MGR.get_macro_block_size(); // 2MB ASSERT_EQ(OB_SUCCESS, private_macro_reader.aio_read(read_info_, read_object_handle)); ASSERT_EQ(OB_SUCCESS, read_object_handle.wait()); ASSERT_NE(nullptr, read_object_handle.get_buffer()); ASSERT_EQ(DIO_READ_ALIGN_SIZE, read_object_handle.get_data_size()); ASSERT_EQ(0, memcmp(write_buf_, read_object_handle.get_buffer(), DIO_READ_ALIGN_SIZE)); } TEST_F(TestSSReaderWriter, local_overwrite) { int ret = OB_SUCCESS; uint64_t ls_id = 1; uint64_t ls_epoch_id = 1; uint64_t tablet_id = 800; ASSERT_EQ(OB_SUCCESS, OB_DIR_MGR.create_ls_id_dir(MTL_ID(), MTL_EPOCH_ID(), ls_id, ls_epoch_id)); ASSERT_EQ(OB_SUCCESS, OB_DIR_MGR.create_tablet_meta_tablet_id_dir(MTL_ID(), MTL_EPOCH_ID(), ls_id, ls_epoch_id, tablet_id)); // 1. write MacroBlockId macro_id; macro_id.set_id_mode((uint64_t)ObMacroBlockIdMode::ID_MODE_SHARE); macro_id.set_storage_object_type((uint64_t)ObStorageObjectType::PRIVATE_TABLET_CURRENT_VERSION); macro_id.set_second_id(ls_id); // ls_id macro_id.set_third_id(tablet_id); // tablet_id ASSERT_TRUE(macro_id.is_valid()); ObStorageObjectHandle write_object_handle; ASSERT_EQ(OB_SUCCESS, write_object_handle.set_macro_block_id(macro_id)); write_info_.ls_epoch_id_ = ls_epoch_id; ObSSLocalCacheWriter local_cache_writer; ASSERT_EQ(OB_SUCCESS, local_cache_writer.aio_write(write_info_, write_object_handle)); ASSERT_EQ(OB_SUCCESS, write_object_handle.wait()); write_object_handle.reset(); // 2. read and compare the read data with the written data read_info_.macro_block_id_ = macro_id; read_info_.ls_epoch_id_ = ls_epoch_id; ObStorageObjectHandle read_object_handle; ObSSLocalCacheReader local_cache_reader; ASSERT_EQ(OB_SUCCESS, local_cache_reader.aio_read(read_info_, read_object_handle)); ASSERT_EQ(OB_SUCCESS, read_object_handle.wait()); ASSERT_NE(nullptr, read_object_handle.get_buffer()); ASSERT_EQ(read_info_.size_, read_object_handle.get_data_size()); ASSERT_EQ(0, memcmp(write_buf_, read_object_handle.get_buffer(), WRITE_IO_SIZE)); read_object_handle.reset(); // 3. overwrite char overwrite_buf[WRITE_IO_SIZE]; for (int64_t i = 0; i < WRITE_IO_SIZE; ++i) { overwrite_buf[i] = write_buf_[i] + 1; } write_info_.buffer_ = overwrite_buf; write_object_handle.set_macro_block_id(macro_id); ASSERT_EQ(OB_SUCCESS, local_cache_writer.aio_write(write_info_, write_object_handle)); ASSERT_EQ(OB_SUCCESS, write_object_handle.wait()); write_object_handle.reset(); // 4. read again, and compare the read data with the overwriten data ASSERT_EQ(OB_SUCCESS, local_cache_reader.aio_read(read_info_, read_object_handle)); ASSERT_EQ(OB_SUCCESS, read_object_handle.wait()); ASSERT_NE(nullptr, read_object_handle.get_buffer()); ASSERT_EQ(read_info_.size_, read_object_handle.get_data_size()); ASSERT_EQ(0, memcmp(overwrite_buf, read_object_handle.get_buffer(), WRITE_IO_SIZE)); ASSERT_NE(0, memcmp(write_buf_, read_object_handle.get_buffer(), WRITE_IO_SIZE)); read_object_handle.reset(); } TEST_F(TestSSReaderWriter, read_when_disable_micro_cache) { int ret = OB_SUCCESS; uint64_t tablet_id = 200001; uint64_t data_seq = 15; // 1. write MacroBlockId macro_id; macro_id.set_id_mode((uint64_t)ObMacroBlockIdMode::ID_MODE_SHARE); macro_id.set_storage_object_type((uint64_t)ObStorageObjectType::SHARED_MAJOR_DATA_MACRO); macro_id.set_second_id(tablet_id); // tablet_id macro_id.set_third_id(data_seq); // data_seq ASSERT_TRUE(macro_id.is_valid()); ObStorageObjectHandle write_object_handle; ASSERT_EQ(OB_SUCCESS, write_object_handle.set_macro_block_id(macro_id)); ObSSShareMacroWriter share_macro_writer; ASSERT_EQ(OB_SUCCESS, share_macro_writer.aio_write(write_info_, write_object_handle)); ASSERT_EQ(OB_SUCCESS, write_object_handle.wait()); // 2. disable micro cache, and then read <1, WRITE_IO_SIZE / 2> // expect bypass micro cache and read data from object storage successfully ObSSMicroCache *micro_cache = MTL(ObSSMicroCache *); ASSERT_NE(nullptr, micro_cache); micro_cache->disable_cache(); ObLogicMicroBlockId logic_micro_id; logic_micro_id.version_ = ObLogicMicroBlockId::LOGIC_MICRO_ID_VERSION_V1; logic_micro_id.offset_ = 100; logic_micro_id.logic_macro_id_.data_seq_.macro_data_seq_ = 1; logic_micro_id.logic_macro_id_.logic_version_ = 100; logic_micro_id.logic_macro_id_.tablet_id_ = tablet_id; read_info_.macro_block_id_ = macro_id; read_info_.offset_ = 1; read_info_.size_ = WRITE_IO_SIZE / 2; read_info_.logic_micro_id_ = logic_micro_id; read_info_.micro_crc_ = 100; ObStorageObjectHandle object_handle; ObSSShareMacroReader share_macro_reader; ASSERT_EQ(OB_SUCCESS, share_macro_reader.aio_read(read_info_, object_handle)); ASSERT_EQ(OB_SUCCESS, object_handle.wait()); ASSERT_NE(nullptr, object_handle.get_buffer()); ASSERT_EQ(read_info_.size_, object_handle.get_data_size()); ASSERT_EQ(0, memcmp(write_buf_ + read_info_.offset_, object_handle.get_buffer(), read_info_.size_)); object_handle.reset(); } static void get_random_io_info(ObIOInfo &io_info) { io_info.tenant_id_ = MTL_ID(); io_info.fd_.first_id_ = ObIOFd::NORMAL_FILE_ID; // first_id is not used in shared storage mode io_info.fd_.second_id_ = MTL(ObTenantFileManager*)->get_micro_cache_file_fd(); io_info.fd_.device_handle_ = &LOCAL_DEVICE_INSTANCE; io_info.flag_.set_wait_event(ObWaitEventIds::DB_FILE_COMPACT_READ); io_info.timeout_us_ = DEFAULT_IO_WAIT_TIME_US; io_info.offset_ = ObRandom::rand(1, 1000L * 1000L); io_info.size_ = ObRandom::rand(1, 1000L); io_info.flag_.set_read(); } TEST_F(TestSSReaderWriter, IOFaultDetector) { ObIOFaultDetector &detector = OB_IO_MANAGER.get_device_health_detector(); ObIOConfig &io_config = (ObIOConfig &)detector.io_config_; // test get device health ObDeviceHealthStatus dhs = DEVICE_HEALTH_NORMAL; int64_t disk_abnormal_time = 0; ASSERT_EQ(OB_SUCCESS, detector.get_device_health_status(dhs, disk_abnormal_time)); ASSERT_TRUE(DEVICE_HEALTH_NORMAL == dhs); ASSERT_TRUE(0 == disk_abnormal_time); // test read failure detection ObIOInfo io_info; get_random_io_info(io_info); char read_buf[1000] = { 0 }; io_info.user_data_buf_ = read_buf; ObSSPhysicalBlock phy_block; ObSSPhysicalBlockHandle phy_block_handle; phy_block_handle.set_ptr(&phy_block); ASSERT_EQ(OB_SUCCESS, io_info.phy_block_handle_.assign(phy_block_handle)); ObIOResult result; ObIORequest req; req.inc_ref(); result.inc_ref(); ASSERT_EQ(OB_SUCCESS, result.basic_init()); ASSERT_EQ(OB_SUCCESS, result.init(io_info)); ASSERT_EQ(OB_SUCCESS, req.init(io_info, &result)); detector.reset_device_health(); ASSERT_EQ(OB_SUCCESS, detector.get_device_health_status(dhs, disk_abnormal_time)); ASSERT_TRUE(DEVICE_HEALTH_NORMAL == dhs); ASSERT_TRUE(0 == disk_abnormal_time); result.flag_.set_mode(ObIOMode::READ); io_config.data_storage_warning_tolerance_time_ = 1000L * 1000L; io_config.data_storage_error_tolerance_time_ = 3000L * 1000L; detector.record_io_error(result, req); usleep(2000L * 1000L); ASSERT_FALSE(detector.is_device_warning_); ASSERT_FALSE(detector.is_device_error_); detector.record_io_timeout(result, req); usleep(2000L * 1000L); ASSERT_FALSE(detector.is_device_warning_); ASSERT_FALSE(detector.is_device_error_); // test auto clean device warning, but not clean device error detector.reset_device_health(); ASSERT_EQ(OB_SUCCESS, detector.get_device_health_status(dhs, disk_abnormal_time)); ASSERT_TRUE(DEVICE_HEALTH_NORMAL == dhs); ASSERT_TRUE(0 == disk_abnormal_time); io_config.read_failure_black_list_interval_ = 1000L * 100L; // 100ms detector.set_device_warning(); ASSERT_EQ(OB_SUCCESS, detector.get_device_health_status(dhs, disk_abnormal_time)); ASSERT_TRUE(DEVICE_HEALTH_WARNING == dhs); ASSERT_TRUE(disk_abnormal_time > 0); usleep(io_config.read_failure_black_list_interval_ * 2); ASSERT_EQ(OB_SUCCESS, detector.get_device_health_status(dhs, disk_abnormal_time)); ASSERT_TRUE(DEVICE_HEALTH_NORMAL == dhs); ASSERT_TRUE(0 == disk_abnormal_time); detector.set_device_error(); usleep(io_config.read_failure_black_list_interval_ * 2); ASSERT_EQ(OB_SUCCESS, detector.get_device_health_status(dhs, disk_abnormal_time)); ASSERT_TRUE(DEVICE_HEALTH_ERROR == dhs); ASSERT_TRUE(disk_abnormal_time > 0); } } // namespace storage } // namespace oceanbase int main(int argc, char **argv) { int ret = 0; system("rm -f ./test_ss_reader_writer.log*"); OB_LOGGER.set_file_name("test_ss_reader_writer.log", true); OB_LOGGER.set_log_level("INFO"); testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); }