From d425027b91840c8293c4cbc909f0cc1b0b5a0dfe Mon Sep 17 00:00:00 2001 From: obdev Date: Wed, 7 Feb 2024 19:22:29 +0000 Subject: [PATCH] remove redundant code and del lambda --- src/storage/access/ob_table_read_info.cpp | 66 ++++-- .../compaction/ob_medium_compaction_func.cpp | 216 ++++++++---------- .../compaction/ob_medium_compaction_func.h | 67 ++---- .../compaction/ob_tenant_tablet_scheduler.cpp | 13 +- 4 files changed, 169 insertions(+), 193 deletions(-) diff --git a/src/storage/access/ob_table_read_info.cpp b/src/storage/access/ob_table_read_info.cpp index c4808e2b5..8234f859d 100644 --- a/src/storage/access/ob_table_read_info.cpp +++ b/src/storage/access/ob_table_read_info.cpp @@ -27,6 +27,44 @@ namespace storage /* * ------------------------------- ObColumnIndexArray ------------------------------- */ +int64_t return_array_cnt(uint32_t schema_rowkey_cnt, const ObFixedMetaObjArray & array) +{ + return array.count(); +} +int64_t return_schema_rowkey_cnt(uint32_t schema_rowkey_cnt, const ObFixedMetaObjArray & array) +{ + return schema_rowkey_cnt; +} +int32_t return_array_idx_for_memtable(uint32_t schema_rowkey_cnt, uint32_t column_cnt, + int64_t idx, + const ObFixedMetaObjArray &array) +{ + int32_t ret_val = 0; + const int32_t extra_rowkey_cnt = storage::ObMultiVersionRowkeyHelpper::get_extra_rowkey_col_cnt(); + OB_ASSERT(idx >= 0 && idx < column_cnt); + if (idx < schema_rowkey_cnt) { + ret_val = idx; + } else if (idx < schema_rowkey_cnt + extra_rowkey_cnt) { + ret_val = OB_INVALID_INDEX; + } else { + ret_val = idx - extra_rowkey_cnt; + } + return ret_val; +} +int32_t return_idx(uint32_t schema_rowkey_cnt, uint32_t column_cnt, + int64_t idx, + const ObFixedMetaObjArray &array) +{ + OB_ASSERT(idx >= 0 && idx < column_cnt); + return (int32_t)idx; +} +int32_t return_array_idx(uint32_t schema_rowkey_cnt, uint32_t column_cnt, + int64_t idx, + const ObFixedMetaObjArray &array) +{ + OB_ASSERT(idx >= 0 && idx < array.count()); + return array[idx]; +} ObColumnIndexArray::ObColumnIndexArray(const bool rowkey_mode /* = false*/, const bool for_memtable /* = false*/) : version_(COLUMN_INDEX_ARRAY_VERSION), @@ -39,32 +77,14 @@ ObColumnIndexArray::ObColumnIndexArray(const bool rowkey_mode /* = false*/, { if (rowkey_mode) { if (for_memtable) { // no multi_version rowkey in memtable - at_func_ =[](uint32_t schema_rowkey_cnt, uint32_t column_cnt, int64_t idx, const ObFixedMetaObjArray &array) -> int32_t { - int32_t ret_val = 0; - const int32_t extra_rowkey_cnt = storage::ObMultiVersionRowkeyHelpper::get_extra_rowkey_col_cnt(); - OB_ASSERT(idx >= 0 && idx < column_cnt); - if (idx < schema_rowkey_cnt) { - ret_val = idx; - } else if (idx < schema_rowkey_cnt + extra_rowkey_cnt) { - ret_val = OB_INVALID_INDEX; - } else { - ret_val = idx - extra_rowkey_cnt; - } - return ret_val; - }; + at_func_ = return_array_idx_for_memtable; } else { - at_func_ =[](uint32_t schema_rowkey_cnt, uint32_t column_cnt, int64_t idx, const ObFixedMetaObjArray &) -> int32_t { - OB_ASSERT(idx >= 0 && idx < column_cnt); - return (int32_t)idx; - }; + at_func_ = return_idx; } - count_func_ = [](uint32_t column_cnt, const ObFixedMetaObjArray &) -> int64_t { return column_cnt; }; + count_func_ = return_schema_rowkey_cnt; } else { - at_func_ = [](uint32_t, uint32_t, int64_t idx, const ObFixedMetaObjArray &array) -> int32_t { - OB_ASSERT(idx >= 0 && idx < array.count()); - return array[idx]; - }; - count_func_ = [](uint32_t, const ObFixedMetaObjArray &array) -> int64_t { return array.count(); }; + at_func_ = return_array_idx; + count_func_ = return_array_cnt; } } diff --git a/src/storage/compaction/ob_medium_compaction_func.cpp b/src/storage/compaction/ob_medium_compaction_func.cpp index fc64595f1..07d4e5ad8 100644 --- a/src/storage/compaction/ob_medium_compaction_func.cpp +++ b/src/storage/compaction/ob_medium_compaction_func.cpp @@ -37,11 +37,6 @@ using namespace common; namespace compaction { -ObMediumCompactionScheduleFunc::ChooseMediumScn ObMediumCompactionScheduleFunc::choose_medium_scn[MEDIUM_FUNC_CNT] - = { ObMediumCompactionScheduleFunc::choose_medium_snapshot, - ObMediumCompactionScheduleFunc::choose_major_snapshot, - }; - int64_t ObMediumCompactionScheduleFunc::to_string(char *buf, const int64_t buf_len) const { int64_t pos = 0; @@ -61,48 +56,58 @@ int64_t ObMediumCompactionScheduleFunc::to_string(char *buf, const int64_t buf_l } int ObMediumCompactionScheduleFunc::choose_medium_snapshot( - const ObMediumCompactionScheduleFunc &func, - ObLS &ls, - ObTablet &tablet, - const ObAdaptiveMergePolicy::AdaptiveMergeReason &merge_reason, - ObArenaAllocator &allocator, + const int64_t max_sync_medium_scn, ObMediumCompactionInfo &medium_info, ObGetMergeTablesResult &result, int64_t &schema_version) { - UNUSEDx(func, allocator, schema_version); int ret = OB_SUCCESS; ObGetMergeTablesParam param; param.merge_type_ = MEDIUM_MERGE; - if (OB_FAIL(ObAdaptiveMergePolicy::get_meta_merge_tables( - param, - ls, - tablet, - result))) { + int64_t max_reserved_snapshot = 0; + int64_t medium_snapshot = 0; + ObTablet &tablet = *tablet_handle_.get_obj(); + if (OB_FAIL(ObAdaptiveMergePolicy::get_meta_merge_tables(param, ls_, tablet, result))) { if (OB_NO_NEED_MERGE != ret) { LOG_WARN("failed to get meta merge tables", K(ret), K(param)); } + } else if (FALSE_IT(medium_snapshot = result.version_range_.snapshot_version_)) { + } else if (OB_FAIL(get_max_reserved_snapshot(max_reserved_snapshot))) { + LOG_WARN("failed to get reserved snapshot", K(ret), KPC(this)); + } else if (medium_info.medium_snapshot_ < max_reserved_snapshot + || medium_info.medium_snapshot_ > tablet.get_snapshot_version()) { + // chosen medium snapshot is far too old + if (OB_FAIL(choose_new_medium_snapshot(max_reserved_snapshot, medium_info, result, schema_version))) { + LOG_WARN("failed to choose new medium snapshot", KR(ret), K(medium_info)); + } + } else if (OB_FAIL(tablet.get_schema_version_from_storage_schema(schema_version))) { + LOG_WARN("failed to get schema version from tablet", KR(ret), K(tablet)); + } + if (OB_FAIL(ret)) { + } else if (medium_info.medium_snapshot_ <= max_sync_medium_scn) { + ret = OB_NO_NEED_MERGE; + } else if (OB_FAIL(check_frequency(max_reserved_snapshot, medium_snapshot))) { // check schedule interval + if (OB_NO_NEED_MERGE != ret) { + LOG_WARN("failed to check medium scn valid", K(ret), KPC(this)); + } } else { medium_info.set_basic_info( ObMediumCompactionInfo::MEDIUM_COMPACTION, - merge_reason, - result.version_range_.snapshot_version_); - LOG_TRACE("choose_medium_snapshot", K(ret), "ls_id", ls.get_ls_id(), - "tablet_id", tablet.get_tablet_meta().tablet_id_, K(result), K(medium_info)); + merge_reason_, + medium_snapshot); + LOG_TRACE("choose_medium_snapshot", K(ret), KPC(this), K(result), K(medium_info)); } return ret; } int ObMediumCompactionScheduleFunc::find_valid_freeze_info( - ObTablet &tablet, - ObArenaAllocator &allocator, ObMediumCompactionInfo &medium_info, share::ObFreezeInfo &freeze_info, bool &force_schedule_medium_merge) { int ret = OB_SUCCESS; force_schedule_medium_merge = false; - + ObTablet &tablet = *tablet_handle_.get_obj(); const ObTabletID &tablet_id = tablet.get_tablet_meta().tablet_id_; int64_t schedule_snapshot = 0; bool schedule_with_newer_info = false; @@ -143,7 +148,7 @@ int ObMediumCompactionScheduleFunc::find_valid_freeze_info( K(tablet_id), K(last_sstable_schema_version), K(freeze_info)); break; } else if (OB_FAIL(get_table_schema_to_merge( - *schema_service, tablet, freeze_info.schema_version_, medium_info.medium_compat_version_, allocator, medium_info.storage_schema_))) { + *schema_service, tablet, freeze_info.schema_version_, medium_info.medium_compat_version_, allocator_, medium_info.storage_schema_))) { if (OB_TABLE_IS_DELETED == ret) { // do nothing, end loop } else if (OB_ERR_SCHEMA_HISTORY_EMPTY == ret) { @@ -169,27 +174,22 @@ int ObMediumCompactionScheduleFunc::find_valid_freeze_info( } int ObMediumCompactionScheduleFunc::choose_major_snapshot( - const ObMediumCompactionScheduleFunc &func, - ObLS &ls, - ObTablet &tablet, - const ObAdaptiveMergePolicy::AdaptiveMergeReason &merge_reason, - ObArenaAllocator &allocator, + const int64_t max_sync_medium_scn, ObMediumCompactionInfo &medium_info, ObGetMergeTablesResult &result, int64_t &schema_version) { int ret = OB_SUCCESS; - const ObLSID &ls_id = ls.get_ls_id(); - const ObTabletID &tablet_id = tablet.get_tablet_meta().tablet_id_; + ObTablet &tablet = *tablet_handle_.get_obj(); share::ObFreezeInfo freeze_info; bool force_schedule_medium_merge = false; - if (OB_FAIL(find_valid_freeze_info(tablet, allocator, medium_info, freeze_info, force_schedule_medium_merge))) { + if (OB_FAIL(find_valid_freeze_info(medium_info, freeze_info, force_schedule_medium_merge))) { LOG_WARN("failed to find valid freeze info", KR(ret)); } else if (force_schedule_medium_merge) { - if (OB_FAIL(switch_to_choose_medium_snapshot(func, allocator, ls, tablet, freeze_info.frozen_scn_.get_val_for_tx(), medium_info, schema_version))) { + if (OB_FAIL(switch_to_choose_medium_snapshot(freeze_info.frozen_scn_.get_val_for_tx(), medium_info, schema_version))) { if (OB_EAGAIN != ret) { - LOG_WARN("failed to switch to choose medium snapshot", K(ret), K(tablet)); + LOG_WARN("failed to switch to choose medium snapshot", K(ret), KPC(this)); } } } else { @@ -203,8 +203,7 @@ int ObMediumCompactionScheduleFunc::choose_major_snapshot( if (FAILEDx(ObPartitionMergePolicy::get_result_by_snapshot(tablet, medium_info.medium_snapshot_, result))) { LOG_WARN("failed get result for major", K(ret), K(medium_info)); } else { - LOG_TRACE("choose_major_snapshot", K(ret), "ls_id", ls.get_ls_id(), - "tablet_id", tablet.get_tablet_meta().tablet_id_, K(medium_info), K(freeze_info), K(result), + LOG_TRACE("choose_major_snapshot", K(ret), KPC(this), K(medium_info), K(freeze_info), K(result), K(medium_info), K(schema_version)); #ifdef ERRSIM if (tablet.get_tablet_meta().tablet_id_.id() == 1) { @@ -213,8 +212,7 @@ int ObMediumCompactionScheduleFunc::choose_major_snapshot( ret = OB_SUCCESS; medium_info.compaction_type_ = ObMediumCompactionInfo::MEDIUM_COMPACTION; medium_info.medium_snapshot_ += 100; - FLOG_INFO("ERRSIM EN_SPECIAL_TABLE_HAVE_LARGER_SCN", "ls_id", ls.get_ls_id(), - "tablet_id", tablet.get_tablet_meta().tablet_id_,K(medium_info)); + FLOG_INFO("ERRSIM EN_SPECIAL_TABLE_HAVE_LARGER_SCN", KPC(this),K(medium_info)); } } #endif @@ -223,10 +221,6 @@ int ObMediumCompactionScheduleFunc::choose_major_snapshot( } int ObMediumCompactionScheduleFunc::switch_to_choose_medium_snapshot( - const ObMediumCompactionScheduleFunc &func, - ObArenaAllocator &allocator, - ObLS &ls, - ObTablet &tablet, const int64_t freeze_version, ObMediumCompactionInfo &medium_info, int64_t &schema_version) @@ -234,12 +228,12 @@ int ObMediumCompactionScheduleFunc::switch_to_choose_medium_snapshot( int ret = OB_SUCCESS; int64_t medium_snapshot = 0; - if (func.weak_read_ts_ < freeze_version + 1) { + if (weak_read_ts_ < freeze_version + 1) { ret = OB_EAGAIN; - LOG_WARN("weak read ts is smaller than new medium snapshot, try later", K(ret), K(tablet)); - } else if (FALSE_IT(medium_snapshot = MAX(func.weak_read_ts_, freeze_version + 1))) { - } else if (OB_FAIL(tablet.get_newest_schema_version(schema_version))) { - LOG_WARN("fail to choose medium schema version", K(ret), K(tablet)); + LOG_WARN("weak read ts is smaller than new medium snapshot, try later", K(ret), KPC(this), K(freeze_version)); + } else if (FALSE_IT(medium_snapshot = MAX(weak_read_ts_, freeze_version + 1))) { + } else if (OB_FAIL(tablet_handle_.get_obj()->get_newest_schema_version(schema_version))) { + LOG_WARN("fail to choose medium schema version", K(ret), KPC(this)); } else { medium_info.set_basic_info( ObMediumCompactionInfo::MEDIUM_COMPACTION, @@ -275,8 +269,7 @@ int ObMediumCompactionScheduleFunc::get_status_from_inner_table( // cal this func with PLAF LEADER ROLE && last_medium_scn_ = 0 int ObMediumCompactionScheduleFunc::schedule_next_medium_for_leader( - const int64_t major_snapshot, - const bool force_schedule) + const int64_t major_snapshot) { int ret = OB_SUCCESS; ObRole role = INVALID_ROLE; @@ -298,7 +291,7 @@ int ObMediumCompactionScheduleFunc::schedule_next_medium_for_leader( } } #endif - ret = schedule_next_medium_primary_cluster(major_snapshot, force_schedule); + ret = schedule_next_medium_primary_cluster(major_snapshot); } else { LOG_TRACE("not leader", K(ret), K(role), K(ls_.get_ls_id())); } @@ -306,9 +299,7 @@ int ObMediumCompactionScheduleFunc::schedule_next_medium_for_leader( } int ObMediumCompactionScheduleFunc::get_adaptive_reason( - const int64_t schedule_major_snapshot, - const bool force_schedule, - ObAdaptiveMergePolicy::AdaptiveMergeReason &adaptive_merge_reason) + const int64_t schedule_major_snapshot) { int ret = OB_SUCCESS; int64_t max_sync_medium_scn = 0; @@ -317,10 +308,8 @@ int ObMediumCompactionScheduleFunc::get_adaptive_reason( *tablet, *medium_info_list_, max_sync_medium_scn))) { LOG_WARN("failed to get max received medium scn", KR(ret), KPC(this)); } else if (schedule_major_snapshot > max_sync_medium_scn) { - adaptive_merge_reason = ObAdaptiveMergePolicy::AdaptiveMergeReason::TENANT_MAJOR; - } else if (force_schedule) { - adaptive_merge_reason = is_rebuild_column_group_ ? ObAdaptiveMergePolicy::REBUILD_COLUMN_GROUP : ObAdaptiveMergePolicy::USER_REQUEST; - } else if (OB_FAIL(ObAdaptiveMergePolicy::get_adaptive_merge_reason(*tablet, adaptive_merge_reason))) { + merge_reason_ = ObAdaptiveMergePolicy::AdaptiveMergeReason::TENANT_MAJOR; + } else if (OB_FAIL(ObAdaptiveMergePolicy::get_adaptive_merge_reason(*tablet, merge_reason_))) { if (OB_HASH_NOT_EXIST != ret) { LOG_WARN("failed to get meta merge priority", K(ret), KPC(this)); } else { @@ -335,7 +324,7 @@ int ObMediumCompactionScheduleFunc::get_adaptive_reason( if (OB_FAIL(ret)) { FLOG_INFO("errsim EN_SCHEDULE_MEDIUM_COMPACTION", KPC(this)); ret = OB_SUCCESS; - adaptive_merge_reason = ObAdaptiveMergePolicy::AdaptiveMergeReason::LOAD_DATA_SCENE; + merge_reason_ = ObAdaptiveMergePolicy::AdaptiveMergeReason::LOAD_DATA_SCENE; } } } @@ -344,11 +333,9 @@ int ObMediumCompactionScheduleFunc::get_adaptive_reason( } int ObMediumCompactionScheduleFunc::schedule_next_medium_primary_cluster( - const int64_t schedule_major_snapshot, - const bool force_schedule) + const int64_t schedule_major_snapshot) { int ret = OB_SUCCESS; - ObAdaptiveMergePolicy::AdaptiveMergeReason adaptive_merge_reason = ObAdaptiveMergePolicy::AdaptiveMergeReason::NONE; ObTabletCompactionScnInfo ret_info; // check last medium type, select inner table for last major bool schedule_medium_flag = false; @@ -368,13 +355,14 @@ int ObMediumCompactionScheduleFunc::schedule_next_medium_primary_cluster( LOG_WARN("medium info list is unexpected null", K(ret), KPC(this), KPC_(medium_info_list)); } else if (!medium_info_list_->could_schedule_next_round(last_major_snapshot_version)) { // check serialized list // do nothing - } else if (OB_FAIL(get_adaptive_reason(schedule_major_snapshot, force_schedule, adaptive_merge_reason))) { + } else if (!ObAdaptiveMergePolicy::is_valid_merge_reason(merge_reason_) + && OB_FAIL(get_adaptive_reason(schedule_major_snapshot))) { LOG_WARN("failed to get adaptive reason", KR(ret), K(schedule_major_snapshot)); - } else if (ObAdaptiveMergePolicy::is_valid_merge_reason(adaptive_merge_reason)) { + } else if (ObAdaptiveMergePolicy::is_valid_merge_reason(merge_reason_)) { schedule_medium_flag = true; } LOG_TRACE("schedule next medium in primary cluster", K(ret), KPC(this), K(schedule_medium_flag), - K(schedule_major_snapshot), K(adaptive_merge_reason), K(last_major_snapshot_version), KPC_(medium_info_list), K(max_sync_medium_scn)); + K(schedule_major_snapshot), K(merge_reason_), K(last_major_snapshot_version), KPC_(medium_info_list), K(max_sync_medium_scn)); #ifdef ERRSIM if (OB_SUCC(ret)) { if (tablet->get_tablet_meta().tablet_id_.id() > ObTabletID::MIN_USER_TABLET_ID) { @@ -407,13 +395,14 @@ int ObMediumCompactionScheduleFunc::schedule_next_medium_primary_cluster( schedule_flag = true; } if (OB_SUCC(ret) && schedule_flag) { - ret = decide_medium_snapshot(adaptive_merge_reason); + ret = decide_medium_snapshot(); } return ret; } -int ObMediumCompactionScheduleFunc::choose_medium_scn_for_user_request( +int ObMediumCompactionScheduleFunc::choose_scn_for_user_request( + const int64_t max_sync_medium_scn, ObMediumCompactionInfo &medium_info, ObGetMergeTablesResult &result, int64_t &schema_version) @@ -421,9 +410,7 @@ int ObMediumCompactionScheduleFunc::choose_medium_scn_for_user_request( int ret = OB_SUCCESS; // check exist not finish freeze info schema_version = 0; - int64_t max_sync_medium_scn = 0; int64_t max_reserved_snapshot = 0; - const share::SCN &weak_read_ts = ls_.get_ls_wrs_handler()->get_ls_weak_read_ts(); const int64_t latest_frozen_version = MTL(ObTenantFreezeInfoMgr*)->get_latest_frozen_version(); const int64_t last_major_snapshot_version = tablet_handle_.get_obj()->get_last_major_snapshot_version(); ObTablet *tablet = nullptr; @@ -436,17 +423,15 @@ int ObMediumCompactionScheduleFunc::choose_medium_scn_for_user_request( } else if (latest_frozen_version > last_major_snapshot_version) { ret = OB_NO_NEED_MERGE; LOG_WARN("unfinished freeze info exist, can't schedule another medium", K(ret)); - } else if (OB_FAIL(tablet->get_max_sync_medium_scn(max_sync_medium_scn))) { - LOG_WARN("failed to get max sync medium scn", K(ret), KPC(this)); } else if (OB_FAIL(get_max_reserved_snapshot(max_reserved_snapshot))) { LOG_WARN("failed to get reserved snapshot", K(ret), KPC(this)); - } else if (FALSE_IT(medium_info.medium_snapshot_ = MAX(max_reserved_snapshot, weak_read_ts.get_val_for_tx()))) { + } else if (FALSE_IT(medium_info.medium_snapshot_ = MAX(max_reserved_snapshot, weak_read_ts_))) { } else if (medium_info.medium_snapshot_ < max_sync_medium_scn) { ret = OB_NO_NEED_MERGE; LOG_WARN("chosen medium snapshot is synced before", K(ret), K(medium_info), K(max_sync_medium_scn)); } else { medium_info.compaction_type_ = ObMediumCompactionInfo::MEDIUM_COMPACTION; - medium_info.medium_merge_reason_ = is_rebuild_column_group_ ? ObAdaptiveMergePolicy::REBUILD_COLUMN_GROUP : ObAdaptiveMergePolicy::USER_REQUEST; + medium_info.medium_merge_reason_ = merge_reason_; if (OB_FAIL(ObPartitionMergePolicy::get_result_by_snapshot(*tablet, medium_info.medium_snapshot_, result))) { LOG_WARN("failed to get result for major", K(ret), K(last_major_snapshot_version), K(medium_info)); } else if (OB_FAIL(tablet->get_newest_schema_version(schema_version))) { @@ -461,7 +446,7 @@ int ObMediumCompactionScheduleFunc::choose_medium_scn_for_user_request( int ObMediumCompactionScheduleFunc::check_frequency( const int64_t max_reserved_snapshot, - ObMediumCompactionInfo &medium_info) + const int64_t medium_snapshot) { int ret = OB_SUCCESS; ObTablet *tablet = tablet_handle_.get_obj(); @@ -471,9 +456,9 @@ int ObMediumCompactionScheduleFunc::check_frequency( const int64_t last_major_snapshot_version = tablet->get_last_major_snapshot_version(); if (0 >= last_major_snapshot_version) { LOG_WARN("major sstable should not be empty", K(ret), K(last_major_snapshot_version)); - } else if (last_major_snapshot_version + time_interval > medium_info.medium_snapshot_) { + } else if (last_major_snapshot_version + time_interval > medium_snapshot) { ret = OB_NO_NEED_MERGE; - LOG_DEBUG("schedule medium frequently", K(ret), K(last_major_snapshot_version), K(medium_info), + LOG_DEBUG("schedule medium frequently", K(ret), K(last_major_snapshot_version), K(medium_snapshot), K(time_interval)); } } @@ -504,6 +489,7 @@ int ObMediumCompactionScheduleFunc::get_max_reserved_snapshot(int64_t &max_reser LOG_WARN("failed to get reserved snapshot from freeze info mgr", K(ret), "tablet_id", tablet->get_tablet_meta().tablet_id_); } else { max_reserved_snapshot = MAX(ls_.get_min_reserved_snapshot(), snapshot_info.snapshot_); + LOG_TRACE("get max reserved snapshot", KR(ret), K(max_reserved_snapshot), K(snapshot_info)); } return ret; } @@ -541,15 +527,13 @@ int ObMediumCompactionScheduleFunc::choose_new_medium_snapshot( return ret; } -int ObMediumCompactionScheduleFunc::decide_medium_snapshot( - const ObAdaptiveMergePolicy::AdaptiveMergeReason merge_reason) +int ObMediumCompactionScheduleFunc::decide_medium_snapshot() { int ret = OB_SUCCESS; int tmp_ret = OB_SUCCESS; int64_t max_sync_medium_scn = 0; uint64_t compat_version = 0; ObTablet *tablet = nullptr; - const bool is_major = (merge_reason == ObAdaptiveMergePolicy::TENANT_MAJOR); if (OB_UNLIKELY(!tablet_handle_.is_valid())) { ret = OB_ERR_UNEXPECTED; LOG_WARN("invalid tablet_handle", K(ret), K(tablet_handle_)); @@ -564,61 +548,31 @@ int ObMediumCompactionScheduleFunc::decide_medium_snapshot( LOG_WARN("failed to add dependent tablet", K(ret), KPC(this)); } else { const ObTabletID &tablet_id = tablet->get_tablet_meta().tablet_id_; - LOG_TRACE("decide_medium_snapshot", K(ret), KPC(this), K(compat_version), K(tablet_id), K(max_sync_medium_scn), K(merge_reason)); - int64_t max_reserved_snapshot = 0; + LOG_TRACE("decide_medium_snapshot", K(ret), KPC(this), K(compat_version), K(tablet_id), K(max_sync_medium_scn), K_(merge_reason)); int64_t schema_version = 0; ObGetMergeTablesResult result; ObMediumCompactionInfo medium_info; if (OB_FAIL(medium_info.init_data_version(compat_version))) { LOG_WARN("fail to set data version", K(ret), K(tablet_id), K(compat_version)); - } else if (is_user_request(merge_reason)) { - if (OB_FAIL(choose_medium_scn_for_user_request(medium_info, result, schema_version))) { + } else if (is_user_request(merge_reason_)) { + if (OB_FAIL(choose_scn_for_user_request(max_sync_medium_scn, medium_info, result, schema_version))) { LOG_WARN("failed to choose medium scn for user request", K(ret), KPC(this)); } - } else if (OB_FAIL(choose_medium_scn[is_major](*this, ls_, *tablet, merge_reason, allocator_, medium_info, result, schema_version))) { - LOG_WARN("failed to choose medium snapshot", K(ret), KPC(this)); - } else if (medium_info.medium_snapshot_ <= max_sync_medium_scn) { - ret = OB_NO_NEED_MERGE; - } else if (is_major) { - // do nothing - } else if (OB_FAIL(get_max_reserved_snapshot(max_reserved_snapshot))) { - LOG_WARN("failed to get multi_version_start", K(ret), KPC(this)); - } else if (medium_info.medium_snapshot_ < max_reserved_snapshot - || medium_info.medium_snapshot_ > tablet->get_snapshot_version()) { - // chosen medium snapshot is far too old - if (OB_FAIL(choose_new_medium_snapshot(max_reserved_snapshot, medium_info, result, schema_version))) { - LOG_WARN("failed to choose new medium snapshot", KR(ret), K(medium_info)); + } else if (ObAdaptiveMergePolicy::TENANT_MAJOR == merge_reason_) { + if (OB_FAIL(choose_major_snapshot(max_sync_medium_scn, medium_info, result, schema_version))) { + LOG_WARN("failed to choose medium scn for major", K(ret), KPC(this)); } - } else if (OB_FAIL(tablet->get_schema_version_from_storage_schema(schema_version))){ - LOG_WARN("failed to get schema version from tablet", KR(ret), KPC(tablet)); + } else if (OB_FAIL(choose_medium_snapshot(max_sync_medium_scn, medium_info, result, schema_version))) { + LOG_WARN("failed to choose medium scn for medium", K(ret), KPC(this)); } if (OB_FAIL(ret)) { } else if (medium_info.medium_snapshot_ <= max_sync_medium_scn) { ret = OB_NO_NEED_MERGE; - } else if (is_major || is_user_request(merge_reason)) { - // do nothing - } else if (OB_FAIL(check_frequency(max_reserved_snapshot, medium_info))) { // check schedule interval - if (OB_NO_NEED_MERGE != ret) { - LOG_WARN("failed to check medium scn valid", K(ret), KPC(this)); - } } #ifdef ERRSIM if (OB_SUCC(ret) || OB_NO_NEED_MERGE == ret) { - if (tablet_id.id() > ObTabletID::MIN_USER_TABLET_ID) { - ret = OB_E(EventTable::EN_SCHEDULE_MEDIUM_COMPACTION) ret; - } - if (OB_FAIL(ret)) { - LOG_INFO("ERRSIM EN_SCHEDULE_MEDIUM_COMPACTION", K(ret), KPC(this)); - const int64_t snapshot_gc_ts = MTL(ObTenantFreezeInfoMgr*)->get_snapshot_gc_ts(); - medium_info.medium_snapshot_ = MIN(weak_read_ts_, snapshot_gc_ts); - medium_info.compaction_type_ = ObMediumCompactionInfo::MEDIUM_COMPACTION; - if (medium_info.medium_snapshot_ > max_sync_medium_scn - && medium_info.medium_snapshot_ >= max_reserved_snapshot) { - FLOG_INFO("ERRSIM EN_SCHEDULE_MEDIUM_COMPACTION", KPC(this)); - ret = OB_SUCCESS; - } - } + ret = errsim_choose_medium_snapshot(max_sync_medium_scn, medium_info); } if (OB_SUCC(ret)) { ret = OB_E(EventTable::EN_SCHEDULE_MEDIUM_FAILED) ret; @@ -659,6 +613,34 @@ int ObMediumCompactionScheduleFunc::decide_medium_snapshot( return ret; } +int ObMediumCompactionScheduleFunc::errsim_choose_medium_snapshot( + const int64_t max_sync_medium_scn, + ObMediumCompactionInfo &medium_info) +{ + int ret = OB_SUCCESS; + if (tablet_handle_.get_obj()->get_tablet_meta().tablet_id_.id() > ObTabletID::MIN_USER_TABLET_ID) { + ret = OB_E(EventTable::EN_SCHEDULE_MEDIUM_COMPACTION) ret; + } + if (OB_FAIL(ret)) { + LOG_INFO("ERRSIM EN_SCHEDULE_MEDIUM_COMPACTION", K(ret), KPC(this)); + const int64_t snapshot_gc_ts = + MTL(ObTenantFreezeInfoMgr *)->get_snapshot_gc_ts(); + medium_info.medium_snapshot_ = MIN(weak_read_ts_, snapshot_gc_ts); + medium_info.compaction_type_ = ObMediumCompactionInfo::MEDIUM_COMPACTION; + int64_t max_reserved_snapshot = 0; + if (OB_FAIL(get_max_reserved_snapshot(max_reserved_snapshot))) { + LOG_WARN("failed to get reserved snapshot", K(ret), KPC(this)); + } else if (medium_info.medium_snapshot_ > max_sync_medium_scn + && medium_info.medium_snapshot_ >= max_reserved_snapshot) { + FLOG_INFO("ERRSIM EN_SCHEDULE_MEDIUM_COMPACTION", KPC(this)); + ret = OB_SUCCESS; + } else { + ret = OB_NO_NEED_MERGE; + } + } + return ret; +} + int ObMediumCompactionScheduleFunc::init_schema_changed( ObMediumCompactionInfo &medium_info) { diff --git a/src/storage/compaction/ob_medium_compaction_func.h b/src/storage/compaction/ob_medium_compaction_func.h index e59d7e595..c74cefb3f 100644 --- a/src/storage/compaction/ob_medium_compaction_func.h +++ b/src/storage/compaction/ob_medium_compaction_func.h @@ -33,14 +33,14 @@ public: const SCN &weak_read_ts, const ObMediumCompactionInfoList &medium_info_list, ObScheduleStatistics *schedule_stat, - const bool is_rebuild_column_group = false) + const ObAdaptiveMergePolicy::AdaptiveMergeReason merge_reason = ObAdaptiveMergePolicy::NONE) : allocator_("MediumSchedule"), ls_(ls), tablet_handle_(tablet_handle), weak_read_ts_(weak_read_ts.get_val_for_tx()), medium_info_list_(&medium_info_list), schedule_stat_(schedule_stat), - is_rebuild_column_group_(is_rebuild_column_group) + merge_reason_(merge_reason) {} ~ObMediumCompactionScheduleFunc() {} @@ -76,13 +76,11 @@ public: ObCompactionTimeGuard &time_guard); int schedule_next_medium_for_leader( - const int64_t major_snapshot, - const bool force_schedule); + const int64_t major_snapshot); int64_t to_string(char* buf, const int64_t buf_len) const; protected: - int decide_medium_snapshot( - const ObAdaptiveMergePolicy::AdaptiveMergeReason merge_reason); + int decide_medium_snapshot(); static int get_status_from_inner_table( const ObLSID &ls_id, const ObTabletID &tablet_id, @@ -120,35 +118,21 @@ protected: static int batch_check_medium_checksum( const ObIArray &tablet_ls_infos, const ObIArray &checksum_items); - static int choose_medium_snapshot( - const ObMediumCompactionScheduleFunc &func, - ObLS &ls, - ObTablet &tablet, - const ObAdaptiveMergePolicy::AdaptiveMergeReason &merge_reason, - ObArenaAllocator &allocator, + int choose_medium_snapshot( + const int64_t max_sync_medium_scn, ObMediumCompactionInfo &medium_info, ObGetMergeTablesResult &result, int64_t &schema_version); - static int choose_major_snapshot( - const ObMediumCompactionScheduleFunc &func, - ObLS &ls, - ObTablet &tablet, - const ObAdaptiveMergePolicy::AdaptiveMergeReason &merge_reason, - ObArenaAllocator &allocator, + int choose_major_snapshot( + const int64_t max_sync_medium_scn, ObMediumCompactionInfo &medium_info, ObGetMergeTablesResult &result, int64_t &schema_version); - static int find_valid_freeze_info( - ObTablet &tablet, - ObArenaAllocator &allocator, + int find_valid_freeze_info( ObMediumCompactionInfo &medium_info, share::ObFreezeInfo &freeze_info, bool &force_schedule_medium_merge); - static int switch_to_choose_medium_snapshot( - const ObMediumCompactionScheduleFunc &func, - ObArenaAllocator &allocator, - ObLS &ls, - ObTablet &tablet, + int switch_to_choose_medium_snapshot( const int64_t freeze_version, ObMediumCompactionInfo &medium_info, int64_t &schema_version); @@ -161,8 +145,7 @@ protected: bool &tablet_need_freeze_flag, bool &create_dag_flag); int schedule_next_medium_primary_cluster( - const int64_t major_snapshot, - const bool force_schedule); + const int64_t major_snapshot); int choose_new_medium_snapshot( const int64_t max_reserved_snapshot, @@ -178,43 +161,33 @@ protected: int check_frequency( const int64_t max_reserved_snapshot, - ObMediumCompactionInfo &medium_info); - int choose_medium_scn_for_user_request( + const int64_t medium_snapshot); + int choose_scn_for_user_request( + const int64_t max_sync_medium_scn, ObMediumCompactionInfo &medium_info, ObGetMergeTablesResult &result, int64_t &schema_version); - int get_adaptive_reason( - const int64_t schedule_major_snapshot, - const bool force_schedule, - ObAdaptiveMergePolicy::AdaptiveMergeReason &adaptive_merge_reason); + int get_adaptive_reason(const int64_t schedule_major_snapshot); static const int64_t DEFAULT_SCHEDULE_MEDIUM_INTERVAL = 60L * 1000L * 1000L; // 60s static constexpr double SCHEDULE_RANGE_INC_ROW_COUNT_PERCENRAGE_THRESHOLD = 0.2; static const int64_t SCHEDULE_RANGE_ROW_COUNT_THRESHOLD = 1000 * 1000L; // 100w - static const int64_t MEDIUM_FUNC_CNT = 2; - typedef int (*ChooseMediumScn)( - const ObMediumCompactionScheduleFunc &func, - ObLS &ls, - ObTablet &tablet, - const ObAdaptiveMergePolicy::AdaptiveMergeReason &merge_reason, - ObArenaAllocator &allocator, - ObMediumCompactionInfo &medium_info, - ObGetMergeTablesResult &result, - int64_t &schema_version); - static ChooseMediumScn choose_medium_scn[MEDIUM_FUNC_CNT]; static bool is_user_request(const ObAdaptiveMergePolicy::AdaptiveMergeReason merge_reason) { return ObAdaptiveMergePolicy::USER_REQUEST == merge_reason || ObAdaptiveMergePolicy::REBUILD_COLUMN_GROUP == merge_reason; } + int errsim_choose_medium_snapshot( + const int64_t max_sync_medium_scn, + ObMediumCompactionInfo &medium_info); private: ObArenaAllocator allocator_; ObLS &ls_; ObTabletHandle tablet_handle_; - int64_t weak_read_ts_; // weak_read_ts_ should get before tablet + const int64_t weak_read_ts_; // weak_read_ts_ should get before tablet const ObMediumCompactionInfoList *medium_info_list_; ObScheduleStatistics *schedule_stat_; - const bool is_rebuild_column_group_; + ObAdaptiveMergePolicy::AdaptiveMergeReason merge_reason_; }; } //namespace compaction diff --git a/src/storage/compaction/ob_tenant_tablet_scheduler.cpp b/src/storage/compaction/ob_tenant_tablet_scheduler.cpp index 9504e2278..05490c3a5 100644 --- a/src/storage/compaction/ob_tenant_tablet_scheduler.cpp +++ b/src/storage/compaction/ob_tenant_tablet_scheduler.cpp @@ -1362,7 +1362,7 @@ int ObTenantTabletScheduler::schedule_next_medium_for_leader( if ((!tablet_merge_finish || get_enable_adaptive_compaction()) // schedule major or adaptive compaction && tablet_could_schedule_merge) { if (OB_FAIL(func.schedule_next_medium_for_leader( - tablet_merge_finish ? 0 : major_merge_version, false/*force_schedule*/))) { // schedule another round + tablet_merge_finish ? 0 : major_merge_version))) { // schedule another round LOG_WARN("failed to schedule next medium", K(ret), K(ls_id), K(tablet_id)); if (OB_FAIL(MTL(compaction::ObDiagnoseTabletMgr *)->add_diagnose_tablet(ls_id, tablet_id, share::ObDiagnoseTabletType::TYPE_MEDIUM_MERGE))) { @@ -1636,15 +1636,15 @@ int ObTenantTabletScheduler::schedule_tablet_medium( && tablet_could_schedule_merge) { // schedule another round ObMediumCompactionScheduleFunc func(ls, tablet_handle, weak_read_ts, *medium_list, &schedule_stats_); - if (OB_TMP_FAIL(func.schedule_next_medium_for_leader( - tablet_merge_finish ? 0 : merge_version, false /*force_schedule*/))) { + if (OB_TMP_FAIL(func.schedule_next_medium_for_leader(tablet_merge_finish ? 0 : merge_version))) { if (OB_NOT_MASTER == tmp_ret) { is_leader = false; } else { LOG_WARN("failed to schedule next medium", K(tmp_ret), K(ls_id), K(tablet_id)); } need_diagnose = true; - } else if (FALSE_IT(time_guard.click(ObCompactionScheduleTimeGuard::SCHEDULE_NEXT_MEDIUM))){ + } else { + time_guard.click(ObCompactionScheduleTimeGuard::SCHEDULE_NEXT_MEDIUM); } } @@ -1895,7 +1895,8 @@ int ObTenantTabletScheduler::try_schedule_tablet_medium_merge( } else { ObMediumCompactionScheduleFunc func( *ls_handle.get_ls(), tablet_handle, weak_read_ts, *medium_info_list, - nullptr /*schedule_stat*/, is_rebuild_column_group); + nullptr /*schedule_stat*/, + is_rebuild_column_group ? ObAdaptiveMergePolicy::REBUILD_COLUMN_GROUP : ObAdaptiveMergePolicy::USER_REQUEST); const int64_t merge_version = get_frozen_version(); const int64_t last_major_snapshot_version = tablet_handle.get_obj()->get_last_major_snapshot_version(); @@ -1907,7 +1908,7 @@ int ObTenantTabletScheduler::try_schedule_tablet_medium_merge( ret = OB_NOT_SUPPORTED; LOG_WARN("tablet need check finish, can't schedule another medium", K(ret), K(ls_id), K(tablet_id), "wait_check_medium_scn", medium_info_list->get_wait_check_medium_scn()); - } else if (OB_TMP_FAIL(func.schedule_next_medium_for_leader(0/*major_snapshot*/, true/*force_schedule*/))) { + } else if (OB_TMP_FAIL(func.schedule_next_medium_for_leader(0/*major_snapshot*/))) { if (OB_EAGAIN != tmp_ret) { LOG_WARN("failed to schedule medium", K(tmp_ret), K(ls_id), K(tablet_id)); }