diff --git a/src/storage/compaction/ob_medium_compaction_func.cpp b/src/storage/compaction/ob_medium_compaction_func.cpp index 72da8cc0ef..616cebc8e4 100755 --- a/src/storage/compaction/ob_medium_compaction_func.cpp +++ b/src/storage/compaction/ob_medium_compaction_func.cpp @@ -65,9 +65,11 @@ int ObMediumCompactionScheduleFunc::choose_medium_snapshot( const ObAdaptiveMergePolicy::AdaptiveMergeReason &merge_reason, ObIAllocator &allocator, ObMediumCompactionInfo &medium_info, - ObGetMergeTablesResult &result) + ObGetMergeTablesResult &result, + int64_t &schema_version) { UNUSED(allocator); + UNUSED(schema_version); int ret = OB_SUCCESS; ObGetMergeTablesParam param; param.merge_type_ = META_MAJOR_MERGE; @@ -95,7 +97,8 @@ int ObMediumCompactionScheduleFunc::choose_major_snapshot( const ObAdaptiveMergePolicy::AdaptiveMergeReason &merge_reason, ObIAllocator &allocator, ObMediumCompactionInfo &medium_info, - ObGetMergeTablesResult &result) + ObGetMergeTablesResult &result, + int64_t &schema_version) { UNUSED(merge_reason); int ret = OB_SUCCESS; @@ -168,7 +171,7 @@ int ObMediumCompactionScheduleFunc::choose_major_snapshot( medium_info.compaction_type_ = ObMediumCompactionInfo::MAJOR_COMPACTION; medium_info.medium_merge_reason_ = ObAdaptiveMergePolicy::AdaptiveMergeReason::NONE; medium_info.medium_snapshot_ = freeze_info.freeze_version; - result.schema_version_ = freeze_info.schema_version; + schema_version = freeze_info.schema_version; if (OB_FAIL(get_result_for_major(tablet, medium_info, result))) { LOG_WARN("failed get result for major", K(ret), K(medium_info)); } else { @@ -350,7 +353,8 @@ int ObMediumCompactionScheduleFunc::get_max_reserved_snapshot(int64_t &max_reser int ObMediumCompactionScheduleFunc::choose_new_medium_snapshot( const int64_t max_reserved_snapshot, ObMediumCompactionInfo &medium_info, - ObGetMergeTablesResult &result) + ObGetMergeTablesResult &result, + int64_t &schema_version) { int ret = OB_SUCCESS; ObTablet *tablet = tablet_handle_.get_obj(); @@ -369,20 +373,27 @@ int ObMediumCompactionScheduleFunc::choose_new_medium_snapshot( } if (OB_SUCC(ret)) { // update schema version for cur medium scn ObSEArray memtables; + const ObStorageSchema *schema_on_tablet = nullptr; + int64_t store_column_cnt_in_schema = 0; if (OB_FAIL(tablet->get_memtables(memtables, true/*need_active*/))) { LOG_WARN("failed to get memtables", KR(ret), KPC(tablet)); + } else if (OB_FAIL(tablet->load_storage_schema(allocator_, schema_on_tablet))) { + LOG_WARN("fail to load storage schema", K(ret), KPC(this)); + } else if (OB_FAIL(schema_on_tablet->get_store_column_count(store_column_cnt_in_schema, true/*full_col*/))) { + LOG_WARN("failed to get store column count", K(ret), K(store_column_cnt_in_schema)); } else { int64_t max_schema_version_on_memtable = 0; int64_t unused_max_column_cnt_on_memtable = 0; for (int64_t idx = 0; OB_SUCC(ret) && idx < memtables.count(); ++idx) { memtable::ObMemtable *memtable = static_cast(memtables.at(idx)); if (OB_FAIL(memtable->get_schema_info( + store_column_cnt_in_schema, max_schema_version_on_memtable, unused_max_column_cnt_on_memtable))) { LOG_WARN("failed to get schema info from memtable", KR(ret), KPC(memtable)); } } if (OB_SUCC(ret)) { - result.schema_version_ = MAX(max_schema_version_on_memtable, result.schema_version_); + schema_version = MAX(max_schema_version_on_memtable, schema_version); LOG_INFO("chosen new medium snapshot", K(ret), KPC(this), K(medium_info), K(max_reserved_snapshot), K(result), K(max_schema_version_on_memtable)); } @@ -421,13 +432,14 @@ int ObMediumCompactionScheduleFunc::decide_medium_snapshot( ObGetMergeTablesResult result; ObMediumCompactionInfo medium_info; medium_info.data_version_ = compat_version; + int64_t schema_version = 0; if (medium_info.data_version_ < DATA_VERSION_4_2_0_0) { medium_info.medium_compat_version_ = ObMediumCompactionInfo::MEIDUM_COMPAT_VERSION; } else { medium_info.medium_compat_version_ = ObMediumCompactionInfo::MEIDUM_COMPAT_VERSION_V2; } - if (OB_FAIL(choose_medium_scn[is_major](ls_, *tablet, merge_reason, allocator_, medium_info, result))) { + if (OB_FAIL(choose_medium_scn[is_major](ls_, *tablet, merge_reason, allocator_, medium_info, result, schema_version))) { if (OB_NO_NEED_MERGE != ret) { LOG_WARN("failed to choose medium snapshot", K(ret), KPC(this)); } @@ -439,7 +451,7 @@ int ObMediumCompactionScheduleFunc::decide_medium_snapshot( LOG_WARN("failed to get multi_version_start", K(ret), KPC(this)); } else if (medium_info.medium_snapshot_ < max_reserved_snapshot) { // chosen medium snapshot is far too old - if (OB_FAIL(choose_new_medium_snapshot(max_reserved_snapshot, medium_info, result))) { + if (OB_FAIL(choose_new_medium_snapshot(max_reserved_snapshot, medium_info, result, schema_version))) { LOG_WARN("failed to choose new medium snapshot", KR(ret), K(medium_info)); } } @@ -478,34 +490,34 @@ int ObMediumCompactionScheduleFunc::decide_medium_snapshot( } } #endif - if (FAILEDx(prepare_medium_info(result, medium_info))) { - if (OB_TABLE_IS_DELETED == ret) { - ret = OB_SUCCESS; - } else { - LOG_WARN("failed to prepare medium info", K(ret), K(result)); + if (FAILEDx(prepare_medium_info(result, schema_version, medium_info))) { + if (OB_TABLE_IS_DELETED == ret) { + ret = OB_SUCCESS; + } else { + LOG_WARN("failed to prepare medium info", K(ret), K(result)); + } + } else if (OB_FAIL(submit_medium_clog(medium_info))) { + LOG_WARN("failed to submit medium clog and update inner table", K(ret), KPC(this)); + } else if (OB_TMP_FAIL(ls_.tablet_freeze(tablet_id, true/*is_sync*/))) { + // need to freeze memtable with MediumCompactionInfo + LOG_WARN("failed to freeze tablet", K(tmp_ret), KPC(this)); } - } else if (OB_FAIL(submit_medium_clog(medium_info))) { - LOG_WARN("failed to submit medium clog and update inner table", K(ret), KPC(this)); - } else if (OB_TMP_FAIL(ls_.tablet_freeze(tablet_id, true/*is_sync*/))) { - // need to freeze memtable with MediumCompactionInfo - LOG_WARN("failed to freeze tablet", K(tmp_ret), KPC(this)); - } - // delete tablet_id in ObLSReservedSnapshotMgr even if submit clog or update inner table failed - if (OB_TMP_FAIL(ls_.del_dependent_medium_tablet(tablet_id))) { - LOG_ERROR("failed to delete dependent medium tablet", K(tmp_ret), KPC(this)); - ob_abort(); - } - ret = OB_NO_NEED_MERGE == ret ? OB_SUCCESS : ret; - if (OB_FAIL(ret)) { - // add schedule suspect info - if (OB_TMP_FAIL(ADD_SUSPECT_INFO(MEDIUM_MERGE, ls_.get_ls_id(), tablet_id, - ObSuspectInfoType::SUSPECT_SCHEDULE_MEDIUM_FAILED, - medium_info.medium_snapshot_, - medium_info.storage_schema_.store_column_cnt_, - static_cast(ret)))) { - LOG_WARN("failed to add suspect info", K(tmp_ret)); + // delete tablet_id in ObLSReservedSnapshotMgr even if submit clog or update inner table failed + if (OB_TMP_FAIL(ls_.del_dependent_medium_tablet(tablet_id))) { + LOG_ERROR("failed to delete dependent medium tablet", K(tmp_ret), KPC(this)); + ob_abort(); + } + ret = OB_NO_NEED_MERGE == ret ? OB_SUCCESS : ret; + if (OB_FAIL(ret)) { + // add schedule suspect info + if (OB_TMP_FAIL(ADD_SUSPECT_INFO(MEDIUM_MERGE, ls_.get_ls_id(), tablet_id, + ObSuspectInfoType::SUSPECT_SCHEDULE_MEDIUM_FAILED, + medium_info.medium_snapshot_, + medium_info.storage_schema_.store_column_cnt_, + static_cast(ret)))) { + LOG_WARN("failed to add suspect info", K(tmp_ret)); + } } - } } } return ret; @@ -646,6 +658,7 @@ int ObMediumCompactionScheduleFunc::prepare_iter( int ObMediumCompactionScheduleFunc::prepare_medium_info( const ObGetMergeTablesResult &result, + const int64_t schema_version, ObMediumCompactionInfo &medium_info) { int ret = OB_SUCCESS; @@ -658,7 +671,7 @@ int ObMediumCompactionScheduleFunc::prepare_medium_info( ret = OB_ERR_UNEXPECTED; LOG_WARN("invalid tablet_handle", K(ret), K(tablet_handle_)); } else if (FALSE_IT(tablet = tablet_handle_.get_obj())) { - } else if (0 == result.schema_version_) { // not formal schema version + } else if (0 == schema_version) { // not formal schema version ret = OB_NO_NEED_MERGE; } else if (medium_info.is_medium_compaction()) { const uint64_t tenant_id = MTL_ID(); @@ -667,7 +680,7 @@ int ObMediumCompactionScheduleFunc::prepare_medium_info( ret = OB_ERR_UNEXPECTED; LOG_WARN("failed to get schema service from MTL", K(ret)); } else if (FALSE_IT(medium_info.storage_schema_.reset())) { - } else if (OB_FAIL(get_table_schema_to_merge(*schema_service, *tablet, result.schema_version_, allocator_, medium_info))) { + } else if (OB_FAIL(get_table_schema_to_merge(*schema_service, *tablet, schema_version, allocator_, medium_info))) { // for major compaction, storage schema is inited in choose_major_snapshot if (OB_TABLE_IS_DELETED != ret) { LOG_WARN("failed to get table schema", KR(ret), KPC(this), K(medium_info)); diff --git a/src/storage/compaction/ob_medium_compaction_func.h b/src/storage/compaction/ob_medium_compaction_func.h index 6043b1d262..555831d011 100755 --- a/src/storage/compaction/ob_medium_compaction_func.h +++ b/src/storage/compaction/ob_medium_compaction_func.h @@ -66,7 +66,10 @@ protected: const ObLSID &ls_id, const ObTabletID &tablet_id, share::ObTabletCompactionScnInfo &ret_info); - int prepare_medium_info(const ObGetMergeTablesResult &result, ObMediumCompactionInfo &medium_info); + int prepare_medium_info( + const ObGetMergeTablesResult &result, + const int64_t schema_version, + ObMediumCompactionInfo &medium_info); int init_parallel_range( const ObGetMergeTablesResult &result, ObMediumCompactionInfo &medium_info); @@ -95,14 +98,16 @@ protected: const ObAdaptiveMergePolicy::AdaptiveMergeReason &merge_reason, ObIAllocator &allocator, ObMediumCompactionInfo &medium_info, - ObGetMergeTablesResult &result); + ObGetMergeTablesResult &result, + int64_t &schema_version); static int choose_major_snapshot( ObLS &ls, ObTablet &tablet, const ObAdaptiveMergePolicy::AdaptiveMergeReason &merge_reason, ObIAllocator &allocator, ObMediumCompactionInfo &medium_info, - ObGetMergeTablesResult &result); + ObGetMergeTablesResult &result, + int64_t &schema_version); static int check_need_merge_and_schedule( ObLS &ls, ObTablet &tablet, @@ -112,7 +117,8 @@ protected: int choose_new_medium_snapshot( const int64_t max_reserved_snapshot, ObMediumCompactionInfo &medium_info, - ObGetMergeTablesResult &result); + ObGetMergeTablesResult &result, + int64_t &schema_version); int get_max_reserved_snapshot(int64_t &max_reserved_snapshot); static int get_table_id( ObMultiVersionSchemaService &schema_service, @@ -130,7 +136,8 @@ protected: const ObAdaptiveMergePolicy::AdaptiveMergeReason &merge_reason, ObIAllocator &allocator, ObMediumCompactionInfo &medium_info, - ObGetMergeTablesResult &result); + ObGetMergeTablesResult &result, + int64_t &schema_version); static ChooseMediumScn choose_medium_scn[MEDIUM_FUNC_CNT]; private: diff --git a/src/storage/compaction/ob_partition_merge_policy.cpp b/src/storage/compaction/ob_partition_merge_policy.cpp index ae7746c3ce..91aed7d2cd 100755 --- a/src/storage/compaction/ob_partition_merge_policy.cpp +++ b/src/storage/compaction/ob_partition_merge_policy.cpp @@ -101,8 +101,6 @@ int ObPartitionMergePolicy::get_medium_merge_tables( table_store_wrapper.get_member()->get_major_sstables().get_boundary_table(true/*last*/)))) { ret = OB_ENTRY_NOT_EXIST; LOG_ERROR("major sstable not exist", K(ret), KPC(table_store_wrapper.get_member())); - } else if (OB_FAIL(base_table->get_frozen_schema_version(result.base_schema_version_))) { - LOG_WARN("failed to get frozen schema version", K(ret)); } else if (OB_FAIL(result.handle_.add_sstable(base_table, table_store_wrapper.get_meta_handle()))) { LOG_WARN("failed to add base_table to result", K(ret)); } else if (base_table->get_snapshot_version() >= param.merge_version_) { @@ -316,40 +314,11 @@ int ObPartitionMergePolicy::deal_with_minor_result( LOG_WARN("failed to check continues", K(ret), K(result)); } else if (OB_FAIL(get_multi_version_start(merge_type, ls, tablet, result.version_range_))) { LOG_WARN("failed to get kept multi_version_start", K(ret), K(merge_type), K(tablet)); - } else if (OB_FAIL(tablet.get_schema_version_from_storage_schema(result.schema_version_))) { - LOG_WARN("failed to get schema version from storage schema", K(ret)); } else { - if (tablet.is_ls_inner_tablet()) { - // do nothing - result.base_schema_version_ = result.schema_version_; - } else if (MINI_MERGE == merge_type) { - ObITable *table = NULL; - result.base_schema_version_ = result.schema_version_; - int64_t max_schema_version_on_memtable = 0; - int64_t unused_max_column_cnt_on_memtable = 0; - for (int64_t i = 0; OB_SUCC(ret) && i < result.handle_.get_count(); ++i) { - if (OB_ISNULL(table = result.handle_.get_table(i)) || !table->is_memtable()) { - ret = OB_ERR_SYS; - LOG_ERROR("get unexpected table", KPC(table), K(ret)); - } else if (OB_FAIL(reinterpret_cast(table)->get_schema_info( - max_schema_version_on_memtable, unused_max_column_cnt_on_memtable))) { - LOG_WARN("failed to get schema info from memtable", KR(ret), KPC(table)); - } - } - if (OB_SUCC(ret)) { - result.schema_version_ = MAX(result.schema_version_, max_schema_version_on_memtable); - } - } else { // for minor - if (OB_FAIL(result.handle_.get_table(0)->get_frozen_schema_version(result.base_schema_version_))) { - LOG_WARN("failed to get frozen schema version", K(ret), K(result)); - } - } - if (OB_SUCC(ret)) { - result.version_range_.base_version_ = 0; - if (OB_SUCC(ret) && !is_mini_merge(merge_type)) { - if (OB_FAIL(tablet.get_recycle_version(result.version_range_.multi_version_start_, result.version_range_.base_version_))) { - LOG_WARN("Fail to get table store recycle version", K(ret), K(result.version_range_), K(tablet)); - } + result.version_range_.base_version_ = 0; + if (OB_SUCC(ret) && !is_mini_merge(merge_type)) { + if (OB_FAIL(tablet.get_recycle_version(result.version_range_.multi_version_start_, result.version_range_.base_version_))) { + LOG_WARN("Fail to get table store recycle version", K(ret), K(result.version_range_), K(tablet)); } } } @@ -1329,17 +1298,13 @@ int ObAdaptiveMergePolicy::get_meta_merge_tables( } } else if (OB_FAIL(result.handle_.check_continues(nullptr))) { LOG_WARN("failed to check continues", K(ret), K(result)); - } else if (OB_FAIL(tablet.get_schema_version_from_storage_schema(result.schema_version_))) { - LOG_WARN("failed to ge schema version from storage schema", K(ret)); } else if (FALSE_IT(result.suggest_merge_type_ = META_MAJOR_MERGE)) { } else if (FALSE_IT(result.version_range_.snapshot_version_ = MIN(tablet.get_snapshot_version(), result.version_range_.snapshot_version_))) { - // choose version should less than tablet::snapshot + // chosen version should less than tablet::snapshot } else if (OB_FAIL(ObPartitionMergePolicy::get_multi_version_start( param.merge_type_, ls, tablet, result.version_range_))) { LOG_WARN("failed to get multi version_start", K(ret)); - } else if (OB_FAIL(result.handle_.get_table(0)->get_frozen_schema_version(result.base_schema_version_))) { - LOG_WARN("failed to get frozen schema version", K(ret), K(result)); } else { FLOG_INFO("succeed to get meta major merge tables", K(result), K(table_store_wrapper)); } diff --git a/src/storage/compaction/ob_partition_merger.cpp b/src/storage/compaction/ob_partition_merger.cpp index 0a9fc1403c..c97d985c6d 100644 --- a/src/storage/compaction/ob_partition_merger.cpp +++ b/src/storage/compaction/ob_partition_merger.cpp @@ -37,7 +37,7 @@ namespace compaction */ ObPartitionMerger::ObPartitionMerger() - : allocator_("partMerger"), + : allocator_("partMerger", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()), merge_ctx_(nullptr), merge_progress_(nullptr), partition_fuser_(nullptr), diff --git a/src/storage/compaction/ob_tablet_merge_ctx.cpp b/src/storage/compaction/ob_tablet_merge_ctx.cpp index 4576cee728..dacf11883c 100755 --- a/src/storage/compaction/ob_tablet_merge_ctx.cpp +++ b/src/storage/compaction/ob_tablet_merge_ctx.cpp @@ -488,7 +488,6 @@ ObCompactionTimeGuard & ObCompactionTimeGuard::operator=(const ObCompactionTimeG ObSchemaMergeCtx::ObSchemaMergeCtx(ObArenaAllocator &allocator) : allocator_(allocator), - base_schema_version_(0), schema_version_(0), storage_schema_(nullptr) { @@ -584,7 +583,6 @@ bool ObTabletMergeCtx::is_valid() const && !tables_handle_.empty() && create_snapshot_version_ >= 0 && schema_ctx_.schema_version_ >= 0 - && schema_ctx_.base_schema_version_ >= 0 && NULL != schema_ctx_.storage_schema_ && schema_ctx_.storage_schema_->is_valid() && sstable_logic_seq_ >= 0 @@ -740,7 +738,7 @@ int ObTabletMergeCtx::init_get_medium_compaction_info( if (FAILEDx(check_medium_info_and_last_major(medium_info, get_merge_table_result))) { LOG_WARN("failed to check medium info and last major sstable", KR(ret), K(medium_info), K(get_merge_table_result)); } else { - get_merge_table_result.schema_version_ = medium_info.storage_schema_.schema_version_; + schema_ctx_.schema_version_ = medium_info.storage_schema_.schema_version_; data_version_ = medium_info.data_version_; is_tenant_major_merge_ = medium_info.is_major_compaction(); } @@ -976,8 +974,6 @@ int ObTabletMergeCtx::get_basic_info_from_result( } } - schema_ctx_.base_schema_version_ = get_merge_table_result.base_schema_version_; - schema_ctx_.schema_version_ = get_merge_table_result.schema_version_; create_snapshot_version_ = get_merge_table_result.create_snapshot_version_; schedule_major_ = get_merge_table_result.schedule_major_; } @@ -1103,6 +1099,9 @@ int ObTabletMergeCtx::get_storage_schema_to_merge(const ObTablesHandleArray &mer if (OB_FAIL(tablet_handle_.get_obj()->load_storage_schema(allocator_, schema_on_tablet))) { LOG_WARN("failed to load storage schema", K(ret), K_(tablet_handle)); } else if (is_mini_merge(merge_type) && !param_.tablet_id_.is_ls_inner_tablet()) { + if (OB_FAIL(schema_on_tablet->get_store_column_count(column_cnt_in_schema, true/*full_col*/))) { + LOG_WARN("failed to get store column count", K(ret), K(column_cnt_in_schema)); + } ObITable *table = nullptr; memtable::ObMemtable *memtable = nullptr; for (int i = merge_tables_handle.get_count() - 1; OB_SUCC(ret) && i >= 0; --i) { @@ -1112,14 +1111,13 @@ int ObTabletMergeCtx::get_storage_schema_to_merge(const ObTablesHandleArray &mer } else if (OB_ISNULL(memtable = dynamic_cast(table))) { ret = OB_ERR_UNEXPECTED; LOG_WARN("table pointer does not point to a ObMemtable object", KPC(table)); - } else if (OB_FAIL(memtable->get_schema_info( + } else if (OB_FAIL(memtable->get_schema_info(column_cnt_in_schema, max_schema_version_in_memtable, max_column_cnt_in_memtable))) { LOG_WARN("failed to get schema info from memtable", KR(ret), KPC(memtable)); } } // end of for - if (FAILEDx(schema_on_tablet->get_store_column_count(column_cnt_in_schema, true/*full_col*/))) { - LOG_WARN("failed to get store column count", K(ret), K(column_cnt_in_schema)); + if (OB_FAIL(ret)) { } else if (max_column_cnt_in_memtable > column_cnt_in_schema || max_schema_version_in_memtable > schema_on_tablet->get_schema_version()) { // need alloc new storage schema & set column cnt diff --git a/src/storage/compaction/ob_tablet_merge_ctx.h b/src/storage/compaction/ob_tablet_merge_ctx.h index e6e9c7cbbd..b857868138 100755 --- a/src/storage/compaction/ob_tablet_merge_ctx.h +++ b/src/storage/compaction/ob_tablet_merge_ctx.h @@ -97,11 +97,10 @@ struct ObSchemaMergeCtx } } common::ObArenaAllocator &allocator_; - int64_t base_schema_version_; int64_t schema_version_; const ObStorageSchema *storage_schema_; // schema for all merge - TO_STRING_KV(K_(base_schema_version), K_(schema_version), KPC_(storage_schema)); + TO_STRING_KV(K_(schema_version), KPC_(storage_schema)); }; class ObCompactionTimeGuard : public common::occam::ObOccamTimeGuard diff --git a/src/storage/high_availability/ob_tablet_backfill_tx.cpp b/src/storage/high_availability/ob_tablet_backfill_tx.cpp index 00fb8a20e4..9eb386f9c5 100644 --- a/src/storage/high_availability/ob_tablet_backfill_tx.cpp +++ b/src/storage/high_availability/ob_tablet_backfill_tx.cpp @@ -865,9 +865,6 @@ int ObTabletTableBackfillTXTask::prepare_merge_ctx_() LOG_WARN("failed to add table into tables handle", K(ret), K(table_handle_)); } else if (OB_FAIL(tablet_merge_ctx_.get_storage_schema_to_merge(tablet_merge_ctx_.tables_handle_))) { LOG_ERROR("Fail to get storage schema", K(ret), K(tablet_merge_ctx_)); - } else { - //get_basic_info_from_result result - tablet_merge_ctx_.schema_ctx_.base_schema_version_ = tablet_merge_ctx_.schema_ctx_.schema_version_; } if (OB_FAIL(ret)) { diff --git a/src/storage/memtable/ob_memtable.cpp b/src/storage/memtable/ob_memtable.cpp index f471b9bcf8..b9619522ae 100755 --- a/src/storage/memtable/ob_memtable.cpp +++ b/src/storage/memtable/ob_memtable.cpp @@ -1065,8 +1065,10 @@ int ObMemtable::replay_row(ObStoreCtx &ctx, if (part_ctx->need_update_schema_version(log_id, scn)) { ctx.mvcc_acc_ctx_.mem_ctx_->set_table_version(table_version); } - set_max_data_schema_version(table_version); - set_max_column_cnt(column_cnt); + if (dml_flag != blocksstable::ObDmlFlag::DF_LOCK) { + set_max_data_schema_version(table_version); + set_max_column_cnt(column_cnt); + } } } return ret; @@ -2294,6 +2296,7 @@ int64_t ObMemtable::get_max_column_cnt() const } int ObMemtable::get_schema_info( + const int64_t input_column_cnt, int64_t &max_schema_version_on_memtable, int64_t &max_column_cnt_on_memtable) const { @@ -2302,8 +2305,7 @@ int ObMemtable::get_schema_info( if (IS_NOT_INIT) { ret = OB_NOT_INIT; TRANS_LOG(WARN, "not inited", K(ret)); - } else if (get_max_column_cnt() > max_column_cnt_on_memtable - || get_max_data_schema_version() > max_schema_version_on_memtable) { + } else if (get_max_column_cnt() > input_column_cnt) { TRANS_LOG(INFO, "column cnt or schema version is updated by memtable", KPC(this), K(max_column_cnt_on_memtable), K(max_schema_version_on_memtable)); max_column_cnt_on_memtable = MAX(max_column_cnt_on_memtable, get_max_column_cnt()); diff --git a/src/storage/memtable/ob_memtable.h b/src/storage/memtable/ob_memtable.h index 7292e37e4d..b175f19928 100644 --- a/src/storage/memtable/ob_memtable.h +++ b/src/storage/memtable/ob_memtable.h @@ -321,6 +321,7 @@ public: void set_max_column_cnt(const int64_t column_cnt); int64_t get_max_column_cnt() const; int get_schema_info( + const int64_t input_column_cnt, int64_t &max_schema_version_on_memtable, int64_t &max_column_cnt_on_memtable) const; int row_compact(ObMvccRow *value, @@ -491,8 +492,8 @@ public: // TODO(handora.qc) ready_for_flush interface adjustment bool is_can_flush() { return ObMemtableFreezeState::READY_FOR_FLUSH == freeze_state_ && share::SCN::max_scn() != get_end_scn(); } INHERIT_TO_STRING_KV("ObITable", ObITable, KP(this), KP_(memtable_mgr), K_(timestamp), K_(state), - K_(freeze_clock), K_(max_schema_version), K_(max_data_schema_version), K_(write_ref_cnt), K_(local_allocator), - K_(unsubmitted_cnt), K_(unsynced_cnt), + K_(freeze_clock), K_(max_schema_version), K_(max_data_schema_version), K_(max_column_cnt), + K_(write_ref_cnt), K_(local_allocator), K_(unsubmitted_cnt), K_(unsynced_cnt), K_(logging_blocked), K_(unset_active_memtable_logging_blocked), K_(resolve_active_memtable_left_boundary), K_(contain_hotspot_row), K_(max_end_scn), K_(rec_scn), K_(snapshot_version), K_(migration_clog_checkpoint_scn), K_(is_tablet_freeze), K_(is_force_freeze), K_(contain_hotspot_row), diff --git a/src/storage/ob_storage_struct.cpp b/src/storage/ob_storage_struct.cpp index 96f3b55edc..d8d17301e2 100644 --- a/src/storage/ob_storage_struct.cpp +++ b/src/storage/ob_storage_struct.cpp @@ -133,8 +133,6 @@ ObGetMergeTablesResult::ObGetMergeTablesResult() : version_range_(), handle_(), merge_version_(), - base_schema_version_(INVALID_INT_VALUE), - schema_version_(INVALID_INT_VALUE), create_snapshot_version_(INVALID_INT_VALUE), suggest_merge_type_(INVALID_MERGE_TYPE), update_tablet_directly_(false), @@ -149,8 +147,6 @@ bool ObGetMergeTablesResult::is_valid() const return scn_range_.is_valid() && handle_.get_count() >= 1 && merge_version_ >= 0 - && base_schema_version_ >= 0 - && schema_version_ >= 0 && create_snapshot_version_ >= 0 && (suggest_merge_type_ > INVALID_MERGE_TYPE && suggest_merge_type_ < MERGE_TYPE_MAX); } @@ -167,8 +163,6 @@ void ObGetMergeTablesResult::reset() version_range_.reset(); handle_.reset(); merge_version_ = ObVersionRange::MIN_VERSION; - base_schema_version_ = INVALID_INT_VALUE; - schema_version_ = INVALID_INT_VALUE; create_snapshot_version_ = 0; suggest_merge_type_ = INVALID_MERGE_TYPE; schedule_major_ = false; @@ -185,8 +179,6 @@ int ObGetMergeTablesResult::copy_basic_info(const ObGetMergeTablesResult &src) } else { version_range_ = src.version_range_; merge_version_ = src.merge_version_; - base_schema_version_ = src.base_schema_version_; - schema_version_ = src.schema_version_; create_snapshot_version_ = src.create_snapshot_version_; suggest_merge_type_ = src.suggest_merge_type_; schedule_major_ = src.schedule_major_; diff --git a/src/storage/ob_storage_struct.h b/src/storage/ob_storage_struct.h index 2f6b0995d0..1c51d9df41 100755 --- a/src/storage/ob_storage_struct.h +++ b/src/storage/ob_storage_struct.h @@ -254,8 +254,6 @@ struct ObGetMergeTablesResult common::ObVersionRange version_range_; ObTablesHandleArray handle_; int64_t merge_version_; - int64_t base_schema_version_; - int64_t schema_version_; int64_t create_snapshot_version_; ObMergeType suggest_merge_type_; bool update_tablet_directly_; @@ -271,7 +269,7 @@ struct ObGetMergeTablesResult void reset(); int assign(const ObGetMergeTablesResult &src); int copy_basic_info(const ObGetMergeTablesResult &src); - TO_STRING_KV(K_(version_range), K_(scn_range), K_(merge_version), K_(base_schema_version), K_(schema_version), + TO_STRING_KV(K_(version_range), K_(scn_range), K_(merge_version), K_(create_snapshot_version), K_(suggest_merge_type), K_(handle), K_(update_tablet_directly), K_(schedule_major), K_(read_base_version)); }; diff --git a/src/storage/tablet/ob_table_store_util.cpp b/src/storage/tablet/ob_table_store_util.cpp index 0a5617335b..890648f1f5 100755 --- a/src/storage/tablet/ob_table_store_util.cpp +++ b/src/storage/tablet/ob_table_store_util.cpp @@ -795,7 +795,7 @@ int ObMemtableArray::find( ret = OB_ERR_SYS; LOG_WARN("table must not null", K(ret), KPC(memtable), KPC(this)); } else if (memtable->get_end_scn() == start_scn) { - if (static_cast(memtable)->get_snapshot_version() > base_version) { + if (memtable->get_snapshot_version() > base_version) { mem_pos = i; table = memtable; break; diff --git a/src/storage/tablet/ob_tablet.cpp b/src/storage/tablet/ob_tablet.cpp index bef0162de6..43033a68bc 100755 --- a/src/storage/tablet/ob_tablet.cpp +++ b/src/storage/tablet/ob_tablet.cpp @@ -4456,6 +4456,10 @@ int ObTablet::get_storage_schema_for_transfer_in( LOG_WARN("memtable mgr should not be NULL", K(ret), KP(memtable_mgr)); } else if (OB_FAIL(memtable_mgr->get_all_memtables(memtables))) { LOG_WARN("failed to get all memtables", K(ret), KPC(this)); + } else if (OB_FAIL(load_storage_schema(allocator, tablet_storage_schema))) { + LOG_WARN("fail to load storage schema", K(ret), K_(storage_schema_addr)); + } else if (OB_FAIL(tablet_storage_schema->get_store_column_count(store_column_cnt_in_schema, true/*full_col*/))) { + LOG_WARN("failed to get store column count", K(ret), K(store_column_cnt_in_schema)); } else { for (int64_t i = 0; OB_SUCC(ret) && i < memtables.count(); ++i) { ObITable *table = memtables.at(i).get_table(); @@ -4463,6 +4467,7 @@ int ObTablet::get_storage_schema_for_transfer_in( ret = OB_ERR_UNEXPECTED; LOG_WARN("table in tables_handle is invalid", K(ret), KP(table)); } else if (OB_FAIL(static_cast(table)->get_schema_info( + store_column_cnt_in_schema, max_schema_version_in_memtable, max_column_cnt_in_memtable))) { LOG_WARN("failed to get schema info from memtable", KR(ret), KPC(table)); } @@ -4470,12 +4475,8 @@ int ObTablet::get_storage_schema_for_transfer_in( } if (OB_FAIL(ret)) { - } else if (OB_FAIL(load_storage_schema(allocator, tablet_storage_schema))) { - LOG_WARN("fail to load storage schema", K(ret), K_(storage_schema_addr)); } else if (OB_FAIL(storage_schema.deep_copy(tablet_storage_schema, &allocator))) { LOG_WARN("failed to get tx data from tablet", K(ret), K(ls_id), K(tablet_id), KPC(tablet_storage_schema)); - } else if (OB_FAIL(storage_schema.get_store_column_count(store_column_cnt_in_schema, true/*full_col*/))) { - LOG_WARN("failed to get store column count", K(ret), K(store_column_cnt_in_schema)); } else { int64_t old_column_cnt = storage_schema.get_column_count(); int64_t old_schema_version = storage_schema.get_schema_version(); diff --git a/unittest/storage/test_compaction_policy.cpp b/unittest/storage/test_compaction_policy.cpp index 7636d9ddf7..fb9d15673c 100644 --- a/unittest/storage/test_compaction_policy.cpp +++ b/unittest/storage/test_compaction_policy.cpp @@ -130,9 +130,6 @@ public: common::ObIArray &snapshots); void prepare_schema(share::schema::ObTableSchema &table_schema); - int prepare_medium_list( - const char *snapshot_list, - ObTabletHandle &tablet_handle); int construct_array( const char *snapshot_list, ObIArray &array); @@ -420,7 +417,7 @@ int TestCompactionPolicy::mock_tablet( } else if (OB_FAIL(ObTabletCreateDeleteHelper::create_tmp_tablet(key, allocator, tablet_handle))) { LOG_WARN("failed to acquire tablet", K(ret), K(key)); } else if (FALSE_IT(tablet = tablet_handle.get_obj())) { - } else if (OB_FAIL(tablet->init(allocator, ls_id, tablet_id, tablet_id, empty_tablet_id, empty_tablet_id, + } else if (OB_FAIL(tablet->init(allocator, ls_id, tablet_id, tablet_id, SCN::min_scn(), snapshot_version, table_schema, compat_mode, table_store_flag, nullptr, ls_handle.get_ls()->get_freezer()))) { LOG_WARN("failed to init tablet", K(ret), K(ls_id), K(tablet_id), K(snapshot_version), K(table_schema), K(compat_mode)); @@ -455,21 +452,6 @@ int TestCompactionPolicy::construct_array( return ret; } -int TestCompactionPolicy::prepare_medium_list( - const char *snapshot_list, - ObTabletHandle &tablet_handle) -{ - int ret = OB_SUCCESS; - ObTablet &tablet = *tablet_handle.get_obj(); - construct_array(snapshot_list, array_); - tablet.medium_info_list_addr_.get_ptr()->reset_list(); - for (int i = 0; OB_SUCC(ret) && i < array_.count(); ++i) { - medium_info_.medium_snapshot_ = array_.at(i); - ret = tablet.medium_info_list_addr_.get_ptr()->add_medium_compaction_info(medium_info_); - } - return ret; -} - int TestCompactionPolicy::check_result_tables_handle( const char *end_log_ts_list, const ObGetMergeTablesResult &result) @@ -1066,52 +1048,51 @@ TEST_F(TestCompactionPolicy, check_no_need_major_merge) ASSERT_EQ(OB_NO_NEED_MERGE, ret); } -TEST_F(TestCompactionPolicy, test_minor_with_medium) +TEST_F(TestCompactionPolicy, test_medium_info_serialize) { int ret = OB_SUCCESS; - ObTenantFreezeInfoMgr *mgr = MTL(ObTenantFreezeInfoMgr *); - ASSERT_TRUE(nullptr != mgr); + // prepare parallel_rowkey_list + const int64_t concurrent_cnt = 5; + ObArenaAllocator allocator; + ObDatumRowkey datum_rowkey_list[concurrent_cnt]; + ObDatumRowkey tmp_datum_rowkey; + ObStorageDatum datums[OB_INNER_MAX_ROWKEY_COLUMN_NUMBER]; + tmp_datum_rowkey.assign(datums, OB_INNER_MAX_ROWKEY_COLUMN_NUMBER); - common::ObArray freeze_info; - common::ObArray snapshots; - share::SCN scn; - ASSERT_EQ(OB_SUCCESS, freeze_info.push_back(ObTenantFreezeInfoMgr::FreezeInfo(1, 1, 0))); - ASSERT_EQ(OB_SUCCESS, freeze_info.push_back(ObTenantFreezeInfoMgr::FreezeInfo(140, 1, 0))); + medium_info_.contain_parallel_range_ = true; + medium_info_.parallel_merge_info_.list_size_ = concurrent_cnt; + medium_info_.parallel_merge_info_.parallel_datum_rowkey_list_ = datum_rowkey_list; - ret = TestCompactionPolicy::prepare_freeze_info(500, freeze_info, snapshots); + for (int64_t idx = 0; idx < concurrent_cnt; ++idx) { + tmp_datum_rowkey.datums_[0].set_string("aaaaa"); + tmp_datum_rowkey.datums_[1].set_int(idx); + tmp_datum_rowkey.datum_cnt_ = 2; + if (OB_FAIL(tmp_datum_rowkey.deep_copy( + medium_info_.parallel_merge_info_.parallel_datum_rowkey_list_[idx] /*dst*/, allocator))) { + LOG_WARN("failed to deep copy datum rowkey", KR(ret), K(idx), K(tmp_datum_rowkey)); + } + } + + const int64_t buf_len = ObParallelMergeInfo::MAX_PARALLEL_RANGE_SERIALIZE_LEN; + char medium_info_buf[buf_len]; + int64_t write_pos = 0; + ret = medium_info_.serialize(medium_info_buf, buf_len, write_pos); ASSERT_EQ(OB_SUCCESS, ret); + ASSERT_EQ(medium_info_.get_serialize_size(), write_pos); - const char *key_data = - "table_type start_scn end_scn max_ver upper_ver\n" - "10 0 1 1 1 \n" - "11 150 200 200 200 \n" - "11 200 250 250 250 \n" - "11 250 300 300 300 \n" - "11 300 340 340 340 \n"; - - ret = prepare_tablet(key_data, 340, 340); - ASSERT_EQ(OB_SUCCESS, ret); - - ObGetMergeTablesParam param; - param.merge_type_ = ObMergeType::MINOR_MERGE; - param.merge_version_ = 0; - ObGetMergeTablesResult result; - FakeLS ls; - - prepare_medium_list("240", tablet_handle_); - ret = ObPartitionMergePolicy::get_minor_merge_tables(param, ls, *tablet_handle_.get_obj(), result); - ASSERT_EQ(OB_SUCCESS, ret); - ASSERT_EQ(OB_SUCCESS, check_result_tables_handle("250, 300, 340", result)); - - prepare_medium_list("150", tablet_handle_); - ret = ObPartitionMergePolicy::get_minor_merge_tables(param, ls, *tablet_handle_.get_obj(), result); - ASSERT_EQ(OB_SUCCESS, ret); - ASSERT_EQ(OB_SUCCESS, check_result_tables_handle("200, 250, 300, 340", result)); - - prepare_medium_list("300", tablet_handle_); - ret = ObPartitionMergePolicy::get_minor_merge_tables(param, ls, *tablet_handle_.get_obj(), result); - ASSERT_EQ(OB_NO_NEED_MERGE, ret); + ObMediumCompactionInfo deserialize_medium_info; + int64_t pos = 0; + ret = deserialize_medium_info.deserialize(allocator, medium_info_buf, write_pos, pos); + ASSERT_EQ(pos, write_pos); + ASSERT_EQ(deserialize_medium_info.contain_parallel_range_, true); + ASSERT_EQ(deserialize_medium_info.parallel_merge_info_.list_size_, concurrent_cnt); + for (int64_t idx = 0; idx < concurrent_cnt; ++idx) { + ASSERT_TRUE( + medium_info_.parallel_merge_info_.parallel_datum_rowkey_list_[idx].datums_[0] + == deserialize_medium_info.parallel_merge_info_.parallel_datum_rowkey_list_[idx].datums_[0]); + ASSERT_TRUE(idx == deserialize_medium_info.parallel_merge_info_.parallel_datum_rowkey_list_[idx].datums_[1].get_int()); + } } } //unittest @@ -1120,7 +1101,7 @@ TEST_F(TestCompactionPolicy, test_minor_with_medium) int main(int argc, char **argv) { - system("rm -rf test_compaction_policy.log"); + system("rm -rf test_compaction_policy.log*"); OB_LOGGER.set_file_name("test_compaction_policy.log"); OB_LOGGER.set_log_level("DEBUG"); CLOG_LOG(INFO, "begin unittest: test_compaction_policy"); diff --git a/unittest/storage/test_major_rows_merger.cpp b/unittest/storage/test_major_rows_merger.cpp index 40a209e321..95168a67c4 100644 --- a/unittest/storage/test_major_rows_merger.cpp +++ b/unittest/storage/test_major_rows_merger.cpp @@ -141,8 +141,6 @@ void ObMajorRowsMergerTest::prepare_merge_context(const ObMergeType &merge_type, table_merge_schema_.reset(); OK(table_merge_schema_.init(allocator_, table_schema_, lib::Worker::CompatMode::MYSQL)); - merge_context.schema_ctx_.base_schema_version_ = table_schema_.get_schema_version(); - merge_context.schema_ctx_.schema_version_ = table_schema_.get_schema_version(); merge_context.schema_ctx_.storage_schema_ = &table_merge_schema_; merge_context.is_full_merge_ = is_full_merge; diff --git a/unittest/storage/test_parallel_minor_dag.cpp b/unittest/storage/test_parallel_minor_dag.cpp index c0827ae02b..bd8ed42044 100644 --- a/unittest/storage/test_parallel_minor_dag.cpp +++ b/unittest/storage/test_parallel_minor_dag.cpp @@ -108,8 +108,6 @@ int TestParallelMinorDag::prepare_merge_result( result.version_range_.snapshot_version_ = 100; result.version_range_.multi_version_start_ = 100; result.merge_version_ = 0; - result.base_schema_version_ = 0; - result.schema_version_ = 0; result.create_snapshot_version_ = 0; result.suggest_merge_type_ = MINOR_MERGE;