diff --git a/src/storage/compaction/ob_partition_merge_policy.cpp b/src/storage/compaction/ob_partition_merge_policy.cpp index 701a46bca..3e181c6ef 100644 --- a/src/storage/compaction/ob_partition_merge_policy.cpp +++ b/src/storage/compaction/ob_partition_merge_policy.cpp @@ -1006,7 +1006,7 @@ int ObPartitionMergePolicy::refine_minor_merge_result( } else { mini_sstable_row_cnt += sstable->get_row_count(); } - if (OB_FAIL(mini_tables.add_table(tmp_table_handle))) { + if (OB_FAIL(mini_tables.add_table(tmp_table_handle))) { // mini_tables hold continues small sstables LOG_WARN("Failed to push mini minor table into array", K(ret)); } } @@ -1015,15 +1015,17 @@ int ObPartitionMergePolicy::refine_minor_merge_result( int64_t size_amplification_factor = OB_DEFAULT_COMPACTION_AMPLIFICATION_FACTOR; { omt::ObTenantConfigGuard tenant_config(TENANT_CONF(MTL_ID())); - if (tenant_config.is_valid()) { + if (tenant_config.is_valid() && int64_t(tenant_config->_minor_compaction_amplification_factor) > 0) { size_amplification_factor = tenant_config->_minor_compaction_amplification_factor; } } if (OB_FAIL(ret)) { } else if (large_sstable_cnt > 1 - || mini_tables.get_count() <= minor_compact_trigger || mini_sstable_row_cnt > (large_sstable_row_cnt * size_amplification_factor / 100)) { // no refine, use current result to compaction + } else if (mini_tables.get_count() <= minor_compact_trigger) { + ret = OB_NO_NEED_MERGE; + result.reset(); } else if (mini_tables.get_count() != result.handle_.get_count()) { // reset the merge result, mini sstable merge into a new mini sstable result.reset_handle_and_range(); diff --git a/unittest/storage/mockcontainer/mock_ob_iterator.cpp b/unittest/storage/mockcontainer/mock_ob_iterator.cpp index b7f2e32a7..9893d1109 100644 --- a/unittest/storage/mockcontainer/mock_ob_iterator.cpp +++ b/unittest/storage/mockcontainer/mock_ob_iterator.cpp @@ -1018,6 +1018,9 @@ int ObMockIteratorBuilder::static_init() ObMockIteratorBuilder::parse_bigint) || OB_SUCCESS != str_to_obj_parse_func_.set_refactored( ObString::make_string("upper_ver"), + ObMockIteratorBuilder::parse_bigint) + || OB_SUCCESS != str_to_obj_parse_func_.set_refactored( + ObString::make_string("row_cnt"), ObMockIteratorBuilder::parse_bigint)) { ret = OB_INIT_FAIL; STORAGE_LOG(WARN, "obj parse func hashtable insert failed"); @@ -1034,7 +1037,9 @@ int ObMockIteratorBuilder::static_init() || OB_SUCCESS != str_to_obj_type_.set_refactored( ObString::make_string("max_ver"), &BIGINT_TYPE) || OB_SUCCESS != str_to_obj_type_.set_refactored( - ObString::make_string("upper_ver"), &BIGINT_TYPE)) { + ObString::make_string("upper_ver"), &BIGINT_TYPE) + || OB_SUCCESS != str_to_obj_type_.set_refactored( + ObString::make_string("row_cnt"), &BIGINT_TYPE)) { ret = OB_INIT_FAIL; STORAGE_LOG(WARN, "obj type hashtable insert failed"); } else { diff --git a/unittest/storage/test_compaction_policy.cpp b/unittest/storage/test_compaction_policy.cpp index f2a0aa546..1136909a5 100644 --- a/unittest/storage/test_compaction_policy.cpp +++ b/unittest/storage/test_compaction_policy.cpp @@ -92,6 +92,9 @@ public: const int64_t max_merged_trans_version, const int64_t upper_trans_version, ObTableHandleV2 &table_handle); + static int mock_sstable_meta( + const int64_t row_count, + ObTableHandleV2 &table_handle); static int mock_memtable( const int64_t start_log_ts, const int64_t end_log_ts, @@ -120,6 +123,7 @@ public: static int batch_mock_tables( common::ObArenaAllocator &allocator, const char *key_data, + const bool have_row_cnt, common::ObIArray &major_tables, common::ObIArray &minor_tables, common::ObIArray &memtables, @@ -139,7 +143,8 @@ public: int prepare_tablet( const char *key_data, const int64_t clog_checkpoint_ts, - const int64_t snapshot_version); + const int64_t snapshot_version, + const bool have_row_cnt = false); public: virtual void SetUp() override; virtual void TearDown() override; @@ -310,6 +315,24 @@ int TestCompactionPolicy::mock_sstable( return ret; } +int TestCompactionPolicy::mock_sstable_meta( + const int64_t row_count, + ObTableHandleV2 &table_handle) +{ + int ret = OB_SUCCESS; + ObSSTable *sstable = nullptr; + if (OB_UNLIKELY(row_count <= 0)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid row count", KR(ret), K(row_count)); + } else if (OB_FAIL(table_handle.get_sstable(sstable))) { + LOG_WARN("failed to get sstable", KR(ret), K(table_handle)); + } else { + sstable->meta_->basic_meta_.row_count_ = row_count; + sstable->meta_cache_.row_count_ = row_count; + } + return ret; +} + int TestCompactionPolicy::mock_memtable( const int64_t start_scn, const int64_t end_scn, @@ -570,6 +593,7 @@ int TestCompactionPolicy::batch_mock_memtables( int TestCompactionPolicy::batch_mock_tables( common::ObArenaAllocator &allocator, const char *key_data, + const bool have_row_cnt, common::ObIArray &major_tables, common::ObIArray &minor_tables, common::ObIArray &memtables, @@ -597,6 +621,8 @@ int TestCompactionPolicy::batch_mock_tables( ObITable::TableType table_type = (type == 10) ? ObITable::MAJOR_SSTABLE : ((type == 11) ? ObITable::MINOR_SSTABLE : ObITable::MINI_SSTABLE); if (OB_FAIL(mock_sstable(allocator, table_type, cells[1].get_int(), cells[2].get_int(), cells[3].get_int(), cells[4].get_int(), table_handle))) { LOG_WARN("failed to mock sstable", K(ret)); + } else if (have_row_cnt && OB_FAIL(mock_sstable_meta(cells[5].get_int(), table_handle))) { + LOG_WARN("failed to mock sstable meta", KR(ret)); } else if (ObITable::MAJOR_SSTABLE == table_type) { if (OB_FAIL(major_tables.push_back(table_handle))) { LOG_WARN("failed to add table", K(ret)); @@ -612,7 +638,8 @@ int TestCompactionPolicy::batch_mock_tables( int TestCompactionPolicy::prepare_tablet( const char *key_data, const int64_t clog_checkpoint_ts, - const int64_t snapshot_version) + const int64_t snapshot_version, + const bool have_row_cnt) { int ret = OB_SUCCESS; tablet_handle_.reset(); @@ -626,7 +653,7 @@ int TestCompactionPolicy::prepare_tablet( } else if (OB_FAIL(mock_tablet(allocator_, clog_checkpoint_ts, snapshot_version, tablet_handle_))) { LOG_WARN("failed to mock tablet", K(ret)); } else if (OB_ISNULL(key_data)) { - } else if (OB_FAIL(batch_mock_tables(allocator_, key_data, major_tables_, minor_tables_, memtables_, tablet_handle_))) { + } else if (OB_FAIL(batch_mock_tables(allocator_, key_data, have_row_cnt, major_tables_, minor_tables_, memtables_, tablet_handle_))) { LOG_WARN("failed to batch mock tables", K(ret)); } else if (OB_FAIL(mock_table_store(allocator_, tablet_handle_, major_tables_, minor_tables_))) { LOG_WARN("failed to mock table store", K(ret)); @@ -1133,6 +1160,112 @@ TEST_F(TestCompactionPolicy, check_sstable_continue_failed) ASSERT_EQ(OB_ERR_SYS, ret); } +TEST_F(TestCompactionPolicy, check_minor_merge_policy_with_large_minor) +{ + int ret = OB_SUCCESS; + ObTenantFreezeInfoMgr *mgr = MTL(ObTenantFreezeInfoMgr *); + ASSERT_TRUE(nullptr != mgr); + + common::ObArray freeze_info; + share::SCN frozen_val; + frozen_val.val_ = 1; + ASSERT_EQ(OB_SUCCESS, freeze_info.push_back(share::ObFreezeInfo(frozen_val, 1, 0))); + + ret = TestCompactionPolicy::prepare_freeze_info(500, freeze_info); + ASSERT_EQ(OB_SUCCESS, ret); + + const char *key_data = + "table_type start_scn end_scn max_ver upper_ver row_cnt\n" + "10 0 1 1 1 1\n" + "11 1 150 150 150 3000000\n" + "12 150 200 200 200 100\n" + "12 200 350 350 350 10000\n"; + + ret = prepare_tablet(key_data, 350, 350, true/*have row cnt*/); + ASSERT_EQ(OB_SUCCESS, ret); + + ObGetMergeTablesParam param; + param.merge_type_ = ObMergeType::MINOR_MERGE; + ObGetMergeTablesResult result; + FakeLS ls; + ret = ObPartitionMergePolicy::get_minor_merge_tables(param, ls, *tablet_handle_.get_obj(), result); + // small mini sstable count = 2, should not schedule minor merge + ASSERT_EQ(OB_NO_NEED_MERGE, ret); +} + +TEST_F(TestCompactionPolicy, check_minor_merge_policy_with_large_minor2) +{ + int ret = OB_SUCCESS; + ObTenantFreezeInfoMgr *mgr = MTL(ObTenantFreezeInfoMgr *); + ASSERT_TRUE(nullptr != mgr); + + common::ObArray freeze_info; + share::SCN frozen_val; + frozen_val.val_ = 1; + ASSERT_EQ(OB_SUCCESS, freeze_info.push_back(share::ObFreezeInfo(frozen_val, 1, 0))); + + ret = TestCompactionPolicy::prepare_freeze_info(500, freeze_info); + ASSERT_EQ(OB_SUCCESS, ret); + + const char *key_data = + "table_type start_scn end_scn max_ver upper_ver row_cnt\n" + "10 0 1 1 1 1\n" + "11 1 150 150 150 3000000\n" + "12 150 300 200 200 100\n" + "12 300 350 350 350 750000\n"; + + ret = prepare_tablet(key_data, 350, 350, true/*have row cnt*/); + ASSERT_EQ(OB_SUCCESS, ret); + + ObGetMergeTablesParam param; + param.merge_type_ = ObMergeType::MINOR_MERGE; + ObGetMergeTablesResult result; + FakeLS ls; + ret = ObPartitionMergePolicy::get_minor_merge_tables(param, ls, *tablet_handle_.get_obj(), result); + // reach size_amplification_factor, need minor + ASSERT_EQ(OB_SUCCESS, ret); + ASSERT_EQ(3, result.handle_.get_count()); + ASSERT_EQ(1, result.scn_range_.start_scn_.get_val_for_tx()); + ASSERT_EQ(350, result.scn_range_.end_scn_.get_val_for_tx()); +} + +TEST_F(TestCompactionPolicy, check_minor_merge_policy_with_large_minor3) +{ + int ret = OB_SUCCESS; + ObTenantFreezeInfoMgr *mgr = MTL(ObTenantFreezeInfoMgr *); + ASSERT_TRUE(nullptr != mgr); + + common::ObArray freeze_info; + share::SCN frozen_val; + frozen_val.val_ = 1; + ASSERT_EQ(OB_SUCCESS, freeze_info.push_back(share::ObFreezeInfo(frozen_val, 1, 0))); + + ret = TestCompactionPolicy::prepare_freeze_info(500, freeze_info); + ASSERT_EQ(OB_SUCCESS, ret); + + const char *key_data = + "table_type start_scn end_scn max_ver upper_ver row_cnt\n" + "10 0 1 1 1 1\n" + "11 1 150 150 150 3000000\n" + "12 150 250 200 200 100\n" + "12 250 300 200 200 100\n" + "12 300 350 350 350 600\n"; + + ret = prepare_tablet(key_data, 350, 350, true/*have row cnt*/); + ASSERT_EQ(OB_SUCCESS, ret); + + ObGetMergeTablesParam param; + param.merge_type_ = ObMergeType::MINOR_MERGE; + ObGetMergeTablesResult result; + FakeLS ls; + ret = ObPartitionMergePolicy::get_minor_merge_tables(param, ls, *tablet_handle_.get_obj(), result); + // small mini sstable count reach minor_compact_trigger, all small mini sstables should schedule mini minor merge + ASSERT_EQ(OB_SUCCESS, ret); + ASSERT_EQ(3, result.handle_.get_count()); + ASSERT_EQ(150, result.scn_range_.start_scn_.get_val_for_tx()); + ASSERT_EQ(350, result.scn_range_.end_scn_.get_val_for_tx()); +} + } //unittest } //oceanbase @@ -1140,7 +1273,7 @@ TEST_F(TestCompactionPolicy, check_sstable_continue_failed) int main(int argc, char **argv) { system("rm -rf test_compaction_policy.log*"); - OB_LOGGER.set_file_name("test_compaction_policy.log", true); + OB_LOGGER.set_file_name("test_compaction_policy.log"); OB_LOGGER.set_log_level("INFO"); CLOG_LOG(INFO, "begin unittest: test_compaction_policy"); ::testing::InitGoogleTest(&argc, argv);