diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java index 86a084764e..deb6749e67 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java @@ -139,17 +139,23 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo { tExecPlanFragmentParams.getQueryOptions().setQueryTimeout((int) timeoutS); tExecPlanFragmentParams.getQueryOptions().setExecutionTimeout((int) timeoutS); - long wgId = routineLoadJob.getWorkloadId(); - List tWgList = new ArrayList<>(); - if (wgId > 0) { - tWgList = Env.getCurrentEnv().getWorkloadGroupMgr() - .getTWorkloadGroupById(wgId); + if (Config.enable_workload_group) { + long wgId = routineLoadJob.getWorkloadId(); + List tWgList = new ArrayList<>(); + if (wgId > 0) { + tWgList = Env.getCurrentEnv().getWorkloadGroupMgr() + .getTWorkloadGroupById(wgId); + if (tWgList.size() == 0) { + throw new UserException("can not find workload group, id=" + wgId); + } + } else { + tWgList = Env.getCurrentEnv().getWorkloadGroupMgr() + .getWorkloadGroupByUser(routineLoadJob.getUserIdentity()); + } + if (tWgList.size() != 0) { + tExecPlanFragmentParams.setWorkloadGroups(tWgList); + } } - if (tWgList.size() == 0) { - tWgList = Env.getCurrentEnv().getWorkloadGroupMgr() - .getTWorkloadGroupByUserIdentity(routineLoadJob.getUserIdentity()); - } - tExecPlanFragmentParams.setWorkloadGroups(tWgList); return tExecPlanFragmentParams; } @@ -166,17 +172,23 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo { tExecPlanFragmentParams.getQueryOptions().setQueryTimeout((int) timeoutS); tExecPlanFragmentParams.getQueryOptions().setExecutionTimeout((int) timeoutS); - long wgId = routineLoadJob.getWorkloadId(); - List tWgList = new ArrayList<>(); - if (wgId > 0) { - tWgList = Env.getCurrentEnv().getWorkloadGroupMgr() - .getTWorkloadGroupById(wgId); + if (Config.enable_workload_group) { + long wgId = routineLoadJob.getWorkloadId(); + List tWgList = new ArrayList<>(); + if (wgId > 0) { + tWgList = Env.getCurrentEnv().getWorkloadGroupMgr() + .getTWorkloadGroupById(wgId); + if (tWgList.size() == 0) { + throw new UserException("can not find workload group, id=" + wgId); + } + } else { + tWgList = Env.getCurrentEnv().getWorkloadGroupMgr() + .getWorkloadGroupByUser(routineLoadJob.getUserIdentity()); + } + if (tWgList.size() != 0) { + tExecPlanFragmentParams.setWorkloadGroups(tWgList); + } } - if (tWgList.size() == 0) { - tWgList = Env.getCurrentEnv().getWorkloadGroupMgr() - .getTWorkloadGroupByUserIdentity(routineLoadJob.getUserIdentity()); - } - tExecPlanFragmentParams.setWorkloadGroups(tWgList); return tExecPlanFragmentParams; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java index 28648ef25e..7796a385ee 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java @@ -223,14 +223,23 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable { return tWorkloadGroups; } - public List getTWorkloadGroupByUserIdentity(UserIdentity user) throws UserException { + public List getWorkloadGroupByUser(UserIdentity user) throws UserException { String groupName = Env.getCurrentEnv().getAuth().getWorkloadGroup(user.getQualifiedUser()); List ret = new ArrayList<>(); + WorkloadGroup wg = null; readLock(); try { - WorkloadGroup wg = nameToWorkloadGroup.get(groupName); - if (wg == null) { - throw new UserException("can not find workload group " + groupName); + if (groupName == null || groupName.isEmpty()) { + wg = nameToWorkloadGroup.get(DEFAULT_GROUP_NAME); + if (wg == null) { + throw new RuntimeException("can not find normal workload group for routineload"); + } + } else { + wg = nameToWorkloadGroup.get(groupName); + if (wg == null) { + throw new UserException( + "can not find workload group " + groupName + " for user " + user.getQualifiedUser()); + } } ret.add(wg.toThrift()); } finally {