395 lines
14 KiB
C++
395 lines
14 KiB
C++
/**
|
|
* Copyright (c) 2022 OceanBase
|
|
* OceanBase CE is licensed under Mulan PubL v2.
|
|
* You can use this software according to the terms and conditions of the Mulan PubL v2.
|
|
* You may obtain a copy of Mulan PubL v2 at:
|
|
* http://license.coscl.org.cn/MulanPubL-2.0
|
|
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
|
|
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
|
|
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
|
|
* See the Mulan PubL v2 for more details.
|
|
*/
|
|
#define USING_LOG_PREFIX STORAGE
|
|
#define private public
|
|
#define protected public
|
|
|
|
#include "storage/column_store/ob_co_prefetcher.h"
|
|
#include "ob_index_block_data_prepare.h"
|
|
|
|
|
|
namespace oceanbase
|
|
{
|
|
using namespace storage;
|
|
using namespace common;
|
|
namespace blocksstable
|
|
{
|
|
class TestCOPrefetcher : public TestIndexBlockDataPrepare
|
|
{
|
|
public:
|
|
TestCOPrefetcher();
|
|
virtual ~TestCOPrefetcher();
|
|
static void SetUpTestCase();
|
|
static void TearDownTestCase();
|
|
virtual void SetUp();
|
|
virtual void TearDown();
|
|
virtual void prepare_schema();
|
|
|
|
void generate_range(
|
|
const int64_t start,
|
|
const int64_t end,
|
|
ObDatumRange &range);
|
|
void generate_border(const int64_t border);
|
|
void prepare_test_case(int level_cnt);
|
|
void prepare_one_block_test_case();
|
|
void clear_test_case();
|
|
void try_prefetch();
|
|
static const int64_t DEFAULT_ROWS_PER_MICRO_BLOCK = 3;
|
|
static const int64_t MIRCO_BLOCKS_PER_MACRO_BLOCK = 2;
|
|
static const int64_t DEFAULT_ROWS_PER_MIDDLE_LAYER_MICRO_BLOCK = 3;
|
|
static const int64_t MAX_LOOP_CNT = 10000;
|
|
static const int64_t ROWKEY_COLUMN_NUM = 1;
|
|
static const int64_t COLUMN_NUM = 2;
|
|
static const int64_t MICRO_BLOCK_SIZE = 500;
|
|
|
|
public:
|
|
ObArenaAllocator allocator_;
|
|
ObTableAccessParam access_param_;
|
|
ObDatumRow start_row_;
|
|
ObDatumRow end_row_;
|
|
ObDatumRow border_row_;
|
|
ObDatumRowkey border_rowkey_;
|
|
ObDatumRange range_;
|
|
ObCOPrefetcher co_prefetcher_;
|
|
};
|
|
|
|
TestCOPrefetcher::TestCOPrefetcher()
|
|
: TestIndexBlockDataPrepare("Test co prefetcher", compaction::ObMergeType::MAJOR_MERGE)
|
|
{
|
|
}
|
|
|
|
TestCOPrefetcher::~TestCOPrefetcher()
|
|
{
|
|
}
|
|
|
|
void TestCOPrefetcher::SetUpTestCase()
|
|
{
|
|
TestIndexBlockDataPrepare::SetUpTestCase();
|
|
}
|
|
|
|
void TestCOPrefetcher::TearDownTestCase()
|
|
{
|
|
TestIndexBlockDataPrepare::TearDownTestCase();
|
|
}
|
|
|
|
void TestCOPrefetcher::SetUp()
|
|
{
|
|
TestIndexBlockDataPrepare::SetUp();
|
|
ASSERT_EQ(OB_SUCCESS, start_row_.init(allocator_, COLUMN_NUM));
|
|
ASSERT_EQ(OB_SUCCESS, end_row_.init(allocator_, COLUMN_NUM));
|
|
ASSERT_EQ(OB_SUCCESS, border_row_.init(allocator_, COLUMN_NUM));
|
|
}
|
|
|
|
void TestCOPrefetcher::TearDown()
|
|
{
|
|
tablet_handle_.reset();
|
|
TestIndexBlockDataPrepare::TearDown();
|
|
}
|
|
|
|
void TestCOPrefetcher::prepare_schema()
|
|
{
|
|
ObColumnSchemaV2 column;
|
|
const int64_t rowkey_column_num = ROWKEY_COLUMN_NUM;
|
|
const int64_t column_num = COLUMN_NUM;
|
|
// Init table schema
|
|
uint64_t table_id = TEST_TABLE_ID;
|
|
table_schema_.reset();
|
|
ASSERT_EQ(OB_SUCCESS, table_schema_.set_table_name("test_index_block"));
|
|
table_schema_.set_tenant_id(1);
|
|
table_schema_.set_tablegroup_id(1);
|
|
table_schema_.set_database_id(1);
|
|
table_schema_.set_table_id(table_id);
|
|
table_schema_.set_rowkey_column_num(rowkey_column_num);
|
|
table_schema_.set_max_used_column_id(column_num);
|
|
table_schema_.set_block_size(2 * 1024);
|
|
table_schema_.set_compress_func_name("none");
|
|
table_schema_.set_row_store_type(row_store_type_);
|
|
table_schema_.set_storage_format_version(OB_STORAGE_FORMAT_VERSION_V4);
|
|
table_schema_.set_micro_index_clustered(false);
|
|
|
|
index_schema_.reset();
|
|
|
|
// Init column
|
|
char name[OB_MAX_FILE_NAME_LENGTH];
|
|
memset(name, 0, sizeof(name));
|
|
for(int64_t i = 0; i < column_num; ++i) {
|
|
ObObjType obj_type = ObIntType;
|
|
column.reset();
|
|
column.set_table_id(TEST_TABLE_ID);
|
|
column.set_column_id(i + OB_APP_MIN_COLUMN_ID);
|
|
sprintf(name, "test%020ld", i);
|
|
ASSERT_EQ(OB_SUCCESS, column.set_column_name(name));
|
|
column.set_data_type(obj_type);
|
|
column.set_collation_type(CS_TYPE_UTF8MB4_GENERAL_CI);
|
|
column.set_data_length(1);
|
|
if (0 == i) {
|
|
column.set_rowkey_position(1);
|
|
} else {
|
|
column.set_rowkey_position(0);
|
|
}
|
|
ASSERT_EQ(OB_SUCCESS, table_schema_.add_column(column));
|
|
}
|
|
}
|
|
|
|
void TestCOPrefetcher::generate_range(
|
|
const int64_t start,
|
|
const int64_t end,
|
|
ObDatumRange &range)
|
|
{
|
|
ObDatumRowkey tmp_rowkey;
|
|
ASSERT_EQ(OB_SUCCESS, row_generate_.get_next_row(start, start_row_));
|
|
tmp_rowkey.assign(start_row_.storage_datums_, ROWKEY_COLUMN_NUM);
|
|
ASSERT_EQ(OB_SUCCESS, tmp_rowkey.deep_copy(range.start_key_, allocator_));
|
|
ASSERT_EQ(OB_SUCCESS, row_generate_.get_next_row(end, end_row_));
|
|
tmp_rowkey.assign(end_row_.storage_datums_, ROWKEY_COLUMN_NUM);
|
|
ASSERT_EQ(OB_SUCCESS, tmp_rowkey.deep_copy(range.end_key_, allocator_));
|
|
range.border_flag_.set_inclusive_start();
|
|
range.border_flag_.set_inclusive_end();
|
|
}
|
|
|
|
void TestCOPrefetcher::generate_border(const int64_t border)
|
|
{
|
|
// rowkey value = seed * column_type + column_id;
|
|
// normal column value = seed * column_type + column_id;
|
|
ASSERT_EQ(OB_SUCCESS, row_generate_.get_next_row(border, border_row_));
|
|
border_rowkey_.assign(border_row_.storage_datums_, ROWKEY_COLUMN_NUM);
|
|
}
|
|
|
|
void TestCOPrefetcher::prepare_test_case(int level_cnt)
|
|
{
|
|
rows_per_mirco_block_ = DEFAULT_ROWS_PER_MICRO_BLOCK;
|
|
mirco_blocks_per_macro_block_ = MIRCO_BLOCKS_PER_MACRO_BLOCK;
|
|
max_row_cnt_ = rows_per_mirco_block_ * mirco_blocks_per_macro_block_ * DEFAULT_ROWS_PER_MIDDLE_LAYER_MICRO_BLOCK;
|
|
level_cnt -= 2;
|
|
while (level_cnt-- > 0) {
|
|
max_row_cnt_ *= DEFAULT_ROWS_PER_MIDDLE_LAYER_MICRO_BLOCK;
|
|
}
|
|
if (is_cg_data_) {
|
|
prepare_cg_data();
|
|
} else {
|
|
prepare_data(MICRO_BLOCK_SIZE);
|
|
}
|
|
ASSERT_EQ(max_row_cnt_, row_cnt_);
|
|
}
|
|
|
|
void TestCOPrefetcher::prepare_one_block_test_case()
|
|
{
|
|
rows_per_mirco_block_ = DEFAULT_ROWS_PER_MICRO_BLOCK;
|
|
mirco_blocks_per_macro_block_ = MIRCO_BLOCKS_PER_MACRO_BLOCK;
|
|
max_row_cnt_ = rows_per_mirco_block_;
|
|
if (is_cg_data_) {
|
|
prepare_cg_data();
|
|
} else {
|
|
prepare_data(MICRO_BLOCK_SIZE);
|
|
}
|
|
ASSERT_EQ(max_row_cnt_, row_cnt_);
|
|
}
|
|
|
|
void TestCOPrefetcher::clear_test_case()
|
|
{
|
|
TestIndexBlockDataPrepare::TearDown();
|
|
}
|
|
|
|
void TestCOPrefetcher::try_prefetch()
|
|
{
|
|
int cnt = 0;
|
|
OK(co_prefetcher_.prefetch());
|
|
while (co_prefetcher_.read_wait()) {
|
|
OK(co_prefetcher_.prefetch());
|
|
if (++cnt > MAX_LOOP_CNT) {
|
|
ASSERT_TRUE(false);
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
TEST_F(TestCOPrefetcher, test_basic_case)
|
|
{
|
|
const int level_cnt = 5; // Total 486 rows.
|
|
prepare_test_case(level_cnt);
|
|
int64_t start = 0;
|
|
int64_t end = max_row_cnt_ - 1;
|
|
generate_range(start, end, range_);
|
|
bool is_reverse_scan = false;
|
|
prepare_query_param(is_reverse_scan);
|
|
int iter_type = ObStoreRowIterator::IteratorScan;
|
|
OK(co_prefetcher_.init(iter_type, sstable_, iter_param_, context_, &range_));
|
|
try_prefetch();
|
|
int64_t border_id1 = max_row_cnt_ / 2;
|
|
generate_border(border_id1);
|
|
// 1. Switch to columnar scan.
|
|
OK(co_prefetcher_.refresh_blockscan_checker_for_column_store(1, border_rowkey_));
|
|
|
|
ASSERT_EQ(1, co_prefetcher_.get_cur_level_of_block_scan());
|
|
ASSERT_EQ(rows_per_mirco_block_, co_prefetcher_.block_scan_start_row_id_);
|
|
ASSERT_TRUE(OB_INVALID_CS_ROW_ID != co_prefetcher_.block_scan_border_row_id_);
|
|
ASSERT_EQ(MicroDataBlockScanState::PENDING_BLOCK_SCAN, co_prefetcher_.block_scan_state_);
|
|
ObCSRowId prev_row_id = co_prefetcher_.block_scan_border_row_id_;
|
|
int cnt = 0;
|
|
while (true) {
|
|
OK(co_prefetcher_.prefetch());
|
|
if (MicroDataBlockScanState::IN_END_OF_RANGE == co_prefetcher_.block_scan_state_) {
|
|
ASSERT_TRUE(OB_INVALID_CS_ROW_ID != co_prefetcher_.block_scan_border_row_id_);
|
|
ASSERT_TRUE(prev_row_id <= co_prefetcher_.block_scan_border_row_id_);
|
|
ASSERT_TRUE(border_id1 > co_prefetcher_.block_scan_border_row_id_);
|
|
ASSERT_TRUE((border_id1 - 1 - co_prefetcher_.block_scan_border_row_id_) <= rows_per_mirco_block_);
|
|
break;
|
|
} else if (MicroDataBlockScanState::PENDING_BLOCK_SCAN == co_prefetcher_.block_scan_state_) {
|
|
ASSERT_TRUE(OB_INVALID_CS_ROW_ID != co_prefetcher_.block_scan_border_row_id_);
|
|
ASSERT_TRUE(prev_row_id <= co_prefetcher_.block_scan_border_row_id_);
|
|
prev_row_id = co_prefetcher_.block_scan_border_row_id_;
|
|
} else {
|
|
break;
|
|
}
|
|
if (++cnt > MAX_LOOP_CNT) {
|
|
ASSERT_TRUE(false);
|
|
break;
|
|
}
|
|
}
|
|
|
|
ASSERT_EQ(-1, co_prefetcher_.get_cur_level_of_block_scan());
|
|
co_prefetcher_.block_scan_state_ = MicroDataBlockScanState::ROW_STORE_SCAN;
|
|
int border_id2 = max_row_cnt_ / 6 * 5;
|
|
generate_border(border_id2);
|
|
try_prefetch();
|
|
// 2. Switch to columnar scan again.
|
|
OK(co_prefetcher_.refresh_blockscan_checker_for_column_store(co_prefetcher_.cur_micro_data_fetch_idx_ + 1, border_rowkey_));
|
|
ASSERT_EQ(border_id1 + rows_per_mirco_block_, co_prefetcher_.block_scan_start_row_id_);
|
|
ASSERT_TRUE(OB_INVALID_CS_ROW_ID != co_prefetcher_.block_scan_border_row_id_);
|
|
ASSERT_EQ(MicroDataBlockScanState::PENDING_BLOCK_SCAN, co_prefetcher_.block_scan_state_);
|
|
prev_row_id = co_prefetcher_.block_scan_border_row_id_;
|
|
cnt = 0;
|
|
while (true) {
|
|
OK(co_prefetcher_.prefetch());
|
|
if (MicroDataBlockScanState::IN_END_OF_RANGE == co_prefetcher_.block_scan_state_) {
|
|
ASSERT_TRUE(OB_INVALID_CS_ROW_ID != co_prefetcher_.block_scan_border_row_id_);
|
|
ASSERT_TRUE(prev_row_id <= co_prefetcher_.block_scan_border_row_id_);
|
|
ASSERT_TRUE(border_id2 >= co_prefetcher_.block_scan_border_row_id_);
|
|
ASSERT_TRUE((border_id2 - co_prefetcher_.block_scan_border_row_id_) <= rows_per_mirco_block_);
|
|
break;
|
|
} else if (MicroDataBlockScanState::PENDING_BLOCK_SCAN == co_prefetcher_.block_scan_state_) {
|
|
ASSERT_TRUE(OB_INVALID_CS_ROW_ID != co_prefetcher_.block_scan_border_row_id_);
|
|
ASSERT_TRUE(prev_row_id <= co_prefetcher_.block_scan_border_row_id_);
|
|
prev_row_id = co_prefetcher_.block_scan_border_row_id_;
|
|
} else {
|
|
break;
|
|
}
|
|
if (++cnt > MAX_LOOP_CNT) {
|
|
ASSERT_TRUE(false);
|
|
break;
|
|
}
|
|
}
|
|
|
|
// 3. Consumed prefetcher
|
|
cnt = 0;
|
|
ASSERT_EQ(-1, co_prefetcher_.get_cur_level_of_block_scan());
|
|
co_prefetcher_.block_scan_state_ = MicroDataBlockScanState::ROW_STORE_SCAN;
|
|
while (true) {
|
|
OK(co_prefetcher_.prefetch());
|
|
if (++cnt > MAX_LOOP_CNT) {
|
|
ASSERT_TRUE(false);
|
|
break;
|
|
}
|
|
if (co_prefetcher_.is_prefetch_end_) {
|
|
break;
|
|
}
|
|
if (co_prefetcher_.read_wait()) {
|
|
continue;
|
|
}
|
|
++co_prefetcher_.cur_micro_data_fetch_idx_;
|
|
}
|
|
destroy_query_param();
|
|
}
|
|
|
|
TEST_F(TestCOPrefetcher, test_one_micro_block_case)
|
|
{
|
|
// Only one data micro block.
|
|
prepare_one_block_test_case();
|
|
int64_t start = 0;
|
|
int64_t end = max_row_cnt_ - 1;
|
|
generate_range(start, end, range_);
|
|
bool is_reverse_scan = false;
|
|
prepare_query_param(is_reverse_scan);
|
|
int iter_type = ObStoreRowIterator::IteratorScan;
|
|
OK(co_prefetcher_.init(iter_type, sstable_, iter_param_, context_, &range_));
|
|
try_prefetch();
|
|
int64_t border_id1 = max_row_cnt_ + 1;
|
|
generate_border(border_id1);
|
|
OK(co_prefetcher_.refresh_blockscan_checker_for_column_store(1, border_rowkey_));
|
|
ASSERT_EQ(2, co_prefetcher_.index_tree_height_);
|
|
ASSERT_EQ(-1, co_prefetcher_.get_cur_level_of_block_scan());
|
|
ASSERT_EQ(OB_INVALID_CS_ROW_ID, co_prefetcher_.block_scan_start_row_id_);
|
|
ASSERT_EQ(OB_INVALID_CS_ROW_ID, co_prefetcher_.block_scan_border_row_id_);
|
|
ASSERT_EQ(MicroDataBlockScanState::ROW_STORE_SCAN, co_prefetcher_.block_scan_state_);
|
|
}
|
|
|
|
TEST_F(TestCOPrefetcher, test_all_columnar_scan_case)
|
|
{
|
|
// Border is max, scan all by columnar store.
|
|
const int level_cnt = 5; // Total 486 rows.
|
|
prepare_test_case(level_cnt);
|
|
int64_t start = 0;
|
|
int64_t end = max_row_cnt_ - 1;
|
|
generate_range(start, end, range_);
|
|
bool is_reverse_scan = false;
|
|
prepare_query_param(is_reverse_scan);
|
|
int iter_type = ObStoreRowIterator::IteratorScan;
|
|
OK(co_prefetcher_.init(iter_type, sstable_, iter_param_, context_, &range_));
|
|
try_prefetch();
|
|
int64_t border_id1 = max_row_cnt_;
|
|
generate_border(border_id1);
|
|
OK(co_prefetcher_.refresh_blockscan_checker_for_column_store(1, border_rowkey_));
|
|
ASSERT_EQ(1, co_prefetcher_.get_cur_level_of_block_scan());
|
|
ASSERT_EQ(rows_per_mirco_block_, co_prefetcher_.block_scan_start_row_id_);
|
|
ASSERT_TRUE(OB_INVALID_CS_ROW_ID != co_prefetcher_.block_scan_border_row_id_);
|
|
ASSERT_EQ(MicroDataBlockScanState::PENDING_BLOCK_SCAN, co_prefetcher_.block_scan_state_);
|
|
ObCSRowId prev_row_id = co_prefetcher_.block_scan_border_row_id_;
|
|
int cnt = 0;
|
|
while (true) {
|
|
OK(co_prefetcher_.prefetch());
|
|
if (MicroDataBlockScanState::IN_END_OF_RANGE == co_prefetcher_.block_scan_state_) {
|
|
ASSERT_TRUE(OB_INVALID_CS_ROW_ID != co_prefetcher_.block_scan_border_row_id_);
|
|
ASSERT_TRUE(prev_row_id <= co_prefetcher_.block_scan_border_row_id_);
|
|
ASSERT_TRUE(border_id1 > co_prefetcher_.block_scan_border_row_id_);
|
|
// In range border, so co_prefetcher_.block_scan_border_row_id_ is not max_row_cnt_ - 1.
|
|
ASSERT_EQ(max_row_cnt_ - 1 - rows_per_mirco_block_, co_prefetcher_.block_scan_border_row_id_);
|
|
break;
|
|
} else if (MicroDataBlockScanState::PENDING_BLOCK_SCAN == co_prefetcher_.block_scan_state_) {
|
|
ASSERT_TRUE(OB_INVALID_CS_ROW_ID != co_prefetcher_.block_scan_border_row_id_);
|
|
ASSERT_TRUE(prev_row_id <= co_prefetcher_.block_scan_border_row_id_);
|
|
prev_row_id = co_prefetcher_.block_scan_border_row_id_;
|
|
} else {
|
|
break;
|
|
}
|
|
if (++cnt > MAX_LOOP_CNT) {
|
|
ASSERT_TRUE(false);
|
|
break;
|
|
}
|
|
}
|
|
ASSERT_EQ(-1, co_prefetcher_.get_cur_level_of_block_scan());
|
|
co_prefetcher_.block_scan_state_ = MicroDataBlockScanState::ROW_STORE_SCAN;
|
|
try_prefetch();
|
|
ASSERT_TRUE(co_prefetcher_.is_prefetch_end_);
|
|
}
|
|
|
|
}
|
|
}
|
|
|
|
int main(int argc, char **argv)
|
|
{
|
|
system("rm -f test_co_prefetcher.log*");
|
|
OB_LOGGER.set_file_name("test_co_prefetcher.log", true, true);
|
|
oceanbase::common::ObLogger::get_logger().set_log_level("INFO");
|
|
testing::InitGoogleTest(&argc, argv);
|
|
return RUN_ALL_TESTS();
|
|
}
|