
Co-authored-by: saltonz <saltonzh@gmail.com> Co-authored-by: Revendell <2312237896@qq.com> Co-authored-by: sunzhoujia <sunzhoujia1030@163.com>
316 lines
12 KiB
C++
316 lines
12 KiB
C++
/**
|
|
* Copyright (c) 2021 OceanBase
|
|
* OceanBase CE is licensed under Mulan PubL v2.
|
|
* You can use this software according to the terms and conditions of the Mulan PubL v2.
|
|
* You may obtain a copy of Mulan PubL v2 at:
|
|
* http://license.coscl.org.cn/MulanPubL-2.0
|
|
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
|
|
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
|
|
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
|
|
* See the Mulan PubL v2 for more details.
|
|
*/
|
|
#ifndef USING_LOG_PREFIX
|
|
#define USING_LOG_PREFIX STORAGETEST
|
|
#endif
|
|
|
|
#define private public
|
|
#define protected public
|
|
#include "lib/ob_errno.h"
|
|
#include "storage/blocksstable/ob_macro_block_id.h"
|
|
#include "storage/blocksstable/ob_block_sstable_struct.h"
|
|
#include "storage/blocksstable/ob_storage_object_rw_info.h"
|
|
#include "storage/shared_storage/ob_ss_reader_writer.h"
|
|
#include "storage/shared_storage/ob_ss_micro_cache.h"
|
|
#include "lib/random/ob_random.h"
|
|
#include "lib/compress/ob_compress_util.h"
|
|
|
|
namespace oceanbase
|
|
{
|
|
namespace storage
|
|
{
|
|
using namespace oceanbase::common;
|
|
using namespace oceanbase::blocksstable;
|
|
|
|
static const int64_t DEFAULT_BLOCK_SIZE = 2 * 1024 * 1024; // 2MB
|
|
|
|
struct TestSSMicroDataInfo
|
|
{
|
|
public:
|
|
MacroBlockId macro_id_;
|
|
ObArray<ObSSMicroBlockIndex> micro_index_arr_;
|
|
int32_t total_micro_size_;
|
|
|
|
TestSSMicroDataInfo() { reset(); }
|
|
void reset() { macro_id_.reset(); micro_index_arr_.reset(); total_micro_size_ = 0; }
|
|
int32_t get_micro_count() const { return micro_index_arr_.count(); }
|
|
|
|
TO_STRING_KV(K_(macro_id), K_(total_micro_size), "micro_cnt", get_micro_count());
|
|
};
|
|
|
|
class TestSSCommonUtil
|
|
{
|
|
public:
|
|
struct MicroBlockInfo
|
|
{
|
|
MicroBlockInfo() : macro_id_(), offset_(0), size_(0)
|
|
{}
|
|
MicroBlockInfo(MacroBlockId macro_id, int64_t offset, int64_t size)
|
|
: macro_id_(macro_id), offset_(offset), size_(size)
|
|
{}
|
|
MicroBlockInfo(const ObSSMicroBlockId µ_id)
|
|
: macro_id_(micro_id.macro_id_), offset_(micro_id.offset_), size_(micro_id.size_)
|
|
{}
|
|
bool is_valid() const
|
|
{
|
|
return macro_id_.is_valid() && offset_ > 0 && size_ > 0;
|
|
}
|
|
MacroBlockId macro_id_;
|
|
int64_t offset_;
|
|
int64_t size_;
|
|
TO_STRING_KV(K_(macro_id), K_(offset), K_(size));
|
|
};
|
|
|
|
public:
|
|
static MacroBlockId gen_macro_block_id(const int64_t second_id);
|
|
static ObMicroBlockId gen_micro_block_id(const MacroBlockId ¯o_id, const int64_t offset, const int64_t size);
|
|
static int gen_random_data(char *buf, const int64_t size);
|
|
static ObSSMicroBlockCacheKey gen_phy_micro_key(const MacroBlockId ¯o_id, const int32_t offset, const int32_t size);
|
|
static int init_io_info(ObIOInfo &io_info, const ObSSMicroBlockCacheKey µ_key, const int32_t size, char *read_buf);
|
|
static int prepare_micro_blocks(const int64_t macro_block_cnt, const int64_t block_size, ObArray<MicroBlockInfo> µ_block_arr,
|
|
const int64_t start_macro_id = 1, const bool random_micro_size = true, const int32_t min_micro_size = 16 * 1024,
|
|
const int32_t max_micro_size = 16 * 1024);
|
|
static int get_micro_block(const MicroBlockInfo µ_info, char *read_buf);
|
|
static int wait_for_persist_task();
|
|
static int alloc_micro_block_meta(ObSSMicroBlockMeta *µ_meta);
|
|
static ObCompressorType get_random_compress_type();
|
|
};
|
|
|
|
MacroBlockId TestSSCommonUtil::gen_macro_block_id(const int64_t second_id)
|
|
{
|
|
MacroBlockId macro_id;
|
|
macro_id.second_id_ = second_id;
|
|
macro_id.set_id_mode((uint64_t)ObMacroBlockIdMode::ID_MODE_SHARE);
|
|
macro_id.set_storage_object_type((uint64_t)ObStorageObjectType::SHARED_MAJOR_DATA_MACRO);
|
|
return macro_id;
|
|
}
|
|
|
|
ObMicroBlockId TestSSCommonUtil::gen_micro_block_id(
|
|
const MacroBlockId ¯o_id,
|
|
const int64_t offset,
|
|
const int64_t size)
|
|
{
|
|
ObMicroBlockId micro_id(macro_id, offset, size);
|
|
return micro_id;
|
|
}
|
|
|
|
int TestSSCommonUtil::gen_random_data(char *buf, const int64_t size)
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
ObRandom rand;
|
|
if (OB_ISNULL(buf) || OB_UNLIKELY(size <= 0)) {
|
|
ret = OB_INVALID_ARGUMENT;
|
|
} else {
|
|
const int64_t end = size - 1;
|
|
for (int64_t i = 0; i < end; ++i) {
|
|
buf[i] = static_cast<char>(ObRandom::rand(0, 128));
|
|
}
|
|
buf[end] = '\0';
|
|
}
|
|
return ret;
|
|
}
|
|
|
|
ObSSMicroBlockCacheKey TestSSCommonUtil::gen_phy_micro_key(
|
|
const MacroBlockId ¯o_id,
|
|
const int32_t offset,
|
|
const int32_t size)
|
|
{
|
|
ObSSMicroBlockCacheKey micro_key;
|
|
micro_key.mode_ = ObSSMicroBlockCacheKeyMode::PHYSICAL_KEY_MODE;
|
|
micro_key.micro_id_.macro_id_ = macro_id;
|
|
micro_key.micro_id_.offset_ = offset;
|
|
micro_key.micro_id_.size_ = size;
|
|
return micro_key;
|
|
}
|
|
|
|
int TestSSCommonUtil::prepare_micro_blocks(
|
|
const int64_t macro_block_cnt,
|
|
const int64_t block_size,
|
|
ObArray<MicroBlockInfo> µ_block_arr,
|
|
const int64_t start_macro_id,
|
|
const bool random_micro_size,
|
|
const int32_t min_micro_size,
|
|
const int32_t max_micro_size)
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
if (OB_UNLIKELY(macro_block_cnt <= 0 || block_size <= 0)) {
|
|
ret = OB_INVALID_ARGUMENT;
|
|
LOG_WARN("invalid argument", KR(ret), K(macro_block_cnt), K(block_size));
|
|
} else {
|
|
int64_t alignment = 4096;
|
|
ObMemAttr attr(MTL_ID(), "test");
|
|
char *buf = nullptr;
|
|
if (OB_ISNULL(buf = static_cast<char *>(ob_malloc_align(alignment, block_size, attr)))) {
|
|
ret = OB_ALLOCATE_MEMORY_FAILED;
|
|
LOG_WARN("fail to allocate memory", KR(ret));
|
|
} else {
|
|
const int64_t random_min_micro_size = 200;
|
|
const int64_t random_max_micro_size = 16 * 1024;
|
|
const int64_t start_offset = ObSSPhyBlockCommonHeader::get_serialize_size() +
|
|
ObSSNormalPhyBlockHeader::get_fixed_serialize_size();
|
|
for (int64_t i = start_macro_id; OB_SUCC(ret) && i < (macro_block_cnt + start_macro_id); ++i) {
|
|
MacroBlockId macro_id = gen_macro_block_id(i);
|
|
char c = macro_id.hash() % 128;
|
|
MEMSET(buf, c, block_size);
|
|
ObStorageObjectHandle write_object_handle;
|
|
ObSSObjectStorageWriter object_storage_writer;
|
|
ObStorageObjectWriteInfo write_info;
|
|
write_info.io_desc_.set_wait_event(1);
|
|
write_info.buffer_ = buf;
|
|
write_info.offset_ = 0;
|
|
write_info.size_ = block_size;
|
|
write_info.io_timeout_ms_ = DEFAULT_IO_WAIT_TIME_MS;
|
|
write_info.mtl_tenant_id_ = MTL_ID();
|
|
if (OB_FAIL(write_object_handle.set_macro_block_id(macro_id))) {
|
|
LOG_WARN("fail to set macro block id", KR(ret), K(macro_id));
|
|
} else if (OB_FAIL(object_storage_writer.aio_write(write_info, write_object_handle))) {
|
|
LOG_WARN("fail to aio write", KR(ret), K(write_info), K(write_object_handle));
|
|
} else if (OB_FAIL(write_object_handle.wait())) {
|
|
LOG_WARN("fail to wait", KR(ret));
|
|
}
|
|
|
|
if (OB_SUCC(ret)) {
|
|
// split macro block into micro blocks.
|
|
int64_t remain_size = block_size - start_offset;
|
|
int32_t offset = start_offset;
|
|
do {
|
|
const int32_t micro_size = random_micro_size ? ObRandom::rand(min_micro_size, max_micro_size)
|
|
: (min_micro_size + max_micro_size) / 2;
|
|
ObSSMicroBlockCacheKey tmp_key = gen_phy_micro_key(macro_id, offset, micro_size);
|
|
ObSSMicroBlockIndex micro_index(tmp_key, micro_size);
|
|
const int32_t delta_size = micro_size + micro_index.get_serialize_size();
|
|
if (remain_size >= delta_size) {
|
|
MicroBlockInfo micro_info(macro_id, offset, micro_size);
|
|
if (OB_FAIL(micro_block_arr.push_back(micro_info))) {
|
|
LOG_WARN("fail to push back", KR(ret), K(micro_info));
|
|
} else {
|
|
offset += micro_size;
|
|
remain_size -= delta_size;
|
|
}
|
|
} else {
|
|
remain_size = 0;
|
|
}
|
|
} while (OB_SUCC(ret) && remain_size > 0);
|
|
}
|
|
}
|
|
}
|
|
if (buf != nullptr) {
|
|
ob_free_align(buf);
|
|
}
|
|
}
|
|
return ret;
|
|
}
|
|
|
|
int TestSSCommonUtil::get_micro_block(const MicroBlockInfo µ_info, char *read_buf)
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
if (OB_UNLIKELY(!micro_info.is_valid()|| OB_ISNULL(read_buf))) {
|
|
ret = OB_INVALID_ARGUMENT;
|
|
LOG_WARN("invalid argument", KR(ret), K(micro_info), KP(read_buf));
|
|
} else {
|
|
ObStorageObjectHandle object_handle;
|
|
ObIOInfo io_info;
|
|
io_info.tenant_id_ = MTL_ID();
|
|
io_info.offset_ = micro_info.offset_;
|
|
io_info.size_ = micro_info.size_;
|
|
io_info.flag_.set_wait_event(1);
|
|
io_info.timeout_us_ = 5 * 1000 * 1000;
|
|
io_info.buf_ = read_buf;
|
|
io_info.user_data_buf_ = read_buf;
|
|
ObSSMicroBlockCacheKey micro_key = gen_phy_micro_key(micro_info.macro_id_, micro_info.offset_, micro_info.size_);
|
|
ObSSMicroBlockId phy_micro_id(micro_info.macro_id_, micro_info.offset_, micro_info.size_);
|
|
ObSSMicroCache *micro_cache = MTL(ObSSMicroCache *);
|
|
if (OB_FAIL(micro_cache->get_micro_block_cache(micro_key, phy_micro_id,
|
|
MicroCacheGetType::FORCE_GET_DATA, io_info, object_handle,
|
|
ObSSMicroCacheAccessType::COMMON_IO_TYPE))) {
|
|
LOG_WARN("fail to get micro block cache", KR(ret), K(micro_info));
|
|
} else if (OB_FAIL(object_handle.wait())) {
|
|
LOG_WARN("fail to wait until get micro block data", KR(ret), K(micro_info));
|
|
} else if (OB_UNLIKELY(io_info.size_ != object_handle.get_data_size())) {
|
|
ret = OB_ERR_UNEXPECTED;
|
|
LOG_WARN("get data size wrong", KR(ret), K(io_info.size_), K(object_handle.get_data_size()), K(micro_info));
|
|
} else {
|
|
char c = micro_info.macro_id_.hash() % 128;
|
|
for (int64_t i = 0; OB_SUCC(ret) && i < micro_info.size_; i++) {
|
|
if (c != read_buf[i]) {
|
|
ret = OB_ERR_UNEXPECTED;
|
|
LOG_WARN("get wrong data", KR(ret), K(i), K(c), K(read_buf[i]));
|
|
}
|
|
}
|
|
}
|
|
}
|
|
return ret;
|
|
}
|
|
|
|
int TestSSCommonUtil::init_io_info(
|
|
ObIOInfo &io_info,
|
|
const ObSSMicroBlockCacheKey µ_key,
|
|
const int32_t size,
|
|
char *read_buf)
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
if (OB_UNLIKELY(!micro_key.is_valid() || micro_key.is_logic_key()) || OB_ISNULL(read_buf)) {
|
|
ret = OB_INVALID_ARGUMENT;
|
|
LOG_WARN("invalid argument", KR(ret), K(micro_key), K(read_buf));
|
|
} else {
|
|
io_info.tenant_id_ = MTL_ID();
|
|
io_info.offset_ = micro_key.micro_id_.offset_;
|
|
io_info.size_ = size;
|
|
io_info.flag_.set_wait_event(1);
|
|
io_info.timeout_us_ = 5 * 1000 * 1000;
|
|
io_info.buf_ = read_buf;
|
|
io_info.user_data_buf_ = read_buf;
|
|
}
|
|
return ret;
|
|
}
|
|
|
|
int TestSSCommonUtil::wait_for_persist_task()
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
const int64_t interval_us = 10 * 1000 * 1000L;
|
|
const int64_t start_us = ObTimeUtility::current_time();
|
|
while (ATOMIC_LOAD(&SSMicroCacheStat.mem_blk_stat().mem_blk_fg_used_cnt_) > 1 ||
|
|
ATOMIC_LOAD(&SSMicroCacheStat.mem_blk_stat().mem_blk_bg_used_cnt_) > 1) {
|
|
ob_usleep(1000);
|
|
const int64_t cur_us = ObTimeUtility::current_time();
|
|
if (cur_us - start_us > interval_us) {
|
|
ret = OB_ERR_UNEXPECTED;
|
|
LOG_WARN("waiting time is too long", KR(ret), "cache_stat", SSMicroCacheStat);
|
|
break;
|
|
}
|
|
}
|
|
return ret;
|
|
}
|
|
|
|
int TestSSCommonUtil::alloc_micro_block_meta(ObSSMicroBlockMeta *µ_meta)
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
void *ptr = SSMicroMetaAlloc.alloc();
|
|
if (OB_ISNULL(ptr)) {
|
|
ret = OB_ALLOCATE_MEMORY_FAILED;
|
|
LOG_WARN("fail to alloc micro_meta mem", KR(ret));
|
|
} else {
|
|
micro_meta = new(ptr) ObSSMicroBlockMeta();
|
|
SSMicroCacheStat.micro_stat().update_micro_pool_alloc_cnt(1);
|
|
}
|
|
return ret;
|
|
}
|
|
|
|
ObCompressorType TestSSCommonUtil::get_random_compress_type()
|
|
{
|
|
const int64_t random_val = ObRandom::rand(1, 2);
|
|
const ObCompressorType compress_type = (random_val % 2 == 0) ? ObCompressorType::SNAPPY_COMPRESSOR : ObCompressorType::NONE_COMPRESSOR;
|
|
return compress_type;
|
|
}
|
|
|
|
} // namespace storage
|
|
} // namespace oceanbase
|