diff --git a/be/src/runtime/workload_management/workload_sched_policy_mgr.cpp b/be/src/runtime/workload_management/workload_sched_policy_mgr.cpp index 8a30b5395e..2398bff465 100644 --- a/be/src/runtime/workload_management/workload_sched_policy_mgr.cpp +++ b/be/src/runtime/workload_management/workload_sched_policy_mgr.cpp @@ -78,6 +78,9 @@ void WorkloadSchedPolicyMgr::_schedule_workload() { std::vector list; _exec_env->fragment_mgr()->get_runtime_query_info(&list); // todo: add timer + if (list.size() == 0) { + continue; + } LOG(INFO) << "[workload_schedule] get query list size=" << list.size(); for (int i = 0; i < list.size(); i++) { diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 0f4d019347..4e05352455 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -2318,6 +2318,18 @@ public class Config extends ConfigBase { @ConfField(mutable = true) public static int workload_action_interval_ms = 10000; // 10s + @ConfField(mutable = true, masterOnly = true) + public static int workload_max_policy_num = 25; + + @ConfField(mutable = true, masterOnly = true) + public static int workload_max_condition_num_in_policy = 5; + + @ConfField(mutable = true, masterOnly = true) + public static int workload_max_action_num_in_policy = 5; // mainly used to limit set session var action + + @ConfField(mutable = true, masterOnly = true) + public static int workload_group_max_num = 15; + @ConfField(description = {"查询be wal_queue 的超时阈值(ms)", "the timeout threshold of checking wal_queue on be(ms)"}) public static int check_wal_queue_timeout_threshold = 180000; // 3 min diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java index c8b49a5eba..85a87f610e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java @@ -153,13 +153,6 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable { lock.writeLock().unlock(); } - private void checkWorkloadGroupEnabled() throws DdlException { - if (!Config.enable_workload_group) { - throw new DdlException( - "WorkloadGroup is disabled, you can set config enable_workload_group = true to enable it"); - } - } - public void init() { if (Config.enable_workload_group || Config.use_fuzzy_session_variable /* for github workflow */) { checkAndCreateDefaultGroup(); @@ -265,8 +258,6 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable { } public void createWorkloadGroup(CreateWorkloadGroupStmt stmt) throws DdlException { - checkWorkloadGroupEnabled(); - WorkloadGroup workloadGroup = WorkloadGroup.create(stmt.getWorkloadGroupName(), stmt.getProperties()); String workloadGroupName = workloadGroup.getName(); writeLock(); @@ -277,6 +268,10 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable { } throw new DdlException("workload group " + workloadGroupName + " already exist"); } + if (idToWorkloadGroup.size() >= Config.workload_group_max_num) { + throw new DdlException( + "workload group number can not be exceed " + Config.workload_group_max_num); + } checkGlobalUnlock(workloadGroup, null); nameToWorkloadGroup.put(workloadGroupName, workloadGroup); idToWorkloadGroup.put(workloadGroup.getId(), workloadGroup); @@ -323,8 +318,6 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable { } public void alterWorkloadGroup(AlterWorkloadGroupStmt stmt) throws DdlException { - checkWorkloadGroupEnabled(); - String workloadGroupName = stmt.getWorkloadGroupName(); Map properties = stmt.getProperties(); WorkloadGroup newWorkloadGroup; @@ -346,8 +339,6 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable { } public void dropWorkloadGroup(DropWorkloadGroupStmt stmt) throws DdlException { - checkWorkloadGroupEnabled(); - String workloadGroupName = stmt.getWorkloadGroupName(); if (DEFAULT_GROUP_NAME.equals(workloadGroupName)) { throw new DdlException("Dropping default workload group " + workloadGroupName + " is not allowed"); diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadMetricType.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadMetricType.java index 89dfde9eba..f81d75c675 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadMetricType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadMetricType.java @@ -18,5 +18,5 @@ package org.apache.doris.resource.workloadschedpolicy; public enum WorkloadMetricType { - USERNAME, QUERY_TIME + USERNAME, QUERY_TIME, SCAN_ROWS, SCAN_BYTES } diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicy.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicy.java index d514ea62d2..13637b5b8b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicy.java @@ -87,6 +87,8 @@ public class WorkloadSchedPolicy implements Writable, GsonPostProcessable { private List workloadConditionList; private List workloadActionList; + private Boolean isFePolicy = null; + // for ut public WorkloadSchedPolicy() { } @@ -198,6 +200,21 @@ public class WorkloadSchedPolicy implements Writable, GsonPostProcessable { return actionMetaList; } + // true, current policy can only run in FE; + // false, current policy can only run in BE + public boolean isFePolicy() { + if (isFePolicy == null) { + isFePolicy = false; + for (WorkloadAction action : workloadActionList) { + if (WorkloadSchedPolicyMgr.FE_ACTION_SET.contains(action.getWorkloadActionType())) { + isFePolicy = true; + break; + } + } + } + return isFePolicy; + } + public TopicInfo toTopicInfo() { TWorkloadSchedPolicy tPolicy = new TWorkloadSchedPolicy(); tPolicy.setId(id); diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java index 45ba3a35de..0074020735 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java @@ -39,6 +39,7 @@ import org.apache.doris.thrift.TopicInfo; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Maps; import com.google.gson.annotations.SerializedName; import org.apache.logging.log4j.LogManager; @@ -73,6 +74,22 @@ public class WorkloadSchedPolicyMgr implements Writable, GsonPostProcessable { .add("Id").add("Name").add("Condition").add("Action").add("Priority").add("Enabled").add("Version") .build(); + public static final ImmutableSet FE_ACTION_SET + = new ImmutableSet.Builder().add(WorkloadActionType.SET_SESSION_VARIABLE).build(); + + public static final ImmutableSet FE_METRIC_SET + = new ImmutableSet.Builder().add(WorkloadMetricType.USERNAME) + .build(); + + public static final ImmutableSet BE_ACTION_SET + = new ImmutableSet.Builder().add(WorkloadActionType.MOVE_QUERY_TO_GROUP) + .add(WorkloadActionType.CANCEL_QUERY).build(); + + public static final ImmutableSet BE_METRIC_SET + = new ImmutableSet.Builder().add(WorkloadMetricType.SCAN_ROWS) + .add(WorkloadMetricType.SCAN_BYTES).add(WorkloadMetricType.QUERY_TIME) + .build(); + private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); public static Comparator policyComparator = new Comparator() { @@ -151,6 +168,7 @@ public class WorkloadSchedPolicyMgr implements Writable, GsonPostProcessable { WorkloadCondition cond = WorkloadCondition.createWorkloadCondition(cm); policyConditionList.add(cond); } + boolean feCondition = checkPolicyCondition(policyConditionList); // 2 create action List originActions = createStmt.getActions(); @@ -172,7 +190,11 @@ public class WorkloadSchedPolicyMgr implements Writable, GsonPostProcessable { WorkloadAction ret = WorkloadAction.createWorkloadAction(workloadActionMeta); policyActionList.add(ret); } - checkPolicyActionConflicts(policyActionList); + + boolean feAction = checkPolicyAction(policyActionList); + if (feAction != feCondition) { + throw new UserException("action and metric must run in FE together or run in BE together"); + } // 3 create policy Map propMap = createStmt.getProperties(); @@ -191,6 +213,10 @@ public class WorkloadSchedPolicyMgr implements Writable, GsonPostProcessable { throw new UserException("workload schedule policy " + policyName + " already exists "); } } + if (idToPolicy.size() >= Config.workload_max_policy_num) { + throw new UserException( + "workload scheduler policy num can not exceed " + Config.workload_max_policy_num); + } long id = Env.getCurrentEnv().getNextId(); WorkloadSchedPolicy policy = new WorkloadSchedPolicy(id, policyName, policyConditionList, policyActionList, propMap); @@ -204,10 +230,51 @@ public class WorkloadSchedPolicyMgr implements Writable, GsonPostProcessable { } } - private void checkPolicyActionConflicts(List actionList) throws UserException { + private boolean checkPolicyCondition(List conditionList) throws UserException { + if (conditionList.size() > Config.workload_max_condition_num_in_policy) { + throw new UserException( + "condition num in a policy can not exceed " + Config.workload_max_condition_num_in_policy); + } + boolean containsFeMetric = false; + boolean containsBeMetric = false; + for (WorkloadCondition cond : conditionList) { + if (FE_METRIC_SET.contains(cond.getMetricType())) { + containsFeMetric = true; + } + if (BE_METRIC_SET.contains(cond.getMetricType())) { + containsBeMetric = true; + } + if (containsFeMetric && containsBeMetric) { + throw new UserException( + "one policy can not contains fe and be metric, FE metric list is " + FE_METRIC_SET + + ", BE metric list is " + BE_METRIC_SET); + } + } + return containsFeMetric; + } + + private boolean checkPolicyAction(List actionList) throws UserException { + if (actionList.size() > Config.workload_max_action_num_in_policy) { + throw new UserException( + "action num in one policy can not exceed " + Config.workload_max_action_num_in_policy); + } + Set actionTypeSet = new HashSet<>(); Set setSessionVarSet = new HashSet<>(); + boolean containsFeAction = false; + boolean containsBeAction = false; for (WorkloadAction action : actionList) { + if (FE_ACTION_SET.contains(action.getWorkloadActionType())) { + containsFeAction = true; + } + if (BE_ACTION_SET.contains(action.getWorkloadActionType())) { + containsBeAction = true; + } + if (containsFeAction && containsBeAction) { + throw new UserException( + "one policy can not contains fe and be action, FE action list is " + FE_ACTION_SET + + ", BE action list is " + BE_ACTION_SET); + } // set session var cmd can be duplicate, but args can not be duplicate if (action.getWorkloadActionType().equals(WorkloadActionType.SET_SESSION_VARIABLE)) { WorkloadActionSetSessionVar setAction = (WorkloadActionSetSessionVar) action; @@ -225,6 +292,7 @@ public class WorkloadSchedPolicyMgr implements Writable, GsonPostProcessable { throw new UserException(String.format("%s and %s can not exist in one policy at same time", WorkloadActionType.CANCEL_QUERY, WorkloadActionType.MOVE_QUERY_TO_GROUP)); } + return containsFeAction; } public void execPolicy(List queryInfoList) { @@ -232,7 +300,11 @@ public class WorkloadSchedPolicyMgr implements Writable, GsonPostProcessable { Set policyIdSet = new HashSet<>(); readLock(); try { - policyIdSet.addAll(idToPolicy.keySet()); + for (Map.Entry entry : idToPolicy.entrySet()) { + if (entry.getValue().isFePolicy()) { + policyIdSet.add(entry.getKey()); + } + } } finally { readUnlock(); } @@ -418,6 +490,9 @@ public class WorkloadSchedPolicyMgr implements Writable, GsonPostProcessable { readLock(); try { for (Map.Entry entry : idToPolicy.entrySet()) { + if (entry.getValue().isFePolicy()) { + continue; + } TopicInfo tInfo = entry.getValue().toTopicInfo(); if (tInfo != null) { topicInfoList.add(tInfo); diff --git a/regression-test/data/workload_manager_p0/test_workload_sched_policy.out b/regression-test/data/workload_manager_p0/test_workload_sched_policy.out index 4e8482384c..c5028f8363 100644 --- a/regression-test/data/workload_manager_p0/test_workload_sched_policy.out +++ b/regression-test/data/workload_manager_p0/test_workload_sched_policy.out @@ -1,8 +1,9 @@ -- This file is automatically generated. You should know what you did if you want to edit this -- !select_policy_tvf -- -full_policy_policy query_time > 10;username = root set_session_variable "workload_group=normal" 10 false 0 -move_action_policy username = root move_query_to_group "normal" 0 true 0 -set_action_policy query_time > 10;username = root set_session_variable "workload_group=normal" 0 true 0 +be_policy query_time > 10 cancel_query 10 true 0 +fe_policy username = root set_session_variable "workload_group=normal" 10 false 0 +move_action_policy query_time > 10 move_query_to_group "normal" 0 true 0 +set_action_policy username = root set_session_variable "workload_group=normal" 0 true 0 test_cancel_policy query_time > 10 cancel_query 0 false 0 -- !select_policy_tvf_after_drop -- diff --git a/regression-test/suites/workload_manager_p0/test_workload_sched_policy.groovy b/regression-test/suites/workload_manager_p0/test_workload_sched_policy.groovy index 2c51c79ae0..d59bb76dd5 100644 --- a/regression-test/suites/workload_manager_p0/test_workload_sched_policy.groovy +++ b/regression-test/suites/workload_manager_p0/test_workload_sched_policy.groovy @@ -19,12 +19,11 @@ suite("test_workload_sched_policy") { sql "set experimental_enable_nereids_planner = false;" - sql "drop workload schedule policy if exists full_policy_policy;" - sql "drop workload schedule policy if exists set_action_policy;" - sql "drop workload schedule policy if exists move_action_policy;" sql "drop workload schedule policy if exists test_cancel_policy;" - sql "drop workload schedule policy if exists test_set_var_policy;" - sql "drop workload schedule policy if exists test_set_var_policy2;" + sql "drop workload schedule policy if exists move_action_policy;" + sql "drop workload schedule policy if exists set_action_policy;" + sql "drop workload schedule policy if exists fe_policy;" + sql "drop workload schedule policy if exists be_policy;" // 1 create cancel policy sql "create workload schedule policy test_cancel_policy " + @@ -33,27 +32,36 @@ suite("test_workload_sched_policy") { // 2 create cancel policy sql "create workload schedule policy move_action_policy " + - "conditions(username='root') " + + "conditions(query_time > 10) " + "actions(move_query_to_group 'normal');" // 3 create set policy sql "create workload schedule policy set_action_policy " + - "conditions(query_time > 10, username='root') " + + "conditions(username='root') " + "actions(set_session_variable 'workload_group=normal');" - // 4 create policy with property - sql "create workload schedule policy full_policy_policy " + - "conditions(query_time > 10, username='root') " + + // 4 create policy run in fe + sql "create workload schedule policy fe_policy " + + "conditions(username='root') " + "actions(set_session_variable 'workload_group=normal') " + "properties( " + "'enabled' = 'false', " + "'priority'='10' " + ");" + // 5 create policy run in be + sql "create workload schedule policy be_policy " + + "conditions(query_time > 10) " + + "actions(cancel_query) " + + "properties( " + + "'enabled' = 'true', " + + "'priority'='10' " + + ");" + qt_select_policy_tvf "select name,condition,action,priority,enabled,version from workload_schedule_policy() order by name;" // test_alter - sql "alter workload schedule policy full_policy_policy properties('priority'='2', 'enabled'='false');" + sql "alter workload schedule policy fe_policy properties('priority'='2', 'enabled'='false');" // create failed check test { @@ -71,26 +79,26 @@ suite("test_workload_sched_policy") { } test { - sql "alter workload schedule policy full_policy_policy properties('priority'='abc');" + sql "alter workload schedule policy fe_policy properties('priority'='abc');" exception "invalid priority property value" } test { - sql "alter workload schedule policy full_policy_policy properties('enabled'='abc');" + sql "alter workload schedule policy fe_policy properties('enabled'='abc');" exception "invalid enabled property value" } test { - sql "alter workload schedule policy full_policy_policy properties('priority'='10000');" + sql "alter workload schedule policy fe_policy properties('priority'='10000');" exception "priority can only between" } test { sql "create workload schedule policy conflict_policy " + - "conditions (username = 'root')" + + "conditions (query_time > 0)" + "actions(cancel_query, move_query_to_group 'normal');" exception "can not exist in one policy at same time" @@ -98,7 +106,7 @@ suite("test_workload_sched_policy") { test { sql "create workload schedule policy conflict_policy " + - "conditions (username = 'root') " + + "conditions (query_time > 0) " + "actions(cancel_query, cancel_query);" exception "duplicate action in one policy" @@ -113,10 +121,11 @@ suite("test_workload_sched_policy") { } // drop - sql "drop workload schedule policy full_policy_policy;" - sql "drop workload schedule policy set_action_policy;" - sql "drop workload schedule policy move_action_policy;" sql "drop workload schedule policy test_cancel_policy;" + sql "drop workload schedule policy move_action_policy;" + sql "drop workload schedule policy set_action_policy;" + sql "drop workload schedule policy fe_policy;" + sql "drop workload schedule policy be_policy;" qt_select_policy_tvf_after_drop "select name,condition,action,priority,enabled,version from workload_schedule_policy() order by name;" @@ -158,10 +167,6 @@ suite("test_workload_sched_policy") { sql "ADMIN SET FRONTEND CONFIG ('workload_sched_policy_interval_ms' = '10000');" - sql "drop workload schedule policy if exists full_policy_policy;" - sql "drop workload schedule policy if exists set_action_policy;" - sql "drop workload schedule policy if exists move_action_policy;" - sql "drop workload schedule policy if exists test_cancel_policy;" sql "drop workload schedule policy if exists test_set_var_policy;" sql "drop workload schedule policy if exists test_set_var_policy2;"