From 40ea51bcf3f0373bda4d6ff51b80d170cd9c632e Mon Sep 17 00:00:00 2001 From: obdev Date: Wed, 7 Feb 2024 11:07:42 +0000 Subject: [PATCH] check row mvcc flag --- .../test_multi_version_sstable_merge.cpp | 105 ++++++++++++++++-- src/storage/blocksstable/ob_datum_row.h | 4 + .../blocksstable/ob_macro_block_writer.cpp | 5 +- .../storage/test_partition_range_splite.cpp | 4 +- 4 files changed, 104 insertions(+), 14 deletions(-) diff --git a/mittest/mtlenv/storage/test_multi_version_sstable_merge.cpp b/mittest/mtlenv/storage/test_multi_version_sstable_merge.cpp index 408177a5e..69c11855a 100644 --- a/mittest/mtlenv/storage/test_multi_version_sstable_merge.cpp +++ b/mittest/mtlenv/storage/test_multi_version_sstable_merge.cpp @@ -461,11 +461,11 @@ TEST_F(TestMultiVersionMerge, uncommit_rowkey_committed_in_minor) ObTableHandleV2 handle1; const char *micro_data[2]; micro_data[0] = - "bigint var bigint bigint bigint bigint dml flag multi_version_row_flag\n" - "0 var1 -8 0 NOP 1 T_DML_UPDATE EXIST LF\n" - "1 var1 -9 0 10 NOP T_DML_UPDATE EXIST F\n" - "1 var1 -8 MIN 3 3 T_DML_UPDATE EXIST SC\n" - "1 var1 -8 0 3 NOP T_DML_UPDATE EXIST N\n"; + "bigint var bigint bigint bigint bigint dml flag multi_version_row_flag trans_id\n" + "0 var1 -8 0 NOP 1 T_DML_UPDATE EXIST LF trans_id_0\n" + "1 var1 -9 0 10 NOP T_DML_UPDATE EXIST FU trans_id_1\n" + "1 var1 -8 MIN 3 3 T_DML_UPDATE EXIST SC trans_id_0\n" + "1 var1 -8 0 3 NOP T_DML_UPDATE EXIST N trans_id_0\n"; micro_data[1] = "bigint var bigint bigint bigint bigint dml flag multi_version_row_flag\n" @@ -503,6 +503,32 @@ TEST_F(TestMultiVersionMerge, uncommit_rowkey_committed_in_minor) merge_context.static_param_.tables_handle_.add_table(handle2); STORAGE_LOG(INFO, "finish prepare sstable2"); + ObLSID ls_id(ls_id_); + ObTabletID tablet_id(tablet_id_); + ObLSHandle ls_handle; + ObLSService *ls_svr = MTL(ObLSService*); + ASSERT_EQ(OB_SUCCESS, ls_svr->get_ls(ls_id, ls_handle, ObLSGetMod::STORAGE_MOD)); + + ObTxTable *tx_table = nullptr; + ObTxTableGuard tx_table_guard; + ls_handle.get_ls()->get_tx_table_guard(tx_table_guard); + ASSERT_NE(nullptr, tx_table = tx_table_guard.get_tx_table()); + + for (int64_t i = 1; i <= 1; i++) { + ObTxData *tx_data = new ObTxData(); + transaction::ObTransID tx_id = i; + + // fill in data + tx_data->tx_id_ = tx_id; + tx_data->commit_version_.convert_for_tx(i * 10); + tx_data->start_scn_.convert_for_tx(i); + tx_data->end_scn_ = tx_data->commit_version_; + tx_data->state_ = ObTxData::COMMIT; + + ASSERT_EQ(OB_SUCCESS, tx_table->insert(tx_data)); + delete tx_data; + } + ObVersionRange trans_version_range; trans_version_range.snapshot_version_ = 100; trans_version_range.multi_version_start_ = 1; @@ -542,6 +568,7 @@ TEST_F(TestMultiVersionMerge, uncommit_rowkey_committed_in_minor) ObMockDirectReadIterator sstable_iter; ASSERT_EQ(OB_SUCCESS, sstable_iter.init(scanner, allocator_, full_read_info_)); ASSERT_TRUE(res_iter.equals(sstable_iter, true/*cmp multi version row flag*/)); + ASSERT_EQ(OB_SUCCESS, clear_tx_data()); scanner->~ObStoreRowIterator(); handle1.reset(); handle2.reset(); @@ -649,9 +676,9 @@ TEST_F(TestMultiVersionMerge, uncommit_rowkey_in_one_macro_committed_following_l ObTableHandleV2 handle1; const char *micro_data[2]; micro_data[0] = - "bigint var bigint bigint bigint bigint flag multi_version_row_flag\n" - "0 var1 -8 0 NOP 1 EXIST LF\n" - "1 var1 -9 0 10 NOP EXIST F\n"; + "bigint var bigint bigint bigint bigint flag multi_version_row_flag trans_id\n" + "0 var1 -8 0 NOP 1 EXIST LF trans_id_0\n" + "1 var1 -9 0 10 NOP EXIST FU trans_id_1\n"; micro_data[1] = "bigint var bigint bigint bigint bigint flag multi_version_row_flag\n" @@ -689,6 +716,32 @@ TEST_F(TestMultiVersionMerge, uncommit_rowkey_in_one_macro_committed_following_l merge_context.static_param_.tables_handle_.add_table(handle2); STORAGE_LOG(INFO, "finish prepare sstable2"); + ObLSID ls_id(ls_id_); + ObTabletID tablet_id(tablet_id_); + ObLSHandle ls_handle; + ObLSService *ls_svr = MTL(ObLSService*); + ASSERT_EQ(OB_SUCCESS, ls_svr->get_ls(ls_id, ls_handle, ObLSGetMod::STORAGE_MOD)); + + ObTxTable *tx_table = nullptr; + ObTxTableGuard tx_table_guard; + ls_handle.get_ls()->get_tx_table_guard(tx_table_guard); + ASSERT_NE(nullptr, tx_table = tx_table_guard.get_tx_table()); + + for (int64_t i = 1; i <= 1; i++) { + ObTxData *tx_data = new ObTxData(); + transaction::ObTransID tx_id = i; + + // fill in data + tx_data->tx_id_ = tx_id; + tx_data->commit_version_.convert_for_tx(i * 10); + tx_data->start_scn_.convert_for_tx(i); + tx_data->end_scn_ = tx_data->commit_version_; + tx_data->state_ = ObTxData::COMMIT; + + ASSERT_EQ(OB_SUCCESS, tx_table->insert(tx_data)); + delete tx_data; + } + ObVersionRange trans_version_range; trans_version_range.snapshot_version_ = 100; trans_version_range.multi_version_start_ = 1; @@ -727,6 +780,7 @@ TEST_F(TestMultiVersionMerge, uncommit_rowkey_in_one_macro_committed_following_l ObMockDirectReadIterator sstable_iter; ASSERT_EQ(OB_SUCCESS, sstable_iter.init(scanner, allocator_, full_read_info_)); ASSERT_TRUE(res_iter.equals(sstable_iter, true/*cmp multi version row flag*/)); + ASSERT_EQ(OB_SUCCESS, clear_tx_data()); scanner->~ObStoreRowIterator(); handle1.reset(); handle2.reset(); @@ -743,9 +797,9 @@ TEST_F(TestMultiVersionMerge, uncommit_rowkey_in_one_macro_committed_following_s ObTableHandleV2 handle1; const char *micro_data[2]; micro_data[0] = - "bigint var bigint bigint bigint bigint flag multi_version_row_flag\n" - "0 var1 -8 0 NOP 1 EXIST LF\n" - "1 var1 -9 0 10 NOP EXIST F\n"; + "bigint var bigint bigint bigint bigint flag multi_version_row_flag trans_id\n" + "0 var1 -8 0 NOP 1 EXIST LF trans_id_0\n" + "1 var1 -9 0 10 NOP EXIST FU trans_id_1\n"; micro_data[1] = "bigint var bigint bigint bigint bigint flag multi_version_row_flag\n" @@ -785,6 +839,34 @@ TEST_F(TestMultiVersionMerge, uncommit_rowkey_in_one_macro_committed_following_s merge_context.static_param_.tables_handle_.add_table(handle2); STORAGE_LOG(INFO, "finish prepare sstable2"); + + ObLSID ls_id(ls_id_); + ObTabletID tablet_id(tablet_id_); + ObLSHandle ls_handle; + ObLSService *ls_svr = MTL(ObLSService*); + ASSERT_EQ(OB_SUCCESS, ls_svr->get_ls(ls_id, ls_handle, ObLSGetMod::STORAGE_MOD)); + + ObTxTable *tx_table = nullptr; + ObTxTableGuard tx_table_guard; + ls_handle.get_ls()->get_tx_table_guard(tx_table_guard); + ASSERT_NE(nullptr, tx_table = tx_table_guard.get_tx_table()); + + for (int64_t i = 1; i <= 1; i++) { + ObTxData *tx_data = new ObTxData(); + transaction::ObTransID tx_id = i; + + // fill in data + tx_data->tx_id_ = tx_id; + tx_data->commit_version_.convert_for_tx(i * 10); + tx_data->start_scn_.convert_for_tx(i); + tx_data->end_scn_ = tx_data->commit_version_; + tx_data->state_ = ObTxData::COMMIT; + + ASSERT_EQ(OB_SUCCESS, tx_table->insert(tx_data)); + delete tx_data; + } + + ObVersionRange trans_version_range; trans_version_range.snapshot_version_ = 100; trans_version_range.multi_version_start_ = 1; @@ -824,6 +906,7 @@ TEST_F(TestMultiVersionMerge, uncommit_rowkey_in_one_macro_committed_following_s ObMockDirectReadIterator sstable_iter; ASSERT_EQ(OB_SUCCESS, sstable_iter.init(scanner, allocator_, full_read_info_)); ASSERT_TRUE(res_iter.equals(sstable_iter, true/*cmp multi version row flag*/)); + ASSERT_EQ(OB_SUCCESS, clear_tx_data()); scanner->~ObStoreRowIterator(); handle1.reset(); handle2.reset(); diff --git a/src/storage/blocksstable/ob_datum_row.h b/src/storage/blocksstable/ob_datum_row.h index 24cbb9ec3..3f96dff8f 100644 --- a/src/storage/blocksstable/ob_datum_row.h +++ b/src/storage/blocksstable/ob_datum_row.h @@ -276,6 +276,10 @@ public: { is_shadow_ = is_shadow_row; } + inline bool is_valid() const + { + return !is_first_multi_version_row() || is_uncommitted_row() || is_last_multi_version_row() || is_ghost_row() || is_shadow_row(); + } inline bool is_compacted_multi_version_row() const { return is_compacted_; } inline bool is_last_multi_version_row() const { return is_last_; } inline bool is_first_multi_version_row() const { return is_first_; } diff --git a/src/storage/blocksstable/ob_macro_block_writer.cpp b/src/storage/blocksstable/ob_macro_block_writer.cpp index 06b982637..f2662fe92 100644 --- a/src/storage/blocksstable/ob_macro_block_writer.cpp +++ b/src/storage/blocksstable/ob_macro_block_writer.cpp @@ -840,9 +840,12 @@ int ObMacroBlockWriter::check_order(const ObDatumRow &row) int64_t cur_row_version = 0; int64_t cur_sql_sequence = 0; if (!row.is_valid() || row.get_column_count() != data_store_desc_->get_row_column_count()) { - ret = OB_INVALID_ARGUMENT; + ret = OB_ERR_UNEXPECTED; STORAGE_LOG(ERROR, "invalid macro block writer input argument.", K(row), "row_column_count", data_store_desc_->get_row_column_count(), K(ret)); + } else if (OB_UNLIKELY(!row.mvcc_row_flag_.is_valid())) { + ret = OB_ERR_UNEXPECTED; + STORAGE_LOG(ERROR, "invalid mvcc_row_flag", K(ret), K(row.mvcc_row_flag_)); } else { ObMacroBlock &curr_block = macro_blocks_[current_index_]; cur_row_version = row.storage_datums_[trans_version_col_idx].get_int(); diff --git a/unittest/storage/test_partition_range_splite.cpp b/unittest/storage/test_partition_range_splite.cpp index 3c79e7422..b82564683 100644 --- a/unittest/storage/test_partition_range_splite.cpp +++ b/unittest/storage/test_partition_range_splite.cpp @@ -302,7 +302,7 @@ void TestRangeSpliter::prepare_sstable_handle(ObTableHandleV2 &handle, "bigint var bigint bigint flag multi_version_row_flag\n" "18 var1 -9 0 EXIST CLF\n" "19 var1 -9 MIN EXIST SCF\n" - "19 var1 -9 0 EXIST CF\n"; + "19 var1 -9 0 EXIST C\n"; macro_data[13] = "bigint var bigint bigint flag multi_version_row_flag\n" @@ -324,7 +324,7 @@ void TestRangeSpliter::prepare_sstable_handle(ObTableHandleV2 &handle, "bigint var bigint bigint flag multi_version_row_flag\n" "24 var1 -9 0 EXIST CLF\n" "25 var1 -9 MIN EXIST SCF\n" - "25 var1 -9 0 EXIST CF\n"; + "25 var1 -9 0 EXIST CL\n"; macro_data[17] = "bigint var bigint bigint flag multi_version_row_flag\n"