From 326a264fcd41c0db7bf0ccfbd9be6d172c469448 Mon Sep 17 00:00:00 2001 From: wangbo Date: Fri, 22 Mar 2024 15:20:58 +0800 Subject: [PATCH] [Improvement](executor)Add spill property for workload group #32554 --- .../schema_workload_groups_scanner.cpp | 7 +- .../runtime/workload_group/workload_group.cpp | 28 ++++- .../runtime/workload_group/workload_group.h | 4 + .../org/apache/doris/catalog/SchemaTable.java | 4 +- .../resource/workloadgroup/WorkloadGroup.java | 109 ++++++++++++++++-- .../workloadgroup/WorkloadGroupMgr.java | 1 + .../tablefunction/MetadataGenerator.java | 49 +++----- gensrc/thrift/BackendService.thrift | 2 + .../workload_manager_p0/test_curd_wlg.out | 12 ++ .../workload_manager_p0/test_curd_wlg.groovy | 76 ++++++++++++ 10 files changed, 243 insertions(+), 49 deletions(-) diff --git a/be/src/exec/schema_scanner/schema_workload_groups_scanner.cpp b/be/src/exec/schema_scanner/schema_workload_groups_scanner.cpp index b3fb9adcbe..24e23a3e33 100644 --- a/be/src/exec/schema_scanner/schema_workload_groups_scanner.cpp +++ b/be/src/exec/schema_scanner/schema_workload_groups_scanner.cpp @@ -35,10 +35,13 @@ std::vector SchemaWorkloadGroupsScanner::_s_tbls_colu {"MAX_CONCURRENCY", TYPE_BIGINT, sizeof(int64_t), true}, {"MAX_QUEUE_SIZE", TYPE_BIGINT, sizeof(int64_t), true}, {"QUEUE_TIMEOUT", TYPE_BIGINT, sizeof(int64_t), true}, - {"CPU_HARD_LIMIT", TYPE_STRING, sizeof(StringRef), true}, + {"CPU_HARD_LIMIT", TYPE_VARCHAR, sizeof(StringRef), true}, {"SCAN_THREAD_NUM", TYPE_BIGINT, sizeof(int64_t), true}, {"MAX_REMOTE_SCAN_THREAD_NUM", TYPE_BIGINT, sizeof(int64_t), true}, - {"MIN_REMOTE_SCAN_THREAD_NUM", TYPE_BIGINT, sizeof(int64_t), true}}; + {"MIN_REMOTE_SCAN_THREAD_NUM", TYPE_BIGINT, sizeof(int64_t), true}, + {"SPILL_THRESHOLD_LOW_WATERMARK", TYPE_VARCHAR, sizeof(StringRef), true}, + {"SPILL_THRESHOLD_HIGH_WATERMARK", TYPE_VARCHAR, sizeof(StringRef), true}, +}; SchemaWorkloadGroupsScanner::SchemaWorkloadGroupsScanner() : SchemaScanner(_s_tbls_columns, TSchemaTableType::SCH_WORKLOAD_GROUPS) {} diff --git a/be/src/runtime/workload_group/workload_group.cpp b/be/src/runtime/workload_group/workload_group.cpp index 88e5a1221e..44fe422e0c 100644 --- a/be/src/runtime/workload_group/workload_group.cpp +++ b/be/src/runtime/workload_group/workload_group.cpp @@ -44,6 +44,8 @@ const static std::string MEMORY_LIMIT_DEFAULT_VALUE = "0%"; const static bool ENABLE_MEMORY_OVERCOMMIT_DEFAULT_VALUE = true; const static int CPU_HARD_LIMIT_DEFAULT_VALUE = -1; const static uint64_t CPU_SOFT_LIMIT_DEFAULT_VALUE = 1024; +const static int SPILL_LOW_WATERMARK_DEFAULT_VALUE = 50; +const static int SPILL_HIGH_WATERMARK_DEFAULT_VALUE = 80; WorkloadGroup::WorkloadGroup(const WorkloadGroupInfo& tg_info) : _id(tg_info.id), @@ -56,17 +58,21 @@ WorkloadGroup::WorkloadGroup(const WorkloadGroupInfo& tg_info) _cpu_hard_limit(tg_info.cpu_hard_limit), _scan_thread_num(tg_info.scan_thread_num), _max_remote_scan_thread_num(tg_info.max_remote_scan_thread_num), - _min_remote_scan_thread_num(tg_info.min_remote_scan_thread_num) {} + _min_remote_scan_thread_num(tg_info.min_remote_scan_thread_num), + _spill_low_watermark(tg_info.spill_low_watermark), + _spill_high_watermark(tg_info.spill_high_watermark) {} std::string WorkloadGroup::debug_string() const { std::shared_lock rl {_mutex}; return fmt::format( "TG[id = {}, name = {}, cpu_share = {}, memory_limit = {}, enable_memory_overcommit = " "{}, version = {}, cpu_hard_limit = {}, scan_thread_num = " - "{}, max_remote_scan_thread_num = {}, min_remote_scan_thread_num = {}]", + "{}, max_remote_scan_thread_num = {}, min_remote_scan_thread_num = {}, " + "spill_low_watermark={}, spill_high_watermark={}]", _id, _name, cpu_share(), PrettyPrinter::print(_memory_limit, TUnit::BYTES), _enable_memory_overcommit ? "true" : "false", _version, cpu_hard_limit(), - _scan_thread_num, _max_remote_scan_thread_num, _min_remote_scan_thread_num); + _scan_thread_num, _max_remote_scan_thread_num, _min_remote_scan_thread_num, + _spill_low_watermark, _spill_high_watermark); } void WorkloadGroup::check_and_update(const WorkloadGroupInfo& tg_info) { @@ -91,6 +97,8 @@ void WorkloadGroup::check_and_update(const WorkloadGroupInfo& tg_info) { _scan_thread_num = tg_info.scan_thread_num; _max_remote_scan_thread_num = tg_info.max_remote_scan_thread_num; _min_remote_scan_thread_num = tg_info.min_remote_scan_thread_num; + _spill_low_watermark = tg_info.spill_low_watermark; + _spill_high_watermark = tg_info.spill_high_watermark; } else { return; } @@ -288,6 +296,20 @@ Status WorkloadGroupInfo::parse_topic_info(const TWorkloadGroupInfo& tworkload_g tworkload_group_info.min_remote_scan_thread_num; } + // 12 spill low watermark + int spill_low_watermark = SPILL_LOW_WATERMARK_DEFAULT_VALUE; + if (tworkload_group_info.__isset.spill_threshold_low_watermark) { + spill_low_watermark = tworkload_group_info.spill_threshold_low_watermark; + } + workload_group_info->spill_low_watermark = spill_low_watermark; + + // 13 spil high watermark + int spill_high_watermark = SPILL_HIGH_WATERMARK_DEFAULT_VALUE; + if (tworkload_group_info.__isset.spill_threshold_high_watermark) { + spill_high_watermark = tworkload_group_info.spill_threshold_high_watermark; + } + workload_group_info->spill_high_watermark = spill_high_watermark; + return Status::OK(); } diff --git a/be/src/runtime/workload_group/workload_group.h b/be/src/runtime/workload_group/workload_group.h index af77be493b..f70ee4c558 100644 --- a/be/src/runtime/workload_group/workload_group.h +++ b/be/src/runtime/workload_group/workload_group.h @@ -149,6 +149,8 @@ private: std::atomic _scan_thread_num; std::atomic _max_remote_scan_thread_num; std::atomic _min_remote_scan_thread_num; + std::atomic _spill_low_watermark; + std::atomic _spill_high_watermark; // means workload group is mark dropped // new query can not submit @@ -178,6 +180,8 @@ struct WorkloadGroupInfo { int scan_thread_num; int max_remote_scan_thread_num; int min_remote_scan_thread_num; + int spill_low_watermark; + int spill_high_watermark; // log cgroup cpu info uint64_t cgroup_cpu_shares = 0; int cgroup_cpu_hard_limit = 0; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java index 5654805619..610a84b474 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java @@ -480,10 +480,12 @@ public class SchemaTable extends Table { .column("MAX_CONCURRENCY", ScalarType.createType(PrimitiveType.BIGINT)) .column("MAX_QUEUE_SIZE", ScalarType.createType(PrimitiveType.BIGINT)) .column("QUEUE_TIMEOUT", ScalarType.createType(PrimitiveType.BIGINT)) - .column("CPU_HARD_LIMIT", ScalarType.createStringType()) + .column("CPU_HARD_LIMIT", ScalarType.createVarchar(256)) .column("SCAN_THREAD_NUM", ScalarType.createType(PrimitiveType.BIGINT)) .column("MAX_REMOTE_SCAN_THREAD_NUM", ScalarType.createType(PrimitiveType.BIGINT)) .column("MIN_REMOTE_SCAN_THREAD_NUM", ScalarType.createType(PrimitiveType.BIGINT)) + .column("SPILL_THRESHOLD_LOW_WATERMARK", ScalarType.createVarchar(256)) + .column("SPILL_THRESHOLD_HIGH_WATERMARK", ScalarType.createVarchar(256)) .build())) .build(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java index b14b3afec0..4a220252af 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java @@ -67,13 +67,21 @@ public class WorkloadGroup implements Writable, GsonPostProcessable { public static final String MIN_REMOTE_SCAN_THREAD_NUM = "min_remote_scan_thread_num"; + public static final String SPILL_THRESHOLD_LOW_WATERMARK = "spill_threshold_low_watermark"; + + public static final String SPILL_THRESHOLD_HIGH_WATERMARK = "spill_threshold_high_watermark"; + // NOTE(wb): all property is not required, some properties default value is set in be // default value is as followed // cpu_share=1024, memory_limit=0%(0 means not limit), enable_memory_overcommit=true private static final ImmutableSet ALL_PROPERTIES_NAME = new ImmutableSet.Builder() .add(CPU_SHARE).add(MEMORY_LIMIT).add(ENABLE_MEMORY_OVERCOMMIT).add(MAX_CONCURRENCY) .add(MAX_QUEUE_SIZE).add(QUEUE_TIMEOUT).add(CPU_HARD_LIMIT).add(SCAN_THREAD_NUM) - .add(MAX_REMOTE_SCAN_THREAD_NUM).add(MIN_REMOTE_SCAN_THREAD_NUM).build(); + .add(MAX_REMOTE_SCAN_THREAD_NUM).add(MIN_REMOTE_SCAN_THREAD_NUM) + .add(SPILL_THRESHOLD_LOW_WATERMARK).add(SPILL_THRESHOLD_HIGH_WATERMARK).build(); + + public static final int SPILL_LOW_WATERMARK_DEFAULT_VALUE = 50; + public static final int SPILL_HIGH_WATERMARK_DEFAULT_VALUE = 80; @SerializedName(value = "id") private long id; @@ -120,6 +128,20 @@ public class WorkloadGroup implements Writable, GsonPostProcessable { this.cpuHardLimit = Integer.parseInt(cpuHardLimitStr); this.properties.put(CPU_HARD_LIMIT, cpuHardLimitStr); } + if (properties.containsKey(SPILL_THRESHOLD_LOW_WATERMARK)) { + String lowWatermarkStr = properties.get(SPILL_THRESHOLD_LOW_WATERMARK); + if (lowWatermarkStr.endsWith("%")) { + lowWatermarkStr = lowWatermarkStr.substring(0, lowWatermarkStr.length() - 1); + } + this.properties.put(SPILL_THRESHOLD_LOW_WATERMARK, lowWatermarkStr); + } + if (properties.containsKey(SPILL_THRESHOLD_HIGH_WATERMARK)) { + String highWatermarkStr = properties.get(SPILL_THRESHOLD_HIGH_WATERMARK); + if (highWatermarkStr.endsWith("%")) { + highWatermarkStr = highWatermarkStr.substring(0, highWatermarkStr.length() - 1); + } + this.properties.put(SPILL_THRESHOLD_HIGH_WATERMARK, highWatermarkStr); + } resetQueueProperty(properties); } @@ -252,7 +274,7 @@ public class WorkloadGroup implements Writable, GsonPostProcessable { } } catch (NumberFormatException e) { throw new DdlException( - MAX_REMOTE_SCAN_THREAD_NUM + " must be a positive integer or -1. but input value is " + value); + MIN_REMOTE_SCAN_THREAD_NUM + " must be a positive integer or -1. but input value is " + value); } } @@ -284,6 +306,51 @@ public class WorkloadGroup implements Writable, GsonPostProcessable { throw new DdlException(QUEUE_TIMEOUT + " requires a positive integer"); } } + + int lowWaterMark = SPILL_LOW_WATERMARK_DEFAULT_VALUE; + if (properties.containsKey(SPILL_THRESHOLD_LOW_WATERMARK)) { + String lowVal = properties.get(SPILL_THRESHOLD_LOW_WATERMARK); + if (lowVal.endsWith("%")) { + lowVal = lowVal.substring(0, lowVal.length() - 1); + } + try { + int intValue = Integer.parseInt(lowVal); + if ((intValue < 1 || intValue > 100) && intValue != -1) { + throw new NumberFormatException(); + } + lowWaterMark = intValue; + } catch (NumberFormatException e) { + throw new DdlException( + SPILL_THRESHOLD_LOW_WATERMARK + + " must be a positive integer(1 ~ 100) or -1. but input value is " + + lowVal); + } + } + + int highWaterMark = SPILL_HIGH_WATERMARK_DEFAULT_VALUE; + if (properties.containsKey(SPILL_THRESHOLD_HIGH_WATERMARK)) { + String highVal = properties.get(SPILL_THRESHOLD_HIGH_WATERMARK); + if (highVal.endsWith("%")) { + highVal = highVal.substring(0, highVal.length() - 1); + } + try { + int intValue = Integer.parseInt(highVal); + if ((intValue < 1 || intValue > 100)) { + throw new NumberFormatException(); + } + highWaterMark = intValue; + } catch (NumberFormatException e) { + throw new DdlException( + SPILL_THRESHOLD_HIGH_WATERMARK + " must be a positive integer(1 ~ 100). but input value is " + + highVal); + } + } + + if (lowWaterMark > highWaterMark) { + throw new DdlException(SPILL_THRESHOLD_HIGH_WATERMARK + "(" + highWaterMark + ") should bigger than " + + SPILL_THRESHOLD_LOW_WATERMARK + "(" + lowWaterMark + ")"); + } + } public long getId() { @@ -323,9 +390,9 @@ public class WorkloadGroup implements Writable, GsonPostProcessable { row.add(String.valueOf(id)); row.add(name); // skip id,name,running query,waiting query - for (int i = 2; i < WorkloadGroupMgr.WORKLOAD_GROUP_PROC_NODE_TITLE_NAMES.size() - 2; i++) { + for (int i = 2; i < WorkloadGroupMgr.WORKLOAD_GROUP_PROC_NODE_TITLE_NAMES.size(); i++) { String key = WorkloadGroupMgr.WORKLOAD_GROUP_PROC_NODE_TITLE_NAMES.get(i); - if (CPU_HARD_LIMIT.equalsIgnoreCase(key)) { + if (CPU_HARD_LIMIT.equals(key)) { String val = properties.get(key); if (StringUtils.isEmpty(val)) { // cpu_hard_limit is not required row.add("0%"); @@ -342,14 +409,32 @@ public class WorkloadGroup implements Writable, GsonPostProcessable { row.add("-1"); } else if (MAX_REMOTE_SCAN_THREAD_NUM.equals(key) && !properties.containsKey(key)) { row.add("-1"); - } else if (MIN_REMOTE_SCAN_THREAD_NUM.equals(key) && !properties.containsKey(key)) { + } else if (MIN_REMOTE_SCAN_THREAD_NUM.equals(key) && !properties.containsKey(key)) { row.add("-1"); + } else if (SPILL_THRESHOLD_LOW_WATERMARK.equals(key)) { + String val = properties.get(key); + if (StringUtils.isEmpty(val)) { + row.add(SPILL_LOW_WATERMARK_DEFAULT_VALUE + "%"); + } else if ("-1".equals(val)) { + row.add("-1"); + } else { + row.add(val + "%"); + } + } else if (SPILL_THRESHOLD_HIGH_WATERMARK.equals(key)) { + String val = properties.get(key); + if (StringUtils.isEmpty(val)) { + row.add(SPILL_HIGH_WATERMARK_DEFAULT_VALUE + "%"); + } else { + row.add(val + "%"); + } + } else if (QueryQueue.RUNNING_QUERY_NUM.equals(key)) { + row.add(qq == null ? "0" : String.valueOf(qq.getCurrentRunningQueryNum())); + } else if (QueryQueue.WAITING_QUERY_NUM.equals(key)) { + row.add(qq == null ? "0" : String.valueOf(qq.getCurrentWaitingQueryNum())); } else { row.add(properties.get(key)); } } - row.add(qq == null ? "0" : String.valueOf(qq.getCurrentRunningQueryNum())); - row.add(qq == null ? "0" : String.valueOf(qq.getCurrentWaitingQueryNum())); result.addRow(row); } @@ -414,6 +499,16 @@ public class WorkloadGroup implements Writable, GsonPostProcessable { tWorkloadGroupInfo.setMinRemoteScanThreadNum(Integer.parseInt(minRemoteScanThreadNumStr)); } + String spillLowWatermarkStr = properties.get(SPILL_THRESHOLD_LOW_WATERMARK); + if (spillLowWatermarkStr != null) { + tWorkloadGroupInfo.setSpillThresholdLowWatermark(Integer.parseInt(spillLowWatermarkStr)); + } + + String spillHighWatermarkStr = properties.get(SPILL_THRESHOLD_HIGH_WATERMARK); + if (spillHighWatermarkStr != null) { + tWorkloadGroupInfo.setSpillThresholdHighWatermark(Integer.parseInt(spillHighWatermarkStr)); + } + TopicInfo topicInfo = new TopicInfo(); topicInfo.setWorkloadGroupInfo(tWorkloadGroupInfo); return topicInfo; diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java index 1bd1a35712..967efd26e6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java @@ -72,6 +72,7 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable { .add(WorkloadGroup.QUEUE_TIMEOUT).add(WorkloadGroup.CPU_HARD_LIMIT) .add(WorkloadGroup.SCAN_THREAD_NUM).add(WorkloadGroup.MAX_REMOTE_SCAN_THREAD_NUM) .add(WorkloadGroup.MIN_REMOTE_SCAN_THREAD_NUM) + .add(WorkloadGroup.SPILL_THRESHOLD_LOW_WATERMARK).add(WorkloadGroup.SPILL_THRESHOLD_HIGH_WATERMARK) .add(QueryQueue.RUNNING_QUERY_NUM).add(QueryQueue.WAITING_QUERY_NUM) .build(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java index adf3a9b3ed..c84939ffbe 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java @@ -21,8 +21,7 @@ import org.apache.doris.analysis.UserIdentity; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.MTMV; -import org.apache.doris.catalog.PrimitiveType; -import org.apache.doris.catalog.ScalarType; +import org.apache.doris.catalog.SchemaTable; import org.apache.doris.catalog.Table; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.ClientPool; @@ -43,6 +42,7 @@ import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.QeProcessorImpl; import org.apache.doris.qe.QeProcessorImpl.QueryInfo; import org.apache.doris.resource.workloadgroup.QueueToken.TokenState; +import org.apache.doris.resource.workloadgroup.WorkloadGroupMgr; import org.apache.doris.system.Backend; import org.apache.doris.system.SystemInfoService; import org.apache.doris.thrift.FrontendService; @@ -67,7 +67,6 @@ import org.apache.doris.thrift.TUserIdentity; import com.google.common.base.Stopwatch; import com.google.common.base.Strings; -import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.gson.Gson; @@ -89,49 +88,23 @@ import java.util.concurrent.TimeUnit; public class MetadataGenerator { private static final Logger LOG = LogManager.getLogger(MetadataGenerator.class); - private static final ImmutableList ACTIVE_QUERIES_SCHEMA = ImmutableList.of( - new Column("QUERY_ID", ScalarType.createStringType()), - new Column("QUERY_START_TIME", ScalarType.createStringType()), - new Column("QUERY_TIME_MS", PrimitiveType.BIGINT), - new Column("WORKLOAD_GROUP_ID", PrimitiveType.BIGINT), - new Column("DATABASE", ScalarType.createStringType()), - new Column("FRONTEND_INSTANCE", ScalarType.createStringType()), - new Column("QUEUE_START_TIME", ScalarType.createStringType()), - new Column("QUEUE_END_TIME", ScalarType.createStringType()), - new Column("QUERY_STATUS", ScalarType.createStringType()), - new Column("SQL", ScalarType.createStringType())); - private static final ImmutableMap ACTIVE_QUERIES_COLUMN_TO_INDEX; - - private static final ImmutableList WORKLOAD_GROUPS_SCHEMA = ImmutableList.of( - new Column("ID", ScalarType.BIGINT), - new Column("NAME", ScalarType.createStringType()), - new Column("CPU_SHARE", PrimitiveType.BIGINT), - new Column("MEMORY_LIMIT", ScalarType.createStringType()), - new Column("ENABLE_MEMORY_OVERCOMMIT", ScalarType.createStringType()), - new Column("MAX_CONCURRENCY", PrimitiveType.BIGINT), - new Column("MAX_QUEUE_SIZE", PrimitiveType.BIGINT), - new Column("QUEUE_TIMEOUT", PrimitiveType.BIGINT), - new Column("CPU_HARD_LIMIT", PrimitiveType.BIGINT), - new Column("SCAN_THREAD_NUM", PrimitiveType.BIGINT), - new Column("MAX_REMOTE_SCAN_THREAD_NUM", PrimitiveType.BIGINT), - new Column("MIN_REMOTE_SCAN_THREAD_NUM", PrimitiveType.BIGINT)); - private static final ImmutableMap WORKLOAD_GROUPS_COLUMN_TO_INDEX; static { ImmutableMap.Builder activeQueriesbuilder = new ImmutableMap.Builder(); - for (int i = 0; i < ACTIVE_QUERIES_SCHEMA.size(); i++) { - activeQueriesbuilder.put(ACTIVE_QUERIES_SCHEMA.get(i).getName().toLowerCase(), i); + List activeQueriesColList = SchemaTable.TABLE_MAP.get("active_queries").getFullSchema(); + for (int i = 0; i < activeQueriesColList.size(); i++) { + activeQueriesbuilder.put(activeQueriesColList.get(i).getName().toLowerCase(), i); } ACTIVE_QUERIES_COLUMN_TO_INDEX = activeQueriesbuilder.build(); - ImmutableMap.Builder workloadGroupsBuilder = new ImmutableMap.Builder(); - for (int i = 0; i < WORKLOAD_GROUPS_SCHEMA.size(); i++) { - workloadGroupsBuilder.put(WORKLOAD_GROUPS_SCHEMA.get(i).getName().toLowerCase(), i); + ImmutableMap.Builder workloadGroupBuilder = new ImmutableMap.Builder(); + for (int i = 0; i < WorkloadGroupMgr.WORKLOAD_GROUP_PROC_NODE_TITLE_NAMES.size(); i++) { + workloadGroupBuilder.put(WorkloadGroupMgr.WORKLOAD_GROUP_PROC_NODE_TITLE_NAMES.get(i).toLowerCase(), i); } - WORKLOAD_GROUPS_COLUMN_TO_INDEX = workloadGroupsBuilder.build(); + WORKLOAD_GROUPS_COLUMN_TO_INDEX = workloadGroupBuilder.build(); } public static TFetchSchemaTableDataResult getMetadataTable(TFetchSchemaTableDataRequest request) throws TException { @@ -458,6 +431,10 @@ public class MetadataGenerator { trow.addToColumnValue(new TCell().setLongVal(Long.valueOf(rGroupsInfo.get(10)))); // min remote scan thread num trow.addToColumnValue(new TCell().setLongVal(Long.valueOf(rGroupsInfo.get(11)))); + trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(12))); + trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(13))); + trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(14))); + trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(15))); dataBatch.add(trow); } diff --git a/gensrc/thrift/BackendService.thrift b/gensrc/thrift/BackendService.thrift index b803618af4..f42aa41ab7 100644 --- a/gensrc/thrift/BackendService.thrift +++ b/gensrc/thrift/BackendService.thrift @@ -179,6 +179,8 @@ struct TWorkloadGroupInfo { 9: optional i32 scan_thread_num 10: optional i32 max_remote_scan_thread_num 11: optional i32 min_remote_scan_thread_num + 12: optional i32 spill_threshold_low_watermark + 13: optional i32 spill_threshold_high_watermark } enum TWorkloadMetricType { diff --git a/regression-test/data/workload_manager_p0/test_curd_wlg.out b/regression-test/data/workload_manager_p0/test_curd_wlg.out index 77b9fa75be..13b33fd9b8 100644 --- a/regression-test/data/workload_manager_p0/test_curd_wlg.out +++ b/regression-test/data/workload_manager_p0/test_curd_wlg.out @@ -44,3 +44,15 @@ test_group 10 11% false 100 0 0 20% -1 normal 20 50% true 2147483647 0 0 1% 16 test_group 10 11% false 100 0 0 20% -1 +-- !show_spill_1 -- +spill_group_test 1024 0% true 2147483647 0 0 0% -1 10% 10% + +-- !show_spill_1 -- +spill_group_test 1024 0% true 2147483647 0 0 0% -1 -1 10% + +-- !show_spill_2 -- +spill_group_test 1024 0% true 2147483647 0 0 0% -1 5% 10% + +-- !show_spill_3 -- +spill_group_test 1024 0% true 2147483647 0 0 0% -1 5% 40% + diff --git a/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy b/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy index 92b4836ea7..a73801f874 100644 --- a/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy +++ b/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy @@ -325,5 +325,81 @@ suite("test_crud_wlg") { Thread.sleep(10000) sql "select /*+SET_VAR(parallel_fragment_exec_instance_num=1)*/ * from ${table_name};" sql "set workload_group=normal;" + + // test workload spill property + // 1 create group + test { + sql "create workload group if not exists spill_group_test_failed properties ( 'spill_threshold_low_watermark'='90%');" + exception "should bigger than spill_threshold_low_watermark" + } + sql "create workload group if not exists spill_group_test properties ( 'spill_threshold_low_watermark'='10%','spill_threshold_high_watermark'='10%');" + qt_show_spill_1 "select name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num,spill_threshold_low_watermark,spill_threshold_high_watermark from information_schema.workload_groups where name in ('spill_group_test');" + + test { + sql "create workload group if not exists spill_group_test properties ( 'spill_threshold_low_watermark'='20%','spill_threshold_high_watermark'='10%');" + exception "should bigger than spill_threshold_low_watermark" + } + + // 2 alter low + sql "alter workload group spill_group_test properties ( 'spill_threshold_low_watermark'='-1' );" + qt_show_spill_1 "select name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num,spill_threshold_low_watermark,spill_threshold_high_watermark from information_schema.workload_groups where name in ('spill_group_test');" + + sql "alter workload group spill_group_test properties ( 'spill_threshold_low_watermark'='5%' );" + qt_show_spill_2 "select name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num,spill_threshold_low_watermark,spill_threshold_high_watermark from information_schema.workload_groups where name in ('spill_group_test');" + + test { + sql "alter workload group spill_group_test properties ( 'spill_threshold_low_watermark'='20%' );" + exception "should bigger than spill_threshold_low_watermark" + } + + test { + sql "alter workload group spill_group_test properties ( 'spill_threshold_low_watermark'='0%' );" + exception "must be a positive integer" + } + + test { + sql "alter workload group spill_group_test properties ( 'spill_threshold_low_watermark'='101%' );" + exception "must be a positive integer" + } + + test { + sql "create workload group if not exists spill_group_test2 properties ( 'spill_threshold_low_watermark'='0%')" + exception "must be a positive integer" + } + + test { + sql "create workload group if not exists spill_group_test2 properties ( 'spill_threshold_low_watermark'='101%')" + exception "must be a positive integer" + } + + // 3 alter high + sql "alter workload group spill_group_test properties ( 'spill_threshold_high_watermark'='40%' );" + qt_show_spill_3 "select name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num,spill_threshold_low_watermark,spill_threshold_high_watermark from information_schema.workload_groups where name in ('spill_group_test');" + test { + sql "alter workload group spill_group_test properties ( 'spill_threshold_high_watermark'='1%' );" + exception "should bigger than spill_threshold_low_watermark" + } + + test { + sql "alter workload group spill_group_test properties ( 'spill_threshold_high_watermark'='0%' );" + exception "must be a positive integer" + } + + test { + sql "alter workload group spill_group_test properties ( 'spill_threshold_high_watermark'='101%' );" + exception "must be a positive integer" + } + + test { + sql "create workload group if not exists spill_group_test2 properties ( 'spill_threshold_high_watermark'='0%')" + exception "must be a positive integer" + } + + test { + sql "create workload group if not exists spill_group_test2 properties ( 'spill_threshold_high_watermark'='101%')" + exception "must be a positive integer" + } + sql "drop workload group test_group;" + sql "drop workload group spill_group_test;" }