|
|
|
@ -190,6 +190,27 @@ int ObMediumCompactionScheduleFunc::schedule_next_medium_primary_cluster(
|
|
|
|
return ret;
|
|
|
|
return ret;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
int ObMediumCompactionScheduleFunc::get_max_reserved_snapshot(int64_t &max_reserved_snapshot)
|
|
|
|
|
|
|
|
{
|
|
|
|
|
|
|
|
int ret = OB_SUCCESS;
|
|
|
|
|
|
|
|
max_reserved_snapshot = INT64_MAX;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
int64_t max_merged_snapshot = 0;
|
|
|
|
|
|
|
|
int64_t min_reserved_snapshot = INT64_MAX;
|
|
|
|
|
|
|
|
const ObTabletTableStore &table_store = tablet_.get_table_store();
|
|
|
|
|
|
|
|
if (0 == table_store.get_major_sstables().count()) {
|
|
|
|
|
|
|
|
ret = OB_ERR_UNEXPECTED;
|
|
|
|
|
|
|
|
LOG_WARN("major sstable should not be empty", K(ret), K(tablet_));
|
|
|
|
|
|
|
|
} else if (FALSE_IT(max_merged_snapshot = table_store.get_major_sstables().get_boundary_table(true/*last*/)->get_snapshot_version())) {
|
|
|
|
|
|
|
|
} else if (OB_FAIL(MTL(ObTenantFreezeInfoMgr*)->get_min_reserved_snapshot(
|
|
|
|
|
|
|
|
tablet_.get_tablet_meta().tablet_id_, max_merged_snapshot, min_reserved_snapshot))) {
|
|
|
|
|
|
|
|
LOG_WARN("failed to get multi version from freeze info mgr", K(ret), K(table_id));
|
|
|
|
|
|
|
|
} else {
|
|
|
|
|
|
|
|
max_reserved_snapshot = MAX(ls_.get_min_reserved_snapshot(), min_reserved_snapshot);
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
return ret;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
int ObMediumCompactionScheduleFunc::decide_medium_snapshot(
|
|
|
|
int ObMediumCompactionScheduleFunc::decide_medium_snapshot(
|
|
|
|
const int64_t schedule_medium_snapshot,
|
|
|
|
const int64_t schedule_medium_snapshot,
|
|
|
|
const ObAdaptiveMergePolicy::AdaptiveMergeReason merge_reason)
|
|
|
|
const ObAdaptiveMergePolicy::AdaptiveMergeReason merge_reason)
|
|
|
|
@ -205,7 +226,7 @@ int ObMediumCompactionScheduleFunc::decide_medium_snapshot(
|
|
|
|
LOG_WARN("failed to add dependent tablet", K(ret), KPC(this));
|
|
|
|
LOG_WARN("failed to add dependent tablet", K(ret), KPC(this));
|
|
|
|
} else {
|
|
|
|
} else {
|
|
|
|
const bool is_major = (0 != schedule_medium_snapshot);
|
|
|
|
const bool is_major = (0 != schedule_medium_snapshot);
|
|
|
|
int64_t multi_version_start = 0;
|
|
|
|
int64_t max_reserved_snapshot = 0;
|
|
|
|
ObGetMergeTablesResult result;
|
|
|
|
ObGetMergeTablesResult result;
|
|
|
|
ObMediumCompactionInfo medium_info;
|
|
|
|
ObMediumCompactionInfo medium_info;
|
|
|
|
uint64_t compat_version = 0;
|
|
|
|
uint64_t compat_version = 0;
|
|
|
|
@ -223,27 +244,27 @@ int ObMediumCompactionScheduleFunc::decide_medium_snapshot(
|
|
|
|
// do nothing
|
|
|
|
// do nothing
|
|
|
|
} else if (medium_info.medium_snapshot_ <= max_sync_medium_scn) {
|
|
|
|
} else if (medium_info.medium_snapshot_ <= max_sync_medium_scn) {
|
|
|
|
ret = OB_NO_NEED_MERGE;
|
|
|
|
ret = OB_NO_NEED_MERGE;
|
|
|
|
} else if (OB_FAIL(ObTablet::get_kept_multi_version_start(ls_, tablet_, multi_version_start))) {
|
|
|
|
} else if (OB_FAIL(get_max_reserved_snapshot(max_reserved_snapshot))) {
|
|
|
|
LOG_WARN("failed to get multi_version_start", K(ret), KPC(this));
|
|
|
|
LOG_WARN("failed to get multi_version_start", K(ret), KPC(this));
|
|
|
|
} else if (medium_info.medium_snapshot_ < multi_version_start) {
|
|
|
|
} else if (medium_info.medium_snapshot_ < max_reserved_snapshot) {
|
|
|
|
// chosen medium snapshot is far too old
|
|
|
|
// chosen medium snapshot is far too old
|
|
|
|
LOG_INFO("chosen medium snapshot is invalid for multi_version_start", K(ret), KPC(this),
|
|
|
|
LOG_INFO("chosen medium snapshot is invalid for multi_version_start", K(ret), KPC(this),
|
|
|
|
K(medium_info), K(multi_version_start));
|
|
|
|
K(medium_info), K(max_reserved_snapshot));
|
|
|
|
const share::SCN &weak_read_ts = ls_.get_ls_wrs_handler()->get_ls_weak_read_ts();
|
|
|
|
const share::SCN &weak_read_ts = ls_.get_ls_wrs_handler()->get_ls_weak_read_ts();
|
|
|
|
if (medium_info.medium_snapshot_ == tablet_.get_snapshot_version() // no uncommitted sstable
|
|
|
|
if (medium_info.medium_snapshot_ == tablet_.get_snapshot_version() // no uncommitted sstable
|
|
|
|
&& weak_read_ts.get_val_for_tx() <= multi_version_start
|
|
|
|
&& weak_read_ts.get_val_for_tx() <= max_reserved_snapshot
|
|
|
|
&& weak_read_ts.get_val_for_tx() + DEFAULT_SCHEDULE_MEDIUM_INTERVAL < ObTimeUtility::current_time_ns()) {
|
|
|
|
&& weak_read_ts.get_val_for_tx() + DEFAULT_SCHEDULE_MEDIUM_INTERVAL < ObTimeUtility::current_time_ns()) {
|
|
|
|
medium_info.medium_snapshot_ = weak_read_ts.get_val_for_tx();
|
|
|
|
medium_info.medium_snapshot_ = weak_read_ts.get_val_for_tx();
|
|
|
|
LOG_INFO("use weak_read_ts to schedule medium", K(ret), KPC(this),
|
|
|
|
LOG_INFO("use weak_read_ts to schedule medium", K(ret), KPC(this),
|
|
|
|
K(medium_info), K(multi_version_start), K(weak_read_ts));
|
|
|
|
K(medium_info), K(max_reserved_snapshot), K(weak_read_ts));
|
|
|
|
} else {
|
|
|
|
} else {
|
|
|
|
ret = OB_NO_NEED_MERGE;
|
|
|
|
ret = OB_NO_NEED_MERGE;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if (OB_SUCC(ret) && !is_major) {
|
|
|
|
if (OB_SUCC(ret) && !is_major) {
|
|
|
|
const int64_t current_time = ObTimeUtility::current_time_ns();
|
|
|
|
const int64_t current_time = ObTimeUtility::current_time_ns();
|
|
|
|
if (multi_version_start < current_time) {
|
|
|
|
if (max_reserved_snapshot < current_time) {
|
|
|
|
const int64_t time_interval = (current_time - multi_version_start) / 2;
|
|
|
|
const int64_t time_interval = (current_time - max_reserved_snapshot) / 2;
|
|
|
|
ObSSTable *table = static_cast<ObSSTable *>(tablet_.get_table_store().get_major_sstables().get_boundary_table(true/*last*/));
|
|
|
|
ObSSTable *table = static_cast<ObSSTable *>(tablet_.get_table_store().get_major_sstables().get_boundary_table(true/*last*/));
|
|
|
|
if (OB_ISNULL(table)) {
|
|
|
|
if (OB_ISNULL(table)) {
|
|
|
|
ret = OB_ERR_UNEXPECTED;
|
|
|
|
ret = OB_ERR_UNEXPECTED;
|
|
|
|
|