[Feature](executor)Insert select limited by WorkloadGroup #30610

This commit is contained in:
wangbo
2024-01-31 16:29:28 +08:00
committed by yiguolei
parent 65305f66c0
commit c28ced1ebb
9 changed files with 71 additions and 31 deletions

View File

@ -599,18 +599,25 @@ public class Coordinator implements CoordInterface {
@Override
public void exec() throws Exception {
// LoadTask does not have context, not controlled by queue now
if (Config.enable_workload_group && Config.enable_query_queue && context != null) {
queryQueue = context.getEnv().getWorkloadGroupMgr().getWorkloadGroupQueryQueue(context);
if (queryQueue == null) {
// This logic is actually useless, because when could not find query queue, it will
// throw exception during workload group manager.
throw new UserException("could not find query queue");
}
queueToken = queryQueue.getToken();
if (!queueToken.waitSignal(this.queryOptions.getExecutionTimeout() * 1000)) {
LOG.error("query (id=" + DebugUtil.printId(queryId) + ") " + queueToken.getOfferResultDetail());
queryQueue.returnToken(queueToken);
throw new UserException(queueToken.getOfferResultDetail());
if (context != null) {
if (Config.enable_workload_group) {
this.setTWorkloadGroups(context.getEnv().getWorkloadGroupMgr().getWorkloadGroup(context));
if (Config.enable_query_queue) {
queryQueue = context.getEnv().getWorkloadGroupMgr().getWorkloadGroupQueryQueue(context);
if (queryQueue == null) {
// This logic is actually useless, because when could not find query queue, it will
// throw exception during workload group manager.
throw new UserException("could not find query queue");
}
queueToken = queryQueue.getToken();
if (!queueToken.waitSignal(this.queryOptions.getExecutionTimeout() * 1000)) {
LOG.error("query (id=" + DebugUtil.printId(queryId) + ") " + queueToken.getOfferResultDetail());
queryQueue.returnToken(queueToken);
throw new UserException(queueToken.getOfferResultDetail());
}
}
} else {
context.setWorkloadGroupName("");
}
}
execInternal();

View File

@ -1525,11 +1525,6 @@ public class StmtExecutor {
coordBase = new PointQueryExec(planner, analyzer);
} else {
coord = new Coordinator(context, analyzer, planner, context.getStatsErrorEstimator());
if (Config.enable_workload_group) {
coord.setTWorkloadGroups(context.getEnv().getWorkloadGroupMgr().getWorkloadGroup(context));
} else {
context.setWorkloadGroupName("");
}
QeProcessorImpl.INSTANCE.registerQuery(context.queryId(),
new QeProcessorImpl.QueryInfo(context, originStmt.originStmt, coord));
profile.setExecutionProfile(coord.getExecutionProfile());