Enhanced resource isolation capability, previous IOPS group control for front-end and back-end tasks
Co-authored-by: Charles0429 <xiezhenjiang@gmail.com> Co-authored-by: raywill <hustos@gmail.com>
This commit is contained in:
@ -20,6 +20,7 @@
|
||||
#include "share/scheduler/ob_dag_scheduler.h"
|
||||
#include "share/scheduler/ob_sys_task_stat.h"
|
||||
#include "share/rc/ob_context.h"
|
||||
#include "share/resource_manager/ob_resource_manager.h"
|
||||
#include "observer/omt/ob_tenant.h"
|
||||
#include "observer/omt/ob_tenant_config_mgr.h"
|
||||
#include "lib/stat/ob_diagnose_info.h"
|
||||
@ -1288,6 +1289,8 @@ ObTenantDagWorker::ObTenantDagWorker()
|
||||
status_(DWS_FREE),
|
||||
check_period_(0),
|
||||
last_check_time_(0),
|
||||
function_type_(0),
|
||||
group_id_(-1),
|
||||
tg_id_(-1),
|
||||
is_inited_(false)
|
||||
{
|
||||
@ -1347,6 +1350,8 @@ void ObTenantDagWorker::destroy()
|
||||
status_ = DWS_FREE;
|
||||
check_period_ = 0;
|
||||
last_check_time_ = 0;
|
||||
function_type_ = 0;
|
||||
group_id_ = -1;
|
||||
self_ = NULL;
|
||||
is_inited_ = false;
|
||||
TG_DESTROY(tg_id_);
|
||||
@ -1365,6 +1370,25 @@ void ObTenantDagWorker::resume()
|
||||
notify(DWS_RUNNABLE);
|
||||
}
|
||||
|
||||
int ObTenantDagWorker::set_dag_resource()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
uint64_t group_id = 0;
|
||||
if (nullptr == GCTX.cgroup_ctrl_ || OB_UNLIKELY(!GCTX.cgroup_ctrl_->is_valid())) {
|
||||
//invalid cgroup, cannot bind thread and control resource
|
||||
} else if (OB_FAIL(G_RES_MGR.get_mapping_rule_mgr().get_group_id_by_function_type(MTL_ID(), function_type_, group_id))) {
|
||||
LOG_WARN("fail to get group id by function", K(ret), K(MTL_ID()), K(function_type_), K(group_id));
|
||||
} else if (group_id == group_id_) {
|
||||
// group not change, do nothing
|
||||
} else if (OB_FAIL(GCTX.cgroup_ctrl_->add_thread_to_group(static_cast<pid_t>(GETTID()), MTL_ID(), group_id))) {
|
||||
LOG_WARN("bind back thread to group failed", K(ret), K(GETTID()), K(MTL_ID()), K(group_id));
|
||||
} else {
|
||||
ATOMIC_SET(&group_id_, group_id);
|
||||
THIS_WORKER.set_group_id(group_id);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
bool ObTenantDagWorker::need_wake_up() const
|
||||
{
|
||||
return (ObTimeUtility::fast_current_time() - last_check_time_) > check_period_ * 10;
|
||||
@ -1393,7 +1417,9 @@ void ObTenantDagWorker::run1()
|
||||
COMMON_LOG(WARN, "invalid compat mode", K(ret), K(*dag));
|
||||
} else {
|
||||
THIS_WORKER.set_compatibility_mode(compat_mode);
|
||||
if (OB_FAIL(task_->do_work())) {
|
||||
if (OB_FAIL(set_dag_resource())) {
|
||||
LOG_WARN("isolate dag CPU and IOPS failed", K(ret));
|
||||
} else if (OB_FAIL(task_->do_work())) {
|
||||
if (!dag->ignore_warning()) {
|
||||
COMMON_LOG(WARN, "failed to do work", K(ret), K(*task_), K(compat_mode));
|
||||
}
|
||||
@ -2846,7 +2872,7 @@ int ObTenantDagScheduler::schedule_one(const int64_t priority)
|
||||
&& OB_FAIL(task->generate_next_task())) {
|
||||
task->get_dag()->reset_task_running_status(*task, ObITask::TASK_STATUS_FAILED);
|
||||
COMMON_LOG(WARN, "failed to generate_next_task", K(ret));
|
||||
} else if (OB_FAIL(dispatch_task(*task, worker))) {
|
||||
} else if (OB_FAIL(dispatch_task(*task, worker, priority))) {
|
||||
task->get_dag()->reset_task_running_status(*task, ObITask::TASK_STATUS_WAITING);
|
||||
COMMON_LOG(WARN, "failed to dispatch task", K(ret));
|
||||
} else {
|
||||
@ -2859,7 +2885,7 @@ int ObTenantDagScheduler::schedule_one(const int64_t priority)
|
||||
running_workers_.add_last(worker, priority);
|
||||
if (task != NULL) {
|
||||
COMMON_LOG(INFO, "schedule one task", KPC(task), "priority", OB_DAG_PRIOS[priority].dag_prio_str_,
|
||||
K_(total_running_task_cnt), K(running_task_cnts_[priority]),
|
||||
"group id", worker->get_group_id(), K_(total_running_task_cnt), K(running_task_cnts_[priority]),
|
||||
K(low_limits_[priority]), K(up_limits_[priority]), KP(task->get_dag()->get_dag_net()));
|
||||
}
|
||||
worker->resume();
|
||||
@ -2933,7 +2959,7 @@ int ObTenantDagScheduler::loop_ready_dag_lists()
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTenantDagScheduler::dispatch_task(ObITask &task, ObTenantDagWorker *&ret_worker)
|
||||
int ObTenantDagScheduler::dispatch_task(ObITask &task, ObTenantDagWorker *&ret_worker, const int64_t priority)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ret_worker = NULL;
|
||||
@ -2945,6 +2971,7 @@ int ObTenantDagScheduler::dispatch_task(ObITask &task, ObTenantDagWorker *&ret_w
|
||||
if (OB_SUCC(ret)) {
|
||||
ret_worker = free_workers_.remove_first();
|
||||
ret_worker->set_task(&task);
|
||||
ret_worker->set_function_type(priority);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user