diff --git a/be/src/runtime/runtime_query_statistics_mgr.cpp b/be/src/runtime/runtime_query_statistics_mgr.cpp index 9764b0f050..955d1b9a7e 100644 --- a/be/src/runtime/runtime_query_statistics_mgr.cpp +++ b/be/src/runtime/runtime_query_statistics_mgr.cpp @@ -190,6 +190,8 @@ void RuntimeQueryStatiticsMgr::get_metric_map( metric_map.emplace(WorkloadMetricType::QUERY_TIME, std::to_string(query_time_ms)); metric_map.emplace(WorkloadMetricType::SCAN_ROWS, std::to_string(ret_qs.get_scan_rows())); metric_map.emplace(WorkloadMetricType::SCAN_BYTES, std::to_string(ret_qs.get_scan_bytes())); + metric_map.emplace(WorkloadMetricType::QUERY_MEMORY_BYTES, + std::to_string(ret_qs.get_current_used_memory_bytes())); } void RuntimeQueryStatiticsMgr::set_workload_group_id(std::string query_id, int64_t wg_id) { diff --git a/be/src/runtime/workload_management/workload_action.cpp b/be/src/runtime/workload_management/workload_action.cpp index 39916bc7cc..b36895594d 100644 --- a/be/src/runtime/workload_management/workload_action.cpp +++ b/be/src/runtime/workload_management/workload_action.cpp @@ -22,10 +22,14 @@ namespace doris { void WorkloadActionCancelQuery::exec(WorkloadQueryInfo* query_info) { - LOG(INFO) << "[workload_schedule]workload scheduler cancel query " << query_info->query_id; + std::stringstream msg; + msg << "query " << query_info->query_id + << " cancelled by workload policy: " << query_info->policy_name + << ", id:" << query_info->policy_id; + std::string msg_str = msg.str(); + LOG(INFO) << "[workload_schedule]" << msg_str; ExecEnv::GetInstance()->fragment_mgr()->cancel_query( - query_info->tquery_id, PPlanFragmentCancelReason::INTERNAL_ERROR, - std::string("query canceled by workload scheduler")); + query_info->tquery_id, PPlanFragmentCancelReason::INTERNAL_ERROR, msg_str); } void WorkloadActionMoveQuery::exec(WorkloadQueryInfo* query_info) { diff --git a/be/src/runtime/workload_management/workload_condition.cpp b/be/src/runtime/workload_management/workload_condition.cpp index dff6f2adc2..62c6072a60 100644 --- a/be/src/runtime/workload_management/workload_condition.cpp +++ b/be/src/runtime/workload_management/workload_condition.cpp @@ -56,4 +56,17 @@ bool WorkloadConditionScanBytes::eval(std::string str_val) { return WorkloadCompareUtils::compare_signed_integer(_op, scan_bytes_args, _scan_bytes); } +// query memory +WorkloadConditionQueryMemory::WorkloadConditionQueryMemory(WorkloadCompareOperator op, + std::string str_val) { + _op = op; + _query_memory_bytes = std::stol(str_val); +} + +bool WorkloadConditionQueryMemory::eval(std::string str_val) { + int64_t query_memory_bytes = std::stol(str_val); + return WorkloadCompareUtils::compare_signed_integer(_op, query_memory_bytes, + _query_memory_bytes); +} + } // namespace doris \ No newline at end of file diff --git a/be/src/runtime/workload_management/workload_condition.h b/be/src/runtime/workload_management/workload_condition.h index 96387a2af4..a85268a8dc 100644 --- a/be/src/runtime/workload_management/workload_condition.h +++ b/be/src/runtime/workload_management/workload_condition.h @@ -23,7 +23,7 @@ namespace doris { -enum WorkloadMetricType { QUERY_TIME, SCAN_ROWS, SCAN_BYTES }; +enum WorkloadMetricType { QUERY_TIME, SCAN_ROWS, SCAN_BYTES, QUERY_MEMORY_BYTES }; class WorkloadCondition { public: @@ -74,6 +74,19 @@ private: WorkloadCompareOperator _op; }; +class WorkloadConditionQueryMemory : public WorkloadCondition { +public: + WorkloadConditionQueryMemory(WorkloadCompareOperator op, std::string str_val); + bool eval(std::string str_val) override; + WorkloadMetricType get_workload_metric_type() override { + return WorkloadMetricType::QUERY_MEMORY_BYTES; + } + +private: + int64_t _query_memory_bytes; + WorkloadCompareOperator _op; +}; + class WorkloadConditionFactory { public: static std::unique_ptr create_workload_condition( @@ -88,6 +101,8 @@ public: return std::make_unique(op, str_val); } else if (TWorkloadMetricType::type::BE_SCAN_BYTES == metric_name) { return std::make_unique(op, str_val); + } else if (TWorkloadMetricType::type::QUERY_BE_MEMORY_BYTES == metric_name) { + return std::make_unique(op, str_val); } LOG(ERROR) << "not find a metric name " << metric_name; return nullptr; diff --git a/be/src/runtime/workload_management/workload_query_info.h b/be/src/runtime/workload_management/workload_query_info.h index f2da31b619..e544668e10 100644 --- a/be/src/runtime/workload_management/workload_query_info.h +++ b/be/src/runtime/workload_management/workload_query_info.h @@ -29,6 +29,8 @@ public: TUniqueId tquery_id; std::string query_id; int64_t wg_id; + int64_t policy_id; + std::string policy_name; }; } // namespace doris \ No newline at end of file diff --git a/be/src/runtime/workload_management/workload_sched_policy.cpp b/be/src/runtime/workload_management/workload_sched_policy.cpp index b97eb85c06..efa8965dd7 100644 --- a/be/src/runtime/workload_management/workload_sched_policy.cpp +++ b/be/src/runtime/workload_management/workload_sched_policy.cpp @@ -75,6 +75,8 @@ bool WorkloadSchedPolicy::is_match(WorkloadQueryInfo* query_info_ptr) { void WorkloadSchedPolicy::exec_action(WorkloadQueryInfo* query_info) { for (int i = 0; i < _action_list.size(); i++) { + query_info->policy_id = this->_id; + query_info->policy_name = this->_name; _action_list[i]->exec(query_info); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadActionMeta.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadActionMeta.java index 57f6ba3799..2ce0541284 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadActionMeta.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadActionMeta.java @@ -37,14 +37,12 @@ public class WorkloadActionMeta { } static WorkloadActionType getWorkloadActionType(String strType) throws UserException { - if (WorkloadActionType.CANCEL_QUERY.toString().equalsIgnoreCase(strType)) { - return WorkloadActionType.CANCEL_QUERY; - } else if (WorkloadActionType.MOVE_QUERY_TO_GROUP.toString().equalsIgnoreCase(strType)) { - return WorkloadActionType.MOVE_QUERY_TO_GROUP; - } else if (WorkloadActionType.SET_SESSION_VARIABLE.toString().equalsIgnoreCase(strType)) { - return WorkloadActionType.SET_SESSION_VARIABLE; + WorkloadActionType workloadActionType = WorkloadSchedPolicyMgr.STRING_ACTION_MAP.get(strType.toUpperCase()); + if (workloadActionType == null) { + throw new UserException("invalid action type " + strType); + } else { + return workloadActionType; } - throw new UserException("invalid action type " + strType); } public String toString() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadCondition.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadCondition.java index 5d89d2afae..c790a40130 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadCondition.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadCondition.java @@ -37,6 +37,8 @@ public interface WorkloadCondition { return WorkloadConditionBeScanRows.createWorkloadCondition(cm.op, cm.value); } else if (WorkloadMetricType.BE_SCAN_BYTES.equals(cm.metricName)) { return WorkloadConditionBeScanBytes.createWorkloadCondition(cm.op, cm.value); + } else if (WorkloadMetricType.QUERY_BE_MEMORY_BYTES.equals(cm.metricName)) { + return WorkloadConditionQueryBeMemory.createWorkloadCondition(cm.op, cm.value); } throw new UserException("invalid metric name:" + cm.metricName); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionBeScanBytes.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionBeScanBytes.java index 7431f2e0c4..bd914baf54 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionBeScanBytes.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionBeScanBytes.java @@ -38,9 +38,14 @@ public class WorkloadConditionBeScanBytes implements WorkloadCondition { public static WorkloadConditionBeScanBytes createWorkloadCondition(WorkloadConditionOperator op, String value) throws UserException { - long longValue = Long.parseLong(value); - if (longValue < 0) { - throw new UserException("invalid scan bytes value, " + longValue + ", it requires >= 0"); + long longValue = -1; + try { + longValue = Long.parseLong(value); + if (longValue < 0) { + throw new NumberFormatException(); + } + } catch (NumberFormatException e) { + throw new UserException("invalid scan bytes value: " + value + ", it requires >= 0"); } return new WorkloadConditionBeScanBytes(op, longValue); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionBeScanRows.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionBeScanRows.java index c2fb638e08..8b99e40d04 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionBeScanRows.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionBeScanRows.java @@ -38,9 +38,14 @@ public class WorkloadConditionBeScanRows implements WorkloadCondition { public static WorkloadConditionBeScanRows createWorkloadCondition(WorkloadConditionOperator op, String value) throws UserException { - long longValue = Long.parseLong(value); - if (longValue < 0) { - throw new UserException("invalid scan rows value, " + longValue + ", it requires >= 0"); + long longValue = -1; + try { + longValue = Long.parseLong(value); + if (longValue < 0) { + throw new NumberFormatException(); + } + } catch (NumberFormatException e) { + throw new UserException("invalid scan rows value: " + value + ", it requires >= 0"); } return new WorkloadConditionBeScanRows(op, longValue); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionMeta.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionMeta.java index 52f50f924f..81e0f6c218 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionMeta.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionMeta.java @@ -40,16 +40,12 @@ public class WorkloadConditionMeta { } private static WorkloadMetricType getMetricType(String metricStr) throws UserException { - if (WorkloadMetricType.USERNAME.toString().equalsIgnoreCase(metricStr)) { - return WorkloadMetricType.USERNAME; - } else if (WorkloadMetricType.QUERY_TIME.toString().equalsIgnoreCase(metricStr)) { - return WorkloadMetricType.QUERY_TIME; - } else if (WorkloadMetricType.BE_SCAN_ROWS.toString().equalsIgnoreCase(metricStr)) { - return WorkloadMetricType.BE_SCAN_ROWS; - } else if (WorkloadMetricType.BE_SCAN_BYTES.toString().equalsIgnoreCase(metricStr)) { - return WorkloadMetricType.BE_SCAN_BYTES; + WorkloadMetricType metricType = WorkloadSchedPolicyMgr.STRING_METRIC_MAP.get(metricStr.toUpperCase()); + if (metricType == null) { + throw new UserException("invalid metric name:" + metricStr); + } else { + return metricType; } - throw new UserException("invalid metric name:" + metricStr); } public String toString() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionQueryBeMemory.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionQueryBeMemory.java new file mode 100644 index 0000000000..2274b35ca5 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionQueryBeMemory.java @@ -0,0 +1,56 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.resource.workloadschedpolicy; + +import org.apache.doris.common.UserException; + +public class WorkloadConditionQueryBeMemory implements WorkloadCondition { + + private long value; + + private WorkloadConditionOperator op; + + public WorkloadConditionQueryBeMemory(WorkloadConditionOperator op, long value) { + this.value = value; + this.op = op; + } + + @Override + public boolean eval(String strValue) { + return false; + } + + @Override + public WorkloadMetricType getMetricType() { + return WorkloadMetricType.QUERY_BE_MEMORY_BYTES; + } + + public static WorkloadConditionQueryBeMemory createWorkloadCondition(WorkloadConditionOperator op, + String value) throws UserException { + long longValue = -1; + try { + longValue = Long.parseLong(value); + if (longValue < 0) { + throw new NumberFormatException(); + } + } catch (NumberFormatException e) { + throw new UserException("invalid query be memory value: " + value + ", it requires >= 0"); + } + return new WorkloadConditionQueryBeMemory(op, longValue); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionQueryTime.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionQueryTime.java index e61484508d..6c3a5c653a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionQueryTime.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionQueryTime.java @@ -37,9 +37,14 @@ public class WorkloadConditionQueryTime implements WorkloadCondition { public static WorkloadConditionQueryTime createWorkloadCondition(WorkloadConditionOperator op, String value) throws UserException { - long longValue = Long.parseLong(value); - if (longValue < 0) { - throw new UserException("invalid query time value, " + longValue + ", it requires >= 0"); + long longValue = -1; + try { + longValue = Long.parseLong(value); + if (longValue < 0) { + throw new NumberFormatException(); + } + } catch (NumberFormatException e) { + throw new UserException("invalid query time value: " + value + ", it requires >= 0"); } return new WorkloadConditionQueryTime(op, longValue); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadMetricType.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadMetricType.java index ed17414ec4..93e612a85c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadMetricType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadMetricType.java @@ -18,5 +18,5 @@ package org.apache.doris.resource.workloadschedpolicy; public enum WorkloadMetricType { - USERNAME, QUERY_TIME, BE_SCAN_ROWS, BE_SCAN_BYTES + USERNAME, QUERY_TIME, BE_SCAN_ROWS, BE_SCAN_BYTES, QUERY_BE_MEMORY_BYTES } diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicy.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicy.java index 55759e9097..ff27a08706 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicy.java @@ -22,7 +22,6 @@ import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; import org.apache.doris.persist.gson.GsonPostProcessable; import org.apache.doris.persist.gson.GsonUtils; -import org.apache.doris.thrift.TCompareOperator; import org.apache.doris.thrift.TWorkloadAction; import org.apache.doris.thrift.TWorkloadActionType; import org.apache.doris.thrift.TWorkloadCondition; @@ -31,7 +30,6 @@ import org.apache.doris.thrift.TWorkloadSchedPolicy; import org.apache.doris.thrift.TopicInfo; import com.esotericsoftware.minlog.Log; -import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.gson.annotations.SerializedName; @@ -51,25 +49,6 @@ public class WorkloadSchedPolicy implements Writable, GsonPostProcessable { public static final ImmutableSet POLICY_PROPERTIES = new ImmutableSet.Builder() .add(ENABLED).add(PRIORITY).add(WORKLOAD_GROUP).build(); - // used for convert fe type to thrift type - private static ImmutableMap METRIC_MAP - = new ImmutableMap.Builder() - .put(WorkloadMetricType.QUERY_TIME, TWorkloadMetricType.QUERY_TIME) - .put(WorkloadMetricType.BE_SCAN_ROWS, TWorkloadMetricType.BE_SCAN_ROWS) - .put(WorkloadMetricType.BE_SCAN_BYTES, TWorkloadMetricType.BE_SCAN_BYTES).build(); - private static ImmutableMap ACTION_MAP - = new ImmutableMap.Builder() - .put(WorkloadActionType.MOVE_QUERY_TO_GROUP, TWorkloadActionType.MOVE_QUERY_TO_GROUP) - .put(WorkloadActionType.CANCEL_QUERY, TWorkloadActionType.CANCEL_QUERY).build(); - - private static ImmutableMap OP_MAP - = new ImmutableMap.Builder() - .put(WorkloadConditionOperator.EQUAL, TCompareOperator.EQUAL) - .put(WorkloadConditionOperator.GREATER, TCompareOperator.GREATER) - .put(WorkloadConditionOperator.GREATER_EQUAL, TCompareOperator.GREATER_EQUAL) - .put(WorkloadConditionOperator.LESS, TCompareOperator.LESS) - .put(WorkloadConditionOperator.LESS_EQUAl, TCompareOperator.LESS_EQUAL).build(); - @SerializedName(value = "id") long id; @SerializedName(value = "name") @@ -255,12 +234,12 @@ public class WorkloadSchedPolicy implements Writable, GsonPostProcessable { List condList = new ArrayList(); for (WorkloadConditionMeta cond : conditionMetaList) { TWorkloadCondition tCond = new TWorkloadCondition(); - TWorkloadMetricType metricType = METRIC_MAP.get(cond.metricName); + TWorkloadMetricType metricType = WorkloadSchedPolicyMgr.METRIC_MAP.get(cond.metricName); if (metricType == null) { return null; } tCond.setMetricName(metricType); - tCond.setOp(OP_MAP.get(cond.op)); + tCond.setOp(WorkloadSchedPolicyMgr.OP_MAP.get(cond.op)); tCond.setValue(cond.value); condList.add(tCond); } @@ -268,7 +247,7 @@ public class WorkloadSchedPolicy implements Writable, GsonPostProcessable { List actionList = new ArrayList(); for (WorkloadActionMeta action : actionMetaList) { TWorkloadAction tAction = new TWorkloadAction(); - TWorkloadActionType tActionType = ACTION_MAP.get(action.action); + TWorkloadActionType tActionType = WorkloadSchedPolicyMgr.ACTION_MAP.get(action.action); if (tActionType == null) { return null; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java index 4aa7563f8d..3879dd83b9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java @@ -35,11 +35,15 @@ import org.apache.doris.persist.gson.GsonPostProcessable; import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.qe.ConnectContext; import org.apache.doris.service.ExecuteEnv; +import org.apache.doris.thrift.TCompareOperator; import org.apache.doris.thrift.TUserIdentity; +import org.apache.doris.thrift.TWorkloadActionType; +import org.apache.doris.thrift.TWorkloadMetricType; import org.apache.doris.thrift.TopicInfo; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Maps; import com.google.gson.annotations.SerializedName; @@ -80,6 +84,14 @@ public class WorkloadSchedPolicyMgr extends MasterDaemon implements Writable, Gs .add("WorkloadGroup") .build(); + public static final ImmutableMap OP_MAP + = new ImmutableMap.Builder() + .put(WorkloadConditionOperator.EQUAL, TCompareOperator.EQUAL) + .put(WorkloadConditionOperator.GREATER, TCompareOperator.GREATER) + .put(WorkloadConditionOperator.GREATER_EQUAL, TCompareOperator.GREATER_EQUAL) + .put(WorkloadConditionOperator.LESS, TCompareOperator.LESS) + .put(WorkloadConditionOperator.LESS_EQUAl, TCompareOperator.LESS_EQUAL).build(); + public static final ImmutableSet FE_ACTION_SET = new ImmutableSet.Builder().add(WorkloadActionType.SET_SESSION_VARIABLE).build(); @@ -93,7 +105,39 @@ public class WorkloadSchedPolicyMgr extends MasterDaemon implements Writable, Gs public static final ImmutableSet BE_METRIC_SET = new ImmutableSet.Builder().add(WorkloadMetricType.BE_SCAN_ROWS) - .add(WorkloadMetricType.BE_SCAN_BYTES).add(WorkloadMetricType.QUERY_TIME).build(); + .add(WorkloadMetricType.BE_SCAN_BYTES).add(WorkloadMetricType.QUERY_TIME) + .add(WorkloadMetricType.QUERY_BE_MEMORY_BYTES).build(); + + // used for convert fe type to thrift type + public static final ImmutableMap METRIC_MAP + = new ImmutableMap.Builder() + .put(WorkloadMetricType.QUERY_TIME, TWorkloadMetricType.QUERY_TIME) + .put(WorkloadMetricType.BE_SCAN_ROWS, TWorkloadMetricType.BE_SCAN_ROWS) + .put(WorkloadMetricType.BE_SCAN_BYTES, TWorkloadMetricType.BE_SCAN_BYTES) + .put(WorkloadMetricType.QUERY_BE_MEMORY_BYTES, TWorkloadMetricType.QUERY_BE_MEMORY_BYTES).build(); + public static final ImmutableMap ACTION_MAP + = new ImmutableMap.Builder() + .put(WorkloadActionType.MOVE_QUERY_TO_GROUP, TWorkloadActionType.MOVE_QUERY_TO_GROUP) + .put(WorkloadActionType.CANCEL_QUERY, TWorkloadActionType.CANCEL_QUERY).build(); + + public static final Map STRING_METRIC_MAP = new HashMap<>(); + public static final Map STRING_ACTION_MAP = new HashMap<>(); + + static { + for (WorkloadMetricType metricType : FE_METRIC_SET) { + STRING_METRIC_MAP.put(metricType.toString(), metricType); + } + for (WorkloadMetricType metricType : BE_METRIC_SET) { + STRING_METRIC_MAP.put(metricType.toString(), metricType); + } + + for (WorkloadActionType actionType : FE_ACTION_SET) { + STRING_ACTION_MAP.put(actionType.toString(), actionType); + } + for (WorkloadActionType actionType : BE_ACTION_SET) { + STRING_ACTION_MAP.put(actionType.toString(), actionType); + } + } private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); diff --git a/gensrc/thrift/BackendService.thrift b/gensrc/thrift/BackendService.thrift index d93520206c..0a2edb8ccb 100644 --- a/gensrc/thrift/BackendService.thrift +++ b/gensrc/thrift/BackendService.thrift @@ -187,6 +187,7 @@ enum TWorkloadMetricType { QUERY_TIME BE_SCAN_ROWS BE_SCAN_BYTES + QUERY_BE_MEMORY_BYTES } enum TCompareOperator { diff --git a/regression-test/suites/workload_manager_p0/test_workload_sched_policy.groovy b/regression-test/suites/workload_manager_p0/test_workload_sched_policy.groovy index d3f9b35426..2536b06ce7 100644 --- a/regression-test/suites/workload_manager_p0/test_workload_sched_policy.groovy +++ b/regression-test/suites/workload_manager_p0/test_workload_sched_policy.groovy @@ -23,6 +23,9 @@ suite("test_workload_sched_policy") { sql "drop workload policy if exists set_action_policy;" sql "drop workload policy if exists fe_policy;" sql "drop workload policy if exists be_policy;" + sql "drop workload policy if exists be_scan_row_policy;" + sql "drop workload policy if exists be_scan_bytes_policy;" + sql "drop workload policy if exists query_be_memory_used;" // 1 create cancel policy sql "create workload policy test_cancel_policy " + @@ -106,11 +109,38 @@ suite("test_workload_sched_policy") { exception "duplicate set_session_variable action args one policy" } + test { + sql "create workload policy invalid_metric_value_policy conditions(query_be_memory_bytes > '-1') actions(cancel_query);" + exception "invalid" + } + + test { + sql "create workload policy invalid_metric_value_policy conditions(query_time > '-1') actions(cancel_query);" + exception "invalid" + } + + test { + sql "create workload policy invalid_metric_value_policy conditions(be_scan_rows > '-1') actions(cancel_query);" + exception "invalid" + } + + test { + sql "create workload policy invalid_metric_value_policy conditions(be_scan_bytes > '-1') actions(cancel_query);" + exception "invalid" + } + + sql "create workload policy be_scan_row_policy conditions(be_scan_rows > 1) actions(cancel_query) properties('enabled'='false');" + sql "create workload policy be_scan_bytes_policy conditions(be_scan_bytes > 1) actions(cancel_query) properties('enabled'='false');" + sql "create workload policy query_be_memory_used conditions(query_be_memory_bytes > 1) actions(cancel_query) properties('enabled'='false');" + // drop sql "drop workload policy test_cancel_policy;" sql "drop workload policy set_action_policy;" sql "drop workload policy fe_policy;" sql "drop workload policy be_policy;" + sql "drop workload policy be_scan_row_policy;" + sql "drop workload policy be_scan_bytes_policy;" + sql "drop workload policy query_be_memory_used;" qt_select_policy_tvf_after_drop "select name,condition,action,priority,enabled,version from information_schema.workload_policy where name in('be_policy','fe_policy','set_action_policy','test_cancel_policy') order by name;"