diff --git a/src/share/scheduler/ob_tenant_dag_scheduler.h b/src/share/scheduler/ob_tenant_dag_scheduler.h index 7da08947b..e25fc3f94 100644 --- a/src/share/scheduler/ob_tenant_dag_scheduler.h +++ b/src/share/scheduler/ob_tenant_dag_scheduler.h @@ -1386,7 +1386,9 @@ inline bool is_compaction_dag(ObDagType::ObDagTypeEnum dag_type) ObDagType::DAG_TYPE_CO_MERGE_FINISH == dag_type || ObDagType::DAG_TYPE_MAJOR_MERGE == dag_type || ObDagType::DAG_TYPE_MINI_MERGE == dag_type || - ObDagType::DAG_TYPE_MERGE_EXECUTE == dag_type; + ObDagType::DAG_TYPE_MERGE_EXECUTE == dag_type || + ObDagType::DAG_TYPE_TX_TABLE_MERGE == dag_type || + ObDagType::DAG_TYPE_MDS_TABLE_MERGE == dag_type; } inline int dag_yield() diff --git a/src/storage/compaction/ob_medium_compaction_func.cpp b/src/storage/compaction/ob_medium_compaction_func.cpp index 711757c87..ff9463222 100644 --- a/src/storage/compaction/ob_medium_compaction_func.cpp +++ b/src/storage/compaction/ob_medium_compaction_func.cpp @@ -1220,40 +1220,33 @@ int ObMediumCompactionScheduleFunc::schedule_tablet_medium_merge( return ret; } #endif - const int64_t last_major_snapshot = tablet.get_last_major_snapshot_version(); + const int64_t last_major_snapshot = tablet.get_last_major_snapshot_version(); // current compaction scn if (MTL(ObTenantTabletScheduler *)->could_major_merge_start() && last_major_snapshot > 0) { - const ObTabletID &tablet_id = tablet.get_tablet_meta().tablet_id_; - const ObLSID &ls_id = ls.get_ls_id(); - ObArenaAllocator temp_allocator("GetMediumInfo", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()); // for load medium info - const ObMediumCompactionInfoList *medium_list = nullptr; - bool schedule_flag = false; - const int64_t inner_table_merged_version = MTL(ObTenantTabletScheduler *)->get_inner_table_merged_scn(); - if (OB_FAIL(tablet.read_medium_info_list(temp_allocator, medium_list))) { - LOG_WARN("failed to load medium info list", K(ret), K(tablet)); - } else if (ObMediumCompactionInfo::MAJOR_COMPACTION == medium_list->get_last_compaction_type() - && inner_table_merged_version < medium_list->get_last_compaction_scn() - && !MTL_TENANT_ROLE_CACHE_IS_PRIMARY_OR_INVALID()) { // for STANDBY/RESTORE TENANT - ObTabletCompactionScnInfo ret_info; - // for standby/restore tenant, need select inner_table to check RS status before schedule new round - if (!scheduler_called) { // should not visit inner table, wait for scheduler loop - } else if (OB_FAIL(get_status_from_inner_table(ls_id, tablet_id, ret_info))) { - LOG_WARN("failed to get status from inner tablet", K(ret), K(ls_id), K(tablet_id)); - } else if (ret_info.could_schedule_next_round(medium_list->get_last_compaction_scn())) { - LOG_INFO("success to check RS major checksum validation finished", K(ret), K(ls_id), K(tablet_id)); + bool is_standy_tenant = !MTL_TENANT_ROLE_CACHE_IS_PRIMARY_OR_INVALID(); + if (is_standy_tenant && !scheduler_called) { + // standy tenant should not visit inner table if it is not scheduler_called, wait for scheduler loop + } else { + const ObTabletID &tablet_id = tablet.get_tablet_meta().tablet_id_; + const ObLSID &ls_id = ls.get_ls_id(); + ObArenaAllocator temp_allocator("GetMediumInfo", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()); // for load medium info + const ObMediumCompactionInfoList *medium_list = nullptr; + bool schedule_flag = false; + const int64_t major_frozen_snapshot = 0 == input_major_snapshot ? MTL(ObTenantTabletScheduler *)->get_frozen_version() : input_major_snapshot; // broadcast scn + ObMediumCompactionInfo::ObCompactionType compaction_type = ObMediumCompactionInfo::COMPACTION_TYPE_MAX; + int64_t schedule_scn = 0; // medium_snapshot in medium info + if (OB_FAIL(tablet.read_medium_info_list(temp_allocator, medium_list))) { + LOG_WARN("failed to load medium info list", K(ret), K(tablet)); + } else if (OB_FAIL(read_medium_info_from_list(*medium_list, last_major_snapshot, major_frozen_snapshot, compaction_type, schedule_scn))) { + LOG_WARN("failed to read medium info from list", K(ret), K(ls_id), K(tablet_id), KPC(medium_list), K(last_major_snapshot)); + } else if (is_standy_tenant) { // for STANDBY/RESTORE TENANT + if (OB_FAIL(decide_standy_tenant_schedule(ls_id, tablet_id, compaction_type, schedule_scn, major_frozen_snapshot, *medium_list, schedule_flag))) { + LOG_WARN("failed to decide whehter to schedule standy schedule", K(ret), K(ls_id), K(tablet_id), K(compaction_type), K(schedule_scn), K(major_frozen_snapshot), K(medium_list)); + } + } else { schedule_flag = true; } - } else { - schedule_flag = true; - } - if (OB_FAIL(ret) || !schedule_flag) { - } else { - const int64_t major_frozen_snapshot = 0 == input_major_snapshot ? MTL(ObTenantTabletScheduler *)->get_frozen_version() : input_major_snapshot; - int64_t schedule_scn = 0; - if (OB_FAIL(read_medium_info_from_list(*medium_list, last_major_snapshot, - major_frozen_snapshot, schedule_scn))) { - LOG_WARN("failed to read medium info from list", K(ret), K(ls_id), K(tablet_id), KPC(medium_list), K(last_major_snapshot)); - } else if (schedule_scn > 0 - && OB_FAIL(check_need_merge_and_schedule(ls, tablet, schedule_scn, tablet_need_freeze_flag, create_dag_flag))) { + if (OB_FAIL(ret) || !schedule_flag) { + } else if (schedule_scn > 0 && OB_FAIL(check_need_merge_and_schedule(ls, tablet, schedule_scn, tablet_need_freeze_flag, create_dag_flag))) { LOG_WARN("failed to check medium merge", K(ret), K(ls_id), K(tablet_id), K(schedule_scn)); } } @@ -1262,10 +1255,47 @@ int ObMediumCompactionScheduleFunc::schedule_tablet_medium_merge( return ret; } +int ObMediumCompactionScheduleFunc::decide_standy_tenant_schedule( + const ObLSID &ls_id, + const ObTabletID &tablet_id, + const ObMediumCompactionInfo::ObCompactionType &compaction_type, + const int64_t schedule_scn, + const int64_t major_frozen_snapshot, + const ObMediumCompactionInfoList &medium_list, + bool &schedule_flag) +{ + int ret = OB_SUCCESS; + schedule_flag = false; + if (ObMediumCompactionInfo::MAJOR_COMPACTION == compaction_type) { + if (schedule_scn > major_frozen_snapshot) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("schedule_scn of current major is bigger than broadcast scn, wait for next round loop", K(ret), K(schedule_scn), K(major_frozen_snapshot)); + } else { + schedule_flag = true; + } + } else if (ObMediumCompactionInfo::MEDIUM_COMPACTION == compaction_type) { + if (schedule_scn > major_frozen_snapshot && ObMediumCompactionInfo::MAJOR_COMPACTION == medium_list.get_last_compaction_type()) { + ObTabletCompactionScnInfo ret_info; + if (OB_FAIL(get_status_from_inner_table(ls_id, tablet_id, ret_info))) { + LOG_WARN("failed to get status from inner tablet", K(ret), K(ls_id), K(tablet_id)); + } else if (ret_info.could_schedule_next_round(medium_list.get_last_compaction_scn())) { + LOG_INFO("success to check RS major checksum validation finished", K(ret), K(ls_id), K(tablet_id)); + schedule_flag = true; + } + } else { + schedule_flag = true; + } + } else { + // does not read valid medium info, wait for next round scheduler loop + } + return ret; +} + int ObMediumCompactionScheduleFunc::read_medium_info_from_list( const ObMediumCompactionInfoList &medium_list, const int64_t last_major_snapshot, const int64_t major_frozen_snapshot, + ObMediumCompactionInfo::ObCompactionType &compaction_type, int64_t &schedule_scn) { int ret = OB_SUCCESS; @@ -1276,6 +1306,7 @@ int ObMediumCompactionScheduleFunc::read_medium_info_from_list( if (info->is_medium_compaction() || info->medium_snapshot_ <= major_frozen_snapshot) { schedule_scn = info->medium_snapshot_; + compaction_type = (ObMediumCompactionInfo::ObCompactionType)info->compaction_type_; } break; // found one unfinish medium info, loop end } diff --git a/src/storage/compaction/ob_medium_compaction_func.h b/src/storage/compaction/ob_medium_compaction_func.h index 8cce5847e..483cd844c 100644 --- a/src/storage/compaction/ob_medium_compaction_func.h +++ b/src/storage/compaction/ob_medium_compaction_func.h @@ -51,10 +51,23 @@ public: bool &create_dag_flag, const int64_t major_frozen_scn = 0, const bool scheduler_called = false); + /* + * see + * standby tenant should catch up broadcast scn when freeze info is recycled + */ + static int decide_standy_tenant_schedule( + const ObLSID &ls_id, + const ObTabletID &tablet_id, + const ObMediumCompactionInfo::ObCompactionType &compaction_type, + const int64_t schedule_scn, + const int64_t major_frozen_snapshot, + const ObMediumCompactionInfoList &medium_list, + bool &schedule_flag); static int read_medium_info_from_list( const ObMediumCompactionInfoList &medium_list, const int64_t major_frozen_snapshot, const int64_t last_major_snapshot, + ObMediumCompactionInfo::ObCompactionType &compaction_type, int64_t &schedule_scn); static int is_election_leader(const share::ObLSID &ls_id, bool &ls_election_leader); static int get_max_sync_medium_scn( diff --git a/src/storage/compaction/ob_tablet_merge_task.cpp b/src/storage/compaction/ob_tablet_merge_task.cpp index 14f67d5a9..1bf6b4955 100644 --- a/src/storage/compaction/ob_tablet_merge_task.cpp +++ b/src/storage/compaction/ob_tablet_merge_task.cpp @@ -45,16 +45,6 @@ using namespace blocksstable; namespace compaction { -bool is_merge_dag(ObDagType::ObDagTypeEnum dag_type) -{ - return dag_type == ObDagType::DAG_TYPE_MAJOR_MERGE - || dag_type == ObDagType::DAG_TYPE_MERGE_EXECUTE - || dag_type == ObDagType::DAG_TYPE_MINI_MERGE - || dag_type == ObDagType::DAG_TYPE_TX_TABLE_MERGE - || dag_type == ObDagType::DAG_TYPE_MDS_TABLE_MERGE; -} - - /* * ----------------------------------------------ObCompactionTimeGuard-------------------------------------------------- */ @@ -809,7 +799,7 @@ int ObTabletMergePrepareTask::init() } else if (OB_ISNULL(dag_)) { ret = OB_ERR_SYS; LOG_WARN("dag must not null", K(ret)); - } else if (OB_UNLIKELY(!is_merge_dag(dag_->get_type()))) { + } else if (OB_UNLIKELY(!is_compaction_dag(dag_->get_type()))) { ret = OB_ERR_SYS; LOG_ERROR("dag type not match", K(ret), KPC(dag_)); } else { @@ -964,7 +954,7 @@ int ObTabletMergeFinishTask::init() } else if (OB_ISNULL(dag_)) { ret = OB_ERR_SYS; LOG_WARN("dag must not null", K(ret)); - } else if (!is_merge_dag(dag_->get_type())) { + } else if (!is_compaction_dag(dag_->get_type())) { ret = OB_ERR_SYS; LOG_ERROR("dag type not match", K(ret), KPC(dag_)); } else { @@ -1072,7 +1062,7 @@ int ObTabletMergeTask::generate_next_task(ObITask *&next_task) LOG_WARN("not init", K(ret)); } else if (idx_ + 1 == ctx_->get_concurrent_cnt()) { ret = OB_ITER_END; - } else if (!is_merge_dag(dag_->get_type())) { + } else if (!is_compaction_dag(dag_->get_type())) { ret = OB_ERR_SYS; LOG_ERROR("dag type not match", K(ret), KPC(dag_)); } else { diff --git a/src/storage/compaction/ob_tenant_tablet_scheduler.cpp b/src/storage/compaction/ob_tenant_tablet_scheduler.cpp index 7bc48509a..297f2de9b 100644 --- a/src/storage/compaction/ob_tenant_tablet_scheduler.cpp +++ b/src/storage/compaction/ob_tenant_tablet_scheduler.cpp @@ -596,7 +596,7 @@ int ObTenantTabletScheduler::schedule_merge(const int64_t broadcast_version) LOG_WARN("failed to add progress", K(tmp_ret), K(broadcast_version)); } { - obsys::ObRLockGuard frozen_version_guard(frozen_version_lock_); + obsys::ObWLockGuard frozen_version_guard(frozen_version_lock_); frozen_version_ = broadcast_version; }