[Feature](executor)broker load support workload group (#30866) (#31580)

This commit is contained in:
wangbo
2024-02-29 15:09:10 +08:00
committed by GitHub
parent 4636b6195b
commit 95b1f76664
6 changed files with 31 additions and 3 deletions

View File

@ -223,6 +223,7 @@ public class BrokerLoadJob extends BulkLoadJob {
TUniqueId loadId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
task.init(loadId, attachment.getFileStatusByTable(aggKey),
attachment.getFileNumByTable(aggKey), getUserInfo());
task.settWorkloadGroups(tWorkloadGroups);
idToTasks.put(task.getSignature(), task);
// idToTasks contains previous LoadPendingTasks, so idToTasks is just used to save all tasks.
// use newLoadingTasks to save new created loading tasks and submit them later.

View File

@ -52,6 +52,7 @@ import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.Coordinator;
import org.apache.doris.qe.QeProcessorImpl;
import org.apache.doris.thrift.TEtlState;
import org.apache.doris.thrift.TPipelineWorkloadGroup;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.AbstractTxnStateChangeCallback;
import org.apache.doris.transaction.BeginTransactionException;
@ -134,7 +135,7 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
protected String comment = "";
protected List<TPipelineWorkloadGroup> tWorkloadGroups = null;
public LoadJob(EtlJobType jobType) {
this.jobType = jobType;
@ -1166,4 +1167,8 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
public LoadStatistic getLoadStatistic() {
return loadStatistic;
}
public void settWorkloadGroups(List<TPipelineWorkloadGroup> tWorkloadGroups) {
this.tWorkloadGroups = tWorkloadGroups;
}
}

View File

@ -34,6 +34,7 @@ import org.apache.doris.load.FailMsg;
import org.apache.doris.qe.Coordinator;
import org.apache.doris.qe.QeProcessorImpl;
import org.apache.doris.thrift.TBrokerFileStatus;
import org.apache.doris.thrift.TPipelineWorkloadGroup;
import org.apache.doris.thrift.TQueryType;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.ErrorTabletInfo;
@ -79,6 +80,8 @@ public class LoadLoadingTask extends LoadTask {
private Profile jobProfile;
private long beginTime;
private List<TPipelineWorkloadGroup> tWorkloadGroups = null;
public LoadLoadingTask(Database db, OlapTable table,
BrokerDesc brokerDesc, List<BrokerFileGroup> fileGroups,
long jobDeadlineMs, long execMemLimit, boolean strictMode, boolean isPartialUpdate,
@ -164,6 +167,10 @@ public class LoadLoadingTask extends LoadTask {
int timeoutS = Math.max((int) (leftTimeMs / 1000), 1);
curCoordinator.setTimeout(timeoutS);
if (tWorkloadGroups != null) {
curCoordinator.setTWorkloadGroups(tWorkloadGroups);
}
try {
QeProcessorImpl.INSTANCE.registerQuery(loadId, curCoordinator);
actualExecute(curCoordinator, timeoutS);
@ -221,4 +228,9 @@ public class LoadLoadingTask extends LoadTask {
this.loadId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
planner.updateLoadId(this.loadId);
}
void settWorkloadGroups(List<TPipelineWorkloadGroup> tWorkloadGroups) {
this.tWorkloadGroups = tWorkloadGroups;
}
}

View File

@ -115,7 +115,7 @@ public class LoadManager implements Writable {
/**
* This method will be invoked by the broker load(v2) now.
*/
public long createLoadJobFromStmt(LoadStmt stmt) throws DdlException {
public long createLoadJobFromStmt(LoadStmt stmt) throws DdlException, UserException {
Database database = checkDb(stmt.getLabel().getDbName());
long dbId = database.getId();
LoadJob loadJob;
@ -144,6 +144,12 @@ public class LoadManager implements Writable {
} finally {
writeUnlock();
}
if (Config.enable_workload_group) {
loadJob.settWorkloadGroups(
Env.getCurrentEnv().getWorkloadGroupMgr().getWorkloadGroup(ConnectContext.get()));
}
Env.getCurrentEnv().getEditLog().logCreateLoadJob(loadJob);
// The job must be submitted after edit log.

View File

@ -140,7 +140,7 @@ public class MultiLoadMgr {
// 'db' and 'label' form a multiLabel used to
// user can pass commitLabel which use this string commit to jobmgr
public void commit(String fullDbName, String label) throws DdlException {
public void commit(String fullDbName, String label) throws DdlException, UserException {
LabelName multiLabel = new LabelName(fullDbName, label);
List<Long> jobIds = Lists.newArrayList();
lock.writeLock().lock();