[feature](executor) Automatically set the instance_num using the info from be. (#19345)
1. Fixed some regression errors (incorrect results with a large instance_num due to an incorrect ORDER BY).
2. If parallel_fragment_exec_instance_num is set to 0, the concurrency in the Pipeline execution engine is automatically set to half of the number of CPU cores.
3. Added a limit on parallel_fragment_exec_instance_num: it cannot be set to more than fe.conf::max_instance_num (default: 128).
```
mysql [(none)]> set parallel_fragment_exec_instance_num = 514;
ERROR 1231 (42000): errCode = 2, detailMessage = Variable 'parallel_fragment_exec_instance_num' can't be set to the value of '514(Should not be set to more than 128)'
```
This commit is contained in:
@ -19,6 +19,7 @@ package org.apache.doris.analysis;
|
||||
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.ErrorCode;
|
||||
import org.apache.doris.common.ErrorReport;
|
||||
import org.apache.doris.common.UserException;
|
||||
@ -155,7 +156,14 @@ public class SetVar {
|
||||
this.value = new StringLiteral(TimeUtils.checkTimeZoneValidAndStandardize(getValue().getStringValue()));
|
||||
this.result = (LiteralExpr) this.value;
|
||||
}
|
||||
|
||||
if (getVariable().equalsIgnoreCase(SessionVariable.PARALLEL_FRAGMENT_EXEC_INSTANCE_NUM)) {
|
||||
int instanceNum = Integer.parseInt(getValue().getStringValue());
|
||||
if (instanceNum > Config.max_instance_num) {
|
||||
ErrorReport.reportAnalysisException(ErrorCode.ERR_WRONG_VALUE_FOR_VAR,
|
||||
SessionVariable.PARALLEL_FRAGMENT_EXEC_INSTANCE_NUM,
|
||||
instanceNum + "(Should not be set to more than " + Config.max_instance_num + ")");
|
||||
}
|
||||
}
|
||||
if (getVariable().equalsIgnoreCase(SessionVariable.EXEC_MEM_LIMIT)) {
|
||||
this.value = new StringLiteral(Long.toString(ParseUtil.analyzeDataVolumn(getValue().getStringValue())));
|
||||
this.result = (LiteralExpr) this.value;
|
||||
|
||||
@ -194,6 +194,11 @@ public class ReportHandler extends Daemon {
|
||||
|
||||
LOG.info("receive report from be {}. type: {}, current queue size: {}",
|
||||
backend.getId(), reportType, reportQueue.size());
|
||||
if (reportType == ReportType.DISK) {
|
||||
Backend.BeInfoCollector beinfoCollector = Backend.getBeInfoCollector();
|
||||
int numCores = request.getNumCores();
|
||||
beinfoCollector.addBeInfo(beId, numCores);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
|
||||
@ -28,6 +28,7 @@ import org.apache.doris.common.util.TimeUtils;
|
||||
import org.apache.doris.nereids.metrics.Event;
|
||||
import org.apache.doris.nereids.metrics.EventSwitchParser;
|
||||
import org.apache.doris.qe.VariableMgr.VarAttr;
|
||||
import org.apache.doris.system.Backend;
|
||||
import org.apache.doris.thrift.TQueryOptions;
|
||||
import org.apache.doris.thrift.TResourceLimit;
|
||||
import org.apache.doris.thrift.TRuntimeFilterType;
|
||||
@ -1224,7 +1225,12 @@ public class SessionVariable implements Serializable, Writable {
|
||||
}
|
||||
|
||||
public int getParallelExecInstanceNum() {
|
||||
return parallelExecInstanceNum;
|
||||
if (enablePipelineEngine && parallelExecInstanceNum == 0) {
|
||||
Backend.BeInfoCollector beinfoCollector = Backend.getBeInfoCollector();
|
||||
return beinfoCollector.getParallelExecInstanceNum();
|
||||
} else {
|
||||
return parallelExecInstanceNum;
|
||||
}
|
||||
}
|
||||
|
||||
public int getExchangeInstanceParallel() {
|
||||
@ -1768,7 +1774,7 @@ public class SessionVariable implements Serializable, Writable {
|
||||
tResult.setCodegenLevel(codegenLevel);
|
||||
tResult.setBeExecVersion(Config.be_exec_version);
|
||||
tResult.setEnablePipelineEngine(enablePipelineEngine);
|
||||
tResult.setParallelInstance(parallelExecInstanceNum);
|
||||
tResult.setParallelInstance(getParallelExecInstanceNum());
|
||||
tResult.setReturnObjectDataAsBinary(returnObjectDataAsBinary);
|
||||
tResult.setTrimTailingSpacesForExternalTableQuery(trimTailingSpacesForExternalTableQuery);
|
||||
tResult.setEnableShareHashTableForBroadcastJoin(enableShareHashTableForBroadcastJoin);
|
||||
|
||||
@ -289,7 +289,7 @@ public class StmtExecutor {
|
||||
builder.instancesNumPerBe(
|
||||
beToInstancesNum.entrySet().stream().map(entry -> entry.getKey() + ":" + entry.getValue())
|
||||
.collect(Collectors.joining(",")));
|
||||
builder.parallelFragmentExecInstance(String.valueOf(context.sessionVariable.parallelExecInstanceNum));
|
||||
builder.parallelFragmentExecInstance(String.valueOf(context.sessionVariable.getParallelExecInstanceNum()));
|
||||
builder.traceId(context.getSessionVariable().getTraceId());
|
||||
return builder.build();
|
||||
}
|
||||
@ -2206,4 +2206,3 @@ public class StmtExecutor {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
@ -49,6 +49,7 @@ import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
/**
|
||||
@ -513,7 +514,6 @@ public class Backend implements Writable {
|
||||
long dataUsedCapacityB = tDisk.getDataUsedCapacity();
|
||||
long diskAvailableCapacityB = tDisk.getDiskAvailableCapacity();
|
||||
boolean isUsed = tDisk.isUsed();
|
||||
|
||||
DiskInfo diskInfo = disks.get(rootPath);
|
||||
if (diskInfo == null) {
|
||||
diskInfo = new DiskInfo(rootPath);
|
||||
@ -799,4 +799,64 @@ public class Backend implements Writable {
|
||||
public String getTagMapString() {
|
||||
return "{" + new PrintableMap<>(tagMap, ":", true, false).toString() + "}";
|
||||
}
|
||||
|
||||
public static BeInfoCollector getBeInfoCollector() {
|
||||
return BeInfoCollector.get();
|
||||
}
|
||||
|
||||
public static class BeInfoCollector {
|
||||
private int numCores = 1;
|
||||
private static volatile BeInfoCollector instance = null;
|
||||
private static final Map<Long, BeInfoCollector> Info = new ConcurrentHashMap<>();
|
||||
|
||||
private BeInfoCollector(int numCores) {
|
||||
this.numCores = numCores;
|
||||
}
|
||||
|
||||
public static BeInfoCollector get() {
|
||||
if (instance == null) {
|
||||
synchronized (BeInfoCollector.class) {
|
||||
if (instance == null) {
|
||||
instance = new BeInfoCollector(Integer.MAX_VALUE);
|
||||
}
|
||||
}
|
||||
}
|
||||
return instance;
|
||||
}
|
||||
|
||||
public int getNumCores() {
|
||||
return numCores;
|
||||
}
|
||||
|
||||
public void clear() {
|
||||
Info.clear();
|
||||
}
|
||||
|
||||
public void addBeInfo(long beId, int numCores) {
|
||||
Info.put(beId, new BeInfoCollector(numCores));
|
||||
}
|
||||
|
||||
public void dropBeInfo(long beId) {
|
||||
Info.remove(beId);
|
||||
}
|
||||
|
||||
public int getMinNumCores() {
|
||||
int minNumCores = Integer.MAX_VALUE;
|
||||
for (BeInfoCollector beinfo : Info.values()) {
|
||||
minNumCores = Math.min(minNumCores, beinfo.getNumCores());
|
||||
}
|
||||
return Math.max(1, minNumCores);
|
||||
}
|
||||
|
||||
public int getParallelExecInstanceNum() {
|
||||
if (getMinNumCores() == Integer.MAX_VALUE) {
|
||||
return 1;
|
||||
}
|
||||
return (getMinNumCores() + 1) / 2;
|
||||
}
|
||||
|
||||
public BeInfoCollector getBeInfoCollectorById(long beId) {
|
||||
return Info.get(beId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -270,6 +270,9 @@ public class SystemInfoService {
|
||||
}
|
||||
|
||||
dropBackend(backend.getIp(), backend.getHostName(), backend.getHeartbeatPort());
|
||||
// update BeInfoCollector
|
||||
Backend.BeInfoCollector beinfoCollector = Backend.getBeInfoCollector();
|
||||
beinfoCollector.dropBeInfo(backendId);
|
||||
}
|
||||
|
||||
// final entry of dropping backend
|
||||
@ -1231,6 +1234,10 @@ public class SystemInfoService {
|
||||
copiedReportVersions.remove(backend.getId());
|
||||
ImmutableMap<Long, AtomicLong> newIdToReportVersion = ImmutableMap.copyOf(copiedReportVersions);
|
||||
idToReportVersionRef = newIdToReportVersion;
|
||||
|
||||
// update BeInfoCollector
|
||||
Backend.BeInfoCollector beinfoCollector = Backend.getBeInfoCollector();
|
||||
beinfoCollector.dropBeInfo(backend.getId());
|
||||
}
|
||||
|
||||
public void updateBackendState(Backend be) {
|
||||
|
||||
Reference in New Issue
Block a user