Fix the problem of DAGs not being scheduled
This commit is contained in:
@ -3315,13 +3315,14 @@ bool ObDagPrioScheduler::try_switch(ObTenantDagWorker &worker)
|
||||
ObMutexGuard guard(prio_lock_);
|
||||
if (running_task_cnts_ > adaptive_task_limit_) {
|
||||
need_pause = true;
|
||||
pause_worker_(worker);
|
||||
}
|
||||
if (is_rank_dag_prio()) {
|
||||
need_pause = check_need_load_shedding_(false /*for_schedule*/);
|
||||
} else if (is_rank_dag_prio() && check_need_load_shedding_(false /*for_schedule*/)) {
|
||||
need_pause = true;
|
||||
FLOG_INFO("[ADAPTIVE_SCHED]tenant cpu is at high level, pause current compaction task", K(priority_));
|
||||
}
|
||||
|
||||
if (!need_pause && !waiting_workers_.is_empty()) {
|
||||
if (need_pause) {
|
||||
pause_worker_(worker);
|
||||
} else if (!waiting_workers_.is_empty()) {
|
||||
if (waiting_workers_.get_first()->need_wake_up()) {
|
||||
// schedule_one will schedule the first worker on the waiting list first
|
||||
if (OB_TMP_FAIL(schedule_one_())) {
|
||||
|
@ -557,8 +557,6 @@ int ObTenantSysLoadShedder::refresh_cpu_utility()
|
||||
inc_cpu_time = curr_cpu_time - last_cpu_time_;
|
||||
physical_cpu_utility = inc_cpu_time * 100 / (curr_sample_time - last_sample_time_);
|
||||
}
|
||||
last_sample_time_ = curr_sample_time;
|
||||
last_cpu_time_ = curr_cpu_time;
|
||||
|
||||
if (physical_cpu_utility > max_cpu_cnt_ * 100) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
@ -567,25 +565,12 @@ int ObTenantSysLoadShedder::refresh_cpu_utility()
|
||||
} else if (physical_cpu_utility >= max_cpu_cnt_ * 100 * CPU_TIME_THRESHOLD) {
|
||||
ATOMIC_STORE(&load_shedding_factor_, DEFAULT_LOAD_SHEDDING_FACTOR);
|
||||
effect_time_ = ObTimeUtility::fast_current_time();
|
||||
FLOG_INFO("[ADAPTIVE_SCHED] refresh cpu utility", K(ret), K(load_shedding_factor_), K(min_cpu_cnt_),
|
||||
K(physical_cpu_utility), K(inc_cpu_time), K(curr_sample_time), K(last_sample_time_));
|
||||
FLOG_INFO("[ADAPTIVE_SCHED] refresh cpu utility", K(ret), K(load_shedding_factor_), K(max_cpu_cnt_),
|
||||
K(physical_cpu_utility), K(inc_cpu_time), K(curr_sample_time), K(last_sample_time_), K(curr_cpu_time), K(last_cpu_time_));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTenantSysLoadShedder::refresh_cpu_usage()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
// tenant_cpu_usage is a relatively large value, it includes the wait_time on lock, RPC, IO and so on.
|
||||
if (OB_FAIL(GCTX.omt_->get_tenant_cpu_usage(MTL_ID(), cpu_usage_))) {
|
||||
LOG_WARN("failed to get tenant cpu usage", K(ret));
|
||||
} else if (cpu_usage_ * 100 >= max_cpu_cnt_ * CPU_USAGE_THRESHOLD) {
|
||||
effect_time_ = ObTimeUtility::fast_current_time();
|
||||
ATOMIC_STORE(&load_shedding_factor_, DEFAULT_LOAD_SHEDDING_FACTOR);
|
||||
|
||||
FLOG_INFO("[ADAPTIVE_SCHED] refresh cpu usage", K(ret), K(load_shedding_factor_), "cpu_usage_percent", cpu_usage_ * 100 * 100);
|
||||
last_sample_time_ = curr_sample_time;
|
||||
last_cpu_time_ = curr_cpu_time;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
@ -326,13 +326,11 @@ public:
|
||||
TO_STRING_KV(K_(load_shedding_factor), K_(last_cpu_time), K_(cpu_usage), K_(min_cpu_cnt), K_(max_cpu_cnt), K_(effect_time));
|
||||
private:
|
||||
int refresh_cpu_utility();
|
||||
int refresh_cpu_usage();
|
||||
|
||||
public:
|
||||
static const int64_t DEFAULT_LOAD_SHEDDING_FACTOR = 2;
|
||||
static const int64_t CPU_TIME_SAMPLING_INTERVAL = 20_s; //20 * 1000 * 1000 us
|
||||
static constexpr double CPU_TIME_THRESHOLD = 0.6; // 60%
|
||||
static constexpr double CPU_USAGE_THRESHOLD = 0.8; // 80%
|
||||
static constexpr double CPU_TIME_THRESHOLD = 0.8; // 80%
|
||||
static const int64_t SHEDDER_EXPIRE_TIME = 2_min;
|
||||
private:
|
||||
int64_t effect_time_;
|
||||
|
Reference in New Issue
Block a user