[Improvement](executor)Add spill property for workload group #32554
This commit is contained in:
@ -480,10 +480,12 @@ public class SchemaTable extends Table {
|
||||
.column("MAX_CONCURRENCY", ScalarType.createType(PrimitiveType.BIGINT))
|
||||
.column("MAX_QUEUE_SIZE", ScalarType.createType(PrimitiveType.BIGINT))
|
||||
.column("QUEUE_TIMEOUT", ScalarType.createType(PrimitiveType.BIGINT))
|
||||
.column("CPU_HARD_LIMIT", ScalarType.createStringType())
|
||||
.column("CPU_HARD_LIMIT", ScalarType.createVarchar(256))
|
||||
.column("SCAN_THREAD_NUM", ScalarType.createType(PrimitiveType.BIGINT))
|
||||
.column("MAX_REMOTE_SCAN_THREAD_NUM", ScalarType.createType(PrimitiveType.BIGINT))
|
||||
.column("MIN_REMOTE_SCAN_THREAD_NUM", ScalarType.createType(PrimitiveType.BIGINT))
|
||||
.column("SPILL_THRESHOLD_LOW_WATERMARK", ScalarType.createVarchar(256))
|
||||
.column("SPILL_THRESHOLD_HIGH_WATERMARK", ScalarType.createVarchar(256))
|
||||
.build()))
|
||||
.build();
|
||||
|
||||
|
||||
@ -67,13 +67,21 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
|
||||
|
||||
public static final String MIN_REMOTE_SCAN_THREAD_NUM = "min_remote_scan_thread_num";
|
||||
|
||||
public static final String SPILL_THRESHOLD_LOW_WATERMARK = "spill_threshold_low_watermark";
|
||||
|
||||
public static final String SPILL_THRESHOLD_HIGH_WATERMARK = "spill_threshold_high_watermark";
|
||||
|
||||
// NOTE(wb): all property is not required, some properties default value is set in be
|
||||
// default value is as followed
|
||||
// cpu_share=1024, memory_limit=0%(0 means not limit), enable_memory_overcommit=true
|
||||
private static final ImmutableSet<String> ALL_PROPERTIES_NAME = new ImmutableSet.Builder<String>()
|
||||
.add(CPU_SHARE).add(MEMORY_LIMIT).add(ENABLE_MEMORY_OVERCOMMIT).add(MAX_CONCURRENCY)
|
||||
.add(MAX_QUEUE_SIZE).add(QUEUE_TIMEOUT).add(CPU_HARD_LIMIT).add(SCAN_THREAD_NUM)
|
||||
.add(MAX_REMOTE_SCAN_THREAD_NUM).add(MIN_REMOTE_SCAN_THREAD_NUM).build();
|
||||
.add(MAX_REMOTE_SCAN_THREAD_NUM).add(MIN_REMOTE_SCAN_THREAD_NUM)
|
||||
.add(SPILL_THRESHOLD_LOW_WATERMARK).add(SPILL_THRESHOLD_HIGH_WATERMARK).build();
|
||||
|
||||
public static final int SPILL_LOW_WATERMARK_DEFAULT_VALUE = 50;
|
||||
public static final int SPILL_HIGH_WATERMARK_DEFAULT_VALUE = 80;
|
||||
|
||||
@SerializedName(value = "id")
|
||||
private long id;
|
||||
@ -120,6 +128,20 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
|
||||
this.cpuHardLimit = Integer.parseInt(cpuHardLimitStr);
|
||||
this.properties.put(CPU_HARD_LIMIT, cpuHardLimitStr);
|
||||
}
|
||||
if (properties.containsKey(SPILL_THRESHOLD_LOW_WATERMARK)) {
|
||||
String lowWatermarkStr = properties.get(SPILL_THRESHOLD_LOW_WATERMARK);
|
||||
if (lowWatermarkStr.endsWith("%")) {
|
||||
lowWatermarkStr = lowWatermarkStr.substring(0, lowWatermarkStr.length() - 1);
|
||||
}
|
||||
this.properties.put(SPILL_THRESHOLD_LOW_WATERMARK, lowWatermarkStr);
|
||||
}
|
||||
if (properties.containsKey(SPILL_THRESHOLD_HIGH_WATERMARK)) {
|
||||
String highWatermarkStr = properties.get(SPILL_THRESHOLD_HIGH_WATERMARK);
|
||||
if (highWatermarkStr.endsWith("%")) {
|
||||
highWatermarkStr = highWatermarkStr.substring(0, highWatermarkStr.length() - 1);
|
||||
}
|
||||
this.properties.put(SPILL_THRESHOLD_HIGH_WATERMARK, highWatermarkStr);
|
||||
}
|
||||
resetQueueProperty(properties);
|
||||
}
|
||||
|
||||
@ -252,7 +274,7 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
|
||||
}
|
||||
} catch (NumberFormatException e) {
|
||||
throw new DdlException(
|
||||
MAX_REMOTE_SCAN_THREAD_NUM + " must be a positive integer or -1. but input value is " + value);
|
||||
MIN_REMOTE_SCAN_THREAD_NUM + " must be a positive integer or -1. but input value is " + value);
|
||||
}
|
||||
}
|
||||
|
||||
@ -284,6 +306,51 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
|
||||
throw new DdlException(QUEUE_TIMEOUT + " requires a positive integer");
|
||||
}
|
||||
}
|
||||
|
||||
int lowWaterMark = SPILL_LOW_WATERMARK_DEFAULT_VALUE;
|
||||
if (properties.containsKey(SPILL_THRESHOLD_LOW_WATERMARK)) {
|
||||
String lowVal = properties.get(SPILL_THRESHOLD_LOW_WATERMARK);
|
||||
if (lowVal.endsWith("%")) {
|
||||
lowVal = lowVal.substring(0, lowVal.length() - 1);
|
||||
}
|
||||
try {
|
||||
int intValue = Integer.parseInt(lowVal);
|
||||
if ((intValue < 1 || intValue > 100) && intValue != -1) {
|
||||
throw new NumberFormatException();
|
||||
}
|
||||
lowWaterMark = intValue;
|
||||
} catch (NumberFormatException e) {
|
||||
throw new DdlException(
|
||||
SPILL_THRESHOLD_LOW_WATERMARK
|
||||
+ " must be a positive integer(1 ~ 100) or -1. but input value is "
|
||||
+ lowVal);
|
||||
}
|
||||
}
|
||||
|
||||
int highWaterMark = SPILL_HIGH_WATERMARK_DEFAULT_VALUE;
|
||||
if (properties.containsKey(SPILL_THRESHOLD_HIGH_WATERMARK)) {
|
||||
String highVal = properties.get(SPILL_THRESHOLD_HIGH_WATERMARK);
|
||||
if (highVal.endsWith("%")) {
|
||||
highVal = highVal.substring(0, highVal.length() - 1);
|
||||
}
|
||||
try {
|
||||
int intValue = Integer.parseInt(highVal);
|
||||
if ((intValue < 1 || intValue > 100)) {
|
||||
throw new NumberFormatException();
|
||||
}
|
||||
highWaterMark = intValue;
|
||||
} catch (NumberFormatException e) {
|
||||
throw new DdlException(
|
||||
SPILL_THRESHOLD_HIGH_WATERMARK + " must be a positive integer(1 ~ 100). but input value is "
|
||||
+ highVal);
|
||||
}
|
||||
}
|
||||
|
||||
if (lowWaterMark > highWaterMark) {
|
||||
throw new DdlException(SPILL_THRESHOLD_HIGH_WATERMARK + "(" + highWaterMark + ") should bigger than "
|
||||
+ SPILL_THRESHOLD_LOW_WATERMARK + "(" + lowWaterMark + ")");
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public long getId() {
|
||||
@ -323,9 +390,9 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
|
||||
row.add(String.valueOf(id));
|
||||
row.add(name);
|
||||
// skip id,name,running query,waiting query
|
||||
for (int i = 2; i < WorkloadGroupMgr.WORKLOAD_GROUP_PROC_NODE_TITLE_NAMES.size() - 2; i++) {
|
||||
for (int i = 2; i < WorkloadGroupMgr.WORKLOAD_GROUP_PROC_NODE_TITLE_NAMES.size(); i++) {
|
||||
String key = WorkloadGroupMgr.WORKLOAD_GROUP_PROC_NODE_TITLE_NAMES.get(i);
|
||||
if (CPU_HARD_LIMIT.equalsIgnoreCase(key)) {
|
||||
if (CPU_HARD_LIMIT.equals(key)) {
|
||||
String val = properties.get(key);
|
||||
if (StringUtils.isEmpty(val)) { // cpu_hard_limit is not required
|
||||
row.add("0%");
|
||||
@ -342,14 +409,32 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
|
||||
row.add("-1");
|
||||
} else if (MAX_REMOTE_SCAN_THREAD_NUM.equals(key) && !properties.containsKey(key)) {
|
||||
row.add("-1");
|
||||
} else if (MIN_REMOTE_SCAN_THREAD_NUM.equals(key) && !properties.containsKey(key)) {
|
||||
} else if (MIN_REMOTE_SCAN_THREAD_NUM.equals(key) && !properties.containsKey(key)) {
|
||||
row.add("-1");
|
||||
} else if (SPILL_THRESHOLD_LOW_WATERMARK.equals(key)) {
|
||||
String val = properties.get(key);
|
||||
if (StringUtils.isEmpty(val)) {
|
||||
row.add(SPILL_LOW_WATERMARK_DEFAULT_VALUE + "%");
|
||||
} else if ("-1".equals(val)) {
|
||||
row.add("-1");
|
||||
} else {
|
||||
row.add(val + "%");
|
||||
}
|
||||
} else if (SPILL_THRESHOLD_HIGH_WATERMARK.equals(key)) {
|
||||
String val = properties.get(key);
|
||||
if (StringUtils.isEmpty(val)) {
|
||||
row.add(SPILL_HIGH_WATERMARK_DEFAULT_VALUE + "%");
|
||||
} else {
|
||||
row.add(val + "%");
|
||||
}
|
||||
} else if (QueryQueue.RUNNING_QUERY_NUM.equals(key)) {
|
||||
row.add(qq == null ? "0" : String.valueOf(qq.getCurrentRunningQueryNum()));
|
||||
} else if (QueryQueue.WAITING_QUERY_NUM.equals(key)) {
|
||||
row.add(qq == null ? "0" : String.valueOf(qq.getCurrentWaitingQueryNum()));
|
||||
} else {
|
||||
row.add(properties.get(key));
|
||||
}
|
||||
}
|
||||
row.add(qq == null ? "0" : String.valueOf(qq.getCurrentRunningQueryNum()));
|
||||
row.add(qq == null ? "0" : String.valueOf(qq.getCurrentWaitingQueryNum()));
|
||||
result.addRow(row);
|
||||
}
|
||||
|
||||
@ -414,6 +499,16 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
|
||||
tWorkloadGroupInfo.setMinRemoteScanThreadNum(Integer.parseInt(minRemoteScanThreadNumStr));
|
||||
}
|
||||
|
||||
String spillLowWatermarkStr = properties.get(SPILL_THRESHOLD_LOW_WATERMARK);
|
||||
if (spillLowWatermarkStr != null) {
|
||||
tWorkloadGroupInfo.setSpillThresholdLowWatermark(Integer.parseInt(spillLowWatermarkStr));
|
||||
}
|
||||
|
||||
String spillHighWatermarkStr = properties.get(SPILL_THRESHOLD_HIGH_WATERMARK);
|
||||
if (spillHighWatermarkStr != null) {
|
||||
tWorkloadGroupInfo.setSpillThresholdHighWatermark(Integer.parseInt(spillHighWatermarkStr));
|
||||
}
|
||||
|
||||
TopicInfo topicInfo = new TopicInfo();
|
||||
topicInfo.setWorkloadGroupInfo(tWorkloadGroupInfo);
|
||||
return topicInfo;
|
||||
|
||||
@ -72,6 +72,7 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable {
|
||||
.add(WorkloadGroup.QUEUE_TIMEOUT).add(WorkloadGroup.CPU_HARD_LIMIT)
|
||||
.add(WorkloadGroup.SCAN_THREAD_NUM).add(WorkloadGroup.MAX_REMOTE_SCAN_THREAD_NUM)
|
||||
.add(WorkloadGroup.MIN_REMOTE_SCAN_THREAD_NUM)
|
||||
.add(WorkloadGroup.SPILL_THRESHOLD_LOW_WATERMARK).add(WorkloadGroup.SPILL_THRESHOLD_HIGH_WATERMARK)
|
||||
.add(QueryQueue.RUNNING_QUERY_NUM).add(QueryQueue.WAITING_QUERY_NUM)
|
||||
.build();
|
||||
|
||||
|
||||
@ -21,8 +21,7 @@ import org.apache.doris.analysis.UserIdentity;
|
||||
import org.apache.doris.catalog.Column;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.MTMV;
|
||||
import org.apache.doris.catalog.PrimitiveType;
|
||||
import org.apache.doris.catalog.ScalarType;
|
||||
import org.apache.doris.catalog.SchemaTable;
|
||||
import org.apache.doris.catalog.Table;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.ClientPool;
|
||||
@ -43,6 +42,7 @@ import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.qe.QeProcessorImpl;
|
||||
import org.apache.doris.qe.QeProcessorImpl.QueryInfo;
|
||||
import org.apache.doris.resource.workloadgroup.QueueToken.TokenState;
|
||||
import org.apache.doris.resource.workloadgroup.WorkloadGroupMgr;
|
||||
import org.apache.doris.system.Backend;
|
||||
import org.apache.doris.system.SystemInfoService;
|
||||
import org.apache.doris.thrift.FrontendService;
|
||||
@ -67,7 +67,6 @@ import org.apache.doris.thrift.TUserIdentity;
|
||||
|
||||
import com.google.common.base.Stopwatch;
|
||||
import com.google.common.base.Strings;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.gson.Gson;
|
||||
@ -89,49 +88,23 @@ import java.util.concurrent.TimeUnit;
|
||||
public class MetadataGenerator {
|
||||
private static final Logger LOG = LogManager.getLogger(MetadataGenerator.class);
|
||||
|
||||
private static final ImmutableList<Column> ACTIVE_QUERIES_SCHEMA = ImmutableList.of(
|
||||
new Column("QUERY_ID", ScalarType.createStringType()),
|
||||
new Column("QUERY_START_TIME", ScalarType.createStringType()),
|
||||
new Column("QUERY_TIME_MS", PrimitiveType.BIGINT),
|
||||
new Column("WORKLOAD_GROUP_ID", PrimitiveType.BIGINT),
|
||||
new Column("DATABASE", ScalarType.createStringType()),
|
||||
new Column("FRONTEND_INSTANCE", ScalarType.createStringType()),
|
||||
new Column("QUEUE_START_TIME", ScalarType.createStringType()),
|
||||
new Column("QUEUE_END_TIME", ScalarType.createStringType()),
|
||||
new Column("QUERY_STATUS", ScalarType.createStringType()),
|
||||
new Column("SQL", ScalarType.createStringType()));
|
||||
|
||||
private static final ImmutableMap<String, Integer> ACTIVE_QUERIES_COLUMN_TO_INDEX;
|
||||
|
||||
|
||||
private static final ImmutableList<Column> WORKLOAD_GROUPS_SCHEMA = ImmutableList.of(
|
||||
new Column("ID", ScalarType.BIGINT),
|
||||
new Column("NAME", ScalarType.createStringType()),
|
||||
new Column("CPU_SHARE", PrimitiveType.BIGINT),
|
||||
new Column("MEMORY_LIMIT", ScalarType.createStringType()),
|
||||
new Column("ENABLE_MEMORY_OVERCOMMIT", ScalarType.createStringType()),
|
||||
new Column("MAX_CONCURRENCY", PrimitiveType.BIGINT),
|
||||
new Column("MAX_QUEUE_SIZE", PrimitiveType.BIGINT),
|
||||
new Column("QUEUE_TIMEOUT", PrimitiveType.BIGINT),
|
||||
new Column("CPU_HARD_LIMIT", PrimitiveType.BIGINT),
|
||||
new Column("SCAN_THREAD_NUM", PrimitiveType.BIGINT),
|
||||
new Column("MAX_REMOTE_SCAN_THREAD_NUM", PrimitiveType.BIGINT),
|
||||
new Column("MIN_REMOTE_SCAN_THREAD_NUM", PrimitiveType.BIGINT));
|
||||
|
||||
private static final ImmutableMap<String, Integer> WORKLOAD_GROUPS_COLUMN_TO_INDEX;
|
||||
|
||||
static {
|
||||
ImmutableMap.Builder<String, Integer> activeQueriesbuilder = new ImmutableMap.Builder();
|
||||
for (int i = 0; i < ACTIVE_QUERIES_SCHEMA.size(); i++) {
|
||||
activeQueriesbuilder.put(ACTIVE_QUERIES_SCHEMA.get(i).getName().toLowerCase(), i);
|
||||
List<Column> activeQueriesColList = SchemaTable.TABLE_MAP.get("active_queries").getFullSchema();
|
||||
for (int i = 0; i < activeQueriesColList.size(); i++) {
|
||||
activeQueriesbuilder.put(activeQueriesColList.get(i).getName().toLowerCase(), i);
|
||||
}
|
||||
ACTIVE_QUERIES_COLUMN_TO_INDEX = activeQueriesbuilder.build();
|
||||
|
||||
ImmutableMap.Builder<String, Integer> workloadGroupsBuilder = new ImmutableMap.Builder();
|
||||
for (int i = 0; i < WORKLOAD_GROUPS_SCHEMA.size(); i++) {
|
||||
workloadGroupsBuilder.put(WORKLOAD_GROUPS_SCHEMA.get(i).getName().toLowerCase(), i);
|
||||
ImmutableMap.Builder<String, Integer> workloadGroupBuilder = new ImmutableMap.Builder();
|
||||
for (int i = 0; i < WorkloadGroupMgr.WORKLOAD_GROUP_PROC_NODE_TITLE_NAMES.size(); i++) {
|
||||
workloadGroupBuilder.put(WorkloadGroupMgr.WORKLOAD_GROUP_PROC_NODE_TITLE_NAMES.get(i).toLowerCase(), i);
|
||||
}
|
||||
WORKLOAD_GROUPS_COLUMN_TO_INDEX = workloadGroupsBuilder.build();
|
||||
WORKLOAD_GROUPS_COLUMN_TO_INDEX = workloadGroupBuilder.build();
|
||||
}
|
||||
|
||||
public static TFetchSchemaTableDataResult getMetadataTable(TFetchSchemaTableDataRequest request) throws TException {
|
||||
@ -458,6 +431,10 @@ public class MetadataGenerator {
|
||||
trow.addToColumnValue(new TCell().setLongVal(Long.valueOf(rGroupsInfo.get(10))));
|
||||
// min remote scan thread num
|
||||
trow.addToColumnValue(new TCell().setLongVal(Long.valueOf(rGroupsInfo.get(11))));
|
||||
trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(12)));
|
||||
trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(13)));
|
||||
trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(14)));
|
||||
trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(15)));
|
||||
dataBatch.add(trow);
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user