diff --git a/deps/oblib/src/lib/utility/ob_tracepoint_def.h b/deps/oblib/src/lib/utility/ob_tracepoint_def.h
index 6dd6c2767..63e094b6a 100644
--- a/deps/oblib/src/lib/utility/ob_tracepoint_def.h
+++ b/deps/oblib/src/lib/utility/ob_tracepoint_def.h
@@ -439,6 +439,7 @@ GLOBAL_ERRSIM_POINT_DEF(748, EN_COMPACTION_ITER_SET_BATCH_CNT, "");
 // please add new trace point after 750
 GLOBAL_ERRSIM_POINT_DEF(751, EN_SESSION_LEAK_COUNT_THRESHOLD, "used to control the threshold of report session leak ERROR");
 GLOBAL_ERRSIM_POINT_DEF(760, EN_DISABLE_TABLET_MINOR_MERGE, "used to stop scheduling minor merge");
+GLOBAL_ERRSIM_POINT_DEF(761, EN_FAST_RECLAIM_THREAD, "used to speed up reclaiming threads");
 GLOBAL_ERRSIM_POINT_DEF(800, EN_END_PARTICIPANT, "");
 
 // compaction 801 - 899
diff --git a/src/share/scheduler/ob_tenant_dag_scheduler.cpp b/src/share/scheduler/ob_tenant_dag_scheduler.cpp
index 120b53d9a..2b5f1eaec 100644
--- a/src/share/scheduler/ob_tenant_dag_scheduler.cpp
+++ b/src/share/scheduler/ob_tenant_dag_scheduler.cpp
@@ -3900,7 +3900,6 @@ ObTenantDagScheduler::ObTenantDagScheduler()
     loop_waiting_dag_list_period_(0),
     total_worker_cnt_(0),
     work_thread_num_(0),
-    default_work_thread_num_(0),
     total_running_task_cnt_(0),
     scheduled_task_cnt_(0),
     scheduler_sync_(),
@@ -3989,7 +3988,7 @@ int ObTenantDagScheduler::init(
   loop_waiting_dag_list_period_ = loop_waiting_list_period;
   dag_limit_ = dag_limit;
   compaction_dag_limit_ = dag_limit;
-  work_thread_num_ = default_work_thread_num_ = 0;
+  work_thread_num_ = 0;
   MEMSET(dag_cnts_, 0, sizeof(dag_cnts_));
   MEMSET(running_dag_cnts_, 0, sizeof(running_dag_cnts_));
   MEMSET(added_dag_cnts_, 0, sizeof(added_dag_cnts_));
@@ -4005,14 +4004,6 @@
       COMMON_LOG(WARN, "failed to init prio_sche_", K(ret), K(dag_limit));
     } else {
       work_thread_num_ += prio_sche_[i].get_limit();
-      default_work_thread_num_ += prio_sche_[i].get_limit();
-    }
-  }
-
-  // create dag workers
-  for (int64_t i = 0; OB_SUCC(ret) && i < work_thread_num_; ++i) {
-    if (OB_FAIL(create_worker())) {
-      COMMON_LOG(WARN, "failed to create worker", K(ret));
     }
   }
 
@@ -4021,7 +4012,7 @@
   } else {
     is_inited_ = true;
     dump_dag_status();
-    COMMON_LOG(INFO, "ObTenantDagScheduler is inited", K(ret), K_(work_thread_num), K_(default_work_thread_num));
+    COMMON_LOG(INFO, "ObTenantDagScheduler is inited", K(ret), K(work_thread_num_));
   }
 
   if (!is_inited_) {
@@ -4077,6 +4068,7 @@
   compaction_dag_limit_ = 0;
   total_worker_cnt_ = 0;
   work_thread_num_ = 0;
+  reclaim_util_.reset();
   total_running_task_cnt_ = 0;
   scheduled_task_cnt_ = 0;
   MEMSET(dag_cnts_, 0, sizeof(dag_cnts_));
@@ -4574,6 +4566,54 @@
   }
 }
 
+void ObReclaimUtil::reset()
+{
+  total_periodic_running_worker_cnt_ = 0;
+  check_worker_loop_times_ = 0;
+}
+
+int64_t ObReclaimUtil::compute_expected_reclaim_worker_cnt(
+    const int64_t total_running_task_cnt,
+    const int64_t free_worker_cnt,
+    const int64_t total_worker_cnt)
+{
+  int64_t expected_reclaim_worker_cnt = 0;
+  if (free_worker_cnt > 0) {
+    total_periodic_running_worker_cnt_ = total_periodic_running_worker_cnt_ + total_running_task_cnt;
+    ++check_worker_loop_times_;
+    bool is_always_triggered = false;
+#ifdef ERRSIM
+  #define ADAPTIVE_RECLAIM_ERRSIM(tracepoint)               \
+    int ret = OB_SUCCESS;                                   \
+    do {                                                    \
+      if (OB_SUCC(ret)) {                                   \
+        ret = OB_E((EventTable::tracepoint)) OB_SUCCESS;    \
+        if (OB_FAIL(ret)) {                                 \
+          ret = OB_SUCCESS;                                 \
+          is_always_triggered = true;                       \
+        }                                                   \
+      }                                                     \
+    } while(0);
+  ADAPTIVE_RECLAIM_ERRSIM(EN_FAST_RECLAIM_THREAD);
+  #undef ADAPTIVE_RECLAIM_ERRSIM
+#endif
+    if (REACH_TENANT_TIME_INTERVAL(CHECK_USING_WORKER_INTERVAL) || is_always_triggered) {
+      const int64_t avg_periodic_running_worker_cnt = total_periodic_running_worker_cnt_ / check_worker_loop_times_;
+      if (total_running_task_cnt < avg_periodic_running_worker_cnt) {
+        // reclaim one fifth of the free workers each time
+        expected_reclaim_worker_cnt = MAX(free_worker_cnt * 0.2, 1);
+      } else if (total_running_task_cnt == avg_periodic_running_worker_cnt) {
+        if (0 == total_running_task_cnt) {
+          expected_reclaim_worker_cnt = MAX(free_worker_cnt * 0.2, 1);
+        }
+      }
+      reset();
+    }
+  }
+  return expected_reclaim_worker_cnt;
+}
+
+
 void ObTenantDagScheduler::notify()
 {
   ObThreadCondGuard cond_guard(scheduler_sync_);
@@ -4797,14 +4837,15 @@ int ObTenantDagScheduler::try_reclaim_threads()
   int ret = OB_SUCCESS;
   ObTenantDagWorker *worker2delete = NULL;
   int32_t free_cnt = 0;
-  while (total_worker_cnt_ > work_thread_num_ && !free_workers_.is_empty()) {
+  int64_t expected_reclaim_worker_cnt = reclaim_util_.compute_expected_reclaim_worker_cnt(total_running_task_cnt_, free_workers_.get_size(), total_worker_cnt_);
+  while (free_cnt < expected_reclaim_worker_cnt) {
     worker2delete = free_workers_.remove_first();
     ob_delete(worker2delete);
     --total_worker_cnt_;
     ++free_cnt;
   }
   if (free_cnt > 0) {
-    COMMON_LOG(INFO, "reclaim threads", K(free_cnt), K_(total_worker_cnt), K_(work_thread_num));
+    COMMON_LOG(INFO, "reclaim threads", K(free_cnt), K_(total_worker_cnt), K(work_thread_num_));
   }
   return ret;
 }
@@ -4880,8 +4921,7 @@
     } else {
       COMMON_LOG(INFO, "set thread score successfully", K(score),
           "prio", OB_DAG_PRIOS[priority].dag_prio_str_,
-          "limits_", new_val, K_(work_thread_num),
-          K_(default_work_thread_num));
+          "limits_", new_val, K_(work_thread_num));
     }
   }
 }
diff --git a/src/share/scheduler/ob_tenant_dag_scheduler.h b/src/share/scheduler/ob_tenant_dag_scheduler.h
index 1343f8c67..3917eb66d 100644
--- a/src/share/scheduler/ob_tenant_dag_scheduler.h
+++ b/src/share/scheduler/ob_tenant_dag_scheduler.h
@@ -852,6 +852,27 @@ private:
   int64_t dag_net_cnts_[ObDagNetType::DAG_NET_TYPE_MAX]; // lock by dag_net_map_lock_
 };
 
+class ObReclaimUtil
+{
+
+public:
+  ObReclaimUtil()
+    : total_periodic_running_worker_cnt_(0),
+      check_worker_loop_times_(0)
+  {}
+  ~ObReclaimUtil() {}
+  int64_t compute_expected_reclaim_worker_cnt(
+      const int64_t total_running_task_cnt,
+      const int64_t free_worker_cnt,
+      const int64_t total_worker_cnt);
+  void reset();
+
+public:
+  int64_t total_periodic_running_worker_cnt_;
+  int64_t check_worker_loop_times_;
+  static const int64_t CHECK_USING_WORKER_INTERVAL = 60 * 1000L * 1000L; // 1min
+};
+
 class ObDagPrioScheduler
 {
 public:
@@ -1203,7 +1224,6 @@ private:
   int64_t loop_waiting_dag_list_period_; // only set in init/destroy
   int64_t total_worker_cnt_; // lock by scheduler_sync_
   int64_t work_thread_num_; // lock by scheduler_sync_
-  int64_t default_work_thread_num_; // only set in init/destroy
   int64_t total_running_task_cnt_; // atomic value
   int64_t scheduled_task_cnt_; // atomic value // interval scheduled task count
   int64_t dag_cnts_[ObDagType::DAG_TYPE_MAX]; // just for showing // atomic value
@@ -1212,6 +1232,7 @@ private:
   int64_t scheduled_dag_cnts_[ObDagType::DAG_TYPE_MAX]; // atomic value // interval scheduled dag count
   int64_t scheduled_task_cnts_[ObDagType::DAG_TYPE_MAX]; // atomic value // interval scheduled task count
   int64_t scheduled_data_size_[ObDagType::DAG_TYPE_MAX]; // atomic value // interval scheduled data size
+  ObReclaimUtil reclaim_util_; // util to help adaptively reclaim workers
   common::ObThreadCond scheduler_sync_; // Make sure the lock is inside if there are nested locks
   lib::MemoryContext mem_context_;
   lib::MemoryContext ha_mem_context_;
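Note on the heuristic this patch introduces: while free workers exist, every scheduler loop adds the current running-task count to an observation window kept in ObReclaimUtil; once per CHECK_USING_WORKER_INTERVAL (1 min, or on every loop when the EN_FAST_RECLAIM_THREAD errsim point fires), the current load is compared with the window average, and if it is below that average, or equal to it while the scheduler is idle, the reclaim path releases roughly one fifth of the free workers (at least one) and the window is reset. The sketch below is a minimal standalone rendition of that logic for illustration only; the ReclaimHeuristic class, the interval_reached flag and the main() driver are hypothetical and not part of the patch.

// Minimal standalone sketch of the adaptive reclaim heuristic (illustration only;
// ReclaimHeuristic, interval_reached and main() are hypothetical, not the patch's API).
#include <algorithm>
#include <cstdint>
#include <iostream>

class ReclaimHeuristic {
public:
  // Called once per scheduler loop; returns how many free workers to release now.
  int64_t compute(int64_t running_task_cnt, int64_t free_worker_cnt, bool interval_reached)
  {
    int64_t reclaim_cnt = 0;
    if (free_worker_cnt > 0) {
      // Accumulate this loop's running-task count into the observation window.
      total_running_ += running_task_cnt;
      ++loop_times_;
      if (interval_reached) {  // stands in for REACH_TENANT_TIME_INTERVAL / errsim
        const int64_t avg_running = total_running_ / loop_times_;
        // Reclaim a fifth of the free workers (at least one) when the current load
        // is below the window average, or when the scheduler is completely idle.
        if (running_task_cnt < avg_running ||
            (running_task_cnt == avg_running && 0 == running_task_cnt)) {
          reclaim_cnt = std::max<int64_t>(free_worker_cnt / 5, 1);
        }
        // Start a new observation window, mirroring ObReclaimUtil::reset().
        total_running_ = 0;
        loop_times_ = 0;
      }
    }
    return reclaim_cnt;
  }

private:
  int64_t total_running_ = 0;
  int64_t loop_times_ = 0;
};

int main()
{
  ReclaimHeuristic heuristic;
  // Ten busy loops build up the window, then an idle loop closes it.
  for (int i = 0; i < 10; ++i) {
    heuristic.compute(/*running_task_cnt=*/8, /*free_worker_cnt=*/2, /*interval_reached=*/false);
  }
  std::cout << "reclaim " << heuristic.compute(0, 10, true)
            << " of 10 free workers" << std::endl;  // prints: reclaim 2 of 10 free workers
  return 0;
}

Averaging over the whole interval, rather than acting on an instantaneous count, keeps a brief dip in load from reclaiming workers that a steady compaction workload would immediately have to recreate.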