From 27f65f44638752af34946089a4570cbe83ea19b7 Mon Sep 17 00:00:00 2001 From: wangbo Date: Sun, 4 Feb 2024 19:52:19 +0800 Subject: [PATCH] [Feature](executor)Stream load support workload group (#30763) * Stream load support workload group * skip mysql load --- .../doris/service/FrontendServiceImpl.java | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 9e0b9ed9fd..cc3e7da1df 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -184,6 +184,7 @@ import org.apache.doris.thrift.TNodeInfo; import org.apache.doris.thrift.TOlapTableIndexTablets; import org.apache.doris.thrift.TOlapTablePartition; import org.apache.doris.thrift.TPipelineFragmentParams; +import org.apache.doris.thrift.TPipelineWorkloadGroup; import org.apache.doris.thrift.TPrivilegeCtrl; import org.apache.doris.thrift.TPrivilegeHier; import org.apache.doris.thrift.TPrivilegeStatus; @@ -1800,14 +1801,29 @@ public class FrontendServiceImpl implements FrontendService.Iface { ctx.setThreadLocalInfo(); try { + List tWorkloadGroupList = null; + // mysql load request not carry user info, need fix it later. + boolean hasUserName = !StringUtils.isEmpty(ctx.getQualifiedUser()); + if (Config.enable_workload_group && hasUserName) { + tWorkloadGroupList = Env.getCurrentEnv().getWorkloadGroupMgr().getWorkloadGroup(ctx); + } if (!Strings.isNullOrEmpty(request.getLoadSql())) { httpStreamPutImpl(request, result, ctx); + if (tWorkloadGroupList != null && tWorkloadGroupList.size() > 0) { + result.params.setWorkloadGroups(tWorkloadGroupList); + } return result; } else { if (Config.enable_pipeline_load) { result.setPipelineParams(pipelineStreamLoadPutImpl(request)); + if (tWorkloadGroupList != null && tWorkloadGroupList.size() > 0) { + result.pipeline_params.setWorkloadGroups(tWorkloadGroupList); + } } else { result.setParams(streamLoadPutImpl(request, result)); + if (tWorkloadGroupList != null && tWorkloadGroupList.size() > 0) { + result.params.setWorkloadGroups(tWorkloadGroupList); + } } } } catch (UserException e) {