From 2155b4291b87912798300fe7c6569d02181368eb Mon Sep 17 00:00:00 2001 From: obdev Date: Thu, 10 Nov 2022 08:08:25 +0000 Subject: [PATCH] revert bugfix for migrate --- .../compaction/ob_partition_merge_policy.cpp | 26 ++++++++++++------- .../compaction/ob_tablet_merge_ctx.cpp | 11 ++++---- unittest/storage/test_compaction_policy.cpp | 20 +++----------- 3 files changed, 24 insertions(+), 33 deletions(-) diff --git a/src/storage/compaction/ob_partition_merge_policy.cpp b/src/storage/compaction/ob_partition_merge_policy.cpp index 79b1cbaf51..75ebe45a8b 100644 --- a/src/storage/compaction/ob_partition_merge_policy.cpp +++ b/src/storage/compaction/ob_partition_merge_policy.cpp @@ -133,14 +133,14 @@ int ObPartitionMergePolicy::find_mini_merge_tables( // Keep max_snapshot_version currently because major merge must be done step by step int64_t max_snapshot_version = freeze_info.next.freeze_ts; ObITable *last_table = tablet.get_table_store().get_minor_sstables().get_boundary_table(true/*last*/); - const int64_t last_minor_log_ts = nullptr == last_table ? tablet.get_clog_checkpoint_ts() : last_table->get_end_log_ts(); + const int64_t clog_checkpoint_ts = tablet.get_clog_checkpoint_ts(); // Freezing in the restart phase may not satisfy end >= last_max_sstable, // so the memtable cannot be filtered by log_ts // can only take out all frozen memtable ObIMemtable *memtable = nullptr; const ObTabletID &tablet_id = tablet.get_tablet_meta().tablet_id_; - int64_t log_ts_included_memtable_cnt = 0; + bool contain_force_freeze_memtable = false; for (int64_t i = 0; OB_SUCC(ret) && i < memtable_handles.count(); ++i) { if (OB_ISNULL(memtable = static_cast(memtable_handles.at(i).get_table()))) { ret = OB_ERR_SYS; @@ -151,9 +151,12 @@ int ObPartitionMergePolicy::find_mini_merge_tables( } else if (!memtable->can_be_minor_merged()) { FLOG_INFO("memtable cannot mini merge now", K(ret), K(i), KPC(memtable), K(max_snapshot_version), K(memtable_handles), K(param)); break; - } else if (memtable->get_end_log_ts() <= last_minor_log_ts) { + } else if (memtable->get_end_log_ts() <= clog_checkpoint_ts) { if (!tablet_id.is_special_merge_tablet()) { - ++log_ts_included_memtable_cnt; + if (static_cast(memtable)->get_is_force_freeze() + && memtable->get_snapshot_version() > tablet.get_tablet_meta().snapshot_version_) { + contain_force_freeze_memtable = true; + } } else { LOG_DEBUG("memtable wait to release", K(param), KPC(memtable)); continue; @@ -189,9 +192,15 @@ int ObPartitionMergePolicy::find_mini_merge_tables( if (OB_SUCC(ret)) { result.suggest_merge_type_ = param.merge_type_; result.version_range_.multi_version_start_ = tablet.get_multi_version_start(); - if (log_ts_included_memtable_cnt > 0 && log_ts_included_memtable_cnt == result.handle_.get_count()) { - result.update_tablet_directly_ = true; - LOG_INFO("meet one empty force freeze memtable, could update tablet directly", K(ret), K(result)); + if (result.handle_.empty()) { + ret = OB_NO_NEED_MERGE; + } else if (result.log_ts_range_.end_log_ts_ <= clog_checkpoint_ts) { + if (contain_force_freeze_memtable) { + result.update_tablet_directly_ = true; + LOG_INFO("meet empty force freeze memtable, could update tablet directly", K(ret), K(result)); + } else { + ret = OB_NO_NEED_MERGE; + } } else if (OB_FAIL(refine_mini_merge_result(tablet, result))) { if (OB_NO_NEED_MERGE != ret) { LOG_WARN("failed to refine mini merge result", K(ret), K(tablet_id)); @@ -1010,9 +1019,6 @@ int ObPartitionMergePolicy::refine_mini_merge_result( if (OB_UNLIKELY(!table_store.is_valid())) { ret = OB_ERR_UNEXPECTED; LOG_WARN("Table store not valid", K(ret), K(table_store)); - } else if (result.handle_.empty()){ - ret = OB_NO_NEED_MERGE; - // LOG_WARN("no need mini merge", K(ret), K(result), K(table_store)); } else if (OB_ISNULL(last_table = table_store.get_minor_sstables().get_boundary_table(true/*last*/))) { // no minor sstable, skip to cut memtable's boundary } else if (result.log_ts_range_.start_log_ts_ > last_table->get_end_log_ts()) { diff --git a/src/storage/compaction/ob_tablet_merge_ctx.cpp b/src/storage/compaction/ob_tablet_merge_ctx.cpp index 555f2c984e..024fe77a02 100644 --- a/src/storage/compaction/ob_tablet_merge_ctx.cpp +++ b/src/storage/compaction/ob_tablet_merge_ctx.cpp @@ -758,15 +758,15 @@ int ObTabletMergeCtx::update_tablet_or_release_memtable(const ObGetMergeTablesRe } else if (schema_ctx_.storage_schema_->get_schema_version() > old_tablet->get_storage_schema().get_schema_version()) { ret = OB_ERR_UNEXPECTED; LOG_WARN("can't have larger storage schema", K(ret), K(schema_ctx_.storage_schema_), K(old_tablet->get_storage_schema())); + } else if (OB_UNLIKELY(get_merge_table_result.log_ts_range_.end_log_ts_ > old_tablet->get_tablet_meta().clog_checkpoint_ts_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("can't have larger end_log_ts", K(ret), K(get_merge_table_result), K(old_tablet->get_tablet_meta())); } else if (get_merge_table_result.version_range_.snapshot_version_ > old_tablet->get_snapshot_version()) { // need write slog to update snapshot_version on tablet_meta update_table_store_flag = true; - } else if (get_merge_table_result.log_ts_range_.end_log_ts_ > old_tablet->get_clog_checkpoint_ts()) { - // need write slog to update clog_checkpoint_log_ts on tablet_meta - update_table_store_flag = true; } - const int64_t release_memtable_log_ts = get_merge_table_result.log_ts_range_.end_log_ts_; + const int64_t release_memtable_log_ts = old_tablet->get_clog_checkpoint_ts(); if (OB_FAIL(ret)) { } else if (update_table_store_flag && OB_FAIL(update_tablet_directly(get_merge_table_result))) { LOG_WARN("failed to update tablet directly", K(ret), K(get_merge_table_result), K(update_table_store_flag)); @@ -783,7 +783,6 @@ int ObTabletMergeCtx::update_tablet_directly(const ObGetMergeTablesResult &get_m int ret = OB_SUCCESS; const int64_t rebuild_seq = ls_handle_.get_ls()->get_rebuild_seq(); log_ts_range_ = get_merge_table_result.log_ts_range_; - int64_t clog_checkpoint_ts = get_merge_table_result.log_ts_range_.end_log_ts_; ObTableHandleV2 empty_table_handle; ObUpdateTableStoreParam param( @@ -793,7 +792,7 @@ int ObTabletMergeCtx::update_tablet_directly(const ObGetMergeTablesResult &get_m schema_ctx_.storage_schema_, rebuild_seq, param_.is_major_merge(), - clog_checkpoint_ts); + 0/*clog_checkpoint_ts*/); ObTabletHandle new_tablet_handle; if (OB_FAIL(ls_handle_.get_ls()->update_tablet_table_store( param_.tablet_id_, param, new_tablet_handle))) { diff --git a/unittest/storage/test_compaction_policy.cpp b/unittest/storage/test_compaction_policy.cpp index eaebdf571c..8e0bd9b647 100644 --- a/unittest/storage/test_compaction_policy.cpp +++ b/unittest/storage/test_compaction_policy.cpp @@ -711,25 +711,11 @@ TEST_F(TestCompactionPolicy, check_mini_merge_basic) ObGetMergeTablesParam param; param.merge_type_ = ObMergeType::MINI_MERGE; ObGetMergeTablesResult result; - ret = ObPartitionMergePolicy::get_mini_merge_tables(param, 0, *tablet_handle_.get_obj(), result); - ASSERT_EQ(OB_SUCCESS, ret); - ASSERT_EQ(3, result.handle_.get_count()); - tablet_handle_.get_obj()->tablet_meta_.clog_checkpoint_ts_ = 300; tablet_handle_.get_obj()->tablet_meta_.snapshot_version_ = 300; - result.reset(); ret = ObPartitionMergePolicy::get_mini_merge_tables(param, 0, *tablet_handle_.get_obj(), result); - ASSERT_EQ(OB_SUCCESS, ret); - ASSERT_EQ(result.update_tablet_directly_, true); - - tablet_handle_.get_obj()->tablet_meta_.clog_checkpoint_ts_ = 280; - tablet_handle_.get_obj()->tablet_meta_.snapshot_version_ = 280; - result.reset(); - ret = ObPartitionMergePolicy::get_mini_merge_tables(param, 0, *tablet_handle_.get_obj(), result); - ASSERT_EQ(OB_SUCCESS, ret); - ASSERT_EQ(3, result.handle_.get_count()); - ASSERT_EQ(300, result.log_ts_range_.end_log_ts_); - ASSERT_EQ(result.update_tablet_directly_, true); + ASSERT_EQ(OB_NO_NEED_MERGE, ret); + ASSERT_EQ(result.update_tablet_directly_, false); } TEST_F(TestCompactionPolicy, check_minor_merge_basic) @@ -877,7 +863,7 @@ TEST_F(TestCompactionPolicy, check_no_need_major_merge) int main(int argc, char **argv) { system("rm -rf test_compaction_policy.log"); - OB_LOGGER.set_file_name("test_compaction_policy.log", true); + OB_LOGGER.set_file_name("test_compaction_policy.log"); OB_LOGGER.set_log_level("DEBUG"); CLOG_LOG(INFO, "begin unittest: test_compaction_policy"); ::testing::InitGoogleTest(&argc, argv);