[feature](executor)refactor show workload group schema #26100

2 add query queue info
This commit is contained in:
wangbo
2023-11-03 17:34:48 +08:00
committed by GitHub
parent f16256362c
commit 7a0ee04deb
6 changed files with 60 additions and 16 deletions

View File

@ -38,8 +38,19 @@ public class QueryQueue {
private int maxQueueSize;
private int queueTimeout; // ms
// running property
private int currentRunningQueryNum;
private int currentWaitingQueryNum;
private volatile int currentRunningQueryNum;
private volatile int currentWaitingQueryNum;
public static final String RUNNING_QUERY_NUM = "running_query_num";
public static final String WAITING_QUERY_NUM = "waiting_query_num";
int getCurrentRunningQueryNum() {
return currentRunningQueryNum;
}
int getCurrentWaitingQueryNum() {
return currentWaitingQueryNum;
}
public QueryQueue(int maxConcurrency, int maxQueueSize, int queueTimeout) {
this.maxConcurrency = maxConcurrency;

View File

@ -30,7 +30,6 @@ import org.apache.doris.thrift.TopicInfo;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.gson.annotations.SerializedName;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
@ -39,7 +38,9 @@ import org.apache.logging.log4j.Logger;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class WorkloadGroup implements Writable, GsonPostProcessable {
@ -278,13 +279,26 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
}
public void getProcNodeData(BaseProcResult result) {
for (Map.Entry<String, String> entry : properties.entrySet()) {
if (CPU_HARD_LIMIT.equals(entry.getKey())) {
result.addRow(Lists.newArrayList(String.valueOf(id), name, entry.getKey(), entry.getValue() + "%"));
List<String> row = new ArrayList<>();
row.add(String.valueOf(id));
row.add(name);
// skip id,name,running query,waiting query
for (int i = 2; i < WorkloadGroupMgr.WORKLOAD_GROUP_PROC_NODE_TITLE_NAMES.size() - 2; i++) {
String key = WorkloadGroupMgr.WORKLOAD_GROUP_PROC_NODE_TITLE_NAMES.get(i);
if (CPU_HARD_LIMIT.equalsIgnoreCase(key)) {
String val = properties.get(key);
if (StringUtils.isEmpty(val)) { // cpu_hard_limit is not required
row.add("0%");
} else {
row.add(val + "%");
}
} else {
result.addRow(Lists.newArrayList(String.valueOf(id), name, entry.getKey(), entry.getValue()));
row.add(properties.get(key));
}
}
row.add(String.valueOf(queryQueue.getCurrentRunningQueryNum()));
row.add(String.valueOf(queryQueue.getCurrentWaitingQueryNum()));
result.addRow(row);
}
public int getCpuHardLimit() {

View File

@ -64,7 +64,11 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable {
public static final String DEFAULT_GROUP_NAME = "normal";
public static final ImmutableList<String> WORKLOAD_GROUP_PROC_NODE_TITLE_NAMES = new ImmutableList.Builder<String>()
.add("Id").add("Name").add("Item").add("Value")
.add("Id").add("Name").add(WorkloadGroup.CPU_SHARE).add(WorkloadGroup.MEMORY_LIMIT)
.add(WorkloadGroup.ENABLE_MEMORY_OVERCOMMIT)
.add(WorkloadGroup.MAX_CONCURRENCY).add(WorkloadGroup.MAX_QUEUE_SIZE)
.add(WorkloadGroup.QUEUE_TIMEOUT).add(WorkloadGroup.CPU_HARD_LIMIT)
.add(QueryQueue.RUNNING_QUERY_NUM).add(QueryQueue.WAITING_QUERY_NUM)
.build();
private static final Logger LOG = LogManager.getLogger(WorkloadGroupMgr.class);

View File

@ -352,11 +352,17 @@ public class MetadataGenerator {
List<TRow> dataBatch = Lists.newArrayList();
for (List<String> rGroupsInfo : workloadGroupsInfo) {
TRow trow = new TRow();
Long id = Long.valueOf(rGroupsInfo.get(0));
trow.addToColumnValue(new TCell().setLongVal(id));
trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(1)));
trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(2)));
trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(3)));
trow.addToColumnValue(new TCell().setLongVal(Long.valueOf(rGroupsInfo.get(0)))); // id
trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(1))); // name
trow.addToColumnValue(new TCell().setLongVal(Long.valueOf(rGroupsInfo.get(2)))); // cpu_share
trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(3))); // mem_limit
trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(4))); //mem overcommit
trow.addToColumnValue(new TCell().setLongVal(Long.valueOf(rGroupsInfo.get(5)))); // max concurrent
trow.addToColumnValue(new TCell().setLongVal(Long.valueOf(rGroupsInfo.get(6)))); // max queue size
trow.addToColumnValue(new TCell().setLongVal(Long.valueOf(rGroupsInfo.get(7)))); // queue timeout
trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(8))); // cpu hard limit
trow.addToColumnValue(new TCell().setLongVal(Long.valueOf(rGroupsInfo.get(9)))); // running query num
trow.addToColumnValue(new TCell().setLongVal(Long.valueOf(rGroupsInfo.get(10)))); // waiting query num
dataBatch.add(trow);
}

View File

@ -21,6 +21,8 @@ import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.resource.workloadgroup.QueryQueue;
import org.apache.doris.resource.workloadgroup.WorkloadGroup;
import org.apache.doris.thrift.TMetaScanRange;
import org.apache.doris.thrift.TMetadataType;
@ -40,8 +42,15 @@ public class WorkloadGroupsTableValuedFunction extends MetadataTableValuedFuncti
private static final ImmutableList<Column> SCHEMA = ImmutableList.of(
new Column("Id", ScalarType.createType(PrimitiveType.BIGINT)),
new Column("Name", ScalarType.createStringType()),
new Column("Item", ScalarType.createStringType()),
new Column("Value", ScalarType.createStringType()));
new Column(WorkloadGroup.CPU_SHARE, ScalarType.createType(PrimitiveType.BIGINT)),
new Column(WorkloadGroup.MEMORY_LIMIT, ScalarType.createStringType()),
new Column(WorkloadGroup.ENABLE_MEMORY_OVERCOMMIT, ScalarType.createStringType()),
new Column(WorkloadGroup.MAX_CONCURRENCY, ScalarType.createType(PrimitiveType.BIGINT)),
new Column(WorkloadGroup.MAX_QUEUE_SIZE, ScalarType.createType(PrimitiveType.BIGINT)),
new Column(WorkloadGroup.QUEUE_TIMEOUT, ScalarType.createType(PrimitiveType.BIGINT)),
new Column(WorkloadGroup.CPU_HARD_LIMIT, ScalarType.createStringType()),
new Column(QueryQueue.RUNNING_QUERY_NUM, ScalarType.createType(PrimitiveType.BIGINT)),
new Column(QueryQueue.WAITING_QUERY_NUM, ScalarType.createType(PrimitiveType.BIGINT)));
private static final ImmutableMap<String, Integer> COLUMN_TO_INDEX;