optimize dag scheduler thread allocation

This commit is contained in:
obdev 2024-08-27 15:23:32 +00:00 committed by ob-robot
parent 3303c68606
commit f7e9334490
3 changed files with 78 additions and 16 deletions

View File

@ -439,6 +439,7 @@ GLOBAL_ERRSIM_POINT_DEF(748, EN_COMPACTION_ITER_SET_BATCH_CNT, "");
// please add new trace point after 750
GLOBAL_ERRSIM_POINT_DEF(751, EN_SESSION_LEAK_COUNT_THRESHOLD, "used to control the threshold of report session leak ERROR");
GLOBAL_ERRSIM_POINT_DEF(760, EN_DISABLE_TABLET_MINOR_MERGE, "used to stop scheduling minor merge");
GLOBAL_ERRSIM_POINT_DEF(761, EN_FAST_RECLAIM_THREAD, "used to speed up reclaiming thread");
GLOBAL_ERRSIM_POINT_DEF(800, EN_END_PARTICIPANT, "");
// compaction 801 - 899

View File

@ -3900,7 +3900,6 @@ ObTenantDagScheduler::ObTenantDagScheduler()
loop_waiting_dag_list_period_(0),
total_worker_cnt_(0),
work_thread_num_(0),
default_work_thread_num_(0),
total_running_task_cnt_(0),
scheduled_task_cnt_(0),
scheduler_sync_(),
@ -3989,7 +3988,7 @@ int ObTenantDagScheduler::init(
loop_waiting_dag_list_period_ = loop_waiting_list_period;
dag_limit_ = dag_limit;
compaction_dag_limit_ = dag_limit;
work_thread_num_ = default_work_thread_num_ = 0;
work_thread_num_ = 0;
MEMSET(dag_cnts_, 0, sizeof(dag_cnts_));
MEMSET(running_dag_cnts_, 0, sizeof(running_dag_cnts_));
MEMSET(added_dag_cnts_, 0, sizeof(added_dag_cnts_));
@ -4005,14 +4004,6 @@ int ObTenantDagScheduler::init(
COMMON_LOG(WARN, "failed to init prio_sche_", K(ret), K(dag_limit));
} else {
work_thread_num_ += prio_sche_[i].get_limit();
default_work_thread_num_ += prio_sche_[i].get_limit();
}
}
// create dag workers
for (int64_t i = 0; OB_SUCC(ret) && i < work_thread_num_; ++i) {
if (OB_FAIL(create_worker())) {
COMMON_LOG(WARN, "failed to create worker", K(ret));
}
}
@ -4021,7 +4012,7 @@ int ObTenantDagScheduler::init(
} else {
is_inited_ = true;
dump_dag_status();
COMMON_LOG(INFO, "ObTenantDagScheduler is inited", K(ret), K_(work_thread_num), K_(default_work_thread_num));
COMMON_LOG(INFO, "ObTenantDagScheduler is inited", K(ret), K(work_thread_num_));
}
if (!is_inited_) {
@ -4077,6 +4068,7 @@ void ObTenantDagScheduler::reset()
compaction_dag_limit_ = 0;
total_worker_cnt_ = 0;
work_thread_num_ = 0;
reclaim_util_.reset();
total_running_task_cnt_ = 0;
scheduled_task_cnt_ = 0;
MEMSET(dag_cnts_, 0, sizeof(dag_cnts_));
@ -4574,6 +4566,54 @@ void ObTenantDagScheduler::run1()
}
}
void ObReclaimUtil::reset()
{
total_periodic_running_worker_cnt_ = 0;
check_worker_loop_times_ = 0;
}
int64_t ObReclaimUtil::compute_expected_reclaim_worker_cnt(
const int64_t total_running_task_cnt,
const int64_t free_worker_cnt,
const int64_t total_worker_cnt)
{
int64_t expected_reclaim_worker_cnt = 0;
if (free_worker_cnt > 0) {
total_periodic_running_worker_cnt_ = total_periodic_running_worker_cnt_ + total_running_task_cnt;
++check_worker_loop_times_;
bool is_always_triggered = false;
#ifdef ERRSIM
#define ADAPTIVE_RECLAIM_ERRSIM(tracepoint) \
int ret = OB_SUCCESS; \
do { \
if (OB_SUCC(ret)) { \
ret = OB_E((EventTable::tracepoint)) OB_SUCCESS; \
if (OB_FAIL(ret)) { \
ret = OB_SUCCESS; \
is_always_triggered = true; \
} \
} \
} while(0);
ADAPTIVE_RECLAIM_ERRSIM(EN_FAST_RECLAIM_THREAD);
#undef ADAPTIVE_RECLAIM_ERRSIM
#endif
if (REACH_TENANT_TIME_INTERVAL(CHECK_USING_WOKRER_INTERVAL) || is_always_triggered) {
const int64_t avg_periodic_running_worker_cnt = total_periodic_running_worker_cnt_ / check_worker_loop_times_;
if (total_running_task_cnt < avg_periodic_running_worker_cnt) {
// reclaim one fifth of the workers each time
expected_reclaim_worker_cnt = MAX(free_worker_cnt * 0.2 , 1);
} else if (total_running_task_cnt == avg_periodic_running_worker_cnt) {
if (0 == total_running_task_cnt) {
expected_reclaim_worker_cnt = MAX(free_worker_cnt * 0.2 , 1);;
}
}
reset();
}
}
return expected_reclaim_worker_cnt;
}
void ObTenantDagScheduler::notify()
{
ObThreadCondGuard cond_guard(scheduler_sync_);
@ -4797,14 +4837,15 @@ int ObTenantDagScheduler::try_reclaim_threads()
int ret = OB_SUCCESS;
ObTenantDagWorker *worker2delete = NULL;
int32_t free_cnt = 0;
while (total_worker_cnt_ > work_thread_num_ && !free_workers_.is_empty()) {
int64_t expected_reclaim_worker_cnt = reclaim_util_.compute_expected_reclaim_worker_cnt(total_running_task_cnt_, free_workers_.get_size(), total_worker_cnt_);
while (free_cnt < expected_reclaim_worker_cnt) {
worker2delete = free_workers_.remove_first();
ob_delete(worker2delete);
--total_worker_cnt_;
++free_cnt;
}
if (free_cnt > 0) {
COMMON_LOG(INFO, "reclaim threads", K(free_cnt), K_(total_worker_cnt), K_(work_thread_num));
COMMON_LOG(INFO, "reclaim threads", K(free_cnt), K_(total_worker_cnt), K(work_thread_num_));
}
return ret;
}
@ -4880,8 +4921,7 @@ int ObTenantDagScheduler::set_thread_score(const int64_t priority, const int64_t
} else {
COMMON_LOG(INFO, "set thread score successfully", K(score),
"prio", OB_DAG_PRIOS[priority].dag_prio_str_,
"limits_", new_val, K_(work_thread_num),
K_(default_work_thread_num));
"limits_", new_val, K_(work_thread_num));
}
}
}

