diff --git a/mittest/mtlenv/storage/test_multi_version_sstable_merge.cpp b/mittest/mtlenv/storage/test_multi_version_sstable_merge.cpp index f11e4f0d70..408177a5eb 100644 --- a/mittest/mtlenv/storage/test_multi_version_sstable_merge.cpp +++ b/mittest/mtlenv/storage/test_multi_version_sstable_merge.cpp @@ -2949,6 +2949,246 @@ TEST_F(TestMultiVersionMerge, test_major_range_cross_macro) merger.reset(); } +TEST_F(TestMultiVersionMerge, test_trans_cross_macro_with_ghost_row) +{ + int ret = OB_SUCCESS; + ObTabletMergeDagParam param; + ObTabletMergeCtx merge_context(param, allocator_); + ObPartitionMinorMerger merger(local_arena_, merge_context.static_param_); + + ObTableHandleV2 handle1; + const char *micro_data[2]; + micro_data[0] = + "bigint var bigint bigint bigint bigint flag multi_version_row_flag trans_id\n" + "1 var1 MIN 0 NOP 6 EXIST FU trans_id_1\n"; + + micro_data[1] = + "bigint var bigint bigint bigint bigint flag multi_version_row_flag \n" + "1 var1 MAGIC MAGIC NOP NOP EXIST LG\n" + "2 var2 -26 0 7 NOP EXIST LF\n"; + + int schema_rowkey_cnt = 2; + + int64_t snapshot_version = 10; + share::ObScnRange scn_range; + scn_range.start_scn_.set_min(); + scn_range.end_scn_.convert_for_tx(30); + prepare_table_schema(micro_data, schema_rowkey_cnt, scn_range, snapshot_version); + reset_writer(snapshot_version); + prepare_one_macro(micro_data, 1); + prepare_one_macro(µ_data[1], 1); + prepare_data_end(handle1); + merge_context.static_param_.tables_handle_.add_table(handle1); + STORAGE_LOG(INFO, "finish prepare sstable1"); + + ObTableHandleV2 handle2; + const char *micro_data2[2]; + micro_data2[0] = + "bigint var bigint bigint bigint bigint flag multi_version_row_flag trans_id\n" + "1 var1 MIN 0 8 NOP EXIST ULF trans_id_2\n"; + + micro_data2[1] = + "bigint var bigint bigint bigint bigint flag multi_version_row_flag \n" + "3 var3 -46 0 18 NOP EXIST LF\n"; + + snapshot_version = 20; + scn_range.start_scn_.convert_for_tx(30); + scn_range.end_scn_.convert_for_tx(50); + table_key_.scn_range_ = scn_range; + reset_writer(snapshot_version); + prepare_one_macro(micro_data2, 1); + prepare_one_macro(µ_data2[1], 1); + prepare_data_end(handle2); + merge_context.static_param_.tables_handle_.add_table(handle2); + STORAGE_LOG(INFO, "finish prepare sstable2"); + + ObLSID ls_id(ls_id_); + ObTabletID tablet_id(tablet_id_); + ObLSHandle ls_handle; + ObLSService *ls_svr = MTL(ObLSService*); + ASSERT_EQ(OB_SUCCESS, ls_svr->get_ls(ls_id, ls_handle, ObLSGetMod::STORAGE_MOD)); + + ObTxTable *tx_table = nullptr; + ObTxTableGuard tx_table_guard; + ls_handle.get_ls()->get_tx_table_guard(tx_table_guard); + ASSERT_NE(nullptr, tx_table = tx_table_guard.get_tx_table()); + + for (int64_t i = 1; i < 3; i++) { + ObTxData *tx_data = new ObTxData(); + transaction::ObTransID tx_id = i; + + // fill in data + tx_data->tx_id_ = tx_id; + tx_data->commit_version_.convert_for_tx(i * 20 + 9); + tx_data->start_scn_.convert_for_tx(i); + tx_data->end_scn_ = tx_data->commit_version_; + tx_data->state_ = ObTxData::COMMIT; + ASSERT_EQ(OB_SUCCESS, tx_table->insert(tx_data)); + delete tx_data; + } + + ObVersionRange trans_version_range; + trans_version_range.snapshot_version_ = 100; + trans_version_range.multi_version_start_ = 1; + trans_version_range.base_version_ = 1; + + prepare_merge_context(MINOR_MERGE, false, trans_version_range, merge_context); + // minor mrege + ObSSTable *merged_sstable = nullptr; + ASSERT_EQ(OB_SUCCESS, merger.merge_partition(merge_context, 0)); + build_sstable(merge_context, merged_sstable); + + const char *result1 = + "bigint var bigint bigint bigint bigint flag multi_version_row_flag trans_id\n" + "1 var1 -49 MIN 8 6 EXIST SCF\n" + "1 var1 -49 0 8 NOP EXIST \n" + "1 var1 -29 0 NOP 6 EXIST L\n" + "2 var2 -26 0 7 NOP EXIST LF\n" + "3 var3 -46 0 18 NOP EXIST LF\n"; + + ObMockIterator res_iter; + ObStoreRowIterator *scanner = NULL; + ObDatumRange range; + res_iter.reset(); + range.set_whole_range(); + trans_version_range.base_version_ = 1; + trans_version_range.multi_version_start_ = 1; + trans_version_range.snapshot_version_ = INT64_MAX; + prepare_query_param(trans_version_range); + ASSERT_EQ(OB_SUCCESS, merged_sstable->scan(iter_param_, context_, range, scanner)); + ASSERT_EQ(OB_SUCCESS, res_iter.from(result1)); + ObMockDirectReadIterator sstable_iter; + ASSERT_EQ(OB_SUCCESS, sstable_iter.init(scanner, allocator_, full_read_info_)); + ASSERT_TRUE(res_iter.equals(sstable_iter, true/*cmp multi version row flag*/)); + ASSERT_EQ(OB_SUCCESS, clear_tx_data()); + scanner->~ObStoreRowIterator(); + handle1.reset(); + handle2.reset(); + merger.reset(); +} + +TEST_F(TestMultiVersionMerge, test_trans_cross_macro_with_ghost_row2) +{ + int ret = OB_SUCCESS; + ObTabletMergeDagParam param; + ObTabletMergeCtx merge_context(param, allocator_); + ObPartitionMinorMerger merger(local_arena_, merge_context.static_param_); + + ObTableHandleV2 handle1; + const char *micro_data[2]; + micro_data[0] = + "bigint var bigint bigint bigint bigint flag multi_version_row_flag trans_id\n" + "1 var1 MIN 0 8 NOP EXIST FU trans_id_1\n" + "1 var1 -17 MIN 9 5 EXIST SC trans_id_0\n" + "1 var1 -17 0 9 5 EXIST C trans_id_0\n" + "1 var1 -12 0 9 NOP EXIST L trans_id_0\n"; + + micro_data[1] = + "bigint var bigint bigint bigint bigint flag multi_version_row_flag \n" + "3 var3 -16 0 18 NOP EXIST LF\n"; + + int schema_rowkey_cnt = 2; + + int64_t snapshot_version = 10; + share::ObScnRange scn_range; + scn_range.start_scn_.set_min(); + scn_range.end_scn_.convert_for_tx(30); + prepare_table_schema(micro_data, schema_rowkey_cnt, scn_range, snapshot_version); + reset_writer(snapshot_version); + prepare_one_macro(micro_data, 1); + prepare_one_macro(µ_data[1], 1); + prepare_data_end(handle1); + merge_context.static_param_.tables_handle_.add_table(handle1); + STORAGE_LOG(INFO, "finish prepare sstable1"); + + ObTableHandleV2 handle2; + const char *micro_data2[2]; + micro_data2[0] = + "bigint var bigint bigint bigint bigint flag multi_version_row_flag trans_id\n" + "1 var1 MIN 0 NOP 6 EXIST FU trans_id_2\n"; + + micro_data2[1] = + "bigint var bigint bigint bigint bigint flag multi_version_row_flag \n" + "1 var1 MAGIC MAGIC NOP NOP EXIST LG\n" + "2 var2 -26 0 7 NOP EXIST LF\n"; + + snapshot_version = 20; + scn_range.start_scn_.convert_for_tx(30); + scn_range.end_scn_.convert_for_tx(50); + table_key_.scn_range_ = scn_range; + reset_writer(snapshot_version); + prepare_one_macro(micro_data2, 1); + prepare_one_macro(µ_data2[1], 1); + prepare_data_end(handle2); + merge_context.static_param_.tables_handle_.add_table(handle2); + STORAGE_LOG(INFO, "finish prepare sstable2"); + + ObLSID ls_id(ls_id_); + ObTabletID tablet_id(tablet_id_); + ObLSHandle ls_handle; + ObLSService *ls_svr = MTL(ObLSService*); + ASSERT_EQ(OB_SUCCESS, ls_svr->get_ls(ls_id, ls_handle, ObLSGetMod::STORAGE_MOD)); + + ObTxTable *tx_table = nullptr; + ObTxTableGuard tx_table_guard; + ls_handle.get_ls()->get_tx_table_guard(tx_table_guard); + ASSERT_NE(nullptr, tx_table = tx_table_guard.get_tx_table()); + + for (int64_t i = 1; i < 3; i++) { + ObTxData *tx_data = new ObTxData(); + transaction::ObTransID tx_id = i; + + // fill in data + tx_data->tx_id_ = tx_id; + tx_data->commit_version_.convert_for_tx(i * 20 + 9); + tx_data->start_scn_.convert_for_tx(i); + tx_data->end_scn_ = tx_data->commit_version_; + tx_data->state_ = ObTxData::COMMIT; + ASSERT_EQ(OB_SUCCESS, tx_table->insert(tx_data)); + delete tx_data; + } + + ObVersionRange trans_version_range; + trans_version_range.snapshot_version_ = 100; + trans_version_range.multi_version_start_ = 18; + trans_version_range.base_version_ = 18; + + prepare_merge_context(MINOR_MERGE, false, trans_version_range, merge_context); + // minor mrege + ObSSTable *merged_sstable = nullptr; + ASSERT_EQ(OB_SUCCESS, merger.merge_partition(merge_context, 0)); + build_sstable(merge_context, merged_sstable); + + const char *result1 = + "bigint var bigint bigint bigint bigint flag multi_version_row_flag trans_id\n" + "1 var1 -49 MIN 8 6 EXIST SCF\n" + "1 var1 -49 0 NOP 6 EXIST N\n" + "1 var1 -29 0 8 NOP EXIST N\n" + "1 var1 -17 0 9 5 EXIST CL\n" + "2 var2 -26 0 7 NOP EXIST LF\n" + "3 var3 -16 0 18 NOP EXIST LF\n"; + + ObMockIterator res_iter; + ObStoreRowIterator *scanner = NULL; + ObDatumRange range; + res_iter.reset(); + range.set_whole_range(); + trans_version_range.base_version_ = 1; + trans_version_range.multi_version_start_ = 1; + trans_version_range.snapshot_version_ = INT64_MAX; + prepare_query_param(trans_version_range); + ASSERT_EQ(OB_SUCCESS, merged_sstable->scan(iter_param_, context_, range, scanner)); + ASSERT_EQ(OB_SUCCESS, res_iter.from(result1)); + ObMockDirectReadIterator sstable_iter; + ASSERT_EQ(OB_SUCCESS, sstable_iter.init(scanner, allocator_, full_read_info_)); + ASSERT_TRUE(res_iter.equals(sstable_iter, true/*cmp multi version row flag*/)); + ASSERT_EQ(OB_SUCCESS, clear_tx_data()); + scanner->~ObStoreRowIterator(); + handle1.reset(); + handle2.reset(); + merger.reset(); +} + } }