fix schema 4016 problem in compaction

This commit is contained in:
Fengjingkun
2023-07-21 10:42:43 +00:00
committed by ob-robot
parent 3ba3e1b1ee
commit b2db80b767
3 changed files with 74 additions and 25 deletions

View File

@ -63,7 +63,7 @@ int ObMediumCompactionScheduleFunc::choose_medium_snapshot(
ObLS &ls, ObLS &ls,
ObTablet &tablet, ObTablet &tablet,
const ObAdaptiveMergePolicy::AdaptiveMergeReason &merge_reason, const ObAdaptiveMergePolicy::AdaptiveMergeReason &merge_reason,
ObIAllocator &allocator, ObArenaAllocator &allocator,
ObMediumCompactionInfo &medium_info, ObMediumCompactionInfo &medium_info,
ObGetMergeTablesResult &result, ObGetMergeTablesResult &result,
int64_t &schema_version) int64_t &schema_version)
@ -95,7 +95,7 @@ int ObMediumCompactionScheduleFunc::choose_major_snapshot(
ObLS &ls, ObLS &ls,
ObTablet &tablet, ObTablet &tablet,
const ObAdaptiveMergePolicy::AdaptiveMergeReason &merge_reason, const ObAdaptiveMergePolicy::AdaptiveMergeReason &merge_reason,
ObIAllocator &allocator, ObArenaAllocator &allocator,
ObMediumCompactionInfo &medium_info, ObMediumCompactionInfo &medium_info,
ObGetMergeTablesResult &result, ObGetMergeTablesResult &result,
int64_t &schema_version) int64_t &schema_version)
@ -130,6 +130,8 @@ int ObMediumCompactionScheduleFunc::choose_major_snapshot(
ret = OB_ERR_UNEXPECTED; ret = OB_ERR_UNEXPECTED;
LOG_WARN("failed to get schema service from MTL", K(ret)); LOG_WARN("failed to get schema service from MTL", K(ret));
} }
bool schedule_medium_merge = false;
while (OB_SUCC(ret)) { while (OB_SUCC(ret)) {
if (OB_FAIL(MTL_CALL_FREEZE_INFO_MGR(get_freeze_info_behind_snapshot_version, schedule_snapshot, freeze_info))) { if (OB_FAIL(MTL_CALL_FREEZE_INFO_MGR(get_freeze_info_behind_snapshot_version, schedule_snapshot, freeze_info))) {
if (OB_ENTRY_NOT_EXIST != ret) { if (OB_ENTRY_NOT_EXIST != ret) {
@ -141,9 +143,10 @@ int ObMediumCompactionScheduleFunc::choose_major_snapshot(
ret = OB_ERR_UNEXPECTED; ret = OB_ERR_UNEXPECTED;
LOG_WARN("schema version is invalid", K(ret), K(ls_id), K(tablet_id), K(freeze_info)); LOG_WARN("schema version is invalid", K(ret), K(ls_id), K(tablet_id), K(freeze_info));
} else if (OB_UNLIKELY(freeze_info.schema_version < last_sstable_schema_version)) { } else if (OB_UNLIKELY(freeze_info.schema_version < last_sstable_schema_version)) {
ret = OB_ERR_UNEXPECTED; schedule_medium_merge = true;
LOG_ERROR("schema version in freeze info is less than last sstable", K(ret), K(ls_id), K(tablet_id), FLOG_INFO("schema version in freeze info is too small, try to schedule medium compaction instead", K(ret),
K(freeze_info), K(last_sstable_schema_version)); K(ls_id), K(tablet_id), K(last_sstable_schema_version), K(freeze_info));
break;
} else if (OB_FAIL(get_table_schema_to_merge( } else if (OB_FAIL(get_table_schema_to_merge(
*schema_service, tablet, freeze_info.schema_version, allocator, medium_info))) { *schema_service, tablet, freeze_info.schema_version, allocator, medium_info))) {
if (OB_TABLE_IS_DELETED == ret) { if (OB_TABLE_IS_DELETED == ret) {
@ -167,18 +170,53 @@ int ObMediumCompactionScheduleFunc::choose_major_snapshot(
break; break;
} }
} // end of while } // end of while
if (OB_SUCC(ret)) {
if (OB_FAIL(ret)) {
} else if (schedule_medium_merge) {
if (OB_FAIL(switch_to_choose_medium_snapshot(allocator, ls, tablet, freeze_info.freeze_version, medium_info, schema_version))) {
if (OB_EAGAIN != ret) {
LOG_WARN("failed to switch to choose medium snapshot", K(ret), K(tablet));
}
}
} else {
medium_info.compaction_type_ = ObMediumCompactionInfo::MAJOR_COMPACTION; medium_info.compaction_type_ = ObMediumCompactionInfo::MAJOR_COMPACTION;
medium_info.medium_merge_reason_ = ObAdaptiveMergePolicy::AdaptiveMergeReason::NONE; medium_info.medium_merge_reason_ = ObAdaptiveMergePolicy::AdaptiveMergeReason::NONE;
medium_info.medium_snapshot_ = freeze_info.freeze_version; medium_info.medium_snapshot_ = freeze_info.freeze_version;
schema_version = freeze_info.schema_version; schema_version = freeze_info.schema_version;
if (OB_FAIL(get_result_for_major(tablet, medium_info, result))) { }
LOG_WARN("failed get result for major", K(ret), K(medium_info));
} else { if (FAILEDx(get_result_for_major(tablet, medium_info, result))) {
LOG_TRACE("choose_major_snapshot", K(ret), "ls_id", ls.get_ls_id(), LOG_WARN("failed get result for major", K(ret), K(medium_info));
"tablet_id", tablet.get_tablet_meta().tablet_id_, K(medium_info), K(freeze_info), K(result), } else {
K(last_sstable_schema_version)); LOG_TRACE("choose_major_snapshot", K(ret), "ls_id", ls.get_ls_id(),
} "tablet_id", tablet.get_tablet_meta().tablet_id_, K(medium_info), K(freeze_info), K(result),
K(last_sstable_schema_version));
}
return ret;
}
int ObMediumCompactionScheduleFunc::switch_to_choose_medium_snapshot(
ObArenaAllocator &allocator,
ObLS &ls,
ObTablet &tablet,
const int64_t freeze_version,
ObMediumCompactionInfo &medium_info,
int64_t &schema_version)
{
int ret = OB_SUCCESS;
const int64_t ls_weak_read_ts = ls.get_ls_wrs_handler()->get_ls_weak_read_ts().get_val_for_tx();
int64_t medium_snapshot = 0;
if (ls_weak_read_ts < freeze_version + 1) {
ret = OB_EAGAIN;
LOG_WARN("weak read ts is smaller than new medium snapshot, try later", K(ret), K(tablet));
} else if (FALSE_IT(medium_snapshot = MAX(ls_weak_read_ts, freeze_version + 1))) {
} else if (OB_FAIL(choose_medium_schema_version(allocator, medium_snapshot, tablet, schema_version))) {
LOG_WARN("fail to choose medium schema version", K(ret), K(tablet));
} else {
medium_info.compaction_type_ = ObMediumCompactionInfo::MEDIUM_COMPACTION;
medium_info.medium_merge_reason_ = ObAdaptiveMergePolicy::AdaptiveMergeReason::NONE;
medium_info.medium_snapshot_ = medium_snapshot;
} }
return ret; return ret;
} }
@ -374,7 +412,8 @@ int ObMediumCompactionScheduleFunc::choose_new_medium_snapshot(
} }
int ObMediumCompactionScheduleFunc::choose_medium_schema_version( int ObMediumCompactionScheduleFunc::choose_medium_schema_version(
const ObMediumCompactionInfo &medium_info, common::ObArenaAllocator &allocator,
const int64_t medium_snapshot,
ObTablet &tablet, ObTablet &tablet,
int64_t &schema_version) int64_t &schema_version)
{ {
@ -384,10 +423,10 @@ int ObMediumCompactionScheduleFunc::choose_medium_schema_version(
int64_t store_column_cnt_in_schema = 0; int64_t store_column_cnt_in_schema = 0;
ObSEArray<storage::ObITable *, MAX_MEMSTORE_CNT> memtables; ObSEArray<storage::ObITable *, MAX_MEMSTORE_CNT> memtables;
if (OB_FAIL(tablet.load_storage_schema(allocator_, schema_on_tablet))) { if (OB_FAIL(tablet.load_storage_schema(allocator, schema_on_tablet))) {
LOG_WARN("fail to load storage schema", K(ret)); LOG_WARN("fail to load storage schema", K(ret));
} else if (FALSE_IT(schema_version = schema_on_tablet->schema_version_)) { } else if (FALSE_IT(schema_version = schema_on_tablet->schema_version_)) {
} else if (medium_info.medium_snapshot_ <= tablet_snapshot_version) { } else if (medium_snapshot <= tablet_snapshot_version) {
// do nothing, use schema version on tablet // do nothing, use schema version on tablet
} else if (OB_FAIL(schema_on_tablet->get_store_column_count(store_column_cnt_in_schema, true/*full_col*/))) { } else if (OB_FAIL(schema_on_tablet->get_store_column_count(store_column_cnt_in_schema, true/*full_col*/))) {
LOG_WARN("failed to get store column count", K(ret), K(store_column_cnt_in_schema)); LOG_WARN("failed to get store column count", K(ret), K(store_column_cnt_in_schema));
@ -463,7 +502,7 @@ int ObMediumCompactionScheduleFunc::decide_medium_snapshot(
OB_FAIL(choose_new_medium_snapshot(max_reserved_snapshot, medium_info, result))) { OB_FAIL(choose_new_medium_snapshot(max_reserved_snapshot, medium_info, result))) {
// chosen medium snapshot is far too old // chosen medium snapshot is far too old
LOG_WARN("failed to choose new medium snapshot", KR(ret), K(max_reserved_snapshot), K(medium_info)); LOG_WARN("failed to choose new medium snapshot", KR(ret), K(max_reserved_snapshot), K(medium_info));
} else if (OB_FAIL(choose_medium_schema_version(medium_info, *tablet, schema_version))) { } else if (OB_FAIL(choose_medium_schema_version(allocator_, medium_info.medium_snapshot_, *tablet, schema_version))) {
LOG_WARN("failed to choose medium schema version", K(ret), K(tablet)); LOG_WARN("failed to choose medium schema version", K(ret), K(tablet));
} }

View File

@ -99,7 +99,7 @@ protected:
ObLS &ls, ObLS &ls,
ObTablet &tablet, ObTablet &tablet,
const ObAdaptiveMergePolicy::AdaptiveMergeReason &merge_reason, const ObAdaptiveMergePolicy::AdaptiveMergeReason &merge_reason,
ObIAllocator &allocator, ObArenaAllocator &allocator,
ObMediumCompactionInfo &medium_info, ObMediumCompactionInfo &medium_info,
ObGetMergeTablesResult &result, ObGetMergeTablesResult &result,
int64_t &schema_version); int64_t &schema_version);
@ -107,10 +107,18 @@ protected:
ObLS &ls, ObLS &ls,
ObTablet &tablet, ObTablet &tablet,
const ObAdaptiveMergePolicy::AdaptiveMergeReason &merge_reason, const ObAdaptiveMergePolicy::AdaptiveMergeReason &merge_reason,
ObIAllocator &allocator, ObArenaAllocator &allocator,
ObMediumCompactionInfo &medium_info, ObMediumCompactionInfo &medium_info,
ObGetMergeTablesResult &result, ObGetMergeTablesResult &result,
int64_t &schema_version); int64_t &schema_version);
static int switch_to_choose_medium_snapshot(
ObArenaAllocator &allocator,
ObLS &ls,
ObTablet &tablet,
const int64_t freeze_version,
ObMediumCompactionInfo &medium_info,
int64_t &schema_version);
static int check_need_merge_and_schedule( static int check_need_merge_and_schedule(
ObLS &ls, ObLS &ls,
ObTablet &tablet, ObTablet &tablet,
@ -121,8 +129,9 @@ protected:
const int64_t max_reserved_snapshot, const int64_t max_reserved_snapshot,
ObMediumCompactionInfo &medium_info, ObMediumCompactionInfo &medium_info,
ObGetMergeTablesResult &result); ObGetMergeTablesResult &result);
int choose_medium_schema_version( static int choose_medium_schema_version(
const ObMediumCompactionInfo &medium_info, common::ObArenaAllocator &allocator,
const int64_t medium_snapshot,
ObTablet &tablet, ObTablet &tablet,
int64_t &schema_version); int64_t &schema_version);
int get_max_reserved_snapshot(int64_t &max_reserved_snapshot); int get_max_reserved_snapshot(int64_t &max_reserved_snapshot);
@ -139,7 +148,7 @@ protected:
ObLS &ls, ObLS &ls,
ObTablet &tablet, ObTablet &tablet,
const ObAdaptiveMergePolicy::AdaptiveMergeReason &merge_reason, const ObAdaptiveMergePolicy::AdaptiveMergeReason &merge_reason,
ObIAllocator &allocator, ObArenaAllocator &allocator,
ObMediumCompactionInfo &medium_info, ObMediumCompactionInfo &medium_info,
ObGetMergeTablesResult &result, ObGetMergeTablesResult &result,
int64_t &schema_version); int64_t &schema_version);

View File

@ -1026,7 +1026,6 @@ int ObTabletMergeCtx::init_merge_info()
return ret; return ret;
} }
int ObTabletMergeCtx::get_storage_schema_to_merge(const ObTablesHandleArray &merge_tables_handle) int ObTabletMergeCtx::get_storage_schema_to_merge(const ObTablesHandleArray &merge_tables_handle)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
@ -1059,8 +1058,10 @@ int ObTabletMergeCtx::get_storage_schema_to_merge(const ObTablesHandleArray &mer
} // end of for } // end of for
if (OB_FAIL(ret)) { if (OB_FAIL(ret)) {
} else if (max_column_cnt_in_memtable > column_cnt_in_schema } else if (max_column_cnt_in_memtable <= column_cnt_in_schema
|| max_schema_version_in_memtable > schema_on_tablet->get_schema_version()) { && max_schema_version_in_memtable <= schema_on_tablet->get_schema_version()) {
// do nothing
} else {
// need alloc new storage schema & set column cnt // need alloc new storage schema & set column cnt
void *buf = nullptr; void *buf = nullptr;
if (OB_ISNULL(buf = allocator_.alloc(sizeof(ObStorageSchema)))) { if (OB_ISNULL(buf = allocator_.alloc(sizeof(ObStorageSchema)))) {