remove redundant code and del lambda
This commit is contained in:
parent
225d78f12e
commit
d425027b91
@ -27,6 +27,44 @@ namespace storage
|
||||
/*
|
||||
* ------------------------------- ObColumnIndexArray -------------------------------
|
||||
*/
|
||||
int64_t return_array_cnt(uint32_t schema_rowkey_cnt, const ObFixedMetaObjArray<int32_t> & array)
|
||||
{
|
||||
return array.count();
|
||||
}
|
||||
int64_t return_schema_rowkey_cnt(uint32_t schema_rowkey_cnt, const ObFixedMetaObjArray<int32_t> & array)
|
||||
{
|
||||
return schema_rowkey_cnt;
|
||||
}
|
||||
int32_t return_array_idx_for_memtable(uint32_t schema_rowkey_cnt, uint32_t column_cnt,
|
||||
int64_t idx,
|
||||
const ObFixedMetaObjArray<int32_t> &array)
|
||||
{
|
||||
int32_t ret_val = 0;
|
||||
const int32_t extra_rowkey_cnt = storage::ObMultiVersionRowkeyHelpper::get_extra_rowkey_col_cnt();
|
||||
OB_ASSERT(idx >= 0 && idx < column_cnt);
|
||||
if (idx < schema_rowkey_cnt) {
|
||||
ret_val = idx;
|
||||
} else if (idx < schema_rowkey_cnt + extra_rowkey_cnt) {
|
||||
ret_val = OB_INVALID_INDEX;
|
||||
} else {
|
||||
ret_val = idx - extra_rowkey_cnt;
|
||||
}
|
||||
return ret_val;
|
||||
}
|
||||
int32_t return_idx(uint32_t schema_rowkey_cnt, uint32_t column_cnt,
|
||||
int64_t idx,
|
||||
const ObFixedMetaObjArray<int32_t> &array)
|
||||
{
|
||||
OB_ASSERT(idx >= 0 && idx < column_cnt);
|
||||
return (int32_t)idx;
|
||||
}
|
||||
int32_t return_array_idx(uint32_t schema_rowkey_cnt, uint32_t column_cnt,
|
||||
int64_t idx,
|
||||
const ObFixedMetaObjArray<int32_t> &array)
|
||||
{
|
||||
OB_ASSERT(idx >= 0 && idx < array.count());
|
||||
return array[idx];
|
||||
}
|
||||
ObColumnIndexArray::ObColumnIndexArray(const bool rowkey_mode /* = false*/,
|
||||
const bool for_memtable /* = false*/)
|
||||
: version_(COLUMN_INDEX_ARRAY_VERSION),
|
||||
@ -39,32 +77,14 @@ ObColumnIndexArray::ObColumnIndexArray(const bool rowkey_mode /* = false*/,
|
||||
{
|
||||
if (rowkey_mode) {
|
||||
if (for_memtable) { // no multi_version rowkey in memtable
|
||||
at_func_ =[](uint32_t schema_rowkey_cnt, uint32_t column_cnt, int64_t idx, const ObFixedMetaObjArray<int32_t> &array) -> int32_t {
|
||||
int32_t ret_val = 0;
|
||||
const int32_t extra_rowkey_cnt = storage::ObMultiVersionRowkeyHelpper::get_extra_rowkey_col_cnt();
|
||||
OB_ASSERT(idx >= 0 && idx < column_cnt);
|
||||
if (idx < schema_rowkey_cnt) {
|
||||
ret_val = idx;
|
||||
} else if (idx < schema_rowkey_cnt + extra_rowkey_cnt) {
|
||||
ret_val = OB_INVALID_INDEX;
|
||||
} else {
|
||||
ret_val = idx - extra_rowkey_cnt;
|
||||
}
|
||||
return ret_val;
|
||||
};
|
||||
at_func_ = return_array_idx_for_memtable;
|
||||
} else {
|
||||
at_func_ =[](uint32_t schema_rowkey_cnt, uint32_t column_cnt, int64_t idx, const ObFixedMetaObjArray<int32_t> &) -> int32_t {
|
||||
OB_ASSERT(idx >= 0 && idx < column_cnt);
|
||||
return (int32_t)idx;
|
||||
};
|
||||
at_func_ = return_idx;
|
||||
}
|
||||
count_func_ = [](uint32_t column_cnt, const ObFixedMetaObjArray<int32_t> &) -> int64_t { return column_cnt; };
|
||||
count_func_ = return_schema_rowkey_cnt;
|
||||
} else {
|
||||
at_func_ = [](uint32_t, uint32_t, int64_t idx, const ObFixedMetaObjArray<int32_t> &array) -> int32_t {
|
||||
OB_ASSERT(idx >= 0 && idx < array.count());
|
||||
return array[idx];
|
||||
};
|
||||
count_func_ = [](uint32_t, const ObFixedMetaObjArray<int32_t> &array) -> int64_t { return array.count(); };
|
||||
at_func_ = return_array_idx;
|
||||
count_func_ = return_array_cnt;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -37,11 +37,6 @@ using namespace common;
|
||||
namespace compaction
|
||||
{
|
||||
|
||||
ObMediumCompactionScheduleFunc::ChooseMediumScn ObMediumCompactionScheduleFunc::choose_medium_scn[MEDIUM_FUNC_CNT]
|
||||
= { ObMediumCompactionScheduleFunc::choose_medium_snapshot,
|
||||
ObMediumCompactionScheduleFunc::choose_major_snapshot,
|
||||
};
|
||||
|
||||
int64_t ObMediumCompactionScheduleFunc::to_string(char *buf, const int64_t buf_len) const
|
||||
{
|
||||
int64_t pos = 0;
|
||||
@ -61,48 +56,58 @@ int64_t ObMediumCompactionScheduleFunc::to_string(char *buf, const int64_t buf_l
|
||||
}
|
||||
|
||||
int ObMediumCompactionScheduleFunc::choose_medium_snapshot(
|
||||
const ObMediumCompactionScheduleFunc &func,
|
||||
ObLS &ls,
|
||||
ObTablet &tablet,
|
||||
const ObAdaptiveMergePolicy::AdaptiveMergeReason &merge_reason,
|
||||
ObArenaAllocator &allocator,
|
||||
const int64_t max_sync_medium_scn,
|
||||
ObMediumCompactionInfo &medium_info,
|
||||
ObGetMergeTablesResult &result,
|
||||
int64_t &schema_version)
|
||||
{
|
||||
UNUSEDx(func, allocator, schema_version);
|
||||
int ret = OB_SUCCESS;
|
||||
ObGetMergeTablesParam param;
|
||||
param.merge_type_ = MEDIUM_MERGE;
|
||||
if (OB_FAIL(ObAdaptiveMergePolicy::get_meta_merge_tables(
|
||||
param,
|
||||
ls,
|
||||
tablet,
|
||||
result))) {
|
||||
int64_t max_reserved_snapshot = 0;
|
||||
int64_t medium_snapshot = 0;
|
||||
ObTablet &tablet = *tablet_handle_.get_obj();
|
||||
if (OB_FAIL(ObAdaptiveMergePolicy::get_meta_merge_tables(param, ls_, tablet, result))) {
|
||||
if (OB_NO_NEED_MERGE != ret) {
|
||||
LOG_WARN("failed to get meta merge tables", K(ret), K(param));
|
||||
}
|
||||
} else if (FALSE_IT(medium_snapshot = result.version_range_.snapshot_version_)) {
|
||||
} else if (OB_FAIL(get_max_reserved_snapshot(max_reserved_snapshot))) {
|
||||
LOG_WARN("failed to get reserved snapshot", K(ret), KPC(this));
|
||||
} else if (medium_info.medium_snapshot_ < max_reserved_snapshot
|
||||
|| medium_info.medium_snapshot_ > tablet.get_snapshot_version()) {
|
||||
// chosen medium snapshot is far too old
|
||||
if (OB_FAIL(choose_new_medium_snapshot(max_reserved_snapshot, medium_info, result, schema_version))) {
|
||||
LOG_WARN("failed to choose new medium snapshot", KR(ret), K(medium_info));
|
||||
}
|
||||
} else if (OB_FAIL(tablet.get_schema_version_from_storage_schema(schema_version))) {
|
||||
LOG_WARN("failed to get schema version from tablet", KR(ret), K(tablet));
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (medium_info.medium_snapshot_ <= max_sync_medium_scn) {
|
||||
ret = OB_NO_NEED_MERGE;
|
||||
} else if (OB_FAIL(check_frequency(max_reserved_snapshot, medium_snapshot))) { // check schedule interval
|
||||
if (OB_NO_NEED_MERGE != ret) {
|
||||
LOG_WARN("failed to check medium scn valid", K(ret), KPC(this));
|
||||
}
|
||||
} else {
|
||||
medium_info.set_basic_info(
|
||||
ObMediumCompactionInfo::MEDIUM_COMPACTION,
|
||||
merge_reason,
|
||||
result.version_range_.snapshot_version_);
|
||||
LOG_TRACE("choose_medium_snapshot", K(ret), "ls_id", ls.get_ls_id(),
|
||||
"tablet_id", tablet.get_tablet_meta().tablet_id_, K(result), K(medium_info));
|
||||
merge_reason_,
|
||||
medium_snapshot);
|
||||
LOG_TRACE("choose_medium_snapshot", K(ret), KPC(this), K(result), K(medium_info));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObMediumCompactionScheduleFunc::find_valid_freeze_info(
|
||||
ObTablet &tablet,
|
||||
ObArenaAllocator &allocator,
|
||||
ObMediumCompactionInfo &medium_info,
|
||||
share::ObFreezeInfo &freeze_info,
|
||||
bool &force_schedule_medium_merge)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
force_schedule_medium_merge = false;
|
||||
|
||||
ObTablet &tablet = *tablet_handle_.get_obj();
|
||||
const ObTabletID &tablet_id = tablet.get_tablet_meta().tablet_id_;
|
||||
int64_t schedule_snapshot = 0;
|
||||
bool schedule_with_newer_info = false;
|
||||
@ -143,7 +148,7 @@ int ObMediumCompactionScheduleFunc::find_valid_freeze_info(
|
||||
K(tablet_id), K(last_sstable_schema_version), K(freeze_info));
|
||||
break;
|
||||
} else if (OB_FAIL(get_table_schema_to_merge(
|
||||
*schema_service, tablet, freeze_info.schema_version_, medium_info.medium_compat_version_, allocator, medium_info.storage_schema_))) {
|
||||
*schema_service, tablet, freeze_info.schema_version_, medium_info.medium_compat_version_, allocator_, medium_info.storage_schema_))) {
|
||||
if (OB_TABLE_IS_DELETED == ret) {
|
||||
// do nothing, end loop
|
||||
} else if (OB_ERR_SCHEMA_HISTORY_EMPTY == ret) {
|
||||
@ -169,27 +174,22 @@ int ObMediumCompactionScheduleFunc::find_valid_freeze_info(
|
||||
}
|
||||
|
||||
int ObMediumCompactionScheduleFunc::choose_major_snapshot(
|
||||
const ObMediumCompactionScheduleFunc &func,
|
||||
ObLS &ls,
|
||||
ObTablet &tablet,
|
||||
const ObAdaptiveMergePolicy::AdaptiveMergeReason &merge_reason,
|
||||
ObArenaAllocator &allocator,
|
||||
const int64_t max_sync_medium_scn,
|
||||
ObMediumCompactionInfo &medium_info,
|
||||
ObGetMergeTablesResult &result,
|
||||
int64_t &schema_version)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
const ObLSID &ls_id = ls.get_ls_id();
|
||||
const ObTabletID &tablet_id = tablet.get_tablet_meta().tablet_id_;
|
||||
ObTablet &tablet = *tablet_handle_.get_obj();
|
||||
share::ObFreezeInfo freeze_info;
|
||||
bool force_schedule_medium_merge = false;
|
||||
|
||||
if (OB_FAIL(find_valid_freeze_info(tablet, allocator, medium_info, freeze_info, force_schedule_medium_merge))) {
|
||||
if (OB_FAIL(find_valid_freeze_info(medium_info, freeze_info, force_schedule_medium_merge))) {
|
||||
LOG_WARN("failed to find valid freeze info", KR(ret));
|
||||
} else if (force_schedule_medium_merge) {
|
||||
if (OB_FAIL(switch_to_choose_medium_snapshot(func, allocator, ls, tablet, freeze_info.frozen_scn_.get_val_for_tx(), medium_info, schema_version))) {
|
||||
if (OB_FAIL(switch_to_choose_medium_snapshot(freeze_info.frozen_scn_.get_val_for_tx(), medium_info, schema_version))) {
|
||||
if (OB_EAGAIN != ret) {
|
||||
LOG_WARN("failed to switch to choose medium snapshot", K(ret), K(tablet));
|
||||
LOG_WARN("failed to switch to choose medium snapshot", K(ret), KPC(this));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
@ -203,8 +203,7 @@ int ObMediumCompactionScheduleFunc::choose_major_snapshot(
|
||||
if (FAILEDx(ObPartitionMergePolicy::get_result_by_snapshot(tablet, medium_info.medium_snapshot_, result))) {
|
||||
LOG_WARN("failed get result for major", K(ret), K(medium_info));
|
||||
} else {
|
||||
LOG_TRACE("choose_major_snapshot", K(ret), "ls_id", ls.get_ls_id(),
|
||||
"tablet_id", tablet.get_tablet_meta().tablet_id_, K(medium_info), K(freeze_info), K(result),
|
||||
LOG_TRACE("choose_major_snapshot", K(ret), KPC(this), K(medium_info), K(freeze_info), K(result),
|
||||
K(medium_info), K(schema_version));
|
||||
#ifdef ERRSIM
|
||||
if (tablet.get_tablet_meta().tablet_id_.id() == 1) {
|
||||
@ -213,8 +212,7 @@ int ObMediumCompactionScheduleFunc::choose_major_snapshot(
|
||||
ret = OB_SUCCESS;
|
||||
medium_info.compaction_type_ = ObMediumCompactionInfo::MEDIUM_COMPACTION;
|
||||
medium_info.medium_snapshot_ += 100;
|
||||
FLOG_INFO("ERRSIM EN_SPECIAL_TABLE_HAVE_LARGER_SCN", "ls_id", ls.get_ls_id(),
|
||||
"tablet_id", tablet.get_tablet_meta().tablet_id_,K(medium_info));
|
||||
FLOG_INFO("ERRSIM EN_SPECIAL_TABLE_HAVE_LARGER_SCN", KPC(this),K(medium_info));
|
||||
}
|
||||
}
|
||||
#endif
|
||||
@ -223,10 +221,6 @@ int ObMediumCompactionScheduleFunc::choose_major_snapshot(
|
||||
}
|
||||
|
||||
int ObMediumCompactionScheduleFunc::switch_to_choose_medium_snapshot(
|
||||
const ObMediumCompactionScheduleFunc &func,
|
||||
ObArenaAllocator &allocator,
|
||||
ObLS &ls,
|
||||
ObTablet &tablet,
|
||||
const int64_t freeze_version,
|
||||
ObMediumCompactionInfo &medium_info,
|
||||
int64_t &schema_version)
|
||||
@ -234,12 +228,12 @@ int ObMediumCompactionScheduleFunc::switch_to_choose_medium_snapshot(
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t medium_snapshot = 0;
|
||||
|
||||
if (func.weak_read_ts_ < freeze_version + 1) {
|
||||
if (weak_read_ts_ < freeze_version + 1) {
|
||||
ret = OB_EAGAIN;
|
||||
LOG_WARN("weak read ts is smaller than new medium snapshot, try later", K(ret), K(tablet));
|
||||
} else if (FALSE_IT(medium_snapshot = MAX(func.weak_read_ts_, freeze_version + 1))) {
|
||||
} else if (OB_FAIL(tablet.get_newest_schema_version(schema_version))) {
|
||||
LOG_WARN("fail to choose medium schema version", K(ret), K(tablet));
|
||||
LOG_WARN("weak read ts is smaller than new medium snapshot, try later", K(ret), KPC(this), K(freeze_version));
|
||||
} else if (FALSE_IT(medium_snapshot = MAX(weak_read_ts_, freeze_version + 1))) {
|
||||
} else if (OB_FAIL(tablet_handle_.get_obj()->get_newest_schema_version(schema_version))) {
|
||||
LOG_WARN("fail to choose medium schema version", K(ret), KPC(this));
|
||||
} else {
|
||||
medium_info.set_basic_info(
|
||||
ObMediumCompactionInfo::MEDIUM_COMPACTION,
|
||||
@ -275,8 +269,7 @@ int ObMediumCompactionScheduleFunc::get_status_from_inner_table(
|
||||
|
||||
// cal this func with PLAF LEADER ROLE && last_medium_scn_ = 0
|
||||
int ObMediumCompactionScheduleFunc::schedule_next_medium_for_leader(
|
||||
const int64_t major_snapshot,
|
||||
const bool force_schedule)
|
||||
const int64_t major_snapshot)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObRole role = INVALID_ROLE;
|
||||
@ -298,7 +291,7 @@ int ObMediumCompactionScheduleFunc::schedule_next_medium_for_leader(
|
||||
}
|
||||
}
|
||||
#endif
|
||||
ret = schedule_next_medium_primary_cluster(major_snapshot, force_schedule);
|
||||
ret = schedule_next_medium_primary_cluster(major_snapshot);
|
||||
} else {
|
||||
LOG_TRACE("not leader", K(ret), K(role), K(ls_.get_ls_id()));
|
||||
}
|
||||
@ -306,9 +299,7 @@ int ObMediumCompactionScheduleFunc::schedule_next_medium_for_leader(
|
||||
}
|
||||
|
||||
int ObMediumCompactionScheduleFunc::get_adaptive_reason(
|
||||
const int64_t schedule_major_snapshot,
|
||||
const bool force_schedule,
|
||||
ObAdaptiveMergePolicy::AdaptiveMergeReason &adaptive_merge_reason)
|
||||
const int64_t schedule_major_snapshot)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t max_sync_medium_scn = 0;
|
||||
@ -317,10 +308,8 @@ int ObMediumCompactionScheduleFunc::get_adaptive_reason(
|
||||
*tablet, *medium_info_list_, max_sync_medium_scn))) {
|
||||
LOG_WARN("failed to get max received medium scn", KR(ret), KPC(this));
|
||||
} else if (schedule_major_snapshot > max_sync_medium_scn) {
|
||||
adaptive_merge_reason = ObAdaptiveMergePolicy::AdaptiveMergeReason::TENANT_MAJOR;
|
||||
} else if (force_schedule) {
|
||||
adaptive_merge_reason = is_rebuild_column_group_ ? ObAdaptiveMergePolicy::REBUILD_COLUMN_GROUP : ObAdaptiveMergePolicy::USER_REQUEST;
|
||||
} else if (OB_FAIL(ObAdaptiveMergePolicy::get_adaptive_merge_reason(*tablet, adaptive_merge_reason))) {
|
||||
merge_reason_ = ObAdaptiveMergePolicy::AdaptiveMergeReason::TENANT_MAJOR;
|
||||
} else if (OB_FAIL(ObAdaptiveMergePolicy::get_adaptive_merge_reason(*tablet, merge_reason_))) {
|
||||
if (OB_HASH_NOT_EXIST != ret) {
|
||||
LOG_WARN("failed to get meta merge priority", K(ret), KPC(this));
|
||||
} else {
|
||||
@ -335,7 +324,7 @@ int ObMediumCompactionScheduleFunc::get_adaptive_reason(
|
||||
if (OB_FAIL(ret)) {
|
||||
FLOG_INFO("errsim EN_SCHEDULE_MEDIUM_COMPACTION", KPC(this));
|
||||
ret = OB_SUCCESS;
|
||||
adaptive_merge_reason = ObAdaptiveMergePolicy::AdaptiveMergeReason::LOAD_DATA_SCENE;
|
||||
merge_reason_ = ObAdaptiveMergePolicy::AdaptiveMergeReason::LOAD_DATA_SCENE;
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -344,11 +333,9 @@ int ObMediumCompactionScheduleFunc::get_adaptive_reason(
|
||||
}
|
||||
|
||||
int ObMediumCompactionScheduleFunc::schedule_next_medium_primary_cluster(
|
||||
const int64_t schedule_major_snapshot,
|
||||
const bool force_schedule)
|
||||
const int64_t schedule_major_snapshot)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObAdaptiveMergePolicy::AdaptiveMergeReason adaptive_merge_reason = ObAdaptiveMergePolicy::AdaptiveMergeReason::NONE;
|
||||
ObTabletCompactionScnInfo ret_info;
|
||||
// check last medium type, select inner table for last major
|
||||
bool schedule_medium_flag = false;
|
||||
@ -368,13 +355,14 @@ int ObMediumCompactionScheduleFunc::schedule_next_medium_primary_cluster(
|
||||
LOG_WARN("medium info list is unexpected null", K(ret), KPC(this), KPC_(medium_info_list));
|
||||
} else if (!medium_info_list_->could_schedule_next_round(last_major_snapshot_version)) { // check serialized list
|
||||
// do nothing
|
||||
} else if (OB_FAIL(get_adaptive_reason(schedule_major_snapshot, force_schedule, adaptive_merge_reason))) {
|
||||
} else if (!ObAdaptiveMergePolicy::is_valid_merge_reason(merge_reason_)
|
||||
&& OB_FAIL(get_adaptive_reason(schedule_major_snapshot))) {
|
||||
LOG_WARN("failed to get adaptive reason", KR(ret), K(schedule_major_snapshot));
|
||||
} else if (ObAdaptiveMergePolicy::is_valid_merge_reason(adaptive_merge_reason)) {
|
||||
} else if (ObAdaptiveMergePolicy::is_valid_merge_reason(merge_reason_)) {
|
||||
schedule_medium_flag = true;
|
||||
}
|
||||
LOG_TRACE("schedule next medium in primary cluster", K(ret), KPC(this), K(schedule_medium_flag),
|
||||
K(schedule_major_snapshot), K(adaptive_merge_reason), K(last_major_snapshot_version), KPC_(medium_info_list), K(max_sync_medium_scn));
|
||||
K(schedule_major_snapshot), K(merge_reason_), K(last_major_snapshot_version), KPC_(medium_info_list), K(max_sync_medium_scn));
|
||||
#ifdef ERRSIM
|
||||
if (OB_SUCC(ret)) {
|
||||
if (tablet->get_tablet_meta().tablet_id_.id() > ObTabletID::MIN_USER_TABLET_ID) {
|
||||
@ -407,13 +395,14 @@ int ObMediumCompactionScheduleFunc::schedule_next_medium_primary_cluster(
|
||||
schedule_flag = true;
|
||||
}
|
||||
if (OB_SUCC(ret) && schedule_flag) {
|
||||
ret = decide_medium_snapshot(adaptive_merge_reason);
|
||||
ret = decide_medium_snapshot();
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObMediumCompactionScheduleFunc::choose_medium_scn_for_user_request(
|
||||
int ObMediumCompactionScheduleFunc::choose_scn_for_user_request(
|
||||
const int64_t max_sync_medium_scn,
|
||||
ObMediumCompactionInfo &medium_info,
|
||||
ObGetMergeTablesResult &result,
|
||||
int64_t &schema_version)
|
||||
@ -421,9 +410,7 @@ int ObMediumCompactionScheduleFunc::choose_medium_scn_for_user_request(
|
||||
int ret = OB_SUCCESS;
|
||||
// check exist not finish freeze info
|
||||
schema_version = 0;
|
||||
int64_t max_sync_medium_scn = 0;
|
||||
int64_t max_reserved_snapshot = 0;
|
||||
const share::SCN &weak_read_ts = ls_.get_ls_wrs_handler()->get_ls_weak_read_ts();
|
||||
const int64_t latest_frozen_version = MTL(ObTenantFreezeInfoMgr*)->get_latest_frozen_version();
|
||||
const int64_t last_major_snapshot_version = tablet_handle_.get_obj()->get_last_major_snapshot_version();
|
||||
ObTablet *tablet = nullptr;
|
||||
@ -436,17 +423,15 @@ int ObMediumCompactionScheduleFunc::choose_medium_scn_for_user_request(
|
||||
} else if (latest_frozen_version > last_major_snapshot_version) {
|
||||
ret = OB_NO_NEED_MERGE;
|
||||
LOG_WARN("unfinished freeze info exist, can't schedule another medium", K(ret));
|
||||
} else if (OB_FAIL(tablet->get_max_sync_medium_scn(max_sync_medium_scn))) {
|
||||
LOG_WARN("failed to get max sync medium scn", K(ret), KPC(this));
|
||||
} else if (OB_FAIL(get_max_reserved_snapshot(max_reserved_snapshot))) {
|
||||
LOG_WARN("failed to get reserved snapshot", K(ret), KPC(this));
|
||||
} else if (FALSE_IT(medium_info.medium_snapshot_ = MAX(max_reserved_snapshot, weak_read_ts.get_val_for_tx()))) {
|
||||
} else if (FALSE_IT(medium_info.medium_snapshot_ = MAX(max_reserved_snapshot, weak_read_ts_))) {
|
||||
} else if (medium_info.medium_snapshot_ < max_sync_medium_scn) {
|
||||
ret = OB_NO_NEED_MERGE;
|
||||
LOG_WARN("chosen medium snapshot is synced before", K(ret), K(medium_info), K(max_sync_medium_scn));
|
||||
} else {
|
||||
medium_info.compaction_type_ = ObMediumCompactionInfo::MEDIUM_COMPACTION;
|
||||
medium_info.medium_merge_reason_ = is_rebuild_column_group_ ? ObAdaptiveMergePolicy::REBUILD_COLUMN_GROUP : ObAdaptiveMergePolicy::USER_REQUEST;
|
||||
medium_info.medium_merge_reason_ = merge_reason_;
|
||||
if (OB_FAIL(ObPartitionMergePolicy::get_result_by_snapshot(*tablet, medium_info.medium_snapshot_, result))) {
|
||||
LOG_WARN("failed to get result for major", K(ret), K(last_major_snapshot_version), K(medium_info));
|
||||
} else if (OB_FAIL(tablet->get_newest_schema_version(schema_version))) {
|
||||
@ -461,7 +446,7 @@ int ObMediumCompactionScheduleFunc::choose_medium_scn_for_user_request(
|
||||
|
||||
int ObMediumCompactionScheduleFunc::check_frequency(
|
||||
const int64_t max_reserved_snapshot,
|
||||
ObMediumCompactionInfo &medium_info)
|
||||
const int64_t medium_snapshot)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObTablet *tablet = tablet_handle_.get_obj();
|
||||
@ -471,9 +456,9 @@ int ObMediumCompactionScheduleFunc::check_frequency(
|
||||
const int64_t last_major_snapshot_version = tablet->get_last_major_snapshot_version();
|
||||
if (0 >= last_major_snapshot_version) {
|
||||
LOG_WARN("major sstable should not be empty", K(ret), K(last_major_snapshot_version));
|
||||
} else if (last_major_snapshot_version + time_interval > medium_info.medium_snapshot_) {
|
||||
} else if (last_major_snapshot_version + time_interval > medium_snapshot) {
|
||||
ret = OB_NO_NEED_MERGE;
|
||||
LOG_DEBUG("schedule medium frequently", K(ret), K(last_major_snapshot_version), K(medium_info),
|
||||
LOG_DEBUG("schedule medium frequently", K(ret), K(last_major_snapshot_version), K(medium_snapshot),
|
||||
K(time_interval));
|
||||
}
|
||||
}
|
||||
@ -504,6 +489,7 @@ int ObMediumCompactionScheduleFunc::get_max_reserved_snapshot(int64_t &max_reser
|
||||
LOG_WARN("failed to get reserved snapshot from freeze info mgr", K(ret), "tablet_id", tablet->get_tablet_meta().tablet_id_);
|
||||
} else {
|
||||
max_reserved_snapshot = MAX(ls_.get_min_reserved_snapshot(), snapshot_info.snapshot_);
|
||||
LOG_TRACE("get max reserved snapshot", KR(ret), K(max_reserved_snapshot), K(snapshot_info));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@ -541,15 +527,13 @@ int ObMediumCompactionScheduleFunc::choose_new_medium_snapshot(
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObMediumCompactionScheduleFunc::decide_medium_snapshot(
|
||||
const ObAdaptiveMergePolicy::AdaptiveMergeReason merge_reason)
|
||||
int ObMediumCompactionScheduleFunc::decide_medium_snapshot()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
int64_t max_sync_medium_scn = 0;
|
||||
uint64_t compat_version = 0;
|
||||
ObTablet *tablet = nullptr;
|
||||
const bool is_major = (merge_reason == ObAdaptiveMergePolicy::TENANT_MAJOR);
|
||||
if (OB_UNLIKELY(!tablet_handle_.is_valid())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("invalid tablet_handle", K(ret), K(tablet_handle_));
|
||||
@ -564,61 +548,31 @@ int ObMediumCompactionScheduleFunc::decide_medium_snapshot(
|
||||
LOG_WARN("failed to add dependent tablet", K(ret), KPC(this));
|
||||
} else {
|
||||
const ObTabletID &tablet_id = tablet->get_tablet_meta().tablet_id_;
|
||||
LOG_TRACE("decide_medium_snapshot", K(ret), KPC(this), K(compat_version), K(tablet_id), K(max_sync_medium_scn), K(merge_reason));
|
||||
int64_t max_reserved_snapshot = 0;
|
||||
LOG_TRACE("decide_medium_snapshot", K(ret), KPC(this), K(compat_version), K(tablet_id), K(max_sync_medium_scn), K_(merge_reason));
|
||||
int64_t schema_version = 0;
|
||||
ObGetMergeTablesResult result;
|
||||
ObMediumCompactionInfo medium_info;
|
||||
|
||||
if (OB_FAIL(medium_info.init_data_version(compat_version))) {
|
||||
LOG_WARN("fail to set data version", K(ret), K(tablet_id), K(compat_version));
|
||||
} else if (is_user_request(merge_reason)) {
|
||||
if (OB_FAIL(choose_medium_scn_for_user_request(medium_info, result, schema_version))) {
|
||||
} else if (is_user_request(merge_reason_)) {
|
||||
if (OB_FAIL(choose_scn_for_user_request(max_sync_medium_scn, medium_info, result, schema_version))) {
|
||||
LOG_WARN("failed to choose medium scn for user request", K(ret), KPC(this));
|
||||
}
|
||||
} else if (OB_FAIL(choose_medium_scn[is_major](*this, ls_, *tablet, merge_reason, allocator_, medium_info, result, schema_version))) {
|
||||
LOG_WARN("failed to choose medium snapshot", K(ret), KPC(this));
|
||||
} else if (medium_info.medium_snapshot_ <= max_sync_medium_scn) {
|
||||
ret = OB_NO_NEED_MERGE;
|
||||
} else if (is_major) {
|
||||
// do nothing
|
||||
} else if (OB_FAIL(get_max_reserved_snapshot(max_reserved_snapshot))) {
|
||||
LOG_WARN("failed to get multi_version_start", K(ret), KPC(this));
|
||||
} else if (medium_info.medium_snapshot_ < max_reserved_snapshot
|
||||
|| medium_info.medium_snapshot_ > tablet->get_snapshot_version()) {
|
||||
// chosen medium snapshot is far too old
|
||||
if (OB_FAIL(choose_new_medium_snapshot(max_reserved_snapshot, medium_info, result, schema_version))) {
|
||||
LOG_WARN("failed to choose new medium snapshot", KR(ret), K(medium_info));
|
||||
} else if (ObAdaptiveMergePolicy::TENANT_MAJOR == merge_reason_) {
|
||||
if (OB_FAIL(choose_major_snapshot(max_sync_medium_scn, medium_info, result, schema_version))) {
|
||||
LOG_WARN("failed to choose medium scn for major", K(ret), KPC(this));
|
||||
}
|
||||
} else if (OB_FAIL(tablet->get_schema_version_from_storage_schema(schema_version))){
|
||||
LOG_WARN("failed to get schema version from tablet", KR(ret), KPC(tablet));
|
||||
} else if (OB_FAIL(choose_medium_snapshot(max_sync_medium_scn, medium_info, result, schema_version))) {
|
||||
LOG_WARN("failed to choose medium scn for medium", K(ret), KPC(this));
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (medium_info.medium_snapshot_ <= max_sync_medium_scn) {
|
||||
ret = OB_NO_NEED_MERGE;
|
||||
} else if (is_major || is_user_request(merge_reason)) {
|
||||
// do nothing
|
||||
} else if (OB_FAIL(check_frequency(max_reserved_snapshot, medium_info))) { // check schedule interval
|
||||
if (OB_NO_NEED_MERGE != ret) {
|
||||
LOG_WARN("failed to check medium scn valid", K(ret), KPC(this));
|
||||
}
|
||||
}
|
||||
#ifdef ERRSIM
|
||||
if (OB_SUCC(ret) || OB_NO_NEED_MERGE == ret) {
|
||||
if (tablet_id.id() > ObTabletID::MIN_USER_TABLET_ID) {
|
||||
ret = OB_E(EventTable::EN_SCHEDULE_MEDIUM_COMPACTION) ret;
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
LOG_INFO("ERRSIM EN_SCHEDULE_MEDIUM_COMPACTION", K(ret), KPC(this));
|
||||
const int64_t snapshot_gc_ts = MTL(ObTenantFreezeInfoMgr*)->get_snapshot_gc_ts();
|
||||
medium_info.medium_snapshot_ = MIN(weak_read_ts_, snapshot_gc_ts);
|
||||
medium_info.compaction_type_ = ObMediumCompactionInfo::MEDIUM_COMPACTION;
|
||||
if (medium_info.medium_snapshot_ > max_sync_medium_scn
|
||||
&& medium_info.medium_snapshot_ >= max_reserved_snapshot) {
|
||||
FLOG_INFO("ERRSIM EN_SCHEDULE_MEDIUM_COMPACTION", KPC(this));
|
||||
ret = OB_SUCCESS;
|
||||
}
|
||||
}
|
||||
ret = errsim_choose_medium_snapshot(max_sync_medium_scn, medium_info);
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
ret = OB_E(EventTable::EN_SCHEDULE_MEDIUM_FAILED) ret;
|
||||
@ -659,6 +613,34 @@ int ObMediumCompactionScheduleFunc::decide_medium_snapshot(
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObMediumCompactionScheduleFunc::errsim_choose_medium_snapshot(
|
||||
const int64_t max_sync_medium_scn,
|
||||
ObMediumCompactionInfo &medium_info)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (tablet_handle_.get_obj()->get_tablet_meta().tablet_id_.id() > ObTabletID::MIN_USER_TABLET_ID) {
|
||||
ret = OB_E(EventTable::EN_SCHEDULE_MEDIUM_COMPACTION) ret;
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
LOG_INFO("ERRSIM EN_SCHEDULE_MEDIUM_COMPACTION", K(ret), KPC(this));
|
||||
const int64_t snapshot_gc_ts =
|
||||
MTL(ObTenantFreezeInfoMgr *)->get_snapshot_gc_ts();
|
||||
medium_info.medium_snapshot_ = MIN(weak_read_ts_, snapshot_gc_ts);
|
||||
medium_info.compaction_type_ = ObMediumCompactionInfo::MEDIUM_COMPACTION;
|
||||
int64_t max_reserved_snapshot = 0;
|
||||
if (OB_FAIL(get_max_reserved_snapshot(max_reserved_snapshot))) {
|
||||
LOG_WARN("failed to get reserved snapshot", K(ret), KPC(this));
|
||||
} else if (medium_info.medium_snapshot_ > max_sync_medium_scn
|
||||
&& medium_info.medium_snapshot_ >= max_reserved_snapshot) {
|
||||
FLOG_INFO("ERRSIM EN_SCHEDULE_MEDIUM_COMPACTION", KPC(this));
|
||||
ret = OB_SUCCESS;
|
||||
} else {
|
||||
ret = OB_NO_NEED_MERGE;
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObMediumCompactionScheduleFunc::init_schema_changed(
|
||||
ObMediumCompactionInfo &medium_info)
|
||||
{
|
||||
|
@ -33,14 +33,14 @@ public:
|
||||
const SCN &weak_read_ts,
|
||||
const ObMediumCompactionInfoList &medium_info_list,
|
||||
ObScheduleStatistics *schedule_stat,
|
||||
const bool is_rebuild_column_group = false)
|
||||
const ObAdaptiveMergePolicy::AdaptiveMergeReason merge_reason = ObAdaptiveMergePolicy::NONE)
|
||||
: allocator_("MediumSchedule"),
|
||||
ls_(ls),
|
||||
tablet_handle_(tablet_handle),
|
||||
weak_read_ts_(weak_read_ts.get_val_for_tx()),
|
||||
medium_info_list_(&medium_info_list),
|
||||
schedule_stat_(schedule_stat),
|
||||
is_rebuild_column_group_(is_rebuild_column_group)
|
||||
merge_reason_(merge_reason)
|
||||
{}
|
||||
~ObMediumCompactionScheduleFunc() {}
|
||||
|
||||
@ -76,13 +76,11 @@ public:
|
||||
ObCompactionTimeGuard &time_guard);
|
||||
|
||||
int schedule_next_medium_for_leader(
|
||||
const int64_t major_snapshot,
|
||||
const bool force_schedule);
|
||||
const int64_t major_snapshot);
|
||||
|
||||
int64_t to_string(char* buf, const int64_t buf_len) const;
|
||||
protected:
|
||||
int decide_medium_snapshot(
|
||||
const ObAdaptiveMergePolicy::AdaptiveMergeReason merge_reason);
|
||||
int decide_medium_snapshot();
|
||||
static int get_status_from_inner_table(
|
||||
const ObLSID &ls_id,
|
||||
const ObTabletID &tablet_id,
|
||||
@ -120,35 +118,21 @@ protected:
|
||||
static int batch_check_medium_checksum(
|
||||
const ObIArray<ObTabletCheckInfo> &tablet_ls_infos,
|
||||
const ObIArray<ObTabletReplicaChecksumItem> &checksum_items);
|
||||
static int choose_medium_snapshot(
|
||||
const ObMediumCompactionScheduleFunc &func,
|
||||
ObLS &ls,
|
||||
ObTablet &tablet,
|
||||
const ObAdaptiveMergePolicy::AdaptiveMergeReason &merge_reason,
|
||||
ObArenaAllocator &allocator,
|
||||
int choose_medium_snapshot(
|
||||
const int64_t max_sync_medium_scn,
|
||||
ObMediumCompactionInfo &medium_info,
|
||||
ObGetMergeTablesResult &result,
|
||||
int64_t &schema_version);
|
||||
static int choose_major_snapshot(
|
||||
const ObMediumCompactionScheduleFunc &func,
|
||||
ObLS &ls,
|
||||
ObTablet &tablet,
|
||||
const ObAdaptiveMergePolicy::AdaptiveMergeReason &merge_reason,
|
||||
ObArenaAllocator &allocator,
|
||||
int choose_major_snapshot(
|
||||
const int64_t max_sync_medium_scn,
|
||||
ObMediumCompactionInfo &medium_info,
|
||||
ObGetMergeTablesResult &result,
|
||||
int64_t &schema_version);
|
||||
static int find_valid_freeze_info(
|
||||
ObTablet &tablet,
|
||||
ObArenaAllocator &allocator,
|
||||
int find_valid_freeze_info(
|
||||
ObMediumCompactionInfo &medium_info,
|
||||
share::ObFreezeInfo &freeze_info,
|
||||
bool &force_schedule_medium_merge);
|
||||
static int switch_to_choose_medium_snapshot(
|
||||
const ObMediumCompactionScheduleFunc &func,
|
||||
ObArenaAllocator &allocator,
|
||||
ObLS &ls,
|
||||
ObTablet &tablet,
|
||||
int switch_to_choose_medium_snapshot(
|
||||
const int64_t freeze_version,
|
||||
ObMediumCompactionInfo &medium_info,
|
||||
int64_t &schema_version);
|
||||
@ -161,8 +145,7 @@ protected:
|
||||
bool &tablet_need_freeze_flag,
|
||||
bool &create_dag_flag);
|
||||
int schedule_next_medium_primary_cluster(
|
||||
const int64_t major_snapshot,
|
||||
const bool force_schedule);
|
||||
const int64_t major_snapshot);
|
||||
|
||||
int choose_new_medium_snapshot(
|
||||
const int64_t max_reserved_snapshot,
|
||||
@ -178,43 +161,33 @@ protected:
|
||||
|
||||
int check_frequency(
|
||||
const int64_t max_reserved_snapshot,
|
||||
ObMediumCompactionInfo &medium_info);
|
||||
int choose_medium_scn_for_user_request(
|
||||
const int64_t medium_snapshot);
|
||||
int choose_scn_for_user_request(
|
||||
const int64_t max_sync_medium_scn,
|
||||
ObMediumCompactionInfo &medium_info,
|
||||
ObGetMergeTablesResult &result,
|
||||
int64_t &schema_version);
|
||||
int get_adaptive_reason(
|
||||
const int64_t schedule_major_snapshot,
|
||||
const bool force_schedule,
|
||||
ObAdaptiveMergePolicy::AdaptiveMergeReason &adaptive_merge_reason);
|
||||
int get_adaptive_reason(const int64_t schedule_major_snapshot);
|
||||
static const int64_t DEFAULT_SCHEDULE_MEDIUM_INTERVAL = 60L * 1000L * 1000L; // 60s
|
||||
static constexpr double SCHEDULE_RANGE_INC_ROW_COUNT_PERCENRAGE_THRESHOLD = 0.2;
|
||||
static const int64_t SCHEDULE_RANGE_ROW_COUNT_THRESHOLD = 1000 * 1000L; // 100w
|
||||
static const int64_t MEDIUM_FUNC_CNT = 2;
|
||||
typedef int (*ChooseMediumScn)(
|
||||
const ObMediumCompactionScheduleFunc &func,
|
||||
ObLS &ls,
|
||||
ObTablet &tablet,
|
||||
const ObAdaptiveMergePolicy::AdaptiveMergeReason &merge_reason,
|
||||
ObArenaAllocator &allocator,
|
||||
ObMediumCompactionInfo &medium_info,
|
||||
ObGetMergeTablesResult &result,
|
||||
int64_t &schema_version);
|
||||
static ChooseMediumScn choose_medium_scn[MEDIUM_FUNC_CNT];
|
||||
static bool is_user_request(const ObAdaptiveMergePolicy::AdaptiveMergeReason merge_reason)
|
||||
{
|
||||
return ObAdaptiveMergePolicy::USER_REQUEST == merge_reason
|
||||
|| ObAdaptiveMergePolicy::REBUILD_COLUMN_GROUP == merge_reason;
|
||||
}
|
||||
int errsim_choose_medium_snapshot(
|
||||
const int64_t max_sync_medium_scn,
|
||||
ObMediumCompactionInfo &medium_info);
|
||||
|
||||
private:
|
||||
ObArenaAllocator allocator_;
|
||||
ObLS &ls_;
|
||||
ObTabletHandle tablet_handle_;
|
||||
int64_t weak_read_ts_; // weak_read_ts_ should get before tablet
|
||||
const int64_t weak_read_ts_; // weak_read_ts_ should get before tablet
|
||||
const ObMediumCompactionInfoList *medium_info_list_;
|
||||
ObScheduleStatistics *schedule_stat_;
|
||||
const bool is_rebuild_column_group_;
|
||||
ObAdaptiveMergePolicy::AdaptiveMergeReason merge_reason_;
|
||||
};
|
||||
|
||||
} //namespace compaction
|
||||
|
@ -1362,7 +1362,7 @@ int ObTenantTabletScheduler::schedule_next_medium_for_leader(
|
||||
if ((!tablet_merge_finish || get_enable_adaptive_compaction()) // schedule major or adaptive compaction
|
||||
&& tablet_could_schedule_merge) {
|
||||
if (OB_FAIL(func.schedule_next_medium_for_leader(
|
||||
tablet_merge_finish ? 0 : major_merge_version, false/*force_schedule*/))) { // schedule another round
|
||||
tablet_merge_finish ? 0 : major_merge_version))) { // schedule another round
|
||||
LOG_WARN("failed to schedule next medium", K(ret), K(ls_id), K(tablet_id));
|
||||
if (OB_FAIL(MTL(compaction::ObDiagnoseTabletMgr *)->add_diagnose_tablet(ls_id, tablet_id,
|
||||
share::ObDiagnoseTabletType::TYPE_MEDIUM_MERGE))) {
|
||||
@ -1636,15 +1636,15 @@ int ObTenantTabletScheduler::schedule_tablet_medium(
|
||||
&& tablet_could_schedule_merge) {
|
||||
// schedule another round
|
||||
ObMediumCompactionScheduleFunc func(ls, tablet_handle, weak_read_ts, *medium_list, &schedule_stats_);
|
||||
if (OB_TMP_FAIL(func.schedule_next_medium_for_leader(
|
||||
tablet_merge_finish ? 0 : merge_version, false /*force_schedule*/))) {
|
||||
if (OB_TMP_FAIL(func.schedule_next_medium_for_leader(tablet_merge_finish ? 0 : merge_version))) {
|
||||
if (OB_NOT_MASTER == tmp_ret) {
|
||||
is_leader = false;
|
||||
} else {
|
||||
LOG_WARN("failed to schedule next medium", K(tmp_ret), K(ls_id), K(tablet_id));
|
||||
}
|
||||
need_diagnose = true;
|
||||
} else if (FALSE_IT(time_guard.click(ObCompactionScheduleTimeGuard::SCHEDULE_NEXT_MEDIUM))){
|
||||
} else {
|
||||
time_guard.click(ObCompactionScheduleTimeGuard::SCHEDULE_NEXT_MEDIUM);
|
||||
}
|
||||
}
|
||||
|
||||
@ -1895,7 +1895,8 @@ int ObTenantTabletScheduler::try_schedule_tablet_medium_merge(
|
||||
} else {
|
||||
ObMediumCompactionScheduleFunc func(
|
||||
*ls_handle.get_ls(), tablet_handle, weak_read_ts, *medium_info_list,
|
||||
nullptr /*schedule_stat*/, is_rebuild_column_group);
|
||||
nullptr /*schedule_stat*/,
|
||||
is_rebuild_column_group ? ObAdaptiveMergePolicy::REBUILD_COLUMN_GROUP : ObAdaptiveMergePolicy::USER_REQUEST);
|
||||
const int64_t merge_version = get_frozen_version();
|
||||
const int64_t last_major_snapshot_version = tablet_handle.get_obj()->get_last_major_snapshot_version();
|
||||
|
||||
@ -1907,7 +1908,7 @@ int ObTenantTabletScheduler::try_schedule_tablet_medium_merge(
|
||||
ret = OB_NOT_SUPPORTED;
|
||||
LOG_WARN("tablet need check finish, can't schedule another medium", K(ret), K(ls_id), K(tablet_id),
|
||||
"wait_check_medium_scn", medium_info_list->get_wait_check_medium_scn());
|
||||
} else if (OB_TMP_FAIL(func.schedule_next_medium_for_leader(0/*major_snapshot*/, true/*force_schedule*/))) {
|
||||
} else if (OB_TMP_FAIL(func.schedule_next_medium_for_leader(0/*major_snapshot*/))) {
|
||||
if (OB_EAGAIN != tmp_ret) {
|
||||
LOG_WARN("failed to schedule medium", K(tmp_ret), K(ls_id), K(tablet_id));
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user