From 7a0ee04debf872c1b1d7b36e4cd4116c8cdb50d2 Mon Sep 17 00:00:00 2001 From: wangbo Date: Fri, 3 Nov 2023 17:34:48 +0800 Subject: [PATCH] [feature](executor)refactor show workload group schema #26100 2 add query queue info --- .../resource/workloadgroup/QueryQueue.java | 15 ++++++++++-- .../resource/workloadgroup/WorkloadGroup.java | 24 +++++++++++++++---- .../workloadgroup/WorkloadGroupMgr.java | 6 ++++- .../tablefunction/MetadataGenerator.java | 16 +++++++++---- .../WorkloadGroupsTableValuedFunction.java | 13 ++++++++-- .../workloadgroup/WorkloadGroupTest.java | 2 +- 6 files changed, 60 insertions(+), 16 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java index dc7e672973..fd9ce458e2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java @@ -38,8 +38,19 @@ public class QueryQueue { private int maxQueueSize; private int queueTimeout; // ms // running property - private int currentRunningQueryNum; - private int currentWaitingQueryNum; + private volatile int currentRunningQueryNum; + private volatile int currentWaitingQueryNum; + + public static final String RUNNING_QUERY_NUM = "running_query_num"; + public static final String WAITING_QUERY_NUM = "waiting_query_num"; + + int getCurrentRunningQueryNum() { + return currentRunningQueryNum; + } + + int getCurrentWaitingQueryNum() { + return currentWaitingQueryNum; + } public QueryQueue(int maxConcurrency, int maxQueueSize, int queueTimeout) { this.maxConcurrency = maxConcurrency; diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java index 80b0cab6b3..84a526aa7f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java @@ -30,7 +30,6 @@ import org.apache.doris.thrift.TopicInfo; import com.google.common.base.Strings; import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Lists; import com.google.gson.annotations.SerializedName; import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; @@ -39,7 +38,9 @@ import org.apache.logging.log4j.Logger; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; public class WorkloadGroup implements Writable, GsonPostProcessable { @@ -278,13 +279,26 @@ public class WorkloadGroup implements Writable, GsonPostProcessable { } public void getProcNodeData(BaseProcResult result) { - for (Map.Entry entry : properties.entrySet()) { - if (CPU_HARD_LIMIT.equals(entry.getKey())) { - result.addRow(Lists.newArrayList(String.valueOf(id), name, entry.getKey(), entry.getValue() + "%")); + List row = new ArrayList<>(); + row.add(String.valueOf(id)); + row.add(name); + // skip id,name,running query,waiting query + for (int i = 2; i < WorkloadGroupMgr.WORKLOAD_GROUP_PROC_NODE_TITLE_NAMES.size() - 2; i++) { + String key = WorkloadGroupMgr.WORKLOAD_GROUP_PROC_NODE_TITLE_NAMES.get(i); + if (CPU_HARD_LIMIT.equalsIgnoreCase(key)) { + String val = properties.get(key); + if (StringUtils.isEmpty(val)) { // cpu_hard_limit is not required + row.add("0%"); + } else { + row.add(val + "%"); + } } else { - result.addRow(Lists.newArrayList(String.valueOf(id), name, entry.getKey(), entry.getValue())); + row.add(properties.get(key)); } } + row.add(String.valueOf(queryQueue.getCurrentRunningQueryNum())); + row.add(String.valueOf(queryQueue.getCurrentWaitingQueryNum())); + result.addRow(row); } public int getCpuHardLimit() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java index 5f0a52cab5..44c43c19fa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java @@ -64,7 +64,11 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable { public static final String DEFAULT_GROUP_NAME = "normal"; public static final ImmutableList WORKLOAD_GROUP_PROC_NODE_TITLE_NAMES = new ImmutableList.Builder() - .add("Id").add("Name").add("Item").add("Value") + .add("Id").add("Name").add(WorkloadGroup.CPU_SHARE).add(WorkloadGroup.MEMORY_LIMIT) + .add(WorkloadGroup.ENABLE_MEMORY_OVERCOMMIT) + .add(WorkloadGroup.MAX_CONCURRENCY).add(WorkloadGroup.MAX_QUEUE_SIZE) + .add(WorkloadGroup.QUEUE_TIMEOUT).add(WorkloadGroup.CPU_HARD_LIMIT) + .add(QueryQueue.RUNNING_QUERY_NUM).add(QueryQueue.WAITING_QUERY_NUM) .build(); private static final Logger LOG = LogManager.getLogger(WorkloadGroupMgr.class); diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java index 2abb84cdfb..46443ca64a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java @@ -352,11 +352,17 @@ public class MetadataGenerator { List dataBatch = Lists.newArrayList(); for (List rGroupsInfo : workloadGroupsInfo) { TRow trow = new TRow(); - Long id = Long.valueOf(rGroupsInfo.get(0)); - trow.addToColumnValue(new TCell().setLongVal(id)); - trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(1))); - trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(2))); - trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(3))); + trow.addToColumnValue(new TCell().setLongVal(Long.valueOf(rGroupsInfo.get(0)))); // id + trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(1))); // name + trow.addToColumnValue(new TCell().setLongVal(Long.valueOf(rGroupsInfo.get(2)))); // cpu_share + trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(3))); // mem_limit + trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(4))); //mem overcommit + trow.addToColumnValue(new TCell().setLongVal(Long.valueOf(rGroupsInfo.get(5)))); // max concurrent + trow.addToColumnValue(new TCell().setLongVal(Long.valueOf(rGroupsInfo.get(6)))); // max queue size + trow.addToColumnValue(new TCell().setLongVal(Long.valueOf(rGroupsInfo.get(7)))); // queue timeout + trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(8))); // cpu hard limit + trow.addToColumnValue(new TCell().setLongVal(Long.valueOf(rGroupsInfo.get(9)))); // running query num + trow.addToColumnValue(new TCell().setLongVal(Long.valueOf(rGroupsInfo.get(10)))); // waiting query num dataBatch.add(trow); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/WorkloadGroupsTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/WorkloadGroupsTableValuedFunction.java index 90f7ea1109..5b34db5a68 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/WorkloadGroupsTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/WorkloadGroupsTableValuedFunction.java @@ -21,6 +21,8 @@ import org.apache.doris.catalog.Column; import org.apache.doris.catalog.PrimitiveType; import org.apache.doris.catalog.ScalarType; import org.apache.doris.nereids.exceptions.AnalysisException; +import org.apache.doris.resource.workloadgroup.QueryQueue; +import org.apache.doris.resource.workloadgroup.WorkloadGroup; import org.apache.doris.thrift.TMetaScanRange; import org.apache.doris.thrift.TMetadataType; @@ -40,8 +42,15 @@ public class WorkloadGroupsTableValuedFunction extends MetadataTableValuedFuncti private static final ImmutableList SCHEMA = ImmutableList.of( new Column("Id", ScalarType.createType(PrimitiveType.BIGINT)), new Column("Name", ScalarType.createStringType()), - new Column("Item", ScalarType.createStringType()), - new Column("Value", ScalarType.createStringType())); + new Column(WorkloadGroup.CPU_SHARE, ScalarType.createType(PrimitiveType.BIGINT)), + new Column(WorkloadGroup.MEMORY_LIMIT, ScalarType.createStringType()), + new Column(WorkloadGroup.ENABLE_MEMORY_OVERCOMMIT, ScalarType.createStringType()), + new Column(WorkloadGroup.MAX_CONCURRENCY, ScalarType.createType(PrimitiveType.BIGINT)), + new Column(WorkloadGroup.MAX_QUEUE_SIZE, ScalarType.createType(PrimitiveType.BIGINT)), + new Column(WorkloadGroup.QUEUE_TIMEOUT, ScalarType.createType(PrimitiveType.BIGINT)), + new Column(WorkloadGroup.CPU_HARD_LIMIT, ScalarType.createStringType()), + new Column(QueryQueue.RUNNING_QUERY_NUM, ScalarType.createType(PrimitiveType.BIGINT)), + new Column(QueryQueue.WAITING_QUERY_NUM, ScalarType.createType(PrimitiveType.BIGINT))); private static final ImmutableMap COLUMN_TO_INDEX; diff --git a/fe/fe-core/src/test/java/org/apache/doris/resource/workloadgroup/WorkloadGroupTest.java b/fe/fe-core/src/test/java/org/apache/doris/resource/workloadgroup/WorkloadGroupTest.java index 7009e55e38..8831380cf9 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/resource/workloadgroup/WorkloadGroupTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/resource/workloadgroup/WorkloadGroupTest.java @@ -92,6 +92,6 @@ public class WorkloadGroupTest { BaseProcResult result = new BaseProcResult(); group1.getProcNodeData(result); List> rows = result.getRows(); - Assert.assertEquals(5, rows.size()); + Assert.assertEquals(1, rows.size()); } }