diff --git a/be/src/runtime/task_group/task_group.cpp b/be/src/runtime/task_group/task_group.cpp index 5878e38ed3..137f5ea234 100644 --- a/be/src/runtime/task_group/task_group.cpp +++ b/be/src/runtime/task_group/task_group.cpp @@ -39,10 +39,10 @@ namespace doris { namespace taskgroup { -const static std::string CPU_SHARE = "cpu_share"; -const static std::string MEMORY_LIMIT = "memory_limit"; -const static std::string ENABLE_MEMORY_OVERCOMMIT = "enable_memory_overcommit"; -const static std::string CPU_HARD_LIMIT = "cpu_hard_limit"; +const static uint64_t CPU_SHARE_DEFAULT_VALUE = 1024; +const static std::string MEMORY_LIMIT_DEFAULT_VALUE = "0%"; +const static bool ENABLE_MEMORY_OVERCOMMIT_DEFAULT_VALUE = true; +const static int CPU_HARD_LIMIT_DEFAULT_VALUE = -1; template TaskGroupEntity::TaskGroupEntity(taskgroup::TaskGroup* tg, std::string type) @@ -215,40 +215,31 @@ Status TaskGroupInfo::parse_topic_info(const TWorkloadGroupInfo& workload_group_ task_group_info->version = version; // 4 cpu_share - uint64_t cpu_share = 1024; + uint64_t cpu_share = CPU_SHARE_DEFAULT_VALUE; if (workload_group_info.__isset.cpu_share) { cpu_share = workload_group_info.cpu_share; } task_group_info->cpu_share = cpu_share; // 5 cpu hard limit - int cpu_hard_limit = -1; + int cpu_hard_limit = CPU_HARD_LIMIT_DEFAULT_VALUE; if (workload_group_info.__isset.cpu_hard_limit) { cpu_hard_limit = workload_group_info.cpu_hard_limit; } task_group_info->cpu_hard_limit = cpu_hard_limit; // 6 mem_limit - bool is_percent = true; - std::string mem_limit_str; + std::string mem_limit_str = MEMORY_LIMIT_DEFAULT_VALUE; if (workload_group_info.__isset.mem_limit) { mem_limit_str = workload_group_info.mem_limit; - } else { - return Status::InternalError("workload group mem_limit is required"); } + bool is_percent = true; int64_t mem_limit = ParseUtil::parse_mem_spec(mem_limit_str, -1, MemInfo::mem_limit(), &is_percent); - if (UNLIKELY(mem_limit <= 0)) { - std::stringstream ss; - ss << "parse memory limit error, " << MEMORY_LIMIT << ": " << mem_limit_str; - LOG(WARNING) << ss.str(); - return Status::InternalError("invalid value for {}, val={}", MEMORY_LIMIT, - mem_limit); - } task_group_info->memory_limit = mem_limit; // 7 mem overcommit - bool enable_memory_overcommit = true; + bool enable_memory_overcommit = ENABLE_MEMORY_OVERCOMMIT_DEFAULT_VALUE; if (workload_group_info.__isset.enable_memory_overcommit) { enable_memory_overcommit = workload_group_info.enable_memory_overcommit; } @@ -264,12 +255,5 @@ Status TaskGroupInfo::parse_topic_info(const TWorkloadGroupInfo& workload_group_ return Status::OK(); } -bool TaskGroupInfo::check_group_info(const TPipelineWorkloadGroup& resource_group) { - return resource_group.__isset.id && resource_group.__isset.version && - resource_group.__isset.name && resource_group.__isset.properties && - resource_group.properties.count(CPU_SHARE) > 0 && - resource_group.properties.count(MEMORY_LIMIT) > 0; -} - } // namespace taskgroup } // namespace doris diff --git a/be/src/runtime/task_group/task_group.h b/be/src/runtime/task_group/task_group.h index b3daf58257..41a5e9a162 100644 --- a/be/src/runtime/task_group/task_group.h +++ b/be/src/runtime/task_group/task_group.h @@ -139,6 +139,13 @@ public: return _mem_tracker_limiter_pool; } + // when mem_limit <=0 , it's an invalid value, then current group not participating in memory GC + // because mem_limit is not a required property + bool is_mem_limit_valid() { + std::shared_lock r_lock(_mutex); + return _memory_limit > 0; + } + private: mutable std::shared_mutex _mutex; // lock _name, _version, _cpu_share, _memory_limit const uint64_t _id; @@ -170,9 +177,6 @@ struct TaskGroupInfo { static Status parse_topic_info(const TWorkloadGroupInfo& topic_info, taskgroup::TaskGroupInfo* task_group_info); - -private: - static bool check_group_info(const TPipelineWorkloadGroup& resource_group); }; } // namespace taskgroup diff --git a/be/src/util/mem_info.cpp b/be/src/util/mem_info.cpp index 38f9039816..6c24caaf63 100644 --- a/be/src/util/mem_info.cpp +++ b/be/src/util/mem_info.cpp @@ -237,7 +237,7 @@ int64_t MemInfo::tg_not_enable_overcommit_group_gc() { ExecEnv::GetInstance()->task_group_manager()->get_resource_groups( [](const taskgroup::TaskGroupPtr& task_group) { - return !task_group->enable_memory_overcommit(); + return task_group->is_mem_limit_valid() && !task_group->enable_memory_overcommit(); }, &task_groups); if (task_groups.empty()) { @@ -279,7 +279,7 @@ int64_t MemInfo::tg_enable_overcommit_group_gc(int64_t request_free_memory, std::vector task_groups; ExecEnv::GetInstance()->task_group_manager()->get_resource_groups( [](const taskgroup::TaskGroupPtr& task_group) { - return task_group->enable_memory_overcommit(); + return task_group->is_mem_limit_valid() && task_group->enable_memory_overcommit(); }, &task_groups); if (task_groups.empty()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java index 53faa1a913..a2e5c6005c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java @@ -61,9 +61,9 @@ public class WorkloadGroup implements Writable, GsonPostProcessable { public static final String QUEUE_TIMEOUT = "queue_timeout"; - private static final ImmutableSet REQUIRED_PROPERTIES_NAME = new ImmutableSet.Builder().add( - CPU_SHARE).add(MEMORY_LIMIT).build(); - + // NOTE(wb): all property is not required, some properties default value is set in be + // default value is as followed + // cpu_share=1024, memory_limit=0%(0 means not limit), enable_memory_overcommit=true private static final ImmutableSet ALL_PROPERTIES_NAME = new ImmutableSet.Builder() .add(CPU_SHARE).add(MEMORY_LIMIT).add(ENABLE_MEMORY_OVERCOMMIT).add(MAX_CONCURRENCY) .add(MAX_QUEUE_SIZE).add(QUEUE_TIMEOUT).add(CPU_HARD_LIMIT).build(); @@ -81,7 +81,7 @@ public class WorkloadGroup implements Writable, GsonPostProcessable { @SerializedName(value = "version") private long version; - private double memoryLimitPercent; + private double memoryLimitPercent = 0; private QueryQueue queryQueue; private int maxConcurrency = Integer.MAX_VALUE; @@ -99,8 +99,11 @@ public class WorkloadGroup implements Writable, GsonPostProcessable { this.name = name; this.properties = properties; this.version = version; - String memoryLimitString = properties.get(MEMORY_LIMIT); - this.memoryLimitPercent = Double.parseDouble(memoryLimitString.substring(0, memoryLimitString.length() - 1)); + if (properties.containsKey(MEMORY_LIMIT)) { + String memoryLimitString = properties.get(MEMORY_LIMIT); + this.memoryLimitPercent = Double.parseDouble( + memoryLimitString.substring(0, memoryLimitString.length() - 1)); + } if (properties.containsKey(ENABLE_MEMORY_OVERCOMMIT)) { properties.put(ENABLE_MEMORY_OVERCOMMIT, properties.get(ENABLE_MEMORY_OVERCOMMIT).toLowerCase()); } @@ -187,15 +190,12 @@ public class WorkloadGroup implements Writable, GsonPostProcessable { throw new DdlException("Property " + propertyName + " is not supported."); } } - for (String propertyName : REQUIRED_PROPERTIES_NAME) { - if (!properties.containsKey(propertyName)) { - throw new DdlException("Property " + propertyName + " is required."); - } - } - String cpuSchedulingWeight = properties.get(CPU_SHARE); - if (!StringUtils.isNumeric(cpuSchedulingWeight) || Long.parseLong(cpuSchedulingWeight) <= 0) { - throw new DdlException(CPU_SHARE + " " + cpuSchedulingWeight + " requires a positive integer."); + if (properties.containsKey(CPU_SHARE)) { + String cpuShare = properties.get(CPU_SHARE); + if (!StringUtils.isNumeric(cpuShare) || Long.parseLong(cpuShare) <= 0) { + throw new DdlException(CPU_SHARE + " " + cpuShare + " requires a positive integer."); + } } if (properties.containsKey(CPU_HARD_LIMIT)) { @@ -208,18 +208,20 @@ public class WorkloadGroup implements Writable, GsonPostProcessable { } } - String memoryLimit = properties.get(MEMORY_LIMIT); - if (!memoryLimit.endsWith("%")) { - throw new DdlException(MEMORY_LIMIT + " " + memoryLimit + " requires a percentage and ends with a '%'"); - } - String memLimitErr = MEMORY_LIMIT + " " + memoryLimit + " requires a positive floating point number."; - try { - if (Double.parseDouble(memoryLimit.substring(0, memoryLimit.length() - 1)) <= 0) { + if (properties.containsKey(MEMORY_LIMIT)) { + String memoryLimit = properties.get(MEMORY_LIMIT); + if (!memoryLimit.endsWith("%")) { + throw new DdlException(MEMORY_LIMIT + " " + memoryLimit + " requires a percentage and ends with a '%'"); + } + String memLimitErr = MEMORY_LIMIT + " " + memoryLimit + " requires a positive floating point number."; + try { + if (Double.parseDouble(memoryLimit.substring(0, memoryLimit.length() - 1)) <= 0) { + throw new DdlException(memLimitErr); + } + } catch (NumberFormatException e) { + LOG.debug(memLimitErr, e); throw new DdlException(memLimitErr); } - } catch (NumberFormatException e) { - LOG.debug(memLimitErr, e); - throw new DdlException(memLimitErr); } if (properties.containsKey(ENABLE_MEMORY_OVERCOMMIT)) { @@ -293,6 +295,12 @@ public class WorkloadGroup implements Writable, GsonPostProcessable { } else { row.add(val + "%"); } + } else if (CPU_SHARE.equals(key) && !properties.containsKey(key)) { + row.add("1024"); + } else if (MEMORY_LIMIT.equals(key) && !properties.containsKey(key)) { + row.add("0%"); + } else if (ENABLE_MEMORY_OVERCOMMIT.equals(key) && !properties.containsKey(key)) { + row.add("true"); } else { row.add(properties.get(key)); } @@ -370,10 +378,8 @@ public class WorkloadGroup implements Writable, GsonPostProcessable { String memoryLimitString = properties.get(MEMORY_LIMIT); this.memoryLimitPercent = Double.parseDouble(memoryLimitString.substring(0, memoryLimitString.length() - 1)); - } else { - this.memoryLimitPercent = 100; - this.properties.put(MEMORY_LIMIT, "100%"); } + if (properties.containsKey(CPU_HARD_LIMIT)) { this.cpuHardLimit = Integer.parseInt(properties.get(CPU_HARD_LIMIT)); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/resource/workloadgroup/WorkloadGroupTest.java b/fe/fe-core/src/test/java/org/apache/doris/resource/workloadgroup/WorkloadGroupTest.java index 8831380cf9..1acb8d1afa 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/resource/workloadgroup/WorkloadGroupTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/resource/workloadgroup/WorkloadGroupTest.java @@ -52,13 +52,6 @@ public class WorkloadGroupTest { WorkloadGroup.create(name1, properties1); } - @Test(expected = DdlException.class) - public void testRequiredProperty() throws DdlException { - Map properties1 = Maps.newHashMap(); - String name1 = "g1"; - WorkloadGroup.create(name1, properties1); - } - @Test public void testCpuShareValue() { Map properties1 = Maps.newHashMap();