From 751a0c587661ac355cec760c3e9ea21cd117a16c Mon Sep 17 00:00:00 2001 From: Fengjingkun Date: Fri, 10 Nov 2023 03:44:41 +0000 Subject: [PATCH] fix meta merge && optimize meta merge scheduling --- deps/oblib/src/lib/utility/ob_tracepoint.h | 1 + .../blocksstable/ob_macro_block_writer.cpp | 2 +- .../ob_shared_macro_block_manager.cpp | 4 +- src/storage/column_store/ob_co_merge_ctx.cpp | 7 +- src/storage/column_store/ob_co_merge_dag.cpp | 10 +- .../compaction/ob_basic_tablet_merge_ctx.cpp | 78 ++++++++- .../compaction/ob_basic_tablet_merge_ctx.h | 1 + .../compaction/ob_medium_compaction_func.cpp | 106 ++++++----- .../compaction/ob_medium_compaction_func.h | 3 +- .../compaction/ob_partition_merge_iter.cpp | 8 +- .../compaction/ob_partition_merge_policy.cpp | 165 ++++++++---------- .../compaction/ob_partition_merge_policy.h | 18 +- .../ob_partition_parallel_merge_ctx.cpp | 10 +- .../compaction/ob_tablet_merge_ctx.cpp | 163 +++++++++-------- src/storage/compaction/ob_tablet_merge_ctx.h | 1 + .../compaction/ob_tablet_merge_info.cpp | 5 +- .../compaction/ob_tablet_merge_task.cpp | 4 +- .../compaction/ob_tenant_tablet_scheduler.cpp | 88 +++++----- .../compaction/ob_tenant_tablet_scheduler.h | 6 +- src/storage/ob_i_table.cpp | 8 +- src/storage/ob_i_table.h | 17 +- src/storage/ob_tenant_tablet_stat_mgr.cpp | 26 +++ src/storage/ob_tenant_tablet_stat_mgr.h | 3 + src/storage/tablet/ob_tablet_table_store.cpp | 10 +- 24 files changed, 421 insertions(+), 323 deletions(-) diff --git a/deps/oblib/src/lib/utility/ob_tracepoint.h b/deps/oblib/src/lib/utility/ob_tracepoint.h index 8d7ed6f9a..36f063769 100644 --- a/deps/oblib/src/lib/utility/ob_tracepoint.h +++ b/deps/oblib/src/lib/utility/ob_tracepoint.h @@ -684,6 +684,7 @@ class EventTable EN_SPECIAL_TABLE_HAVE_LARGER_SCN = 732, EN_COMPACTION_CO_PUSH_TABLES_FAILED = 733, EN_COMPACTION_CO_MERGE_PARTITION_LONG_TIME = 734, + EN_COMPACTION_SCHEDULE_META_MERGE = 735, // please add new trace point after 750 EN_SESSION_LEAK_COUNT_THRESHOLD = 751, diff --git a/src/storage/blocksstable/ob_macro_block_writer.cpp b/src/storage/blocksstable/ob_macro_block_writer.cpp index f0e55be08..5709cc26e 100644 --- a/src/storage/blocksstable/ob_macro_block_writer.cpp +++ b/src/storage/blocksstable/ob_macro_block_writer.cpp @@ -1536,7 +1536,7 @@ int ObMacroBlockWriter::merge_micro_block(const ObMicroBlock µ_block) } else if (!micro_block.is_valid()) { ret = OB_INVALID_ARGUMENT; STORAGE_LOG(WARN, "invalid micro_block", K(micro_block), K(ret)); - } else if (OB_UNLIKELY(!data_store_desc_->is_major_merge_type())) { + } else if (OB_UNLIKELY(!data_store_desc_->is_major_or_meta_merge_type())) { // forbid micro block level merge for minor merge now ret = OB_ERR_UNEXPECTED; STORAGE_LOG(WARN, "minor merge does not allow micro block level merge", K(ret)); diff --git a/src/storage/blocksstable/ob_shared_macro_block_manager.cpp b/src/storage/blocksstable/ob_shared_macro_block_manager.cpp index c1440d29a..24e56fc41 100644 --- a/src/storage/blocksstable/ob_shared_macro_block_manager.cpp +++ b/src/storage/blocksstable/ob_shared_macro_block_manager.cpp @@ -812,7 +812,9 @@ int ObSharedMacroBlockMgr::parse_merge_type(const ObSSTable &sstable, ObMergeTyp merge_type = ObMergeType::INVALID_MERGE_TYPE; if (sstable.is_major_sstable()) { - merge_type = ObMergeType::MAJOR_MERGE; + merge_type = sstable.is_meta_major_sstable() + ? ObMergeType::META_MAJOR_MERGE + : ObMergeType::MAJOR_MERGE; } else if (sstable.is_minor_sstable()) { merge_type = ObMergeType::MINOR_MERGE; } else { diff --git a/src/storage/column_store/ob_co_merge_ctx.cpp b/src/storage/column_store/ob_co_merge_ctx.cpp index f012c80dc..e6bfb67c4 100644 --- a/src/storage/column_store/ob_co_merge_ctx.cpp +++ b/src/storage/column_store/ob_co_merge_ctx.cpp @@ -144,7 +144,12 @@ int ObCOTabletMergeCtx::init_tablet_merge_info(const bool need_check) int ObCOTabletMergeCtx::prepare_schema() { int ret = OB_SUCCESS; - if (OB_FAIL(get_medium_compaction_info())) { + + if (is_meta_major_merge(static_param_.get_merge_type())) { + if (OB_FAIL(get_meta_compaction_info())) { + LOG_WARN("failed to get meta compaction info", K(ret), KPC(this)); + } + } else if (OB_FAIL(get_medium_compaction_info())) { // have checked medium info inside LOG_WARN("failed to get medium compaction info", K(ret), KPC(this)); } diff --git a/src/storage/column_store/ob_co_merge_dag.cpp b/src/storage/column_store/ob_co_merge_dag.cpp index 5dbe8c4f5..70bd39837 100644 --- a/src/storage/column_store/ob_co_merge_dag.cpp +++ b/src/storage/column_store/ob_co_merge_dag.cpp @@ -122,7 +122,7 @@ int ObCOMergePrepareDag::init_by_param(const share::ObIDagInitParam *param) ret = OB_INVALID_ARGUMENT; LOG_WARN("Invalid argument", K(ret), K(param)); } else if (FALSE_IT(merge_param = static_cast(param))) { - } else if (OB_UNLIKELY(!is_major_merge_type(merge_param->merge_type_) || !merge_param->is_valid())) { + } else if (OB_UNLIKELY(!is_major_or_meta_merge_type(merge_param->merge_type_) || !merge_param->is_valid())) { ret = OB_ERR_SYS; LOG_ERROR("Unexpected merge type to init major merge dag", K(ret), KPC(merge_param)); } else if (OB_FAIL(ObTabletMergeDag::inner_init(merge_param))) { @@ -313,7 +313,7 @@ int ObCOMergeScheduleDag::init_by_param(const share::ObIDagInitParam *param) ret = OB_INVALID_ARGUMENT; LOG_WARN("Invalid argument", K(ret), K(param)); } else if (FALSE_IT(merge_param = static_cast(param))) { - } else if (OB_UNLIKELY(!is_major_merge_type(merge_param->merge_type_) || !merge_param->is_valid())) { + } else if (OB_UNLIKELY(!is_major_or_meta_merge_type(merge_param->merge_type_) || !merge_param->is_valid())) { ret = OB_ERR_SYS; LOG_ERROR("Unexpected merge type to init major merge dag", K(ret), KPC(merge_param)); } else if (OB_FAIL(ObTabletMergeDag::inner_init(merge_param))) { @@ -454,7 +454,7 @@ int ObCOMergeBatchExeDag::init_by_param(const share::ObIDagInitParam *param) ret = OB_INVALID_ARGUMENT; LOG_WARN("Invalid argument", K(ret), K(param)); } else if (FALSE_IT(merge_param = static_cast(param))) { - } else if (OB_UNLIKELY(!is_major_merge_type(merge_param->merge_type_) || !merge_param->is_valid())) { + } else if (OB_UNLIKELY(!is_major_or_meta_merge_type(merge_param->merge_type_) || !merge_param->is_valid())) { ret = OB_ERR_SYS; LOG_ERROR("Unexpected merge type to init major merge dag", K(ret), KPC(merge_param)); } else if (OB_FAIL(ObTabletMergeDag::inner_init(merge_param))) { @@ -879,7 +879,7 @@ int ObCOMergeFinishDag::init_by_param(const share::ObIDagInitParam *param) ret = OB_INVALID_ARGUMENT; LOG_WARN("Invalid argument", K(ret), K(param)); } else if (FALSE_IT(merge_param = static_cast(param))) { - } else if (OB_UNLIKELY(!is_major_merge_type(merge_param->merge_type_) || !merge_param->is_valid())) { + } else if (OB_UNLIKELY(!is_major_or_meta_merge_type(merge_param->merge_type_) || !merge_param->is_valid())) { ret = OB_ERR_SYS; LOG_ERROR("Unexpected merge type to init major merge dag", K(ret), KPC(merge_param)); } else if (OB_FAIL(ObTabletMergeDag::inner_init(merge_param))) { @@ -1019,7 +1019,7 @@ int ObCOMergeDagNet::init_by_param(const ObIDagInitParam *param) ret = OB_INVALID_ARGUMENT; LOG_WARN("Invalid argument to init dag net", K(ret), K(param)); } else if (FALSE_IT(merge_param = static_cast(param))) { - } else if (OB_UNLIKELY(!is_major_merge_type(merge_param->merge_type_) || !merge_param->is_valid())) { + } else if (OB_UNLIKELY(!is_major_or_meta_merge_type(merge_param->merge_type_) || !merge_param->is_valid())) { ret = OB_ERR_SYS; LOG_ERROR("Unexpected merge type to init major merge dag", K(ret), KPC(merge_param)); } else { diff --git a/src/storage/compaction/ob_basic_tablet_merge_ctx.cpp b/src/storage/compaction/ob_basic_tablet_merge_ctx.cpp index f905f3e94..2d476b829 100644 --- a/src/storage/compaction/ob_basic_tablet_merge_ctx.cpp +++ b/src/storage/compaction/ob_basic_tablet_merge_ctx.cpp @@ -22,6 +22,7 @@ #include "storage/blocksstable/ob_data_store_desc.h" #include "storage/ob_storage_schema_util.h" #include "ob_medium_list_checker.h" +#include "share/schema/ob_tenant_schema_service.h" namespace oceanbase { @@ -207,7 +208,7 @@ int ObStaticMergeParam::cal_major_merge_param() const ObTablesHandleArray &tables_handle = tables_handle_; if (OB_UNLIKELY(tables_handle.empty() || NULL == (base_table = static_cast(tables_handle.get_table(0))) - || (!base_table->is_major_sstable() && !base_table->is_meta_major_sstable()))) { + || !base_table->is_major_sstable())) { ret = OB_ENTRY_NOT_EXIST; LOG_WARN("base table must be major or meta major", K(ret), K(tables_handle)); } else if (OB_FAIL(base_table->get_meta(sstable_meta_hdl))) { @@ -254,7 +255,7 @@ int ObStaticMergeParam::cal_major_merge_param() } if (OB_SUCC(ret)) { - if (is_full_merge_ || (merge_level_ != MACRO_BLOCK_MERGE_LEVEL && is_schema_changed_)) { + if (is_full_merge_ || is_meta_major_merge(get_merge_type()) || (merge_level_ != MACRO_BLOCK_MERGE_LEVEL && is_schema_changed_)) { merge_level_ = MACRO_BLOCK_MERGE_LEVEL; } } @@ -731,22 +732,25 @@ ObITable::TableType ObBasicTabletMergeCtx::get_merged_table_type( { ObITable::TableType table_type = ObITable::MAX_TABLE_TYPE; - if (is_major_merge_type(get_merge_type())) { // MAJOR_MERGE + if (is_major_or_meta_merge_type(get_merge_type())) { // MAJOR / META MERGE + const bool is_meta_merge = is_meta_major_merge(get_merge_type()); if (nullptr == cg_schema) { - table_type = ObITable::TableType::MAJOR_SSTABLE; + table_type = is_meta_merge + ? ObITable::TableType::META_MAJOR_SSTABLE + : ObITable::TableType::MAJOR_SSTABLE; } else if (cg_schema->is_all_column_group()) { - table_type = ObITable::TableType::COLUMN_ORIENTED_SSTABLE; + table_type = is_meta_merge + ? ObITable::TableType::COLUMN_ORIENTED_META_SSTABLE + : ObITable::TableType::COLUMN_ORIENTED_SSTABLE; } else if (cg_schema->is_rowkey_column_group()) { table_type = is_main_table - ? ObITable::TableType::COLUMN_ORIENTED_SSTABLE + ? (is_meta_merge ? ObITable::TableType::COLUMN_ORIENTED_META_SSTABLE : ObITable::TableType::COLUMN_ORIENTED_SSTABLE) : ObITable::TableType::ROWKEY_COLUMN_GROUP_SSTABLE; } else { table_type = ObITable::TableType::NORMAL_COLUMN_GROUP_SSTABLE; } } else if (MINI_MERGE == get_merge_type()) { table_type = ObITable::TableType::MINI_SSTABLE; - } else if (META_MAJOR_MERGE == get_merge_type()) { - table_type = ObITable::TableType::META_MAJOR_SSTABLE; } else if (DDL_KV_MERGE == get_merge_type()) { table_type = ObITable::TableType::DDL_DUMP_SSTABLE; } else { // MINOR_MERGE || HISTORY_MINOR_MERGE @@ -784,6 +788,11 @@ void ObBasicTabletMergeCtx::build_update_table_store_param( if (is_mini_merge(merge_type) && nullptr != sstable) { clog_checkpoint_scn = sstable->get_end_scn(); } + if (is_meta_major_merge(get_merge_type())) { + param.multi_version_start_ = tablet_handle_.get_obj()->get_multi_version_start(); + param.snapshot_version_ = tablet_handle_.get_obj()->get_snapshot_version(); + } + param.sstable_ = sstable; param.snapshot_version_ = static_param_.version_range_.snapshot_version_; param.multi_version_start_ = get_tablet_id().is_ls_inner_tablet() ? 1 : static_param_.version_range_.multi_version_start_; @@ -804,7 +813,6 @@ int ObBasicTabletMergeCtx::update_tablet( ObTabletHandle &new_tablet_handle) { int ret = OB_SUCCESS; - const ObMergeType merge_type = get_merge_type(); // means finish current major/medium compaction ObArenaAllocator allocator("MdsAlloc", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID(), ObCtxIds::MERGE_NORMAL_CTX_ID); if (share::is_reserve_mode()) { @@ -1027,5 +1035,57 @@ int ObBasicTabletMergeCtx::swap_tablet(ObGetMergeTablesResult &get_merge_table_r return ret; } +int ObBasicTabletMergeCtx::get_meta_compaction_info() +{ + int ret = OB_SUCCESS; + ObTablet *tablet = get_tablet(); + ObMultiVersionSchemaService *schema_service = nullptr; + int64_t full_stored_col_cnt = 0; + int64_t schema_version = 0; + ObStorageSchema *storage_schema = nullptr; + + if (OB_UNLIKELY(!is_meta_major_merge(static_param_.get_merge_type()) + || nullptr != static_param_.schema_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("get unexpected static param", K(ret), K(static_param_), KPC(static_param_.schema_)); + } else if (OB_FAIL(ObStorageSchemaUtil::alloc_storage_schema(mem_ctx_.get_allocator(), storage_schema))) { + LOG_WARN("failed to alloc storage schema", K(ret)); + } else if (OB_ISNULL(schema_service = MTL(ObTenantSchemaService *)->get_schema_service())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("failed to get schema service from MTL", K(ret)); + } else if (OB_FAIL(tablet->get_schema_version_from_storage_schema(schema_version))){ + LOG_WARN("failed to get schema version from tablet", KR(ret), KPC(tablet)); + } else if (OB_FAIL(ObMediumCompactionScheduleFunc::get_table_schema_to_merge( + *schema_service, *tablet, schema_version, ObMediumCompactionInfo::MEDIUM_COMPAT_VERSION_V3, mem_ctx_.get_allocator(), *storage_schema))) { + if (OB_TABLE_IS_DELETED != ret) { + LOG_WARN("failed to get table schema", KR(ret), KPC(this)); + } + } else if (OB_FAIL(storage_schema->get_stored_column_count_in_sstable(full_stored_col_cnt))) { + LOG_WARN("failed to get stored column count in sstable", K(ret), KPC(storage_schema)); + } else if (OB_UNLIKELY(tablet->get_last_major_column_count() > full_stored_col_cnt)) { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("stored col cnt in curr schema is less than old major sstable", K(ret), + "col_cnt_in_sstable", tablet->get_last_major_column_count(), + "col_cnt_in_schema", full_stored_col_cnt, KPC(this)); + } else { + static_param_.schema_ = storage_schema; + } + + if (OB_SUCC(ret)) { + static_param_.schema_version_ = storage_schema->schema_version_; + static_param_.data_version_ = DATA_CURRENT_VERSION; + static_param_.is_rebuild_column_store_ = false; + static_param_.dag_param_.is_tenant_major_merge_ = false; + static_param_.is_schema_changed_ = true; // use MACRO_BLOCK_MERGE_LEVEL + static_param_.merge_reason_ = ObAdaptiveMergePolicy::TOMBSTONE_SCENE; + FLOG_INFO("get storage schema to meta merge", "param", get_dag_param(), KPC_(static_param_.schema)); + } + if (OB_FAIL(ret) && OB_NOT_NULL(storage_schema)) { + ObStorageSchemaUtil::free_storage_schema(mem_ctx_.get_allocator(), storage_schema); + static_param_.schema_ = nullptr; + } + return ret; +} + } // namespace compaction } // namespace oceanbase diff --git a/src/storage/compaction/ob_basic_tablet_merge_ctx.h b/src/storage/compaction/ob_basic_tablet_merge_ctx.h index 6cfd0da35..e04c2d848 100644 --- a/src/storage/compaction/ob_basic_tablet_merge_ctx.h +++ b/src/storage/compaction/ob_basic_tablet_merge_ctx.h @@ -235,6 +235,7 @@ protected: int swap_tablet(); int get_medium_compaction_info(); // for major int swap_tablet(ObGetMergeTablesResult &get_merge_table_result); // for major + int get_meta_compaction_info(); // for meta major static const int64_t LARGE_VOLUME_DATA_ROW_COUNT_THREASHOLD = 1000L * 1000L; // 100w static const int64_t LARGE_VOLUME_DATA_MACRO_COUNT_THREASHOLD = 300L; public: diff --git a/src/storage/compaction/ob_medium_compaction_func.cpp b/src/storage/compaction/ob_medium_compaction_func.cpp index 9a7c37e98..48d711311 100644 --- a/src/storage/compaction/ob_medium_compaction_func.cpp +++ b/src/storage/compaction/ob_medium_compaction_func.cpp @@ -73,7 +73,7 @@ int ObMediumCompactionScheduleFunc::choose_medium_snapshot( UNUSEDx(func, allocator, schema_version); int ret = OB_SUCCESS; ObGetMergeTablesParam param; - param.merge_type_ = META_MAJOR_MERGE; + param.merge_type_ = MEDIUM_MERGE; if (OB_FAIL(ObAdaptiveMergePolicy::get_meta_merge_tables( param, ls, @@ -150,7 +150,7 @@ int ObMediumCompactionScheduleFunc::choose_major_snapshot( K(ls_id), K(tablet_id), K(last_sstable_schema_version), K(freeze_info)); break; } else if (OB_FAIL(get_table_schema_to_merge( - *schema_service, tablet, freeze_info.schema_version_, allocator, medium_info))) { + *schema_service, tablet, freeze_info.schema_version_, medium_info.medium_compat_version_, allocator, medium_info.storage_schema_))) { if (OB_TABLE_IS_DELETED == ret) { // do nothing, end loop } else if (OB_ERR_SCHEMA_HISTORY_EMPTY == ret) { @@ -341,14 +341,11 @@ int ObMediumCompactionScheduleFunc::schedule_next_medium_primary_cluster( int64_t max_sync_medium_scn = 0; int64_t last_major_snapshot_version = 0; ObTablet *tablet = nullptr; - ObTabletMemberWrapper table_store_wrapper; if (OB_UNLIKELY(!tablet_handle_.is_valid())) { ret = OB_ERR_UNEXPECTED; LOG_WARN("invalid tablet_handle", K(ret), K(tablet_handle_)); } else if (FALSE_IT(tablet = tablet_handle_.get_obj())) { - } else if (OB_FAIL(tablet->fetch_table_store(table_store_wrapper))) { - LOG_WARN("fail to fetch table store", K(ret)); } else if (FALSE_IT(last_major_snapshot_version = tablet->get_last_major_snapshot_version())) { } else if (0 >= last_major_snapshot_version) { // no major, do nothing @@ -499,11 +496,10 @@ int ObMediumCompactionScheduleFunc::get_max_reserved_snapshot(int64_t &max_reser int ret = OB_SUCCESS; max_reserved_snapshot = INT64_MAX; - int64_t max_merged_snapshot = 0; ObStorageSnapshotInfo snapshot_info; int64_t last_major_snapshot_version = 0; ObTablet *tablet = nullptr; - ObTabletMemberWrapper wrapper; + if (OB_UNLIKELY(!tablet_handle_.is_valid())) { ret = OB_ERR_UNEXPECTED; LOG_WARN("invalid tablet_handle", K(ret), K(tablet_handle_)); @@ -515,9 +511,8 @@ int ObMediumCompactionScheduleFunc::get_max_reserved_snapshot(int64_t &max_reser } else if (0 == ls_.get_min_reserved_snapshot()) { ret = OB_NO_NEED_MERGE; // not sync reserved snapshot yet, should not schedule now - } else if (FALSE_IT(max_merged_snapshot = last_major_snapshot_version)) { } else if (OB_FAIL(MTL(ObTenantFreezeInfoMgr*)->get_min_reserved_snapshot( - tablet->get_tablet_meta().tablet_id_, max_merged_snapshot, snapshot_info))) { + tablet->get_tablet_meta().tablet_id_, last_major_snapshot_version, snapshot_info))) { LOG_WARN("failed to get multi version from freeze info mgr", K(ret), "tablet_id", tablet->get_tablet_meta().tablet_id_); } else { max_reserved_snapshot = MAX(ls_.get_min_reserved_snapshot(), snapshot_info.snapshot_); @@ -883,7 +878,8 @@ int ObMediumCompactionScheduleFunc::prepare_medium_info( ret = OB_ERR_UNEXPECTED; LOG_WARN("failed to get schema service from MTL", K(ret)); } else if (FALSE_IT(medium_info.storage_schema_.reset())) { - } else if (OB_FAIL(get_table_schema_to_merge(*schema_service, *tablet, schema_version, allocator_, medium_info))) { + } else if (OB_FAIL(get_table_schema_to_merge(*schema_service, *tablet, schema_version, + medium_info.medium_compat_version_, allocator_, medium_info.storage_schema_))) { // for major compaction, storage schema is inited in choose_major_snapshot if (OB_TABLE_IS_DELETED != ret) { LOG_WARN("failed to get table schema", KR(ret), KPC(this), K(medium_info)); @@ -935,8 +931,9 @@ int ObMediumCompactionScheduleFunc::get_table_schema_to_merge( ObMultiVersionSchemaService &schema_service, const ObTablet &tablet, const int64_t schema_version, + const int64_t medium_compat_version, ObIAllocator &allocator, - ObMediumCompactionInfo &medium_info) + ObStorageSchema &storage_schema) { int ret = OB_SUCCESS; const uint64_t tenant_id = MTL_ID(); @@ -990,20 +987,20 @@ int ObMediumCompactionScheduleFunc::get_table_schema_to_merge( #endif int64_t storage_schema_version = ObStorageSchema::STORAGE_SCHEMA_VERSION_V3; - if (medium_info.medium_compat_version_ < ObMediumCompactionInfo::MEDIUM_COMPAT_VERSION_V2) { - storage_schema_version = ObStorageSchema::STORAGE_SCHEMA_VERSION; - } else if (medium_info.medium_compat_version_ < ObMediumCompactionInfo::MEDIUM_COMPAT_VERSION_V3) { - storage_schema_version = ObStorageSchema::STORAGE_SCHEMA_VERSION_V2; + + if (medium_compat_version < ObMediumCompactionInfo::MEDIUM_COMPAT_VERSION_V2) { + storage_schema_version = ObStorageSchema::STORAGE_SCHEMA_VERSION; + } else if (medium_compat_version < ObMediumCompactionInfo::MEDIUM_COMPAT_VERSION_V3) { + storage_schema_version = ObStorageSchema::STORAGE_SCHEMA_VERSION_V2; } // for old version medium info, need generate old version schema - if (FAILEDx(medium_info.storage_schema_.init( + if (FAILEDx(storage_schema.init( allocator, *table_schema, tablet.get_tablet_meta().compat_mode_, false/*skip_column_info*/, storage_schema_version))) { LOG_WARN("failed to init storage schema", K(ret), K(schema_version)); } else { LOG_INFO("get schema to merge", K(tablet_id), K(table_id), K(schema_version), K(save_schema_version), - K(medium_info.storage_schema_), - K(*reinterpret_cast(table_schema))); + K(storage_schema), K(*reinterpret_cast(table_schema))); } return ret; } @@ -1306,7 +1303,6 @@ int ObMediumCompactionScheduleFunc::schedule_tablet_medium_merge( { int ret = OB_SUCCESS; create_dag_flag = false; - ObTabletMemberWrapper table_store_wrapper; #ifdef ERRSIM ret = OB_E(EventTable::EN_MEDIUM_CREATE_DAG) ret; if (OB_FAIL(ret)) { @@ -1314,46 +1310,44 @@ int ObMediumCompactionScheduleFunc::schedule_tablet_medium_merge( return ret; } #endif - if (OB_FAIL(tablet.fetch_table_store(table_store_wrapper))) { - LOG_WARN("failed to fetch table store", K(ret), K(tablet)); - } else { - const int64_t last_major_snapshot = tablet.get_last_major_snapshot_version(); - if (MTL(ObTenantTabletScheduler *)->could_major_merge_start() && last_major_snapshot > 0) { - const ObTabletID &tablet_id = tablet.get_tablet_meta().tablet_id_; - const ObLSID &ls_id = ls.get_ls_id(); - ObArenaAllocator temp_allocator("GetMediumInfo", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()); // for load medium info - const ObMediumCompactionInfoList *medium_list = nullptr; - bool schedule_flag = false; - const int64_t inner_table_merged_version = MTL(ObTenantTabletScheduler *)->get_inner_table_merged_scn(); - if (OB_FAIL(tablet.read_medium_info_list(temp_allocator, medium_list))) { - LOG_WARN("failed to load medium info list", K(ret), K(tablet)); - } else if (ObMediumCompactionInfo::MAJOR_COMPACTION == medium_list->get_last_compaction_type() - && inner_table_merged_version < medium_list->get_last_compaction_scn() - && !MTL_TENANT_ROLE_CACHE_IS_PRIMARY_OR_INVALID()) { // for STANDBY/RESTORE TENANT - ObTabletCompactionScnInfo ret_info; - // for standby/restore tenant, need select inner_table to check RS status before schedule new round - if (!scheduler_called) { // should not visit inner table, wait for scheduler loop - } else if (OB_FAIL(get_status_from_inner_table(ls_id, tablet_id, ret_info))) { - LOG_WARN("failed to get status from inner tablet", K(ret), K(ls_id), K(tablet_id)); - } else if (ret_info.could_schedule_next_round(medium_list->get_last_compaction_scn())) { - LOG_INFO("success to check RS major checksum validation finished", K(ret), K(ls_id), K(tablet_id)); - schedule_flag = true; - } - } else { + const int64_t last_major_snapshot = tablet.get_last_major_snapshot_version(); + if (MTL(ObTenantTabletScheduler *)->could_major_merge_start() && last_major_snapshot > 0) { + const ObTabletID &tablet_id = tablet.get_tablet_meta().tablet_id_; + const ObLSID &ls_id = ls.get_ls_id(); + ObArenaAllocator temp_allocator("GetMediumInfo", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()); // for load medium info + const ObMediumCompactionInfoList *medium_list = nullptr; + bool schedule_flag = false; + const int64_t inner_table_merged_version = MTL(ObTenantTabletScheduler *)->get_inner_table_merged_scn(); + if (OB_FAIL(tablet.read_medium_info_list(temp_allocator, medium_list))) { + LOG_WARN("failed to load medium info list", K(ret), K(tablet)); + } else if (ObMediumCompactionInfo::MAJOR_COMPACTION == medium_list->get_last_compaction_type() + && inner_table_merged_version < medium_list->get_last_compaction_scn() + && !MTL_TENANT_ROLE_CACHE_IS_PRIMARY_OR_INVALID()) { // for STANDBY/RESTORE TENANT + ObTabletCompactionScnInfo ret_info; + // for standby/restore tenant, need select inner_table to check RS status before schedule new round + if (!scheduler_called) { // should not visit inner table, wait for scheduler loop + } else if (OB_FAIL(get_status_from_inner_table(ls_id, tablet_id, ret_info))) { + LOG_WARN("failed to get status from inner tablet", K(ret), K(ls_id), K(tablet_id)); + } else if (ret_info.could_schedule_next_round(medium_list->get_last_compaction_scn())) { + LOG_INFO("success to check RS major checksum validation finished", K(ret), K(ls_id), K(tablet_id)); schedule_flag = true; } - if (OB_FAIL(ret) || !schedule_flag) { - } else { - const int64_t major_frozen_snapshot = 0 == input_major_snapshot ? MTL(ObTenantTabletScheduler *)->get_frozen_version() : input_major_snapshot; - ObMediumCompactionInfo::ObCompactionType compaction_type = ObMediumCompactionInfo::COMPACTION_TYPE_MAX; - int64_t schedule_scn = 0; - if (OB_FAIL(read_medium_info_from_list(*medium_list, last_major_snapshot, - major_frozen_snapshot, compaction_type, schedule_scn))) { - } else if (schedule_scn > 0 - && OB_FAIL(check_need_merge_and_schedule(ls, tablet, schedule_scn, compaction_type, tablet_need_freeze_flag, create_dag_flag))) { - LOG_WARN("failed to check medium merge", K(ret), K(ls_id), K(tablet_id), K(schedule_scn)); - } + } else { + schedule_flag = true; + } + + if (OB_FAIL(ret) || !schedule_flag) { + } else { + const int64_t major_frozen_snapshot = 0 == input_major_snapshot ? MTL(ObTenantTabletScheduler *)->get_frozen_version() : input_major_snapshot; + ObMediumCompactionInfo::ObCompactionType compaction_type = ObMediumCompactionInfo::COMPACTION_TYPE_MAX; + int64_t schedule_scn = 0; + if (OB_FAIL(read_medium_info_from_list(*medium_list, last_major_snapshot, + major_frozen_snapshot, compaction_type, schedule_scn))) { + LOG_WARN("failed to read medium info from list", K(ret), K(ls_id), K(tablet_id), KPC(medium_list), K(last_major_snapshot)); + } else if (schedule_scn > 0 + && OB_FAIL(check_need_merge_and_schedule(ls, tablet, schedule_scn, compaction_type, tablet_need_freeze_flag, create_dag_flag))) { + LOG_WARN("failed to check medium merge", K(ret), K(ls_id), K(tablet_id), K(schedule_scn)); } } } diff --git a/src/storage/compaction/ob_medium_compaction_func.h b/src/storage/compaction/ob_medium_compaction_func.h index f023f42f2..e4a30a0e7 100644 --- a/src/storage/compaction/ob_medium_compaction_func.h +++ b/src/storage/compaction/ob_medium_compaction_func.h @@ -66,8 +66,9 @@ public: ObMultiVersionSchemaService &schema_service, const ObTablet &tablet, const int64_t schema_version, + const int64_t medium_compat_version, ObIAllocator &allocator, - ObMediumCompactionInfo &medium_info); + storage::ObStorageSchema &storage_schema); static int batch_check_medium_finish( hash::ObHashMap &ls_info_map, ObIArray &finish_tablet_ls_infos, diff --git a/src/storage/compaction/ob_partition_merge_iter.cpp b/src/storage/compaction/ob_partition_merge_iter.cpp index 6aae66760..4b551997d 100644 --- a/src/storage/compaction/ob_partition_merge_iter.cpp +++ b/src/storage/compaction/ob_partition_merge_iter.cpp @@ -237,9 +237,9 @@ int ObPartitionMergeIter::init(const ObMergeParameter &merge_param, ObITable *ta ret = OB_INIT_TWICE; LOG_WARN("ObPartitionMergeIter init twice", K(ret)); } else if (OB_UNLIKELY(!merge_param.is_valid() || - read_info == nullptr || table == nullptr || !table->is_major_sstable())) { + read_info == nullptr || table == nullptr || !table->is_major_sstable())) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("Invalid arguments to init ObPartitionMergeIter", K(ret), K(merge_param), KPC(table)); + LOG_WARN("Invalid arguments to init ObPartitionMergeIter", K(ret), K(merge_param), KPC(read_info), KPC(table)); } else { table_ = table; read_info_ = read_info; @@ -505,7 +505,7 @@ bool ObPartitionMacroMergeIter::inner_check(const ObMergeParameter &merge_param) } else if (static_param.is_full_merge_) { bret = false; LOG_WARN_RET(OB_ERR_UNEXPECTED, "Unexpected full merge for major macro merge iter", K(bret), K(static_param)); - } else if (OB_UNLIKELY(!table_->is_major_sstable() && !table_->is_meta_major_sstable())) { + } else if (OB_UNLIKELY(!table_->is_major_sstable())) { bret = false; LOG_WARN_RET(OB_ERR_UNEXPECTED, "Unexpected base table type for major macro merge iter", K(bret), KPC(table_)); } @@ -844,7 +844,7 @@ bool ObPartitionMicroMergeIter::inner_check(const ObMergeParameter &merge_param) } else if (OB_UNLIKELY(!is_base_iter())) { bret = false; LOG_WARN_RET(OB_ERR_UNEXPECTED, "Unexpected iter idx for major micro merge iter", K(bret), K(merge_param)); - } else if (OB_UNLIKELY(!table_->is_major_sstable() && !table_->is_meta_major_sstable())) { + } else if (OB_UNLIKELY(!table_->is_major_sstable())) { bret = false; LOG_WARN_RET(OB_ERR_UNEXPECTED, "Unexpected base table type for major macro merge iter", K(bret), KPC(table_)); } diff --git a/src/storage/compaction/ob_partition_merge_policy.cpp b/src/storage/compaction/ob_partition_merge_policy.cpp index 3dba6ac00..624739c6f 100644 --- a/src/storage/compaction/ob_partition_merge_policy.cpp +++ b/src/storage/compaction/ob_partition_merge_policy.cpp @@ -1380,72 +1380,65 @@ int ObAdaptiveMergePolicy::get_meta_merge_tables( const ObMergeType merge_type = param.merge_type_; result.reset(); - if (OB_UNLIKELY(META_MAJOR_MERGE != merge_type)) { + if (OB_UNLIKELY(META_MAJOR_MERGE != merge_type && MEDIUM_MERGE != merge_type)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", K(ret), "merge_type", merge_type_to_str(merge_type)); - } else if (OB_FAIL(find_meta_major_tables(tablet, result))) { + } else if (OB_FAIL(find_adaptive_merge_tables(merge_type, tablet, result))) { if (OB_NO_NEED_MERGE != ret) { LOG_WARN("Failed to find minor merge tables", K(ret)); } } else if (OB_FAIL(result.handle_.check_continues(nullptr))) { LOG_WARN("failed to check continues", K(ret), K(result)); - } else if (FALSE_IT(result.version_range_.snapshot_version_ = - MIN(tablet.get_snapshot_version(), result.version_range_.snapshot_version_))) { - // chosen version should less than tablet::snapshot - } else if (OB_FAIL(ObPartitionMergePolicy::get_multi_version_start( - param.merge_type_, ls, tablet, result.version_range_, result.snapshot_info_))) { - LOG_WARN("failed to get multi version_start", K(ret)); - } else { - FLOG_INFO("succeed to get meta major merge tables", K(result), K(tablet)); + } else if (MEDIUM_MERGE == merge_type) { + result.version_range_.snapshot_version_ = MIN(tablet.get_snapshot_version(), result.version_range_.snapshot_version_); + if (OB_FAIL(ObPartitionMergePolicy::get_multi_version_start( + merge_type, ls, tablet, result.version_range_, result.snapshot_info_))) { + LOG_WARN("failed to get multi version_start", K(ret)); + } + } + + if (OB_SUCC(ret)) { + FLOG_INFO("succeed to get meta major merge tables", K(merge_type), K(result), K(tablet)); } return ret; } -int ObAdaptiveMergePolicy::find_meta_major_tables( +int ObAdaptiveMergePolicy::find_adaptive_merge_tables( + const ObMergeType &merge_type, const storage::ObTablet &tablet, ObGetMergeTablesResult &result) { int ret = OB_SUCCESS; int64_t min_snapshot = 0; int64_t max_snapshot = 0; - int64_t base_row_cnt = 0; - int64_t inc_row_cnt = 0; - int64_t tx_determ_table_cnt = 0; ObTabletMemberWrapper table_store_wrapper; const ObTabletTableStore *table_store = nullptr; + ObSSTable *base_table = nullptr; if (OB_FAIL(tablet.fetch_table_store(table_store_wrapper))) { LOG_WARN("fail to fetch table store", K(ret)); } else if (OB_UNLIKELY(NULL == (table_store = table_store_wrapper.get_member()) || !table_store->is_valid())) { ret = OB_ERR_UNEXPECTED; LOG_WARN("ObTabletTableStore is not valid", K(ret), K(table_store_wrapper)); + } else if (table_store->get_minor_sstables().empty() || table_store->get_major_sstables().empty()) { + ret = OB_NO_NEED_MERGE; + LOG_DEBUG("no minor/major sstable to do meta major merge", K(ret), KPC(table_store)); + } else if (OB_ISNULL(base_table = nullptr == table_store->get_meta_major_sstable() + ? static_cast(table_store->get_major_sstables().get_boundary_table(true/*last*/)) + : table_store->get_meta_major_sstable())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("get unexpected null base table", K(ret), KPC(table_store), K(tablet)); + } else if (OB_FAIL(ObPartitionMergePolicy::get_boundary_snapshot_version(tablet, min_snapshot, max_snapshot))) { + LOG_WARN("failed to get boundary snapshot version", K(ret), KPC(base_table), K(min_snapshot), K(max_snapshot)); + } else if (base_table->get_snapshot_version() < min_snapshot || max_snapshot != INT64_MAX /*exist next freeze info*/) { + ret = OB_NO_NEED_MERGE; + LOG_DEBUG("no need meta merge when the tablet is doing major merge", K(ret), K(min_snapshot), K(max_snapshot), KPC(base_table)); + } else if (OB_FAIL(add_meta_merge_result(base_table, table_store_wrapper.get_meta_handle(), result, true))) { + LOG_WARN("failed to add base table to meta merge result", K(ret), KPC(base_table), K(result)); } else { - ObITable *last_major = table_store_wrapper.get_member()->get_major_sstables().get_boundary_table(true); - ObITable *base_table = table_store_wrapper.get_member()->get_meta_major_sstable(); - - const ObSSTableArray &minor_tables = table_store_wrapper.get_member()->get_minor_sstables(); - if (minor_tables.empty() || nullptr == last_major) { - ret = OB_NO_NEED_MERGE; - LOG_DEBUG("no minor/major sstable to do meta major merge", K(ret), K(minor_tables), KPC(last_major)); - } else if (FALSE_IT(base_table = nullptr == table_store->get_meta_major_sstable() - ? last_major - : table_store->get_meta_major_sstable())) { - } else if (OB_FAIL(ObPartitionMergePolicy::get_boundary_snapshot_version( - tablet, min_snapshot, max_snapshot, false/*check_table_cnt*/, false/*is_multi_version_merge*/))) { - if (OB_NO_NEED_MERGE != ret) { - LOG_WARN("Failed to find meta merge base table", K(ret), KPC(last_major), KPC(base_table)); - } - } else if (base_table->get_snapshot_version() < min_snapshot || max_snapshot != INT64_MAX) { - // max_snapshot == INT64_MAX means there's no next freeze_info - ret = OB_NO_NEED_MERGE; - LOG_DEBUG("no need meta merge when the tablet is doing major merge", K(ret), K(min_snapshot), K(max_snapshot), KPC(base_table)); - } else if (OB_FAIL(add_meta_merge_result(base_table, table_store_wrapper.get_meta_handle(), result, true/*update_snapshot*/))) { - LOG_WARN("failed to add base table to meta merge result", K(ret), KPC(base_table), K(result)); - } else { - base_row_cnt = static_cast(base_table)->get_row_count(); - ++tx_determ_table_cnt; // inc for base_table - } - + int64_t tx_determ_table_cnt = 1; + int64_t inc_row_cnt = 0; + const ObSSTableArray &minor_tables = table_store->get_minor_sstables(); bool found_undeterm_table = false; for (int64_t i = 0; OB_SUCC(ret) && i < minor_tables.count(); ++i) { ObITable *table = minor_tables[i]; @@ -1454,39 +1447,53 @@ int ObAdaptiveMergePolicy::find_meta_major_tables( LOG_WARN("get unexpected table", K(ret), K(i), K(PRINT_TS_WRAPPER(table_store_wrapper))); } else if (result.handle_.get_count() <= 1 && table->get_upper_trans_version() <= base_table->get_snapshot_version()) { continue; // skip minor sstable which has been merged - } else if (!found_undeterm_table && table->is_trans_state_deterministic()) { - ++tx_determ_table_cnt; - ObSSTableMetaHandle inc_handle; - if (OB_FAIL(static_cast(table)->get_meta(inc_handle))) { - LOG_WARN("failed to inc table meta", K(ret), KPC(table)); + } else if (!table->is_trans_state_deterministic()) { + if (is_meta_major_merge(merge_type)) { + break; } else { - inc_row_cnt += inc_handle.get_sstable_meta().get_row_count(); + found_undeterm_table = true; } - } else { - found_undeterm_table = true; + } else if (!found_undeterm_table) { + ++tx_determ_table_cnt; + inc_row_cnt += static_cast(table)->get_row_count(); } - if (FAILEDx(add_meta_merge_result( - table, table_store_wrapper.get_meta_handle(), result, !found_undeterm_table))) { + if (FAILEDx(add_meta_merge_result(table, table_store_wrapper.get_meta_handle(), result, !found_undeterm_table))) { LOG_WARN("failed to add minor table to meta merge result", K(ret)); } - } + } // end for + + bool scanty_tx_determ_table = tx_determ_table_cnt < 2; + bool scanty_inc_row_cnt = inc_row_cnt < TRANS_STATE_DETERM_ROW_CNT_THRESHOLD + || inc_row_cnt < INC_ROW_COUNT_PERCENTAGE_THRESHOLD * base_table->get_row_count(); + +#ifdef ERRSIM + #define META_POLICY_ERRSIM(tracepoint) \ + do { \ + if (OB_SUCC(ret)) { \ + ret = OB_E((EventTable::tracepoint)) OB_SUCCESS; \ + if (OB_FAIL(ret)) { \ + ret = OB_SUCCESS; \ + STORAGE_LOG(INFO, "ERRSIM " #tracepoint); \ + scanty_tx_determ_table = false; \ + scanty_inc_row_cnt = false; \ + } \ + } \ + } while(0); + META_POLICY_ERRSIM(EN_COMPACTION_SCHEDULE_META_MERGE); + #undef META_POLICY_ERRSIM +#endif if (OB_FAIL(ret)) { - } else if (tx_determ_table_cnt < 2) { + } else if (scanty_tx_determ_table || scanty_inc_row_cnt) { ret = OB_NO_NEED_MERGE; - if (REACH_TENANT_TIME_INTERVAL(60 * 1000 * 1000/*60s*/)) { - LOG_INFO("no enough table for meta merge", K(ret), K(result), K(PRINT_TS_WRAPPER(table_store_wrapper))); - } - } else if (inc_row_cnt < TRANS_STATE_DETERM_ROW_CNT_THRESHOLD - || inc_row_cnt < INC_ROW_COUNT_PERCENTAGE_THRESHOLD * base_row_cnt) { - ret = OB_NO_NEED_MERGE; - if (REACH_TENANT_TIME_INTERVAL(60 * 1000 * 1000/*60s*/)) { - LOG_INFO("found sstable could merge is not enough", K(ret), K(inc_row_cnt), K(base_row_cnt)); + if (REACH_TENANT_TIME_INTERVAL(30_s)) { + LOG_INFO("no enough table or no enough rows for meta merge", K(ret), + K(scanty_tx_determ_table), K(scanty_inc_row_cnt), K(result), K(PRINT_TS_WRAPPER(table_store_wrapper))); } } else if (result.version_range_.snapshot_version_ < tablet.get_multi_version_start()) { ret = OB_NO_NEED_MERGE; - if (REACH_TENANT_TIME_INTERVAL(60 * 1000 * 1000/*60s*/)) { + if (REACH_TENANT_TIME_INTERVAL(30_s)) { LOG_INFO("chosen snapshot is abandoned", K(ret), K(result), K(tablet.get_multi_version_start())); } } @@ -1508,40 +1515,6 @@ int ObAdaptiveMergePolicy::find_meta_major_tables( return ret; } -int ObAdaptiveMergePolicy::find_base_table_and_inc_version( - ObITable *last_major_table, - ObITable *last_minor_table, - ObITable *&meta_base_table, - int64_t &merge_inc_version) -{ - int ret = OB_SUCCESS; - // find meta base table - if (OB_NOT_NULL(last_major_table)) { - if (OB_ISNULL(meta_base_table)) { - meta_base_table = last_major_table; - } else if (OB_UNLIKELY(meta_base_table->get_snapshot_version() <= last_major_table->get_snapshot_version())) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("meta major table covered by major", K(ret), KPC(meta_base_table), KPC(last_major_table)); - } - } - - // find meta merge inc version - if (OB_FAIL(ret)) { - } else if (OB_NOT_NULL(last_major_table) && OB_NOT_NULL(last_minor_table)) { - merge_inc_version = MAX(last_major_table->get_snapshot_version(), last_minor_table->get_max_merged_trans_version()); - } else if (OB_NOT_NULL(last_major_table)) { - merge_inc_version = last_major_table->get_snapshot_version(); - } else if (OB_NOT_NULL(last_minor_table)){ - merge_inc_version = last_minor_table->get_max_merged_trans_version(); - } - - if (OB_SUCC(ret) && (NULL == meta_base_table || merge_inc_version <= 0)) { - ret = OB_NO_NEED_MERGE; - LOG_WARN("cannot meta merge with null base table or inc version", K(ret), K(meta_base_table), K(merge_inc_version)); - } - return ret; -} - int ObAdaptiveMergePolicy::add_meta_merge_result( ObITable *table, const ObStorageMetaHandle &table_meta_handle, @@ -1555,7 +1528,7 @@ int ObAdaptiveMergePolicy::add_meta_merge_result( LOG_WARN("get invalid argument", K(ret), KPC(table)); } else if (OB_FAIL(result.handle_.add_sstable(table, table_meta_handle))) { LOG_WARN("failed to add table", K(ret), KPC(table)); - } else if (table->is_meta_major_sstable() || table->is_major_sstable()) { + } else if (table->is_major_sstable()) { result.version_range_.base_version_ = 0; result.version_range_.multi_version_start_ = table->get_snapshot_version(); result.version_range_.snapshot_version_ = table->get_snapshot_version(); diff --git a/src/storage/compaction/ob_partition_merge_policy.h b/src/storage/compaction/ob_partition_merge_policy.h index 391dbdaba..beb3d9c6e 100644 --- a/src/storage/compaction/ob_partition_merge_policy.h +++ b/src/storage/compaction/ob_partition_merge_policy.h @@ -85,8 +85,8 @@ public: const storage::ObTablet &tablet, int64_t &min_snapshot, int64_t &max_snapshot, - const bool check_table_cnt, - const bool is_multi_version_merge); + const bool check_table_cnt = false, + const bool is_multi_version_merge = false); static int diagnose_table_count_unsafe( const compaction::ObMergeType merge_type, @@ -246,12 +246,10 @@ public: AdaptiveMergeReason &reason); private: - static int find_meta_major_tables(const storage::ObTablet &tablet, - storage::ObGetMergeTablesResult &result); - static int find_base_table_and_inc_version(storage::ObITable *last_major_table, - storage::ObITable *last_minor_table, - storage::ObITable *&meta_base_table, - int64_t &merge_inc_version); + static int find_adaptive_merge_tables( + const ObMergeType &merge_type, + const storage::ObTablet &tablet, + storage::ObGetMergeTablesResult &result); static int add_meta_merge_result(storage::ObITable *table, const storage::ObStorageMetaHandle &table_meta_handle, storage::ObGetMergeTablesResult &result, @@ -273,11 +271,11 @@ private: const storage::ObTablet &tablet, AdaptiveMergeReason &merge_reason); -private: +public: static constexpr int64_t SCHEDULE_META_MERGE_INTERVAL = 120L * 1000L * 1000L; //120s static constexpr int64_t INC_ROW_COUNT_THRESHOLD = 100L * 1000L; // 10w static constexpr int64_t TOMBSTONE_ROW_COUNT_THRESHOLD = 30L * 1000L; // 3w - static constexpr int64_t BASE_ROW_COUNT_THRESHOLD = 10L * 1000L; // 5w + static constexpr int64_t BASE_ROW_COUNT_THRESHOLD = 10L * 1000L; // 1w static constexpr int64_t LOAD_DATA_SCENE_THRESHOLD = 70; static constexpr int64_t TOMBSTONE_SCENE_THRESHOLD = 50; static constexpr float INC_ROW_COUNT_PERCENTAGE_THRESHOLD = 0.5; diff --git a/src/storage/compaction/ob_partition_parallel_merge_ctx.cpp b/src/storage/compaction/ob_partition_parallel_merge_ctx.cpp index 54ffc9ace..1b76cdffa 100644 --- a/src/storage/compaction/ob_partition_parallel_merge_ctx.cpp +++ b/src/storage/compaction/ob_partition_parallel_merge_ctx.cpp @@ -100,7 +100,7 @@ int ObParallelMergeCtx::init(compaction::ObBasicTabletMergeCtx &merge_ctx) if (OB_FAIL(init_serial_merge())) { STORAGE_LOG(WARN, "Failed to init serialize merge", K(ret), K(tablet_size), K(merge_ctx)); } - } else if (is_major_merge_type(merge_type)) { + } else if (is_major_or_meta_merge_type(merge_type)) { if (OB_FAIL(init_parallel_major_merge(merge_ctx))) { STORAGE_LOG(WARN, "Failed to init parallel major merge", K(ret)); } @@ -113,8 +113,10 @@ int ObParallelMergeCtx::init(compaction::ObBasicTabletMergeCtx &merge_ctx) STORAGE_LOG(WARN, "Failed to init parallel setting for mini minor merge", K(ret)); } } else { - ret = OB_ERR_UNDEFINED; - STORAGE_LOG(WARN, "get unexpected merge type", K(ret), K(merge_type), K(tablet_size), K(merge_ctx)); + // just use serial merge + if (OB_FAIL(init_serial_merge())) { + STORAGE_LOG(WARN, "Failed to init serialize merge", K(ret), K(tablet_size), K(merge_ctx)); + } } if (OB_SUCC(ret)) { @@ -228,7 +230,7 @@ int ObParallelMergeCtx::init_parallel_major_merge(compaction::ObBasicTabletMerge { int ret = OB_SUCCESS; const ObITable *first_table = nullptr; - if (OB_UNLIKELY(!is_major_merge_type(merge_ctx.get_merge_type()))) { + if (OB_UNLIKELY(!is_major_or_meta_merge_type(merge_ctx.get_merge_type()))) { ret = OB_INVALID_ARGUMENT; STORAGE_LOG(WARN, "Invalid argument to init parallel major merge", K(ret), K(merge_ctx)); } else if (OB_UNLIKELY(nullptr == (first_table = merge_ctx.get_tables_handle().get_table(0)) diff --git a/src/storage/compaction/ob_tablet_merge_ctx.cpp b/src/storage/compaction/ob_tablet_merge_ctx.cpp index 1431763d2..f146d8d18 100644 --- a/src/storage/compaction/ob_tablet_merge_ctx.cpp +++ b/src/storage/compaction/ob_tablet_merge_ctx.cpp @@ -158,44 +158,92 @@ int ObTabletMiniMergeCtx::update_tablet( } else if (FALSE_IT(time_guard_click(ObStorageCompactionTimeGuard::RELEASE_MEMTABLE))) { } else { // schedule after mini - int tmp_ret = OB_SUCCESS; - if (!get_tablet_id().is_ls_inner_tablet()) { - (void) try_schedule_compaction_after_mini(new_tablet_handle); - } - if (OB_TMP_FAIL(ObTenantTabletScheduler::schedule_tablet_minor_merge( - static_param_.ls_handle_, - new_tablet_handle))) { - if (OB_SIZE_OVERFLOW != tmp_ret) { - LOG_WARN("failed to schedule special tablet minor merge", K(tmp_ret), - "ls_id", get_ls_id(), "tablet_id", get_tablet_id()); - } - } - time_guard_click(ObStorageCompactionTimeGuard::SCHEDULE_OTHER_COMPACTION); + try_schedule_compaction_after_mini(new_tablet_handle); } return ret; } -void ObTabletMiniMergeCtx::try_schedule_compaction_after_mini( - ObTabletHandle &tablet_handle) +void ObTabletMiniMergeCtx::try_schedule_compaction_after_mini(ObTabletHandle &tablet_handle) { int tmp_ret = OB_SUCCESS; - bool non_used_freeze_flag = false; // no meaning, just for placeholder for refering - bool non_used_schedule_dag_flag = false; - // report tablet stat - if (0 == get_merge_info().get_sstable_merge_info().macro_block_count_) { - // empty mini compaction, no need to reprot stat - } else if (OB_TMP_FAIL(try_report_tablet_stat_after_mini())) { - LOG_WARN_RET(tmp_ret, "failed to report table stat after mini compaction", "param", get_dag_param()); + bool create_meta_dag = false; + + if (get_tablet_id().is_ls_inner_tablet() || + 0 == get_merge_info().get_sstable_merge_info().macro_block_count_) { + // do nothing + } else if (OB_TMP_FAIL(try_schedule_meta_merge(tablet_handle, create_meta_dag))) { + LOG_WARN_RET(tmp_ret, "failed to schedule meta merge", K(get_dag_param())); } - if (OB_TMP_FAIL(ObMediumCompactionScheduleFunc::schedule_tablet_medium_merge( - *get_ls(), - *tablet_handle.get_obj(), - non_used_freeze_flag, - non_used_schedule_dag_flag))) { - if (OB_SIZE_OVERFLOW != tmp_ret && OB_EAGAIN != tmp_ret) { - LOG_WARN_RET(tmp_ret, "failed to schedule tablet medium merge", K(tmp_ret), "param", get_dag_param()); + + if (create_meta_dag || 0 == get_merge_info().get_sstable_merge_info().macro_block_count_) { + // no need to schedule minor merge + } else if (OB_TMP_FAIL(ObTenantTabletScheduler::schedule_tablet_minor_merge( + static_param_.ls_handle_, tablet_handle))) { + if (OB_SIZE_OVERFLOW != tmp_ret) { + LOG_WARN_RET(tmp_ret, "failed to schedule special tablet minor merge", + "ls_id", get_ls_id(), "tablet_id", get_tablet_id()); } } + time_guard_click(ObStorageCompactionTimeGuard::SCHEDULE_OTHER_COMPACTION); +} + +int ObTabletMiniMergeCtx::try_schedule_meta_merge( + ObTabletHandle &tablet_handle, + bool &create_meta_dag) +{ + int ret = OB_SUCCESS; + int tmp_ret = OB_SUCCESS; + create_meta_dag = false; + + bool is_tombstone_scene = info_collector_.tnode_stat_.delete_row_count_ >= compaction::ObAdaptiveMergePolicy::TOMBSTONE_ROW_COUNT_THRESHOLD; + bool medium_is_cooling_down = tablet_handle.get_obj()->get_last_major_snapshot_version() / 1000 + 600_s > ObTimeUtility::fast_current_time(); +#ifdef ERRSIM + #define SCHEDULE_META_ERRSIM(tracepoint) \ + do { \ + if (OB_SUCC(ret)) { \ + ret = OB_E((EventTable::tracepoint)) OB_SUCCESS; \ + if (OB_FAIL(ret)) { \ + ret = OB_SUCCESS; \ + STORAGE_LOG(INFO, "ERRSIM " #tracepoint); \ + is_tombstone_scene = true; \ + medium_is_cooling_down = true; \ + } \ + } \ + } while(0); + SCHEDULE_META_ERRSIM(EN_COMPACTION_SCHEDULE_META_MERGE); + #undef SCHEDULE_META_ERRSIM +#endif + + // try to schedule meta major merge + if (is_tombstone_scene && medium_is_cooling_down) { + if (OB_TMP_FAIL(ObTenantTabletScheduler::schedule_tablet_meta_merge(static_param_.ls_handle_, tablet_handle, create_meta_dag))) { + LOG_WARN_RET(tmp_ret, "failed to schedule meta merge for tablet", "param", get_dag_param()); + } + } + + // deal with tablet stat + if (create_meta_dag) { + MTL(ObTenantTabletStatMgr *)->clear_tablet_stat(get_ls_id(), get_tablet_id()); + FLOG_INFO("clear tablet stat", "ls_id", get_ls_id(), "tablet_id", get_tablet_id(), "tnode_stat", info_collector_.tnode_stat_); + } else { + (void) try_report_tablet_stat_after_mini(); + } + + // try schedule medium merge + if (!medium_is_cooling_down) { + bool non_used_freeze_flag = false; // no meaning, just for placeholder for refering + bool non_used_schedule_dag_flag = false; + if (OB_TMP_FAIL(ObMediumCompactionScheduleFunc::schedule_tablet_medium_merge( + *get_ls(), + *tablet_handle.get_obj(), + non_used_freeze_flag, + non_used_schedule_dag_flag))) { + if (OB_SIZE_OVERFLOW != tmp_ret && OB_EAGAIN != tmp_ret) { + LOG_WARN_RET(tmp_ret, "failed to schedule tablet medium merge", K(tmp_ret), "param", get_dag_param()); + } + } + } + return ret; } int ObTabletMiniMergeCtx::try_report_tablet_stat_after_mini() @@ -211,46 +259,16 @@ int ObTabletMiniMergeCtx::try_report_tablet_stat_after_mini() } else if (ObTabletStat::MERGE_REPORT_MIN_ROW_CNT >= tnode_stat.get_dml_count()) { // insufficient data, skip to report } else { - ObSEArray tables; - ObQueryFlag query_flag(ObQueryFlag::Forward, - true, /*is daily merge scan*/ - true, /*is read multiple macro block*/ - true, /*sys task scan, read one macro block in single io*/ - false, /*full row scan flag, obsoleted*/ - false, /*index back*/ - false); /*query_stat*/ - ObTableEstimateBaseInput base_input(query_flag, - tablet_id.id(), - transaction::ObTransID(), - tables, - tablet_handle_); - ObDatumRange whole_range; - whole_range.set_whole_range(); - ObSEArray ranges; - ObPartitionEst part_estimate; - ObSEArray records; - - if (OB_FAIL(ranges.push_back(whole_range))) { - LOG_WARN("failed to add ranges", KR(ret), K(ranges), K(whole_range)); - } else if (OB_FAIL(get_tables_handle().get_tables(tables))) { - LOG_WARN("failed to get tables array", KR(ret), KPC(this)); - } else if (OB_FAIL(ObTableEstimator::estimate_row_count_for_scan( - base_input, ranges, part_estimate, records))) { - LOG_WARN("failed to estimate row counts", KR(ret), K(part_estimate), K(records)); - } else if (0 == part_estimate.logical_row_count_ && - ObTabletStat::MERGE_REPORT_MIN_ROW_CNT >= part_estimate.physical_row_count_) { - } else { - ObTabletStat report_stat; - bool report_succ = false; - report_stat.ls_id_ = get_ls_id().id(); - report_stat.tablet_id_ = get_tablet_id().id(); - report_stat.merge_cnt_ = 1; - report_stat.insert_row_cnt_ = tnode_stat.insert_row_count_; - report_stat.update_row_cnt_ = tnode_stat.update_row_count_; - report_stat.delete_row_cnt_ = tnode_stat.delete_row_count_; - if (OB_FAIL(MTL(ObTenantTabletStatMgr *)->report_stat(report_stat, report_succ))) { - LOG_WARN("failed to report tablet stat", KR(ret)); - } + ObTabletStat report_stat; + bool report_succ = false; + report_stat.ls_id_ = get_ls_id().id(); + report_stat.tablet_id_ = get_tablet_id().id(); + report_stat.merge_cnt_ = 1; + report_stat.insert_row_cnt_ = tnode_stat.insert_row_count_; + report_stat.update_row_cnt_ = tnode_stat.update_row_count_; + report_stat.delete_row_cnt_ = tnode_stat.delete_row_count_; + if (OB_FAIL(MTL(ObTenantTabletStatMgr *)->report_stat(report_stat, report_succ))) { + LOG_WARN("failed to report tablet stat", KR(ret)); } } FLOG_INFO("try report tablet stat", KR(ret), K(ls_id), K(tablet_id), K(tnode_stat), K(report_succ)); @@ -413,7 +431,12 @@ int ObTabletExeMergeCtx::prepare_compaction_filter() int ObTabletMajorMergeCtx::prepare_schema() { int ret = OB_SUCCESS; - if (!MTL(ObTenantTabletScheduler *)->could_major_merge_start()) { + + if (is_meta_major_merge(static_param_.get_merge_type())) { + if (OB_FAIL(get_meta_compaction_info())) { + LOG_WARN("failed to get meta compaction info", K(ret), KPC(this)); + } + } else if (!MTL(ObTenantTabletScheduler *)->could_major_merge_start()) { ret = OB_CANCELED; LOG_INFO("Merge has been paused", KR(ret), "param", get_dag_param()); } else if (OB_FAIL(get_medium_compaction_info())) { diff --git a/src/storage/compaction/ob_tablet_merge_ctx.h b/src/storage/compaction/ob_tablet_merge_ctx.h index 6c1ab8d33..d1f44379e 100644 --- a/src/storage/compaction/ob_tablet_merge_ctx.h +++ b/src/storage/compaction/ob_tablet_merge_ctx.h @@ -73,6 +73,7 @@ private: const blocksstable::ObSSTable &sstable, ObTabletHandle &new_tablet_handle) override; void try_schedule_compaction_after_mini(storage::ObTabletHandle &tablet_handle); + int try_schedule_meta_merge(ObTabletHandle &tablet_handle, bool &create_meta_dag); int try_report_tablet_stat_after_mini(); }; diff --git a/src/storage/compaction/ob_tablet_merge_info.cpp b/src/storage/compaction/ob_tablet_merge_info.cpp index b3180309d..4c1fda298 100644 --- a/src/storage/compaction/ob_tablet_merge_info.cpp +++ b/src/storage/compaction/ob_tablet_merge_info.cpp @@ -152,7 +152,8 @@ int ObTabletMergeInfo::build_create_sstable_param(const ObBasicTabletMergeCtx &c } param.table_key_ = table_key; - if (ObITable::TableType::COLUMN_ORIENTED_SSTABLE == table_key.table_type_) { + if (ObITable::TableType::COLUMN_ORIENTED_SSTABLE == table_key.table_type_ || + ObITable::TableType::COLUMN_ORIENTED_META_SSTABLE == table_key.table_type_) { param.co_base_type_ = cg_schema->is_all_column_group() ? ObCOSSTableBaseType::ALL_CG_TYPE : ObCOSSTableBaseType::ROWKEY_CG_TYPE; @@ -216,7 +217,7 @@ int ObTabletMergeInfo::build_create_sstable_param(const ObBasicTabletMergeCtx &c param.other_block_ids_ = res.other_block_ids_; param.ddl_scn_.set_min(); MEMCPY(param.encrypt_key_, res.encrypt_key_, share::OB_MAX_TABLESPACE_ENCRYPT_KEY_LENGTH); - if (is_major_merge_type(static_param.get_merge_type())) { + if (is_major_or_meta_merge_type(static_param.get_merge_type())) { if (OB_FAIL(param.column_checksums_.assign(res.data_column_checksums_))) { LOG_WARN("fail to fill column checksum", K(ret), K(res)); } diff --git a/src/storage/compaction/ob_tablet_merge_task.cpp b/src/storage/compaction/ob_tablet_merge_task.cpp index 989772761..e4976f61b 100644 --- a/src/storage/compaction/ob_tablet_merge_task.cpp +++ b/src/storage/compaction/ob_tablet_merge_task.cpp @@ -187,7 +187,7 @@ int ObMergeParameter::init( merge_version_range_.snapshot_version_ = MERGE_READ_SNAPSHOT_VERSION; } - if (is_major_merge_type(static_param_.get_merge_type()) && !get_schema()->is_row_store()) { + if (is_major_or_meta_merge_type(static_param_.get_merge_type()) && !get_schema()->is_row_store()) { if (OB_ISNULL(allocator)) { ret = OB_ERR_UNEXPECTED; STORAGE_LOG(WARN, "unexpected null allocator", K(ret)); @@ -1121,7 +1121,7 @@ int ObTabletMergeTask::process() } else { ctx_->mem_ctx_.mem_click(); if (OB_FAIL(merger_->merge_partition(*ctx_, idx_))) { - if (is_major_merge_type(ctx_->get_merge_type()) && OB_ENCODING_EST_SIZE_OVERFLOW == ret) { + if (is_major_or_meta_merge_type(ctx_->get_merge_type()) && OB_ENCODING_EST_SIZE_OVERFLOW == ret) { STORAGE_LOG(WARN, "failed to merge partition with possibly encoding error, " "retry with flat row store type", K(ret), KPC(ctx_), K_(idx)); merger_->reset(); diff --git a/src/storage/compaction/ob_tenant_tablet_scheduler.cpp b/src/storage/compaction/ob_tenant_tablet_scheduler.cpp index b1bf10ff3..5ab74c599 100644 --- a/src/storage/compaction/ob_tenant_tablet_scheduler.cpp +++ b/src/storage/compaction/ob_tenant_tablet_scheduler.cpp @@ -866,71 +866,84 @@ int ObTenantTabletScheduler::schedule_merge_dag( return ret; } -int ObTenantTabletScheduler::schedule_tablet_meta_major_merge( +int ObTenantTabletScheduler::schedule_tablet_meta_merge( ObLSHandle &ls_handle, ObTabletHandle &tablet_handle, - const compaction::ObMediumCompactionInfoList &medium_list) + bool &has_created_dag) { int ret = OB_SUCCESS; + ObTablet *tablet = nullptr; + has_created_dag = false; + if (OB_UNLIKELY(!ls_handle.is_valid() || !tablet_handle.is_valid())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", K(ret), K(ls_handle), K(tablet_handle)); + } else if (FALSE_IT(tablet = tablet_handle.get_obj())) { } else { const ObLSID &ls_id = ls_handle.get_ls()->get_ls_id(); - const ObTabletID &tablet_id = tablet_handle.get_obj()->get_tablet_meta().tablet_id_; - LOG_INFO("start try to schedule tablet meta major merge", K(ls_id), K(tablet_id), K(tablet_handle)); // tmp log, remove later - - ObGetMergeTablesParam param; - ObGetMergeTablesResult result; - - const int64_t last_major_snapshot_version = tablet_handle.get_obj()->get_last_major_snapshot_version(); - ObAdaptiveMergePolicy::AdaptiveMergeReason adaptive_merge_reason = ObAdaptiveMergePolicy::AdaptiveMergeReason::NONE; + const ObTabletID &tablet_id = tablet->get_tablet_meta().tablet_id_; + const int64_t last_major_snapshot_version = tablet->get_last_major_snapshot_version(); int64_t max_sync_medium_scn = 0; ObArenaAllocator allocator("GetMediumList", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()); const compaction::ObMediumCompactionInfoList *medium_list = nullptr; - if (OB_FAIL(tablet_handle.get_obj()->get_max_sync_medium_scn(max_sync_medium_scn))) { - LOG_WARN("failed to get max sync medium snapshot", K(ret), K(ls_id), K(tablet_id)); - } else if (OB_FAIL(tablet_handle.get_obj()->read_medium_info_list(allocator, medium_list))) { - LOG_WARN("failed to read medium info list", K(ret), K(tablet_id)); + + // check medium list + if (OB_FAIL(tablet->read_medium_info_list(allocator, medium_list))) { + LOG_WARN("failed to read medium info list", K(ret), K(ls_id), K(tablet_id)); } else if (OB_FAIL(ObMediumCompactionScheduleFunc::get_max_sync_medium_scn( - *tablet_handle.get_obj(), *medium_list, max_sync_medium_scn))) { + *tablet, *medium_list, max_sync_medium_scn))) { LOG_WARN("failed to get max sync medium snapshot", K(ret), K(ls_id), K(tablet_id)); } else if ((nullptr != medium_list && medium_list->size() > 0) - || max_sync_medium_scn > last_major_snapshot_version) { - // do nothing - } else if (OB_FAIL(ObAdaptiveMergePolicy::get_adaptive_merge_reason(*tablet_handle.get_obj(), adaptive_merge_reason))) { - if (OB_HASH_NOT_EXIST != ret) { - LOG_WARN("failed to get meta merge priority", K(ret), K(tablet_id)); - } else { - ret = OB_SUCCESS; - } - } else if (ObAdaptiveMergePolicy::is_valid_merge_reason(adaptive_merge_reason)) { - LOG_INFO("start schedule meta merge", KPC(tablet_handle.get_obj())); // tmp log, remove later + || max_sync_medium_scn > last_major_snapshot_version) { + ret = OB_NO_NEED_MERGE; + LOG_WARN("tablet exists unfinished medium info, no need to do meta merge", K(ret), K(ls_id), K(tablet_id), + K(last_major_snapshot_version), K(max_sync_medium_scn), KPC(medium_list)); + } else { + LOG_INFO("start schedule meta merge", K(ls_id), K(tablet_id), KPC(tablet)); // tmp log, remove later + ObGetMergeTablesParam param; + ObGetMergeTablesResult result; param.merge_type_ = META_MAJOR_MERGE; if (OB_FAIL(ObAdaptiveMergePolicy::get_meta_merge_tables( param, *ls_handle.get_ls(), - *tablet_handle.get_obj(), + *tablet, result))) { - if (OB_NO_NEED_MERGE == ret) { - ret = OB_SUCCESS; - LOG_DEBUG("tablet no need meta merge", K(ret), K(param), K(tablet_id)); - } else { + if (OB_NO_NEED_MERGE != ret) { LOG_WARN("failed to get meta merge tables", K(ret), K(param), K(tablet_id)); } - } else if (OB_UNLIKELY(tablet_handle.get_obj()->get_multi_version_start() > result.merge_version_)) { + } else if (FALSE_IT(result.merge_version_ = result.version_range_.snapshot_version_)) { + } else if (OB_UNLIKELY(tablet->get_multi_version_start() > result.merge_version_)) { ret = OB_SNAPSHOT_DISCARDED; LOG_WARN("multi version data is discarded, should not compaction now", K(ret), K(ls_id), K(tablet_id), K(result.merge_version_)); + } else if (!tablet->is_row_store()) { + ObCOMergeDagParam dag_param; + dag_param.ls_id_ = ls_id; + dag_param.tablet_id_ = tablet->get_tablet_meta().tablet_id_; + dag_param.merge_type_ = META_MAJOR_MERGE; + dag_param.merge_version_ = result.merge_version_; + dag_param.is_tenant_major_merge_ = false; + dag_param.compat_mode_ = tablet->get_tablet_meta().compat_mode_; + if (OB_FAIL(compaction::ObScheduleDagFunc::schedule_tablet_co_merge_dag_net(dag_param))) { + if (OB_EAGAIN != ret && OB_SIZE_OVERFLOW != ret) { + LOG_WARN("failed to schedule tablet merge dag", K(ret)); + } + } + FLOG_INFO("chaser debug schedule co merge dag", K(ret), K(dag_param), K(tablet->is_row_store())); } else { ObTabletMergeDagParam dag_param(META_MAJOR_MERGE, ls_id, tablet_id); + dag_param.merge_version_ = result.merge_version_; ObTabletMergeExecuteDag *schedule_dag = nullptr; if (OB_FAIL(schedule_merge_execute_dag(dag_param, ls_handle, tablet_handle, result, schedule_dag))) { if (OB_SIZE_OVERFLOW != ret && OB_EAGAIN != ret) { - LOG_WARN("failed to schedule tablet meta merge dag", K(ret)); + LOG_WARN("failed to schedule tablet meta merge dag", K(ret), K(dag_param)); } } } + + if (OB_SUCC(ret)) { + has_created_dag = true; + } } } return ret; @@ -1328,17 +1341,6 @@ int ObTenantTabletScheduler::schedule_next_round_for_leader( LOG_WARN("failed to schedule_next_medium_for_leader", K(tmp_ret), K(ls_handle), K(tablet_handle), KPC(medium_list)); } ++idx; - } else { // unfinish tablet - if (ObTimeUtility::current_time_ns() < tablet_ls_infos.at(i).get_medium_scn() + WAIT_MEDIUM_CHECK_THRESHOLD) { - // need wait 10 mins before schedule meta major - } else if (OB_TMP_FAIL(get_ls_tablet_medium_list(ls_id, tablet_id, tmp_allocator, ls_handle, tablet_handle, medium_list, weak_read_ts))) { - LOG_WARN("failed to get_ls_tablet_medium_list", K(tmp_ret), K(ls_handle), K(tablet_handle), KPC(medium_list)); - } else if (enable_adaptive_compaction_ && OB_TMP_FAIL( - schedule_tablet_meta_major_merge(ls_handle, tablet_handle, *medium_list))) { - if (OB_SIZE_OVERFLOW != tmp_ret && OB_EAGAIN != tmp_ret) { - LOG_WARN("failed to schedule tablet merge", K(tmp_ret), K(ls_id), K(tablet_id)); - } - } } // clear flags set by ls_start_schedule_medium //#TODO @jingshui sort tablet_ls_info with ls id diff --git a/src/storage/compaction/ob_tenant_tablet_scheduler.h b/src/storage/compaction/ob_tenant_tablet_scheduler.h index d7e1bd9f2..94ce816e4 100644 --- a/src/storage/compaction/ob_tenant_tablet_scheduler.h +++ b/src/storage/compaction/ob_tenant_tablet_scheduler.h @@ -105,7 +105,7 @@ private: const int64_t memtable_create_timestamp, int64_t &adaptive_threshold); private: - static const int64_t FAST_FREEZE_INTERVAL_US = 300 * 1000 * 1000L; //300s + static const int64_t FAST_FREEZE_INTERVAL_US = 120 * 1000 * 1000L; //120s static const int64_t PRINT_LOG_INVERVAL = 2 * 60 * 1000 * 1000L; // 2m static const int64_t TOMBSTONE_DEFAULT_ROW_COUNT = 250000; static const int64_t TOMBSTONE_MAX_ROW_COUNT = 500000; @@ -245,10 +245,10 @@ public: static int schedule_tablet_minor_merge( ObLSHandle &ls_handle, ObTabletHandle &tablet_handle); - static int schedule_tablet_meta_major_merge( + static int schedule_tablet_meta_merge( ObLSHandle &ls_handle, ObTabletHandle &tablet_handle, - const compaction::ObMediumCompactionInfoList &medium_list); + bool &has_created_dag); template static int schedule_merge_execute_dag( const compaction::ObTabletMergeDagParam ¶m, diff --git a/src/storage/ob_i_table.cpp b/src/storage/ob_i_table.cpp index 7097c795e..81633e65a 100644 --- a/src/storage/ob_i_table.cpp +++ b/src/storage/ob_i_table.cpp @@ -69,7 +69,8 @@ const char* ObITable::table_type_name_[] = "DDL_MEM", "COL_ORIENTED", "NORMAL_COL_GROUP", - "ROWKEY_COL_GROUP" + "ROWKEY_COL_GROUP", + "COL_ORIENTED_META" }; uint64_t ObITable::TableKey::hash() const @@ -782,8 +783,7 @@ int ObTablesHandleArray::check_continues(const share::ObScnRange *scn_range) con if (OB_ISNULL(table = handles_array_.at(i).get_table())) { ret = OB_ERR_SYS; LOG_WARN("table is NULL", KPC(table)); - } else if (table->is_major_sstable() || table->is_meta_major_sstable()) { - base_end_scn = table->is_meta_major_sstable() ? table->get_end_scn() : SCN::min_scn(); + } else if (table->is_major_sstable()) { i++; } // 2:check minor sstable @@ -792,7 +792,7 @@ int ObTablesHandleArray::check_continues(const share::ObScnRange *scn_range) con if (OB_ISNULL(table)) { ret = OB_ERR_SYS; LOG_WARN("table is NULL", KPC(table)); - } else if (table->is_major_sstable() || table->is_meta_major_sstable()) { + } else if (table->is_major_sstable()) { ret = OB_ERR_SYS; LOG_WARN("major sstable or meta merge should be first", K(ret), K(i), K(table)); } else if (OB_ISNULL(last_table)) { // first table diff --git a/src/storage/ob_i_table.h b/src/storage/ob_i_table.h index 97e5bb418..ee37e6f6a 100644 --- a/src/storage/ob_i_table.h +++ b/src/storage/ob_i_table.h @@ -96,6 +96,7 @@ public: COLUMN_ORIENTED_SSTABLE = 17, NORMAL_COLUMN_GROUP_SSTABLE = 18, ROWKEY_COLUMN_GROUP_SSTABLE = 19, + COLUMN_ORIENTED_META_SSTABLE = 20, // < add new sstable before here, See is_sstable() MAX_TABLE_TYPE @@ -125,7 +126,7 @@ public: OB_INLINE bool is_lock_memtable() const { return ObITable::is_lock_memtable(table_type_); } OB_INLINE bool is_minor_sstable() const { return ObITable::is_minor_sstable(table_type_); } OB_INLINE bool is_mini_sstable() const { return ObITable::is_mini_sstable(table_type_); } - OB_INLINE bool is_major_sstable() const { return ObITable::is_major_sstable(table_type_); } + OB_INLINE bool is_major_sstable() const { return ObITable::is_major_sstable(table_type_) || ObITable::is_meta_major_sstable(table_type_); } OB_INLINE bool is_meta_major_sstable() const { return ObITable::is_meta_major_sstable(table_type_); } OB_INLINE bool is_multi_version_table() const { return ObITable::is_multi_version_table(table_type_); } OB_INLINE bool is_ddl_sstable() const { return ObITable::is_ddl_sstable(table_type_); } @@ -244,7 +245,8 @@ public: virtual bool is_normal_cg_sstable() const { return is_normal_cg_sstable(key_.table_type_); } virtual bool is_cg_sstable() const { return is_cg_sstable(key_.table_type_); } virtual bool is_column_store_sstable() const { return is_co_sstable() || is_cg_sstable(); } - virtual bool is_major_sstable() const { return is_major_sstable(key_.table_type_); } + virtual bool is_meta_major_sstable() const { return is_meta_major_sstable(key_.table_type_); } + virtual bool is_major_sstable() const { return is_major_sstable(key_.table_type_) || is_meta_major_sstable(key_.table_type_); } virtual bool is_minor_sstable() const { return is_minor_sstable(key_.table_type_); } virtual bool is_mini_sstable() const { return is_mini_sstable(key_.table_type_); } virtual bool is_multi_version_minor_sstable() const { return is_multi_version_minor_sstable(key_.table_type_); } @@ -257,7 +259,6 @@ public: virtual bool is_lock_memtable() const { return is_lock_memtable(key_.table_type_); } virtual bool is_frozen_memtable() const { return false; } virtual bool is_active_memtable() const { return false; } - virtual bool is_meta_major_sstable() const { return is_meta_major_sstable(key_.table_type_); } OB_INLINE bool is_table_with_scn_range() const { return is_table_with_scn_range(key_.table_type_); } virtual OB_INLINE int64_t get_timestamp() const { return 0; } virtual bool is_ddl_sstable() const { return is_ddl_sstable(key_.table_type_); } @@ -273,7 +274,9 @@ public: } static bool is_major_sstable(const TableType table_type) { - return ObITable::TableType::MAJOR_SSTABLE == table_type || is_co_sstable(table_type) || is_cg_sstable(table_type); + return ObITable::TableType::MAJOR_SSTABLE == table_type + || ObITable::TableType::COLUMN_ORIENTED_SSTABLE == table_type + || is_cg_sstable(table_type); } static bool is_minor_sstable(const TableType table_type) { @@ -308,7 +311,8 @@ public: */ static bool is_co_sstable(const TableType table_type) { - return ObITable::TableType::COLUMN_ORIENTED_SSTABLE == table_type; + return ObITable::TableType::COLUMN_ORIENTED_SSTABLE == table_type + || ObITable::TableType::COLUMN_ORIENTED_META_SSTABLE == table_type; } static bool is_normal_cg_sstable(const TableType table_type) { @@ -369,7 +373,8 @@ public: static bool is_meta_major_sstable(const TableType table_type) { - return ObITable::TableType::META_MAJOR_SSTABLE == table_type; + return ObITable::TableType::META_MAJOR_SSTABLE == table_type + || ObITable::TableType::COLUMN_ORIENTED_META_SSTABLE == table_type; } static bool is_ddl_sstable(const TableType table_type) { diff --git a/src/storage/ob_tenant_tablet_stat_mgr.cpp b/src/storage/ob_tenant_tablet_stat_mgr.cpp index 70d80a3ae..213b2fe4d 100644 --- a/src/storage/ob_tenant_tablet_stat_mgr.cpp +++ b/src/storage/ob_tenant_tablet_stat_mgr.cpp @@ -663,6 +663,32 @@ int ObTenantTabletStatMgr::get_latest_tablet_stat( return ret; } +int ObTenantTabletStatMgr::clear_tablet_stat( + const share::ObLSID &ls_id, + const common::ObTabletID &tablet_id) +{ + int ret = OB_SUCCESS; + const ObTabletStatKey key(ls_id, tablet_id); + + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + LOG_WARN("ObTenantTabletStatMgr not inited", K(ret)); + } else if (OB_UNLIKELY(!key.is_valid())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("get invalid arguments", K(ret), K(ls_id), K(tablet_id)); + } else { + ObBucketHashWLockGuard lock_guard(bucket_lock_, key.hash()); + if (OB_FAIL(stream_map_.erase_refactored(key))) { + if (OB_HASH_NOT_EXIST == ret) { + ret = OB_SUCCESS; + } else { + LOG_WARN("failed to erase tablet stat", K(ret), K(key)); + } + } + } + return ret; +} + int ObTenantTabletStatMgr::get_all_tablet_stats( common::ObIArray &tablet_stats) { diff --git a/src/storage/ob_tenant_tablet_stat_mgr.h b/src/storage/ob_tenant_tablet_stat_mgr.h index 23a03d684..c2dae3bfb 100644 --- a/src/storage/ob_tenant_tablet_stat_mgr.h +++ b/src/storage/ob_tenant_tablet_stat_mgr.h @@ -344,6 +344,9 @@ public: const share::ObLSID &ls_id, const common::ObTabletID &tablet_id, ObTabletStatAnalyzer &analyzer); + int clear_tablet_stat( + const share::ObLSID &ls_id, + const common::ObTabletID &tablet_id); int get_sys_stat(ObTenantSysStat &sys_stat); void process_stats(); void refresh_all(const int64_t step); diff --git a/src/storage/tablet/ob_tablet_table_store.cpp b/src/storage/tablet/ob_tablet_table_store.cpp index 9e53c61c2..f3140e338 100644 --- a/src/storage/tablet/ob_tablet_table_store.cpp +++ b/src/storage/tablet/ob_tablet_table_store.cpp @@ -173,7 +173,7 @@ int ObTabletTableStore::init( // skip } else if (OB_FAIL(build_memtable_array(tablet))) { LOG_WARN("failed to build memtable array", K(ret)); - } else if (OB_UNLIKELY(!sstable->is_major_sstable())) { + } else if (OB_UNLIKELY(!sstable->is_major_sstable() || sstable->is_meta_major_sstable())) { ret = OB_ERR_UNEXPECTED; LOG_WARN("unexpected table", K(ret), KPC(sstable)); } else if (OB_FAIL(major_tables_.init(allocator, sstable))) { @@ -872,13 +872,13 @@ int ObTabletTableStore::get_table(const ObITable::TableKey &table_key, ObITable table = nullptr; const ObSSTableArray *sst_array = nullptr; if (table_key.is_major_sstable()) { - sst_array = &major_tables_; + sst_array = table_key.is_meta_major_sstable() + ? &meta_major_tables_ + : &major_tables_; } else if (table_key.is_minor_sstable()) { sst_array = &minor_tables_; } else if (table_key.is_ddl_sstable()) { sst_array = &ddl_sstables_; - } else if (table_key.is_meta_major_sstable()) { - sst_array = &meta_major_tables_; } if (table_key.is_memtable()) { @@ -1265,7 +1265,7 @@ int ObTabletTableStore::inner_build_major_tables_( for (int64_t i = 0; OB_SUCC(ret) && i < tables_array.count(); ++i) { ObITable *new_table = tables_array.at(i); need_add = true; - if (OB_NOT_NULL(new_table) && new_table->is_major_sstable()) { + if (OB_NOT_NULL(new_table) && (new_table->is_major_sstable() && !new_table->is_meta_major_sstable())) { for (int64_t j = 0; OB_SUCC(ret) && j < major_tables.count(); ++j) { ObITable *table = major_tables.at(j); if (OB_ISNULL(table) || OB_UNLIKELY(!table->is_major_sstable())) {