pick #51278
This commit is contained in:
@ -52,23 +52,20 @@ void WorkloadGroupListener::handle_topic_info(const std::vector<TopicInfo>& topi
|
||||
auto wg =
|
||||
_exec_env->workload_group_mgr()->get_or_create_workload_group(workload_group_info);
|
||||
|
||||
// 3 set cpu soft hard limit switch
|
||||
_exec_env->workload_group_mgr()->_enable_cpu_hard_limit.store(
|
||||
workload_group_info.enable_cpu_hard_limit);
|
||||
|
||||
// 4 create and update task scheduler
|
||||
// 3 create and update task scheduler
|
||||
wg->upsert_task_scheduler(&workload_group_info, _exec_env);
|
||||
|
||||
// 5 upsert io throttle
|
||||
// 4 upsert io throttle
|
||||
wg->upsert_scan_io_throttle(&workload_group_info);
|
||||
|
||||
LOG(INFO) << "[topic_publish_wg]update workload group finish, wg info="
|
||||
<< wg->debug_string() << ", enable_cpu_hard_limit="
|
||||
<< (_exec_env->workload_group_mgr()->enable_cpu_hard_limit() ? "true" : "false")
|
||||
<< ", cgroup cpu_shares=" << workload_group_info.cgroup_cpu_shares
|
||||
<< ", cgroup cpu_hard_limit=" << workload_group_info.cgroup_cpu_hard_limit
|
||||
<< ", cgroup home path=" << config::doris_cgroup_cpu_path
|
||||
<< ", list size=" << list_size << ", thread info=" << wg->thread_debug_info();
|
||||
LOG_EVERY_T(INFO, 180) << "[topic_publish_wg]update workload group finish, wg info="
|
||||
<< wg->debug_string()
|
||||
<< ", cgroup cpu_shares=" << workload_group_info.cgroup_cpu_shares
|
||||
<< ", cgroup cpu_hard_limit="
|
||||
<< workload_group_info.cgroup_cpu_hard_limit
|
||||
<< ", cgroup home path=" << config::doris_cgroup_cpu_path
|
||||
<< ", list size=" << list_size
|
||||
<< ", thread info=" << wg->thread_debug_info();
|
||||
}
|
||||
|
||||
// NOTE(wb) when is_set_workload_group_info=false, it means FE send a empty workload group list
|
||||
|
||||
@ -354,13 +354,6 @@ Status WorkloadGroupInfo::parse_topic_info(const TWorkloadGroupInfo& tworkload_g
|
||||
}
|
||||
workload_group_info->enable_memory_overcommit = enable_memory_overcommit;
|
||||
|
||||
// 8 cpu soft limit or hard limit
|
||||
bool enable_cpu_hard_limit = false;
|
||||
if (tworkload_group_info.__isset.enable_cpu_hard_limit) {
|
||||
enable_cpu_hard_limit = tworkload_group_info.enable_cpu_hard_limit;
|
||||
}
|
||||
workload_group_info->enable_cpu_hard_limit = enable_cpu_hard_limit;
|
||||
|
||||
// 9 scan thread num
|
||||
workload_group_info->scan_thread_num = config::doris_scanner_thread_pool_thread_num;
|
||||
if (tworkload_group_info.__isset.scan_thread_num && tworkload_group_info.scan_thread_num > 0) {
|
||||
@ -537,28 +530,13 @@ void WorkloadGroup::upsert_thread_pool_no_lock(WorkloadGroupInfo* wg_info,
|
||||
}
|
||||
|
||||
void WorkloadGroup::upsert_cgroup_cpu_ctl_no_lock(WorkloadGroupInfo* wg_info) {
|
||||
uint64_t wg_id = wg_info->id;
|
||||
int cpu_hard_limit = wg_info->cpu_hard_limit;
|
||||
uint64_t cpu_shares = wg_info->cpu_share;
|
||||
bool enable_cpu_hard_limit = wg_info->enable_cpu_hard_limit;
|
||||
create_cgroup_cpu_ctl_no_lock();
|
||||
|
||||
if (_cgroup_cpu_ctl) {
|
||||
if (enable_cpu_hard_limit) {
|
||||
if (cpu_hard_limit > 0) {
|
||||
_cgroup_cpu_ctl->update_cpu_hard_limit(cpu_hard_limit);
|
||||
_cgroup_cpu_ctl->update_cpu_soft_limit(
|
||||
CgroupCpuCtl::cpu_soft_limit_default_value());
|
||||
} else {
|
||||
LOG(INFO) << "[upsert wg thread pool] enable cpu hard limit but value is "
|
||||
"illegal: "
|
||||
<< cpu_hard_limit << ", gid=" << wg_id;
|
||||
}
|
||||
} else {
|
||||
_cgroup_cpu_ctl->update_cpu_soft_limit(cpu_shares);
|
||||
_cgroup_cpu_ctl->update_cpu_hard_limit(
|
||||
CPU_HARD_LIMIT_DEFAULT_VALUE); // disable cpu hard limit
|
||||
}
|
||||
_cgroup_cpu_ctl->update_cpu_hard_limit(cpu_hard_limit);
|
||||
_cgroup_cpu_ctl->update_cpu_soft_limit(cpu_shares);
|
||||
_cgroup_cpu_ctl->get_cgroup_cpu_info(&(wg_info->cgroup_cpu_shares),
|
||||
&(wg_info->cgroup_cpu_hard_limit));
|
||||
}
|
||||
|
||||
@ -289,7 +289,6 @@ struct WorkloadGroupInfo {
|
||||
bool enable_memory_overcommit;
|
||||
int64_t version;
|
||||
int cpu_hard_limit;
|
||||
bool enable_cpu_hard_limit;
|
||||
int scan_thread_num;
|
||||
int max_remote_scan_thread_num;
|
||||
int min_remote_scan_thread_num;
|
||||
|
||||
@ -131,9 +131,11 @@ void WorkloadGroupMgr::delete_workload_group_by_ids(std::set<uint64_t> used_wg_i
|
||||
}
|
||||
}
|
||||
int64_t time_cost_ms = MonotonicMillis() - begin_time;
|
||||
LOG(INFO) << "[topic_publish_wg]finish clear unused workload group, time cost: " << time_cost_ms
|
||||
<< " ms, deleted group size:" << deleted_task_groups.size()
|
||||
<< ", before wg size=" << old_wg_size << ", after wg size=" << new_wg_size;
|
||||
if (deleted_task_groups.size() > 0) {
|
||||
LOG(INFO) << "[topic_publish_wg]finish clear unused workload group, time cost: "
|
||||
<< time_cost_ms << " ms, deleted group size:" << deleted_task_groups.size()
|
||||
<< ", before wg size=" << old_wg_size << ", after wg size=" << new_wg_size;
|
||||
}
|
||||
}
|
||||
|
||||
struct WorkloadGroupMemInfo {
|
||||
|
||||
@ -52,12 +52,6 @@ public:
|
||||
|
||||
void stop();
|
||||
|
||||
std::atomic<bool> _enable_cpu_hard_limit = false;
|
||||
|
||||
bool enable_cpu_soft_limit() { return !_enable_cpu_hard_limit.load(); }
|
||||
|
||||
bool enable_cpu_hard_limit() { return _enable_cpu_hard_limit.load(); }
|
||||
|
||||
void refresh_wg_weighted_memory_limit();
|
||||
|
||||
void get_wg_resource_usage(vectorized::Block* block);
|
||||
|
||||
@ -19,7 +19,6 @@ package org.apache.doris.resource.workloadgroup;
|
||||
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.DdlException;
|
||||
import org.apache.doris.common.FeNameFormat;
|
||||
import org.apache.doris.common.Pair;
|
||||
@ -535,10 +534,6 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
|
||||
result.addRow(row);
|
||||
}
|
||||
|
||||
public int getCpuHardLimitWhenCalSum() {
|
||||
return cpuHardLimit == -1 ? 0 : cpuHardLimit;
|
||||
}
|
||||
|
||||
public double getMemoryLimitPercentWhenCalSum() {
|
||||
return memoryLimitPercent == -1 ? 0 : memoryLimitPercent;
|
||||
}
|
||||
@ -595,14 +590,6 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
|
||||
if (memOvercommitStr != null) {
|
||||
tWorkloadGroupInfo.setEnableMemoryOvercommit(Boolean.valueOf(memOvercommitStr));
|
||||
}
|
||||
// enable_cpu_hard_limit = true, using cpu hard limit
|
||||
// enable_cpu_hard_limit = false, using cpu soft limit
|
||||
tWorkloadGroupInfo.setEnableCpuHardLimit(Config.enable_cpu_hard_limit);
|
||||
|
||||
if (Config.enable_cpu_hard_limit && cpuHardLimit <= 0) {
|
||||
LOG.warn("enable_cpu_hard_limit=true but cpuHardLimit value not illegal,"
|
||||
+ "id=" + id + ",name=" + name);
|
||||
}
|
||||
|
||||
String scanThreadNumStr = properties.get(SCAN_THREAD_NUM);
|
||||
if (scanThreadNumStr != null) {
|
||||
|
||||
@ -382,7 +382,6 @@ public class WorkloadGroupMgr extends MasterDaemon implements Writable, GsonPost
|
||||
|
||||
for (String newWgOneTag : newWgTagSet) {
|
||||
double sumOfAllMemLimit = 0;
|
||||
int sumOfAllCpuHardLimit = 0;
|
||||
|
||||
// 1 get sum value of all wg which has same tag without current wg
|
||||
for (Map.Entry<Long, WorkloadGroup> entry : idToWorkloadGroup.entrySet()) {
|
||||
@ -401,9 +400,6 @@ public class WorkloadGroupMgr extends MasterDaemon implements Writable, GsonPost
|
||||
continue;
|
||||
}
|
||||
|
||||
if (wg.getCpuHardLimitWhenCalSum() > 0) {
|
||||
sumOfAllCpuHardLimit += wg.getCpuHardLimitWhenCalSum();
|
||||
}
|
||||
if (wg.getMemoryLimitPercentWhenCalSum() > 0) {
|
||||
sumOfAllMemLimit += wg.getMemoryLimitPercentWhenCalSum();
|
||||
}
|
||||
@ -411,7 +407,6 @@ public class WorkloadGroupMgr extends MasterDaemon implements Writable, GsonPost
|
||||
|
||||
// 2 sum current wg value
|
||||
sumOfAllMemLimit += newWg.getMemoryLimitPercentWhenCalSum();
|
||||
sumOfAllCpuHardLimit += newWg.getCpuHardLimitWhenCalSum();
|
||||
|
||||
// 3 check total sum
|
||||
if (sumOfAllMemLimit > 100.0 + 1e-6) {
|
||||
@ -420,13 +415,6 @@ public class WorkloadGroupMgr extends MasterDaemon implements Writable, GsonPost
|
||||
newWgTag.isPresent() ? newWgTag.get() : "")
|
||||
+ " cannot be greater than 100.0%. current sum val:" + sumOfAllMemLimit);
|
||||
}
|
||||
|
||||
if (sumOfAllCpuHardLimit > 100) {
|
||||
throw new DdlException(
|
||||
"sum of all workload group " + WorkloadGroup.CPU_HARD_LIMIT + " within tag " + (
|
||||
newWgTag.isPresent()
|
||||
? newWgTag.get() : "") + " can not be greater than 100% ");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -163,12 +163,6 @@ suite("test_crud_wlg") {
|
||||
|
||||
sql "alter workload group test_group properties ( 'cpu_hard_limit'='99%' );"
|
||||
|
||||
test {
|
||||
sql "alter workload group normal properties ( 'cpu_hard_limit'='2%' );"
|
||||
|
||||
exception "can not be greater than 100%"
|
||||
}
|
||||
|
||||
sql "alter workload group test_group properties ( 'cpu_hard_limit'='20%' );"
|
||||
qt_cpu_hard_limit_1 """ select count(1) from ${table_name} """
|
||||
qt_cpu_hard_limit_2 "select name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num from information_schema.workload_groups where name in ('normal','test_group') order by name;"
|
||||
@ -268,31 +262,6 @@ suite("test_crud_wlg") {
|
||||
exception "must be true or false"
|
||||
}
|
||||
|
||||
// failed for cpu_hard_limit
|
||||
test {
|
||||
sql "create workload group if not exists test_group2 " +
|
||||
"properties ( " +
|
||||
" 'cpu_share'='10', " +
|
||||
" 'memory_limit'='3%', " +
|
||||
" 'enable_memory_overcommit'='true', " +
|
||||
" 'cpu_hard_limit'='120%' " +
|
||||
");"
|
||||
|
||||
exception "a positive integer between 1 and 100"
|
||||
}
|
||||
|
||||
test {
|
||||
sql "create workload group if not exists test_group2 " +
|
||||
"properties ( " +
|
||||
" 'cpu_share'='10', " +
|
||||
" 'memory_limit'='3%', " +
|
||||
" 'enable_memory_overcommit'='true', " +
|
||||
" 'cpu_hard_limit'='99%' " +
|
||||
");"
|
||||
|
||||
exception "can not be greater than 100%"
|
||||
}
|
||||
|
||||
// test show workload groups
|
||||
qt_select_tvf_1 "select name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num from information_schema.workload_groups where name in ('normal','test_group') order by name;"
|
||||
|
||||
|
||||
Reference in New Issue
Block a user