View File

@ -852,6 +852,27 @@ private:
int64_t dag_net_cnts_[ObDagNetType::DAG_NET_TYPE_MAX]; // lock by dag_net_map_lock_
};
class ObReclaimUtil
{
public:
ObReclaimUtil()
: total_periodic_running_worker_cnt_(0),
check_worker_loop_times_(0)
{}
~ObReclaimUtil(){}
int64_t compute_expected_reclaim_worker_cnt(
const int64_t total_running_task_cnt,
const int64_t free_worker_cnt,
const int64_t total_worker_cnt);
void reset();
public:
int64_t total_periodic_running_worker_cnt_;
int64_t check_worker_loop_times_;
static const int64_t CHECK_USING_WOKRER_INTERVAL = 60 * 1000L* 1000L; // 1min
};
class ObDagPrioScheduler
{
public:
@ -1203,7 +1224,6 @@ private:
int64_t loop_waiting_dag_list_period_; // only set in init/destroy
int64_t total_worker_cnt_; // lock by scheduler_sync_
int64_t work_thread_num_; // lock by scheduler_sync_
int64_t default_work_thread_num_; // only set in init/destroy
int64_t total_running_task_cnt_; // atomic value
int64_t scheduled_task_cnt_; // atomic value // interval scheduled task count
int64_t dag_cnts_[ObDagType::DAG_TYPE_MAX]; // just for showing // atomic value
@ -1212,6 +1232,7 @@ private:
int64_t scheduled_dag_cnts_[ObDagType::DAG_TYPE_MAX]; // atomic value // interval scheduled dag count
int64_t scheduled_task_cnts_[ObDagType::DAG_TYPE_MAX]; // atomic value // interval scheduled task count
int64_t scheduled_data_size_[ObDagType::DAG_TYPE_MAX]; // atomic value // interval scheduled data size
ObReclaimUtil reclaim_util_; // util to help adaptively reclaim worker
common::ObThreadCond scheduler_sync_; // Make sure the lock is inside if there are nested locks
lib::MemoryContext mem_context_;
lib::MemoryContext ha_mem_context_;