remove redundant code and del lambda
This commit is contained in:
@ -37,11 +37,6 @@ using namespace common;
|
||||
namespace compaction
|
||||
{
|
||||
|
||||
ObMediumCompactionScheduleFunc::ChooseMediumScn ObMediumCompactionScheduleFunc::choose_medium_scn[MEDIUM_FUNC_CNT]
|
||||
= { ObMediumCompactionScheduleFunc::choose_medium_snapshot,
|
||||
ObMediumCompactionScheduleFunc::choose_major_snapshot,
|
||||
};
|
||||
|
||||
int64_t ObMediumCompactionScheduleFunc::to_string(char *buf, const int64_t buf_len) const
|
||||
{
|
||||
int64_t pos = 0;
|
||||
@ -61,48 +56,58 @@ int64_t ObMediumCompactionScheduleFunc::to_string(char *buf, const int64_t buf_l
|
||||
}
|
||||
|
||||
int ObMediumCompactionScheduleFunc::choose_medium_snapshot(
|
||||
const ObMediumCompactionScheduleFunc &func,
|
||||
ObLS &ls,
|
||||
ObTablet &tablet,
|
||||
const ObAdaptiveMergePolicy::AdaptiveMergeReason &merge_reason,
|
||||
ObArenaAllocator &allocator,
|
||||
const int64_t max_sync_medium_scn,
|
||||
ObMediumCompactionInfo &medium_info,
|
||||
ObGetMergeTablesResult &result,
|
||||
int64_t &schema_version)
|
||||
{
|
||||
UNUSEDx(func, allocator, schema_version);
|
||||
int ret = OB_SUCCESS;
|
||||
ObGetMergeTablesParam param;
|
||||
param.merge_type_ = MEDIUM_MERGE;
|
||||
if (OB_FAIL(ObAdaptiveMergePolicy::get_meta_merge_tables(
|
||||
param,
|
||||
ls,
|
||||
tablet,
|
||||
result))) {
|
||||
int64_t max_reserved_snapshot = 0;
|
||||
int64_t medium_snapshot = 0;
|
||||
ObTablet &tablet = *tablet_handle_.get_obj();
|
||||
if (OB_FAIL(ObAdaptiveMergePolicy::get_meta_merge_tables(param, ls_, tablet, result))) {
|
||||
if (OB_NO_NEED_MERGE != ret) {
|
||||
LOG_WARN("failed to get meta merge tables", K(ret), K(param));
|
||||
}
|
||||
} else if (FALSE_IT(medium_snapshot = result.version_range_.snapshot_version_)) {
|
||||
} else if (OB_FAIL(get_max_reserved_snapshot(max_reserved_snapshot))) {
|
||||
LOG_WARN("failed to get reserved snapshot", K(ret), KPC(this));
|
||||
} else if (medium_info.medium_snapshot_ < max_reserved_snapshot
|
||||
|| medium_info.medium_snapshot_ > tablet.get_snapshot_version()) {
|
||||
// chosen medium snapshot is far too old
|
||||
if (OB_FAIL(choose_new_medium_snapshot(max_reserved_snapshot, medium_info, result, schema_version))) {
|
||||
LOG_WARN("failed to choose new medium snapshot", KR(ret), K(medium_info));
|
||||
}
|
||||
} else if (OB_FAIL(tablet.get_schema_version_from_storage_schema(schema_version))) {
|
||||
LOG_WARN("failed to get schema version from tablet", KR(ret), K(tablet));
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (medium_info.medium_snapshot_ <= max_sync_medium_scn) {
|
||||
ret = OB_NO_NEED_MERGE;
|
||||
} else if (OB_FAIL(check_frequency(max_reserved_snapshot, medium_snapshot))) { // check schedule interval
|
||||
if (OB_NO_NEED_MERGE != ret) {
|
||||
LOG_WARN("failed to check medium scn valid", K(ret), KPC(this));
|
||||
}
|
||||
} else {
|
||||
medium_info.set_basic_info(
|
||||
ObMediumCompactionInfo::MEDIUM_COMPACTION,
|
||||
merge_reason,
|
||||
result.version_range_.snapshot_version_);
|
||||
LOG_TRACE("choose_medium_snapshot", K(ret), "ls_id", ls.get_ls_id(),
|
||||
"tablet_id", tablet.get_tablet_meta().tablet_id_, K(result), K(medium_info));
|
||||
merge_reason_,
|
||||
medium_snapshot);
|
||||
LOG_TRACE("choose_medium_snapshot", K(ret), KPC(this), K(result), K(medium_info));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObMediumCompactionScheduleFunc::find_valid_freeze_info(
|
||||
ObTablet &tablet,
|
||||
ObArenaAllocator &allocator,
|
||||
ObMediumCompactionInfo &medium_info,
|
||||
share::ObFreezeInfo &freeze_info,
|
||||
bool &force_schedule_medium_merge)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
force_schedule_medium_merge = false;
|
||||
|
||||
ObTablet &tablet = *tablet_handle_.get_obj();
|
||||
const ObTabletID &tablet_id = tablet.get_tablet_meta().tablet_id_;
|
||||
int64_t schedule_snapshot = 0;
|
||||
bool schedule_with_newer_info = false;
|
||||
@ -143,7 +148,7 @@ int ObMediumCompactionScheduleFunc::find_valid_freeze_info(
|
||||
K(tablet_id), K(last_sstable_schema_version), K(freeze_info));
|
||||
break;
|
||||
} else if (OB_FAIL(get_table_schema_to_merge(
|
||||
*schema_service, tablet, freeze_info.schema_version_, medium_info.medium_compat_version_, allocator, medium_info.storage_schema_))) {
|
||||
*schema_service, tablet, freeze_info.schema_version_, medium_info.medium_compat_version_, allocator_, medium_info.storage_schema_))) {
|
||||
if (OB_TABLE_IS_DELETED == ret) {
|
||||
// do nothing, end loop
|
||||
} else if (OB_ERR_SCHEMA_HISTORY_EMPTY == ret) {
|
||||
@ -169,27 +174,22 @@ int ObMediumCompactionScheduleFunc::find_valid_freeze_info(
|
||||
}
|
||||
|
||||
int ObMediumCompactionScheduleFunc::choose_major_snapshot(
|
||||
const ObMediumCompactionScheduleFunc &func,
|
||||
ObLS &ls,
|
||||
ObTablet &tablet,
|
||||
const ObAdaptiveMergePolicy::AdaptiveMergeReason &merge_reason,
|
||||
ObArenaAllocator &allocator,
|
||||
const int64_t max_sync_medium_scn,
|
||||
ObMediumCompactionInfo &medium_info,
|
||||
ObGetMergeTablesResult &result,
|
||||
int64_t &schema_version)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
const ObLSID &ls_id = ls.get_ls_id();
|
||||
const ObTabletID &tablet_id = tablet.get_tablet_meta().tablet_id_;
|
||||
ObTablet &tablet = *tablet_handle_.get_obj();
|
||||
share::ObFreezeInfo freeze_info;
|
||||
bool force_schedule_medium_merge = false;
|
||||
|
||||
if (OB_FAIL(find_valid_freeze_info(tablet, allocator, medium_info, freeze_info, force_schedule_medium_merge))) {
|
||||
if (OB_FAIL(find_valid_freeze_info(medium_info, freeze_info, force_schedule_medium_merge))) {
|
||||
LOG_WARN("failed to find valid freeze info", KR(ret));
|
||||
} else if (force_schedule_medium_merge) {
|
||||
if (OB_FAIL(switch_to_choose_medium_snapshot(func, allocator, ls, tablet, freeze_info.frozen_scn_.get_val_for_tx(), medium_info, schema_version))) {
|
||||
if (OB_FAIL(switch_to_choose_medium_snapshot(freeze_info.frozen_scn_.get_val_for_tx(), medium_info, schema_version))) {
|
||||
if (OB_EAGAIN != ret) {
|
||||
LOG_WARN("failed to switch to choose medium snapshot", K(ret), K(tablet));
|
||||
LOG_WARN("failed to switch to choose medium snapshot", K(ret), KPC(this));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
@ -203,8 +203,7 @@ int ObMediumCompactionScheduleFunc::choose_major_snapshot(
|
||||
if (FAILEDx(ObPartitionMergePolicy::get_result_by_snapshot(tablet, medium_info.medium_snapshot_, result))) {
|
||||
LOG_WARN("failed get result for major", K(ret), K(medium_info));
|
||||
} else {
|
||||
LOG_TRACE("choose_major_snapshot", K(ret), "ls_id", ls.get_ls_id(),
|
||||
"tablet_id", tablet.get_tablet_meta().tablet_id_, K(medium_info), K(freeze_info), K(result),
|
||||
LOG_TRACE("choose_major_snapshot", K(ret), KPC(this), K(medium_info), K(freeze_info), K(result),
|
||||
K(medium_info), K(schema_version));
|
||||
#ifdef ERRSIM
|
||||
if (tablet.get_tablet_meta().tablet_id_.id() == 1) {
|
||||
@ -213,8 +212,7 @@ int ObMediumCompactionScheduleFunc::choose_major_snapshot(
|
||||
ret = OB_SUCCESS;
|
||||
medium_info.compaction_type_ = ObMediumCompactionInfo::MEDIUM_COMPACTION;
|
||||
medium_info.medium_snapshot_ += 100;
|
||||
FLOG_INFO("ERRSIM EN_SPECIAL_TABLE_HAVE_LARGER_SCN", "ls_id", ls.get_ls_id(),
|
||||
"tablet_id", tablet.get_tablet_meta().tablet_id_,K(medium_info));
|
||||
FLOG_INFO("ERRSIM EN_SPECIAL_TABLE_HAVE_LARGER_SCN", KPC(this),K(medium_info));
|
||||
}
|
||||
}
|
||||
#endif
|
||||
@ -223,10 +221,6 @@ int ObMediumCompactionScheduleFunc::choose_major_snapshot(
|
||||
}
|
||||
|
||||
int ObMediumCompactionScheduleFunc::switch_to_choose_medium_snapshot(
|
||||
const ObMediumCompactionScheduleFunc &func,
|
||||
ObArenaAllocator &allocator,
|
||||
ObLS &ls,
|
||||
ObTablet &tablet,
|
||||
const int64_t freeze_version,
|
||||
ObMediumCompactionInfo &medium_info,
|
||||
int64_t &schema_version)
|
||||
@ -234,12 +228,12 @@ int ObMediumCompactionScheduleFunc::switch_to_choose_medium_snapshot(
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t medium_snapshot = 0;
|
||||
|
||||
if (func.weak_read_ts_ < freeze_version + 1) {
|
||||
if (weak_read_ts_ < freeze_version + 1) {
|
||||
ret = OB_EAGAIN;
|
||||
LOG_WARN("weak read ts is smaller than new medium snapshot, try later", K(ret), K(tablet));
|
||||
} else if (FALSE_IT(medium_snapshot = MAX(func.weak_read_ts_, freeze_version + 1))) {
|
||||
} else if (OB_FAIL(tablet.get_newest_schema_version(schema_version))) {
|
||||
LOG_WARN("fail to choose medium schema version", K(ret), K(tablet));
|
||||
LOG_WARN("weak read ts is smaller than new medium snapshot, try later", K(ret), KPC(this), K(freeze_version));
|
||||
} else if (FALSE_IT(medium_snapshot = MAX(weak_read_ts_, freeze_version + 1))) {
|
||||
} else if (OB_FAIL(tablet_handle_.get_obj()->get_newest_schema_version(schema_version))) {
|
||||
LOG_WARN("fail to choose medium schema version", K(ret), KPC(this));
|
||||
} else {
|
||||
medium_info.set_basic_info(
|
||||
ObMediumCompactionInfo::MEDIUM_COMPACTION,
|
||||
@ -275,8 +269,7 @@ int ObMediumCompactionScheduleFunc::get_status_from_inner_table(
|
||||
|
||||
// cal this func with PLAF LEADER ROLE && last_medium_scn_ = 0
|
||||
int ObMediumCompactionScheduleFunc::schedule_next_medium_for_leader(
|
||||
const int64_t major_snapshot,
|
||||
const bool force_schedule)
|
||||
const int64_t major_snapshot)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObRole role = INVALID_ROLE;
|
||||
@ -298,7 +291,7 @@ int ObMediumCompactionScheduleFunc::schedule_next_medium_for_leader(
|
||||
}
|
||||
}
|
||||
#endif
|
||||
ret = schedule_next_medium_primary_cluster(major_snapshot, force_schedule);
|
||||
ret = schedule_next_medium_primary_cluster(major_snapshot);
|
||||
} else {
|
||||
LOG_TRACE("not leader", K(ret), K(role), K(ls_.get_ls_id()));
|
||||
}
|
||||
@ -306,9 +299,7 @@ int ObMediumCompactionScheduleFunc::schedule_next_medium_for_leader(
|
||||
}
|
||||
|
||||
int ObMediumCompactionScheduleFunc::get_adaptive_reason(
|
||||
const int64_t schedule_major_snapshot,
|
||||
const bool force_schedule,
|
||||
ObAdaptiveMergePolicy::AdaptiveMergeReason &adaptive_merge_reason)
|
||||
const int64_t schedule_major_snapshot)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t max_sync_medium_scn = 0;
|
||||
@ -317,10 +308,8 @@ int ObMediumCompactionScheduleFunc::get_adaptive_reason(
|
||||
*tablet, *medium_info_list_, max_sync_medium_scn))) {
|
||||
LOG_WARN("failed to get max received medium scn", KR(ret), KPC(this));
|
||||
} else if (schedule_major_snapshot > max_sync_medium_scn) {
|
||||
adaptive_merge_reason = ObAdaptiveMergePolicy::AdaptiveMergeReason::TENANT_MAJOR;
|
||||
} else if (force_schedule) {
|
||||
adaptive_merge_reason = is_rebuild_column_group_ ? ObAdaptiveMergePolicy::REBUILD_COLUMN_GROUP : ObAdaptiveMergePolicy::USER_REQUEST;
|
||||
} else if (OB_FAIL(ObAdaptiveMergePolicy::get_adaptive_merge_reason(*tablet, adaptive_merge_reason))) {
|
||||
merge_reason_ = ObAdaptiveMergePolicy::AdaptiveMergeReason::TENANT_MAJOR;
|
||||
} else if (OB_FAIL(ObAdaptiveMergePolicy::get_adaptive_merge_reason(*tablet, merge_reason_))) {
|
||||
if (OB_HASH_NOT_EXIST != ret) {
|
||||
LOG_WARN("failed to get meta merge priority", K(ret), KPC(this));
|
||||
} else {
|
||||
@ -335,7 +324,7 @@ int ObMediumCompactionScheduleFunc::get_adaptive_reason(
|
||||
if (OB_FAIL(ret)) {
|
||||
FLOG_INFO("errsim EN_SCHEDULE_MEDIUM_COMPACTION", KPC(this));
|
||||
ret = OB_SUCCESS;
|
||||
adaptive_merge_reason = ObAdaptiveMergePolicy::AdaptiveMergeReason::LOAD_DATA_SCENE;
|
||||
merge_reason_ = ObAdaptiveMergePolicy::AdaptiveMergeReason::LOAD_DATA_SCENE;
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -344,11 +333,9 @@ int ObMediumCompactionScheduleFunc::get_adaptive_reason(
|
||||
}
|
||||
|
||||
int ObMediumCompactionScheduleFunc::schedule_next_medium_primary_cluster(
|
||||
const int64_t schedule_major_snapshot,
|
||||
const bool force_schedule)
|
||||
const int64_t schedule_major_snapshot)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObAdaptiveMergePolicy::AdaptiveMergeReason adaptive_merge_reason = ObAdaptiveMergePolicy::AdaptiveMergeReason::NONE;
|
||||
ObTabletCompactionScnInfo ret_info;
|
||||
// check last medium type, select inner table for last major
|
||||
bool schedule_medium_flag = false;
|
||||
@ -368,13 +355,14 @@ int ObMediumCompactionScheduleFunc::schedule_next_medium_primary_cluster(
|
||||
LOG_WARN("medium info list is unexpected null", K(ret), KPC(this), KPC_(medium_info_list));
|
||||
} else if (!medium_info_list_->could_schedule_next_round(last_major_snapshot_version)) { // check serialized list
|
||||
// do nothing
|
||||
} else if (OB_FAIL(get_adaptive_reason(schedule_major_snapshot, force_schedule, adaptive_merge_reason))) {
|
||||
} else if (!ObAdaptiveMergePolicy::is_valid_merge_reason(merge_reason_)
|
||||
&& OB_FAIL(get_adaptive_reason(schedule_major_snapshot))) {
|
||||
LOG_WARN("failed to get adaptive reason", KR(ret), K(schedule_major_snapshot));
|
||||
} else if (ObAdaptiveMergePolicy::is_valid_merge_reason(adaptive_merge_reason)) {
|
||||
} else if (ObAdaptiveMergePolicy::is_valid_merge_reason(merge_reason_)) {
|
||||
schedule_medium_flag = true;
|
||||
}
|
||||
LOG_TRACE("schedule next medium in primary cluster", K(ret), KPC(this), K(schedule_medium_flag),
|
||||
K(schedule_major_snapshot), K(adaptive_merge_reason), K(last_major_snapshot_version), KPC_(medium_info_list), K(max_sync_medium_scn));
|
||||
K(schedule_major_snapshot), K(merge_reason_), K(last_major_snapshot_version), KPC_(medium_info_list), K(max_sync_medium_scn));
|
||||
#ifdef ERRSIM
|
||||
if (OB_SUCC(ret)) {
|
||||
if (tablet->get_tablet_meta().tablet_id_.id() > ObTabletID::MIN_USER_TABLET_ID) {
|
||||
@ -407,13 +395,14 @@ int ObMediumCompactionScheduleFunc::schedule_next_medium_primary_cluster(
|
||||
schedule_flag = true;
|
||||
}
|
||||
if (OB_SUCC(ret) && schedule_flag) {
|
||||
ret = decide_medium_snapshot(adaptive_merge_reason);
|
||||
ret = decide_medium_snapshot();
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObMediumCompactionScheduleFunc::choose_medium_scn_for_user_request(
|
||||
int ObMediumCompactionScheduleFunc::choose_scn_for_user_request(
|
||||
const int64_t max_sync_medium_scn,
|
||||
ObMediumCompactionInfo &medium_info,
|
||||
ObGetMergeTablesResult &result,
|
||||
int64_t &schema_version)
|
||||
@ -421,9 +410,7 @@ int ObMediumCompactionScheduleFunc::choose_medium_scn_for_user_request(
|
||||
int ret = OB_SUCCESS;
|
||||
// check exist not finish freeze info
|
||||
schema_version = 0;
|
||||
int64_t max_sync_medium_scn = 0;
|
||||
int64_t max_reserved_snapshot = 0;
|
||||
const share::SCN &weak_read_ts = ls_.get_ls_wrs_handler()->get_ls_weak_read_ts();
|
||||
const int64_t latest_frozen_version = MTL(ObTenantFreezeInfoMgr*)->get_latest_frozen_version();
|
||||
const int64_t last_major_snapshot_version = tablet_handle_.get_obj()->get_last_major_snapshot_version();
|
||||
ObTablet *tablet = nullptr;
|
||||
@ -436,17 +423,15 @@ int ObMediumCompactionScheduleFunc::choose_medium_scn_for_user_request(
|
||||
} else if (latest_frozen_version > last_major_snapshot_version) {
|
||||
ret = OB_NO_NEED_MERGE;
|
||||
LOG_WARN("unfinished freeze info exist, can't schedule another medium", K(ret));
|
||||
} else if (OB_FAIL(tablet->get_max_sync_medium_scn(max_sync_medium_scn))) {
|
||||
LOG_WARN("failed to get max sync medium scn", K(ret), KPC(this));
|
||||
} else if (OB_FAIL(get_max_reserved_snapshot(max_reserved_snapshot))) {
|
||||
LOG_WARN("failed to get reserved snapshot", K(ret), KPC(this));
|
||||
} else if (FALSE_IT(medium_info.medium_snapshot_ = MAX(max_reserved_snapshot, weak_read_ts.get_val_for_tx()))) {
|
||||
} else if (FALSE_IT(medium_info.medium_snapshot_ = MAX(max_reserved_snapshot, weak_read_ts_))) {
|
||||
} else if (medium_info.medium_snapshot_ < max_sync_medium_scn) {
|
||||
ret = OB_NO_NEED_MERGE;
|
||||
LOG_WARN("chosen medium snapshot is synced before", K(ret), K(medium_info), K(max_sync_medium_scn));
|
||||
} else {
|
||||
medium_info.compaction_type_ = ObMediumCompactionInfo::MEDIUM_COMPACTION;
|
||||
medium_info.medium_merge_reason_ = is_rebuild_column_group_ ? ObAdaptiveMergePolicy::REBUILD_COLUMN_GROUP : ObAdaptiveMergePolicy::USER_REQUEST;
|
||||
medium_info.medium_merge_reason_ = merge_reason_;
|
||||
if (OB_FAIL(ObPartitionMergePolicy::get_result_by_snapshot(*tablet, medium_info.medium_snapshot_, result))) {
|
||||
LOG_WARN("failed to get result for major", K(ret), K(last_major_snapshot_version), K(medium_info));
|
||||
} else if (OB_FAIL(tablet->get_newest_schema_version(schema_version))) {
|
||||
@ -461,7 +446,7 @@ int ObMediumCompactionScheduleFunc::choose_medium_scn_for_user_request(
|
||||
|
||||
int ObMediumCompactionScheduleFunc::check_frequency(
|
||||
const int64_t max_reserved_snapshot,
|
||||
ObMediumCompactionInfo &medium_info)
|
||||
const int64_t medium_snapshot)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObTablet *tablet = tablet_handle_.get_obj();
|
||||
@ -471,9 +456,9 @@ int ObMediumCompactionScheduleFunc::check_frequency(
|
||||
const int64_t last_major_snapshot_version = tablet->get_last_major_snapshot_version();
|
||||
if (0 >= last_major_snapshot_version) {
|
||||
LOG_WARN("major sstable should not be empty", K(ret), K(last_major_snapshot_version));
|
||||
} else if (last_major_snapshot_version + time_interval > medium_info.medium_snapshot_) {
|
||||
} else if (last_major_snapshot_version + time_interval > medium_snapshot) {
|
||||
ret = OB_NO_NEED_MERGE;
|
||||
LOG_DEBUG("schedule medium frequently", K(ret), K(last_major_snapshot_version), K(medium_info),
|
||||
LOG_DEBUG("schedule medium frequently", K(ret), K(last_major_snapshot_version), K(medium_snapshot),
|
||||
K(time_interval));
|
||||
}
|
||||
}
|
||||
@ -504,6 +489,7 @@ int ObMediumCompactionScheduleFunc::get_max_reserved_snapshot(int64_t &max_reser
|
||||
LOG_WARN("failed to get reserved snapshot from freeze info mgr", K(ret), "tablet_id", tablet->get_tablet_meta().tablet_id_);
|
||||
} else {
|
||||
max_reserved_snapshot = MAX(ls_.get_min_reserved_snapshot(), snapshot_info.snapshot_);
|
||||
LOG_TRACE("get max reserved snapshot", KR(ret), K(max_reserved_snapshot), K(snapshot_info));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@ -541,15 +527,13 @@ int ObMediumCompactionScheduleFunc::choose_new_medium_snapshot(
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObMediumCompactionScheduleFunc::decide_medium_snapshot(
|
||||
const ObAdaptiveMergePolicy::AdaptiveMergeReason merge_reason)
|
||||
int ObMediumCompactionScheduleFunc::decide_medium_snapshot()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
int64_t max_sync_medium_scn = 0;
|
||||
uint64_t compat_version = 0;
|
||||
ObTablet *tablet = nullptr;
|
||||
const bool is_major = (merge_reason == ObAdaptiveMergePolicy::TENANT_MAJOR);
|
||||
if (OB_UNLIKELY(!tablet_handle_.is_valid())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("invalid tablet_handle", K(ret), K(tablet_handle_));
|
||||
@ -564,61 +548,31 @@ int ObMediumCompactionScheduleFunc::decide_medium_snapshot(
|
||||
LOG_WARN("failed to add dependent tablet", K(ret), KPC(this));
|
||||
} else {
|
||||
const ObTabletID &tablet_id = tablet->get_tablet_meta().tablet_id_;
|
||||
LOG_TRACE("decide_medium_snapshot", K(ret), KPC(this), K(compat_version), K(tablet_id), K(max_sync_medium_scn), K(merge_reason));
|
||||
int64_t max_reserved_snapshot = 0;
|
||||
LOG_TRACE("decide_medium_snapshot", K(ret), KPC(this), K(compat_version), K(tablet_id), K(max_sync_medium_scn), K_(merge_reason));
|
||||
int64_t schema_version = 0;
|
||||
ObGetMergeTablesResult result;
|
||||
ObMediumCompactionInfo medium_info;
|
||||
|
||||
if (OB_FAIL(medium_info.init_data_version(compat_version))) {
|
||||
LOG_WARN("fail to set data version", K(ret), K(tablet_id), K(compat_version));
|
||||
} else if (is_user_request(merge_reason)) {
|
||||
if (OB_FAIL(choose_medium_scn_for_user_request(medium_info, result, schema_version))) {
|
||||
} else if (is_user_request(merge_reason_)) {
|
||||
if (OB_FAIL(choose_scn_for_user_request(max_sync_medium_scn, medium_info, result, schema_version))) {
|
||||
LOG_WARN("failed to choose medium scn for user request", K(ret), KPC(this));
|
||||
}
|
||||
} else if (OB_FAIL(choose_medium_scn[is_major](*this, ls_, *tablet, merge_reason, allocator_, medium_info, result, schema_version))) {
|
||||
LOG_WARN("failed to choose medium snapshot", K(ret), KPC(this));
|
||||
} else if (medium_info.medium_snapshot_ <= max_sync_medium_scn) {
|
||||
ret = OB_NO_NEED_MERGE;
|
||||
} else if (is_major) {
|
||||
// do nothing
|
||||
} else if (OB_FAIL(get_max_reserved_snapshot(max_reserved_snapshot))) {
|
||||
LOG_WARN("failed to get multi_version_start", K(ret), KPC(this));
|
||||
} else if (medium_info.medium_snapshot_ < max_reserved_snapshot
|
||||
|| medium_info.medium_snapshot_ > tablet->get_snapshot_version()) {
|
||||
// chosen medium snapshot is far too old
|
||||
if (OB_FAIL(choose_new_medium_snapshot(max_reserved_snapshot, medium_info, result, schema_version))) {
|
||||
LOG_WARN("failed to choose new medium snapshot", KR(ret), K(medium_info));
|
||||
} else if (ObAdaptiveMergePolicy::TENANT_MAJOR == merge_reason_) {
|
||||
if (OB_FAIL(choose_major_snapshot(max_sync_medium_scn, medium_info, result, schema_version))) {
|
||||
LOG_WARN("failed to choose medium scn for major", K(ret), KPC(this));
|
||||
}
|
||||
} else if (OB_FAIL(tablet->get_schema_version_from_storage_schema(schema_version))){
|
||||
LOG_WARN("failed to get schema version from tablet", KR(ret), KPC(tablet));
|
||||
} else if (OB_FAIL(choose_medium_snapshot(max_sync_medium_scn, medium_info, result, schema_version))) {
|
||||
LOG_WARN("failed to choose medium scn for medium", K(ret), KPC(this));
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (medium_info.medium_snapshot_ <= max_sync_medium_scn) {
|
||||
ret = OB_NO_NEED_MERGE;
|
||||
} else if (is_major || is_user_request(merge_reason)) {
|
||||
// do nothing
|
||||
} else if (OB_FAIL(check_frequency(max_reserved_snapshot, medium_info))) { // check schedule interval
|
||||
if (OB_NO_NEED_MERGE != ret) {
|
||||
LOG_WARN("failed to check medium scn valid", K(ret), KPC(this));
|
||||
}
|
||||
}
|
||||
#ifdef ERRSIM
|
||||
if (OB_SUCC(ret) || OB_NO_NEED_MERGE == ret) {
|
||||
if (tablet_id.id() > ObTabletID::MIN_USER_TABLET_ID) {
|
||||
ret = OB_E(EventTable::EN_SCHEDULE_MEDIUM_COMPACTION) ret;
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
LOG_INFO("ERRSIM EN_SCHEDULE_MEDIUM_COMPACTION", K(ret), KPC(this));
|
||||
const int64_t snapshot_gc_ts = MTL(ObTenantFreezeInfoMgr*)->get_snapshot_gc_ts();
|
||||
medium_info.medium_snapshot_ = MIN(weak_read_ts_, snapshot_gc_ts);
|
||||
medium_info.compaction_type_ = ObMediumCompactionInfo::MEDIUM_COMPACTION;
|
||||
if (medium_info.medium_snapshot_ > max_sync_medium_scn
|
||||
&& medium_info.medium_snapshot_ >= max_reserved_snapshot) {
|
||||
FLOG_INFO("ERRSIM EN_SCHEDULE_MEDIUM_COMPACTION", KPC(this));
|
||||
ret = OB_SUCCESS;
|
||||
}
|
||||
}
|
||||
ret = errsim_choose_medium_snapshot(max_sync_medium_scn, medium_info);
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
ret = OB_E(EventTable::EN_SCHEDULE_MEDIUM_FAILED) ret;
|
||||
@ -659,6 +613,34 @@ int ObMediumCompactionScheduleFunc::decide_medium_snapshot(
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObMediumCompactionScheduleFunc::errsim_choose_medium_snapshot(
|
||||
const int64_t max_sync_medium_scn,
|
||||
ObMediumCompactionInfo &medium_info)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (tablet_handle_.get_obj()->get_tablet_meta().tablet_id_.id() > ObTabletID::MIN_USER_TABLET_ID) {
|
||||
ret = OB_E(EventTable::EN_SCHEDULE_MEDIUM_COMPACTION) ret;
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
LOG_INFO("ERRSIM EN_SCHEDULE_MEDIUM_COMPACTION", K(ret), KPC(this));
|
||||
const int64_t snapshot_gc_ts =
|
||||
MTL(ObTenantFreezeInfoMgr *)->get_snapshot_gc_ts();
|
||||
medium_info.medium_snapshot_ = MIN(weak_read_ts_, snapshot_gc_ts);
|
||||
medium_info.compaction_type_ = ObMediumCompactionInfo::MEDIUM_COMPACTION;
|
||||
int64_t max_reserved_snapshot = 0;
|
||||
if (OB_FAIL(get_max_reserved_snapshot(max_reserved_snapshot))) {
|
||||
LOG_WARN("failed to get reserved snapshot", K(ret), KPC(this));
|
||||
} else if (medium_info.medium_snapshot_ > max_sync_medium_scn
|
||||
&& medium_info.medium_snapshot_ >= max_reserved_snapshot) {
|
||||
FLOG_INFO("ERRSIM EN_SCHEDULE_MEDIUM_COMPACTION", KPC(this));
|
||||
ret = OB_SUCCESS;
|
||||
} else {
|
||||
ret = OB_NO_NEED_MERGE;
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObMediumCompactionScheduleFunc::init_schema_changed(
|
||||
ObMediumCompactionInfo &medium_info)
|
||||
{
|
||||
|
||||
Reference in New Issue
Block a user