From dc949461b3413c66f6f052d8ceaf5cb3ddb49fcf Mon Sep 17 00:00:00 2001 From: HoniiTro19 Date: Wed, 5 Feb 2025 09:45:31 +0000 Subject: [PATCH] get medium info to schedule merge dag in ss and freezer --- .../compaction/ob_basic_tablet_merge_ctx.cpp | 30 +++------------- .../compaction/ob_tenant_tablet_scheduler.cpp | 35 +++++++++++++++++-- .../compaction/ob_tenant_tablet_scheduler.h | 4 +++ src/storage/ls/ob_freezer.cpp | 12 +++++-- .../tablet/ob_tablet_medium_info_reader.cpp | 35 +++++++++++++++++++ .../tablet/ob_tablet_medium_info_reader.h | 6 +++- 6 files changed, 92 insertions(+), 30 deletions(-) diff --git a/src/storage/compaction/ob_basic_tablet_merge_ctx.cpp b/src/storage/compaction/ob_basic_tablet_merge_ctx.cpp index bfdd30bbfa..c65eea5fa6 100644 --- a/src/storage/compaction/ob_basic_tablet_merge_ctx.cpp +++ b/src/storage/compaction/ob_basic_tablet_merge_ctx.cpp @@ -1175,38 +1175,18 @@ int ObBasicTabletMergeCtx::get_medium_compaction_info() { int ret = OB_SUCCESS; ObTablet *tablet = get_tablet(); - const share::ObLSID &ls_id = tablet->get_ls_id(); - const common::ObTabletID &tablet_id = tablet->get_tablet_id(); ObArenaAllocator temp_allocator("GetMediumInfo", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()); // for load medium info - ObMediumCompactionInfoKey medium_info_key(get_merge_version()); ObMediumCompactionInfo *medium_info = nullptr; if (OB_UNLIKELY(tablet->get_multi_version_start() > get_merge_version())) { ret = OB_SNAPSHOT_DISCARDED; LOG_ERROR("multi version data is discarded, should not execute compaction now", K(ret), "param", get_dag_param(), KPC(this)); - } else if (OB_FAIL(ObTabletObjLoadHelper::alloc_and_new(temp_allocator, medium_info))) { - LOG_WARN("fail to alloc and new", K(ret)); - } else { - SMART_VARS_2((ObTableScanParam, scan_param), (storage::ObTabletMediumInfoReader, medium_info_reader)) { - if (OB_FAIL(ObMdsScanParamHelper::build_medium_info_scan_param( - temp_allocator, - ls_id, - tablet_id, - scan_param))) { - LOG_WARN("fail to build scan param", K(ret), K(ls_id), K(tablet_id)); - } else if (OB_FAIL(medium_info_reader.init(*tablet, scan_param))) { - LOG_WARN("failed to init medium info reader", K(ret), KPC(this)); - } else if (OB_FAIL(medium_info_reader.get_specified_medium_info(temp_allocator, medium_info_key, *medium_info))) { - LOG_WARN("failed to get specified scn info", K(ret), K(medium_info_key)); - } else if (OB_UNLIKELY(!medium_info->is_valid())) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("medium info is invalid", KR(ret), K(medium_info)); - } - } - } - - if (OB_FAIL(ret)) { + } else if (OB_FAIL(ObTabletMediumInfoReader::get_medium_info_with_merge_version(get_merge_version(), + *tablet, + temp_allocator, + medium_info))) { + LOG_WARN("fail to get medium info with merge version", K(ret), K(get_merge_version()), KPC(tablet)); } else if (medium_info->contain_parallel_range_ && !parallel_merge_ctx_.is_valid() && OB_FAIL(parallel_merge_ctx_.init(*medium_info))) { diff --git a/src/storage/compaction/ob_tenant_tablet_scheduler.cpp b/src/storage/compaction/ob_tenant_tablet_scheduler.cpp index 4359544c58..9d64c73ff0 100644 --- a/src/storage/compaction/ob_tenant_tablet_scheduler.cpp +++ b/src/storage/compaction/ob_tenant_tablet_scheduler.cpp @@ -23,6 +23,7 @@ #include "storage/ob_gc_upper_trans_helper.h" #include "share/schema/ob_tenant_schema_service.h" #include "storage/compaction/ob_schedule_tablet_func.h" +#include "storage/tablet/ob_tablet_medium_info_reader.h" namespace oceanbase { @@ -1068,6 +1069,29 @@ int ObTenantTabletScheduler::schedule_merge_dag( return ret; } +int ObTenantTabletScheduler::get_co_merge_type_for_compaction( + const int64_t merge_version, + const storage::ObTablet &tablet, + ObCOMajorMergePolicy::ObCOMajorMergeType &co_major_merge_type) +{ + int ret = OB_SUCCESS; + co_major_merge_type = ObCOMajorMergePolicy::INVALID_CO_MAJOR_MERGE_TYPE; + ObArenaAllocator temp_allocator("GetMediumInfo", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()); // for load medium info + ObMediumCompactionInfo *medium_info = nullptr; + if (OB_UNLIKELY(tablet.get_multi_version_start() > merge_version)) { + ret = OB_SNAPSHOT_DISCARDED; + LOG_ERROR("multi version data is discarded, should not execute compaction now", K(ret), K(tablet), K(merge_version)); + } else if (OB_FAIL(ObTabletMediumInfoReader::get_medium_info_with_merge_version(merge_version, + tablet, + temp_allocator, + medium_info))) { + LOG_WARN("fail to get medium info with merge version", K(ret), K(merge_version), K(tablet)); + } else { + co_major_merge_type = static_cast(medium_info->co_major_merge_type_); + } + return ret; +} + int ObTenantTabletScheduler::schedule_convert_co_merge_dag_net( const ObLSID &ls_id, const ObTablet &tablet, @@ -1078,8 +1102,15 @@ int ObTenantTabletScheduler::schedule_convert_co_merge_dag_net( int ret = OB_SUCCESS; int tmp_ret = OB_SUCCESS; schedule_ret = OB_SUCCESS; - if (OB_TMP_FAIL(compaction::ObTenantTabletScheduler::schedule_merge_dag( - ls_id, tablet, compaction::ObMergeType::CONVERT_CO_MAJOR_MERGE, tablet.get_last_major_snapshot_version(), EXEC_MODE_LOCAL, &curr_dag_net_id))) { + // do not reply on co_major_merge_type in cs replica + ObCOMajorMergePolicy::ObCOMajorMergeType co_major_merge_type = ObCOMajorMergePolicy::INVALID_CO_MAJOR_MERGE_TYPE; + if (OB_TMP_FAIL(compaction::ObTenantTabletScheduler::schedule_merge_dag(ls_id, + tablet, + compaction::ObMergeType::CONVERT_CO_MAJOR_MERGE, + tablet.get_last_major_snapshot_version(), + EXEC_MODE_LOCAL, + &curr_dag_net_id, + co_major_merge_type))) { if (OB_SIZE_OVERFLOW != tmp_ret && OB_EAGAIN != tmp_ret) { ret = tmp_ret; LOG_WARN("failed to schedule co merge dag net for cs replica", K(ret), K(ls_id), "tablet_id", tablet.get_tablet_id()); diff --git a/src/storage/compaction/ob_tenant_tablet_scheduler.h b/src/storage/compaction/ob_tenant_tablet_scheduler.h index 1191d87df6..0715725665 100644 --- a/src/storage/compaction/ob_tenant_tablet_scheduler.h +++ b/src/storage/compaction/ob_tenant_tablet_scheduler.h @@ -223,6 +223,10 @@ public: const storage::ObTablet &tablet, const ObMergeType merge_type, ObCSReplicaTabletStatus &cs_replica_status); + static int get_co_merge_type_for_compaction( + const int64_t merge_version, + const storage::ObTablet &tablet, + ObCOMajorMergePolicy::ObCOMajorMergeType &co_major_merge_type); static int schedule_merge_dag( const share::ObLSID &ls_id, const storage::ObTablet &tablet, diff --git a/src/storage/ls/ob_freezer.cpp b/src/storage/ls/ob_freezer.cpp index c53f79c77e..fd7e89aa68 100644 --- a/src/storage/ls/ob_freezer.cpp +++ b/src/storage/ls/ob_freezer.cpp @@ -1167,8 +1167,16 @@ int ObFreezer::handle_no_active_memtable_(const ObTabletID &tablet_id, } else if (protected_handle->get_max_saved_version_from_medium_info_recorder() >= freeze_snapshot_version.get_val_for_tx() && !GCTX.is_shared_storage_mode()) { int tmp_ret = OB_SUCCESS; - if (OB_TMP_FAIL(compaction::ObTenantTabletScheduler::schedule_merge_dag( - ls_id, *tablet, MEDIUM_MERGE, freeze_snapshot_version.get_val_for_tx(), EXEC_MODE_LOCAL))) { + ObCOMajorMergePolicy::ObCOMajorMergeType co_major_merge_type; + if (OB_FAIL(ObTenantTabletScheduler::get_co_merge_type_for_compaction(freeze_snapshot_version.get_val_for_tx(), *tablet, co_major_merge_type))) { + LOG_WARN("fail to get co merge type from medium info", K(ret), K(freeze_snapshot_version), KPC(tablet)); + } else if (OB_TMP_FAIL(compaction::ObTenantTabletScheduler::schedule_merge_dag(ls_id, + *tablet, + MEDIUM_MERGE, + freeze_snapshot_version.get_val_for_tx(), + EXEC_MODE_LOCAL, + nullptr, /*dag_net_id*/ + co_major_merge_type))) { if (OB_SIZE_OVERFLOW != tmp_ret && OB_EAGAIN != tmp_ret) { ret = tmp_ret; LOG_WARN("failed to schedule medium merge dag", K(ret), K(ls_id), K(tablet_id)); diff --git a/src/storage/tablet/ob_tablet_medium_info_reader.cpp b/src/storage/tablet/ob_tablet_medium_info_reader.cpp index f65800b1c4..eedefc5633 100644 --- a/src/storage/tablet/ob_tablet_medium_info_reader.cpp +++ b/src/storage/tablet/ob_tablet_medium_info_reader.cpp @@ -12,6 +12,7 @@ #include "storage/tablet/ob_tablet_medium_info_reader.h" #include "storage/tablet/ob_tablet.h" +#include "storage/tablet/ob_mds_scan_param_helper.h" #define USING_LOG_PREFIX STORAGE @@ -142,6 +143,40 @@ int ObTabletMediumInfoReader::get_specified_medium_info( return ret; } +int ObTabletMediumInfoReader::get_medium_info_with_merge_version( + const int64_t merge_version, + const ObTablet &tablet, + common::ObIAllocator &allocator, + compaction::ObMediumCompactionInfo *&medium_info) +{ + int ret = OB_SUCCESS; + medium_info = nullptr; + const share::ObLSID &ls_id = tablet.get_ls_id(); + const common::ObTabletID &tablet_id = tablet.get_tablet_id(); + compaction::ObMediumCompactionInfoKey medium_info_key(merge_version); + if (OB_FAIL(ObTabletObjLoadHelper::alloc_and_new(allocator, medium_info))) { + LOG_WARN("fail to alloc and new", K(ret)); + } else { + SMART_VARS_2((ObTableScanParam, scan_param), (ObTabletMediumInfoReader, medium_info_reader)) { + if (OB_FAIL(ObMdsScanParamHelper::build_medium_info_scan_param( + allocator, + ls_id, + tablet_id, + scan_param))) { + LOG_WARN("fail to build scan param", K(ret), K(ls_id), K(tablet_id)); + } else if (OB_FAIL(medium_info_reader.init(tablet, scan_param))) { + LOG_WARN("fail to init medium info reader", K(ret)); + } else if (OB_FAIL(medium_info_reader.get_specified_medium_info(allocator, medium_info_key, *medium_info))) { + LOG_WARN("fail to get specified scn info", K(ret), K(medium_info_key)); + } else if (OB_ISNULL(medium_info) || OB_UNLIKELY(!medium_info->is_valid())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("medium info is invalid", K(ret), K(medium_info)); + } + } + } + return ret; +} + int ObTabletMediumInfoReader::get_min_medium_snapshot( const int64_t last_major_snapshot_version, int64_t &min_medium_snapshot) diff --git a/src/storage/tablet/ob_tablet_medium_info_reader.h b/src/storage/tablet/ob_tablet_medium_info_reader.h index b361d77ed6..58cc6c94db 100644 --- a/src/storage/tablet/ob_tablet_medium_info_reader.h +++ b/src/storage/tablet/ob_tablet_medium_info_reader.h @@ -27,6 +27,11 @@ class ObTabletMediumInfoReader public: ObTabletMediumInfoReader(); ~ObTabletMediumInfoReader(); + static int get_medium_info_with_merge_version( + const int64_t merge_version, + const ObTablet &tablet, + common::ObIAllocator &allocator, + compaction::ObMediumCompactionInfo *&medium_info); public: int init( const ObTablet &tablet, @@ -39,7 +44,6 @@ public: common::ObIAllocator &allocator, const compaction::ObMediumCompactionInfoKey &key, compaction::ObMediumCompactionInfo &medium_info); - int get_min_medium_snapshot( const int64_t last_major_snapshot_version, int64_t &min_medium_snapshot);