[Improvement](executor)add remote scan thread pool (#31376)

* add remote scan thread pool

* +1
This commit is contained in:
wangbo
2024-02-26 19:01:46 +08:00
committed by yiguolei
parent 1127b0065a
commit c34639245e
19 changed files with 322 additions and 218 deletions

View File

@ -63,12 +63,17 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
public static final String SCAN_THREAD_NUM = "scan_thread_num";
public static final String MAX_REMOTE_SCAN_THREAD_NUM = "max_remote_scan_thread_num";
public static final String MIN_REMOTE_SCAN_THREAD_NUM = "min_remote_scan_thread_num";
// NOTE(wb): all property is not required, some properties default value is set in be
// default value is as followed
// cpu_share=1024, memory_limit=0%(0 means not limit), enable_memory_overcommit=true
private static final ImmutableSet<String> ALL_PROPERTIES_NAME = new ImmutableSet.Builder<String>()
.add(CPU_SHARE).add(MEMORY_LIMIT).add(ENABLE_MEMORY_OVERCOMMIT).add(MAX_CONCURRENCY)
.add(MAX_QUEUE_SIZE).add(QUEUE_TIMEOUT).add(CPU_HARD_LIMIT).add(SCAN_THREAD_NUM).build();
.add(MAX_QUEUE_SIZE).add(QUEUE_TIMEOUT).add(CPU_HARD_LIMIT).add(SCAN_THREAD_NUM)
.add(MAX_REMOTE_SCAN_THREAD_NUM).add(MIN_REMOTE_SCAN_THREAD_NUM).build();
@SerializedName(value = "id")
private long id;
@ -225,6 +230,32 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
}
}
if (properties.containsKey(MAX_REMOTE_SCAN_THREAD_NUM)) {
String value = properties.get(MAX_REMOTE_SCAN_THREAD_NUM);
try {
int intValue = Integer.parseInt(value);
if (intValue <= 0 && intValue != -1) {
throw new NumberFormatException();
}
} catch (NumberFormatException e) {
throw new DdlException(
MAX_REMOTE_SCAN_THREAD_NUM + " must be a positive integer or -1. but input value is " + value);
}
}
if (properties.containsKey(MIN_REMOTE_SCAN_THREAD_NUM)) {
String value = properties.get(MIN_REMOTE_SCAN_THREAD_NUM);
try {
int intValue = Integer.parseInt(value);
if (intValue <= 0 && intValue != -1) {
throw new NumberFormatException();
}
} catch (NumberFormatException e) {
throw new DdlException(
MAX_REMOTE_SCAN_THREAD_NUM + " must be a positive integer or -1. but input value is " + value);
}
}
// check queue property
if (properties.containsKey(MAX_CONCURRENCY)) {
try {
@ -309,6 +340,10 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
row.add("true");
} else if (SCAN_THREAD_NUM.equals(key) && !properties.containsKey(key)) {
row.add("-1");
} else if (MAX_REMOTE_SCAN_THREAD_NUM.equals(key) && !properties.containsKey(key)) {
row.add("-1");
} else if (MIN_REMOTE_SCAN_THREAD_NUM.equals(key) && !properties.containsKey(key)) {
row.add("-1");
} else {
row.add(properties.get(key));
}
@ -369,6 +404,16 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
tWorkloadGroupInfo.setScanThreadNum(Integer.parseInt(scanThreadNumStr));
}
String maxRemoteScanThreadNumStr = properties.get(MAX_REMOTE_SCAN_THREAD_NUM);
if (maxRemoteScanThreadNumStr != null) {
tWorkloadGroupInfo.setMaxRemoteScanThreadNum(Integer.parseInt(maxRemoteScanThreadNumStr));
}
String minRemoteScanThreadNumStr = properties.get(MIN_REMOTE_SCAN_THREAD_NUM);
if (minRemoteScanThreadNumStr != null) {
tWorkloadGroupInfo.setMinRemoteScanThreadNum(Integer.parseInt(minRemoteScanThreadNumStr));
}
TopicInfo topicInfo = new TopicInfo();
topicInfo.setWorkloadGroupInfo(tWorkloadGroupInfo);
return topicInfo;

