Files
oceanbase/mittest/mtlenv/storage/blocksstable/test_co_prefetcher.cpp

395 lines
14 KiB
C++

/**
* Copyright (c) 2022 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#define USING_LOG_PREFIX STORAGE
#define private public
#define protected public
#include "storage/column_store/ob_co_prefetcher.h"
#include "ob_index_block_data_prepare.h"
namespace oceanbase
{
using namespace storage;
using namespace common;
namespace blocksstable
{
class TestCOPrefetcher : public TestIndexBlockDataPrepare
{
public:
TestCOPrefetcher();
virtual ~TestCOPrefetcher();
static void SetUpTestCase();
static void TearDownTestCase();
virtual void SetUp();
virtual void TearDown();
virtual void prepare_schema();
void generate_range(
const int64_t start,
const int64_t end,
ObDatumRange &range);
void generate_border(const int64_t border);
void prepare_test_case(int level_cnt);
void prepare_one_block_test_case();
void clear_test_case();
void try_prefetch();
static const int64_t DEFAULT_ROWS_PER_MICRO_BLOCK = 3;
static const int64_t MIRCO_BLOCKS_PER_MACRO_BLOCK = 2;
static const int64_t DEFAULT_ROWS_PER_MIDDLE_LAYER_MICRO_BLOCK = 3;
static const int64_t MAX_LOOP_CNT = 10000;
static const int64_t ROWKEY_COLUMN_NUM = 1;
static const int64_t COLUMN_NUM = 2;
static const int64_t MICRO_BLOCK_SIZE = 500;
public:
ObArenaAllocator allocator_;
ObTableAccessParam access_param_;
ObDatumRow start_row_;
ObDatumRow end_row_;
ObDatumRow border_row_;
ObDatumRowkey border_rowkey_;
ObDatumRange range_;
ObCOPrefetcher co_prefetcher_;
};
TestCOPrefetcher::TestCOPrefetcher()
: TestIndexBlockDataPrepare("Test co prefetcher", compaction::ObMergeType::MAJOR_MERGE)
{
}
TestCOPrefetcher::~TestCOPrefetcher()
{
}
void TestCOPrefetcher::SetUpTestCase()
{
TestIndexBlockDataPrepare::SetUpTestCase();
}
void TestCOPrefetcher::TearDownTestCase()
{
TestIndexBlockDataPrepare::TearDownTestCase();
}
void TestCOPrefetcher::SetUp()
{
TestIndexBlockDataPrepare::SetUp();
ASSERT_EQ(OB_SUCCESS, start_row_.init(allocator_, COLUMN_NUM));
ASSERT_EQ(OB_SUCCESS, end_row_.init(allocator_, COLUMN_NUM));
ASSERT_EQ(OB_SUCCESS, border_row_.init(allocator_, COLUMN_NUM));
}
void TestCOPrefetcher::TearDown()
{
tablet_handle_.reset();
TestIndexBlockDataPrepare::TearDown();
}
void TestCOPrefetcher::prepare_schema()
{
ObColumnSchemaV2 column;
const int64_t rowkey_column_num = ROWKEY_COLUMN_NUM;
const int64_t column_num = COLUMN_NUM;
// Init table schema
uint64_t table_id = TEST_TABLE_ID;
table_schema_.reset();
ASSERT_EQ(OB_SUCCESS, table_schema_.set_table_name("test_index_block"));
table_schema_.set_tenant_id(1);
table_schema_.set_tablegroup_id(1);
table_schema_.set_database_id(1);
table_schema_.set_table_id(table_id);
table_schema_.set_rowkey_column_num(rowkey_column_num);
table_schema_.set_max_used_column_id(column_num);
table_schema_.set_block_size(2 * 1024);
table_schema_.set_compress_func_name("none");
table_schema_.set_row_store_type(row_store_type_);
table_schema_.set_storage_format_version(OB_STORAGE_FORMAT_VERSION_V4);
table_schema_.set_micro_index_clustered(false);
index_schema_.reset();
// Init column
char name[OB_MAX_FILE_NAME_LENGTH];
memset(name, 0, sizeof(name));
for(int64_t i = 0; i < column_num; ++i) {
ObObjType obj_type = ObIntType;
column.reset();
column.set_table_id(TEST_TABLE_ID);
column.set_column_id(i + OB_APP_MIN_COLUMN_ID);
sprintf(name, "test%020ld", i);
ASSERT_EQ(OB_SUCCESS, column.set_column_name(name));
column.set_data_type(obj_type);
column.set_collation_type(CS_TYPE_UTF8MB4_GENERAL_CI);
column.set_data_length(1);
if (0 == i) {
column.set_rowkey_position(1);
} else {
column.set_rowkey_position(0);
}
ASSERT_EQ(OB_SUCCESS, table_schema_.add_column(column));
}
}
void TestCOPrefetcher::generate_range(
const int64_t start,
const int64_t end,
ObDatumRange &range)
{
ObDatumRowkey tmp_rowkey;
ASSERT_EQ(OB_SUCCESS, row_generate_.get_next_row(start, start_row_));
tmp_rowkey.assign(start_row_.storage_datums_, ROWKEY_COLUMN_NUM);
ASSERT_EQ(OB_SUCCESS, tmp_rowkey.deep_copy(range.start_key_, allocator_));
ASSERT_EQ(OB_SUCCESS, row_generate_.get_next_row(end, end_row_));
tmp_rowkey.assign(end_row_.storage_datums_, ROWKEY_COLUMN_NUM);
ASSERT_EQ(OB_SUCCESS, tmp_rowkey.deep_copy(range.end_key_, allocator_));
range.border_flag_.set_inclusive_start();
range.border_flag_.set_inclusive_end();
}
void TestCOPrefetcher::generate_border(const int64_t border)
{
// rowkey value = seed * column_type + column_id;
// normal column value = seed * column_type + column_id;
ASSERT_EQ(OB_SUCCESS, row_generate_.get_next_row(border, border_row_));
border_rowkey_.assign(border_row_.storage_datums_, ROWKEY_COLUMN_NUM);
}
void TestCOPrefetcher::prepare_test_case(int level_cnt)
{
rows_per_mirco_block_ = DEFAULT_ROWS_PER_MICRO_BLOCK;
mirco_blocks_per_macro_block_ = MIRCO_BLOCKS_PER_MACRO_BLOCK;
max_row_cnt_ = rows_per_mirco_block_ * mirco_blocks_per_macro_block_ * DEFAULT_ROWS_PER_MIDDLE_LAYER_MICRO_BLOCK;
level_cnt -= 2;
while (level_cnt-- > 0) {
max_row_cnt_ *= DEFAULT_ROWS_PER_MIDDLE_LAYER_MICRO_BLOCK;
}
if (is_cg_data_) {
prepare_cg_data();
} else {
prepare_data(MICRO_BLOCK_SIZE);
}
ASSERT_EQ(max_row_cnt_, row_cnt_);
}
void TestCOPrefetcher::prepare_one_block_test_case()
{
rows_per_mirco_block_ = DEFAULT_ROWS_PER_MICRO_BLOCK;
mirco_blocks_per_macro_block_ = MIRCO_BLOCKS_PER_MACRO_BLOCK;
max_row_cnt_ = rows_per_mirco_block_;
if (is_cg_data_) {
prepare_cg_data();
} else {
prepare_data(MICRO_BLOCK_SIZE);
}
ASSERT_EQ(max_row_cnt_, row_cnt_);
}
void TestCOPrefetcher::clear_test_case()
{
TestIndexBlockDataPrepare::TearDown();
}
void TestCOPrefetcher::try_prefetch()
{
int cnt = 0;
OK(co_prefetcher_.prefetch());
while (co_prefetcher_.read_wait()) {
OK(co_prefetcher_.prefetch());
if (++cnt > MAX_LOOP_CNT) {
ASSERT_TRUE(false);
break;
}
}
}
TEST_F(TestCOPrefetcher, test_basic_case)
{
const int level_cnt = 5; // Total 486 rows.
prepare_test_case(level_cnt);
int64_t start = 0;
int64_t end = max_row_cnt_ - 1;
generate_range(start, end, range_);
bool is_reverse_scan = false;
prepare_query_param(is_reverse_scan);
int iter_type = ObStoreRowIterator::IteratorScan;
OK(co_prefetcher_.init(iter_type, sstable_, iter_param_, context_, &range_));
try_prefetch();
int64_t border_id1 = max_row_cnt_ / 2;
generate_border(border_id1);
// 1. Switch to columnar scan.
OK(co_prefetcher_.refresh_blockscan_checker_for_column_store(1, border_rowkey_));
ASSERT_EQ(1, co_prefetcher_.get_cur_level_of_block_scan());
ASSERT_EQ(rows_per_mirco_block_, co_prefetcher_.block_scan_start_row_id_);
ASSERT_TRUE(OB_INVALID_CS_ROW_ID != co_prefetcher_.block_scan_border_row_id_);
ASSERT_EQ(MicroDataBlockScanState::PENDING_BLOCK_SCAN, co_prefetcher_.block_scan_state_);
ObCSRowId prev_row_id = co_prefetcher_.block_scan_border_row_id_;
int cnt = 0;
while (true) {
OK(co_prefetcher_.prefetch());
if (MicroDataBlockScanState::IN_END_OF_RANGE == co_prefetcher_.block_scan_state_) {
ASSERT_TRUE(OB_INVALID_CS_ROW_ID != co_prefetcher_.block_scan_border_row_id_);
ASSERT_TRUE(prev_row_id <= co_prefetcher_.block_scan_border_row_id_);
ASSERT_TRUE(border_id1 > co_prefetcher_.block_scan_border_row_id_);
ASSERT_TRUE((border_id1 - 1 - co_prefetcher_.block_scan_border_row_id_) <= rows_per_mirco_block_);
break;
} else if (MicroDataBlockScanState::PENDING_BLOCK_SCAN == co_prefetcher_.block_scan_state_) {
ASSERT_TRUE(OB_INVALID_CS_ROW_ID != co_prefetcher_.block_scan_border_row_id_);
ASSERT_TRUE(prev_row_id <= co_prefetcher_.block_scan_border_row_id_);
prev_row_id = co_prefetcher_.block_scan_border_row_id_;
} else {
break;
}
if (++cnt > MAX_LOOP_CNT) {
ASSERT_TRUE(false);
break;
}
}
ASSERT_EQ(-1, co_prefetcher_.get_cur_level_of_block_scan());
co_prefetcher_.block_scan_state_ = MicroDataBlockScanState::ROW_STORE_SCAN;
int border_id2 = max_row_cnt_ / 6 * 5;
generate_border(border_id2);
try_prefetch();
// 2. Switch to columnar scan again.
OK(co_prefetcher_.refresh_blockscan_checker_for_column_store(co_prefetcher_.cur_micro_data_fetch_idx_ + 1, border_rowkey_));
ASSERT_EQ(border_id1 + rows_per_mirco_block_, co_prefetcher_.block_scan_start_row_id_);
ASSERT_TRUE(OB_INVALID_CS_ROW_ID != co_prefetcher_.block_scan_border_row_id_);
ASSERT_EQ(MicroDataBlockScanState::PENDING_BLOCK_SCAN, co_prefetcher_.block_scan_state_);
prev_row_id = co_prefetcher_.block_scan_border_row_id_;
cnt = 0;
while (true) {
OK(co_prefetcher_.prefetch());
if (MicroDataBlockScanState::IN_END_OF_RANGE == co_prefetcher_.block_scan_state_) {
ASSERT_TRUE(OB_INVALID_CS_ROW_ID != co_prefetcher_.block_scan_border_row_id_);
ASSERT_TRUE(prev_row_id <= co_prefetcher_.block_scan_border_row_id_);
ASSERT_TRUE(border_id2 >= co_prefetcher_.block_scan_border_row_id_);
ASSERT_TRUE((border_id2 - co_prefetcher_.block_scan_border_row_id_) <= rows_per_mirco_block_);
break;
} else if (MicroDataBlockScanState::PENDING_BLOCK_SCAN == co_prefetcher_.block_scan_state_) {
ASSERT_TRUE(OB_INVALID_CS_ROW_ID != co_prefetcher_.block_scan_border_row_id_);
ASSERT_TRUE(prev_row_id <= co_prefetcher_.block_scan_border_row_id_);
prev_row_id = co_prefetcher_.block_scan_border_row_id_;
} else {
break;
}
if (++cnt > MAX_LOOP_CNT) {
ASSERT_TRUE(false);
break;
}
}
// 3. Consumed prefetcher
cnt = 0;
ASSERT_EQ(-1, co_prefetcher_.get_cur_level_of_block_scan());
co_prefetcher_.block_scan_state_ = MicroDataBlockScanState::ROW_STORE_SCAN;
while (true) {
OK(co_prefetcher_.prefetch());
if (++cnt > MAX_LOOP_CNT) {
ASSERT_TRUE(false);
break;
}
if (co_prefetcher_.is_prefetch_end_) {
break;
}
if (co_prefetcher_.read_wait()) {
continue;
}
++co_prefetcher_.cur_micro_data_fetch_idx_;
}
destroy_query_param();
}
TEST_F(TestCOPrefetcher, test_one_micro_block_case)
{
// Only one data micro block.
prepare_one_block_test_case();
int64_t start = 0;
int64_t end = max_row_cnt_ - 1;
generate_range(start, end, range_);
bool is_reverse_scan = false;
prepare_query_param(is_reverse_scan);
int iter_type = ObStoreRowIterator::IteratorScan;
OK(co_prefetcher_.init(iter_type, sstable_, iter_param_, context_, &range_));
try_prefetch();
int64_t border_id1 = max_row_cnt_ + 1;
generate_border(border_id1);
OK(co_prefetcher_.refresh_blockscan_checker_for_column_store(1, border_rowkey_));
ASSERT_EQ(2, co_prefetcher_.index_tree_height_);
ASSERT_EQ(-1, co_prefetcher_.get_cur_level_of_block_scan());
ASSERT_EQ(OB_INVALID_CS_ROW_ID, co_prefetcher_.block_scan_start_row_id_);
ASSERT_EQ(OB_INVALID_CS_ROW_ID, co_prefetcher_.block_scan_border_row_id_);
ASSERT_EQ(MicroDataBlockScanState::ROW_STORE_SCAN, co_prefetcher_.block_scan_state_);
}
TEST_F(TestCOPrefetcher, test_all_columnar_scan_case)
{
// Border is max, scan all by columnar store.
const int level_cnt = 5; // Total 486 rows.
prepare_test_case(level_cnt);
int64_t start = 0;
int64_t end = max_row_cnt_ - 1;
generate_range(start, end, range_);
bool is_reverse_scan = false;
prepare_query_param(is_reverse_scan);
int iter_type = ObStoreRowIterator::IteratorScan;
OK(co_prefetcher_.init(iter_type, sstable_, iter_param_, context_, &range_));
try_prefetch();
int64_t border_id1 = max_row_cnt_;
generate_border(border_id1);
OK(co_prefetcher_.refresh_blockscan_checker_for_column_store(1, border_rowkey_));
ASSERT_EQ(1, co_prefetcher_.get_cur_level_of_block_scan());
ASSERT_EQ(rows_per_mirco_block_, co_prefetcher_.block_scan_start_row_id_);
ASSERT_TRUE(OB_INVALID_CS_ROW_ID != co_prefetcher_.block_scan_border_row_id_);
ASSERT_EQ(MicroDataBlockScanState::PENDING_BLOCK_SCAN, co_prefetcher_.block_scan_state_);
ObCSRowId prev_row_id = co_prefetcher_.block_scan_border_row_id_;
int cnt = 0;
while (true) {
OK(co_prefetcher_.prefetch());
if (MicroDataBlockScanState::IN_END_OF_RANGE == co_prefetcher_.block_scan_state_) {
ASSERT_TRUE(OB_INVALID_CS_ROW_ID != co_prefetcher_.block_scan_border_row_id_);
ASSERT_TRUE(prev_row_id <= co_prefetcher_.block_scan_border_row_id_);
ASSERT_TRUE(border_id1 > co_prefetcher_.block_scan_border_row_id_);
// In range border, so co_prefetcher_.block_scan_border_row_id_ is not max_row_cnt_ - 1.
ASSERT_EQ(max_row_cnt_ - 1 - rows_per_mirco_block_, co_prefetcher_.block_scan_border_row_id_);
break;
} else if (MicroDataBlockScanState::PENDING_BLOCK_SCAN == co_prefetcher_.block_scan_state_) {
ASSERT_TRUE(OB_INVALID_CS_ROW_ID != co_prefetcher_.block_scan_border_row_id_);
ASSERT_TRUE(prev_row_id <= co_prefetcher_.block_scan_border_row_id_);
prev_row_id = co_prefetcher_.block_scan_border_row_id_;
} else {
break;
}
if (++cnt > MAX_LOOP_CNT) {
ASSERT_TRUE(false);
break;
}
}
ASSERT_EQ(-1, co_prefetcher_.get_cur_level_of_block_scan());
co_prefetcher_.block_scan_state_ = MicroDataBlockScanState::ROW_STORE_SCAN;
try_prefetch();
ASSERT_TRUE(co_prefetcher_.is_prefetch_end_);
}
}
}
int main(int argc, char **argv)
{
system("rm -f test_co_prefetcher.log*");
OB_LOGGER.set_file_name("test_co_prefetcher.log", true, true);
oceanbase::common::ObLogger::get_logger().set_log_level("INFO");
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}