[refactor](planner) refactor automatically set instance_num (#21640)

Refactor how instance_num is set automatically.
This commit is contained in:
Mryange
2023-07-08 21:59:17 +08:00
committed by GitHub
parent aad8043d44
commit f8a2c66174
6 changed files with 92 additions and 83 deletions

View File

@ -181,7 +181,8 @@ public class ReportHandler extends Daemon {
}
ReportTask reportTask = new ReportTask(beId, tasks, disks, tablets, reportVersion,
request.getStoragePolicy(), request.getResource());
request.getStoragePolicy(), request.getResource(), request.getNumCores(),
request.getPipelineExecutorSize());
try {
putToQueue(reportTask);
} catch (Exception e) {
@ -192,14 +193,8 @@ public class ReportHandler extends Daemon {
tStatus.setErrorMsgs(errorMsgs);
return result;
}
LOG.info("receive report from be {}. type: {}, current queue size: {}",
backend.getId(), reportType, reportQueue.size());
if (reportType == ReportType.DISK) {
Backend.BeInfoCollector beinfoCollector = Backend.getBeInfoCollector();
int numCores = request.getNumCores();
beinfoCollector.addBeInfo(beId, numCores);
}
return result;
}
@ -236,11 +231,14 @@ public class ReportHandler extends Daemon {
private List<TStoragePolicy> storagePolicies;
private List<TStorageResource> storageResources;
private int cpuCores;
private int pipelineExecutorSize;
public ReportTask(long beId, Map<TTaskType, Set<Long>> tasks,
Map<String, TDisk> disks,
Map<Long, TTablet> tablets, long reportVersion,
List<TStoragePolicy> storagePolicies, List<TStorageResource> storageResources) {
Map<String, TDisk> disks,
Map<Long, TTablet> tablets, long reportVersion,
List<TStoragePolicy> storagePolicies, List<TStorageResource> storageResources, int cpuCores,
int pipelineExecutorSize) {
this.beId = beId;
this.tasks = tasks;
this.disks = disks;
@ -248,6 +246,8 @@ public class ReportHandler extends Daemon {
this.reportVersion = reportVersion;
this.storagePolicies = storagePolicies;
this.storageResources = storageResources;
this.cpuCores = cpuCores;
this.pipelineExecutorSize = pipelineExecutorSize;
}
@Override
@ -257,6 +257,7 @@ public class ReportHandler extends Daemon {
}
if (disks != null) {
ReportHandler.diskReport(beId, disks);
ReportHandler.cpuReport(beId, cpuCores, pipelineExecutorSize);
}
if (Config.enable_storage_policy && storagePolicies != null && storageResources != null) {
storagePolicyReport(beId, storagePolicies, storageResources);
@ -557,12 +558,30 @@ public class ReportHandler extends Daemon {
LOG.warn("backend doesn't exist. id: " + backendId);
return;
}
backend.updateDisks(backendDisks);
LOG.info("finished to handle disk report from backend {}, cost: {} ms",
backendId, (System.currentTimeMillis() - start));
}
/**
 * Handles the cpu part of a backend report.
 *
 * <p>Looks up the backend by id, pushes the reported values into it through
 * {@code Backend.updateCpuInfo}, and records a backend state change in the edit log
 * only when that call reports a change. Returns early when the backend is unknown.
 *
 * @param backendId id of the reporting backend
 * @param cpuCores cpu core count carried by the report
 * @param pipelineExecutorSize pipeline executor size carried by the report
 */
private static void cpuReport(long backendId, int cpuCores, int pipelineExecutorSize) {
    LOG.info("begin to handle cpu report from backend {}", backendId);
    final long beginMs = System.currentTimeMillis();

    final Backend be = Env.getCurrentSystemInfo().getBackend(backendId);
    if (be == null) {
        LOG.warn("backend doesn't exist. id: " + backendId);
        return;
    }

    final boolean cpuInfoChanged = be.updateCpuInfo(cpuCores, pipelineExecutorSize);
    if (cpuInfoChanged) {
        LOG.info("new cpu info. backendId: {}, cpucores: {}, pipelineExecutorSize: {}", backendId, cpuCores,
                pipelineExecutorSize);
        // Record the new state in the edit log.
        Env.getCurrentEnv().getEditLog().logBackendStateChange(be);
    }

    final long costMs = System.currentTimeMillis() - beginMs;
    LOG.info("finished to handle cpu report from backend {}, cost: {} ms",
            backendId, costMs);
}
private static void sync(Map<Long, TTablet> backendTablets, ListMultimap<Long, Long> tabletSyncMap,
long backendId, long backendReportVersion) {
TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();

View File

@ -19,6 +19,7 @@ package org.apache.doris.qe;
import org.apache.doris.analysis.SetVar;
import org.apache.doris.analysis.StringLiteral;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ExperimentalUtil.ExperimentalType;
@ -28,7 +29,6 @@ import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.nereids.metrics.Event;
import org.apache.doris.nereids.metrics.EventSwitchParser;
import org.apache.doris.qe.VariableMgr.VarAttr;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TQueryOptions;
import org.apache.doris.thrift.TResourceLimit;
import org.apache.doris.thrift.TRuntimeFilterType;
@ -1433,8 +1433,8 @@ public class SessionVariable implements Serializable, Writable {
public int getParallelExecInstanceNum() {
if (enablePipelineEngine && parallelPipelineTaskNum == 0) {
Backend.BeInfoCollector beinfoCollector = Backend.getBeInfoCollector();
return beinfoCollector.getParallelExecInstanceNum();
int size = Env.getCurrentSystemInfo().getMinPipelineExecutorSize();
return (size + 1) / 2;
} else if (enablePipelineEngine) {
return parallelPipelineTaskNum;
} else {

View File

@ -48,7 +48,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
/**
@ -119,6 +118,14 @@ public class Backend implements Writable {
@SerializedName("tagMap")
private Map<String, String> tagMap = Maps.newHashMap();
// cpu cores
@SerializedName("cpuCores")
private int cpuCores = 1;
// From BE config::pipeline_executor_size; by default equal to cpuCores.
@SerializedName("pipelineExecutorSize")
private int pipelineExecutorSize = 1;
// Counter of heartbeat failure.
// Once a heartbeat failed, increase this counter by one.
// And if it reaches Config.max_backend_heartbeat_failure_tolerance_count, this backend
@ -278,6 +285,14 @@ public class Backend implements Writable {
this.brpcPort = brpcPort;
}
/**
 * Overwrites the stored cpu core count of this backend.
 *
 * @param cpuCores new cpu core count; stored as-is, no validation is performed here
 */
public void setCpuCores(int cpuCores) {
this.cpuCores = cpuCores;
}
/**
 * Overwrites the stored pipeline executor size of this backend.
 *
 * @param pipelineExecutorSize new value; stored as-is, no validation is performed here
 */
public void setPipelineExecutorSize(int pipelineExecutorSize) {
this.pipelineExecutorSize = pipelineExecutorSize;
}
/**
 * @return the current value of the {@code lastUpdateMs} field
 */
public long getLastUpdateMs() {
return this.lastUpdateMs;
}
@ -294,6 +309,14 @@ public class Backend implements Writable {
this.lastStartTime = currentTime;
}
/**
 * Returns the stored cpu core count of this backend.
 *
 * <p>NOTE(review): the method name is misspelled ("Cput"); it is kept because existing
 * callers reference it under this name.
 *
 * @return the current value of the {@code cpuCores} field
 */
public int getCputCores() {
return cpuCores;
}
/**
 * @return the current value of the {@code pipelineExecutorSize} field
 */
public int getPipelineExecutorSize() {
return pipelineExecutorSize;
}
/**
 * @return the current value of the {@code lastMissingHeartbeatTime} field
 */
public long getLastMissingHeartbeatTime() {
return lastMissingHeartbeatTime;
}
@ -519,6 +542,20 @@ public class Backend implements Writable {
}
}
/**
 * Stores the given cpu information and tells the caller whether anything differed
 * from what was stored before.
 *
 * @param cpuCores cpu core count to store
 * @param pipelineExecutorSize pipeline executor size to store
 * @return {@code true} if at least one of the two stored values was different from
 *         its argument, {@code false} if both already matched
 */
public boolean updateCpuInfo(int cpuCores, int pipelineExecutorSize) {
    final boolean coresDiffer = this.cpuCores != cpuCores;
    final boolean executorSizeDiffers = this.pipelineExecutorSize != pipelineExecutorSize;

    // Nothing to store when both values already match.
    if (!coresDiffer && !executorSizeDiffers) {
        return false;
    }

    if (coresDiffer) {
        this.cpuCores = cpuCores;
    }
    if (executorSizeDiffers) {
        this.pipelineExecutorSize = pipelineExecutorSize;
    }
    return true;
}
/**
* In old version, there is only one tag for a Backend, and it is a "location" type tag.
* But in new version, a Backend can have multi tag, so we need to put locationTag to
@ -729,63 +766,4 @@ public class Backend implements Writable {
return "{" + new PrintableMap<>(tagMap, ":", true, false).toString() + "}";
}
/**
 * Convenience accessor that delegates to {@code BeInfoCollector.get()}.
 *
 * @return the collector instance returned by {@code BeInfoCollector.get()}
 */
public static BeInfoCollector getBeInfoCollector() {
return BeInfoCollector.get();
}
/**
 * Keeps the cpu core count reported by each backend, keyed by backend id, and derives
 * a parallel instance number from the smallest reported count.
 *
 * <p>The class plays two roles: the lazily created instance returned by {@link #get()}
 * is the entry point for all operations, while the values stored in the static map are
 * plain holders of one backend's core count.
 */
public static class BeInfoCollector {
    // Core count held by this entry; the entry-point instance holds Integer.MAX_VALUE.
    private final int numCores;
    private static volatile BeInfoCollector instance = null;
    private static final Map<Long, BeInfoCollector> Info = new ConcurrentHashMap<>();

    private BeInfoCollector(int numCores) {
        this.numCores = numCores;
    }

    /**
     * Returns the shared entry-point instance, creating it on first use.
     */
    public static BeInfoCollector get() {
        if (instance == null) {
            synchronized (BeInfoCollector.class) {
                if (instance == null) {
                    instance = new BeInfoCollector(Integer.MAX_VALUE);
                }
            }
        }
        return instance;
    }

    /**
     * @return the core count held by this entry
     */
    public int getNumCores() {
        return numCores;
    }

    /** Removes every recorded backend entry. */
    public void clear() {
        Info.clear();
    }

    /**
     * Records (or replaces) the core count of the given backend.
     *
     * @param beId backend id
     * @param numCores core count reported by that backend
     */
    public void addBeInfo(long beId, int numCores) {
        Info.put(beId, new BeInfoCollector(numCores));
    }

    /**
     * Forgets the entry of the given backend, if any.
     *
     * @param beId backend id
     */
    public void dropBeInfo(long beId) {
        Info.remove(beId);
    }

    /**
     * Returns the smallest recorded core count, never less than 1.
     *
     * @return the minimum over all entries clamped to at least 1, or
     *         {@code Integer.MAX_VALUE} when no entry is recorded
     */
    public int getMinNumCores() {
        int minNumCores = Integer.MAX_VALUE;
        for (BeInfoCollector beinfo : Info.values()) {
            minNumCores = Math.min(minNumCores, beinfo.getNumCores());
        }
        return Math.max(1, minNumCores);
    }

    /**
     * Derives the parallel instance number as half of the minimum core count, rounded up.
     *
     * @return 1 when no backend is recorded, otherwise {@code (minNumCores + 1) / 2}
     */
    public int getParallelExecInstanceNum() {
        // Read the minimum exactly once: the map can change between two calls, and a
        // second read returning Integer.MAX_VALUE would make "+ 1" overflow.
        final int minNumCores = getMinNumCores();
        if (minNumCores == Integer.MAX_VALUE) {
            return 1;
        }
        return (minNumCores + 1) / 2;
    }

    /**
     * @param beId backend id
     * @return the entry recorded for that backend, or {@code null} if there is none
     */
    public BeInfoCollector getBeInfoCollectorById(long beId) {
        return Info.get(beId);
    }
}
}

View File

@ -247,9 +247,6 @@ public class SystemInfoService {
throw new DdlException("Backend[" + backendId + "] does not exist");
}
dropBackend(backend.getHost(), backend.getHeartbeatPort());
// update BeInfoCollector
Backend.BeInfoCollector beinfoCollector = Backend.getBeInfoCollector();
beinfoCollector.dropBeInfo(backendId);
}
// final entry of dropping backend
@ -770,9 +767,6 @@ public class SystemInfoService {
ImmutableMap<Long, AtomicLong> newIdToReportVersion = ImmutableMap.copyOf(copiedReportVersions);
idToReportVersionRef = newIdToReportVersion;
// update BeInfoCollector
Backend.BeInfoCollector beinfoCollector = Backend.getBeInfoCollector();
beinfoCollector.dropBeInfo(backend.getId());
}
public void updateBackendState(Backend be) {
@ -791,6 +785,8 @@ public class SystemInfoService {
memoryBe.setLastUpdateMs(be.getLastUpdateMs());
memoryBe.setLastStartTime(be.getLastStartTime());
memoryBe.setDisks(be.getDisks());
memoryBe.setCpuCores(be.getCputCores());
memoryBe.setPipelineExecutorSize(be.getPipelineExecutorSize());
}
}
@ -963,4 +959,18 @@ public class SystemInfoService {
List<Backend> bes = getMixBackends();
return bes.stream().filter(b -> b.getLocationTag().equals(tag)).collect(Collectors.toList());
}
/**
 * Returns the smallest positive pipeline executor size among the known backends.
 *
 * <p>Backends whose stored size is not positive are ignored. When there is no backend,
 * or no backend has a positive size, 1 is returned. The result is therefore always
 * a usable positive value and never the {@code Integer.MAX_VALUE} search sentinel,
 * which would overflow in callers computing {@code (size + 1) / 2}.
 *
 * @return the minimum positive pipeline executor size, or 1 if none is available
 */
public int getMinPipelineExecutorSize() {
    int minPipelineExecutorSize = Integer.MAX_VALUE;
    for (Backend be : idToBackendRef.values()) {
        int size = be.getPipelineExecutorSize();
        if (size > 0) {
            minPipelineExecutorSize = Math.min(minPipelineExecutorSize, size);
        }
    }
    // Sentinel still in place: no backend contributed a positive size (this also covers
    // the empty-backend case), so fall back to a single executor.
    if (minPipelineExecutorSize == Integer.MAX_VALUE) {
        return 1;
    }
    return minPipelineExecutorSize;
}
}