diff --git a/src/storage/compaction/ob_medium_compaction_func.cpp b/src/storage/compaction/ob_medium_compaction_func.cpp index 71a481fe77..749e77361f 100755 --- a/src/storage/compaction/ob_medium_compaction_func.cpp +++ b/src/storage/compaction/ob_medium_compaction_func.cpp @@ -63,7 +63,7 @@ int ObMediumCompactionScheduleFunc::choose_medium_snapshot( ObLS &ls, ObTablet &tablet, const ObAdaptiveMergePolicy::AdaptiveMergeReason &merge_reason, - ObIAllocator &allocator, + ObArenaAllocator &allocator, ObMediumCompactionInfo &medium_info, ObGetMergeTablesResult &result, int64_t &schema_version) @@ -95,7 +95,7 @@ int ObMediumCompactionScheduleFunc::choose_major_snapshot( ObLS &ls, ObTablet &tablet, const ObAdaptiveMergePolicy::AdaptiveMergeReason &merge_reason, - ObIAllocator &allocator, + ObArenaAllocator &allocator, ObMediumCompactionInfo &medium_info, ObGetMergeTablesResult &result, int64_t &schema_version) @@ -130,6 +130,8 @@ int ObMediumCompactionScheduleFunc::choose_major_snapshot( ret = OB_ERR_UNEXPECTED; LOG_WARN("failed to get schema service from MTL", K(ret)); } + + bool schedule_medium_merge = false; while (OB_SUCC(ret)) { if (OB_FAIL(MTL_CALL_FREEZE_INFO_MGR(get_freeze_info_behind_snapshot_version, schedule_snapshot, freeze_info))) { if (OB_ENTRY_NOT_EXIST != ret) { @@ -141,9 +143,10 @@ int ObMediumCompactionScheduleFunc::choose_major_snapshot( ret = OB_ERR_UNEXPECTED; LOG_WARN("schema version is invalid", K(ret), K(ls_id), K(tablet_id), K(freeze_info)); } else if (OB_UNLIKELY(freeze_info.schema_version < last_sstable_schema_version)) { - ret = OB_ERR_UNEXPECTED; - LOG_ERROR("schema version in freeze info is less than last sstable", K(ret), K(ls_id), K(tablet_id), - K(freeze_info), K(last_sstable_schema_version)); + schedule_medium_merge = true; + FLOG_INFO("schema version in freeze info is too small, try to schedule medium compaction instead", K(ret), + K(ls_id), K(tablet_id), K(last_sstable_schema_version), K(freeze_info)); + break; } else if (OB_FAIL(get_table_schema_to_merge( *schema_service, tablet, freeze_info.schema_version, allocator, medium_info))) { if (OB_TABLE_IS_DELETED == ret) { @@ -167,18 +170,53 @@ int ObMediumCompactionScheduleFunc::choose_major_snapshot( break; } } // end of while - if (OB_SUCC(ret)) { + + if (OB_FAIL(ret)) { + } else if (schedule_medium_merge) { + if (OB_FAIL(switch_to_choose_medium_snapshot(allocator, ls, tablet, freeze_info.freeze_version, medium_info, schema_version))) { + if (OB_EAGAIN != ret) { + LOG_WARN("failed to switch to choose medium snapshot", K(ret), K(tablet)); + } + } + } else { medium_info.compaction_type_ = ObMediumCompactionInfo::MAJOR_COMPACTION; medium_info.medium_merge_reason_ = ObAdaptiveMergePolicy::AdaptiveMergeReason::NONE; medium_info.medium_snapshot_ = freeze_info.freeze_version; schema_version = freeze_info.schema_version; - if (OB_FAIL(get_result_for_major(tablet, medium_info, result))) { - LOG_WARN("failed get result for major", K(ret), K(medium_info)); - } else { - LOG_TRACE("choose_major_snapshot", K(ret), "ls_id", ls.get_ls_id(), - "tablet_id", tablet.get_tablet_meta().tablet_id_, K(medium_info), K(freeze_info), K(result), - K(last_sstable_schema_version)); - } + } + + if (FAILEDx(get_result_for_major(tablet, medium_info, result))) { + LOG_WARN("failed get result for major", K(ret), K(medium_info)); + } else { + LOG_TRACE("choose_major_snapshot", K(ret), "ls_id", ls.get_ls_id(), + "tablet_id", tablet.get_tablet_meta().tablet_id_, K(medium_info), K(freeze_info), K(result), + K(last_sstable_schema_version)); + } + return ret; +} + +int ObMediumCompactionScheduleFunc::switch_to_choose_medium_snapshot( + ObArenaAllocator &allocator, + ObLS &ls, + ObTablet &tablet, + const int64_t freeze_version, + ObMediumCompactionInfo &medium_info, + int64_t &schema_version) +{ + int ret = OB_SUCCESS; + const int64_t ls_weak_read_ts = ls.get_ls_wrs_handler()->get_ls_weak_read_ts().get_val_for_tx(); + int64_t medium_snapshot = 0; + + if (ls_weak_read_ts < freeze_version + 1) { + ret = OB_EAGAIN; + LOG_WARN("weak read ts is smaller than new medium snapshot, try later", K(ret), K(tablet)); + } else if (FALSE_IT(medium_snapshot = MAX(ls_weak_read_ts, freeze_version + 1))) { + } else if (OB_FAIL(choose_medium_schema_version(allocator, medium_snapshot, tablet, schema_version))) { + LOG_WARN("fail to choose medium schema version", K(ret), K(tablet)); + } else { + medium_info.compaction_type_ = ObMediumCompactionInfo::MEDIUM_COMPACTION; + medium_info.medium_merge_reason_ = ObAdaptiveMergePolicy::AdaptiveMergeReason::NONE; + medium_info.medium_snapshot_ = medium_snapshot; } return ret; } @@ -374,7 +412,8 @@ int ObMediumCompactionScheduleFunc::choose_new_medium_snapshot( } int ObMediumCompactionScheduleFunc::choose_medium_schema_version( - const ObMediumCompactionInfo &medium_info, + common::ObArenaAllocator &allocator, + const int64_t medium_snapshot, ObTablet &tablet, int64_t &schema_version) { @@ -384,10 +423,10 @@ int ObMediumCompactionScheduleFunc::choose_medium_schema_version( int64_t store_column_cnt_in_schema = 0; ObSEArray memtables; - if (OB_FAIL(tablet.load_storage_schema(allocator_, schema_on_tablet))) { + if (OB_FAIL(tablet.load_storage_schema(allocator, schema_on_tablet))) { LOG_WARN("fail to load storage schema", K(ret)); } else if (FALSE_IT(schema_version = schema_on_tablet->schema_version_)) { - } else if (medium_info.medium_snapshot_ <= tablet_snapshot_version) { + } else if (medium_snapshot <= tablet_snapshot_version) { // do nothing, use schema version on tablet } else if (OB_FAIL(schema_on_tablet->get_store_column_count(store_column_cnt_in_schema, true/*full_col*/))) { LOG_WARN("failed to get store column count", K(ret), K(store_column_cnt_in_schema)); @@ -463,7 +502,7 @@ int ObMediumCompactionScheduleFunc::decide_medium_snapshot( OB_FAIL(choose_new_medium_snapshot(max_reserved_snapshot, medium_info, result))) { // chosen medium snapshot is far too old LOG_WARN("failed to choose new medium snapshot", KR(ret), K(max_reserved_snapshot), K(medium_info)); - } else if (OB_FAIL(choose_medium_schema_version(medium_info, *tablet, schema_version))) { + } else if (OB_FAIL(choose_medium_schema_version(allocator_, medium_info.medium_snapshot_, *tablet, schema_version))) { LOG_WARN("failed to choose medium schema version", K(ret), K(tablet)); } diff --git a/src/storage/compaction/ob_medium_compaction_func.h b/src/storage/compaction/ob_medium_compaction_func.h index 8c2d92d6bc..370d284ae7 100755 --- a/src/storage/compaction/ob_medium_compaction_func.h +++ b/src/storage/compaction/ob_medium_compaction_func.h @@ -99,7 +99,7 @@ protected: ObLS &ls, ObTablet &tablet, const ObAdaptiveMergePolicy::AdaptiveMergeReason &merge_reason, - ObIAllocator &allocator, + ObArenaAllocator &allocator, ObMediumCompactionInfo &medium_info, ObGetMergeTablesResult &result, int64_t &schema_version); @@ -107,10 +107,18 @@ protected: ObLS &ls, ObTablet &tablet, const ObAdaptiveMergePolicy::AdaptiveMergeReason &merge_reason, - ObIAllocator &allocator, + ObArenaAllocator &allocator, ObMediumCompactionInfo &medium_info, ObGetMergeTablesResult &result, int64_t &schema_version); + static int switch_to_choose_medium_snapshot( + ObArenaAllocator &allocator, + ObLS &ls, + ObTablet &tablet, + const int64_t freeze_version, + ObMediumCompactionInfo &medium_info, + int64_t &schema_version); + static int check_need_merge_and_schedule( ObLS &ls, ObTablet &tablet, @@ -121,8 +129,9 @@ protected: const int64_t max_reserved_snapshot, ObMediumCompactionInfo &medium_info, ObGetMergeTablesResult &result); - int choose_medium_schema_version( - const ObMediumCompactionInfo &medium_info, + static int choose_medium_schema_version( + common::ObArenaAllocator &allocator, + const int64_t medium_snapshot, ObTablet &tablet, int64_t &schema_version); int get_max_reserved_snapshot(int64_t &max_reserved_snapshot); @@ -139,7 +148,7 @@ protected: ObLS &ls, ObTablet &tablet, const ObAdaptiveMergePolicy::AdaptiveMergeReason &merge_reason, - ObIAllocator &allocator, + ObArenaAllocator &allocator, ObMediumCompactionInfo &medium_info, ObGetMergeTablesResult &result, int64_t &schema_version); diff --git a/src/storage/compaction/ob_tablet_merge_ctx.cpp b/src/storage/compaction/ob_tablet_merge_ctx.cpp index 38910ee786..02c25014f4 100755 --- a/src/storage/compaction/ob_tablet_merge_ctx.cpp +++ b/src/storage/compaction/ob_tablet_merge_ctx.cpp @@ -1026,7 +1026,6 @@ int ObTabletMergeCtx::init_merge_info() return ret; } - int ObTabletMergeCtx::get_storage_schema_to_merge(const ObTablesHandleArray &merge_tables_handle) { int ret = OB_SUCCESS; @@ -1059,8 +1058,10 @@ int ObTabletMergeCtx::get_storage_schema_to_merge(const ObTablesHandleArray &mer } // end of for if (OB_FAIL(ret)) { - } else if (max_column_cnt_in_memtable > column_cnt_in_schema - || max_schema_version_in_memtable > schema_on_tablet->get_schema_version()) { + } else if (max_column_cnt_in_memtable <= column_cnt_in_schema + && max_schema_version_in_memtable <= schema_on_tablet->get_schema_version()) { + // do nothing + } else { // need alloc new storage schema & set column cnt void *buf = nullptr; if (OB_ISNULL(buf = allocator_.alloc(sizeof(ObStorageSchema)))) {