diff --git a/mittest/mtlenv/storage/test_mds_table_scan.cpp b/mittest/mtlenv/storage/test_mds_table_scan.cpp index 50d7d93a4..69fc2d11b 100644 --- a/mittest/mtlenv/storage/test_mds_table_scan.cpp +++ b/mittest/mtlenv/storage/test_mds_table_scan.cpp @@ -29,6 +29,7 @@ #include "storage/tablet/ob_mds_row_iterator.h" #include "storage/tablet/ob_mds_scan_param_helper.h" #include "storage/tablet/ob_mds_schema_helper.h" +#include "storage/compaction/ob_tablet_merge_ctx.h" #define USING_LOG_PREFIX STORAGE @@ -186,7 +187,11 @@ int TestMdsTableScan::wait_for_mds_table_flush(const common::ObTabletID &tablet_ ret = tablet->fetch_table_store(table_store_wrapper); EXPECT_EQ(OB_SUCCESS, ret); table_store = table_store_wrapper.get_member(); - EXPECT_EQ(mds_sstable_cnt_before + 1, table_store->mds_sstables_.count()); + STORAGE_LOG(INFO, "print mds sstables", K(table_store->mds_sstables_)); + if (table_store->mds_sstables_.count() != mds_sstable_cnt_before + 1) { + EXPECT_EQ(1, table_store->mds_sstables_.count()); + EXPECT_EQ(ObITable::TableType::MDS_MINOR_SSTABLE, table_store->mds_sstables_.at(0)->key_.table_type_); + } if (::testing::Test::HasFailure()) { ret = OB_TIMEOUT; @@ -1186,6 +1191,193 @@ TEST_F(TestMdsTableScan, test_minor_scan) ASSERT_EQ(OB_ITER_END, ret); } + +TEST_F(TestMdsTableScan, test_range_cross) +{ + int ret = OB_SUCCESS; + + // create tablet + const common::ObTabletID tablet_id(ObTimeUtility::fast_current_time() % 10000000000000); + ObTabletHandle tablet_handle; + ObTabletStatus status(ObTabletStatus::MAX); + share::SCN create_commit_scn; + create_commit_scn = share::SCN::plus(share::SCN::min_scn(), 100); + ret = create_tablet(tablet_id, tablet_handle); + ASSERT_EQ(OB_SUCCESS, ret); + + ObTablet *tablet = tablet_handle.get_obj(); + ASSERT_NE(nullptr, tablet); + share::SCN mds_checkpoint_scn = tablet->get_tablet_meta().mds_checkpoint_scn_; + ASSERT_EQ(share::SCN::base_scn(), mds_checkpoint_scn); + + // write data to mds table no.1 row + { + ObTabletCreateDeleteMdsUserData user_data; + user_data.tablet_status_ = ObTabletStatus::NORMAL; + user_data.data_type_ = ObTabletMdsUserDataType::CREATE_TABLET; + + mds::MdsCtx ctx(mds::MdsWriter(transaction::ObTransID(123))); + ret = tablet->set_tablet_status(user_data, ctx); + ASSERT_EQ(OB_SUCCESS, ret); + + ctx.single_log_commit(create_commit_scn, create_commit_scn); + } + + // write data to mds table no.2 row + { + ObTabletBindingMdsUserData user_data; + user_data.data_tablet_id_ = 100; + user_data.hidden_tablet_id_ = 101; + user_data.snapshot_version_ = 9527; + + mds::MdsCtx ctx(mds::MdsWriter(transaction::ObTransID(256))); + ret = tablet->set_ddl_info(user_data, ctx, 1_s/*lock_timeout_us*/); + ASSERT_EQ(OB_SUCCESS, ret); + + share::SCN redo_scn = share::SCN::plus(share::SCN::min_scn(), 150); + ctx.on_redo(redo_scn); + share::SCN commit_scn = share::SCN::plus(share::SCN::min_scn(), 200); + ctx.on_commit(commit_scn, commit_scn); + } + + + // mds table flush once + share::SCN decided_scn; + decided_scn = share::SCN::plus(share::SCN::min_scn(), 250); + ret = tablet->mds_table_flush(decided_scn); + ASSERT_EQ(OB_SUCCESS, ret); + + + // wait for mds table flush + ret = wait_for_mds_table_flush(tablet_id); + ASSERT_EQ(OB_SUCCESS, ret); + + // wait all mds nodes to be released + tablet_handle.reset(); + ret = wait_for_all_mds_nodes_released(tablet_id); + ASSERT_EQ(OB_SUCCESS, ret); + + ret = TestMdsTableScan::get_tablet(tablet_id, tablet_handle); + ASSERT_EQ(OB_SUCCESS, ret); + tablet = tablet_handle.get_obj(); + ASSERT_NE(nullptr, tablet); + + // get mini sstable. + ObTabletMemberWrapper table_store_wrapper; + ret = tablet->fetch_table_store(table_store_wrapper); + EXPECT_EQ(OB_SUCCESS, ret); + const ObTabletTableStore *table_store = table_store_wrapper.get_member(); + EXPECT_EQ(1, table_store->mds_sstables_.count()); + + const blocksstable::ObSSTable *mds_mini = table_store->mds_sstables_.at(0); + ObTableHandleV2 mds_mini_handle; + ObSSTable *new_sstable = nullptr; + ASSERT_EQ(OB_SUCCESS, mds_mini->deep_copy(allocator_, new_sstable, false)); + ASSERT_EQ(OB_SUCCESS, mds_mini_handle.set_sstable(new_sstable, &allocator_)); + + // write data to mds table no.3 row + { + compaction::ObMediumCompactionInfoKey key(250); + compaction::ObMediumCompactionInfo info; + ret = MediumInfoHelper::build_medium_compaction_info(allocator_, info, 250); + ASSERT_EQ(OB_SUCCESS, ret); + + mds::MdsCtx ctx(mds::MdsWriter(transaction::ObTransID(300))); + ret = tablet->set(key, info, ctx, 1_s/*lock_timeout_us*/); + ASSERT_EQ(OB_SUCCESS, ret); + + share::SCN redo_scn = share::SCN::plus(share::SCN::min_scn(), 260); + ctx.on_redo(redo_scn); + share::SCN commit_scn = share::SCN::plus(share::SCN::min_scn(), 270); + ctx.on_commit(commit_scn, commit_scn); + } + // write data to mds table no.4 row + { + compaction::ObMediumCompactionInfoKey key(270); + compaction::ObMediumCompactionInfo info; + ret = MediumInfoHelper::build_medium_compaction_info(allocator_, info, 270); + ASSERT_EQ(OB_SUCCESS, ret); + + mds::MdsCtx ctx(mds::MdsWriter(transaction::ObTransID(320))); + ret = tablet->set(key, info, ctx, 1_s/*lock_timeout_us*/); + ASSERT_EQ(OB_SUCCESS, ret); + + share::SCN redo_scn = share::SCN::plus(share::SCN::min_scn(), 280); + ctx.on_redo(redo_scn); + share::SCN commit_scn = share::SCN::plus(share::SCN::min_scn(), 290); + ctx.on_commit(commit_scn, commit_scn); + } + + share::SCN transfer_out_commit_scn = share::SCN::plus(share::SCN::min_scn(), 300); + // write data to mds table no.5 row + { + ObTabletCreateDeleteMdsUserData user_data; + user_data.tablet_status_ = ObTabletStatus::TRANSFER_OUT; + user_data.data_type_ = ObTabletMdsUserDataType::START_TRANSFER_OUT; + + mds::MdsCtx ctx(mds::MdsWriter(transaction::ObTransID(356))); + ret = tablet->set_tablet_status(user_data, ctx); + ASSERT_EQ(OB_SUCCESS, ret); + + ctx.single_log_commit(transfer_out_commit_scn, transfer_out_commit_scn); + } + + // mds table flush twice ,hope to build minor sstable + decided_scn = share::SCN::plus(share::SCN::min_scn(), 350); + ret = tablet->mds_table_flush(decided_scn); + ASSERT_EQ(OB_SUCCESS, ret); + + // wait for mds table flush except->triggle minor + ret = wait_for_mds_table_flush(tablet_id); + ASSERT_EQ(OB_SUCCESS, ret); + + // wait all mds nodes to be released + tablet_handle.reset(); + ret = wait_for_all_mds_nodes_released(tablet_id); + ASSERT_EQ(OB_SUCCESS, ret); + + // schedule mds minor + ASSERT_EQ(OB_SUCCESS, try_schedule_mds_minor(tablet_id)); + // test multi version get + constexpr uint8_t mds_unit_id = mds::TupleTypeIdx>::value; + common::ObString dummy_key; + const int64_t timeout_us = ObClockGenerator::getClock() + 1_s; + + ret = TestMdsTableScan::get_tablet(tablet_id, tablet_handle); + ASSERT_EQ(OB_SUCCESS, ret); + tablet = tablet_handle.get_obj(); + ASSERT_NE(nullptr, tablet); + + // check mds sstable be minor + + ret = tablet->fetch_table_store(table_store_wrapper); + EXPECT_EQ(OB_SUCCESS, ret); + const ObTabletTableStore *minor_table_store = table_store_wrapper.get_member(); + EXPECT_EQ(1, minor_table_store->mds_sstables_.count()); + EXPECT_EQ(ObITable::TableType::MDS_MINOR_SSTABLE, minor_table_store->mds_sstables_.at(0)->key_.table_type_); + EXPECT_EQ(OB_SUCCESS, mds_mini_handle.get_sstable(mds_mini)); + + // try push mini. + const share::SCN &flush_scn = mds_mini->get_end_scn(); + compaction::ObTabletMergeDagParam dag_param(compaction::MDS_MINI_MERGE, LS_ID, tablet_id, + tablet->get_tablet_meta().transfer_info_.transfer_seq_); + + compaction::ObTabletMergeCtx mini_ctx(dag_param, allocator_); + mini_ctx.static_param_.scn_range_.end_scn_ = flush_scn; + mini_ctx.static_param_.version_range_.snapshot_version_ = flush_scn.get_val_for_tx(); + mini_ctx.static_param_.scn_range_.start_scn_ = mds_mini->get_start_scn(); + + ObTabletHandle new_tablet_handle; + ObLSHandle ls_handle; + ObLS *ls = nullptr; + ASSERT_EQ(OB_SUCCESS, mini_ctx.get_ls_and_tablet()); + ASSERT_EQ(OB_SUCCESS, MTL(ObLSService*)->get_ls(LS_ID, ls_handle, ObLSGetMod::STORAGE_MOD)); + ASSERT_NE(nullptr, ls = mini_ctx.get_ls()); + ASSERT_EQ(OB_SUCCESS, mini_ctx.init_tablet_merge_info(false/*need_check*/)); + mini_ctx.static_param_.ls_rebuild_seq_ = ls->ls_meta_.get_rebuild_seq(); + ASSERT_EQ(OB_MINOR_SSTABLE_RANGE_CROSS, ls->build_new_tablet_from_mds_table(mini_ctx, tablet_id, mds_mini_handle, flush_scn, new_tablet_handle)); +} + } // namespace storage } // namespace oceanbase diff --git a/src/storage/multi_data_source/ob_mds_table_merge_task.cpp b/src/storage/multi_data_source/ob_mds_table_merge_task.cpp index 9cd264f8b..6fed7fe72 100644 --- a/src/storage/multi_data_source/ob_mds_table_merge_task.cpp +++ b/src/storage/multi_data_source/ob_mds_table_merge_task.cpp @@ -126,15 +126,15 @@ int ObMdsTableMergeTask::process() } else if (OB_UNLIKELY(!mds_table.get_mds_table_ptr()->is_construct_sequence_matched(mds_construct_sequence))) { ret = OB_NO_NEED_MERGE; LOG_WARN("construct sequence does not match current mds table, no need to merge", K(ret), K(ls_id), K(tablet_id), K(mds_construct_sequence)); - } else if (ctx.get_tablet()->get_mds_checkpoint_scn() >= flush_scn) { + } else if (tablet->get_mds_checkpoint_scn() >= flush_scn) { need_schedule_mds_minor = false; FLOG_INFO("flush scn smaller than mds ckpt scn, only flush nodes of mds table and do not generate mds mini", K(ret), K(ls_id), K(tablet_id), K(flush_scn), - "mds_checkpoint_scn", ctx.get_tablet()->get_mds_checkpoint_scn(), + "mds_checkpoint_scn", tablet->get_mds_checkpoint_scn(), KPC(mds_merge_dag_)); ctx.time_guard_click(ObStorageCompactionTimeGuard::EXECUTE); share::dag_yield(); - } else if (FALSE_IT(ctx.static_param_.scn_range_.start_scn_ = ctx.get_tablet()->get_mds_checkpoint_scn())) { + } else if (FALSE_IT(ctx.static_param_.scn_range_.start_scn_ = tablet->get_mds_checkpoint_scn())) { } else if (MDS_FAIL(build_mds_sstable(ctx, mds_construct_sequence, table_handle))) { LOG_WARN("fail to build mds sstable", K(ret), K(ls_id), K(tablet_id), KPC(mds_merge_dag_)); } else if (MDS_FAIL(ls->build_new_tablet_from_mds_table( diff --git a/src/storage/tablet/ob_tablet.cpp b/src/storage/tablet/ob_tablet.cpp index 8ecf4aab8..8d406f8b8 100644 --- a/src/storage/tablet/ob_tablet.cpp +++ b/src/storage/tablet/ob_tablet.cpp @@ -1304,12 +1304,6 @@ int ObTablet::init_with_mds_sstable( } else if (OB_UNLIKELY(!is_mds_merge(param.merge_type_))) { ret = OB_ERR_UNEXPECTED; LOG_WARN("unexpected merge type", K(ret), K(param)); - } else if (OB_UNLIKELY(!is_mds_minor_merge(param.merge_type_) && flush_scn <= old_tablet.tablet_meta_.mds_checkpoint_scn_)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("unexpected error, flush scn is smaller than old mds checkpoint scn", K(ret), - "merge_type", merge_type_to_str(param.merge_type_), - K(flush_scn), - "mds_checkpoint_scn", old_tablet.tablet_meta_.mds_checkpoint_scn_); } else if (param.need_check_transfer_seq_ && OB_FAIL(check_transfer_seq_equal(old_tablet, param.transfer_seq_))) { LOG_WARN("failed to check transfer seq eq", K(ret), K(old_tablet), K(param)); } else if (CLICK_FAIL(old_tablet.fetch_table_store(old_table_store_wrapper))) { @@ -1463,8 +1457,7 @@ int ObTablet::init_for_compat( set_next_tablet_guard(old_tablet.next_tablet_guard_); } is_inited_ = true; - LOG_INFO("succeeded to init tablet for compat", - K(ret), K(old_tablet), K(mds_mini_sstable), KPC(this)); + LOG_INFO("succeeded to init tablet for compat", K(ret), K(old_tablet), K(mds_mini_sstable), KPC(this)); } ObTabletObjLoadHelper::free(tmp_arena_allocator, old_storage_schema); diff --git a/src/storage/tablet/ob_tablet_meta.cpp b/src/storage/tablet/ob_tablet_meta.cpp index a9da4b116..68bfcac06 100644 --- a/src/storage/tablet/ob_tablet_meta.cpp +++ b/src/storage/tablet/ob_tablet_meta.cpp @@ -243,7 +243,7 @@ int ObTabletMeta::init( ddl_snapshot_version_ = old_tablet_meta.ddl_snapshot_version_; max_sync_storage_schema_version_ = old_tablet_meta.max_sync_storage_schema_version_; max_serialized_medium_scn_ = old_tablet_meta.max_serialized_medium_scn_; - mds_checkpoint_scn_ = flush_scn; + mds_checkpoint_scn_ = SCN::max(flush_scn, old_tablet_meta.mds_checkpoint_scn_); transfer_info_ = old_tablet_meta.transfer_info_; extra_medium_info_ = old_tablet_meta.extra_medium_info_; space_usage_ = old_tablet_meta.space_usage_;