fix lock_node only update schema_version & add tenant_id for allocator
This commit is contained in:
@ -65,9 +65,11 @@ int ObMediumCompactionScheduleFunc::choose_medium_snapshot(
|
||||
const ObAdaptiveMergePolicy::AdaptiveMergeReason &merge_reason,
|
||||
ObIAllocator &allocator,
|
||||
ObMediumCompactionInfo &medium_info,
|
||||
ObGetMergeTablesResult &result)
|
||||
ObGetMergeTablesResult &result,
|
||||
int64_t &schema_version)
|
||||
{
|
||||
UNUSED(allocator);
|
||||
UNUSED(schema_version);
|
||||
int ret = OB_SUCCESS;
|
||||
ObGetMergeTablesParam param;
|
||||
param.merge_type_ = META_MAJOR_MERGE;
|
||||
@ -95,7 +97,8 @@ int ObMediumCompactionScheduleFunc::choose_major_snapshot(
|
||||
const ObAdaptiveMergePolicy::AdaptiveMergeReason &merge_reason,
|
||||
ObIAllocator &allocator,
|
||||
ObMediumCompactionInfo &medium_info,
|
||||
ObGetMergeTablesResult &result)
|
||||
ObGetMergeTablesResult &result,
|
||||
int64_t &schema_version)
|
||||
{
|
||||
UNUSED(merge_reason);
|
||||
int ret = OB_SUCCESS;
|
||||
@ -168,7 +171,7 @@ int ObMediumCompactionScheduleFunc::choose_major_snapshot(
|
||||
medium_info.compaction_type_ = ObMediumCompactionInfo::MAJOR_COMPACTION;
|
||||
medium_info.medium_merge_reason_ = ObAdaptiveMergePolicy::AdaptiveMergeReason::NONE;
|
||||
medium_info.medium_snapshot_ = freeze_info.freeze_version;
|
||||
result.schema_version_ = freeze_info.schema_version;
|
||||
schema_version = freeze_info.schema_version;
|
||||
if (OB_FAIL(get_result_for_major(tablet, medium_info, result))) {
|
||||
LOG_WARN("failed get result for major", K(ret), K(medium_info));
|
||||
} else {
|
||||
@ -350,7 +353,8 @@ int ObMediumCompactionScheduleFunc::get_max_reserved_snapshot(int64_t &max_reser
|
||||
int ObMediumCompactionScheduleFunc::choose_new_medium_snapshot(
|
||||
const int64_t max_reserved_snapshot,
|
||||
ObMediumCompactionInfo &medium_info,
|
||||
ObGetMergeTablesResult &result)
|
||||
ObGetMergeTablesResult &result,
|
||||
int64_t &schema_version)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObTablet *tablet = tablet_handle_.get_obj();
|
||||
@ -369,20 +373,27 @@ int ObMediumCompactionScheduleFunc::choose_new_medium_snapshot(
|
||||
}
|
||||
if (OB_SUCC(ret)) { // update schema version for cur medium scn
|
||||
ObSEArray<storage::ObITable *, MAX_MEMSTORE_CNT> memtables;
|
||||
const ObStorageSchema *schema_on_tablet = nullptr;
|
||||
int64_t store_column_cnt_in_schema = 0;
|
||||
if (OB_FAIL(tablet->get_memtables(memtables, true/*need_active*/))) {
|
||||
LOG_WARN("failed to get memtables", KR(ret), KPC(tablet));
|
||||
} else if (OB_FAIL(tablet->load_storage_schema(allocator_, schema_on_tablet))) {
|
||||
LOG_WARN("fail to load storage schema", K(ret), KPC(this));
|
||||
} else if (OB_FAIL(schema_on_tablet->get_store_column_count(store_column_cnt_in_schema, true/*full_col*/))) {
|
||||
LOG_WARN("failed to get store column count", K(ret), K(store_column_cnt_in_schema));
|
||||
} else {
|
||||
int64_t max_schema_version_on_memtable = 0;
|
||||
int64_t unused_max_column_cnt_on_memtable = 0;
|
||||
for (int64_t idx = 0; OB_SUCC(ret) && idx < memtables.count(); ++idx) {
|
||||
memtable::ObMemtable *memtable = static_cast<memtable::ObMemtable *>(memtables.at(idx));
|
||||
if (OB_FAIL(memtable->get_schema_info(
|
||||
store_column_cnt_in_schema,
|
||||
max_schema_version_on_memtable, unused_max_column_cnt_on_memtable))) {
|
||||
LOG_WARN("failed to get schema info from memtable", KR(ret), KPC(memtable));
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
result.schema_version_ = MAX(max_schema_version_on_memtable, result.schema_version_);
|
||||
schema_version = MAX(max_schema_version_on_memtable, schema_version);
|
||||
LOG_INFO("chosen new medium snapshot", K(ret), KPC(this),
|
||||
K(medium_info), K(max_reserved_snapshot), K(result), K(max_schema_version_on_memtable));
|
||||
}
|
||||
@ -421,13 +432,14 @@ int ObMediumCompactionScheduleFunc::decide_medium_snapshot(
|
||||
ObGetMergeTablesResult result;
|
||||
ObMediumCompactionInfo medium_info;
|
||||
medium_info.data_version_ = compat_version;
|
||||
int64_t schema_version = 0;
|
||||
if (medium_info.data_version_ < DATA_VERSION_4_2_0_0) {
|
||||
medium_info.medium_compat_version_ = ObMediumCompactionInfo::MEIDUM_COMPAT_VERSION;
|
||||
} else {
|
||||
medium_info.medium_compat_version_ = ObMediumCompactionInfo::MEIDUM_COMPAT_VERSION_V2;
|
||||
}
|
||||
|
||||
if (OB_FAIL(choose_medium_scn[is_major](ls_, *tablet, merge_reason, allocator_, medium_info, result))) {
|
||||
if (OB_FAIL(choose_medium_scn[is_major](ls_, *tablet, merge_reason, allocator_, medium_info, result, schema_version))) {
|
||||
if (OB_NO_NEED_MERGE != ret) {
|
||||
LOG_WARN("failed to choose medium snapshot", K(ret), KPC(this));
|
||||
}
|
||||
@ -439,7 +451,7 @@ int ObMediumCompactionScheduleFunc::decide_medium_snapshot(
|
||||
LOG_WARN("failed to get multi_version_start", K(ret), KPC(this));
|
||||
} else if (medium_info.medium_snapshot_ < max_reserved_snapshot) {
|
||||
// chosen medium snapshot is far too old
|
||||
if (OB_FAIL(choose_new_medium_snapshot(max_reserved_snapshot, medium_info, result))) {
|
||||
if (OB_FAIL(choose_new_medium_snapshot(max_reserved_snapshot, medium_info, result, schema_version))) {
|
||||
LOG_WARN("failed to choose new medium snapshot", KR(ret), K(medium_info));
|
||||
}
|
||||
}
|
||||
@ -478,34 +490,34 @@ int ObMediumCompactionScheduleFunc::decide_medium_snapshot(
|
||||
}
|
||||
}
|
||||
#endif
|
||||
if (FAILEDx(prepare_medium_info(result, medium_info))) {
|
||||
if (OB_TABLE_IS_DELETED == ret) {
|
||||
ret = OB_SUCCESS;
|
||||
} else {
|
||||
LOG_WARN("failed to prepare medium info", K(ret), K(result));
|
||||
if (FAILEDx(prepare_medium_info(result, schema_version, medium_info))) {
|
||||
if (OB_TABLE_IS_DELETED == ret) {
|
||||
ret = OB_SUCCESS;
|
||||
} else {
|
||||
LOG_WARN("failed to prepare medium info", K(ret), K(result));
|
||||
}
|
||||
} else if (OB_FAIL(submit_medium_clog(medium_info))) {
|
||||
LOG_WARN("failed to submit medium clog and update inner table", K(ret), KPC(this));
|
||||
} else if (OB_TMP_FAIL(ls_.tablet_freeze(tablet_id, true/*is_sync*/))) {
|
||||
// need to freeze memtable with MediumCompactionInfo
|
||||
LOG_WARN("failed to freeze tablet", K(tmp_ret), KPC(this));
|
||||
}
|
||||
} else if (OB_FAIL(submit_medium_clog(medium_info))) {
|
||||
LOG_WARN("failed to submit medium clog and update inner table", K(ret), KPC(this));
|
||||
} else if (OB_TMP_FAIL(ls_.tablet_freeze(tablet_id, true/*is_sync*/))) {
|
||||
// need to freeze memtable with MediumCompactionInfo
|
||||
LOG_WARN("failed to freeze tablet", K(tmp_ret), KPC(this));
|
||||
}
|
||||
// delete tablet_id in ObLSReservedSnapshotMgr even if submit clog or update inner table failed
|
||||
if (OB_TMP_FAIL(ls_.del_dependent_medium_tablet(tablet_id))) {
|
||||
LOG_ERROR("failed to delete dependent medium tablet", K(tmp_ret), KPC(this));
|
||||
ob_abort();
|
||||
}
|
||||
ret = OB_NO_NEED_MERGE == ret ? OB_SUCCESS : ret;
|
||||
if (OB_FAIL(ret)) {
|
||||
// add schedule suspect info
|
||||
if (OB_TMP_FAIL(ADD_SUSPECT_INFO(MEDIUM_MERGE, ls_.get_ls_id(), tablet_id,
|
||||
ObSuspectInfoType::SUSPECT_SCHEDULE_MEDIUM_FAILED,
|
||||
medium_info.medium_snapshot_,
|
||||
medium_info.storage_schema_.store_column_cnt_,
|
||||
static_cast<int64_t>(ret)))) {
|
||||
LOG_WARN("failed to add suspect info", K(tmp_ret));
|
||||
// delete tablet_id in ObLSReservedSnapshotMgr even if submit clog or update inner table failed
|
||||
if (OB_TMP_FAIL(ls_.del_dependent_medium_tablet(tablet_id))) {
|
||||
LOG_ERROR("failed to delete dependent medium tablet", K(tmp_ret), KPC(this));
|
||||
ob_abort();
|
||||
}
|
||||
ret = OB_NO_NEED_MERGE == ret ? OB_SUCCESS : ret;
|
||||
if (OB_FAIL(ret)) {
|
||||
// add schedule suspect info
|
||||
if (OB_TMP_FAIL(ADD_SUSPECT_INFO(MEDIUM_MERGE, ls_.get_ls_id(), tablet_id,
|
||||
ObSuspectInfoType::SUSPECT_SCHEDULE_MEDIUM_FAILED,
|
||||
medium_info.medium_snapshot_,
|
||||
medium_info.storage_schema_.store_column_cnt_,
|
||||
static_cast<int64_t>(ret)))) {
|
||||
LOG_WARN("failed to add suspect info", K(tmp_ret));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
@ -646,6 +658,7 @@ int ObMediumCompactionScheduleFunc::prepare_iter(
|
||||
|
||||
int ObMediumCompactionScheduleFunc::prepare_medium_info(
|
||||
const ObGetMergeTablesResult &result,
|
||||
const int64_t schema_version,
|
||||
ObMediumCompactionInfo &medium_info)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -658,7 +671,7 @@ int ObMediumCompactionScheduleFunc::prepare_medium_info(
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("invalid tablet_handle", K(ret), K(tablet_handle_));
|
||||
} else if (FALSE_IT(tablet = tablet_handle_.get_obj())) {
|
||||
} else if (0 == result.schema_version_) { // not formal schema version
|
||||
} else if (0 == schema_version) { // not formal schema version
|
||||
ret = OB_NO_NEED_MERGE;
|
||||
} else if (medium_info.is_medium_compaction()) {
|
||||
const uint64_t tenant_id = MTL_ID();
|
||||
@ -667,7 +680,7 @@ int ObMediumCompactionScheduleFunc::prepare_medium_info(
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("failed to get schema service from MTL", K(ret));
|
||||
} else if (FALSE_IT(medium_info.storage_schema_.reset())) {
|
||||
} else if (OB_FAIL(get_table_schema_to_merge(*schema_service, *tablet, result.schema_version_, allocator_, medium_info))) {
|
||||
} else if (OB_FAIL(get_table_schema_to_merge(*schema_service, *tablet, schema_version, allocator_, medium_info))) {
|
||||
// for major compaction, storage schema is inited in choose_major_snapshot
|
||||
if (OB_TABLE_IS_DELETED != ret) {
|
||||
LOG_WARN("failed to get table schema", KR(ret), KPC(this), K(medium_info));
|
||||
|
||||
Reference in New Issue
Block a user