fix choose_and_save_storage_schema

This commit is contained in:
Fengjingkun
2023-09-13 02:40:36 +00:00
committed by ob-robot
parent fc0c96eb47
commit f1cfef336e
3 changed files with 29 additions and 37 deletions

View File

@ -60,6 +60,7 @@ int64_t ObMediumCompactionScheduleFunc::to_string(char *buf, const int64_t buf_l
} }
int ObMediumCompactionScheduleFunc::choose_medium_snapshot( int ObMediumCompactionScheduleFunc::choose_medium_snapshot(
const ObMediumCompactionScheduleFunc &func,
ObLS &ls, ObLS &ls,
ObTablet &tablet, ObTablet &tablet,
const ObAdaptiveMergePolicy::AdaptiveMergeReason &merge_reason, const ObAdaptiveMergePolicy::AdaptiveMergeReason &merge_reason,
@ -68,8 +69,7 @@ int ObMediumCompactionScheduleFunc::choose_medium_snapshot(
ObGetMergeTablesResult &result, ObGetMergeTablesResult &result,
int64_t &schema_version) int64_t &schema_version)
{ {
UNUSED(allocator); UNUSEDx(func, allocator, schema_version);
UNUSED(schema_version);
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
ObGetMergeTablesParam param; ObGetMergeTablesParam param;
param.merge_type_ = META_MAJOR_MERGE; param.merge_type_ = META_MAJOR_MERGE;
@ -92,6 +92,7 @@ int ObMediumCompactionScheduleFunc::choose_medium_snapshot(
} }
int ObMediumCompactionScheduleFunc::choose_major_snapshot( int ObMediumCompactionScheduleFunc::choose_major_snapshot(
const ObMediumCompactionScheduleFunc &func,
ObLS &ls, ObLS &ls,
ObTablet &tablet, ObTablet &tablet,
const ObAdaptiveMergePolicy::AdaptiveMergeReason &merge_reason, const ObAdaptiveMergePolicy::AdaptiveMergeReason &merge_reason,
@ -173,7 +174,7 @@ int ObMediumCompactionScheduleFunc::choose_major_snapshot(
if (OB_FAIL(ret)) { if (OB_FAIL(ret)) {
} else if (schedule_medium_merge) { } else if (schedule_medium_merge) {
if (OB_FAIL(switch_to_choose_medium_snapshot(allocator, ls, tablet, freeze_info.freeze_version, medium_info, schema_version))) { if (OB_FAIL(switch_to_choose_medium_snapshot(func, allocator, ls, tablet, freeze_info.freeze_version, medium_info, schema_version))) {
if (OB_EAGAIN != ret) { if (OB_EAGAIN != ret) {
LOG_WARN("failed to switch to choose medium snapshot", K(ret), K(tablet)); LOG_WARN("failed to switch to choose medium snapshot", K(ret), K(tablet));
} }
@ -196,6 +197,7 @@ int ObMediumCompactionScheduleFunc::choose_major_snapshot(
} }
int ObMediumCompactionScheduleFunc::switch_to_choose_medium_snapshot( int ObMediumCompactionScheduleFunc::switch_to_choose_medium_snapshot(
const ObMediumCompactionScheduleFunc &func,
ObArenaAllocator &allocator, ObArenaAllocator &allocator,
ObLS &ls, ObLS &ls,
ObTablet &tablet, ObTablet &tablet,
@ -204,13 +206,12 @@ int ObMediumCompactionScheduleFunc::switch_to_choose_medium_snapshot(
int64_t &schema_version) int64_t &schema_version)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
const int64_t ls_weak_read_ts = ls.get_ls_wrs_handler()->get_ls_weak_read_ts().get_val_for_tx();
int64_t medium_snapshot = 0; int64_t medium_snapshot = 0;
if (ls_weak_read_ts < freeze_version + 1) { if (func.weak_read_ts_ < freeze_version + 1) {
ret = OB_EAGAIN; ret = OB_EAGAIN;
LOG_WARN("weak read ts is smaller than new medium snapshot, try later", K(ret), K(tablet)); LOG_WARN("weak read ts is smaller than new medium snapshot, try later", K(ret), K(tablet));
} else if (FALSE_IT(medium_snapshot = MAX(ls_weak_read_ts, freeze_version + 1))) { } else if (FALSE_IT(medium_snapshot = MAX(func.weak_read_ts_, freeze_version + 1))) {
} else if (OB_FAIL(choose_medium_schema_version(allocator, medium_snapshot, tablet, schema_version))) { } else if (OB_FAIL(choose_medium_schema_version(allocator, medium_snapshot, tablet, schema_version))) {
LOG_WARN("fail to choose medium schema version", K(ret), K(tablet)); LOG_WARN("fail to choose medium schema version", K(ret), K(tablet));
} else { } else {
@ -488,7 +489,7 @@ int ObMediumCompactionScheduleFunc::decide_medium_snapshot(
medium_info.medium_compat_version_ = ObMediumCompactionInfo::MEDIUM_COMPAT_VERSION_V2; medium_info.medium_compat_version_ = ObMediumCompactionInfo::MEDIUM_COMPAT_VERSION_V2;
} }
if (OB_FAIL(choose_medium_scn[is_major](ls_, *tablet, merge_reason, allocator_, medium_info, result, schema_version))) { if (OB_FAIL(choose_medium_scn[is_major](*this, ls_, *tablet, merge_reason, allocator_, medium_info, result, schema_version))) {
if (OB_NO_NEED_MERGE != ret) { if (OB_NO_NEED_MERGE != ret) {
LOG_WARN("failed to choose medium snapshot", K(ret), KPC(this)); LOG_WARN("failed to choose medium snapshot", K(ret), KPC(this));
} }

View File

@ -102,6 +102,7 @@ protected:
const share::ObLSID &ls_id, const share::ObLSID &ls_id,
const ObTabletID &tablet_id); const ObTabletID &tablet_id);
static int choose_medium_snapshot( static int choose_medium_snapshot(
const ObMediumCompactionScheduleFunc &func,
ObLS &ls, ObLS &ls,
ObTablet &tablet, ObTablet &tablet,
const ObAdaptiveMergePolicy::AdaptiveMergeReason &merge_reason, const ObAdaptiveMergePolicy::AdaptiveMergeReason &merge_reason,
@ -110,6 +111,7 @@ protected:
ObGetMergeTablesResult &result, ObGetMergeTablesResult &result,
int64_t &schema_version); int64_t &schema_version);
static int choose_major_snapshot( static int choose_major_snapshot(
const ObMediumCompactionScheduleFunc &func,
ObLS &ls, ObLS &ls,
ObTablet &tablet, ObTablet &tablet,
const ObAdaptiveMergePolicy::AdaptiveMergeReason &merge_reason, const ObAdaptiveMergePolicy::AdaptiveMergeReason &merge_reason,
@ -118,6 +120,7 @@ protected:
ObGetMergeTablesResult &result, ObGetMergeTablesResult &result,
int64_t &schema_version); int64_t &schema_version);
static int switch_to_choose_medium_snapshot( static int switch_to_choose_medium_snapshot(
const ObMediumCompactionScheduleFunc &func,
ObArenaAllocator &allocator, ObArenaAllocator &allocator,
ObLS &ls, ObLS &ls,
ObTablet &tablet, ObTablet &tablet,
@ -151,6 +154,7 @@ protected:
static const int64_t SCHEDULE_RANGE_ROW_COUNT_THRESHOLD = 1000 * 1000L; // 100w static const int64_t SCHEDULE_RANGE_ROW_COUNT_THRESHOLD = 1000 * 1000L; // 100w
static const int64_t MEDIUM_FUNC_CNT = 2; static const int64_t MEDIUM_FUNC_CNT = 2;
typedef int (*ChooseMediumScn)( typedef int (*ChooseMediumScn)(
const ObMediumCompactionScheduleFunc &func,
ObLS &ls, ObLS &ls,
ObTablet &tablet, ObTablet &tablet,
const ObAdaptiveMergePolicy::AdaptiveMergeReason &merge_reason, const ObAdaptiveMergePolicy::AdaptiveMergeReason &merge_reason,

View File

@ -3189,7 +3189,7 @@ int ObTablet::choose_and_save_storage_schema(
const share::ObLSID &ls_id = tablet_meta_.ls_id_; const share::ObLSID &ls_id = tablet_meta_.ls_id_;
const ObTabletID &tablet_id = tablet_meta_.tablet_id_; const ObTabletID &tablet_id = tablet_meta_.tablet_id_;
const ObStorageSchema *chosen_schema = &tablet_schema; const ObStorageSchema *chosen_schema = nullptr;
ObStorageSchema *tmp_storage_schema = nullptr; ObStorageSchema *tmp_storage_schema = nullptr;
int64_t tablet_schema_version = 0; int64_t tablet_schema_version = 0;
@ -3205,39 +3205,26 @@ int ObTablet::choose_and_save_storage_schema(
LOG_WARN("failed to get stored column count from schema", KR(ret), K(ls_id), K(tablet_id), K(tablet_schema)); LOG_WARN("failed to get stored column count from schema", KR(ret), K(ls_id), K(tablet_id), K(tablet_schema));
} else if (OB_FAIL(param_schema.get_store_column_count(param_schema_stored_col_cnt, true/*full_col*/))) { } else if (OB_FAIL(param_schema.get_store_column_count(param_schema_stored_col_cnt, true/*full_col*/))) {
LOG_WARN("failed to get stored column count from schema", KR(ret), K(ls_id), K(tablet_id), K(param_schema)); LOG_WARN("failed to get stored column count from schema", KR(ret), K(ls_id), K(tablet_id), K(param_schema));
} else if ((tablet_schema_version > param_schema_version && tablet_schema_stored_col_cnt < param_schema_stored_col_cnt) } else if (param_schema_version >= tablet_schema_version && param_schema_stored_col_cnt >= tablet_schema_stored_col_cnt) {
|| (tablet_schema_version < param_schema_version && tablet_schema_stored_col_cnt > param_schema_stored_col_cnt)) { // use param schema totally
/* The tablet schema may have come from a previous memtable, and its schema version may belong to other tables.
* Therefore, we should update the tablet schema with the larger schema version and a larger column count. */
if (OB_FAIL(ObTabletObjLoadHelper::alloc_and_new(allocator, tmp_storage_schema))) {
LOG_WARN("failed to alloc mem for tmp storage schema", K(ret), K(param_schema), K(tablet_schema));
} else if (OB_FAIL(tmp_storage_schema->init(allocator, param_schema, true/*column_info_simplified*/))) {
LOG_WARN("failed to init storage schema", K(ret), K(param_schema));
allocator.free(tmp_storage_schema);
tmp_storage_schema = nullptr;
} else {
tmp_storage_schema->column_cnt_ = MAX(tablet_schema.get_column_count(), param_schema.get_column_count());
tmp_storage_schema->store_column_cnt_ = MAX(tablet_schema_stored_col_cnt, param_schema_stored_col_cnt);
tmp_storage_schema->schema_version_ = MAX(tablet_schema_version, param_schema_version);
chosen_schema = tmp_storage_schema;
}
} else if (tablet_schema_version > param_schema_version
|| (tablet_schema_version == param_schema_version && tablet_schema_stored_col_cnt >= param_schema_stored_col_cnt)) {
// use tablet schema
LOG_INFO("tablet storage schema is no smaller than that in param",
"ls_id", ls_id,
"tablet_id", tablet_id,
"tablet_schema_version", tablet_schema.get_schema_version(),
"param_schema_version", param_schema.schema_version_,
"tablet_schema_column_cnt", tablet_schema_stored_col_cnt,
"param_schema_column_cnt", param_schema_stored_col_cnt);
} else {
// use param schema
chosen_schema = &param_schema; chosen_schema = &param_schema;
} else if (OB_FAIL(ObTabletObjLoadHelper::alloc_and_new(allocator, tmp_storage_schema))) {
LOG_WARN("failed to alloc mem for tmp storage schema", K(ret), K(param_schema), K(tablet_schema));
} else if (OB_FAIL(tmp_storage_schema->init(allocator, param_schema, true/*column_info_simplified*/))) {
LOG_WARN("failed to init storage schema", K(ret), K(param_schema));
ObTablet::free_storage_schema(allocator, tmp_storage_schema);
tmp_storage_schema = nullptr;
} else {
tmp_storage_schema->column_cnt_ = MAX(tablet_schema.get_column_count(), param_schema.get_column_count());
tmp_storage_schema->store_column_cnt_ = MAX(tablet_schema_stored_col_cnt, param_schema_stored_col_cnt);
tmp_storage_schema->schema_version_ = MAX(tablet_schema_version, param_schema_version);
chosen_schema = tmp_storage_schema;
} }
if (OB_FAIL(ret)) { if (OB_FAIL(ret)) {
} else if (OB_ISNULL(chosen_schema)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("chosen schema is unexpected null", K(ret), K(param_schema), K(tablet_schema));
} else { } else {
ALLOC_AND_INIT(allocator, storage_schema_addr_, *chosen_schema, true/*skip_column_info*/); ALLOC_AND_INIT(allocator, storage_schema_addr_, *chosen_schema, true/*skip_column_info*/);
} }