[feature](executor) Add some check when create workload group/workload schedule policy (#29236)

This commit is contained in:
wangbo
2023-12-29 15:41:16 +08:00
committed by GitHub
parent d6dcf962a9
commit c3c34e10bb
8 changed files with 147 additions and 43 deletions

View File

@ -78,6 +78,9 @@ void WorkloadSchedPolicyMgr::_schedule_workload() {
std::vector<WorkloadQueryInfo> list;
_exec_env->fragment_mgr()->get_runtime_query_info(&list);
// todo: add timer
if (list.size() == 0) {
continue;
}
LOG(INFO) << "[workload_schedule] get query list size=" << list.size();
for (int i = 0; i < list.size(); i++) {

View File

@ -2318,6 +2318,18 @@ public class Config extends ConfigBase {
@ConfField(mutable = true)
public static int workload_action_interval_ms = 10000; // 10s
@ConfField(mutable = true, masterOnly = true)
public static int workload_max_policy_num = 25;
@ConfField(mutable = true, masterOnly = true)
public static int workload_max_condition_num_in_policy = 5;
@ConfField(mutable = true, masterOnly = true)
public static int workload_max_action_num_in_policy = 5; // mainly used to limit set session var action
@ConfField(mutable = true, masterOnly = true)
public static int workload_group_max_num = 15;
@ConfField(description = {"查询be wal_queue 的超时阈值(ms)",
"the timeout threshold of checking wal_queue on be(ms)"})
public static int check_wal_queue_timeout_threshold = 180000; // 3 min

View File

@ -153,13 +153,6 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable {
lock.writeLock().unlock();
}
private void checkWorkloadGroupEnabled() throws DdlException {
if (!Config.enable_workload_group) {
throw new DdlException(
"WorkloadGroup is disabled, you can set config enable_workload_group = true to enable it");
}
}
public void init() {
if (Config.enable_workload_group || Config.use_fuzzy_session_variable /* for github workflow */) {
checkAndCreateDefaultGroup();
@ -265,8 +258,6 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable {
}
public void createWorkloadGroup(CreateWorkloadGroupStmt stmt) throws DdlException {
checkWorkloadGroupEnabled();
WorkloadGroup workloadGroup = WorkloadGroup.create(stmt.getWorkloadGroupName(), stmt.getProperties());
String workloadGroupName = workloadGroup.getName();
writeLock();
@ -277,6 +268,10 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable {
}
throw new DdlException("workload group " + workloadGroupName + " already exist");
}
if (idToWorkloadGroup.size() >= Config.workload_group_max_num) {
throw new DdlException(
"workload group number can not be exceed " + Config.workload_group_max_num);
}
checkGlobalUnlock(workloadGroup, null);
nameToWorkloadGroup.put(workloadGroupName, workloadGroup);
idToWorkloadGroup.put(workloadGroup.getId(), workloadGroup);
@ -323,8 +318,6 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable {
}
public void alterWorkloadGroup(AlterWorkloadGroupStmt stmt) throws DdlException {
checkWorkloadGroupEnabled();
String workloadGroupName = stmt.getWorkloadGroupName();
Map<String, String> properties = stmt.getProperties();
WorkloadGroup newWorkloadGroup;
@ -346,8 +339,6 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable {
}
public void dropWorkloadGroup(DropWorkloadGroupStmt stmt) throws DdlException {
checkWorkloadGroupEnabled();
String workloadGroupName = stmt.getWorkloadGroupName();
if (DEFAULT_GROUP_NAME.equals(workloadGroupName)) {
throw new DdlException("Dropping default workload group " + workloadGroupName + " is not allowed");

View File

@ -18,5 +18,5 @@
package org.apache.doris.resource.workloadschedpolicy;
public enum WorkloadMetricType {
USERNAME, QUERY_TIME
USERNAME, QUERY_TIME, SCAN_ROWS, SCAN_BYTES
}

View File

@ -87,6 +87,8 @@ public class WorkloadSchedPolicy implements Writable, GsonPostProcessable {
private List<WorkloadCondition> workloadConditionList;
private List<WorkloadAction> workloadActionList;
private Boolean isFePolicy = null;
// for ut
public WorkloadSchedPolicy() {
}
@ -198,6 +200,21 @@ public class WorkloadSchedPolicy implements Writable, GsonPostProcessable {
return actionMetaList;
}
// true, current policy can only run in FE;
// false, current policy can only run in BE
public boolean isFePolicy() {
if (isFePolicy == null) {
isFePolicy = false;
for (WorkloadAction action : workloadActionList) {
if (WorkloadSchedPolicyMgr.FE_ACTION_SET.contains(action.getWorkloadActionType())) {
isFePolicy = true;
break;
}
}
}
return isFePolicy;
}
public TopicInfo toTopicInfo() {
TWorkloadSchedPolicy tPolicy = new TWorkloadSchedPolicy();
tPolicy.setId(id);

View File

@ -39,6 +39,7 @@ import org.apache.doris.thrift.TopicInfo;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
import org.apache.logging.log4j.LogManager;
@ -73,6 +74,22 @@ public class WorkloadSchedPolicyMgr implements Writable, GsonPostProcessable {
.add("Id").add("Name").add("Condition").add("Action").add("Priority").add("Enabled").add("Version")
.build();
public static final ImmutableSet<WorkloadActionType> FE_ACTION_SET
= new ImmutableSet.Builder<WorkloadActionType>().add(WorkloadActionType.SET_SESSION_VARIABLE).build();
public static final ImmutableSet<WorkloadMetricType> FE_METRIC_SET
= new ImmutableSet.Builder<WorkloadMetricType>().add(WorkloadMetricType.USERNAME)
.build();
public static final ImmutableSet<WorkloadActionType> BE_ACTION_SET
= new ImmutableSet.Builder<WorkloadActionType>().add(WorkloadActionType.MOVE_QUERY_TO_GROUP)
.add(WorkloadActionType.CANCEL_QUERY).build();
public static final ImmutableSet<WorkloadMetricType> BE_METRIC_SET
= new ImmutableSet.Builder<WorkloadMetricType>().add(WorkloadMetricType.SCAN_ROWS)
.add(WorkloadMetricType.SCAN_BYTES).add(WorkloadMetricType.QUERY_TIME)
.build();
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
public static Comparator<WorkloadSchedPolicy> policyComparator = new Comparator<WorkloadSchedPolicy>() {
@ -151,6 +168,7 @@ public class WorkloadSchedPolicyMgr implements Writable, GsonPostProcessable {
WorkloadCondition cond = WorkloadCondition.createWorkloadCondition(cm);
policyConditionList.add(cond);
}
boolean feCondition = checkPolicyCondition(policyConditionList);
// 2 create action
List<WorkloadActionMeta> originActions = createStmt.getActions();
@ -172,7 +190,11 @@ public class WorkloadSchedPolicyMgr implements Writable, GsonPostProcessable {
WorkloadAction ret = WorkloadAction.createWorkloadAction(workloadActionMeta);
policyActionList.add(ret);
}
checkPolicyActionConflicts(policyActionList);
boolean feAction = checkPolicyAction(policyActionList);
if (feAction != feCondition) {
throw new UserException("action and metric must run in FE together or run in BE together");
}
// 3 create policy
Map<String, String> propMap = createStmt.getProperties();
@ -191,6 +213,10 @@ public class WorkloadSchedPolicyMgr implements Writable, GsonPostProcessable {
throw new UserException("workload schedule policy " + policyName + " already exists ");
}
}
if (idToPolicy.size() >= Config.workload_max_policy_num) {
throw new UserException(
"workload scheduler policy num can not exceed " + Config.workload_max_policy_num);
}
long id = Env.getCurrentEnv().getNextId();
WorkloadSchedPolicy policy = new WorkloadSchedPolicy(id, policyName,
policyConditionList, policyActionList, propMap);
@ -204,10 +230,51 @@ public class WorkloadSchedPolicyMgr implements Writable, GsonPostProcessable {
}
}
private void checkPolicyActionConflicts(List<WorkloadAction> actionList) throws UserException {
private boolean checkPolicyCondition(List<WorkloadCondition> conditionList) throws UserException {
if (conditionList.size() > Config.workload_max_condition_num_in_policy) {
throw new UserException(
"condition num in a policy can not exceed " + Config.workload_max_condition_num_in_policy);
}
boolean containsFeMetric = false;
boolean containsBeMetric = false;
for (WorkloadCondition cond : conditionList) {
if (FE_METRIC_SET.contains(cond.getMetricType())) {
containsFeMetric = true;
}
if (BE_METRIC_SET.contains(cond.getMetricType())) {
containsBeMetric = true;
}
if (containsFeMetric && containsBeMetric) {
throw new UserException(
"one policy can not contains fe and be metric, FE metric list is " + FE_METRIC_SET
+ ", BE metric list is " + BE_METRIC_SET);
}
}
return containsFeMetric;
}
private boolean checkPolicyAction(List<WorkloadAction> actionList) throws UserException {
if (actionList.size() > Config.workload_max_action_num_in_policy) {
throw new UserException(
"action num in one policy can not exceed " + Config.workload_max_action_num_in_policy);
}
Set<WorkloadActionType> actionTypeSet = new HashSet<>();
Set<String> setSessionVarSet = new HashSet<>();
boolean containsFeAction = false;
boolean containsBeAction = false;
for (WorkloadAction action : actionList) {
if (FE_ACTION_SET.contains(action.getWorkloadActionType())) {
containsFeAction = true;
}
if (BE_ACTION_SET.contains(action.getWorkloadActionType())) {
containsBeAction = true;
}
if (containsFeAction && containsBeAction) {
throw new UserException(
"one policy can not contains fe and be action, FE action list is " + FE_ACTION_SET
+ ", BE action list is " + BE_ACTION_SET);
}
// set session var cmd can be duplicate, but args can not be duplicate
if (action.getWorkloadActionType().equals(WorkloadActionType.SET_SESSION_VARIABLE)) {
WorkloadActionSetSessionVar setAction = (WorkloadActionSetSessionVar) action;
@ -225,6 +292,7 @@ public class WorkloadSchedPolicyMgr implements Writable, GsonPostProcessable {
throw new UserException(String.format("%s and %s can not exist in one policy at same time",
WorkloadActionType.CANCEL_QUERY, WorkloadActionType.MOVE_QUERY_TO_GROUP));
}
return containsFeAction;
}
public void execPolicy(List<WorkloadQueryInfo> queryInfoList) {
@ -232,7 +300,11 @@ public class WorkloadSchedPolicyMgr implements Writable, GsonPostProcessable {
Set<Long> policyIdSet = new HashSet<>();
readLock();
try {
policyIdSet.addAll(idToPolicy.keySet());
for (Map.Entry<Long, WorkloadSchedPolicy> entry : idToPolicy.entrySet()) {
if (entry.getValue().isFePolicy()) {
policyIdSet.add(entry.getKey());
}
}
} finally {
readUnlock();
}
@ -418,6 +490,9 @@ public class WorkloadSchedPolicyMgr implements Writable, GsonPostProcessable {
readLock();
try {
for (Map.Entry<Long, WorkloadSchedPolicy> entry : idToPolicy.entrySet()) {
if (entry.getValue().isFePolicy()) {
continue;
}
TopicInfo tInfo = entry.getValue().toTopicInfo();
if (tInfo != null) {
topicInfoList.add(tInfo);

View File

@ -1,8 +1,9 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !select_policy_tvf --
full_policy_policy query_time > 10;username = root set_session_variable "workload_group=normal" 10 false 0
move_action_policy username = root move_query_to_group "normal" 0 true 0
set_action_policy query_time > 10;username = root set_session_variable "workload_group=normal" 0 true 0
be_policy query_time > 10 cancel_query 10 true 0
fe_policy username = root set_session_variable "workload_group=normal" 10 false 0
move_action_policy query_time > 10 move_query_to_group "normal" 0 true 0
set_action_policy username = root set_session_variable "workload_group=normal" 0 true 0
test_cancel_policy query_time > 10 cancel_query 0 false 0
-- !select_policy_tvf_after_drop --

View File

@ -19,12 +19,11 @@ suite("test_workload_sched_policy") {
sql "set experimental_enable_nereids_planner = false;"
sql "drop workload schedule policy if exists full_policy_policy;"
sql "drop workload schedule policy if exists set_action_policy;"
sql "drop workload schedule policy if exists move_action_policy;"
sql "drop workload schedule policy if exists test_cancel_policy;"
sql "drop workload schedule policy if exists test_set_var_policy;"
sql "drop workload schedule policy if exists test_set_var_policy2;"
sql "drop workload schedule policy if exists move_action_policy;"
sql "drop workload schedule policy if exists set_action_policy;"
sql "drop workload schedule policy if exists fe_policy;"
sql "drop workload schedule policy if exists be_policy;"
// 1 create cancel policy
sql "create workload schedule policy test_cancel_policy " +
@ -33,27 +32,36 @@ suite("test_workload_sched_policy") {
// 2 create cancel policy
sql "create workload schedule policy move_action_policy " +
"conditions(username='root') " +
"conditions(query_time > 10) " +
"actions(move_query_to_group 'normal');"
// 3 create set policy
sql "create workload schedule policy set_action_policy " +
"conditions(query_time > 10, username='root') " +
"conditions(username='root') " +
"actions(set_session_variable 'workload_group=normal');"
// 4 create policy with property
sql "create workload schedule policy full_policy_policy " +
"conditions(query_time > 10, username='root') " +
// 4 create policy run in fe
sql "create workload schedule policy fe_policy " +
"conditions(username='root') " +
"actions(set_session_variable 'workload_group=normal') " +
"properties( " +
"'enabled' = 'false', " +
"'priority'='10' " +
");"
// 5 create policy run in be
sql "create workload schedule policy be_policy " +
"conditions(query_time > 10) " +
"actions(cancel_query) " +
"properties( " +
"'enabled' = 'true', " +
"'priority'='10' " +
");"
qt_select_policy_tvf "select name,condition,action,priority,enabled,version from workload_schedule_policy() order by name;"
// test_alter
sql "alter workload schedule policy full_policy_policy properties('priority'='2', 'enabled'='false');"
sql "alter workload schedule policy fe_policy properties('priority'='2', 'enabled'='false');"
// create failed check
test {
@ -71,26 +79,26 @@ suite("test_workload_sched_policy") {
}
test {
sql "alter workload schedule policy full_policy_policy properties('priority'='abc');"
sql "alter workload schedule policy fe_policy properties('priority'='abc');"
exception "invalid priority property value"
}
test {
sql "alter workload schedule policy full_policy_policy properties('enabled'='abc');"
sql "alter workload schedule policy fe_policy properties('enabled'='abc');"
exception "invalid enabled property value"
}
test {
sql "alter workload schedule policy full_policy_policy properties('priority'='10000');"
sql "alter workload schedule policy fe_policy properties('priority'='10000');"
exception "priority can only between"
}
test {
sql "create workload schedule policy conflict_policy " +
"conditions (username = 'root')" +
"conditions (query_time > 0)" +
"actions(cancel_query, move_query_to_group 'normal');"
exception "can not exist in one policy at same time"
@ -98,7 +106,7 @@ suite("test_workload_sched_policy") {
test {
sql "create workload schedule policy conflict_policy " +
"conditions (username = 'root') " +
"conditions (query_time > 0) " +
"actions(cancel_query, cancel_query);"
exception "duplicate action in one policy"
@ -113,10 +121,11 @@ suite("test_workload_sched_policy") {
}
// drop
sql "drop workload schedule policy full_policy_policy;"
sql "drop workload schedule policy set_action_policy;"
sql "drop workload schedule policy move_action_policy;"
sql "drop workload schedule policy test_cancel_policy;"
sql "drop workload schedule policy move_action_policy;"
sql "drop workload schedule policy set_action_policy;"
sql "drop workload schedule policy fe_policy;"
sql "drop workload schedule policy be_policy;"
qt_select_policy_tvf_after_drop "select name,condition,action,priority,enabled,version from workload_schedule_policy() order by name;"
@ -158,10 +167,6 @@ suite("test_workload_sched_policy") {
sql "ADMIN SET FRONTEND CONFIG ('workload_sched_policy_interval_ms' = '10000');"
sql "drop workload schedule policy if exists full_policy_policy;"
sql "drop workload schedule policy if exists set_action_policy;"
sql "drop workload schedule policy if exists move_action_policy;"
sql "drop workload schedule policy if exists test_cancel_policy;"
sql "drop workload schedule policy if exists test_set_var_policy;"
sql "drop workload schedule policy if exists test_set_var_policy2;"