[improvement](executor)Using cgroup to implement cpu hard limit (#25489)

* Using cgroup to implement cpu hard limit

* code style
This commit is contained in:
wangbo
2023-10-19 18:56:26 +08:00
committed by GitHub
parent 4d2e7d7c86
commit 54780c62e0
19 changed files with 297 additions and 131 deletions

View File

@ -72,11 +72,6 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable {
private final ResourceProcNode procNode = new ResourceProcNode();
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
public static final String QUERY_CPU_HARD_LIMIT = "query_cpu_hard_limit";
private int queryCPUHardLimit = 0;
// works when user not set cpu hard limit, we fill a default value
private int cpuHardLimitDefaultVal = 0;
public WorkloadGroupMgr() {
}
@ -124,19 +119,13 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable {
throw new UserException("Workload group " + groupName + " does not exist");
}
workloadGroups.add(workloadGroup.toThrift());
// note(wb) -1 to tell be no need to update cgroup
int thriftVal = -1;
if (Config.enable_cpu_hard_limit) {
// reset cpu_share according to cpu hard limit
int cpuHardLimitShare = workloadGroup.getCpuHardLimit() == 0
? this.cpuHardLimitDefaultVal : workloadGroup.getCpuHardLimit();
workloadGroups.get(0).getProperties()
.put(WorkloadGroup.CPU_SHARE, String.valueOf(cpuHardLimitShare));
// reset sum of all groups cpu hard limit
thriftVal = this.queryCPUHardLimit;
// note(wb) -1 to tell be no need to not use cpu hard limit
int cpuHardLimitThriftVal = -1;
if (Config.enable_cpu_hard_limit && workloadGroup.getCpuHardLimit() > 0) {
cpuHardLimitThriftVal = workloadGroup.getCpuHardLimit();
}
workloadGroups.get(0).getProperties().put(QUERY_CPU_HARD_LIMIT, String.valueOf(thriftVal));
workloadGroups.get(0).getProperties().put(WorkloadGroup.CPU_HARD_LIMIT,
String.valueOf(cpuHardLimitThriftVal));
context.setWorkloadGroupName(groupName);
} finally {
readUnlock();
@ -213,7 +202,6 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable {
checkGlobalUnlock(workloadGroup, null);
nameToWorkloadGroup.put(workloadGroupName, workloadGroup);
idToWorkloadGroup.put(workloadGroup.getId(), workloadGroup);
calQueryCPUHardLimit();
Env.getCurrentEnv().getEditLog().logCreateWorkloadGroup(workloadGroup);
} finally {
writeUnlock();
@ -240,44 +228,20 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable {
+ " value can not be greater than 100% or less than or equal 0%");
}
// 2, calculate new query hard cpu limit
int tmpCpuHardLimit = 0;
int zeroCpuHardLimitCount = 0;
// 2, check sum of all cpu hard limit
int sumOfAllCpuHardLimit = 0;
for (Map.Entry<Long, WorkloadGroup> entry : idToWorkloadGroup.entrySet()) {
if (old != null && entry.getKey() == old.getId()) {
continue;
}
int cpuHardLimit = entry.getValue().getCpuHardLimit();
if (cpuHardLimit == 0) {
zeroCpuHardLimitCount++;
}
tmpCpuHardLimit += cpuHardLimit;
sumOfAllCpuHardLimit += entry.getValue().getCpuHardLimit();
}
if (newGroupCpuHardLimit == 0) {
zeroCpuHardLimitCount++;
}
tmpCpuHardLimit += newGroupCpuHardLimit;
sumOfAllCpuHardLimit += newGroupCpuHardLimit;
if (tmpCpuHardLimit > 100) {
if (sumOfAllCpuHardLimit > 100) {
throw new DdlException("sum of all workload group " + WorkloadGroup.CPU_HARD_LIMIT
+ " can not be greater than 100% ");
}
if (tmpCpuHardLimit == 100 && zeroCpuHardLimitCount > 0) {
throw new DdlException("some workload group may not be assigned "
+ "cpu hard limit but all query cpu hard limit exceeds 100%");
}
int leftCpuHardLimitVal = 100 - tmpCpuHardLimit;
if (zeroCpuHardLimitCount != 0) {
int tmpCpuHardLimitDefaultVal = leftCpuHardLimitVal / zeroCpuHardLimitCount;
if (tmpCpuHardLimitDefaultVal == 0) {
throw new DdlException("remaining cpu can not be assigned to the "
+ "workload group without cpu hard limit value; "
+ leftCpuHardLimitVal + "%," + newGroupCpuHardLimit
+ "%," + zeroCpuHardLimitCount);
}
}
}
public void alterWorkloadGroup(AlterWorkloadGroupStmt stmt) throws DdlException {
@ -296,7 +260,6 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable {
checkGlobalUnlock(newWorkloadGroup, workloadGroup);
nameToWorkloadGroup.put(workloadGroupName, newWorkloadGroup);
idToWorkloadGroup.put(newWorkloadGroup.getId(), newWorkloadGroup);
calQueryCPUHardLimit();
Env.getCurrentEnv().getEditLog().logAlterWorkloadGroup(newWorkloadGroup);
} finally {
writeUnlock();
@ -331,7 +294,6 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable {
long groupId = workloadGroup.getId();
idToWorkloadGroup.remove(groupId);
nameToWorkloadGroup.remove(workloadGroupName);
calQueryCPUHardLimit();
Env.getCurrentEnv().getEditLog().logDropWorkloadGroup(new DropWorkloadGroupOperationLog(groupId));
} finally {
writeUnlock();
@ -344,7 +306,6 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable {
try {
nameToWorkloadGroup.put(workloadGroup.getName(), workloadGroup);
idToWorkloadGroup.put(workloadGroup.getId(), workloadGroup);
calQueryCPUHardLimit();
} finally {
writeUnlock();
}
@ -377,7 +338,6 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable {
WorkloadGroup workloadGroup = idToWorkloadGroup.get(id);
nameToWorkloadGroup.remove(workloadGroup.getName());
idToWorkloadGroup.remove(id);
calQueryCPUHardLimit();
} finally {
writeUnlock();
}
@ -403,21 +363,6 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable {
return idToWorkloadGroup;
}
private void calQueryCPUHardLimit() {
int zeroCpuHardLimitCount = 0;
int ret = 0;
for (Map.Entry<Long, WorkloadGroup> entry : idToWorkloadGroup.entrySet()) {
if (entry.getValue().getCpuHardLimit() == 0) {
zeroCpuHardLimitCount++;
}
ret += entry.getValue().getCpuHardLimit();
}
this.queryCPUHardLimit = ret;
if (zeroCpuHardLimitCount != 0) {
this.cpuHardLimitDefaultVal = (100 - this.queryCPUHardLimit) / zeroCpuHardLimitCount;
}
}
@Override
public void write(DataOutput out) throws IOException {
String json = GsonUtils.GSON.toJson(this);
@ -428,7 +373,6 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable {
public void gsonPostProcess() throws IOException {
idToWorkloadGroup.forEach(
(id, workloadGroup) -> nameToWorkloadGroup.put(workloadGroup.getName(), workloadGroup));
calQueryCPUHardLimit();
}
public class ResourceProcNode {