Add minor merge test for ghost row cross to macro data

This commit is contained in:
DengzhiLiu
2023-11-03 11:13:14 +00:00
committed by ob-robot
parent 88c00b6161
commit 7f09dcd4a7

View File

@ -2949,6 +2949,246 @@ TEST_F(TestMultiVersionMerge, test_major_range_cross_macro)
merger.reset(); merger.reset();
} }
TEST_F(TestMultiVersionMerge, test_trans_cross_macro_with_ghost_row)
{
int ret = OB_SUCCESS;
ObTabletMergeDagParam param;
ObTabletMergeCtx merge_context(param, allocator_);
ObPartitionMinorMerger merger(local_arena_, merge_context.static_param_);
ObTableHandleV2 handle1;
const char *micro_data[2];
micro_data[0] =
"bigint var bigint bigint bigint bigint flag multi_version_row_flag trans_id\n"
"1 var1 MIN 0 NOP 6 EXIST FU trans_id_1\n";
micro_data[1] =
"bigint var bigint bigint bigint bigint flag multi_version_row_flag \n"
"1 var1 MAGIC MAGIC NOP NOP EXIST LG\n"
"2 var2 -26 0 7 NOP EXIST LF\n";
int schema_rowkey_cnt = 2;
int64_t snapshot_version = 10;
share::ObScnRange scn_range;
scn_range.start_scn_.set_min();
scn_range.end_scn_.convert_for_tx(30);
prepare_table_schema(micro_data, schema_rowkey_cnt, scn_range, snapshot_version);
reset_writer(snapshot_version);
prepare_one_macro(micro_data, 1);
prepare_one_macro(&micro_data[1], 1);
prepare_data_end(handle1);
merge_context.static_param_.tables_handle_.add_table(handle1);
STORAGE_LOG(INFO, "finish prepare sstable1");
ObTableHandleV2 handle2;
const char *micro_data2[2];
micro_data2[0] =
"bigint var bigint bigint bigint bigint flag multi_version_row_flag trans_id\n"
"1 var1 MIN 0 8 NOP EXIST ULF trans_id_2\n";
micro_data2[1] =
"bigint var bigint bigint bigint bigint flag multi_version_row_flag \n"
"3 var3 -46 0 18 NOP EXIST LF\n";
snapshot_version = 20;
scn_range.start_scn_.convert_for_tx(30);
scn_range.end_scn_.convert_for_tx(50);
table_key_.scn_range_ = scn_range;
reset_writer(snapshot_version);
prepare_one_macro(micro_data2, 1);
prepare_one_macro(&micro_data2[1], 1);
prepare_data_end(handle2);
merge_context.static_param_.tables_handle_.add_table(handle2);
STORAGE_LOG(INFO, "finish prepare sstable2");
ObLSID ls_id(ls_id_);
ObTabletID tablet_id(tablet_id_);
ObLSHandle ls_handle;
ObLSService *ls_svr = MTL(ObLSService*);
ASSERT_EQ(OB_SUCCESS, ls_svr->get_ls(ls_id, ls_handle, ObLSGetMod::STORAGE_MOD));
ObTxTable *tx_table = nullptr;
ObTxTableGuard tx_table_guard;
ls_handle.get_ls()->get_tx_table_guard(tx_table_guard);
ASSERT_NE(nullptr, tx_table = tx_table_guard.get_tx_table());
for (int64_t i = 1; i < 3; i++) {
ObTxData *tx_data = new ObTxData();
transaction::ObTransID tx_id = i;
// fill in data
tx_data->tx_id_ = tx_id;
tx_data->commit_version_.convert_for_tx(i * 20 + 9);
tx_data->start_scn_.convert_for_tx(i);
tx_data->end_scn_ = tx_data->commit_version_;
tx_data->state_ = ObTxData::COMMIT;
ASSERT_EQ(OB_SUCCESS, tx_table->insert(tx_data));
delete tx_data;
}
ObVersionRange trans_version_range;
trans_version_range.snapshot_version_ = 100;
trans_version_range.multi_version_start_ = 1;
trans_version_range.base_version_ = 1;
prepare_merge_context(MINOR_MERGE, false, trans_version_range, merge_context);
// minor mrege
ObSSTable *merged_sstable = nullptr;
ASSERT_EQ(OB_SUCCESS, merger.merge_partition(merge_context, 0));
build_sstable(merge_context, merged_sstable);
const char *result1 =
"bigint var bigint bigint bigint bigint flag multi_version_row_flag trans_id\n"
"1 var1 -49 MIN 8 6 EXIST SCF\n"
"1 var1 -49 0 8 NOP EXIST \n"
"1 var1 -29 0 NOP 6 EXIST L\n"
"2 var2 -26 0 7 NOP EXIST LF\n"
"3 var3 -46 0 18 NOP EXIST LF\n";
ObMockIterator res_iter;
ObStoreRowIterator *scanner = NULL;
ObDatumRange range;
res_iter.reset();
range.set_whole_range();
trans_version_range.base_version_ = 1;
trans_version_range.multi_version_start_ = 1;
trans_version_range.snapshot_version_ = INT64_MAX;
prepare_query_param(trans_version_range);
ASSERT_EQ(OB_SUCCESS, merged_sstable->scan(iter_param_, context_, range, scanner));
ASSERT_EQ(OB_SUCCESS, res_iter.from(result1));
ObMockDirectReadIterator sstable_iter;
ASSERT_EQ(OB_SUCCESS, sstable_iter.init(scanner, allocator_, full_read_info_));
ASSERT_TRUE(res_iter.equals(sstable_iter, true/*cmp multi version row flag*/));
ASSERT_EQ(OB_SUCCESS, clear_tx_data());
scanner->~ObStoreRowIterator();
handle1.reset();
handle2.reset();
merger.reset();
}
TEST_F(TestMultiVersionMerge, test_trans_cross_macro_with_ghost_row2)
{
int ret = OB_SUCCESS;
ObTabletMergeDagParam param;
ObTabletMergeCtx merge_context(param, allocator_);
ObPartitionMinorMerger merger(local_arena_, merge_context.static_param_);
ObTableHandleV2 handle1;
const char *micro_data[2];
micro_data[0] =
"bigint var bigint bigint bigint bigint flag multi_version_row_flag trans_id\n"
"1 var1 MIN 0 8 NOP EXIST FU trans_id_1\n"
"1 var1 -17 MIN 9 5 EXIST SC trans_id_0\n"
"1 var1 -17 0 9 5 EXIST C trans_id_0\n"
"1 var1 -12 0 9 NOP EXIST L trans_id_0\n";
micro_data[1] =
"bigint var bigint bigint bigint bigint flag multi_version_row_flag \n"
"3 var3 -16 0 18 NOP EXIST LF\n";
int schema_rowkey_cnt = 2;
int64_t snapshot_version = 10;
share::ObScnRange scn_range;
scn_range.start_scn_.set_min();
scn_range.end_scn_.convert_for_tx(30);
prepare_table_schema(micro_data, schema_rowkey_cnt, scn_range, snapshot_version);
reset_writer(snapshot_version);
prepare_one_macro(micro_data, 1);
prepare_one_macro(&micro_data[1], 1);
prepare_data_end(handle1);
merge_context.static_param_.tables_handle_.add_table(handle1);
STORAGE_LOG(INFO, "finish prepare sstable1");
ObTableHandleV2 handle2;
const char *micro_data2[2];
micro_data2[0] =
"bigint var bigint bigint bigint bigint flag multi_version_row_flag trans_id\n"
"1 var1 MIN 0 NOP 6 EXIST FU trans_id_2\n";
micro_data2[1] =
"bigint var bigint bigint bigint bigint flag multi_version_row_flag \n"
"1 var1 MAGIC MAGIC NOP NOP EXIST LG\n"
"2 var2 -26 0 7 NOP EXIST LF\n";
snapshot_version = 20;
scn_range.start_scn_.convert_for_tx(30);
scn_range.end_scn_.convert_for_tx(50);
table_key_.scn_range_ = scn_range;
reset_writer(snapshot_version);
prepare_one_macro(micro_data2, 1);
prepare_one_macro(&micro_data2[1], 1);
prepare_data_end(handle2);
merge_context.static_param_.tables_handle_.add_table(handle2);
STORAGE_LOG(INFO, "finish prepare sstable2");
ObLSID ls_id(ls_id_);
ObTabletID tablet_id(tablet_id_);
ObLSHandle ls_handle;
ObLSService *ls_svr = MTL(ObLSService*);
ASSERT_EQ(OB_SUCCESS, ls_svr->get_ls(ls_id, ls_handle, ObLSGetMod::STORAGE_MOD));
ObTxTable *tx_table = nullptr;
ObTxTableGuard tx_table_guard;
ls_handle.get_ls()->get_tx_table_guard(tx_table_guard);
ASSERT_NE(nullptr, tx_table = tx_table_guard.get_tx_table());
for (int64_t i = 1; i < 3; i++) {
ObTxData *tx_data = new ObTxData();
transaction::ObTransID tx_id = i;
// fill in data
tx_data->tx_id_ = tx_id;
tx_data->commit_version_.convert_for_tx(i * 20 + 9);
tx_data->start_scn_.convert_for_tx(i);
tx_data->end_scn_ = tx_data->commit_version_;
tx_data->state_ = ObTxData::COMMIT;
ASSERT_EQ(OB_SUCCESS, tx_table->insert(tx_data));
delete tx_data;
}
ObVersionRange trans_version_range;
trans_version_range.snapshot_version_ = 100;
trans_version_range.multi_version_start_ = 18;
trans_version_range.base_version_ = 18;
prepare_merge_context(MINOR_MERGE, false, trans_version_range, merge_context);
// minor mrege
ObSSTable *merged_sstable = nullptr;
ASSERT_EQ(OB_SUCCESS, merger.merge_partition(merge_context, 0));
build_sstable(merge_context, merged_sstable);
const char *result1 =
"bigint var bigint bigint bigint bigint flag multi_version_row_flag trans_id\n"
"1 var1 -49 MIN 8 6 EXIST SCF\n"
"1 var1 -49 0 NOP 6 EXIST N\n"
"1 var1 -29 0 8 NOP EXIST N\n"
"1 var1 -17 0 9 5 EXIST CL\n"
"2 var2 -26 0 7 NOP EXIST LF\n"
"3 var3 -16 0 18 NOP EXIST LF\n";
ObMockIterator res_iter;
ObStoreRowIterator *scanner = NULL;
ObDatumRange range;
res_iter.reset();
range.set_whole_range();
trans_version_range.base_version_ = 1;
trans_version_range.multi_version_start_ = 1;
trans_version_range.snapshot_version_ = INT64_MAX;
prepare_query_param(trans_version_range);
ASSERT_EQ(OB_SUCCESS, merged_sstable->scan(iter_param_, context_, range, scanner));
ASSERT_EQ(OB_SUCCESS, res_iter.from(result1));
ObMockDirectReadIterator sstable_iter;
ASSERT_EQ(OB_SUCCESS, sstable_iter.init(scanner, allocator_, full_read_info_));
ASSERT_TRUE(res_iter.equals(sstable_iter, true/*cmp multi version row flag*/));
ASSERT_EQ(OB_SUCCESS, clear_tx_data());
scanner->~ObStoreRowIterator();
handle1.reset();
handle2.reset();
merger.reset();
}
} }
} }