From 0a9f32a5d18277a1dcfac0b4146e2d19ddca9877 Mon Sep 17 00:00:00 2001 From: obdev Date: Sat, 28 Jan 2023 18:00:20 +0800 Subject: [PATCH] use medium in memtable to schedule --- src/storage/compaction/ob_compaction_util.cpp | 7 +- .../compaction/ob_medium_compaction_func.cpp | 178 +++++++++++------- .../compaction/ob_medium_compaction_func.h | 16 +- .../compaction/ob_medium_compaction_info.h | 1 + .../compaction/ob_medium_compaction_mgr.cpp | 36 ++-- .../compaction/ob_medium_compaction_mgr.h | 13 +- .../compaction/ob_tablet_merge_ctx.cpp | 125 +++++++----- src/storage/compaction/ob_tablet_merge_ctx.h | 6 +- .../compaction/ob_tablet_merge_task.cpp | 9 +- .../compaction/ob_tenant_tablet_scheduler.cpp | 11 +- src/storage/ob_storage_struct.cpp | 12 +- src/storage/ob_storage_struct.h | 6 +- src/storage/tablet/ob_tablet.cpp | 2 +- 13 files changed, 247 insertions(+), 175 deletions(-) diff --git a/src/storage/compaction/ob_compaction_util.cpp b/src/storage/compaction/ob_compaction_util.cpp index d60056cd44..5069ad8dce 100644 --- a/src/storage/compaction/ob_compaction_util.cpp +++ b/src/storage/compaction/ob_compaction_util.cpp @@ -21,14 +21,15 @@ const char * ObMergeTypeStr[] = { "MAJOR_MERGE", "MEDIUM_MERGE", "DDL_KV_MERGE", - "BACKFILL_TX_MERGE" + "BACKFILL_TX_MERGE", + "EMPTY_MERGE_TYPE" }; const char *merge_type_to_str(const ObMergeType &merge_type) { - STATIC_ASSERT(static_cast(MERGE_TYPE_MAX) == ARRAYSIZEOF(ObMergeTypeStr), "merge type str len is mismatch"); + STATIC_ASSERT(static_cast(MERGE_TYPE_MAX + 1) == ARRAYSIZEOF(ObMergeTypeStr), "merge type str len is mismatch"); const char *str = ""; - if (merge_type >= MERGE_TYPE_MAX || merge_type <= INVALID_MERGE_TYPE) { + if (merge_type > MERGE_TYPE_MAX || merge_type <= INVALID_MERGE_TYPE) { str = "invalid_merge_type"; } else { str = ObMergeTypeStr[merge_type]; diff --git a/src/storage/compaction/ob_medium_compaction_func.cpp b/src/storage/compaction/ob_medium_compaction_func.cpp index 987af8bb67..91c4379532 100644 --- 
a/src/storage/compaction/ob_medium_compaction_func.cpp +++ b/src/storage/compaction/ob_medium_compaction_func.cpp @@ -262,7 +262,7 @@ int ObMediumCompactionScheduleFunc::decide_medium_snapshot( if (medium_info.medium_snapshot_ == tablet_.get_snapshot_version() // no uncommitted sstable && weak_read_ts.get_val_for_tx() <= max_reserved_snapshot && weak_read_ts.get_val_for_tx() + DEFAULT_SCHEDULE_MEDIUM_INTERVAL < ObTimeUtility::current_time_ns()) { - medium_info.medium_snapshot_ = weak_read_ts.get_val_for_tx(); + medium_info.medium_snapshot_ = MAX(max_reserved_snapshot, weak_read_ts.get_val_for_tx()); LOG_INFO("use weak_read_ts to schedule medium", K(ret), KPC(this), K(medium_info), K(max_reserved_snapshot), K(weak_read_ts)); } else { @@ -562,29 +562,14 @@ int ObMediumCompactionScheduleFunc::get_table_schema_to_merge( LOG_WARN("table is deleted", K(ret), K(table_id)); } } - if (OB_SUCC(ret)) { - int64_t max_schema_version = 0; - if (OB_FAIL(tablet_.get_max_sync_storage_schema_version(max_schema_version))) { - LOG_WARN("failed to get max sync storage schema version", K(ret), KPC(this)); - } else if (max_schema_version < table_schema->get_schema_version()) { - // need sync schema clog - if (OB_FAIL(tablet_.try_update_storage_schema( - table_id, - table_schema->get_schema_version(), - allocator_, - DEFAULT_SYNC_SCHEMA_CLOG_TIMEOUT))) { - LOG_WARN("failed to sync schema clog", K(ret), KPC(this), KPC(table_schema)); - } - } - if (FAILEDx(medium_info.storage_schema_.init( - allocator_, - *table_schema, - tablet_.get_tablet_meta().compat_mode_))) { - LOG_WARN("failed to init storage schema", K(ret), K(schema_version)); - } else { - FLOG_INFO("get schema to merge", K(table_id), K(schema_version), K(save_schema_version), - K(*reinterpret_cast(table_schema))); - } + if (FAILEDx(medium_info.storage_schema_.init( + allocator_, + *table_schema, + tablet_.get_tablet_meta().compat_mode_))) { + LOG_WARN("failed to init storage schema", K(ret), K(schema_version)); + } else { + 
FLOG_INFO("get schema to merge", K(table_id), K(schema_version), K(save_schema_version), + K(*reinterpret_cast(table_schema))); } return ret; } @@ -749,7 +734,8 @@ int ObMediumCompactionScheduleFunc::check_medium_finish() int ObMediumCompactionScheduleFunc::schedule_tablet_medium_merge( ObLS &ls, ObTablet &tablet, - const int64_t input_major_snapshot) + const int64_t input_major_snapshot, + const bool schedule_with_memtable) { int ret = OB_SUCCESS; #ifdef ERRSIM @@ -763,12 +749,19 @@ int ObMediumCompactionScheduleFunc::schedule_tablet_medium_merge( const ObTabletID &tablet_id = tablet.get_tablet_meta().tablet_id_; const ObLSID &ls_id = ls.get_ls_id(); int64_t major_frozen_snapshot = 0 == input_major_snapshot ? MTL(ObTenantTabletScheduler *)->get_frozen_version() : input_major_snapshot; - - const int64_t schedule_scn = tablet.get_medium_compaction_info_list().get_schedule_scn(major_frozen_snapshot); + ObMediumCompactionInfo::ObCompactionType compaction_type = ObMediumCompactionInfo::COMPACTION_TYPE_MAX; + int64_t schedule_scn = 0; bool need_merge = false; - LOG_DEBUG("schedule_tablet_medium_merge", K(schedule_scn), K(ls_id), K(tablet_id)); - if (schedule_scn > 0) { - if (OB_FAIL(check_need_merge_and_schedule(ls, tablet, schedule_scn, need_merge))) { + + (void)tablet.get_medium_compaction_info_list().get_schedule_scn(major_frozen_snapshot, schedule_scn, compaction_type); + + LOG_DEBUG("schedule_tablet_medium_merge", K(schedule_scn), K(major_frozen_snapshot), K(schedule_with_memtable), K(ls_id), K(tablet_id)); + if (0 == schedule_scn + && schedule_with_memtable + && OB_FAIL(get_schedule_medium_from_memtable(tablet, major_frozen_snapshot, schedule_scn, compaction_type))) { + LOG_WARN("failed to get schedule medium scn from memtables", K(ret)); + } else if (schedule_scn > 0) { + if (OB_FAIL(check_need_merge_and_schedule(ls, tablet, schedule_scn, compaction_type))) { LOG_WARN("failed to check medium merge", K(ret), K(ls_id), K(tablet_id), K(schedule_scn)); } } @@ -792,39 
+785,45 @@ int ObMediumCompactionScheduleFunc::get_palf_role(const ObLSID &ls_id, ObRole &r return ret; } -int ObMediumCompactionScheduleFunc::freeze_memtable_to_get_medium_info() +int ObMediumCompactionScheduleFunc::get_schedule_medium_from_memtable( + ObTablet &tablet, + const int64_t major_frozen_snapshot, + int64_t &schedule_medium_scn, + ObMediumCompactionInfo::ObCompactionType &compaction_type) { int ret = OB_SUCCESS; - ObSEArray memtables; - if (OB_FAIL(tablet_.get_table_store().get_memtables(memtables, true/*need_active*/))) { - LOG_WARN("failed to get memtables", K(ret), KPC(this)); - } else if (memtables.empty()) { - // do nothing - } else { - memtable::ObMemtable *memtable = nullptr; - bool receive_medium_info = false; - bool has_medium_info = false; - for (int i = 0; OB_SUCC(ret) && i < memtables.count(); ++i) { - if (OB_ISNULL(memtable = static_cast(memtables.at(i)))) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("memtable is null", K(ret), K(i), KPC(memtables.at(i)), K(memtable)); - } else if (memtable->has_multi_source_data_unit(memtable::MultiSourceDataUnitType::MEDIUM_COMPACTION_INFO)) { - has_medium_info = true; - if (memtable->is_active_memtable()) { - receive_medium_info = true; - break; + schedule_medium_scn = 0; + compaction_type = ObMediumCompactionInfo::COMPACTION_TYPE_MAX; + + ObITable *last_major = tablet.get_table_store().get_major_sstables().get_boundary_table(true/*last*/); + if (OB_NOT_NULL(last_major)) { + ObArenaAllocator tmp_allocator; + ObMediumCompactionInfoList tmp_medium_list; + const int64_t last_major_snapshot = last_major->get_snapshot_version(); + ObSEArray memtables; + if (OB_FAIL(tablet.get_table_store().get_memtables(memtables, true/*need_active*/))) { + LOG_WARN("failed to get memtables", K(ret), "tablet_id", tablet.get_tablet_meta().tablet_id_); + } else if (memtables.empty()) { + // do nothing + } else if (OB_FAIL(get_medium_info_list_from_memtable(tmp_allocator, memtables, tmp_medium_list))) { + LOG_WARN("failed to get 
medium info list from memtable", K(ret)); + } else if (!tmp_medium_list.is_empty()) { + const ObMediumCompactionInfo *info_in_list = nullptr; + DLIST_FOREACH_X(info, tmp_medium_list.get_list(), OB_SUCC(ret)) { + if (OB_UNLIKELY(memtable::MultiSourceDataUnitType::MEDIUM_COMPACTION_INFO != info->type())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("return info is invalid", K(ret), KPC(info)); + } else if (FALSE_IT(info_in_list = static_cast(info))) { + } else if (info_in_list->medium_snapshot_ <= last_major_snapshot) { + // finished, this medium info could recycle + } else { + if (info_in_list->is_medium_compaction() || info_in_list->medium_snapshot_ <= major_frozen_snapshot) { + schedule_medium_scn = info_in_list->medium_snapshot_; + compaction_type = (ObMediumCompactionInfo::ObCompactionType)info_in_list->compaction_type_; + } + break; // found one unfinish medium info, loop end } } - } // end of for - if (OB_FAIL(ret)) { - } else if (receive_medium_info) { - if (OB_FAIL(ls_.tablet_freeze(tablet_.get_tablet_meta().tablet_id_, true/*is_sync*/))) { - if (OB_TABLE_NOT_EXIST != ret) { - LOG_WARN("failed to freeze tablet", K(ret), KPC(this)); - } - } - } else if (has_medium_info) { - LOG_INFO("received medium info, the memtable is frozen, no need to freeze tablet again", K(ret), K(tablet_)); } } return ret; @@ -834,17 +833,20 @@ int ObMediumCompactionScheduleFunc::check_need_merge_and_schedule( ObLS &ls, ObTablet &tablet, const int64_t schedule_scn, - bool &need_merge) + const ObMediumCompactionInfo::ObCompactionType compaction_type) { int ret = OB_SUCCESS; int tmp_ret = OB_SUCCESS; - need_merge = false; + bool need_merge = false; bool can_merge = false; bool need_force_freeze = false; const ObLSID &ls_id = ls.get_ls_id(); const ObTabletID &tablet_id = tablet.get_tablet_meta().tablet_id_; - if (OB_FAIL(ObPartitionMergePolicy::check_need_medium_merge( + if (OB_UNLIKELY(0 == schedule_scn || !ObMediumCompactionInfo::is_valid_compaction_type(compaction_type))) { + ret = 
OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", K(ret), K(schedule_scn), K(compaction_type)); + } else if (OB_FAIL(ObPartitionMergePolicy::check_need_medium_merge( ls, tablet, schedule_scn, @@ -854,14 +856,12 @@ int ObMediumCompactionScheduleFunc::check_need_merge_and_schedule( LOG_WARN("failed to check medium merge", K(ret), K(ls_id), K(tablet_id)); } else if (need_merge && can_merge) { const ObMediumCompactionInfo *medium_info = nullptr; - if (OB_FAIL(tablet.get_medium_compaction_info_list().get_specified_scn_info(schedule_scn, medium_info))) { - LOG_WARN("failed to get specified scn info", K(ret), K(schedule_scn)); - } else if (OB_TMP_FAIL(ObTenantTabletScheduler::schedule_merge_dag( + if (OB_TMP_FAIL(ObTenantTabletScheduler::schedule_merge_dag( ls.get_ls_id(), - tablet.get_tablet_meta().tablet_id_, + tablet_id, MEDIUM_MERGE, schedule_scn, - medium_info->is_major_compaction()))) { + ObMediumCompactionInfo::is_major_compaction(compaction_type)))) { if (OB_SIZE_OVERFLOW != tmp_ret && OB_EAGAIN != tmp_ret) { ret = tmp_ret; LOG_WARN("failed to schedule medium merge dag", K(ret), K(ls_id), K(tablet_id)); @@ -870,8 +870,8 @@ int ObMediumCompactionScheduleFunc::check_need_merge_and_schedule( LOG_DEBUG("success to schedule medium merge dag", K(ret), K(schedule_scn)); } } else if (need_force_freeze) { - if (OB_FAIL(MTL(ObTenantFreezer *)->tablet_freeze(tablet_id, true/*force_freeze*/))) { - LOG_WARN("failed to force freeze tablet", K(ret), K(ls_id), K(tablet_id)); + if (OB_TMP_FAIL(MTL(ObTenantFreezer *)->tablet_freeze(tablet_id, true/*force_freeze*/, true/*is_sync*/))) { + LOG_WARN("failed to force freeze tablet", K(tmp_ret), K(ls_id), K(tablet_id)); } } return ret; @@ -910,5 +910,45 @@ int ObMediumCompactionScheduleFunc::get_latest_storage_schema_from_memtable( return ret; } +// make sure only data tablet could schedule this func +int ObMediumCompactionScheduleFunc::get_medium_info_list_from_memtable( + ObIAllocator &allocator, + const ObIArray &memtables, + 
ObMediumCompactionInfoList &medium_list) +{ + int ret = OB_SUCCESS; + if (OB_FAIL(medium_list.init(allocator))) { + LOG_WARN("failed to init merge list", K(ret)); + } + ObITable *table = nullptr; + memtable::ObMemtable *memtable = nullptr; + compaction::ObMediumCompactionInfo useless_medium_info; + memtable::ObMultiSourceData::ObIMultiSourceDataUnitList dst_list; + for (int64_t i = 0; OB_SUCC(ret) && i < memtables.count(); ++i) { + dst_list.reset(); + if (OB_ISNULL(table = memtables.at(i))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("table in tables_handle is invalid", K(ret), KPC(table)); + } else if (OB_ISNULL(memtable = dynamic_cast(table))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("table pointer does not point to a ObMemtable object", KPC(table)); + } else if (OB_FAIL(memtable->get_multi_source_data_unit_list(&useless_medium_info, dst_list, &allocator))) { + LOG_WARN("failed to get medium info from memtable", K(ret), KPC(table)); + } else { + ObMediumCompactionInfo *info_in_list = nullptr; + DLIST_FOREACH_X(info, dst_list, OB_SUCC(ret)) { + if (OB_UNLIKELY(memtable::MultiSourceDataUnitType::MEDIUM_COMPACTION_INFO != info->type())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("return info is invalid", K(ret), KPC(info)); + } else if (FALSE_IT(info_in_list = static_cast(info))) { + } else if (OB_FAIL(medium_list.add_medium_compaction_info(*info_in_list))) { + LOG_WARN("failed to add medium compaction info", K(ret), KPC(info_in_list)); + } + } + } + } // end of for + return ret; +} + } //namespace compaction } // namespace oceanbase diff --git a/src/storage/compaction/ob_medium_compaction_func.h b/src/storage/compaction/ob_medium_compaction_func.h index 239760e3d0..697142e02c 100644 --- a/src/storage/compaction/ob_medium_compaction_func.h +++ b/src/storage/compaction/ob_medium_compaction_func.h @@ -35,11 +35,16 @@ public: static int schedule_tablet_medium_merge( ObLS &ls, ObTablet &tablet, - const int64_t major_frozen_scn = 0); + const int64_t major_frozen_scn = 0, + const 
bool schedule_with_memtable = false); static int get_latest_storage_schema_from_memtable( ObIAllocator &allocator, const ObIArray &memtables, ObStorageSchema &storage_schema); + static int get_medium_info_list_from_memtable( + ObIAllocator &allocator, + const ObIArray &memtables, + ObMediumCompactionInfoList &medium_list); static int get_palf_role(const share::ObLSID &ls_id, ObRole &role); int schedule_next_medium_for_leader(const int64_t major_snapshot); @@ -50,8 +55,6 @@ public: int check_medium_finish(); - int freeze_memtable_to_get_medium_info(); - TO_STRING_KV("ls_id", ls_.get_ls_id(), "tablet_id", tablet_.get_tablet_meta().tablet_id_); protected: int get_status_from_inner_table(share::ObTabletCompactionScnInfo &ret_info); @@ -98,12 +101,17 @@ protected: ObLS &ls, ObTablet &tablet, const int64_t schedule_scn, - bool &need_merge); + const ObMediumCompactionInfo::ObCompactionType compaction_type); int schedule_next_medium_primary_cluster(const int64_t major_snapshot); int get_table_schema_to_merge(const int64_t schema_version, ObMediumCompactionInfo &medium_info); int get_max_reserved_snapshot(int64_t &max_reserved_snapshot); + static int get_schedule_medium_from_memtable( + ObTablet &tablet, + const int64_t major_frozen_snapshot, + int64_t &schedule_medium_scn, + ObMediumCompactionInfo::ObCompactionType &compaction_type); static int get_table_id( ObMultiVersionSchemaService &schema_service, const ObTabletID &tablet_id, diff --git a/src/storage/compaction/ob_medium_compaction_info.h b/src/storage/compaction/ob_medium_compaction_info.h index 9a016c2d01..ffecffd761 100644 --- a/src/storage/compaction/ob_medium_compaction_info.h +++ b/src/storage/compaction/ob_medium_compaction_info.h @@ -93,6 +93,7 @@ public: int gene_parallel_info( ObIAllocator &allocator, common::ObArrayArray &paral_range); + static inline bool is_valid_compaction_type(const ObCompactionType type) { return MEDIUM_COMPACTION <= type && type < COMPACTION_TYPE_MAX; } static inline bool 
is_medium_compaction(const ObCompactionType type) { return MEDIUM_COMPACTION == type; } static inline bool is_major_compaction(const ObCompactionType type) { return MAJOR_COMPACTION == type; } inline bool is_major_compaction() const { return is_major_compaction((ObCompactionType)compaction_type_); } diff --git a/src/storage/compaction/ob_medium_compaction_mgr.cpp b/src/storage/compaction/ob_medium_compaction_mgr.cpp index b119bd3d52..4d3c98babe 100644 --- a/src/storage/compaction/ob_medium_compaction_mgr.cpp +++ b/src/storage/compaction/ob_medium_compaction_mgr.cpp @@ -363,29 +363,30 @@ int ObMediumCompactionInfoList::init(common::ObIAllocator &allocator, const ObMediumCompactionInfoList *old_list, const ObMediumCompactionInfoList *dump_list, const int64_t finish_medium_scn/*= 0*/, - const bool update_in_major_type_merge/*= false*/) + const ObMergeType merge_type/*= MERGE_TYPE_MAX*/) { int ret = OB_SUCCESS; if (IS_INIT) { ret = OB_INIT_TWICE; LOG_WARN("init twice", K(ret)); } else if (FALSE_IT(allocator_ = &allocator)) { - } else if (nullptr != old_list && OB_FAIL(append_list_with_deep_copy(finish_medium_scn, update_in_major_type_merge, *old_list))) { + } else if (nullptr != old_list && OB_FAIL(append_list_with_deep_copy(finish_medium_scn, *old_list))) { LOG_WARN("failed to deep copy list", K(ret), K(old_list)); - } else if (nullptr != dump_list && OB_FAIL(append_list_with_deep_copy(finish_medium_scn, update_in_major_type_merge, *dump_list))) { + } else if (nullptr != dump_list && OB_FAIL(append_list_with_deep_copy(finish_medium_scn, *dump_list))) { LOG_WARN("failed to deep copy list", K(ret), K(dump_list)); - } else { - // if update_in_major_type_merge = true, will update wait_check_medium_scn in delete_medium_compaction_info - if (!update_in_major_type_merge && nullptr != old_list) { - last_compaction_type_ = old_list->last_compaction_type_; - wait_check_medium_scn_ = old_list->get_wait_check_medium_scn(); - } + } else if (is_major_merge_type(merge_type)) { // 
update list after major_type_merge + last_compaction_type_ = is_major_merge(merge_type) ? ObMediumCompactionInfo::MAJOR_COMPACTION : ObMediumCompactionInfo::MEDIUM_COMPACTION; + wait_check_medium_scn_ = finish_medium_scn; + } else if (OB_NOT_NULL(old_list)) { // update list with old_list + last_compaction_type_ = old_list->last_compaction_type_; + wait_check_medium_scn_ = old_list->get_wait_check_medium_scn(); } if (OB_SUCC(ret)) { compat_ = MEDIUM_LIST_VERSION; is_inited_ = true; if (medium_info_list_.get_size() > 0 || wait_check_medium_scn_ > 0) { - LOG_INFO("success to init list", K(ret), KPC(this), KPC(old_list)); + LOG_INFO("success to init list", K(ret), KPC(this), KPC(old_list), K(finish_medium_scn), + "merge_type", merge_type_to_str(merge_type)); } } else if (OB_UNLIKELY(!is_inited_)) { reset(); @@ -406,7 +407,7 @@ int ObMediumCompactionInfoList::init_after_check_finish( ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", K(ret), K(old_list)); } else if (FALSE_IT(allocator_ = &allocator)) { - } else if (OB_FAIL(append_list_with_deep_copy(wait_check_medium_scn_, false, old_list))) { + } else if (OB_FAIL(append_list_with_deep_copy(wait_check_medium_scn_, old_list))) { LOG_WARN("failed to deep copy list", K(ret), K(wait_check_medium_scn_)); } else { last_compaction_type_ = old_list.last_compaction_type_; @@ -421,7 +422,6 @@ int ObMediumCompactionInfoList::init_after_check_finish( return ret; } - void ObMediumCompactionInfoList::reset_list() { DLIST_REMOVE_ALL_NORET(info, medium_info_list_) { @@ -487,18 +487,22 @@ int ObMediumCompactionInfoList::get_specified_scn_info( return ret; } -int64_t ObMediumCompactionInfoList::get_schedule_scn(const int64_t major_compaction_scn) const +void ObMediumCompactionInfoList::get_schedule_scn( + const int64_t major_compaction_scn, + int64_t &schedule_scn, + ObMediumCompactionInfo::ObCompactionType &compaction_type) const { - int64_t ret_scn = 0; + schedule_scn = 0; + compaction_type = 
ObMediumCompactionInfo::COMPACTION_TYPE_MAX; if (size() > 0) { const ObMediumCompactionInfo *first_medium_info = get_first_medium_info(); if (first_medium_info->is_medium_compaction() || (first_medium_info->is_major_compaction() && major_compaction_scn >= first_medium_info->medium_snapshot_)) { // for standby cluster, receive several medium info, only schedule what scheduler have received - ret_scn = first_medium_info->medium_snapshot_; + schedule_scn = first_medium_info->medium_snapshot_; + compaction_type = (ObMediumCompactionInfo::ObCompactionType)first_medium_info->compaction_type_; } } - return ret_scn; } int ObMediumCompactionInfoList::inner_deep_copy_node( diff --git a/src/storage/compaction/ob_medium_compaction_mgr.h b/src/storage/compaction/ob_medium_compaction_mgr.h index 483c6d262e..f49b35093c 100644 --- a/src/storage/compaction/ob_medium_compaction_mgr.h +++ b/src/storage/compaction/ob_medium_compaction_mgr.h @@ -95,7 +95,7 @@ public: const ObMediumCompactionInfoList *old_list, const ObMediumCompactionInfoList *dump_list = nullptr, const int64_t finish_medium_scn = 0, - const bool update_in_major_type_merge = false); + const ObMergeType merge_type = MERGE_TYPE_MAX); int init_after_check_finish( ObIAllocator &allocator, @@ -124,7 +124,10 @@ public: { return (ObMediumCompactionInfo::ObCompactionType)last_compaction_type_; } - int64_t get_schedule_scn(const int64_t major_compaction_scn) const; + void get_schedule_scn( + const int64_t major_compaction_scn, + int64_t &schedule_scn, + ObMediumCompactionInfo::ObCompactionType &compaction_type) const; int get_specified_scn_info( const int64_t snapshot, @@ -165,17 +168,11 @@ private: } OB_INLINE int append_list_with_deep_copy( const int64_t finish_scn, - const bool update_in_major_type_merge, const ObMediumCompactionInfoList &input_list) { int ret = OB_SUCCESS; DLIST_FOREACH_X(input_info, input_list.medium_info_list_, OB_SUCC(ret)) { const ObMediumCompactionInfo *medium_info = static_cast(input_info); - if 
(update_in_major_type_merge - && medium_info->medium_snapshot_ == finish_scn) { - last_compaction_type_ = medium_info->compaction_type_; - wait_check_medium_scn_ = finish_scn; - } if (medium_info->medium_snapshot_ > finish_scn) { ret = inner_deep_copy_node(*medium_info); } diff --git a/src/storage/compaction/ob_tablet_merge_ctx.cpp b/src/storage/compaction/ob_tablet_merge_ctx.cpp index d792b3700c..e1f1d7eacc 100644 --- a/src/storage/compaction/ob_tablet_merge_ctx.cpp +++ b/src/storage/compaction/ob_tablet_merge_ctx.cpp @@ -646,7 +646,6 @@ int ObTabletMergeCtx::get_merge_range(int64_t parallel_idx, ObDatumRange &merge_ int ObTabletMergeCtx::inner_init_for_medium() { int ret = OB_SUCCESS; - const ObMediumCompactionInfo *medium_info = nullptr; ObGetMergeTablesParam get_merge_table_param; ObGetMergeTablesResult get_merge_table_result; get_merge_table_param.merge_type_ = param_.merge_type_; @@ -667,11 +666,8 @@ int ObTabletMergeCtx::inner_init_for_medium() ret = OB_EAGAIN; LOG_INFO("tx table is not ready. 
waiting for max_decided_log_ts ...", KR(ret), "merge_scn", get_merge_table_result.scn_range_.end_scn_); - } else if (OB_FAIL(init_get_medium_compaction_info(param_.merge_version_, medium_info))) { // have checked medium info inside + } else if (OB_FAIL(init_get_medium_compaction_info(param_.merge_version_, get_merge_table_result))) { // have checked medium info inside LOG_WARN("failed to get medium compaction info", K(ret), KPC(this)); - } else if (FALSE_IT(get_merge_table_result.schema_version_ = medium_info->storage_schema_.schema_version_)) { - } else if (FALSE_IT(data_version_ = medium_info->data_version_)) { - } else if (FALSE_IT(is_tenant_major_merge_ = medium_info->is_major_compaction())) { } else if (OB_FAIL(get_basic_info_from_result(get_merge_table_result))) { LOG_WARN("failed to set basic info to ctx", K(ret), K(get_merge_table_result), KPC(this)); } else if (OB_FAIL(cal_major_merge_param(get_merge_table_result))) { @@ -682,23 +678,27 @@ int ObTabletMergeCtx::inner_init_for_medium() int ObTabletMergeCtx::init_get_medium_compaction_info( const int64_t medium_snapshot, - const ObMediumCompactionInfo *&medium_info) + ObGetMergeTablesResult &get_merge_table_result) { int ret = OB_SUCCESS; - medium_info = nullptr; - const ObMediumCompactionInfoList &medium_list = tablet_handle_.get_obj()->get_medium_compaction_info_list(); - + ObTablet *tablet = tablet_handle_.get_obj(); + const ObMediumCompactionInfoList &medium_list = tablet->get_medium_compaction_info_list(); + const bool medium_in_storage = medium_snapshot <= tablet->get_tablet_meta().max_serialized_medium_scn_; + ObMediumCompactionInfo medium_info; + const ObMediumCompactionInfo *medium_info_ptr = &medium_info; if (OB_UNLIKELY(!medium_list.is_valid())) { ret = OB_ERR_UNEXPECTED; LOG_ERROR("medium compaction mgr is invalid", K(ret), KPC(this), K(medium_list)); - } else if (OB_FAIL(medium_list.get_specified_scn_info(medium_snapshot, medium_info))) { + } else if (medium_in_storage && 
OB_FAIL(medium_list.get_specified_scn_info(medium_snapshot, medium_info_ptr))) { LOG_WARN("failed to get medium info from mgr", K(ret), K(medium_snapshot), K(medium_list)); - } else if (OB_UNLIKELY(nullptr == medium_info || !medium_info->is_valid())) { + } else if (!medium_in_storage && OB_FAIL(get_specified_medium_compaction_info_from_memtable(allocator_, medium_snapshot, medium_info))) { + LOG_WARN("failed to get medium info from memtable", K(ret), K(medium_snapshot)); + } else if (OB_UNLIKELY(nullptr == medium_info_ptr || !medium_info_ptr->is_valid())) { ret = OB_ERR_UNEXPECTED; - LOG_ERROR("medium compaction info is invalid", K(ret), KPC(this), K(medium_list), KPC(medium_info)); - } else if (medium_info->contain_parallel_range_ - && OB_FAIL(parallel_merge_ctx_.init(*medium_info))) { - LOG_WARN("failed to init parallel merge ctx", K(ret), KPC(medium_info)); + LOG_ERROR("medium compaction info is invalid", K(ret), KPC(this), K(medium_list), KPC(medium_info_ptr)); + } else if (medium_info_ptr->contain_parallel_range_ + && OB_FAIL(parallel_merge_ctx_.init(*medium_info_ptr))) { + LOG_WARN("failed to init parallel merge ctx", K(ret), KPC(medium_info_ptr)); } else { void *buf = nullptr; if (OB_ISNULL(buf = allocator_.alloc(sizeof(ObStorageSchema)))) { @@ -709,8 +709,8 @@ int ObTabletMergeCtx::init_get_medium_compaction_info( storage_schema = new(buf) ObStorageSchema(); schema_ctx_.storage_schema_ = storage_schema; schema_ctx_.allocated_storage_schema_ = true; - if (OB_FAIL(storage_schema->init(allocator_, medium_info->storage_schema_))) { - LOG_WARN("failed to init storage schema from current medium info", K(ret), KPC(medium_info)); + if (OB_FAIL(storage_schema->init(allocator_, medium_info_ptr->storage_schema_))) { + LOG_WARN("failed to init storage schema from current medium info", K(ret), KPC(medium_info_ptr)); } } @@ -720,7 +720,11 @@ int ObTabletMergeCtx::init_get_medium_compaction_info( LOG_ERROR("multi version data is discarded, should not compaction now", 
K(ret), K(param_), K(medium_snapshot)); } } - + if (OB_SUCC(ret)) { + get_merge_table_result.schema_version_ = medium_info_ptr->storage_schema_.schema_version_; + data_version_ = medium_info_ptr->data_version_; + is_tenant_major_merge_ = medium_info_ptr->is_major_compaction(); + } return ret; } @@ -1007,39 +1011,13 @@ int ObTabletMergeCtx::get_medium_compaction_info_to_store() { int ret = OB_SUCCESS; if (is_mini_merge(param_.merge_type_)) { - if (OB_FAIL(merge_list_.init(allocator_))) { - LOG_WARN("failed to init merge list", K(ret)); - } - ObITable *table = nullptr; - memtable::ObMemtable * memtable = nullptr; - compaction::ObMediumCompactionInfo medium_info; - memtable::ObMultiSourceData::ObIMultiSourceDataUnitList dst_list; - for (int i = 0; OB_SUCC(ret) && i < tables_handle_.get_count(); ++i) { - dst_list.reset(); - if (OB_UNLIKELY(nullptr == (table = tables_handle_.get_table(i)) || !table->is_frozen_memtable())) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("table in tables_handle is invalid", K(ret), KPC(table)); - } else if (OB_ISNULL(memtable = dynamic_cast(table))) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("table pointer does not point to a ObMemtable object", KPC(table)); - } else if (OB_FAIL(memtable->get_multi_source_data_unit_list(&medium_info, dst_list, &allocator_))) { - LOG_WARN("failed to get medium info from memtable", K(ret), KPC(table)); - } else if (dst_list.is_empty()) { - // do nothing - } else { - ObMediumCompactionInfo *input_info = nullptr; - DLIST_FOREACH_X(info, dst_list, OB_SUCC(ret)) { - if (OB_UNLIKELY(memtable::MultiSourceDataUnitType::MEDIUM_COMPACTION_INFO != info->type())) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("return info is invalid", K(ret), KPC(info)); - } else if (FALSE_IT(input_info = static_cast(info))) { - } else if (OB_FAIL(merge_list_.add_medium_compaction_info(*input_info))) { - LOG_WARN("failed to add medium compaction info", K(ret), KPC(input_info)); - } - } - } - } // end of for - if (OB_SUCC(ret)) { + ObSEArray memtables; 
+ if (OB_FAIL(tables_handle_.get_tables(memtables))) { + LOG_WARN("failed to get tables", K(ret), K(memtables)); + } else if (OB_FAIL(ObMediumCompactionScheduleFunc::get_medium_info_list_from_memtable( + allocator_, memtables, merge_list_))) { + LOG_WARN("failed to get medium info list from memtable", K(ret)); + } else if (merge_list_.size() > 0) { LOG_INFO("success get medium info list", "ls_id", param_.ls_id_, "tablet_id", param_.tablet_id_, K(merge_list_)); } @@ -1047,6 +1025,51 @@ int ObTabletMergeCtx::get_medium_compaction_info_to_store() return ret; } +int ObTabletMergeCtx::get_specified_medium_compaction_info_from_memtable( + ObIAllocator &allocator, + const int64_t medium_snapshot, + ObMediumCompactionInfo &medium_info) +{ + int ret = OB_SUCCESS; + ObArenaAllocator tmp_allocator; + ObSEArray memtables; + ObMediumCompactionInfoList tmp_medium_list; + if (OB_FAIL(tablet_handle_.get_obj()->get_table_store().get_memtables(memtables, true/*need_active*/))) { + LOG_WARN("failed to get memtables", K(ret), K(param_)); + } else if (memtables.empty()) { + ret = OB_ENTRY_NOT_EXIST; + LOG_WARN("no memtable", K(ret), K(memtables), K(param_)); + } else if (OB_FAIL(ObMediumCompactionScheduleFunc::get_medium_info_list_from_memtable( + tmp_allocator, memtables, tmp_medium_list))) { + LOG_WARN("failed to get medium info list from memtable", K(ret)); + } else if (tmp_medium_list.is_empty()) { + ret = OB_ENTRY_NOT_EXIST; + LOG_WARN("have memtables, but medium list is empty", K(ret), K(memtables), K(tmp_medium_list)); + } else { + const ObMediumCompactionInfo *info_in_list = nullptr; + DLIST_FOREACH_X(info, tmp_medium_list.get_list(), OB_SUCC(ret)) { + if (OB_UNLIKELY(memtable::MultiSourceDataUnitType::MEDIUM_COMPACTION_INFO != info->type())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("return info is invalid", K(ret), KPC(info)); + } else if (FALSE_IT(info_in_list = static_cast(info))) { + } else if (medium_snapshot < info_in_list->medium_snapshot_) { + ret = OB_ENTRY_NOT_EXIST; + 
LOG_WARN("not found specified medium info in medium list", K(ret), K(param_), K(memtables), K(tmp_medium_list)); + } else if (medium_snapshot == info_in_list->medium_snapshot_) { + if (OB_FAIL(medium_info.init(allocator, *info_in_list))) { + LOG_WARN("failed to init medium info", K(ret)); + } + break; + } + } + if (OB_SUCC(ret) && medium_info.is_valid()) { + LOG_INFO("success get medium info", "ls_id", param_.ls_id_, + "tablet_id", param_.tablet_id_, K(medium_info)); + } + } + return ret; +} + int ObTabletMergeCtx::get_storage_schema_to_merge( const ObTablesHandleArray &merge_tables_handle, const bool get_schema_on_memtable) diff --git a/src/storage/compaction/ob_tablet_merge_ctx.h b/src/storage/compaction/ob_tablet_merge_ctx.h index c633c91b8a..f9fa191eea 100644 --- a/src/storage/compaction/ob_tablet_merge_ctx.h +++ b/src/storage/compaction/ob_tablet_merge_ctx.h @@ -167,7 +167,11 @@ struct ObTabletMergeCtx int inner_init_for_mini(bool &skip_rest_operation); int inner_init_for_medium(); - int init_get_medium_compaction_info(const int64_t medium_snapshot, const ObMediumCompactionInfo *&medium_info); + int init_get_medium_compaction_info(const int64_t medium_snapshot, ObGetMergeTablesResult &result); + int get_specified_medium_compaction_info_from_memtable( + ObIAllocator &allocator, + const int64_t medium_snapshot, + ObMediumCompactionInfo &info); int get_schema_and_gene_from_result(const ObGetMergeTablesResult &get_merge_table_result); int get_storage_schema_and_gene_from_result(const ObGetMergeTablesResult &get_merge_table_result); int get_storage_schema_to_merge(const ObTablesHandleArray &merge_tables_handle, const bool get_schema_on_memtable = true); diff --git a/src/storage/compaction/ob_tablet_merge_task.cpp b/src/storage/compaction/ob_tablet_merge_task.cpp index 1765cf2a70..5db390a5af 100644 --- a/src/storage/compaction/ob_tablet_merge_task.cpp +++ b/src/storage/compaction/ob_tablet_merge_task.cpp @@ -1098,18 +1098,12 @@ int 
ObTabletMergeFinishTask::get_merged_sstable(ObTabletMergeCtx &ctx, ObSSTable int ObTabletMergeFinishTask::add_sstable_for_merge(ObTabletMergeCtx &ctx) { int ret = OB_SUCCESS; - const ObStorageSchema *update_storage_schema = ctx.schema_ctx_.storage_schema_; ObTablet *old_tablet = ctx.tablet_handle_.get_obj(); const ObMergeType merge_type = ctx.param_.merge_type_; if (OB_UNLIKELY(!ctx.is_valid())) { ret = OB_ERR_UNEXPECTED; LOG_WARN("unexpected error of merge ctx", K(ctx)); - } else if (is_major_merge_type(merge_type) - && update_storage_schema->schema_version_ > old_tablet->get_storage_schema().schema_version_) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("schema in major can't have larger schema version than tablet", K(ret), - KPC(update_storage_schema), K(old_tablet->get_storage_schema())); } else if (is_mini_merge(merge_type) && !ctx.param_.tablet_id_.is_special_merge_tablet()) { // if only one medium compaction info need store, just use ObUpdateTableStoreParam // OR need to read from inner table to decide what need to keep after release memtable @@ -1130,7 +1124,8 @@ int ObTabletMergeFinishTask::add_sstable_for_merge(ObTabletMergeCtx &ctx) clog_checkpoint_scn, is_minor_merge(ctx.param_.merge_type_)/*need_check_sstable*/, false/*allow_duplicate_sstable*/, - &ctx.merge_list_); + &ctx.merge_list_, + ctx.param_.get_merge_type()); ObTablet *old_tablet = ctx.tablet_handle_.get_obj(); ObTabletHandle new_tablet_handle; if (ctx.param_.tablet_id_.is_special_merge_tablet()) { diff --git a/src/storage/compaction/ob_tenant_tablet_scheduler.cpp b/src/storage/compaction/ob_tenant_tablet_scheduler.cpp index c655229977..1db71d8164 100755 --- a/src/storage/compaction/ob_tenant_tablet_scheduler.cpp +++ b/src/storage/compaction/ob_tenant_tablet_scheduler.cpp @@ -982,19 +982,12 @@ int ObTenantTabletScheduler::schedule_ls_medium_merge( if (could_major_merge && OB_TMP_FAIL(ObMediumCompactionScheduleFunc::schedule_tablet_medium_merge( ls, *tablet, - major_frozen_scn))) { + major_frozen_scn, 
+ true/*schedule_with_memtable*/))) { if (OB_EAGAIN != ret) { LOG_WARN("failed to schedule medium", K(tmp_ret), K(ls_id), K(tablet_id)); } } - - // get info from memtable to check have received new medium info - if (OB_TMP_FAIL(func.freeze_memtable_to_get_medium_info())) { - if (OB_TABLE_NOT_EXIST != tmp_ret) { - LOG_WARN("failed to freeze memtable", K(tmp_ret), K(ls_id), K(tablet_id)); - } - } - ls_merge_finish &= tablet_merge_finish; } } // end of while diff --git a/src/storage/ob_storage_struct.cpp b/src/storage/ob_storage_struct.cpp index add3ab8620..d2616e8b89 100644 --- a/src/storage/ob_storage_struct.cpp +++ b/src/storage/ob_storage_struct.cpp @@ -233,7 +233,8 @@ ObUpdateTableStoreParam::ObUpdateTableStoreParam( tx_data_(), binding_info_(), auto_inc_seq_(), - medium_info_list_(nullptr) + medium_info_list_(nullptr), + merge_type_(MERGE_TYPE_MAX) { clog_checkpoint_scn_.set_min(); } @@ -248,7 +249,8 @@ ObUpdateTableStoreParam::ObUpdateTableStoreParam( const SCN clog_checkpoint_scn, const bool need_check_sstable, const bool allow_duplicate_sstable, - const compaction::ObMediumCompactionInfoList *medium_info_list) + const compaction::ObMediumCompactionInfoList *medium_info_list, + const ObMergeType merge_type) : table_handle_(table_handle), snapshot_version_(snapshot_version), clog_checkpoint_scn_(), @@ -268,7 +270,8 @@ ObUpdateTableStoreParam::ObUpdateTableStoreParam( tx_data_(), binding_info_(), auto_inc_seq_(), - medium_info_list_(medium_info_list) + medium_info_list_(medium_info_list), + merge_type_(merge_type) { clog_checkpoint_scn_ = clog_checkpoint_scn; } @@ -301,7 +304,8 @@ ObUpdateTableStoreParam::ObUpdateTableStoreParam( tx_data_(), binding_info_(), auto_inc_seq_(), - medium_info_list_(nullptr) + medium_info_list_(nullptr), + merge_type_(MERGE_TYPE_MAX) { clog_checkpoint_scn_.set_min(); } diff --git a/src/storage/ob_storage_struct.h b/src/storage/ob_storage_struct.h index 3653f5d7f0..b5666510e4 100644 --- a/src/storage/ob_storage_struct.h +++ 
b/src/storage/ob_storage_struct.h @@ -310,7 +310,8 @@ struct ObUpdateTableStoreParam const share::SCN clog_checkpoint_scn = share::SCN::min_scn(), const bool need_check_sstable = false, const bool allow_duplicate_sstable = false, - const compaction::ObMediumCompactionInfoList *medium_info_list = nullptr); + const compaction::ObMediumCompactionInfoList *medium_info_list = nullptr, + const ObMergeType merge_type = MERGE_TYPE_MAX); ObUpdateTableStoreParam( // for ddl merge task only const ObTableHandleV2 &table_handle, @@ -327,7 +328,7 @@ struct ObUpdateTableStoreParam K_(keep_old_ddl_sstable), K_(need_report), KPC_(storage_schema), K_(rebuild_seq), K_(update_with_major_flag), K_(need_check_sstable), K_(ddl_checkpoint_scn), K_(ddl_start_scn), K_(ddl_snapshot_version), K_(ddl_execution_id), K_(ddl_cluster_version), K_(allow_duplicate_sstable), K_(tx_data), K_(binding_info), K_(auto_inc_seq), - KPC_(medium_info_list)); + KPC_(medium_info_list), "merge_type", merge_type_to_str(merge_type_)); ObTableHandleV2 table_handle_; int64_t snapshot_version_; @@ -352,6 +353,7 @@ struct ObUpdateTableStoreParam share::ObTabletAutoincSeq auto_inc_seq_; const compaction::ObMediumCompactionInfoList *medium_info_list_; + ObMergeType merge_type_; // set merge_type only when update tablet in compaction }; struct ObBatchUpdateTableStoreParam final diff --git a/src/storage/tablet/ob_tablet.cpp b/src/storage/tablet/ob_tablet.cpp index f8c8d8cb73..a7f8b0ec1a 100644 --- a/src/storage/tablet/ob_tablet.cpp +++ b/src/storage/tablet/ob_tablet.cpp @@ -243,7 +243,7 @@ int ObTablet::init( param.medium_info_list_, // delete all medium before latest finish major snapshot nullptr != last_major ? last_major->get_snapshot_version() : 0, - update_in_major_type_merge))) { + param.merge_type_))) { LOG_WARN("failed to init medium info list", K(ret)); } else if (OB_FAIL(build_read_info(*allocator_))) { LOG_WARN("failed to build read info", K(ret));