[Fix](executor)Fix routine load failed when can not find group (#33596)

This commit is contained in:
wangbo
2024-04-17 09:48:37 +08:00
committed by yiguolei
parent b44fed8dc2
commit ff8cb3cc43
2 changed files with 45 additions and 24 deletions

View File

@ -139,17 +139,23 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo {
tExecPlanFragmentParams.getQueryOptions().setQueryTimeout((int) timeoutS);
tExecPlanFragmentParams.getQueryOptions().setExecutionTimeout((int) timeoutS);
long wgId = routineLoadJob.getWorkloadId();
List<TPipelineWorkloadGroup> tWgList = new ArrayList<>();
if (wgId > 0) {
tWgList = Env.getCurrentEnv().getWorkloadGroupMgr()
.getTWorkloadGroupById(wgId);
if (Config.enable_workload_group) {
long wgId = routineLoadJob.getWorkloadId();
List<TPipelineWorkloadGroup> tWgList = new ArrayList<>();
if (wgId > 0) {
tWgList = Env.getCurrentEnv().getWorkloadGroupMgr()
.getTWorkloadGroupById(wgId);
if (tWgList.size() == 0) {
throw new UserException("can not find workload group, id=" + wgId);
}
} else {
tWgList = Env.getCurrentEnv().getWorkloadGroupMgr()
.getWorkloadGroupByUser(routineLoadJob.getUserIdentity());
}
if (tWgList.size() != 0) {
tExecPlanFragmentParams.setWorkloadGroups(tWgList);
}
}
if (tWgList.size() == 0) {
tWgList = Env.getCurrentEnv().getWorkloadGroupMgr()
.getTWorkloadGroupByUserIdentity(routineLoadJob.getUserIdentity());
}
tExecPlanFragmentParams.setWorkloadGroups(tWgList);
return tExecPlanFragmentParams;
}
@ -166,17 +172,23 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo {
tExecPlanFragmentParams.getQueryOptions().setQueryTimeout((int) timeoutS);
tExecPlanFragmentParams.getQueryOptions().setExecutionTimeout((int) timeoutS);
long wgId = routineLoadJob.getWorkloadId();
List<TPipelineWorkloadGroup> tWgList = new ArrayList<>();
if (wgId > 0) {
tWgList = Env.getCurrentEnv().getWorkloadGroupMgr()
.getTWorkloadGroupById(wgId);
if (Config.enable_workload_group) {
long wgId = routineLoadJob.getWorkloadId();
List<TPipelineWorkloadGroup> tWgList = new ArrayList<>();
if (wgId > 0) {
tWgList = Env.getCurrentEnv().getWorkloadGroupMgr()
.getTWorkloadGroupById(wgId);
if (tWgList.size() == 0) {
throw new UserException("can not find workload group, id=" + wgId);
}
} else {
tWgList = Env.getCurrentEnv().getWorkloadGroupMgr()
.getWorkloadGroupByUser(routineLoadJob.getUserIdentity());
}
if (tWgList.size() != 0) {
tExecPlanFragmentParams.setWorkloadGroups(tWgList);
}
}
if (tWgList.size() == 0) {
tWgList = Env.getCurrentEnv().getWorkloadGroupMgr()
.getTWorkloadGroupByUserIdentity(routineLoadJob.getUserIdentity());
}
tExecPlanFragmentParams.setWorkloadGroups(tWgList);
return tExecPlanFragmentParams;
}

View File

@ -223,14 +223,23 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable {
return tWorkloadGroups;
}
public List<TPipelineWorkloadGroup> getTWorkloadGroupByUserIdentity(UserIdentity user) throws UserException {
public List<TPipelineWorkloadGroup> getWorkloadGroupByUser(UserIdentity user) throws UserException {
String groupName = Env.getCurrentEnv().getAuth().getWorkloadGroup(user.getQualifiedUser());
List<TPipelineWorkloadGroup> ret = new ArrayList<>();
WorkloadGroup wg = null;
readLock();
try {
WorkloadGroup wg = nameToWorkloadGroup.get(groupName);
if (wg == null) {
throw new UserException("can not find workload group " + groupName);
if (groupName == null || groupName.isEmpty()) {
wg = nameToWorkloadGroup.get(DEFAULT_GROUP_NAME);
if (wg == null) {
throw new RuntimeException("can not find normal workload group for routineload");
}
} else {
wg = nameToWorkloadGroup.get(groupName);
if (wg == null) {
throw new UserException(
"can not find workload group " + groupName + " for user " + user.getQualifiedUser());
}
}
ret.add(wg.toThrift());
} finally {