[feature](executor)Make workload group property not required (#27229)

* Make workload group property not required

* remove useless  UT
This commit is contained in:
wangbo
2023-11-19 17:01:51 +08:00
committed by GitHub
parent ab16d0f473
commit 70e070182f
5 changed files with 51 additions and 64 deletions

View File

@ -39,10 +39,10 @@
namespace doris {
namespace taskgroup {
const static std::string CPU_SHARE = "cpu_share";
const static std::string MEMORY_LIMIT = "memory_limit";
const static std::string ENABLE_MEMORY_OVERCOMMIT = "enable_memory_overcommit";
const static std::string CPU_HARD_LIMIT = "cpu_hard_limit";
const static uint64_t CPU_SHARE_DEFAULT_VALUE = 1024;
const static std::string MEMORY_LIMIT_DEFAULT_VALUE = "0%";
const static bool ENABLE_MEMORY_OVERCOMMIT_DEFAULT_VALUE = true;
const static int CPU_HARD_LIMIT_DEFAULT_VALUE = -1;
template <typename QueueType>
TaskGroupEntity<QueueType>::TaskGroupEntity(taskgroup::TaskGroup* tg, std::string type)
@ -215,40 +215,31 @@ Status TaskGroupInfo::parse_topic_info(const TWorkloadGroupInfo& workload_group_
task_group_info->version = version;
// 4 cpu_share
uint64_t cpu_share = 1024;
uint64_t cpu_share = CPU_SHARE_DEFAULT_VALUE;
if (workload_group_info.__isset.cpu_share) {
cpu_share = workload_group_info.cpu_share;
}
task_group_info->cpu_share = cpu_share;
// 5 cpu hard limit
int cpu_hard_limit = -1;
int cpu_hard_limit = CPU_HARD_LIMIT_DEFAULT_VALUE;
if (workload_group_info.__isset.cpu_hard_limit) {
cpu_hard_limit = workload_group_info.cpu_hard_limit;
}
task_group_info->cpu_hard_limit = cpu_hard_limit;
// 6 mem_limit
bool is_percent = true;
std::string mem_limit_str;
std::string mem_limit_str = MEMORY_LIMIT_DEFAULT_VALUE;
if (workload_group_info.__isset.mem_limit) {
mem_limit_str = workload_group_info.mem_limit;
} else {
return Status::InternalError<false>("workload group mem_limit is required");
}
bool is_percent = true;
int64_t mem_limit =
ParseUtil::parse_mem_spec(mem_limit_str, -1, MemInfo::mem_limit(), &is_percent);
if (UNLIKELY(mem_limit <= 0)) {
std::stringstream ss;
ss << "parse memory limit error, " << MEMORY_LIMIT << ": " << mem_limit_str;
LOG(WARNING) << ss.str();
return Status::InternalError<false>("invalid value for {}, val={}", MEMORY_LIMIT,
mem_limit);
}
task_group_info->memory_limit = mem_limit;
// 7 mem overcommit
bool enable_memory_overcommit = true;
bool enable_memory_overcommit = ENABLE_MEMORY_OVERCOMMIT_DEFAULT_VALUE;
if (workload_group_info.__isset.enable_memory_overcommit) {
enable_memory_overcommit = workload_group_info.enable_memory_overcommit;
}
@ -264,12 +255,5 @@ Status TaskGroupInfo::parse_topic_info(const TWorkloadGroupInfo& workload_group_
return Status::OK();
}
bool TaskGroupInfo::check_group_info(const TPipelineWorkloadGroup& resource_group) {
return resource_group.__isset.id && resource_group.__isset.version &&
resource_group.__isset.name && resource_group.__isset.properties &&
resource_group.properties.count(CPU_SHARE) > 0 &&
resource_group.properties.count(MEMORY_LIMIT) > 0;
}
} // namespace taskgroup
} // namespace doris

View File

@ -139,6 +139,13 @@ public:
return _mem_tracker_limiter_pool;
}
// when mem_limit <=0 , it's an invalid value, then current group not participating in memory GC
// because mem_limit is not a required property
bool is_mem_limit_valid() {
std::shared_lock<std::shared_mutex> r_lock(_mutex);
return _memory_limit > 0;
}
private:
mutable std::shared_mutex _mutex; // lock _name, _version, _cpu_share, _memory_limit
const uint64_t _id;
@ -170,9 +177,6 @@ struct TaskGroupInfo {
static Status parse_topic_info(const TWorkloadGroupInfo& topic_info,
taskgroup::TaskGroupInfo* task_group_info);
private:
static bool check_group_info(const TPipelineWorkloadGroup& resource_group);
};
} // namespace taskgroup

View File

