optimize tenant sys stat for adaptive merge schedule
This commit is contained in:
@ -2679,9 +2679,14 @@ int ObDagPrioScheduler::loop_ready_dag_list(bool &is_found)
|
||||
ObMutexGuard guard(prio_lock_);
|
||||
if (running_task_cnts_ < adaptive_task_limit_) {
|
||||
// if extra_erase_dag_net not null, the is_found must be false.
|
||||
if (!check_need_load_shedding_(true/*for_schedule*/)) {
|
||||
is_found = (OB_SUCCESS == schedule_one_(delayed_erase_dag_nets, extra_erase_dag_net));
|
||||
}
|
||||
|
||||
while (running_task_cnts_ < adaptive_task_limit_ && is_found) {
|
||||
if (OB_SUCCESS != schedule_one_(delayed_erase_dag_nets, extra_erase_dag_net)) {
|
||||
if (check_need_load_shedding_(true/*for_schedule*/)) {
|
||||
break;
|
||||
} else if (OB_SUCCESS != schedule_one_(delayed_erase_dag_nets, extra_erase_dag_net)) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
@ -3307,6 +3312,10 @@ bool ObDagPrioScheduler::try_switch(ObTenantDagWorker &worker)
|
||||
need_pause = true;
|
||||
pause_worker_(worker);
|
||||
}
|
||||
if (is_rank_dag_prio()) {
|
||||
need_pause = check_need_load_shedding_(false /*for_schedule*/);
|
||||
}
|
||||
|
||||
if (!need_pause && !waiting_workers_.is_empty()) {
|
||||
if (waiting_workers_.get_first()->need_wake_up()) {
|
||||
// schedule_one will schedule the first worker on the waiting list first
|
||||
@ -3328,6 +3337,37 @@ bool ObDagPrioScheduler::try_switch(ObTenantDagWorker &worker)
|
||||
return need_pause;
|
||||
}
|
||||
|
||||
// under prio lock
|
||||
bool ObDagPrioScheduler::check_need_load_shedding_(const bool for_schedule)
|
||||
{
|
||||
bool need_shedding = false;
|
||||
compaction::ObTenantTabletScheduler *tablet_scheduler = nullptr;
|
||||
|
||||
if (OB_ISNULL(tablet_scheduler = MTL(compaction::ObTenantTabletScheduler *))) {
|
||||
// may be during the start phase
|
||||
} else if (tablet_scheduler->enable_adaptive_merge_schedule()) {
|
||||
ObTenantTabletStatMgr *stat_mgr = MTL(ObTenantTabletStatMgr *);
|
||||
int64_t load_shedding_factor = 1;
|
||||
const int64_t extra_limit = for_schedule ? 0 : 1;
|
||||
|
||||
if (OB_ISNULL(stat_mgr)) {
|
||||
} else if (FALSE_IT(load_shedding_factor = MAX(1, stat_mgr->get_load_shedding_factor()))) {
|
||||
} else if (load_shedding_factor <= 1 || !is_rank_dag_prio()) {
|
||||
// no need to load shedding
|
||||
} else {
|
||||
const int64_t load_shedding_limit = MAX(2, limits_ / load_shedding_factor);
|
||||
if (running_task_cnts_ > load_shedding_limit + extra_limit) {
|
||||
need_shedding = true;
|
||||
if (REACH_TENANT_TIME_INTERVAL(30_s)) {
|
||||
FLOG_INFO("DagScheduler needs to load shedding", K(load_shedding_factor), K(extra_limit), K_(limits));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return need_shedding;
|
||||
}
|
||||
|
||||
|
||||
/***************************************ObDagNetScheduler impl********************************************/
|
||||
void ObDagNetScheduler::destroy()
|
||||
{
|
||||
|
||||
@ -953,6 +953,7 @@ private:
|
||||
int deal_with_fail_dag_(ObIDag &dag, bool &retry_flag);
|
||||
int finish_task_in_dag_(ObITask &task, ObIDag &dag, ObIDagNet *&erase_dag_net);
|
||||
void pause_worker_(ObTenantDagWorker &worker);
|
||||
bool check_need_load_shedding_(const bool for_schedule);
|
||||
|
||||
public:
|
||||
static const int32_t MAX_SHOW_DAG_CNT = 100;
|
||||
|
||||
@ -18,6 +18,7 @@
|
||||
#include "storage/ob_tenant_tablet_stat_mgr.h"
|
||||
#include "observer/ob_server_struct.h"
|
||||
#include "observer/ob_server.h"
|
||||
#include <sys/sysinfo.h>
|
||||
|
||||
using namespace oceanbase;
|
||||
using namespace oceanbase::common;
|
||||
@ -280,9 +281,9 @@ bool ObTenantSysStat::is_full_cpu_usage() const
|
||||
{
|
||||
bool bret = false;
|
||||
if (is_small_tenant()) {
|
||||
bret = max_cpu_cnt_ * 60 <= cpu_usage_percentage_;
|
||||
bret = 75 <= cpu_usage_percentage_;
|
||||
} else {
|
||||
bret = max_cpu_cnt_ * 70 <= cpu_usage_percentage_;
|
||||
bret = 85 <= cpu_usage_percentage_;
|
||||
}
|
||||
return bret;
|
||||
}
|
||||
@ -502,6 +503,82 @@ void ObTabletStreamPool::free(ObTabletStreamNode *node)
|
||||
}
|
||||
|
||||
|
||||
/************************************* ObTenantSysLoadShedder *************************************/
|
||||
ObTenantSysLoadShedder::ObTenantSysLoadShedder()
|
||||
{
|
||||
reset();
|
||||
}
|
||||
|
||||
void ObTenantSysLoadShedder::reset()
|
||||
{
|
||||
MEMSET(this, 0, sizeof(ObTenantSysLoadShedder));
|
||||
load_shedding_factor_ = 1;
|
||||
}
|
||||
|
||||
void ObTenantSysLoadShedder::refresh_sys_load()
|
||||
{
|
||||
if (load_shedding_factor_ > 1 &&
|
||||
ObTimeUtility::fast_current_time() < effect_time_ + SHEDDER_EXPIRE_TIME) {
|
||||
// do nothing
|
||||
} else if (REACH_TENANT_TIME_INTERVAL(CPU_TIME_SAMPLING_INTERVAL)) {
|
||||
load_shedding_factor_ = 1;
|
||||
(void) refresh_cpu_utility();
|
||||
|
||||
if (1 >= load_shedding_factor_) {
|
||||
(void) refresh_cpu_usage();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
int ObTenantSysLoadShedder::refresh_cpu_utility()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t curr_cpu_time = last_cpu_time_;
|
||||
int64_t inc_cpu_time = 0;
|
||||
int64_t physical_cpu_utility = 0;
|
||||
double max_cpu_cnt = 0; // placeholder
|
||||
|
||||
if (OB_FAIL(GCTX.omt_->get_tenant_cpu(MTL_ID(), min_cpu_cnt_, max_cpu_cnt))) {
|
||||
LOG_WARN("failed to get tennant cpu cnt", K(ret));
|
||||
} else if (OB_FAIL(GCTX.omt_->get_tenant_cpu_time(MTL_ID(), curr_cpu_time))) {
|
||||
LOG_WARN("failed to get tennant cpu cnt", K(ret));
|
||||
} else {
|
||||
const int64_t curr_sample_time = ObTimeUtility::fast_current_time();
|
||||
if (0 == last_sample_time_) {
|
||||
// first time sample, no need to calculate cpu utility
|
||||
} else {
|
||||
inc_cpu_time = curr_cpu_time - last_cpu_time_;
|
||||
physical_cpu_utility = inc_cpu_time * 100 / (curr_sample_time - last_sample_time_);
|
||||
}
|
||||
last_sample_time_ = curr_sample_time;
|
||||
|
||||
if (physical_cpu_utility >= min_cpu_cnt_ * CPU_UTIL_THRESHOLD) {
|
||||
ATOMIC_STORE(&load_shedding_factor_, DEFAULT_LOAD_SHEDDING_FACTOR);
|
||||
effect_time_ = ObTimeUtility::fast_current_time();
|
||||
}
|
||||
}
|
||||
|
||||
// debug log, remove later
|
||||
FLOG_INFO("BatMan refresh cpu utility", K(ret), K(load_shedding_factor_), K(min_cpu_cnt_), K(inc_cpu_time), K(physical_cpu_utility));
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTenantSysLoadShedder::refresh_cpu_usage()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
if (OB_FAIL(GCTX.omt_->get_tenant_cpu_usage(MTL_ID(), cpu_usage_))) {
|
||||
LOG_WARN("failed to get tenant cpu usage", K(ret));
|
||||
} else if (cpu_usage_ * 100 >= CPU_UTIL_THRESHOLD) {
|
||||
effect_time_ = ObTimeUtility::fast_current_time();
|
||||
ATOMIC_STORE(&load_shedding_factor_, DEFAULT_LOAD_SHEDDING_FACTOR);
|
||||
}
|
||||
|
||||
// debug log, remove later
|
||||
FLOG_INFO("BatMan refresh cpu usage", K(ret), K(load_shedding_factor_), "cpu_usage_percent", cpu_usage_ * 100 * 100);
|
||||
return ret;
|
||||
}
|
||||
|
||||
/************************************* ObTenantTabletStatMgr *************************************/
|
||||
ObTenantTabletStatMgr::ObTenantTabletStatMgr()
|
||||
: report_stat_task_(*this),
|
||||
@ -509,6 +586,7 @@ ObTenantTabletStatMgr::ObTenantTabletStatMgr()
|
||||
stream_map_(),
|
||||
bucket_lock_(),
|
||||
report_queue_(),
|
||||
load_shedder_(),
|
||||
report_cursor_(0),
|
||||
pending_cursor_(0),
|
||||
report_tg_id_(0),
|
||||
@ -545,6 +623,7 @@ int ObTenantTabletStatMgr::init(const int64_t tenant_id)
|
||||
} else if (OB_FAIL(TG_SCHEDULE(report_tg_id_, report_stat_task_, TABLET_STAT_PROCESS_INTERVAL, repeat))) {
|
||||
LOG_WARN("failed to schedule tablet stat update task", K(ret));
|
||||
} else {
|
||||
load_shedder_.refresh_sys_load();
|
||||
is_inited_ = true;
|
||||
}
|
||||
if (!is_inited_) {
|
||||
@ -596,6 +675,7 @@ void ObTenantTabletStatMgr::reset()
|
||||
is_inited_ = false;
|
||||
}
|
||||
bucket_lock_.destroy();
|
||||
load_shedder_.reset();
|
||||
FLOG_INFO("ObTenantTabletStatMgr destroyed!");
|
||||
}
|
||||
|
||||
@ -771,7 +851,7 @@ int ObTenantTabletStatMgr::get_sys_stat(ObTenantSysStat &sys_stat)
|
||||
} else {
|
||||
sys_stat.memory_hold_ = lib::get_tenant_memory_hold(MTL_ID());
|
||||
sys_stat.memory_limit_ = lib::get_tenant_memory_limit(MTL_ID());
|
||||
sys_stat.cpu_usage_percentage_ *= 100;
|
||||
sys_stat.cpu_usage_percentage_ *= 100 * 100;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@ -886,6 +966,7 @@ void ObTenantTabletStatMgr::refresh_all(const int64_t step)
|
||||
void ObTenantTabletStatMgr::TabletStatUpdater::runTimerTask()
|
||||
{
|
||||
mgr_.process_stats();
|
||||
mgr_.refresh_sys_load();
|
||||
|
||||
int64_t interval_step = 0;
|
||||
if (CHECK_SCHEDULE_TIME_INTERVAL(CHECK_INTERVAL, interval_step)) {
|
||||
|
||||
@ -23,6 +23,7 @@
|
||||
#include "lib/lock/ob_tc_rwlock.h"
|
||||
#include "lib/queue/ob_fixed_queue.h"
|
||||
#include "lib/list/ob_dlist.h"
|
||||
#include "lib/literals/ob_literals.h"
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
@ -313,6 +314,35 @@ private:
|
||||
};
|
||||
|
||||
|
||||
class ObTenantSysLoadShedder
|
||||
{
|
||||
public:
|
||||
ObTenantSysLoadShedder();
|
||||
~ObTenantSysLoadShedder() = default;
|
||||
void reset();
|
||||
void refresh_sys_load();
|
||||
int64_t get_load_shedding_factor() const { return ATOMIC_LOAD(&load_shedding_factor_); }
|
||||
|
||||
TO_STRING_KV(K_(load_shedding_factor), K_(last_cpu_time), K_(cpu_usage), K_(min_cpu_cnt), K_(effect_time));
|
||||
private:
|
||||
int refresh_cpu_utility();
|
||||
int refresh_cpu_usage();
|
||||
|
||||
public:
|
||||
static const int64_t DEFAULT_LOAD_SHEDDING_FACTOR = 2;
|
||||
static const int64_t CPU_TIME_SAMPLING_INTERVAL = 20_s; //20 * 1000 * 1000 us
|
||||
static constexpr double CPU_UTIL_THRESHOLD = 0.6; // 60%
|
||||
static const int64_t SHEDDER_EXPIRE_TIME = 10_min;
|
||||
private:
|
||||
int64_t effect_time_;
|
||||
int64_t last_sample_time_;
|
||||
int64_t load_shedding_factor_;
|
||||
int64_t last_cpu_time_;
|
||||
double cpu_usage_;
|
||||
double min_cpu_cnt_;
|
||||
};
|
||||
|
||||
|
||||
class ObTenantTabletStatMgr
|
||||
{
|
||||
public:
|
||||
@ -350,6 +380,8 @@ public:
|
||||
int get_sys_stat(ObTenantSysStat &sys_stat);
|
||||
void process_stats();
|
||||
void refresh_all(const int64_t step);
|
||||
int64_t get_load_shedding_factor() const { return load_shedder_.get_load_shedding_factor(); }
|
||||
void refresh_sys_load() { load_shedder_.refresh_sys_load(); }
|
||||
private:
|
||||
class TabletStatUpdater : public common::ObTimerTask
|
||||
{
|
||||
@ -388,6 +420,7 @@ private:
|
||||
TabletStreamMap stream_map_;
|
||||
common::ObBucketLock bucket_lock_;
|
||||
ObTabletStat report_queue_[DEFAULT_MAX_PENDING_CNT]; // 12 * 8 * 40000 bytes
|
||||
ObTenantSysLoadShedder load_shedder_;
|
||||
uint64_t report_cursor_;
|
||||
uint64_t pending_cursor_;
|
||||
int report_tg_id_;
|
||||
|
||||
Reference in New Issue
Block a user