View File

@ -70,7 +70,8 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable {
.add(WorkloadGroup.ENABLE_MEMORY_OVERCOMMIT)
.add(WorkloadGroup.MAX_CONCURRENCY).add(WorkloadGroup.MAX_QUEUE_SIZE)
.add(WorkloadGroup.QUEUE_TIMEOUT).add(WorkloadGroup.CPU_HARD_LIMIT)
.add(WorkloadGroup.SCAN_THREAD_NUM)
.add(WorkloadGroup.SCAN_THREAD_NUM).add(WorkloadGroup.MAX_REMOTE_SCAN_THREAD_NUM)
.add(WorkloadGroup.MIN_REMOTE_SCAN_THREAD_NUM)
.add(QueryQueue.RUNNING_QUERY_NUM).add(QueryQueue.WAITING_QUERY_NUM)
.build();

View File

@ -378,14 +378,18 @@ public class MetadataGenerator {
trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(1))); // name
trow.addToColumnValue(new TCell().setLongVal(Long.valueOf(rGroupsInfo.get(2)))); // cpu_share
trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(3))); // mem_limit
trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(4))); //mem overcommit
trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(4))); // mem overcommit
trow.addToColumnValue(new TCell().setLongVal(Long.valueOf(rGroupsInfo.get(5)))); // max concurrent
trow.addToColumnValue(new TCell().setLongVal(Long.valueOf(rGroupsInfo.get(6)))); // max queue size
trow.addToColumnValue(new TCell().setLongVal(Long.valueOf(rGroupsInfo.get(7)))); // queue timeout
trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(8))); // cpu hard limit
trow.addToColumnValue(new TCell().setIntVal(Integer.parseInt(rGroupsInfo.get(9)))); // scan thread num
trow.addToColumnValue(new TCell().setLongVal(Long.valueOf(rGroupsInfo.get(10)))); // running query num
trow.addToColumnValue(new TCell().setLongVal(Long.valueOf(rGroupsInfo.get(11)))); // waiting query num
// max remote scan thread num
trow.addToColumnValue(new TCell().setIntVal(Integer.parseInt(rGroupsInfo.get(10))));
// min remote scan thread num
trow.addToColumnValue(new TCell().setIntVal(Integer.parseInt(rGroupsInfo.get(11))));
trow.addToColumnValue(new TCell().setLongVal(Long.valueOf(rGroupsInfo.get(12)))); // running query num
trow.addToColumnValue(new TCell().setLongVal(Long.valueOf(rGroupsInfo.get(13)))); // waiting query num
dataBatch.add(trow);
}

View File

@ -50,6 +50,8 @@ public class WorkloadGroupsTableValuedFunction extends MetadataTableValuedFuncti
new Column(WorkloadGroup.QUEUE_TIMEOUT, ScalarType.createType(PrimitiveType.BIGINT)),
new Column(WorkloadGroup.CPU_HARD_LIMIT, ScalarType.createStringType()),
new Column(WorkloadGroup.SCAN_THREAD_NUM, ScalarType.createType(PrimitiveType.INT)),
new Column(WorkloadGroup.MAX_REMOTE_SCAN_THREAD_NUM, ScalarType.createType(PrimitiveType.INT)),
new Column(WorkloadGroup.MIN_REMOTE_SCAN_THREAD_NUM, ScalarType.createType(PrimitiveType.INT)),
new Column(QueryQueue.RUNNING_QUERY_NUM, ScalarType.createType(PrimitiveType.BIGINT)),
new Column(QueryQueue.WAITING_QUERY_NUM, ScalarType.createType(PrimitiveType.BIGINT)));