@ -237,7 +237,7 @@ int64_t MemInfo::tg_not_enable_overcommit_group_gc() {
ExecEnv::GetInstance()->task_group_manager()->get_resource_groups(
[](const taskgroup::TaskGroupPtr& task_group) {
return !task_group->enable_memory_overcommit();
return task_group->is_mem_limit_valid() && !task_group->enable_memory_overcommit();
},
&task_groups);
if (task_groups.empty()) {
@ -279,7 +279,7 @@ int64_t MemInfo::tg_enable_overcommit_group_gc(int64_t request_free_memory,
std::vector<taskgroup::TaskGroupPtr> task_groups;
ExecEnv::GetInstance()->task_group_manager()->get_resource_groups(
[](const taskgroup::TaskGroupPtr& task_group) {
return task_group->enable_memory_overcommit();
return task_group->is_mem_limit_valid() && task_group->enable_memory_overcommit();
},
&task_groups);
if (task_groups.empty()) {

View File

@ -61,9 +61,9 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
public static final String QUEUE_TIMEOUT = "queue_timeout";
private static final ImmutableSet<String> REQUIRED_PROPERTIES_NAME = new ImmutableSet.Builder<String>().add(
CPU_SHARE).add(MEMORY_LIMIT).build();
// NOTE(wb): all property is not required, some properties default value is set in be
// default value is as followed
// cpu_share=1024, memory_limit=0%(0 means not limit), enable_memory_overcommit=true
private static final ImmutableSet<String> ALL_PROPERTIES_NAME = new ImmutableSet.Builder<String>()
.add(CPU_SHARE).add(MEMORY_LIMIT).add(ENABLE_MEMORY_OVERCOMMIT).add(MAX_CONCURRENCY)
.add(MAX_QUEUE_SIZE).add(QUEUE_TIMEOUT).add(CPU_HARD_LIMIT).build();
@ -81,7 +81,7 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
@SerializedName(value = "version")
private long version;
private double memoryLimitPercent;
private double memoryLimitPercent = 0;
private QueryQueue queryQueue;
private int maxConcurrency = Integer.MAX_VALUE;
@ -99,8 +99,11 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
this.name = name;
this.properties = properties;
this.version = version;
String memoryLimitString = properties.get(MEMORY_LIMIT);
this.memoryLimitPercent = Double.parseDouble(memoryLimitString.substring(0, memoryLimitString.length() - 1));
if (properties.containsKey(MEMORY_LIMIT)) {
String memoryLimitString = properties.get(MEMORY_LIMIT);
this.memoryLimitPercent = Double.parseDouble(
memoryLimitString.substring(0, memoryLimitString.length() - 1));
}
if (properties.containsKey(ENABLE_MEMORY_OVERCOMMIT)) {
properties.put(ENABLE_MEMORY_OVERCOMMIT, properties.get(ENABLE_MEMORY_OVERCOMMIT).toLowerCase());
}
@ -187,15 +190,12 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
throw new DdlException("Property " + propertyName + " is not supported.");
}
}
for (String propertyName : REQUIRED_PROPERTIES_NAME) {
if (!properties.containsKey(propertyName)) {
throw new DdlException("Property " + propertyName + " is required.");
}
}
String cpuSchedulingWeight = properties.get(CPU_SHARE);
if (!StringUtils.isNumeric(cpuSchedulingWeight) || Long.parseLong(cpuSchedulingWeight) <= 0) {
throw new DdlException(CPU_SHARE + " " + cpuSchedulingWeight + " requires a positive integer.");
if (properties.containsKey(CPU_SHARE)) {
String cpuShare = properties.get(CPU_SHARE);
if (!StringUtils.isNumeric(cpuShare) || Long.parseLong(cpuShare) <= 0) {
throw new DdlException(CPU_SHARE + " " + cpuShare + " requires a positive integer.");
}
}
if (properties.containsKey(CPU_HARD_LIMIT)) {
@ -208,18 +208,20 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
}
}
String memoryLimit = properties.get(MEMORY_LIMIT);
if (!memoryLimit.endsWith("%")) {
throw new DdlException(MEMORY_LIMIT + " " + memoryLimit + " requires a percentage and ends with a '%'");
}
String memLimitErr = MEMORY_LIMIT + " " + memoryLimit + " requires a positive floating point number.";
try {
if (Double.parseDouble(memoryLimit.substring(0, memoryLimit.length() - 1)) <= 0) {
if (properties.containsKey(MEMORY_LIMIT)) {
String memoryLimit = properties.get(MEMORY_LIMIT);
if (!memoryLimit.endsWith("%")) {
throw new DdlException(MEMORY_LIMIT + " " + memoryLimit + " requires a percentage and ends with a '%'");
}
String memLimitErr = MEMORY_LIMIT + " " + memoryLimit + " requires a positive floating point number.";
try {
if (Double.parseDouble(memoryLimit.substring(0, memoryLimit.length() - 1)) <= 0) {
throw new DdlException(memLimitErr);
}
} catch (NumberFormatException e) {
LOG.debug(memLimitErr, e);
throw new DdlException(memLimitErr);
}
} catch (NumberFormatException e) {
LOG.debug(memLimitErr, e);
throw new DdlException(memLimitErr);
}
if (properties.containsKey(ENABLE_MEMORY_OVERCOMMIT)) {
@ -293,6 +295,12 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
} else {
row.add(val + "%");
}
} else if (CPU_SHARE.equals(key) && !properties.containsKey(key)) {
row.add("1024");
} else if (MEMORY_LIMIT.equals(key) && !properties.containsKey(key)) {
row.add("0%");
} else if (ENABLE_MEMORY_OVERCOMMIT.equals(key) && !properties.containsKey(key)) {
row.add("true");
} else {
row.add(properties.get(key));
}
@ -370,10 +378,8 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
String memoryLimitString = properties.get(MEMORY_LIMIT);
this.memoryLimitPercent = Double.parseDouble(memoryLimitString.substring(0,
memoryLimitString.length() - 1));
} else {
this.memoryLimitPercent = 100;
this.properties.put(MEMORY_LIMIT, "100%");
}
if (properties.containsKey(CPU_HARD_LIMIT)) {
this.cpuHardLimit = Integer.parseInt(properties.get(CPU_HARD_LIMIT));
}

View File

@ -52,13 +52,6 @@ public class WorkloadGroupTest {
WorkloadGroup.create(name1, properties1);
}
@Test(expected = DdlException.class)
public void testRequiredProperty() throws DdlException {
Map<String, String> properties1 = Maps.newHashMap();
String name1 = "g1";
WorkloadGroup.create(name1, properties1);
}
@Test
public void testCpuShareValue() {
Map<String, String> properties1 = Maps.newHashMap();