set medium_info.last_medium_snapshot_ according to major not meta.

This commit is contained in:
Tsunaou 2024-03-25 03:46:05 +00:00 committed by ob-robot
parent 98b4455cf3
commit 86e8b925b9
4 changed files with 73 additions and 50 deletions

View File

@ -223,6 +223,7 @@ public:
static const int64_t MEDIUM_COMPAT_VERSION_V2 = 2; // for add last_medium_snapshot_
static const int64_t MEDIUM_COMPAT_VERSION_V3 = 3; // for stanby tenant, not throw medium info
static const int64_t MEDIUM_COMPAT_VERSION_V4 = 4; // after this version, use is_schema_changed on medium info
static const int64_t MEDIUM_COMPAT_VERSION_LATEST = MEDIUM_COMPAT_VERSION_V4;
private:
static const int32_t SCS_ONE_BIT = 1;
static const int32_t SCS_RESERVED_BITS = 32;

View File

@ -122,7 +122,7 @@ int ObMediumListChecker::check_next_schedule_medium(
int ret = OB_SUCCESS;
if (nullptr != next_medium_info
&& last_major_snapshot > 0
&& ObMediumCompactionInfo::MEDIUM_COMPAT_VERSION_V2 == next_medium_info->medium_compat_version_
&& ObMediumCompactionInfo::MEDIUM_COMPAT_VERSION_V2 <= next_medium_info->medium_compat_version_
&& next_medium_info->medium_snapshot_ > last_major_snapshot) {
if (next_medium_info->from_cur_cluster()) { // same cluster_id & same tenant_id
if (OB_UNLIKELY(next_medium_info->last_medium_snapshot_ != last_major_snapshot)) {

View File

@ -1421,9 +1421,17 @@ int ObAdaptiveMergePolicy::find_adaptive_merge_tables(
} else if (table_store->get_minor_sstables().empty() || table_store->get_major_sstables().empty()) {
ret = OB_NO_NEED_MERGE;
LOG_DEBUG("no minor/major sstable to do meta major merge", K(ret), KPC(table_store));
} else if (OB_ISNULL(base_table = nullptr == table_store->get_meta_major_sstable()
? static_cast<ObSSTable*>(table_store->get_major_sstables().get_boundary_table(true/*last*/))
: table_store->get_meta_major_sstable())) {
} else if (is_meta_major_merge(merge_type)) {
base_table = table_store->get_meta_major_sstable();
if (nullptr == base_table) {
base_table = static_cast<ObSSTable*>(table_store->get_major_sstables().get_boundary_table(true/*last*/));
}
} else {
base_table = static_cast<ObSSTable*>(table_store->get_major_sstables().get_boundary_table(true/*last*/));
}
if (OB_FAIL(ret)) {
} else if (OB_ISNULL(base_table)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected null base table", K(ret), KPC(table_store), K(tablet));
} else if (OB_FAIL(ObPartitionMergePolicy::get_boundary_snapshot_version(tablet, min_snapshot, max_snapshot))) {

View File

@ -38,17 +38,18 @@ public:
{
allocator_.reset();
}
void set_basic_info(ObMediumCompactionInfo &medium_info)
void set_basic_info(ObMediumCompactionInfo &medium_info, const int64_t compat_version)
{
medium_info.compaction_type_ = ObMediumCompactionInfo::MEDIUM_COMPACTION;
medium_info.tenant_id_ = MTL_ID();
medium_info.data_version_ = 100;
medium_info.cluster_id_ = INIT_CLUSTER_ID;
medium_info.medium_compat_version_ = ObMediumCompactionInfo::MEDIUM_COMPAT_VERSION_V2;
medium_info.medium_compat_version_ = compat_version;
}
int construct_array(
const char *snapshot_list,
ObMediumListChecker::MediumInfoArray &array,
const int64_t medium_compat_version = ObMediumCompactionInfo::MEDIUM_COMPAT_VERSION_V2,
const int64_t last_medium_scn_of_first_medium_info = 1,
const int64_t cluster_id = INIT_CLUSTER_ID);
int construct_array(
@ -90,6 +91,7 @@ int TestMediumListChecker::construct_array(
int TestMediumListChecker::construct_array(
const char *snapshot_list,
ObMediumListChecker::MediumInfoArray &input_array,
const int64_t medium_compat_version,
const int64_t last_medium_scn_of_first_medium_info,
const int64_t cluster_id)
{
@ -97,7 +99,7 @@ int TestMediumListChecker::construct_array(
construct_array(snapshot_list, array_);
OB_ASSERT(array_.count() <= ARRAY_SIZE);
for (int i = 0; OB_SUCC(ret) && i < array_.count(); ++i) {
set_basic_info(medium_info_array_[i]);
set_basic_info(medium_info_array_[i], medium_compat_version);
medium_info_array_[i].medium_snapshot_ = array_.at(i);
medium_info_array_[i].last_medium_snapshot_ = (i > 0 ? array_.at(i - 1) : last_medium_scn_of_first_medium_info);
ret = input_array.push_back(&medium_info_array_[i]);
@ -109,30 +111,34 @@ int TestMediumListChecker::construct_array(
TEST_F(TestMediumListChecker, test_validate_medium_info_list)
{
int ret = OB_SUCCESS;
ObExtraMediumInfo extra_info;
extra_info.last_medium_scn_ = 100;
ObSEArray<compaction::ObMediumCompactionInfo*, 10> array;
int64_t medium_compat_version = ObMediumCompactionInfo::MEDIUM_COMPAT_VERSION_V2;
for ( ; medium_compat_version <= ObMediumCompactionInfo::MEDIUM_COMPAT_VERSION_LATEST; medium_compat_version++) {
COMMON_LOG(INFO, "Start test for compat version", K(medium_compat_version));
ObExtraMediumInfo extra_info;
extra_info.last_medium_scn_ = 100;
ObSEArray<compaction::ObMediumCompactionInfo*, 10> array;
ASSERT_EQ(OB_SUCCESS, construct_array("300, 400, 500", array, 10/*last_medium_scn_of_first_medium_info*/));
ret = ObMediumListChecker::validate_medium_info_list(extra_info, &array, 100/*last_major_snapshot*/);
ASSERT_EQ(OB_ERR_UNEXPECTED, ret);
ASSERT_EQ(OB_SUCCESS, construct_array("300, 400, 500", array, medium_compat_version, 10/*last_medium_scn_of_first_medium_info*/));
ret = ObMediumListChecker::validate_medium_info_list(extra_info, &array, 100/*last_major_snapshot*/);
ASSERT_EQ(OB_ERR_UNEXPECTED, ret);
array.reset();
ASSERT_EQ(OB_SUCCESS, construct_array("200, 400, 500", array, 100/*last_medium_scn_of_first_medium_info*/));
ret = ObMediumListChecker::validate_medium_info_list(extra_info, &array, 50/*last_major_snapshot*/);
ASSERT_EQ(OB_ERR_UNEXPECTED, ret);
array.reset();
ASSERT_EQ(OB_SUCCESS, construct_array("200, 400, 500", array, medium_compat_version, 100/*last_medium_scn_of_first_medium_info*/));
ret = ObMediumListChecker::validate_medium_info_list(extra_info, &array, 50/*last_major_snapshot*/);
ASSERT_EQ(OB_ERR_UNEXPECTED, ret);
ret = ObMediumListChecker::validate_medium_info_list(extra_info, &array, 100/*last_major_snapshot*/);
ASSERT_EQ(OB_SUCCESS, ret);
ret = ObMediumListChecker::validate_medium_info_list(extra_info, &array, 100/*last_major_snapshot*/);
ASSERT_EQ(OB_SUCCESS, ret);
extra_info.last_medium_scn_ = 1000;
ret = ObMediumListChecker::validate_medium_info_list(extra_info, &array, 1000/*last_major_snapshot*/);
ASSERT_EQ(OB_SUCCESS, ret);
extra_info.last_medium_scn_ = 1000;
ret = ObMediumListChecker::validate_medium_info_list(extra_info, &array, 1000/*last_major_snapshot*/);
ASSERT_EQ(OB_SUCCESS, ret);
// push item without clear array
ASSERT_EQ(OB_SUCCESS, construct_array("900", array, 700/*last_medium_scn_of_first_medium_info*/));
ret = ObMediumListChecker::check_continue(array);
ASSERT_EQ(OB_ERR_UNEXPECTED, ret);
// push item without clear array
ASSERT_EQ(OB_SUCCESS, construct_array("900", array, medium_compat_version, 700/*last_medium_scn_of_first_medium_info*/));
ret = ObMediumListChecker::check_continue(array);
ASSERT_EQ(OB_ERR_UNEXPECTED, ret);
}
}
TEST_F(TestMediumListChecker, test_check_extra_info)
@ -160,40 +166,48 @@ TEST_F(TestMediumListChecker, test_check_extra_info)
TEST_F(TestMediumListChecker, test_check_next_schedule_medium)
{
int ret = OB_SUCCESS;
ObMediumCompactionInfo medium_info;
set_basic_info(medium_info);
medium_info.medium_snapshot_ = 130;
medium_info.last_medium_snapshot_ = 100;
int64_t medium_compat_version = ObMediumCompactionInfo::MEDIUM_COMPAT_VERSION_V2;
for ( ; medium_compat_version <= ObMediumCompactionInfo::MEDIUM_COMPAT_VERSION_LATEST; medium_compat_version++) {
COMMON_LOG(INFO, "Start test for compat version", K(medium_compat_version));
ObMediumCompactionInfo medium_info;
set_basic_info(medium_info, medium_compat_version);
medium_info.medium_snapshot_ = 130;
medium_info.last_medium_snapshot_ = 100;
ret = ObMediumListChecker::check_next_schedule_medium(&medium_info, 100/*last_major_snapshot*/);
ASSERT_EQ(OB_SUCCESS, ret);
ret = ObMediumListChecker::check_next_schedule_medium(&medium_info, 100/*last_major_snapshot*/);
ASSERT_EQ(OB_SUCCESS, ret);
ret = ObMediumListChecker::check_next_schedule_medium(&medium_info, 120/*last_major_snapshot*/);
ASSERT_EQ(OB_ERR_UNEXPECTED, ret);
ret = ObMediumListChecker::check_next_schedule_medium(&medium_info, 120/*last_major_snapshot*/);
ASSERT_EQ(OB_ERR_UNEXPECTED, ret);
// medium from different cluster
medium_info.cluster_id_ = OTHER_CLUSTER_ID;
ret = ObMediumListChecker::check_next_schedule_medium(&medium_info, 50/*last_major_snapshot*/);
ASSERT_EQ(OB_SUCCESS, ret);
// medium from different cluster
medium_info.cluster_id_ = OTHER_CLUSTER_ID;
ret = ObMediumListChecker::check_next_schedule_medium(&medium_info, 50/*last_major_snapshot*/);
ASSERT_EQ(OB_SUCCESS, ret);
}
}
TEST_F(TestMediumListChecker, test_filter_finish_medium_info)
{
int ret = OB_SUCCESS;
ObSEArray<compaction::ObMediumCompactionInfo*, 10> array;
ASSERT_EQ(OB_SUCCESS, construct_array("300, 400, 500", array));
int64_t next_medium_info_idx = 0;
ret = ObMediumListChecker::filter_finish_medium_info(array, 100, next_medium_info_idx);
ASSERT_EQ(OB_SUCCESS, ret);
ASSERT_EQ(next_medium_info_idx, 0);
int64_t medium_compat_version = ObMediumCompactionInfo::MEDIUM_COMPAT_VERSION_V2;
for ( ; medium_compat_version <= ObMediumCompactionInfo::MEDIUM_COMPAT_VERSION_LATEST; medium_compat_version++) {
COMMON_LOG(INFO, "Start test for compat version", K(medium_compat_version));
ObSEArray<compaction::ObMediumCompactionInfo*, 10> array;
ASSERT_EQ(OB_SUCCESS, construct_array("300, 400, 500", array, medium_compat_version));
int64_t next_medium_info_idx = 0;
ret = ObMediumListChecker::filter_finish_medium_info(array, 100, next_medium_info_idx);
ASSERT_EQ(OB_SUCCESS, ret);
ASSERT_EQ(next_medium_info_idx, 0);
ret = ObMediumListChecker::filter_finish_medium_info(array, 500, next_medium_info_idx);
ASSERT_EQ(OB_SUCCESS, ret);
ASSERT_EQ(next_medium_info_idx, 3);
ret = ObMediumListChecker::filter_finish_medium_info(array, 500, next_medium_info_idx);
ASSERT_EQ(OB_SUCCESS, ret);
ASSERT_EQ(next_medium_info_idx, 3);
ret = ObMediumListChecker::filter_finish_medium_info(array, 400, next_medium_info_idx);
ASSERT_EQ(OB_SUCCESS, ret);
ASSERT_EQ(next_medium_info_idx, 2);
ret = ObMediumListChecker::filter_finish_medium_info(array, 400, next_medium_info_idx);
ASSERT_EQ(OB_SUCCESS, ret);
ASSERT_EQ(next_medium_info_idx, 2);
}
}
}//end namespace unittest