/**
 * Copyright (c) 2021 OceanBase
 * OceanBase CE is licensed under Mulan PubL v2.
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 * You may obtain a copy of Mulan PubL v2 at:
 *          http://license.coscl.org.cn/MulanPubL-2.0
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PubL v2 for more details.
 */
|
|
#define USING_LOG_PREFIX STORAGETEST
|
|
|
|
#include <gmock/gmock.h>
|
|
#define protected public
|
|
#define private public
|
|
#include "mittest/mtlenv/mock_tenant_module_env.h"
|
|
#include "storage/shared_storage/ob_ss_reader_writer.h"
|
|
#include "mittest/shared_storage/clean_residual_data.h"
|
|
#include "storage/tmp_file/ob_tmp_file_manager.h"
|
|
#undef private
|
|
#undef protected
|
|
|
|
namespace oceanbase
|
|
{
|
|
namespace storage
|
|
{
|
|
using namespace oceanbase::blocksstable;
|
|
using namespace oceanbase::common;
|
|
using namespace oceanbase::storage;
|
|
using namespace oceanbase::share;
|
|
|
|
class TestSSPreReadTask : public ::testing::Test
|
|
{
|
|
public:
|
|
TestSSPreReadTask() {}
|
|
virtual ~TestSSPreReadTask() = default;
|
|
static void SetUpTestCase();
|
|
static void TearDownTestCase();
|
|
virtual void SetUp();
|
|
virtual void TearDown();
|
|
|
|
public:
|
|
static const int64_t WRITE_IO_SIZE = 16 * 1024; // 16KB
|
|
ObStorageObjectWriteInfo write_info_;
|
|
ObStorageObjectReadInfo read_info_;
|
|
char write_buf_[WRITE_IO_SIZE];
|
|
char read_buf_[WRITE_IO_SIZE];
|
|
};
|
|
|
|
void TestSSPreReadTask::SetUpTestCase()
|
|
{
|
|
GCTX.startup_mode_ = observer::ObServerMode::SHARED_STORAGE_MODE;
|
|
EXPECT_EQ(OB_SUCCESS, MockTenantModuleEnv::get_instance().init());
|
|
|
|
MTL(tmp_file::ObTenantTmpFileManager *)->stop();
|
|
MTL(tmp_file::ObTenantTmpFileManager *)->wait();
|
|
MTL(tmp_file::ObTenantTmpFileManager *)->destroy();
|
|
}
|
|
|
|
void TestSSPreReadTask::TearDownTestCase()
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
if (OB_FAIL(ResidualDataCleanerHelper::clean_in_mock_env())) {
|
|
LOG_WARN("failed to clean residual data", KR(ret));
|
|
}
|
|
MockTenantModuleEnv::get_instance().destroy();
|
|
}
|
|
|
|
void TestSSPreReadTask::SetUp()
|
|
{
|
|
// construct write info
|
|
write_buf_[0] = '\0';
|
|
const int64_t mid_offset = WRITE_IO_SIZE / 2;
|
|
memset(write_buf_, 'a', mid_offset);
|
|
memset(write_buf_ + mid_offset, 'b', WRITE_IO_SIZE - mid_offset);
|
|
write_info_.io_desc_.set_wait_event(1);
|
|
write_info_.buffer_ = write_buf_;
|
|
write_info_.offset_ = 0;
|
|
write_info_.size_ = WRITE_IO_SIZE;
|
|
write_info_.io_timeout_ms_ = DEFAULT_IO_WAIT_TIME_MS;
|
|
write_info_.mtl_tenant_id_ = MTL_ID();
|
|
|
|
// construct read info
|
|
read_buf_[0] = '\0';
|
|
read_info_.io_desc_.set_wait_event(1);
|
|
read_info_.buf_ = read_buf_;
|
|
read_info_.offset_ = 0;
|
|
read_info_.size_ = WRITE_IO_SIZE;
|
|
read_info_.io_timeout_ms_ = DEFAULT_IO_WAIT_TIME_MS;
|
|
read_info_.mtl_tenant_id_ = MTL_ID();
|
|
}
|
|
|
|
void TestSSPreReadTask::TearDown()
|
|
{
|
|
write_buf_[0] = '\0';
|
|
read_buf_[0] = '\0';
|
|
}
|
|
|
|
// Writes 100 tmp file segments to object storage, registers each of them with
// the pre-read cache manager, waits for the pre-read task to drain, then reads
// every segment back and checks the data, the IO flag and the LRU membership.
TEST_F(TestSSPreReadTask, basic_pre_read)
{
  ObTenantFileManager *file_manager = MTL(ObTenantFileManager *);
  ASSERT_NE(nullptr, file_manager);
  ObPrereadCacheManager &preread_cache_mgr = file_manager->preread_cache_mgr_;
  ObSSPreReadTask &preread_task = preread_cache_mgr.preread_task_;
  // Local copy so the constant can be passed to ASSERT_EQ by reference.
  const int64_t max_parallel_cnt = ObSSPreReadTask::MAX_PRE_READ_PARALLELISM;
  const int64_t timeout_us = 20 * 1000 * 1000L;

  // Polls (1ms period) until the pre-read queue is empty, no segment file is
  // pending and every pre-read entry is back in the free list.
  // Returns OB_SUCCESS once idle, OB_ERR_UNEXPECTED if timeout_us elapses first.
  auto wait_preread_idle = [&]() -> int {
    int ret = OB_SUCCESS;
    const int64_t start_us = ObTimeUtility::current_time();
    while ((preread_cache_mgr.preread_queue_.size() != 0) ||
           (preread_task.segment_files_.count() != 0) ||
           (preread_task.free_list_.get_curr_total() != max_parallel_cnt)) {
      ob_usleep(1000);
      if (timeout_us + start_us < ObTimeUtility::current_time()) {
        ret = OB_ERR_UNEXPECTED;
        // Counters are logged as plain key/value pairs; KR is kept for `ret` only.
        LOG_WARN("waiting time is too long", KR(ret),
                 "preread_queue_size", preread_cache_mgr.preread_queue_.size(),
                 "segment_file_cnt", preread_task.segment_files_.count(),
                 "async_read_cnt", preread_task.async_read_list_.get_curr_total(),
                 "async_write_cnt", preread_task.async_write_list_.get_curr_total());
        break;
      }
    }
    return ret;
  };

  // 1. write tmp_file to object_storage
  const int64_t tmp_file_cnt = 100;
  MacroBlockId macro_ids[tmp_file_cnt];
  for (int64_t i = 0; i < tmp_file_cnt; ++i) {
    macro_ids[i].set_id_mode(static_cast<uint64_t>(ObMacroBlockIdMode::ID_MODE_SHARE));
    macro_ids[i].set_storage_object_type(static_cast<uint64_t>(ObStorageObjectType::TMP_FILE));
    macro_ids[i].set_second_id(i); // tmp_file_id
    macro_ids[i].set_third_id(1);  // segment_id
    ASSERT_TRUE(macro_ids[i].is_valid());
    ObStorageObjectHandle write_object_handle;
    ASSERT_EQ(OB_SUCCESS, write_object_handle.set_macro_block_id(macro_ids[i]));
    ObSSObjectStorageWriter object_storage_writer;
    ASSERT_EQ(OB_SUCCESS, object_storage_writer.aio_write(write_info_, write_object_handle));
    ASSERT_EQ(OB_SUCCESS, write_object_handle.wait());
    ASSERT_EQ(OB_SUCCESS, preread_cache_mgr.push_file_id_to_lru(macro_ids[i]));
  }

  // 2. wait preread_task read tmp file to local cache.
  // A timeout here must fail the test instead of being silently ignored.
  ASSERT_EQ(OB_SUCCESS, wait_preread_idle());

  // 3. read and compare the read data with the written data
  for (int64_t i = 0; i < tmp_file_cnt; ++i) {
    // Every file carries the same payload and read_buf_ is shared between
    // iterations, so clear it first: stale bytes from the previous read must
    // not be able to satisfy the comparison below.
    memset(read_buf_, 0, sizeof(read_buf_));
    read_info_.macro_block_id_ = macro_ids[i];
    ObStorageObjectHandle read_object_handle;
    ObSSTmpFileReader tmp_file_reader;
    ASSERT_EQ(OB_SUCCESS, tmp_file_reader.aio_read(read_info_, read_object_handle));
    ASSERT_EQ(OB_SUCCESS, read_object_handle.wait());
    ASSERT_NE(nullptr, read_object_handle.get_buffer());
    ASSERT_EQ(read_info_.size_, read_object_handle.get_data_size());
    ASSERT_EQ(0, memcmp(write_buf_, read_object_handle.get_buffer(), WRITE_IO_SIZE));

    ObIOFlag flag;
    ASSERT_EQ(OB_SUCCESS, read_object_handle.get_io_handle().get_io_flag(flag));
    // check read from tmp file read cache
    ASSERT_FALSE(flag.is_sync());

    bool is_exist = false;
    ASSERT_EQ(OB_SUCCESS, preread_cache_mgr.is_exist_in_lru(macro_ids[i], is_exist));
    ASSERT_TRUE(is_exist);
  }

  // 4. wait preread_next_segment_file finish
  ASSERT_EQ(OB_SUCCESS, wait_preread_idle());

  ASSERT_EQ(max_parallel_cnt, preread_task.free_list_.get_curr_total());
  ASSERT_EQ(0, preread_cache_mgr.preread_queue_.size());
  ASSERT_EQ(0, preread_task.async_read_list_.get_curr_total());
  ASSERT_EQ(0, preread_task.async_write_list_.get_curr_total());

  // delete all tmp file
  for (int64_t i = 0; i < tmp_file_cnt; ++i) {
    ASSERT_EQ(OB_SUCCESS, file_manager->delete_tmp_file(macro_ids[i]));
  }
}
|
|
|
|
// Drives one pre-read entry by hand (async read, then async write) and
// interleaves it with a tmp file delete, checking the tmp file write/read
// cache accounting of the disk space manager at every step.
TEST_F(TestSSPreReadTask, preread_and_gc_parallel)
{
  ObTenantFileManager *file_manager = MTL(ObTenantFileManager *);
  ObTenantDiskSpaceManager *disk_space_mgr = MTL(ObTenantDiskSpaceManager *);
  ASSERT_NE(nullptr, file_manager);
  ASSERT_NE(nullptr, disk_space_mgr);
  ObPrereadCacheManager &preread_cache_mgr = file_manager->preread_cache_mgr_;
  ObSSPreReadTask &preread_task = preread_cache_mgr.preread_task_;
  ASSERT_EQ(OB_SUCCESS, file_manager->calibrate_disk_space_task_.calibrate_disk_space());

  // tmp_file write_cache and read_cache size
  // Both must start at zero after calibration.
  int64_t write_cache = disk_space_mgr->get_tmp_file_write_cache_alloc_size();
  int64_t read_cache = disk_space_mgr->get_tmp_file_read_cache_alloc_size();
  ASSERT_EQ(0, write_cache);
  ASSERT_EQ(0, read_cache);

  // construct macro_id
  MacroBlockId file_id;
  const int64_t tmp_file_id = 100;
  const int64_t segment_id = 101;
  file_id.set_id_mode(static_cast<uint64_t>(ObMacroBlockIdMode::ID_MODE_SHARE));
  file_id.set_storage_object_type(static_cast<uint64_t>(ObStorageObjectType::TMP_FILE));
  file_id.set_second_id(tmp_file_id); // tmp_file_id
  file_id.set_third_id(segment_id);   // segment_id

  // write file to object storage
  ObStorageObjectHandle write_object_handle;
  ASSERT_EQ(OB_SUCCESS, write_object_handle.set_macro_block_id(file_id));
  ObSSObjectStorageWriter object_storage_writer;
  ASSERT_EQ(OB_SUCCESS, object_storage_writer.aio_write(write_info_, write_object_handle));
  ASSERT_EQ(OB_SUCCESS, write_object_handle.wait());

  // read file from object storage
  ObPreReadFileMeta file_meta(file_id, 0);
  ObSSPreReadEntry preread_entry(preread_task.allocator_);
  ASSERT_EQ(OB_SUCCESS, preread_entry.init(file_meta));
  ASSERT_EQ(OB_SUCCESS, preread_task.do_async_read_segment_file(preread_entry));
  ASSERT_EQ(OB_SUCCESS, preread_entry.read_handle_.wait());

  // test1: preread_write,read_whole,GC,update_to_normal
  // push to lru node
  ObPrereadCacheManager::ObListNode list_node(file_id, ObLURNodeStatus::FAKE, 0/*file_length*/);
  ASSERT_EQ(OB_SUCCESS, preread_cache_mgr.segment_file_map_.set_refactored(file_id, list_node));
  ObPrereadCacheManager::ObListNode *node_iter = preread_cache_mgr.segment_file_map_.get(file_id);
  // The node was inserted just above; never link a null pointer into the list.
  ASSERT_NE(nullptr, node_iter);
  preread_cache_mgr.segment_file_list_.add_first(node_iter);

  // create dir and get dir size
  ASSERT_EQ(OB_SUCCESS, OB_DIR_MGR.create_tmp_file_dir(MTL_ID(), MTL_EPOCH_ID(), tmp_file_id));
  int64_t expected_disk_size = 0;
  char dir_path[ObBaseFileManager::OB_MAX_FILE_PATH_LENGTH] = {0};
  ObIODFileStat statbuf;
  ASSERT_EQ(OB_SUCCESS, OB_DIR_MGR.get_local_tmp_file_dir(dir_path, sizeof(dir_path),
                                                          MTL_ID(), MTL_EPOCH_ID(), tmp_file_id));
  ASSERT_EQ(OB_SUCCESS, ObIODeviceLocalFileOp::stat(dir_path, statbuf));
  expected_disk_size += statbuf.size_;

  // write
  ASSERT_EQ(OB_SUCCESS, preread_task.do_async_write_segment_file(preread_entry));
  ASSERT_EQ(OB_SUCCESS, preread_entry.write_handle_.wait());
  write_cache = disk_space_mgr->get_tmp_file_write_cache_alloc_size();
  read_cache = disk_space_mgr->get_tmp_file_read_cache_alloc_size();
  // The write cache is expected to hold only the directory's own size, the
  // read cache the 16KB segment written in this test (same value as
  // WRITE_IO_SIZE; kept as a literal so ASSERT_EQ does not bind a reference
  // to the in-class static constant).
  ASSERT_EQ(expected_disk_size, write_cache);
  ASSERT_EQ(16 * 1024, read_cache);

  // read_whole
  ASSERT_EQ(OB_SUCCESS, preread_cache_mgr.set_need_preread(file_id, false/*is_not_need_preread*/));

  // GC
  ASSERT_EQ(OB_SUCCESS, file_manager->delete_tmp_file(file_id));
  write_cache = disk_space_mgr->get_tmp_file_write_cache_alloc_size();
  read_cache = disk_space_mgr->get_tmp_file_read_cache_alloc_size();
  ASSERT_EQ(0, write_cache);
  ASSERT_EQ(0, read_cache);

  // update_to_normal
  // Must still succeed although the file has already been deleted above.
  ASSERT_EQ(OB_SUCCESS, preread_cache_mgr.update_to_normal_status(file_id, preread_entry.write_handle_.get_data_size()));
}
|
|
|
|
} // namespace storage
|
|
} // namespace oceanbase
|
|
|
|
int main(int argc, char **argv)
|
|
{
|
|
int ret = 0;
|
|
system("rm -f ./test_ss_preread_task.log*");
|
|
OB_LOGGER.set_file_name("test_ss_preread_task.log", true);
|
|
OB_LOGGER.set_log_level("INFO");
|
|
testing::InitGoogleTest(&argc, argv);
|
|
return RUN_ALL_TESTS();
|
|
}
|