diff --git a/src/share/scheduler/ob_dag_scheduler.cpp b/src/share/scheduler/ob_dag_scheduler.cpp index 3c311e3262..8f91fba680 100755 --- a/src/share/scheduler/ob_dag_scheduler.cpp +++ b/src/share/scheduler/ob_dag_scheduler.cpp @@ -1531,6 +1531,7 @@ ObTenantDagScheduler::ObTenantDagScheduler() work_thread_num_(0), default_work_thread_num_(0), total_running_task_cnt_(0), + scheduled_task_cnt_(0), tg_id_(-1) { } @@ -1607,6 +1608,7 @@ int ObTenantDagScheduler::init( check_period_ = check_period; loop_waiting_dag_list_period_ = loop_waiting_list_period; MEMSET(dag_cnts_, 0, sizeof(dag_cnts_)); + MEMSET(scheduled_task_cnts_, 0, sizeof(scheduled_task_cnts_)); MEMSET(dag_net_cnts_, 0, sizeof(dag_net_cnts_)); MEMSET(running_task_cnts_, 0, sizeof(running_task_cnts_)); @@ -1705,8 +1707,10 @@ void ObTenantDagScheduler::destroy() total_worker_cnt_ = 0; work_thread_num_ = 0; total_running_task_cnt_ = 0; + scheduled_task_cnt_ = 0; MEMSET(running_task_cnts_, 0, sizeof(running_task_cnts_)); MEMSET(dag_cnts_, 0, sizeof(dag_cnts_)); + MEMSET(scheduled_task_cnts_, 0, sizeof(scheduled_task_cnts_)); MEMSET(dag_net_cnts_, 0, sizeof(dag_net_cnts_)); waiting_workers_.reset(); running_workers_.reset(); @@ -1896,15 +1900,19 @@ int ObTenantDagScheduler::add_dag( void ObTenantDagScheduler::dump_dag_status() { if (REACH_TENANT_TIME_INTERVAL(DUMP_DAG_STATUS_INTERVAL)) { + int64_t scheduled_task_cnt = 0; int64_t running_task[ObDagPrio::DAG_PRIO_MAX]; int64_t low_limits[ObDagPrio::DAG_PRIO_MAX]; int64_t up_limits[ObDagPrio::DAG_PRIO_MAX]; int64_t dag_count[ObDagType::DAG_TYPE_MAX]; + int64_t scheduled_task_count[ObDagType::DAG_TYPE_MAX]; int64_t dag_net_count[ObDagNetType::DAG_NET_TYPE_MAX]; int64_t ready_dag_count[ObDagPrio::DAG_PRIO_MAX]; int64_t waiting_dag_count[ObDagPrio::DAG_PRIO_MAX]; { ObThreadCondGuard guard(scheduler_sync_); + scheduled_task_cnt = scheduled_task_cnt_; + scheduled_task_cnt_ = 0; for (int64_t i = 0; i < ObDagPrio::DAG_PRIO_MAX; ++i) { running_task[i] = running_task_cnts_[i]; low_limits[i] = low_limits_[i]; @@ -1914,7 +1922,9 @@ void ObTenantDagScheduler::dump_dag_status() } for (int64_t i = 0; i < ObDagType::DAG_TYPE_MAX; ++i) { dag_count[i] = dag_cnts_[i]; + scheduled_task_count[i] = scheduled_task_cnts_[i]; } + MEMSET(scheduled_task_cnts_, 0, sizeof(scheduled_task_cnts_)); COMMON_LOG(INFO, "dump_dag_status", K_(dag_cnt), "map_size", dag_map_.size()); } { @@ -1938,13 +1948,14 @@ void ObTenantDagScheduler::dump_dag_status() } for (int64_t i = 0; i < ObDagType::DAG_TYPE_MAX; ++i) { COMMON_LOG(INFO, "dump_dag_status", "type", OB_DAG_TYPES[i], "dag_count", dag_count[i]); + COMMON_LOG(INFO, "dump_dag_status", "type", OB_DAG_TYPES[i], "scheduled_task_count", scheduled_task_count[i]); } for (int64_t i = 0; i < ObDagNetType::DAG_NET_TYPE_MAX; ++i) { COMMON_LOG(INFO, "dump_dag_status[DAG_NET]", "type", OB_DAG_NET_TYPES[i].dag_net_type_str_, - "dag_count", dag_net_count[i]); + "dag_net_count", dag_net_count[i]); } - COMMON_LOG(INFO, "dump_dag_status", K_(total_worker_cnt), K_(total_running_task_cnt), K_(work_thread_num)); + COMMON_LOG(INFO, "dump_dag_status", K_(total_worker_cnt), K_(total_running_task_cnt), K_(work_thread_num), K(scheduled_task_cnt)); } } @@ -2977,6 +2988,8 @@ int ObTenantDagScheduler::schedule_one(const int64_t priority) if (OB_SUCC(ret) && NULL != worker) { ++running_task_cnts_[priority]; ++total_running_task_cnt_; + ++scheduled_task_cnt_; + ++scheduled_task_cnts_[worker->get_task()->get_dag()->get_type()]; running_workers_.add_last(worker, priority); if (task != NULL) { COMMON_LOG(INFO, "schedule one task", KP(task), "priority", OB_DAG_PRIOS[priority].dag_prio_str_, diff --git a/src/share/scheduler/ob_dag_scheduler.h b/src/share/scheduler/ob_dag_scheduler.h index 15de4c44b3..74428153a3 100644 --- a/src/share/scheduler/ob_dag_scheduler.h +++ b/src/share/scheduler/ob_dag_scheduler.h @@ -945,9 +945,11 @@ private: int64_t work_thread_num_; int64_t default_work_thread_num_; int64_t total_running_task_cnt_; + int64_t scheduled_task_cnt_; // interval scheduled task count int64_t running_task_cnts_[ObDagPrio::DAG_PRIO_MAX]; int64_t low_limits_[ObDagPrio::DAG_PRIO_MAX]; // wait to delete int64_t up_limits_[ObDagPrio::DAG_PRIO_MAX]; // wait to delete + int64_t scheduled_task_cnts_[ObDagType::DAG_TYPE_MAX]; // interval scheduled dag count int64_t dag_cnts_[ObDagType::DAG_TYPE_MAX]; int64_t dag_net_cnts_[ObDagNetType::DAG_NET_TYPE_MAX]; common::ObFIFOAllocator allocator_; diff --git a/src/storage/compaction/ob_compaction_diagnose.cpp b/src/storage/compaction/ob_compaction_diagnose.cpp index 78aa8a3ffd..b55e27b471 100755 --- a/src/storage/compaction/ob_compaction_diagnose.cpp +++ b/src/storage/compaction/ob_compaction_diagnose.cpp @@ -1122,7 +1122,6 @@ int ObCompactionDiagnoseMgr::diagnose_tablet_major_and_medium( int ret = OB_SUCCESS; int tmp_ret = OB_SUCCESS; tablet_major_finish = false; - const storage::ObMergeType merge_type = MEDIUM_MERGE; const ObTabletID &tablet_id = tablet.get_tablet_meta().tablet_id_; ObITable *last_major_sstable = nullptr; int64_t max_sync_medium_scn = 0; @@ -1141,34 +1140,36 @@ int ObCompactionDiagnoseMgr::diagnose_tablet_major_and_medium( } else { // diagnose medium LOG_TRACE("diagnose tablet medium merge", K(max_sync_medium_scn)); - if (max_sync_medium_scn > last_major_sstable->get_snapshot_version()) { - if (tablet.get_snapshot_version() < max_sync_medium_scn) { // wait mini compaction or tablet freeze - if (ObTimeUtility::current_time_ns() > max_sync_medium_scn + WAIT_MEDIUM_SCHEDULE_INTERVAL) { - if (DIAGNOSE_TABELT_MAX_COUNT > medium_not_schedule_count_ && can_add_diagnose_info()) { - SET_DIAGNOSE_INFO( - info_array_[idx_++], - merge_type, - MTL_ID(), - ls_id, - tablet_id, - gen_diagnose_status(max_sync_medium_scn), - ObTimeUtility::fast_current_time(), - "medium wait for freeze, interval", static_cast(WAIT_MEDIUM_SCHEDULE_INTERVAL), - "max_receive_medium_scn", max_sync_medium_scn, - "tablet_snapshot", tablet.get_snapshot_version()); + if (!diagnose_major_flag || (diagnose_major_flag && max_sync_medium_scn < compaction_scn)) { + if (max_sync_medium_scn > last_major_sstable->get_snapshot_version()) { + if (tablet.get_snapshot_version() < max_sync_medium_scn) { // wait mini compaction or tablet freeze + if (ObTimeUtility::current_time_ns() > max_sync_medium_scn + WAIT_MEDIUM_SCHEDULE_INTERVAL) { + if (DIAGNOSE_TABELT_MAX_COUNT > medium_not_schedule_count_ && can_add_diagnose_info()) { + SET_DIAGNOSE_INFO( + info_array_[idx_++], + MEDIUM_MERGE, + MTL_ID(), + ls_id, + tablet_id, + gen_diagnose_status(max_sync_medium_scn), + ObTimeUtility::fast_current_time(), + "medium wait for freeze, interval", static_cast(WAIT_MEDIUM_SCHEDULE_INTERVAL / NS_TIME), + "max_receive_medium_scn", max_sync_medium_scn, + "tablet_snapshot", tablet.get_snapshot_version()); + } + ++medium_not_schedule_count_; + } + } else { + // last medium not finish or schedule + ObTabletMajorMergeDag dag; + if (OB_TMP_FAIL(diagnose_tablet_merge( + dag, + MEDIUM_MERGE, + ls_id, + tablet_id, + max_sync_medium_scn))) { + LOG_WARN("diagnose failed", K(tmp_ret), K(ls_id), K(tablet_id), KPC(last_major_sstable)); } - ++medium_not_schedule_count_; - } - } else if (max_sync_medium_scn != compaction_scn) { - // last medium not finish or schedule - ObTabletMajorMergeDag dag; - if (OB_TMP_FAIL(diagnose_tablet_merge( - dag, - merge_type, - ls_id, - tablet_id, - max_sync_medium_scn))) { - LOG_WARN("diagnose failed", K(tmp_ret), K(ls_id), K(tablet_id), KPC(last_major_sstable)); } } } @@ -1185,7 +1186,7 @@ int ObCompactionDiagnoseMgr::diagnose_tablet_major_and_medium( if (max_sync_medium_scn < compaction_scn && max_sync_medium_scn == last_major_sstable->get_snapshot_version()) { // last compaction finish - if (OB_TMP_FAIL(get_suspect_info_and_print(merge_type, ls_id, tablet_id))) { + if (OB_TMP_FAIL(get_suspect_info_and_print(MEDIUM_MERGE, ls_id, tablet_id))) { if (OB_HASH_NOT_EXIST != tmp_ret) { LOG_WARN("failed get major merge suspect info", K(ret), K(ls_id)); } @@ -1202,7 +1203,7 @@ int ObCompactionDiagnoseMgr::diagnose_tablet_major_and_medium( tablet_id, gen_diagnose_status(compaction_scn), ObTimeUtility::fast_current_time(), - "major not schedule for long time, interval", static_cast(WAIT_MEDIUM_SCHEDULE_INTERVAL * 2), + "major not schedule for long time, interval", static_cast(WAIT_MEDIUM_SCHEDULE_INTERVAL * 2 / NS_TIME), "max_receive_medium_snapshot", max_sync_medium_scn, "compaction_scn", compaction_scn))) { LOG_WARN("failed to add diagnose info", K(ret), K(ls_id), K(tablet_id)); @@ -1215,13 +1216,13 @@ int ObCompactionDiagnoseMgr::diagnose_tablet_major_and_medium( if (DIAGNOSE_TABELT_MAX_COUNT > major_not_schedule_count_ && can_add_diagnose_info()) { SET_DIAGNOSE_INFO( info_array_[idx_++], - merge_type, + MAJOR_MERGE, MTL_ID(), ls_id, tablet_id, gen_diagnose_status(compaction_scn), ObTimeUtility::fast_current_time(), - "major wait for freeze, interval", static_cast(WAIT_MEDIUM_SCHEDULE_INTERVAL), + "major wait for freeze, interval", static_cast(WAIT_MEDIUM_SCHEDULE_INTERVAL / NS_TIME), "compaction_scn", compaction_scn, "tablet_snapshot", tablet.get_snapshot_version()); } @@ -1231,7 +1232,7 @@ int ObCompactionDiagnoseMgr::diagnose_tablet_major_and_medium( ObTabletMajorMergeDag dag; if (OB_TMP_FAIL(diagnose_tablet_merge( dag, - merge_type, + MEDIUM_MERGE, ls_id, tablet.get_tablet_meta().tablet_id_, compaction_scn))) { diff --git a/src/storage/compaction/ob_compaction_diagnose.h b/src/storage/compaction/ob_compaction_diagnose.h index b335379a4b..58f438db52 100755 --- a/src/storage/compaction/ob_compaction_diagnose.h +++ b/src/storage/compaction/ob_compaction_diagnose.h @@ -486,8 +486,9 @@ private: public: typedef common::hash::ObHashMap LSStatusMap; private: - static const int64_t WAIT_MEDIUM_SCHEDULE_INTERVAL = 1000L * 1000L * 1000L * 60L * 5; // 5 min // ns - static const int64_t TOLERATE_MEDIUM_SCHEDULE_INTERVAL = 1000L * 1000L * 1000L * 60L * 60L * 5; // 5 hour + static const int64_t NS_TIME = 1000L * 1000L * 1000L; + static const int64_t WAIT_MEDIUM_SCHEDULE_INTERVAL = NS_TIME * 60L * 5; // 5 min // ns + static const int64_t TOLERATE_MEDIUM_SCHEDULE_INTERVAL = NS_TIME * 60L * 60L * 36; // 36 hour static const int64_t MAX_LS_TABLET_CNT = 10 * 10000; // TODO(@jingshui): tmp solution static const int64_t DIAGNOSE_TABELT_MAX_COUNT = 10; // same type diagnose info max count bool is_inited_;