[Feature](executor)Add scan_thread_num property for workload group (#31106)
This commit is contained in:
@ -61,12 +61,14 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
|
||||
|
||||
public static final String QUEUE_TIMEOUT = "queue_timeout";
|
||||
|
||||
public static final String SCAN_THREAD_NUM = "scan_thread_num";
|
||||
|
||||
// NOTE(wb): all property is not required, some properties default value is set in be
|
||||
// default value is as followed
|
||||
// cpu_share=1024, memory_limit=0%(0 means not limit), enable_memory_overcommit=true
|
||||
private static final ImmutableSet<String> ALL_PROPERTIES_NAME = new ImmutableSet.Builder<String>()
|
||||
.add(CPU_SHARE).add(MEMORY_LIMIT).add(ENABLE_MEMORY_OVERCOMMIT).add(MAX_CONCURRENCY)
|
||||
.add(MAX_QUEUE_SIZE).add(QUEUE_TIMEOUT).add(CPU_HARD_LIMIT).build();
|
||||
.add(MAX_QUEUE_SIZE).add(QUEUE_TIMEOUT).add(CPU_HARD_LIMIT).add(SCAN_THREAD_NUM).build();
|
||||
|
||||
@SerializedName(value = "id")
|
||||
private long id;
|
||||
@ -210,6 +212,19 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
|
||||
}
|
||||
}
|
||||
|
||||
if (properties.containsKey(SCAN_THREAD_NUM)) {
|
||||
String value = properties.get(SCAN_THREAD_NUM);
|
||||
try {
|
||||
int intValue = Integer.parseInt(value);
|
||||
if (intValue <= 0 && intValue != -1) {
|
||||
throw new NumberFormatException();
|
||||
}
|
||||
} catch (NumberFormatException e) {
|
||||
throw new DdlException(
|
||||
SCAN_THREAD_NUM + " must be a positive integer or -1. but input value is " + value);
|
||||
}
|
||||
}
|
||||
|
||||
// check queue property
|
||||
if (properties.containsKey(MAX_CONCURRENCY)) {
|
||||
try {
|
||||
@ -292,6 +307,8 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
|
||||
row.add("0%");
|
||||
} else if (ENABLE_MEMORY_OVERCOMMIT.equals(key) && !properties.containsKey(key)) {
|
||||
row.add("true");
|
||||
} else if (SCAN_THREAD_NUM.equals(key) && !properties.containsKey(key)) {
|
||||
row.add("-1");
|
||||
} else {
|
||||
row.add(properties.get(key));
|
||||
}
|
||||
@ -347,6 +364,11 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
|
||||
+ "id=" + id + ",name=" + name);
|
||||
}
|
||||
|
||||
String scanThreadNumStr = properties.get(SCAN_THREAD_NUM);
|
||||
if (scanThreadNumStr != null) {
|
||||
tWorkloadGroupInfo.setScanThreadNum(Integer.parseInt(scanThreadNumStr));
|
||||
}
|
||||
|
||||
TopicInfo topicInfo = new TopicInfo();
|
||||
topicInfo.setWorkloadGroupInfo(tWorkloadGroupInfo);
|
||||
return topicInfo;
|
||||
|
||||
@ -70,6 +70,7 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable {
|
||||
.add(WorkloadGroup.ENABLE_MEMORY_OVERCOMMIT)
|
||||
.add(WorkloadGroup.MAX_CONCURRENCY).add(WorkloadGroup.MAX_QUEUE_SIZE)
|
||||
.add(WorkloadGroup.QUEUE_TIMEOUT).add(WorkloadGroup.CPU_HARD_LIMIT)
|
||||
.add(WorkloadGroup.SCAN_THREAD_NUM)
|
||||
.add(QueryQueue.RUNNING_QUERY_NUM).add(QueryQueue.WAITING_QUERY_NUM)
|
||||
.build();
|
||||
|
||||
|
||||
@ -383,8 +383,9 @@ public class MetadataGenerator {
|
||||
trow.addToColumnValue(new TCell().setLongVal(Long.valueOf(rGroupsInfo.get(6)))); // max queue size
|
||||
trow.addToColumnValue(new TCell().setLongVal(Long.valueOf(rGroupsInfo.get(7)))); // queue timeout
|
||||
trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(8))); // cpu hard limit
|
||||
trow.addToColumnValue(new TCell().setLongVal(Long.valueOf(rGroupsInfo.get(9)))); // running query num
|
||||
trow.addToColumnValue(new TCell().setLongVal(Long.valueOf(rGroupsInfo.get(10)))); // waiting query num
|
||||
trow.addToColumnValue(new TCell().setIntVal(Integer.parseInt(rGroupsInfo.get(9)))); // scan thread num
|
||||
trow.addToColumnValue(new TCell().setLongVal(Long.valueOf(rGroupsInfo.get(10)))); // running query num
|
||||
trow.addToColumnValue(new TCell().setLongVal(Long.valueOf(rGroupsInfo.get(11)))); // waiting query num
|
||||
dataBatch.add(trow);
|
||||
}
|
||||
|
||||
|
||||
@ -49,6 +49,7 @@ public class WorkloadGroupsTableValuedFunction extends MetadataTableValuedFuncti
|
||||
new Column(WorkloadGroup.MAX_QUEUE_SIZE, ScalarType.createType(PrimitiveType.BIGINT)),
|
||||
new Column(WorkloadGroup.QUEUE_TIMEOUT, ScalarType.createType(PrimitiveType.BIGINT)),
|
||||
new Column(WorkloadGroup.CPU_HARD_LIMIT, ScalarType.createStringType()),
|
||||
new Column(WorkloadGroup.SCAN_THREAD_NUM, ScalarType.createType(PrimitiveType.INT)),
|
||||
new Column(QueryQueue.RUNNING_QUERY_NUM, ScalarType.createType(PrimitiveType.BIGINT)),
|
||||
new Column(QueryQueue.WAITING_QUERY_NUM, ScalarType.createType(PrimitiveType.BIGINT)));
|
||||
|
||||
|
||||
Reference in New Issue
Block a user