[Improvement](executor)Routine load support workload group #31671

Author:       wangbo
Date:         2024-03-11 22:51:15 +08:00
Committed by: yiguolei
Parent:       b41b17ad0a
Commit:       194f3432ab

8 changed files with 114 additions and 9 deletions

AlterRoutineLoadStmt.java

@@ -29,6 +29,7 @@ import org.apache.doris.common.util.Util;
 import org.apache.doris.load.routineload.AbstractDataSourceProperties;
 import org.apache.doris.load.routineload.RoutineLoadDataSourcePropertyFactory;
 import org.apache.doris.load.routineload.RoutineLoadJob;
+import org.apache.doris.qe.ConnectContext;

 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Maps;
@@ -66,6 +67,7 @@ public class AlterRoutineLoadStmt extends DdlStmt {
             .add(CreateRoutineLoadStmt.PARTIAL_COLUMNS)
             .add(LoadStmt.STRICT_MODE)
             .add(LoadStmt.TIMEZONE)
+            .add(CreateRoutineLoadStmt.WORKLOAD_GROUP)
             .build();

     private final LabelName labelName;
@@ -242,6 +244,12 @@ public class AlterRoutineLoadStmt extends DdlStmt {
             analyzedJobProperties.put(CreateRoutineLoadStmt.PARTIAL_COLUMNS,
                     String.valueOf(isPartialUpdate));
         }
+        if (jobProperties.containsKey(CreateRoutineLoadStmt.WORKLOAD_GROUP)) {
+            String workloadGroup = jobProperties.get(CreateRoutineLoadStmt.WORKLOAD_GROUP);
+            long wgId = Env.getCurrentEnv().getWorkloadGroupMgr()
+                    .getWorkloadGroup(ConnectContext.get().getCurrentUserIdentity(), workloadGroup);
+            analyzedJobProperties.put(CreateRoutineLoadStmt.WORKLOAD_GROUP, String.valueOf(wgId));
+        }
     }

     private void checkDataSourceProperties() throws UserException {
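
Two things happen above: WORKLOAD_GROUP joins the whitelist of properties that ALTER ROUTINE LOAD may modify, and when the property is supplied, the group name is resolved to an id under the current user before being stored. A minimal, hypothetical sketch of that whitelist gate (the property names are an illustrative subset, and IllegalArgumentException stands in for Doris' AnalysisException):

import com.google.common.collect.ImmutableSet;

import java.util.Map;

// Hypothetical sketch of the configurable-property gate: ALTER only accepts
// keys in the whitelist, which this commit extends with "workload_group".
class AlterablePropertiesSketch {
    private static final ImmutableSet<String> CONFIGURABLE_PROPERTIES = ImmutableSet.of(
            "desired_concurrent_number",   // illustrative pre-existing keys
            "strict_mode",
            "timezone",
            "workload_group");             // newly alterable in this commit

    static void checkModifiable(Map<String, String> jobProperties) {
        for (String key : jobProperties.keySet()) {
            if (!CONFIGURABLE_PROPERTIES.contains(key)) {
                throw new IllegalArgumentException(key + " is not a supported property");
            }
        }
    }
}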

CreateRoutineLoadStmt.java

@@ -111,6 +111,8 @@ public class CreateRoutineLoadStmt extends DdlStmt {
     public static final String PARTIAL_COLUMNS = "partial_columns";

+    public static final String WORKLOAD_GROUP = "workload_group";
+
     private static final String NAME_TYPE = "ROUTINE LOAD NAME";
     public static final String ENDPOINT_REGEX = "[-A-Za-z0-9+&@#/%?=~_|!:,.;]+[-A-Za-z0-9+&@#/%=~_|]";
     public static final String SEND_BATCH_PARALLELISM = "send_batch_parallelism";
@@ -138,6 +140,7 @@ public class CreateRoutineLoadStmt extends DdlStmt {
             .add(SEND_BATCH_PARALLELISM)
             .add(LOAD_TO_SINGLE_TABLET)
             .add(PARTIAL_COLUMNS)
+            .add(WORKLOAD_GROUP)
             .build();

     private final LabelName labelName;
@@ -179,6 +182,8 @@ public class CreateRoutineLoadStmt extends DdlStmt {
     private String escape;

+    private long workloadGroupId = -1;
+
     /**
      * support partial columns load(Only Unique Key Columns)
      */
@@ -330,6 +335,10 @@ public class CreateRoutineLoadStmt extends DdlStmt {
         return comment;
     }

+    public long getWorkloadGroupId() {
+        return workloadGroupId;
+    }
+
     @Override
     public void analyze(Analyzer analyzer) throws UserException {
         super.analyze(analyzer);
@@ -506,6 +515,11 @@ public class CreateRoutineLoadStmt extends DdlStmt {
         if (escape != null && escape.length() != 1) {
             throw new AnalysisException("escape must be single-char");
         }
+
+        String inputWorkloadGroupStr = jobProperties.get(WORKLOAD_GROUP);
+        if (!StringUtils.isEmpty(inputWorkloadGroupStr)) {
+            this.workloadGroupId = Env.getCurrentEnv().getWorkloadGroupMgr()
+                    .getWorkloadGroup(ConnectContext.get().getCurrentUserIdentity(), inputWorkloadGroupStr);
+        }

         if (ConnectContext.get() != null) {
             timezone = ConnectContext.get().getSessionVariable().getTimeZone();
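
With the fields above, a job can pin a workload group at creation time through the job's PROPERTIES clause under the key "workload_group"; when the property is absent, workloadGroupId keeps its -1 sentinel and scheduling later falls back to the user's default group. A hedged sketch of that analyze-time behaviour, with a plain map standing in for the WorkloadGroupMgr lookup and IllegalArgumentException standing in for UserException:

import java.util.Map;

// Sketch only: nameToId stands in for the WorkloadGroupMgr name -> id lookup.
// -1 means "no group pinned on this job".
class WorkloadGroupPropertySketch {
    static final String WORKLOAD_GROUP = "workload_group";
    static final long NO_WORKLOAD_GROUP = -1;

    static long analyzeWorkloadGroup(Map<String, String> jobProperties,
                                     Map<String, Long> nameToId) {
        String name = jobProperties.get(WORKLOAD_GROUP);
        if (name == null || name.isEmpty()) {
            return NO_WORKLOAD_GROUP; // property not set: keep the sentinel
        }
        Long id = nameToId.get(name);
        if (id == null) {
            throw new IllegalArgumentException("Workload group " + name + " does not exist");
        }
        return id;
    }
}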

KafkaTaskInfo.java

@@ -28,6 +28,7 @@ import org.apache.doris.thrift.TFileFormatType;
 import org.apache.doris.thrift.TKafkaLoadInfo;
 import org.apache.doris.thrift.TLoadSourceType;
 import org.apache.doris.thrift.TPipelineFragmentParams;
+import org.apache.doris.thrift.TPipelineWorkloadGroup;
 import org.apache.doris.thrift.TPlanFragment;
 import org.apache.doris.thrift.TRoutineLoadTask;
 import org.apache.doris.thrift.TUniqueId;
@@ -130,6 +131,19 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo {
         TExecPlanFragmentParams tExecPlanFragmentParams = routineLoadJob.plan(loadId, txnId);
         TPlanFragment tPlanFragment = tExecPlanFragmentParams.getFragment();
         tPlanFragment.getOutputSink().getOlapTableSink().setTxnId(txnId);
+
+        long wgId = routineLoadJob.getWorkloadId();
+        List<TPipelineWorkloadGroup> tWgList = new ArrayList<>();
+        if (wgId > 0) {
+            tWgList = Env.getCurrentEnv().getWorkloadGroupMgr()
+                    .getTWorkloadGroupById(wgId);
+        }
+        if (tWgList.size() == 0) {
+            tWgList = Env.getCurrentEnv().getWorkloadGroupMgr()
+                    .getTWorkloadGroupByUserIdentity(routineLoadJob.getUserIdentity());
+        }
+        tExecPlanFragmentParams.setWorkloadGroups(tWgList);
+
         return tExecPlanFragmentParams;
     }
@@ -139,6 +153,19 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo {
         TPipelineFragmentParams tExecPlanFragmentParams = routineLoadJob.planForPipeline(loadId, txnId);
         TPlanFragment tPlanFragment = tExecPlanFragmentParams.getFragment();
         tPlanFragment.getOutputSink().getOlapTableSink().setTxnId(txnId);
+
+        long wgId = routineLoadJob.getWorkloadId();
+        List<TPipelineWorkloadGroup> tWgList = new ArrayList<>();
+        if (wgId > 0) {
+            tWgList = Env.getCurrentEnv().getWorkloadGroupMgr()
+                    .getTWorkloadGroupById(wgId);
+        }
+        if (tWgList.size() == 0) {
+            tWgList = Env.getCurrentEnv().getWorkloadGroupMgr()
+                    .getTWorkloadGroupByUserIdentity(routineLoadJob.getUserIdentity());
+        }
+        tExecPlanFragmentParams.setWorkloadGroups(tWgList);
+
         return tExecPlanFragmentParams;
     }
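
Both plan paths repeat the same resolution: use the id pinned on the job if one is set and it still resolves to a live group, otherwise fall back to the workload group bound to the job's user. A self-contained sketch of that fallback, where the Function lookups stand in for getTWorkloadGroupById and getTWorkloadGroupByUserIdentity and T stands in for TPipelineWorkloadGroup:

import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Sketch of the fallback shared by both plan methods above; names are stand-ins.
class WorkloadGroupFallbackSketch {
    static <T> List<T> resolve(long jobWgId, String jobUser,
                               Function<Long, List<T>> byId,
                               Function<String, List<T>> byUser) {
        List<T> groups = new ArrayList<>();
        if (jobWgId > 0) {                  // -1 means no group was pinned on the job
            groups = byId.apply(jobWgId);   // empty if the group was dropped since
        }
        if (groups.isEmpty()) {
            groups = byUser.apply(jobUser); // user's default workload group
        }
        return groups;
    }
}

Since the two copies in plan() and planForPipeline() are identical, extracting a helper along these lines would also remove the duplication.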

RoutineLoadJob.java

@@ -69,6 +69,7 @@ import org.apache.doris.transaction.TransactionException;
 import org.apache.doris.transaction.TransactionState;
 import org.apache.doris.transaction.TransactionStatus;

+import com.aliyuncs.utils.StringUtils;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
@@ -117,6 +118,8 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback implements LoadTaskInfo {
     protected static final String STAR_STRING = "*";

+    public static final String WORKLOAD_GROUP = "workload_group";
+
     @Getter
     @Setter
     private boolean isMultiTable = false;
@@ -394,6 +397,9 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback implements LoadTaskInfo {
         if (stmt.getEscape() != null) {
             jobProperties.put(LoadStmt.KEY_ESCAPE, stmt.getEscape());
         }
+        if (stmt.getWorkloadGroupId() > 0) {
+            jobProperties.put(WORKLOAD_GROUP, String.valueOf(stmt.getWorkloadGroupId()));
+        }
     }

     private void setRoutineLoadDesc(RoutineLoadDesc routineLoadDesc) {
@@ -479,6 +485,14 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback implements LoadTaskInfo {
         return database.getTableOrMetaException(tableId).getName();
     }

+    public long getWorkloadId() {
+        String workloadIdStr = jobProperties.get(WORKLOAD_GROUP);
+        if (!StringUtils.isEmpty(workloadIdStr)) {
+            return Long.parseLong(workloadIdStr);
+        }
+        return -1;
+    }
+
     public JobState getState() {
         return state;
     }
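
The job persists the resolved group id, not the group name, as a string in jobProperties; getWorkloadId parses it back and returns -1 when the property was never set. A round-trip sketch of that encoding (a plain HashMap and hypothetical method names stand in for the job's property handling):

import java.util.HashMap;
import java.util.Map;

// Sketch of the store/parse round trip added above: ids <= 0 are never stored,
// and an absent property decodes back to the -1 "not set" sentinel.
class WorkloadIdRoundTripSketch {
    static final String WORKLOAD_GROUP = "workload_group";
    final Map<String, String> jobProperties = new HashMap<>();

    void setWorkloadGroupId(long workloadGroupId) {
        if (workloadGroupId > 0) {
            jobProperties.put(WORKLOAD_GROUP, String.valueOf(workloadGroupId));
        }
    }

    long getWorkloadId() {
        String idStr = jobProperties.get(WORKLOAD_GROUP);
        return (idStr == null || idStr.isEmpty()) ? -1 : Long.parseLong(idStr);
    }
}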

WorkloadGroupMgr.java

@@ -183,13 +183,48 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable {
         return workloadGroups;
     }

-    public WorkloadGroup getWorkloadGroupById(long wgId) {
+    public long getWorkloadGroup(UserIdentity currentUser, String groupName) throws UserException {
+        Long workloadId = getWorkloadGroupIdByName(groupName);
+        if (workloadId == null) {
+            throw new UserException("Workload group " + groupName + " does not exist");
+        }
+        if (!Env.getCurrentEnv().getAccessManager()
+                .checkWorkloadGroupPriv(currentUser, groupName, PrivPredicate.USAGE)) {
+            ErrorReport.reportAnalysisException(
+                    "Access denied; you need (at least one of) the %s privilege(s) to use workload group '%s'.",
+                    ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "USAGE/ADMIN", groupName);
+        }
+        return workloadId.longValue();
+    }
+
+    public List<TPipelineWorkloadGroup> getTWorkloadGroupById(long wgId) {
+        List<TPipelineWorkloadGroup> tWorkloadGroups = Lists.newArrayList();
         readLock();
         try {
-            return idToWorkloadGroup.get(wgId);
+            WorkloadGroup wg = idToWorkloadGroup.get(wgId);
+            if (wg != null) {
+                tWorkloadGroups.add(wg.toThrift());
+            }
         } finally {
             readUnlock();
         }
+        return tWorkloadGroups;
     }
+
+    public List<TPipelineWorkloadGroup> getTWorkloadGroupByUserIdentity(UserIdentity user) throws UserException {
+        String groupName = Env.getCurrentEnv().getAuth().getWorkloadGroup(user.getQualifiedUser());
+        List<TPipelineWorkloadGroup> ret = new ArrayList<>();
+        readLock();
+        try {
+            WorkloadGroup wg = nameToWorkloadGroup.get(groupName);
+            if (wg == null) {
+                throw new UserException("can not find workload group " + groupName);
+            }
+            ret.add(wg.toThrift());
+        } finally {
+            readUnlock();
+        }
+        return ret;
+    }

     public List<TopicInfo> getPublishTopicInfo() {
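
getWorkloadGroup gates the name -> id resolution behind a USAGE privilege check, while the two new thrift lookups follow the manager's read-locked pattern: find the entry and convert it to a detached thrift copy inside the lock, so no internal WorkloadGroup reference escapes the critical section. A distilled, hypothetical sketch of that pattern over a generic registry (copyOut stands in for WorkloadGroup.toThrift()):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;

// Generic sketch of the read-locked copy-out lookup used above.
class ReadLockedLookupSketch<K, V, T> {
    private final Map<K, V> registry = new HashMap<>();
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final Function<V, T> copyOut;

    ReadLockedLookupSketch(Function<V, T> copyOut) {
        this.copyOut = copyOut;
    }

    List<T> lookup(K key) {
        List<T> result = new ArrayList<>();
        lock.readLock().lock();
        try {
            V value = registry.get(key);
            if (value != null) {
                result.add(copyOut.apply(value)); // convert while still holding the lock
            }
        } finally {
            lock.readLock().unlock();
        }
        return result; // empty list, not null, when the key is unknown
    }
}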