fix medium compaction not scheduling

This commit is contained in:
Fengjingkun
2023-06-26 12:12:18 +00:00
committed by ob-robot
parent 92b0d42d39
commit 9d8f21e540
3 changed files with 53 additions and 37 deletions

View File

@ -345,7 +345,7 @@ int ObMediumCompactionScheduleFunc::get_max_reserved_snapshot(int64_t &max_reser
tablet->get_tablet_meta().tablet_id_, max_merged_snapshot, min_reserved_snapshot))) { tablet->get_tablet_meta().tablet_id_, max_merged_snapshot, min_reserved_snapshot))) {
LOG_WARN("failed to get multi version from freeze info mgr", K(ret), K(table_id)); LOG_WARN("failed to get multi version from freeze info mgr", K(ret), K(table_id));
} else { } else {
max_reserved_snapshot = MAX(ls_.get_min_reserved_snapshot(), min_reserved_snapshot); max_reserved_snapshot = MAX(ls_.get_min_reserved_snapshot(), min_reserved_snapshot);
} }
return ret; return ret;
} }
@ -353,8 +353,7 @@ int ObMediumCompactionScheduleFunc::get_max_reserved_snapshot(int64_t &max_reser
int ObMediumCompactionScheduleFunc::choose_new_medium_snapshot( int ObMediumCompactionScheduleFunc::choose_new_medium_snapshot(
const int64_t max_reserved_snapshot, const int64_t max_reserved_snapshot,
ObMediumCompactionInfo &medium_info, ObMediumCompactionInfo &medium_info,
ObGetMergeTablesResult &result, ObGetMergeTablesResult &result)
int64_t &schema_version)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
ObTablet *tablet = tablet_handle_.get_obj(); ObTablet *tablet = tablet_handle_.get_obj();
@ -371,32 +370,43 @@ int ObMediumCompactionScheduleFunc::choose_new_medium_snapshot(
LOG_INFO("use weak_read_ts to schedule medium", K(ret), KPC(this), LOG_INFO("use weak_read_ts to schedule medium", K(ret), KPC(this),
K(medium_info), K(max_reserved_snapshot), K_(weak_read_ts), K(snapshot_gc_ts)); K(medium_info), K(max_reserved_snapshot), K_(weak_read_ts), K(snapshot_gc_ts));
} }
if (OB_SUCC(ret)) { // update schema version for cur medium scn return ret;
ObSEArray<storage::ObITable *, MAX_MEMSTORE_CNT> memtables; }
const ObStorageSchema *schema_on_tablet = nullptr;
int64_t store_column_cnt_in_schema = 0; int ObMediumCompactionScheduleFunc::choose_medium_schema_version(
if (OB_FAIL(tablet->get_memtables(memtables, true/*need_active*/))) { const ObMediumCompactionInfo &medium_info,
LOG_WARN("failed to get memtables", KR(ret), KPC(tablet)); ObTablet &tablet,
} else if (OB_FAIL(tablet->load_storage_schema(allocator_, schema_on_tablet))) { int64_t &schema_version)
LOG_WARN("fail to load storage schema", K(ret), KPC(this)); {
} else if (OB_FAIL(schema_on_tablet->get_store_column_count(store_column_cnt_in_schema, true/*full_col*/))) { int ret = OB_SUCCESS;
LOG_WARN("failed to get store column count", K(ret), K(store_column_cnt_in_schema)); const int64_t tablet_snapshot_version = tablet.get_snapshot_version();
const ObStorageSchema *schema_on_tablet = nullptr;
int64_t store_column_cnt_in_schema = 0;
ObSEArray<storage::ObITable *, MAX_MEMSTORE_CNT> memtables;
if (OB_FAIL(tablet.load_storage_schema(allocator_, schema_on_tablet))) {
LOG_WARN("fail to load storage schema", K(ret));
} else if (FALSE_IT(schema_version = schema_on_tablet->schema_version_)) {
} else if (medium_info.medium_snapshot_ <= tablet_snapshot_version) {
// do nothing, use schema version on tablet
} else if (OB_FAIL(schema_on_tablet->get_store_column_count(store_column_cnt_in_schema, true/*full_col*/))) {
LOG_WARN("failed to get store column count", K(ret), K(store_column_cnt_in_schema));
} else if (OB_FAIL(tablet.get_memtables(memtables, true/*need_active*/))) {
LOG_WARN("failed to get memtables", KR(ret), K(tablet));
}
int64_t max_schema_version_on_memtable = 0;
int64_t max_column_cnt_on_memtable = 0; // placeholder
for (int64_t idx = 0; OB_SUCC(ret) && idx < memtables.count(); ++idx) {
memtable::ObMemtable *memtable = static_cast<memtable::ObMemtable *>(memtables.at(idx));
if (memtable->get_snapshot_version() <= tablet_snapshot_version) {
// continue
} else if (OB_FAIL(memtable->get_schema_info(
store_column_cnt_in_schema, max_schema_version_on_memtable, max_column_cnt_on_memtable))) {
LOG_WARN("failed to get schema info from memtable", KR(ret), KPC(memtable));
} else { } else {
int64_t max_schema_version_on_memtable = 0; schema_version = MAX(schema_version, max_schema_version_on_memtable);
int64_t unused_max_column_cnt_on_memtable = 0; break;
for (int64_t idx = 0; OB_SUCC(ret) && idx < memtables.count(); ++idx) {
memtable::ObMemtable *memtable = static_cast<memtable::ObMemtable *>(memtables.at(idx));
if (OB_FAIL(memtable->get_schema_info(
store_column_cnt_in_schema,
max_schema_version_on_memtable, unused_max_column_cnt_on_memtable))) {
LOG_WARN("failed to get schema info from memtable", KR(ret), KPC(memtable));
}
}
if (OB_SUCC(ret)) {
schema_version = MAX(max_schema_version_on_memtable, schema_version);
LOG_INFO("chosen new medium snapshot", K(ret), KPC(this),
K(medium_info), K(max_reserved_snapshot), K(result), K(max_schema_version_on_memtable));
}
} }
} }
return ret; return ret;
@ -449,11 +459,12 @@ int ObMediumCompactionScheduleFunc::decide_medium_snapshot(
// do nothing // do nothing
} else if (OB_FAIL(get_max_reserved_snapshot(max_reserved_snapshot))) { } else if (OB_FAIL(get_max_reserved_snapshot(max_reserved_snapshot))) {
LOG_WARN("failed to get multi_version_start", K(ret), KPC(this)); LOG_WARN("failed to get multi_version_start", K(ret), KPC(this));
} else if (medium_info.medium_snapshot_ < max_reserved_snapshot) { } else if (medium_info.medium_snapshot_ < max_reserved_snapshot &&
OB_FAIL(choose_new_medium_snapshot(max_reserved_snapshot, medium_info, result))) {
// chosen medium snapshot is far too old // chosen medium snapshot is far too old
if (OB_FAIL(choose_new_medium_snapshot(max_reserved_snapshot, medium_info, result, schema_version))) { LOG_WARN("failed to choose new medium snapshot", KR(ret), K(max_reserved_snapshot), K(medium_info));
LOG_WARN("failed to choose new medium snapshot", KR(ret), K(medium_info)); } else if (OB_FAIL(choose_medium_schema_version(medium_info, *tablet, schema_version))) {
} LOG_WARN("failed to choose medium schema version", K(ret), K(tablet));
} }
if (OB_SUCC(ret) && !is_major) { if (OB_SUCC(ret) && !is_major) {

View File

@ -115,10 +115,13 @@ protected:
const ObMediumCompactionInfo::ObCompactionType compaction_type); const ObMediumCompactionInfo::ObCompactionType compaction_type);
int schedule_next_medium_primary_cluster(const int64_t major_snapshot, ObTenantTabletScheduler::ObScheduleStatistics &schedule_stat); int schedule_next_medium_primary_cluster(const int64_t major_snapshot, ObTenantTabletScheduler::ObScheduleStatistics &schedule_stat);
int choose_new_medium_snapshot( int choose_new_medium_snapshot(
const int64_t max_reserved_snapshot, const int64_t max_reserved_snapshot,
ObMediumCompactionInfo &medium_info, ObMediumCompactionInfo &medium_info,
ObGetMergeTablesResult &result, ObGetMergeTablesResult &result);
int64_t &schema_version); int choose_medium_schema_version(
const ObMediumCompactionInfo &medium_info,
ObTablet &tablet,
int64_t &schema_version);
int get_max_reserved_snapshot(int64_t &max_reserved_snapshot); int get_max_reserved_snapshot(int64_t &max_reserved_snapshot);
static int get_table_id( static int get_table_id(
ObMultiVersionSchemaService &schema_service, ObMultiVersionSchemaService &schema_service,

View File

@ -674,7 +674,9 @@ int ObTenantTabletStatMgr::get_tablet_analyzer(
ObTenantSysStat sys_stat; ObTenantSysStat sys_stat;
if (OB_FAIL(get_latest_tablet_stat(ls_id, tablet_id, analyzer.tablet_stat_))) { if (OB_FAIL(get_latest_tablet_stat(ls_id, tablet_id, analyzer.tablet_stat_))) {
LOG_WARN("failed to get latest tablet stat", K(ret), K(ls_id), K(tablet_id)); if (OB_HASH_NOT_EXIST != ret) {
LOG_WARN("failed to get latest tablet stat", K(ret), K(ls_id), K(tablet_id));
}
} else if (OB_FAIL(get_sys_stat(sys_stat))) { } else if (OB_FAIL(get_sys_stat(sys_stat))) {
LOG_WARN("failed to get sys stat", K(ret)); LOG_WARN("failed to get sys stat", K(ret));
} else { } else {