[CP] add case for minor merge

This commit is contained in:
yangqise7en 2024-07-11 10:30:05 +00:00 committed by ob-robot
parent 2e480d891a
commit a691750c86
3 changed files with 148 additions and 8 deletions

View File

@ -1006,7 +1006,7 @@ int ObPartitionMergePolicy::refine_minor_merge_result(
} else {
mini_sstable_row_cnt += sstable->get_row_count();
}
if (OB_FAIL(mini_tables.add_table(tmp_table_handle))) {
if (OB_FAIL(mini_tables.add_table(tmp_table_handle))) { // mini_tables hold continues small sstables
LOG_WARN("Failed to push mini minor table into array", K(ret));
}
}
@ -1015,15 +1015,17 @@ int ObPartitionMergePolicy::refine_minor_merge_result(
int64_t size_amplification_factor = OB_DEFAULT_COMPACTION_AMPLIFICATION_FACTOR;
{
omt::ObTenantConfigGuard tenant_config(TENANT_CONF(MTL_ID()));
if (tenant_config.is_valid()) {
if (tenant_config.is_valid() && int64_t(tenant_config->_minor_compaction_amplification_factor) > 0) {
size_amplification_factor = tenant_config->_minor_compaction_amplification_factor;
}
}
if (OB_FAIL(ret)) {
} else if (large_sstable_cnt > 1
|| mini_tables.get_count() <= minor_compact_trigger
|| mini_sstable_row_cnt > (large_sstable_row_cnt * size_amplification_factor / 100)) {
// no refine, use current result to compaction
} else if (mini_tables.get_count() <= minor_compact_trigger) {
ret = OB_NO_NEED_MERGE;
result.reset();
} else if (mini_tables.get_count() != result.handle_.get_count()) {
// reset the merge result, mini sstable merge into a new mini sstable
result.reset_handle_and_range();

View File

@ -1018,6 +1018,9 @@ int ObMockIteratorBuilder::static_init()
ObMockIteratorBuilder::parse_bigint)
|| OB_SUCCESS != str_to_obj_parse_func_.set_refactored(
ObString::make_string("upper_ver"),
ObMockIteratorBuilder::parse_bigint)
|| OB_SUCCESS != str_to_obj_parse_func_.set_refactored(
ObString::make_string("row_cnt"),
ObMockIteratorBuilder::parse_bigint)) {
ret = OB_INIT_FAIL;
STORAGE_LOG(WARN, "obj parse func hashtable insert failed");
@ -1034,7 +1037,9 @@ int ObMockIteratorBuilder::static_init()
|| OB_SUCCESS != str_to_obj_type_.set_refactored(
ObString::make_string("max_ver"), &BIGINT_TYPE)
|| OB_SUCCESS != str_to_obj_type_.set_refactored(
ObString::make_string("upper_ver"), &BIGINT_TYPE)) {
ObString::make_string("upper_ver"), &BIGINT_TYPE)
|| OB_SUCCESS != str_to_obj_type_.set_refactored(
ObString::make_string("row_cnt"), &BIGINT_TYPE)) {
ret = OB_INIT_FAIL;
STORAGE_LOG(WARN, "obj type hashtable insert failed");
} else {

View File

@ -92,6 +92,9 @@ public:
const int64_t max_merged_trans_version,
const int64_t upper_trans_version,
ObTableHandleV2 &table_handle);
static int mock_sstable_meta(
const int64_t row_count,
ObTableHandleV2 &table_handle);
static int mock_memtable(
const int64_t start_log_ts,
const int64_t end_log_ts,
@ -120,6 +123,7 @@ public:
static int batch_mock_tables(
common::ObArenaAllocator &allocator,
const char *key_data,
const bool have_row_cnt,
common::ObIArray<ObTableHandleV2> &major_tables,
common::ObIArray<ObTableHandleV2> &minor_tables,
common::ObIArray<ObTableHandleV2> &memtables,
@ -139,7 +143,8 @@ public:
int prepare_tablet(
const char *key_data,
const int64_t clog_checkpoint_ts,
const int64_t snapshot_version);
const int64_t snapshot_version,
const bool have_row_cnt = false);
public:
virtual void SetUp() override;
virtual void TearDown() override;
@ -310,6 +315,24 @@ int TestCompactionPolicy::mock_sstable(
return ret;
}
int TestCompactionPolicy::mock_sstable_meta(
const int64_t row_count,
ObTableHandleV2 &table_handle)
{
int ret = OB_SUCCESS;
ObSSTable *sstable = nullptr;
if (OB_UNLIKELY(row_count <= 0)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid row count", KR(ret), K(row_count));
} else if (OB_FAIL(table_handle.get_sstable(sstable))) {
LOG_WARN("failed to get sstable", KR(ret), K(table_handle));
} else {
sstable->meta_->basic_meta_.row_count_ = row_count;
sstable->meta_cache_.row_count_ = row_count;
}
return ret;
}
int TestCompactionPolicy::mock_memtable(
const int64_t start_scn,
const int64_t end_scn,
@ -570,6 +593,7 @@ int TestCompactionPolicy::batch_mock_memtables(
int TestCompactionPolicy::batch_mock_tables(
common::ObArenaAllocator &allocator,
const char *key_data,
const bool have_row_cnt,
common::ObIArray<ObTableHandleV2> &major_tables,
common::ObIArray<ObTableHandleV2> &minor_tables,
common::ObIArray<ObTableHandleV2> &memtables,
@ -597,6 +621,8 @@ int TestCompactionPolicy::batch_mock_tables(
ObITable::TableType table_type = (type == 10) ? ObITable::MAJOR_SSTABLE : ((type == 11) ? ObITable::MINOR_SSTABLE : ObITable::MINI_SSTABLE);
if (OB_FAIL(mock_sstable(allocator, table_type, cells[1].get_int(), cells[2].get_int(), cells[3].get_int(), cells[4].get_int(), table_handle))) {
LOG_WARN("failed to mock sstable", K(ret));
} else if (have_row_cnt && OB_FAIL(mock_sstable_meta(cells[5].get_int(), table_handle))) {
LOG_WARN("failed to mock sstable meta", KR(ret));
} else if (ObITable::MAJOR_SSTABLE == table_type) {
if (OB_FAIL(major_tables.push_back(table_handle))) {
LOG_WARN("failed to add table", K(ret));
@ -612,7 +638,8 @@ int TestCompactionPolicy::batch_mock_tables(
int TestCompactionPolicy::prepare_tablet(
const char *key_data,
const int64_t clog_checkpoint_ts,
const int64_t snapshot_version)
const int64_t snapshot_version,
const bool have_row_cnt)
{
int ret = OB_SUCCESS;
tablet_handle_.reset();
@ -626,7 +653,7 @@ int TestCompactionPolicy::prepare_tablet(
} else if (OB_FAIL(mock_tablet(allocator_, clog_checkpoint_ts, snapshot_version, tablet_handle_))) {
LOG_WARN("failed to mock tablet", K(ret));
} else if (OB_ISNULL(key_data)) {
} else if (OB_FAIL(batch_mock_tables(allocator_, key_data, major_tables_, minor_tables_, memtables_, tablet_handle_))) {
} else if (OB_FAIL(batch_mock_tables(allocator_, key_data, have_row_cnt, major_tables_, minor_tables_, memtables_, tablet_handle_))) {
LOG_WARN("failed to batch mock tables", K(ret));
} else if (OB_FAIL(mock_table_store(allocator_, tablet_handle_, major_tables_, minor_tables_))) {
LOG_WARN("failed to mock table store", K(ret));
@ -1133,6 +1160,112 @@ TEST_F(TestCompactionPolicy, check_sstable_continue_failed)
ASSERT_EQ(OB_ERR_SYS, ret);
}
TEST_F(TestCompactionPolicy, check_minor_merge_policy_with_large_minor)
{
int ret = OB_SUCCESS;
ObTenantFreezeInfoMgr *mgr = MTL(ObTenantFreezeInfoMgr *);
ASSERT_TRUE(nullptr != mgr);
common::ObArray<share::ObFreezeInfo> freeze_info;
share::SCN frozen_val;
frozen_val.val_ = 1;
ASSERT_EQ(OB_SUCCESS, freeze_info.push_back(share::ObFreezeInfo(frozen_val, 1, 0)));
ret = TestCompactionPolicy::prepare_freeze_info(500, freeze_info);
ASSERT_EQ(OB_SUCCESS, ret);
const char *key_data =
"table_type start_scn end_scn max_ver upper_ver row_cnt\n"
"10 0 1 1 1 1\n"
"11 1 150 150 150 3000000\n"
"12 150 200 200 200 100\n"
"12 200 350 350 350 10000\n";
ret = prepare_tablet(key_data, 350, 350, true/*have row cnt*/);
ASSERT_EQ(OB_SUCCESS, ret);
ObGetMergeTablesParam param;
param.merge_type_ = ObMergeType::MINOR_MERGE;
ObGetMergeTablesResult result;
FakeLS ls;
ret = ObPartitionMergePolicy::get_minor_merge_tables(param, ls, *tablet_handle_.get_obj(), result);
// small mini sstable count = 2, should not schedule minor merge
ASSERT_EQ(OB_NO_NEED_MERGE, ret);
}
TEST_F(TestCompactionPolicy, check_minor_merge_policy_with_large_minor2)
{
int ret = OB_SUCCESS;
ObTenantFreezeInfoMgr *mgr = MTL(ObTenantFreezeInfoMgr *);
ASSERT_TRUE(nullptr != mgr);
common::ObArray<share::ObFreezeInfo> freeze_info;
share::SCN frozen_val;
frozen_val.val_ = 1;
ASSERT_EQ(OB_SUCCESS, freeze_info.push_back(share::ObFreezeInfo(frozen_val, 1, 0)));
ret = TestCompactionPolicy::prepare_freeze_info(500, freeze_info);
ASSERT_EQ(OB_SUCCESS, ret);
const char *key_data =
"table_type start_scn end_scn max_ver upper_ver row_cnt\n"
"10 0 1 1 1 1\n"
"11 1 150 150 150 3000000\n"
"12 150 300 200 200 100\n"
"12 300 350 350 350 750000\n";
ret = prepare_tablet(key_data, 350, 350, true/*have row cnt*/);
ASSERT_EQ(OB_SUCCESS, ret);
ObGetMergeTablesParam param;
param.merge_type_ = ObMergeType::MINOR_MERGE;
ObGetMergeTablesResult result;
FakeLS ls;
ret = ObPartitionMergePolicy::get_minor_merge_tables(param, ls, *tablet_handle_.get_obj(), result);
// reach size_amplification_factor, need minor
ASSERT_EQ(OB_SUCCESS, ret);
ASSERT_EQ(3, result.handle_.get_count());
ASSERT_EQ(1, result.scn_range_.start_scn_.get_val_for_tx());
ASSERT_EQ(350, result.scn_range_.end_scn_.get_val_for_tx());
}
TEST_F(TestCompactionPolicy, check_minor_merge_policy_with_large_minor3)
{
int ret = OB_SUCCESS;
ObTenantFreezeInfoMgr *mgr = MTL(ObTenantFreezeInfoMgr *);
ASSERT_TRUE(nullptr != mgr);
common::ObArray<share::ObFreezeInfo> freeze_info;
share::SCN frozen_val;
frozen_val.val_ = 1;
ASSERT_EQ(OB_SUCCESS, freeze_info.push_back(share::ObFreezeInfo(frozen_val, 1, 0)));
ret = TestCompactionPolicy::prepare_freeze_info(500, freeze_info);
ASSERT_EQ(OB_SUCCESS, ret);
const char *key_data =
"table_type start_scn end_scn max_ver upper_ver row_cnt\n"
"10 0 1 1 1 1\n"
"11 1 150 150 150 3000000\n"
"12 150 250 200 200 100\n"
"12 250 300 200 200 100\n"
"12 300 350 350 350 600\n";
ret = prepare_tablet(key_data, 350, 350, true/*have row cnt*/);
ASSERT_EQ(OB_SUCCESS, ret);
ObGetMergeTablesParam param;
param.merge_type_ = ObMergeType::MINOR_MERGE;
ObGetMergeTablesResult result;
FakeLS ls;
ret = ObPartitionMergePolicy::get_minor_merge_tables(param, ls, *tablet_handle_.get_obj(), result);
// small mini sstable count reach minor_compact_trigger, all small mini sstables should schedule mini minor merge
ASSERT_EQ(OB_SUCCESS, ret);
ASSERT_EQ(3, result.handle_.get_count());
ASSERT_EQ(150, result.scn_range_.start_scn_.get_val_for_tx());
ASSERT_EQ(350, result.scn_range_.end_scn_.get_val_for_tx());
}
} //unittest
} //oceanbase
@ -1140,7 +1273,7 @@ TEST_F(TestCompactionPolicy, check_sstable_continue_failed)
int main(int argc, char **argv)
{
system("rm -rf test_compaction_policy.log*");
OB_LOGGER.set_file_name("test_compaction_policy.log", true);
OB_LOGGER.set_file_name("test_compaction_policy.log");
OB_LOGGER.set_log_level("INFO");
CLOG_LOG(INFO, "begin unittest: test_compaction_policy");
::testing::InitGoogleTest(&argc, argv);