remove flush scn comparison logic
This commit is contained in:
parent
a003920857
commit
282b374fc4
@ -29,6 +29,7 @@
|
||||
#include "storage/tablet/ob_mds_row_iterator.h"
|
||||
#include "storage/tablet/ob_mds_scan_param_helper.h"
|
||||
#include "storage/tablet/ob_mds_schema_helper.h"
|
||||
#include "storage/compaction/ob_tablet_merge_ctx.h"
|
||||
|
||||
#define USING_LOG_PREFIX STORAGE
|
||||
|
||||
@ -186,7 +187,11 @@ int TestMdsTableScan::wait_for_mds_table_flush(const common::ObTabletID &tablet_
|
||||
ret = tablet->fetch_table_store(table_store_wrapper);
|
||||
EXPECT_EQ(OB_SUCCESS, ret);
|
||||
table_store = table_store_wrapper.get_member();
|
||||
EXPECT_EQ(mds_sstable_cnt_before + 1, table_store->mds_sstables_.count());
|
||||
STORAGE_LOG(INFO, "print mds sstables", K(table_store->mds_sstables_));
|
||||
if (table_store->mds_sstables_.count() != mds_sstable_cnt_before + 1) {
|
||||
EXPECT_EQ(1, table_store->mds_sstables_.count());
|
||||
EXPECT_EQ(ObITable::TableType::MDS_MINOR_SSTABLE, table_store->mds_sstables_.at(0)->key_.table_type_);
|
||||
}
|
||||
|
||||
if (::testing::Test::HasFailure()) {
|
||||
ret = OB_TIMEOUT;
|
||||
@ -1186,6 +1191,193 @@ TEST_F(TestMdsTableScan, test_minor_scan)
|
||||
ASSERT_EQ(OB_ITER_END, ret);
|
||||
}
|
||||
|
||||
|
||||
TEST_F(TestMdsTableScan, test_range_cross)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
// create tablet
|
||||
const common::ObTabletID tablet_id(ObTimeUtility::fast_current_time() % 10000000000000);
|
||||
ObTabletHandle tablet_handle;
|
||||
ObTabletStatus status(ObTabletStatus::MAX);
|
||||
share::SCN create_commit_scn;
|
||||
create_commit_scn = share::SCN::plus(share::SCN::min_scn(), 100);
|
||||
ret = create_tablet(tablet_id, tablet_handle);
|
||||
ASSERT_EQ(OB_SUCCESS, ret);
|
||||
|
||||
ObTablet *tablet = tablet_handle.get_obj();
|
||||
ASSERT_NE(nullptr, tablet);
|
||||
share::SCN mds_checkpoint_scn = tablet->get_tablet_meta().mds_checkpoint_scn_;
|
||||
ASSERT_EQ(share::SCN::base_scn(), mds_checkpoint_scn);
|
||||
|
||||
// write data to mds table no.1 row
|
||||
{
|
||||
ObTabletCreateDeleteMdsUserData user_data;
|
||||
user_data.tablet_status_ = ObTabletStatus::NORMAL;
|
||||
user_data.data_type_ = ObTabletMdsUserDataType::CREATE_TABLET;
|
||||
|
||||
mds::MdsCtx ctx(mds::MdsWriter(transaction::ObTransID(123)));
|
||||
ret = tablet->set_tablet_status(user_data, ctx);
|
||||
ASSERT_EQ(OB_SUCCESS, ret);
|
||||
|
||||
ctx.single_log_commit(create_commit_scn, create_commit_scn);
|
||||
}
|
||||
|
||||
// write data to mds table no.2 row
|
||||
{
|
||||
ObTabletBindingMdsUserData user_data;
|
||||
user_data.data_tablet_id_ = 100;
|
||||
user_data.hidden_tablet_id_ = 101;
|
||||
user_data.snapshot_version_ = 9527;
|
||||
|
||||
mds::MdsCtx ctx(mds::MdsWriter(transaction::ObTransID(256)));
|
||||
ret = tablet->set_ddl_info(user_data, ctx, 1_s/*lock_timeout_us*/);
|
||||
ASSERT_EQ(OB_SUCCESS, ret);
|
||||
|
||||
share::SCN redo_scn = share::SCN::plus(share::SCN::min_scn(), 150);
|
||||
ctx.on_redo(redo_scn);
|
||||
share::SCN commit_scn = share::SCN::plus(share::SCN::min_scn(), 200);
|
||||
ctx.on_commit(commit_scn, commit_scn);
|
||||
}
|
||||
|
||||
|
||||
// mds table flush once
|
||||
share::SCN decided_scn;
|
||||
decided_scn = share::SCN::plus(share::SCN::min_scn(), 250);
|
||||
ret = tablet->mds_table_flush(decided_scn);
|
||||
ASSERT_EQ(OB_SUCCESS, ret);
|
||||
|
||||
|
||||
// wait for mds table flush
|
||||
ret = wait_for_mds_table_flush(tablet_id);
|
||||
ASSERT_EQ(OB_SUCCESS, ret);
|
||||
|
||||
// wait all mds nodes to be released
|
||||
tablet_handle.reset();
|
||||
ret = wait_for_all_mds_nodes_released(tablet_id);
|
||||
ASSERT_EQ(OB_SUCCESS, ret);
|
||||
|
||||
ret = TestMdsTableScan::get_tablet(tablet_id, tablet_handle);
|
||||
ASSERT_EQ(OB_SUCCESS, ret);
|
||||
tablet = tablet_handle.get_obj();
|
||||
ASSERT_NE(nullptr, tablet);
|
||||
|
||||
// get mini sstable.
|
||||
ObTabletMemberWrapper<ObTabletTableStore> table_store_wrapper;
|
||||
ret = tablet->fetch_table_store(table_store_wrapper);
|
||||
EXPECT_EQ(OB_SUCCESS, ret);
|
||||
const ObTabletTableStore *table_store = table_store_wrapper.get_member();
|
||||
EXPECT_EQ(1, table_store->mds_sstables_.count());
|
||||
|
||||
const blocksstable::ObSSTable *mds_mini = table_store->mds_sstables_.at(0);
|
||||
ObTableHandleV2 mds_mini_handle;
|
||||
ObSSTable *new_sstable = nullptr;
|
||||
ASSERT_EQ(OB_SUCCESS, mds_mini->deep_copy(allocator_, new_sstable, false));
|
||||
ASSERT_EQ(OB_SUCCESS, mds_mini_handle.set_sstable(new_sstable, &allocator_));
|
||||
|
||||
// write data to mds table no.3 row
|
||||
{
|
||||
compaction::ObMediumCompactionInfoKey key(250);
|
||||
compaction::ObMediumCompactionInfo info;
|
||||
ret = MediumInfoHelper::build_medium_compaction_info(allocator_, info, 250);
|
||||
ASSERT_EQ(OB_SUCCESS, ret);
|
||||
|
||||
mds::MdsCtx ctx(mds::MdsWriter(transaction::ObTransID(300)));
|
||||
ret = tablet->set(key, info, ctx, 1_s/*lock_timeout_us*/);
|
||||
ASSERT_EQ(OB_SUCCESS, ret);
|
||||
|
||||
share::SCN redo_scn = share::SCN::plus(share::SCN::min_scn(), 260);
|
||||
ctx.on_redo(redo_scn);
|
||||
share::SCN commit_scn = share::SCN::plus(share::SCN::min_scn(), 270);
|
||||
ctx.on_commit(commit_scn, commit_scn);
|
||||
}
|
||||
// write data to mds table no.4 row
|
||||
{
|
||||
compaction::ObMediumCompactionInfoKey key(270);
|
||||
compaction::ObMediumCompactionInfo info;
|
||||
ret = MediumInfoHelper::build_medium_compaction_info(allocator_, info, 270);
|
||||
ASSERT_EQ(OB_SUCCESS, ret);
|
||||
|
||||
mds::MdsCtx ctx(mds::MdsWriter(transaction::ObTransID(320)));
|
||||
ret = tablet->set(key, info, ctx, 1_s/*lock_timeout_us*/);
|
||||
ASSERT_EQ(OB_SUCCESS, ret);
|
||||
|
||||
share::SCN redo_scn = share::SCN::plus(share::SCN::min_scn(), 280);
|
||||
ctx.on_redo(redo_scn);
|
||||
share::SCN commit_scn = share::SCN::plus(share::SCN::min_scn(), 290);
|
||||
ctx.on_commit(commit_scn, commit_scn);
|
||||
}
|
||||
|
||||
share::SCN transfer_out_commit_scn = share::SCN::plus(share::SCN::min_scn(), 300);
|
||||
// write data to mds table no.5 row
|
||||
{
|
||||
ObTabletCreateDeleteMdsUserData user_data;
|
||||
user_data.tablet_status_ = ObTabletStatus::TRANSFER_OUT;
|
||||
user_data.data_type_ = ObTabletMdsUserDataType::START_TRANSFER_OUT;
|
||||
|
||||
mds::MdsCtx ctx(mds::MdsWriter(transaction::ObTransID(356)));
|
||||
ret = tablet->set_tablet_status(user_data, ctx);
|
||||
ASSERT_EQ(OB_SUCCESS, ret);
|
||||
|
||||
ctx.single_log_commit(transfer_out_commit_scn, transfer_out_commit_scn);
|
||||
}
|
||||
|
||||
// mds table flush twice ,hope to build minor sstable
|
||||
decided_scn = share::SCN::plus(share::SCN::min_scn(), 350);
|
||||
ret = tablet->mds_table_flush(decided_scn);
|
||||
ASSERT_EQ(OB_SUCCESS, ret);
|
||||
|
||||
// wait for mds table flush except->triggle minor
|
||||
ret = wait_for_mds_table_flush(tablet_id);
|
||||
ASSERT_EQ(OB_SUCCESS, ret);
|
||||
|
||||
// wait all mds nodes to be released
|
||||
tablet_handle.reset();
|
||||
ret = wait_for_all_mds_nodes_released(tablet_id);
|
||||
ASSERT_EQ(OB_SUCCESS, ret);
|
||||
|
||||
// schedule mds minor
|
||||
ASSERT_EQ(OB_SUCCESS, try_schedule_mds_minor(tablet_id));
|
||||
// test multi version get
|
||||
constexpr uint8_t mds_unit_id = mds::TupleTypeIdx<mds::NormalMdsTable, mds::MdsUnit<mds::DummyKey, ObTabletCreateDeleteMdsUserData>>::value;
|
||||
common::ObString dummy_key;
|
||||
const int64_t timeout_us = ObClockGenerator::getClock() + 1_s;
|
||||
|
||||
ret = TestMdsTableScan::get_tablet(tablet_id, tablet_handle);
|
||||
ASSERT_EQ(OB_SUCCESS, ret);
|
||||
tablet = tablet_handle.get_obj();
|
||||
ASSERT_NE(nullptr, tablet);
|
||||
|
||||
// check mds sstable be minor
|
||||
|
||||
ret = tablet->fetch_table_store(table_store_wrapper);
|
||||
EXPECT_EQ(OB_SUCCESS, ret);
|
||||
const ObTabletTableStore *minor_table_store = table_store_wrapper.get_member();
|
||||
EXPECT_EQ(1, minor_table_store->mds_sstables_.count());
|
||||
EXPECT_EQ(ObITable::TableType::MDS_MINOR_SSTABLE, minor_table_store->mds_sstables_.at(0)->key_.table_type_);
|
||||
EXPECT_EQ(OB_SUCCESS, mds_mini_handle.get_sstable(mds_mini));
|
||||
|
||||
// try push mini.
|
||||
const share::SCN &flush_scn = mds_mini->get_end_scn();
|
||||
compaction::ObTabletMergeDagParam dag_param(compaction::MDS_MINI_MERGE, LS_ID, tablet_id,
|
||||
tablet->get_tablet_meta().transfer_info_.transfer_seq_);
|
||||
|
||||
compaction::ObTabletMergeCtx mini_ctx(dag_param, allocator_);
|
||||
mini_ctx.static_param_.scn_range_.end_scn_ = flush_scn;
|
||||
mini_ctx.static_param_.version_range_.snapshot_version_ = flush_scn.get_val_for_tx();
|
||||
mini_ctx.static_param_.scn_range_.start_scn_ = mds_mini->get_start_scn();
|
||||
|
||||
ObTabletHandle new_tablet_handle;
|
||||
ObLSHandle ls_handle;
|
||||
ObLS *ls = nullptr;
|
||||
ASSERT_EQ(OB_SUCCESS, mini_ctx.get_ls_and_tablet());
|
||||
ASSERT_EQ(OB_SUCCESS, MTL(ObLSService*)->get_ls(LS_ID, ls_handle, ObLSGetMod::STORAGE_MOD));
|
||||
ASSERT_NE(nullptr, ls = mini_ctx.get_ls());
|
||||
ASSERT_EQ(OB_SUCCESS, mini_ctx.init_tablet_merge_info(false/*need_check*/));
|
||||
mini_ctx.static_param_.ls_rebuild_seq_ = ls->ls_meta_.get_rebuild_seq();
|
||||
ASSERT_EQ(OB_MINOR_SSTABLE_RANGE_CROSS, ls->build_new_tablet_from_mds_table(mini_ctx, tablet_id, mds_mini_handle, flush_scn, new_tablet_handle));
|
||||
}
|
||||
|
||||
} // namespace storage
|
||||
} // namespace oceanbase
|
||||
|
||||
|
@ -126,15 +126,15 @@ int ObMdsTableMergeTask::process()
|
||||
} else if (OB_UNLIKELY(!mds_table.get_mds_table_ptr()->is_construct_sequence_matched(mds_construct_sequence))) {
|
||||
ret = OB_NO_NEED_MERGE;
|
||||
LOG_WARN("construct sequence does not match current mds table, no need to merge", K(ret), K(ls_id), K(tablet_id), K(mds_construct_sequence));
|
||||
} else if (ctx.get_tablet()->get_mds_checkpoint_scn() >= flush_scn) {
|
||||
} else if (tablet->get_mds_checkpoint_scn() >= flush_scn) {
|
||||
need_schedule_mds_minor = false;
|
||||
FLOG_INFO("flush scn smaller than mds ckpt scn, only flush nodes of mds table and do not generate mds mini",
|
||||
K(ret), K(ls_id), K(tablet_id), K(flush_scn),
|
||||
"mds_checkpoint_scn", ctx.get_tablet()->get_mds_checkpoint_scn(),
|
||||
"mds_checkpoint_scn", tablet->get_mds_checkpoint_scn(),
|
||||
KPC(mds_merge_dag_));
|
||||
ctx.time_guard_click(ObStorageCompactionTimeGuard::EXECUTE);
|
||||
share::dag_yield();
|
||||
} else if (FALSE_IT(ctx.static_param_.scn_range_.start_scn_ = ctx.get_tablet()->get_mds_checkpoint_scn())) {
|
||||
} else if (FALSE_IT(ctx.static_param_.scn_range_.start_scn_ = tablet->get_mds_checkpoint_scn())) {
|
||||
} else if (MDS_FAIL(build_mds_sstable(ctx, mds_construct_sequence, table_handle))) {
|
||||
LOG_WARN("fail to build mds sstable", K(ret), K(ls_id), K(tablet_id), KPC(mds_merge_dag_));
|
||||
} else if (MDS_FAIL(ls->build_new_tablet_from_mds_table(
|
||||
|
@ -1304,12 +1304,6 @@ int ObTablet::init_with_mds_sstable(
|
||||
} else if (OB_UNLIKELY(!is_mds_merge(param.merge_type_))) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected merge type", K(ret), K(param));
|
||||
} else if (OB_UNLIKELY(!is_mds_minor_merge(param.merge_type_) && flush_scn <= old_tablet.tablet_meta_.mds_checkpoint_scn_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected error, flush scn is smaller than old mds checkpoint scn", K(ret),
|
||||
"merge_type", merge_type_to_str(param.merge_type_),
|
||||
K(flush_scn),
|
||||
"mds_checkpoint_scn", old_tablet.tablet_meta_.mds_checkpoint_scn_);
|
||||
} else if (param.need_check_transfer_seq_ && OB_FAIL(check_transfer_seq_equal(old_tablet, param.transfer_seq_))) {
|
||||
LOG_WARN("failed to check transfer seq eq", K(ret), K(old_tablet), K(param));
|
||||
} else if (CLICK_FAIL(old_tablet.fetch_table_store(old_table_store_wrapper))) {
|
||||
@ -1463,8 +1457,7 @@ int ObTablet::init_for_compat(
|
||||
set_next_tablet_guard(old_tablet.next_tablet_guard_);
|
||||
}
|
||||
is_inited_ = true;
|
||||
LOG_INFO("succeeded to init tablet for compat",
|
||||
K(ret), K(old_tablet), K(mds_mini_sstable), KPC(this));
|
||||
LOG_INFO("succeeded to init tablet for compat", K(ret), K(old_tablet), K(mds_mini_sstable), KPC(this));
|
||||
}
|
||||
ObTabletObjLoadHelper::free(tmp_arena_allocator, old_storage_schema);
|
||||
|
||||
|
@ -243,7 +243,7 @@ int ObTabletMeta::init(
|
||||
ddl_snapshot_version_ = old_tablet_meta.ddl_snapshot_version_;
|
||||
max_sync_storage_schema_version_ = old_tablet_meta.max_sync_storage_schema_version_;
|
||||
max_serialized_medium_scn_ = old_tablet_meta.max_serialized_medium_scn_;
|
||||
mds_checkpoint_scn_ = flush_scn;
|
||||
mds_checkpoint_scn_ = SCN::max(flush_scn, old_tablet_meta.mds_checkpoint_scn_);
|
||||
transfer_info_ = old_tablet_meta.transfer_info_;
|
||||
extra_medium_info_ = old_tablet_meta.extra_medium_info_;
|
||||
space_usage_ = old_tablet_meta.space_usage_;
|
||||
|
Loading…
x
Reference in New Issue
Block a user