223 lines
9.9 KiB
C++
223 lines
9.9 KiB
C++
/**
 * Copyright (c) 2021 OceanBase
 * OceanBase CE is licensed under Mulan PubL v2.
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 * You may obtain a copy of Mulan PubL v2 at:
 *          http://license.coscl.org.cn/MulanPubL-2.0
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PubL v2 for more details.
 */
|
|
#ifndef USING_LOG_PREFIX
|
|
#define USING_LOG_PREFIX STORAGETEST
|
|
#endif
|
|
#include "gtest/gtest.h"
|
|
|
|
#define private public
|
|
#define protected public
|
|
#include "lib/ob_errno.h"
|
|
#include "mittest/mtlenv/mock_tenant_module_env.h"
|
|
#include "share/allocator/ob_tenant_mutil_allocator_mgr.h"
|
|
#include "test_ss_common_util.h"
|
|
#include "storage/shared_storage/micro_cache/ob_ss_micro_meta_manager.h"
|
|
#include "storage/shared_storage/micro_cache/ob_ss_physical_block_manager.h"
|
|
#include "storage/shared_storage/micro_cache/ckpt/ob_ss_linked_phy_block_struct.h"
|
|
#include "storage/shared_storage/ob_ss_micro_cache_io_helper.h"
|
|
#include "storage/shared_storage/micro_cache/ckpt/ob_ss_linked_phy_block_writer.h"
|
|
#include "storage/shared_storage/micro_cache/ckpt/ob_ss_linked_phy_block_reader.h"
|
|
#include "lib/compress/ob_compressor.h"
|
|
#include "lib/compress/ob_compressor_pool.h"
|
|
|
|
namespace oceanbase
|
|
{
|
|
namespace storage
|
|
{
|
|
using namespace oceanbase::common;
|
|
|
|
class TestSSMicroCacheCheckpoint : public ::testing::Test
|
|
{
|
|
public:
|
|
TestSSMicroCacheCheckpoint() {}
|
|
virtual ~TestSSMicroCacheCheckpoint() {}
|
|
static void SetUpTestCase();
|
|
static void TearDownTestCase();
|
|
virtual void SetUp();
|
|
virtual void TearDown();
|
|
};
|
|
|
|
void TestSSMicroCacheCheckpoint::SetUpTestCase()
|
|
{
|
|
GCTX.startup_mode_ = observer::ObServerMode::SHARED_STORAGE_MODE;
|
|
EXPECT_EQ(OB_SUCCESS, MockTenantModuleEnv::get_instance().init());
|
|
}
|
|
|
|
void TestSSMicroCacheCheckpoint::TearDownTestCase()
|
|
{
|
|
MockTenantModuleEnv::get_instance().destroy();
|
|
}
|
|
|
|
/**
 * Per-test setup hook. Intentionally a no-op: the test in this file obtains
 * everything it needs from the tenant-local micro cache inside its own body.
 */
void TestSSMicroCacheCheckpoint::SetUp()
{
  // nothing to prepare per test
}
|
|
|
|
/**
 * Per-test teardown hook. Intentionally a no-op: the fixture owns no per-test
 * resources.
 */
void TestSSMicroCacheCheckpoint::TearDown()
{
  // nothing to release per test
}
|
|
|
|
/**
 * Writes micro-meta checkpoint items through linked-phy-block writers with and
 * without compression, and checks that the result can be read back.
 *
 * Steps:
 *   1. Add micro blocks to the cache and wait for the persist task.
 *   2. Serialize every persisted micro meta and write it through two item
 *      writers: a default one and one initialized with SNAPPY compression.
 *      The compressed writer must not use more blocks than the default one.
 *   3. Read one checkpoint block written by the default writer and locate the
 *      item payload behind its two headers.
 *   4. Compress that payload with every compressor type in the numeric range
 *      [LZ4_COMPRESSOR, ZSTD_COMPRESSOR] and log size and elapsed time.
 *   5. Iterate the default writer's checkpoint with an item reader until
 *      OB_ITER_END.
 *   6. Run the checkpoint task's gen_micro_meta_checkpoint() with only one
 *      micro-ckpt block left unused and verify that it still succeeds.
 */
TEST_F(TestSSMicroCacheCheckpoint, test_compress_micro_ckpt)
{
  int ret = OB_SUCCESS;
  ObSSMicroCache *micro_cache = MTL(ObSSMicroCache *);
  ASSERT_NE(nullptr, micro_cache);
  const uint64_t tenant_id = MTL_ID();
  const int64_t block_size = micro_cache->phy_block_size_;
  ObSSMicroMetaManager &micro_meta_mgr = micro_cache->micro_meta_mgr_;
  ObSSPhysicalBlockManager &phy_blk_mgr = micro_cache->phy_blk_mgr_;
  ObSSMicroCacheStat &cache_stat = micro_cache->cache_stat_;

  // Number of micro blocks generated per macro block: the block space left
  // after the two headers, divided by (micro data + one index entry).
  const int32_t micro_size = 8 * 1024;
  const int64_t payload_offset = ObSSPhyBlockCommonHeader::get_serialize_size() +
                                 ObSSNormalPhyBlockHeader::get_fixed_serialize_size();
  const int32_t micro_index_size = sizeof(ObSSMicroBlockIndex) + SS_SERIALIZE_EXTRA_BUF_LEN;
  const int32_t micro_cnt = (block_size - payload_offset) / (micro_size + micro_index_size);
  const int64_t total_normal_blk_cnt = phy_blk_mgr.blk_cnt_info_.normal_blk_.total_cnt_;
  const int64_t macro_cnt = MIN(150, total_normal_blk_cnt);

  // All scratch buffers come from one arena, which is cleared at the end of the test.
  ObArenaAllocator allocator;
  char *data_buf = static_cast<char *>(allocator.alloc(micro_size));
  ASSERT_NE(nullptr, data_buf);
  MEMSET(data_buf, 'a', micro_size);
  const int64_t item_buf_size = sizeof(ObSSMicroBlockMeta) + 128;
  char *item_buf = static_cast<char *>(allocator.alloc(item_buf_size));
  ASSERT_NE(nullptr, item_buf);
  char *io_buf = static_cast<char *>(allocator.alloc(block_size));
  ASSERT_NE(nullptr, io_buf);

  // 1. generate micro_meta by adding micro blocks to the cache
  for (int64_t i = 0; i < macro_cnt; ++i) {
    MacroBlockId macro_id = TestSSCommonUtil::gen_macro_block_id(2000 + i);
    for (int32_t j = 0; j < micro_cnt; ++j) {
      const int32_t offset = payload_offset + j * micro_size;
      ObSSMicroBlockCacheKey micro_key = TestSSCommonUtil::gen_phy_micro_key(macro_id, offset, micro_size);
      // NOTE(review): the result of add_micro_block_cache() is not checked here;
      // step 2 asserts that a meta handle can be fetched for every key.
      micro_cache->add_micro_block_cache(micro_key, data_buf, micro_size, ObSSMicroCacheAccessType::COMMON_IO_TYPE);
    }
    ASSERT_EQ(OB_SUCCESS, TestSSCommonUtil::wait_for_persist_task());
  }

  // 2. generate the micro_meta checkpoint twice: default writer and SNAPPY writer
  ObSSLinkedPhyBlockItemWriter item_writer;
  ObSSLinkedPhyBlockItemWriter item_comp_writer;
  ASSERT_EQ(OB_SUCCESS, item_writer.init(tenant_id, phy_blk_mgr, ObSSPhyBlockType::SS_MICRO_META_CKPT_BLK));
  ASSERT_EQ(OB_SUCCESS, item_comp_writer.init(tenant_id, phy_blk_mgr, ObSSPhyBlockType::SS_MICRO_META_CKPT_BLK, ObCompressorType::SNAPPY_COMPRESSOR));
  int64_t total_micro_item_cnt = 0;
  for (int64_t i = 0; i < macro_cnt; ++i) {
    // Re-derive the same keys that step 1 inserted.
    MacroBlockId macro_id = TestSSCommonUtil::gen_macro_block_id(2000 + i);
    for (int32_t j = 0; j < micro_cnt; ++j) {
      const int32_t offset = payload_offset + j * micro_size;
      ObSSMicroBlockCacheKey micro_key = TestSSCommonUtil::gen_phy_micro_key(macro_id, offset, micro_size);
      ObSSMicroBlockMetaHandle micro_handle;
      ret = micro_meta_mgr.get_micro_block_meta_handle(micro_key, micro_handle, false);
      if (OB_FAIL(ret)) {
        LOG_WARN("fail to get micro_meta", KR(ret), K(i), K(j), K(micro_key), K(cache_stat));
      }
      ASSERT_EQ(OB_SUCCESS, ret);
      ASSERT_EQ(true, micro_handle.is_valid());
      // Only persisted metas are written into the checkpoint.
      if (micro_handle.get_ptr()->is_persisted()) {
        int64_t pos = 0;
        ASSERT_EQ(OB_SUCCESS, micro_handle()->serialize(item_buf, item_buf_size, pos));
        ASSERT_EQ(pos, micro_handle()->get_serialize_size());
        // The same serialized bytes go to both writers so their block usage is comparable.
        ASSERT_EQ(OB_SUCCESS, item_writer.write_item(item_buf, pos));
        ASSERT_EQ(OB_SUCCESS, item_comp_writer.write_item(item_buf, pos));
        ++total_micro_item_cnt;
      }
    }
    ASSERT_EQ(OB_SUCCESS, TestSSCommonUtil::wait_for_persist_task());
  }
  ASSERT_EQ(OB_SUCCESS, item_writer.close());
  ASSERT_EQ(OB_SUCCESS, item_comp_writer.close());
  ObArray<int64_t> micro_ckpt_blk_list;
  ASSERT_EQ(OB_SUCCESS, item_writer.get_block_id_list(micro_ckpt_blk_list));
  // The default checkpoint must span at least two blocks; step 3 reads index 1.
  ASSERT_LT(1, micro_ckpt_blk_list.count());
  int64_t entry_idx = 0;
  // NOTE(review): the fetched entry block index is not used later in this test.
  item_writer.get_entry_block(entry_idx);
  ObArray<int64_t> micro_comp_ckpt_blk_list;
  ASSERT_EQ(OB_SUCCESS, item_comp_writer.get_block_id_list(micro_comp_ckpt_blk_list));
  // The compressed checkpoint must not need more blocks than the default one.
  ASSERT_LE(micro_comp_ckpt_blk_list.count(), micro_ckpt_blk_list.count());

  // 3. read the micro_meta checkpoint data of one phy_block
  ObSSPhysicalBlockHandle phy_blk_handle;
  // Second block id in the default writer's list; existence was asserted above.
  const int64_t ckpt_blk_idx = micro_ckpt_blk_list.at(1);
  ASSERT_EQ(OB_SUCCESS, phy_blk_mgr.get_block_handle(ckpt_blk_idx, phy_blk_handle));
  ASSERT_EQ(OB_SUCCESS, ObSSMicroCacheIOHelper::read_block(ckpt_blk_idx * block_size, block_size,
                                                           io_buf, phy_blk_handle));
  char *all_ckpt_item = nullptr;
  int32_t all_ckpt_item_len = 0;
  {
    // Skip the common header and the linked-block header; the rest of the
    // block buffer is treated as the payload to compress.
    int64_t pos = 0;
    ObSSPhyBlockCommonHeader common_header;
    ASSERT_EQ(OB_SUCCESS, common_header.deserialize(io_buf, block_size, pos));
    ObSSLinkedPhyBlockHeader linked_header;
    ASSERT_EQ(OB_SUCCESS, linked_header.deserialize(io_buf, block_size, pos));
    all_ckpt_item = io_buf + pos;
    all_ckpt_item_len = static_cast<int32_t>(block_size - pos);
  }

  // 4. compress the micro_meta checkpoint payload with each compressor type
  // (assumes every value between LZ4_COMPRESSOR and ZSTD_COMPRESSOR is a usable type - TODO confirm)
  const uint8_t start_type = static_cast<uint8_t>(ObCompressorType::LZ4_COMPRESSOR);
  const uint8_t end_type = static_cast<uint8_t>(ObCompressorType::ZSTD_COMPRESSOR);
  for (uint8_t i = start_type; i <= end_type; ++i) {
    ObCompressorType comp_type = static_cast<ObCompressorType>(i);
    ObCompressor *compressor = nullptr;
    ASSERT_EQ(OB_SUCCESS, ObCompressorPool::get_instance().get_compressor(comp_type, compressor));
    ASSERT_NE(nullptr, compressor);
    // The output buffer is the input size plus the compressor's reported max overflow.
    int64_t max_overflow_size = 0;
    ASSERT_EQ(OB_SUCCESS, compressor->get_max_overflow_size(all_ckpt_item_len, max_overflow_size));
    const int64_t out_io_buf_size = all_ckpt_item_len + max_overflow_size;
    char *out_io_buf = static_cast<char *>(allocator.alloc(out_io_buf_size));
    ASSERT_NE(nullptr, out_io_buf);
    int64_t compressed_len = 0;
    const int64_t start_us = ObTimeUtility::current_time();
    ASSERT_EQ(OB_SUCCESS, compressor->compress(all_ckpt_item, all_ckpt_item_len, out_io_buf, out_io_buf_size, compressed_len));
    const int64_t cost_us = ObTimeUtility::current_time() - start_us;
    LOG_INFO("finish current round micro_meta ckpt compress", K(all_compressor_name[i]), K(all_ckpt_item_len), K(compressed_len), K(cost_us));
  }

  // 5. read the micro_meta checkpoint back, item by item
  ObSSLinkedPhyBlockItemReader item_reader;
  ASSERT_EQ(OB_SUCCESS, item_reader.init(micro_ckpt_blk_list.at(0), tenant_id, phy_blk_mgr));
  char *des_item_buf = nullptr;
  int64_t des_item_buf_len = 0;
  while (OB_SUCC(ret)) {
    ret = item_reader.get_next_item(des_item_buf, des_item_buf_len);
    if (OB_ITER_END == ret) {
      // Normal end of iteration.
      ret = OB_SUCCESS;
      break;
    } else {
      ASSERT_EQ(OB_SUCCESS, ret);
    }
    ASSERT_NE(nullptr, des_item_buf);
    ASSERT_NE(0, des_item_buf_len);
  }

  // 6. run the checkpoint op itself while micro_ckpt blocks are scarce: mark all
  //    but one micro_ckpt block as used; generating the checkpoint must still succeed.
  ObSSExecuteMicroCheckpointTask &micro_ckpt_task = micro_cache->task_runner_.micro_ckpt_task_;
  phy_blk_mgr.blk_cnt_info_.micro_ckpt_blk_.used_cnt_ = phy_blk_mgr.blk_cnt_info_.micro_ckpt_blk_.total_cnt_ - 1;
  ASSERT_EQ(OB_SUCCESS, phy_blk_mgr.get_ss_super_block(micro_ckpt_task.ckpt_op_.micro_ckpt_ctx_.prev_super_block_));
  ASSERT_EQ(OB_SUCCESS, micro_ckpt_task.ckpt_op_.gen_micro_meta_checkpoint());
  // More than one phy_block would be needed to store the whole micro_ckpt, but
  // only one was available, so the used count must now equal the total count.
  ASSERT_EQ(phy_blk_mgr.blk_cnt_info_.micro_ckpt_blk_.used_cnt_, phy_blk_mgr.blk_cnt_info_.micro_ckpt_blk_.total_cnt_);

  allocator.clear();
}
|
|
|
|
} // namespace storage
|
|
} // namespace oceanbase
|
|
int main(int argc, char **argv)
|
|
{
|
|
system("rm -f test_ss_micro_cache_checkpoint.log*");
|
|
OB_LOGGER.set_file_name("test_ss_micro_cache_checkpoint.log", true, true);
|
|
OB_LOGGER.set_log_level("INFO");
|
|
::testing::InitGoogleTest(&argc, argv);
|
|
return RUN_ALL_TESTS();
|
|
} |