[improvement](balance) fix multiple problems for balance on large cluster (#30713)

This commit is contained in:
yujun
2024-02-07 07:02:41 +08:00
committed by yiguolei
parent 5295e16727
commit 4052746f1c
11 changed files with 544 additions and 134 deletions

View File

@ -644,19 +644,28 @@ public class TabletInvertedIndex {
return tabletIds;
}
public List<Long> getTabletIdsByBackendIdAndStorageMedium(long backendId, TStorageMedium storageMedium) {
List<Long> tabletIds = Lists.newArrayList();
public List<Pair<Long, Long>> getTabletSizeByBackendIdAndStorageMedium(long backendId,
TStorageMedium storageMedium) {
List<Pair<Long, Long>> tabletIdSizes = Lists.newArrayList();
long stamp = readLock();
try {
Map<Long, Replica> replicaMetaWithBackend = backingReplicaMetaTable.row(backendId);
if (replicaMetaWithBackend != null) {
tabletIds = replicaMetaWithBackend.keySet().stream().filter(
id -> tabletMetaMap.get(id).getStorageMedium() == storageMedium).collect(Collectors.toList());
tabletIdSizes = replicaMetaWithBackend.entrySet().stream()
.filter(entry -> tabletMetaMap.get(entry.getKey()).getStorageMedium() == storageMedium)
.map(entry -> Pair.of(entry.getKey(), entry.getValue().getDataSize()))
.collect(Collectors.toList());
}
} finally {
readUnlock(stamp);
}
return tabletIds;
return tabletIdSizes;
}
public List<Long> getTabletIdsByBackendIdAndStorageMedium(long backendId,
TStorageMedium storageMedium) {
return getTabletSizeByBackendIdAndStorageMedium(backendId, storageMedium).stream()
.map(Pair::key).collect(Collectors.toList());
}
public int getTabletNumByBackendId(long backendId) {

View File

@ -39,6 +39,8 @@ import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class BackendLoadStatistic {
private static final Logger LOG = LogManager.getLogger(BackendLoadStatistic.class);
@ -166,6 +168,7 @@ public class BackendLoadStatistic {
private Map<TStorageMedium, Long> totalReplicaNumMap = Maps.newHashMap();
private Map<TStorageMedium, LoadScore> loadScoreMap = Maps.newHashMap();
private Map<TStorageMedium, Classification> clazzMap = Maps.newHashMap();
private Map<TStorageMedium, Classification> maxDiskClazzMap = Maps.newHashMap();
private List<RootPathLoadStatistic> pathStatistics = Lists.newArrayList();
public BackendLoadStatistic(long beId, Tag tag, SystemInfoService infoService,
@ -227,6 +230,14 @@ public class BackendLoadStatistic {
return clazzMap.getOrDefault(medium, Classification.INIT);
}
public void setMaxDiskClazz(TStorageMedium medium, Classification clazz) {
this.maxDiskClazzMap.put(medium, clazz);
}
public Classification getMaxDiskClazz(TStorageMedium medium) {
return maxDiskClazzMap.getOrDefault(medium, Classification.INIT);
}
public void init() throws LoadBalanceException {
Backend be = infoService.getBackend(beId);
if (be == null) {
@ -246,9 +257,17 @@ public class BackendLoadStatistic {
+ (diskInfo.getTotalCapacityB() - diskInfo.getAvailableCapacityB()));
}
// The doris-compose test puts all backends' disks on the same physical disk,
// so make a small adjustment here.
long usedCapacityB = diskInfo.getDiskUsedCapacityB();
if (Config.be_rebalancer_fuzzy_test) {
usedCapacityB = Math.min(diskInfo.getTotalCapacityB(),
usedCapacityB + Math.abs(diskInfo.getPathHash()) % 10000);
}
RootPathLoadStatistic pathStatistic = new RootPathLoadStatistic(beId, diskInfo.getRootPath(),
diskInfo.getPathHash(), diskInfo.getStorageMedium(),
diskInfo.getTotalCapacityB(), diskInfo.getDiskUsedCapacityB(), diskInfo.getState());
diskInfo.getTotalCapacityB(), usedCapacityB, diskInfo.getState());
pathStatistics.add(pathStatistic);
}
@ -295,14 +314,14 @@ public class BackendLoadStatistic {
if (Math.abs(pathStat.getUsedPercent() - avgUsedPercent)
> Math.max(avgUsedPercent * Config.balance_load_score_threshold, 0.025)) {
if (pathStat.getUsedPercent() > avgUsedPercent) {
pathStat.setClazz(Classification.HIGH);
pathStat.setLocalClazz(Classification.HIGH);
highCounter++;
} else if (pathStat.getUsedPercent() < avgUsedPercent) {
pathStat.setClazz(Classification.LOW);
pathStat.setLocalClazz(Classification.LOW);
lowCounter++;
}
} else {
pathStat.setClazz(Classification.MID);
pathStat.setLocalClazz(Classification.MID);
midCounter++;
}
}
@ -422,14 +441,19 @@ public class BackendLoadStatistic {
BalanceStatus bStatus = pathStatistic.isFit(tabletSize, isSupplement);
if (!bStatus.ok()) {
status.addErrMsgs(bStatus.getErrMsgs());
if (status != BalanceStatus.OK) {
status.addErrMsgs(bStatus.getErrMsgs());
}
continue;
}
result.add(pathStatistic);
if (result != null) {
result.add(pathStatistic);
}
status = BalanceStatus.OK;
}
return result.isEmpty() ? status : BalanceStatus.OK;
return status;
}
/**
@ -508,9 +532,9 @@ public class BackendLoadStatistic {
continue;
}
if (pathStat.getClazz() == Classification.LOW) {
if (pathStat.getLocalClazz() == Classification.LOW) {
low.add(pathStat.getPathHash());
} else if (pathStat.getClazz() == Classification.HIGH) {
} else if (pathStat.getLocalClazz() == Classification.HIGH) {
high.add(pathStat.getPathHash());
} else {
mid.add(pathStat.getPathHash());
@ -529,9 +553,9 @@ public class BackendLoadStatistic {
continue;
}
if (pathStat.getClazz() == Classification.LOW) {
if (pathStat.getLocalClazz() == Classification.LOW) {
low.add(pathStat);
} else if (pathStat.getClazz() == Classification.HIGH) {
} else if (pathStat.getLocalClazz() == Classification.HIGH) {
high.add(pathStat);
} else {
mid.add(pathStat);
@ -569,9 +593,22 @@ public class BackendLoadStatistic {
return pathStatistics;
}
public long getAvailPathNum(TStorageMedium medium) {
return pathStatistics.stream().filter(
p -> p.getDiskState() == DiskState.ONLINE && p.getStorageMedium() == medium).count();
RootPathLoadStatistic getPathStatisticByPathHash(long pathHash) {
return pathStatistics.stream().filter(pathStat -> pathStat.getPathHash() == pathHash)
.findFirst().orElse(null);
}
public List<RootPathLoadStatistic> getAvailPaths(TStorageMedium medium) {
return getAvailPathStream(medium).collect(Collectors.toList());
}
public boolean hasAvailPathWithGlobalClazz(TStorageMedium medium, Classification globalClazz) {
return getAvailPathStream(medium).anyMatch(pathStat -> pathStat.getGlobalClazz() == globalClazz);
}
private Stream<RootPathLoadStatistic> getAvailPathStream(TStorageMedium medium) {
return pathStatistics.stream()
.filter(p -> p.getDiskState() == DiskState.ONLINE && p.getStorageMedium() == medium);
}
public boolean hasMedium(TStorageMedium medium) {
@ -603,6 +640,7 @@ public class BackendLoadStatistic {
long total = totalCapacityMap.getOrDefault(medium, 0L);
info.add(String.valueOf(used));
info.add(String.valueOf(total));
info.add(getMaxDiskClazz(medium).name());
info.add(String.valueOf(DebugUtil.DECIMAL_FORMAT_SCALE_3.format(used * 100
/ (double) total)));
info.add(String.valueOf(totalReplicaNumMap.getOrDefault(medium, 0L)));
@ -610,7 +648,7 @@ public class BackendLoadStatistic {
info.add(String.valueOf(loadScore.capacityCoefficient));
info.add(String.valueOf(loadScore.getReplicaNumCoefficient()));
info.add(String.valueOf(loadScore.score));
info.add(clazzMap.getOrDefault(medium, Classification.INIT).name());
info.add(getClazz(medium).name());
return info;
}

View File

@ -25,12 +25,14 @@ import org.apache.doris.catalog.TabletInvertedIndex;
import org.apache.doris.catalog.TabletMeta;
import org.apache.doris.clone.BackendLoadStatistic.BePathLoadStatPair;
import org.apache.doris.clone.BackendLoadStatistic.BePathLoadStatPairComparator;
import org.apache.doris.clone.BackendLoadStatistic.Classification;
import org.apache.doris.clone.SchedException.Status;
import org.apache.doris.clone.SchedException.SubCode;
import org.apache.doris.clone.TabletSchedCtx.Priority;
import org.apache.doris.clone.TabletScheduler.PathSlot;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Pair;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TStorageMedium;
@ -45,6 +47,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
/*
* BeLoadRebalancer strategy:
@ -79,12 +82,9 @@ public class BeLoadRebalancer extends Rebalancer {
protected List<TabletSchedCtx> selectAlternativeTabletsForCluster(
LoadStatisticForTag clusterStat, TStorageMedium medium) {
List<TabletSchedCtx> alternativeTablets = Lists.newArrayList();
// get classification of backends
List<BackendLoadStatistic> lowBEs = Lists.newArrayList();
List<BackendLoadStatistic> midBEs = Lists.newArrayList();
List<BackendLoadStatistic> highBEs = Lists.newArrayList();
clusterStat.getBackendStatisticByClass(lowBEs, midBEs, highBEs, medium);
boolean isUrgent = clusterStat.getLowHighBEsWithIsUrgent(lowBEs, highBEs, medium);
if (lowBEs.isEmpty() && highBEs.isEmpty()) {
LOG.debug("cluster is balance with medium: {}. skip", medium);
@ -117,6 +117,8 @@ public class BeLoadRebalancer extends Rebalancer {
}
LOG.info("get number of low load paths: {}, with medium: {}", numOfLowPaths, medium);
List<String> alternativeTabletInfos = Lists.newArrayList();
// The clone unit test uses a mocked env, but CatalogRecycleBin is not mockable (it extends Thread),
// so in the clone unit test recycleBin needs to be set to null.
CatalogRecycleBin recycleBin = null;
@ -125,6 +127,10 @@ public class BeLoadRebalancer extends Rebalancer {
}
int clusterAvailableBEnum = infoService.getAllBackendIds(true).size();
ColocateTableIndex colocateTableIndex = Env.getCurrentColocateIndex();
List<Set<Long>> lowBETablets = lowBEs.stream()
.map(beStat -> Sets.newHashSet(invertedIndex.getTabletIdsByBackendId(beStat.getBeId())))
.collect(Collectors.toList());
// choose tablets from high load backends.
// BackendLoadStatistic is sorted by load score in ascend order,
// so we need to traverse it from last to first
@ -136,37 +142,73 @@ public class BeLoadRebalancer extends Rebalancer {
continue;
}
// classify the paths.
Set<Long> pathLow = Sets.newHashSet();
Set<Long> pathMid = Sets.newHashSet();
Set<Long> pathHigh = Sets.newHashSet();
beStat.getPathStatisticByClass(pathLow, pathMid, pathHigh, medium);
// we only select tablets from available mid and high load path
pathHigh.addAll(pathMid);
// get all tablets on this backend, and shuffle them for random selection
List<Long> tabletIds = invertedIndex.getTabletIdsByBackendIdAndStorageMedium(beStat.getBeId(), medium);
Collections.shuffle(tabletIds);
boolean choseHighDisk = isUrgent && beStat.getMaxDiskClazz(medium) == Classification.HIGH;
// for each path, we try to select at most BALANCE_SLOT_NUM_FOR_PATH tablets
Map<Long, Integer> remainingPaths = Maps.newHashMap();
Set<Long> pathHigh = null;
if (choseHighDisk) {
pathHigh = beStat.getAvailPaths(medium).stream().filter(RootPathLoadStatistic::isGlobalHighUsage)
.map(RootPathLoadStatistic::getPathHash).collect(Collectors.toSet());
} else {
// classify the paths.
pathHigh = Sets.newHashSet();
Set<Long> pathLow = Sets.newHashSet();
Set<Long> pathMid = Sets.newHashSet();
beStat.getPathStatisticByClass(pathLow, pathMid, pathHigh, medium);
// we only select tablets from available mid and high load paths
pathHigh.addAll(pathMid);
}
double highDiskMaxUsage = 0;
for (Long pathHash : pathHigh) {
int availBalanceNum = pathSlot.getAvailableBalanceNum(pathHash);
if (availBalanceNum > 0) {
remainingPaths.put(pathHash, availBalanceNum);
}
RootPathLoadStatistic pathStat = beStat.getPathStatisticByPathHash(pathHash);
if (pathStat != null) {
highDiskMaxUsage = Math.max(highDiskMaxUsage, pathStat.getUsedPercent());
}
}
LOG.debug("high be {}, medium: {}, path high: {}, remainingPaths: {}, chose high disk: {}",
beStat.getBeId(), medium, pathHigh, remainingPaths, choseHighDisk);
if (remainingPaths.isEmpty()) {
continue;
}
// get all tablets on this backend, and shuffle them for random selection
List<Pair<Long, Long>> tabletIdSizes = invertedIndex.getTabletSizeByBackendIdAndStorageMedium(
beStat.getBeId(), medium);
if (!isUrgent
|| tabletIdSizes.size() < Config.urgent_balance_pick_large_tablet_num_threshold
|| highDiskMaxUsage < (double) Config.urgent_balance_pick_large_disk_usage_percentage / 100.0
|| Config.urgent_balance_shuffle_large_tablet_percentage >= 100
|| Config.urgent_balance_shuffle_large_tablet_percentage < 0) {
Collections.shuffle(tabletIdSizes);
} else {
Collections.sort(tabletIdSizes, new Pair.PairComparator<Pair<Long, Long>>());
if (Config.urgent_balance_shuffle_large_tablet_percentage > 0) {
int startIndex = (int) (tabletIdSizes.size()
* (1 - (double) Config.urgent_balance_shuffle_large_tablet_percentage / 100.0));
Collections.shuffle(tabletIdSizes.subList(startIndex, tabletIdSizes.size()));
}
}
// select tablet from shuffled tablets
for (Long tabletId : tabletIds) {
for (int j = tabletIdSizes.size() - 1; j >= 0; j--) {
long tabletId = tabletIdSizes.get(j).key();
if (clusterAvailableBEnum <= invertedIndex.getReplicasByTabletId(tabletId).size()) {
continue;
}
if (alternativeTablets.stream().anyMatch(tabletCtx -> tabletId == tabletCtx.getTabletId())) {
continue;
}
Replica replica = invertedIndex.getReplica(tabletId, beStat.getBeId());
if (replica == null) {
continue;
@ -186,20 +228,40 @@ public class BeLoadRebalancer extends Rebalancer {
continue;
}
// For urgent disks, tablets are picked ordered by size,
// so it may always pick tablets that are on the low backends.
if (!lowBETablets.isEmpty()
&& lowBETablets.stream().allMatch(tablets -> tablets.contains(tabletId))) {
continue;
}
if (recycleBin != null && recycleBin.isRecyclePartition(tabletMeta.getDbId(),
tabletMeta.getTableId(), tabletMeta.getPartitionId())) {
continue;
}
boolean isFit = lowBEs.stream().anyMatch(be -> be.isFit(replica.getDataSize(),
medium, null, false) == BalanceStatus.OK);
if (!isFit) {
if (LOG.isDebugEnabled()) {
LOG.debug("tablet {} with size {} medium {} not fit in low backends",
tabletId, replica.getDataSize(), medium);
}
continue;
}
TabletSchedCtx tabletCtx = new TabletSchedCtx(TabletSchedCtx.Type.BALANCE,
tabletMeta.getDbId(), tabletMeta.getTableId(), tabletMeta.getPartitionId(),
tabletMeta.getIndexId(), tabletId, null /* replica alloc is not used for balance*/,
System.currentTimeMillis());
tabletCtx.setTag(clusterStat.getTag());
// balance task's priority is LOW, or NORMAL when the balance is urgent
tabletCtx.setPriority(Priority.LOW);
tabletCtx.setPriority(isUrgent ? Priority.NORMAL : Priority.LOW);
alternativeTablets.add(tabletCtx);
alternativeTabletInfos.add("{ tabletId=" + tabletId + ", beId=" + beStat.getBeId()
+ ", pathHash=" + replica.getPathHash()
+ ", replicaLocalSize=" + replica.getDataSize() + " }");
if (--numOfLowPaths <= 0) {
// enough
break OUTER;
@ -217,13 +279,13 @@ public class BeLoadRebalancer extends Rebalancer {
} // end for high backends
if (!alternativeTablets.isEmpty()) {
LOG.info("select alternative tablets, medium: {}, num: {}, detail: {}",
medium, alternativeTablets.size(),
alternativeTablets.stream().mapToLong(TabletSchedCtx::getTabletId).toArray());
LOG.info("select alternative tablets, medium: {}, is urgent: {}, num: {}, detail: {}",
medium, isUrgent, alternativeTablets.size(), alternativeTabletInfos);
}
return alternativeTablets;
}
/*
* Create a clone task of this selected tablet for balance.
* 1. Check if this tablet has replica on high load backend. If not, the balance will be cancelled.
@ -239,17 +301,17 @@ public class BeLoadRebalancer extends Rebalancer {
}
// get classification of backends
List<BackendLoadStatistic> lowBe = Lists.newArrayList();
List<BackendLoadStatistic> midBe = Lists.newArrayList();
List<BackendLoadStatistic> highBe = Lists.newArrayList();
clusterStat.getBackendStatisticByClass(lowBe, midBe, highBe, tabletCtx.getStorageMedium());
List<BackendLoadStatistic> lowBEs = Lists.newArrayList();
List<BackendLoadStatistic> highBEs = Lists.newArrayList();
boolean isUrgent = clusterStat.getLowHighBEsWithIsUrgent(lowBEs, highBEs, tabletCtx.getStorageMedium());
String isUrgentInfo = isUrgent ? " for urgent" : " for non-urgent";
if (lowBe.isEmpty() && highBe.isEmpty()) {
if (lowBEs.isEmpty() && highBEs.isEmpty()) {
throw new SchedException(Status.UNRECOVERABLE, SubCode.DIAGNOSE_IGNORE, "cluster is balance");
}
// if all low backends is not available, return
if (lowBe.stream().noneMatch(BackendLoadStatistic::isAvailable)) {
if (lowBEs.stream().noneMatch(BackendLoadStatistic::isAvailable)) {
throw new SchedException(Status.UNRECOVERABLE, "all low load backends is unavailable");
}
@ -258,21 +320,21 @@ public class BeLoadRebalancer extends Rebalancer {
// Check if this tablet has replica on high load backend.
// Also create a set to save hosts of this tablet.
Set<String> hosts = Sets.newHashSet();
boolean hasHighReplica = false;
for (Replica replica : replicas) {
if (highBe.stream().anyMatch(b -> b.getBeId() == replica.getBackendId())) {
hasHighReplica = true;
List<BackendLoadStatistic> replicaHighBEs = Lists.newArrayList();
for (BackendLoadStatistic beStat : highBEs) {
if (replicas.stream().anyMatch(replica -> beStat.getBeId() == replica.getBackendId())) {
replicaHighBEs.add(beStat);
}
Backend be = infoService.getBackend(replica.getBackendId());
Backend be = infoService.getBackend(beStat.getBeId());
if (be == null) {
throw new SchedException(Status.UNRECOVERABLE, SubCode.DIAGNOSE_IGNORE,
"backend is dropped: " + replica.getBackendId());
"backend is dropped: " + beStat.getBeId());
}
hosts.add(be.getHost());
}
if (!hasHighReplica) {
if (replicaHighBEs.isEmpty()) {
throw new SchedException(Status.UNRECOVERABLE, SubCode.DIAGNOSE_IGNORE,
"no replica on high load backend");
"no replica on high load backend" + isUrgentInfo);
}
// select a replica as source
@ -290,12 +352,12 @@ public class BeLoadRebalancer extends Rebalancer {
}
}
if (!setSource) {
throw new SchedException(Status.UNRECOVERABLE, SubCode.DIAGNOSE_IGNORE, "unable to take src backend slot");
throw new SchedException(Status.UNRECOVERABLE, "unable to take src slot" + isUrgentInfo);
}
// Select a low load backend as destination.
List<BackendLoadStatistic> candidates = Lists.newArrayList();
for (BackendLoadStatistic beStat : lowBe) {
for (BackendLoadStatistic beStat : lowBEs) {
if (beStat.isAvailable() && replicas.stream().noneMatch(r -> r.getBackendId() == beStat.getBeId())) {
// check if on same host.
Backend lowBackend = infoService.getBackend(beStat.getBeId());
@ -308,18 +370,22 @@ public class BeLoadRebalancer extends Rebalancer {
// no replica on this low load backend
// 1. check if this clone task can make the cluster more balance.
List<RootPathLoadStatistic> availPaths = Lists.newArrayList();
BalanceStatus bs;
if ((bs = beStat.isFit(tabletCtx.getTabletSize(), tabletCtx.getStorageMedium(), availPaths,
false /* not supplement */)) != BalanceStatus.OK) {
LOG.debug("tablet not fit in BE {}, reason: {}", beStat.getBeId(), bs.getErrMsgs());
BalanceStatus bs = beStat.isFit(tabletCtx.getTabletSize(), tabletCtx.getStorageMedium(), null,
false /* not supplement */);
if (bs != BalanceStatus.OK) {
LOG.debug("tablet not fit in BE {}, reason: {}, {}",
beStat.getBeId(), bs.getErrMsgs(), isUrgentInfo);
continue;
}
if (!Config.be_rebalancer_fuzzy_test && !clusterStat.isMoreBalanced(
tabletCtx.getSrcBackendId(), beStat.getBeId(), tabletCtx.getTabletId(),
tabletCtx.getTabletSize(), tabletCtx.getStorageMedium())) {
continue;
if (!Config.be_rebalancer_fuzzy_test && !isUrgent) {
boolean moreBalanced = replicaHighBEs.stream().anyMatch(highBeStat ->
clusterStat.isMoreBalanced(highBeStat.getBeId(), beStat.getBeId(),
tabletCtx.getTabletId(), tabletCtx.getTabletSize(),
tabletCtx.getStorageMedium()));
if (!moreBalanced) {
continue;
}
}
PathSlot slot = backendsWorkingSlots.get(beStat.getBeId());
@ -333,7 +399,8 @@ public class BeLoadRebalancer extends Rebalancer {
}
if (candidates.isEmpty()) {
throw new SchedException(Status.UNRECOVERABLE, SubCode.DIAGNOSE_IGNORE, "unable to find low dest backend");
throw new SchedException(Status.UNRECOVERABLE, SubCode.DIAGNOSE_IGNORE,
"unable to find low dest backend" + isUrgentInfo);
}
List<BePathLoadStatPair> candFitPaths = Lists.newArrayList();
@ -343,15 +410,27 @@ public class BeLoadRebalancer extends Rebalancer {
continue;
}
// classify the paths.
// And we only select path from 'low' and 'mid' paths
List<RootPathLoadStatistic> pathLow = Lists.newArrayList();
List<RootPathLoadStatistic> pathMid = Lists.newArrayList();
List<RootPathLoadStatistic> pathHigh = Lists.newArrayList();
beStat.getPathStatisticByClass(pathLow, pathMid, pathHigh, tabletCtx.getStorageMedium());
List<RootPathLoadStatistic> pathLow = null;
if (isUrgent) {
pathLow = beStat.getAvailPaths(tabletCtx.getStorageMedium()).stream()
.filter(RootPathLoadStatistic::isGlobalLowUsage)
.collect(Collectors.toList());
} else {
// classify the paths.
// And we only select paths from 'low' and 'mid' paths
pathLow = Lists.newArrayList();
List<RootPathLoadStatistic> pathMid = Lists.newArrayList();
List<RootPathLoadStatistic> pathHigh = Lists.newArrayList();
beStat.getPathStatisticByClass(pathLow, pathMid, pathHigh, tabletCtx.getStorageMedium());
pathLow.addAll(pathMid);
pathLow.stream().forEach(path -> candFitPaths.add(new BePathLoadStatPair(beStat, path)));
pathLow.addAll(pathMid);
}
pathLow.forEach(path -> candFitPaths.add(new BePathLoadStatPair(beStat, path)));
}
if (candFitPaths.isEmpty()) {
throw new SchedException(Status.UNRECOVERABLE, SubCode.DIAGNOSE_IGNORE,
"unable to find low dest backend to fit in paths" + isUrgentInfo);
}
BePathLoadStatPairComparator comparator = new BePathLoadStatPairComparator(candFitPaths);
@ -359,6 +438,7 @@ public class BeLoadRebalancer extends Rebalancer {
for (BePathLoadStatPair bePathLoadStat : candFitPaths) {
BackendLoadStatistic beStat = bePathLoadStat.getBackendLoadStatistic();
RootPathLoadStatistic pathStat = bePathLoadStat.getPathLoadStatistic();
PathSlot slot = backendsWorkingSlots.get(beStat.getBeId());
if (slot == null) {
continue;
@ -370,7 +450,7 @@ public class BeLoadRebalancer extends Rebalancer {
}
throw new SchedException(Status.SCHEDULE_FAILED, SubCode.WAITING_SLOT,
"beload waiting for dest backend slot");
"unable to take dest slot" + isUrgentInfo);
}
}

View File

@ -21,6 +21,7 @@ import org.apache.doris.catalog.TabletInvertedIndex;
import org.apache.doris.clone.BackendLoadStatistic.Classification;
import org.apache.doris.clone.BackendLoadStatistic.LoadScore;
import org.apache.doris.common.Config;
import org.apache.doris.common.Pair;
import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.resource.Tag;
@ -38,6 +39,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
/*
@ -144,6 +146,7 @@ public class LoadStatisticForTag {
// classify all backends
for (TStorageMedium medium : TStorageMedium.values()) {
classifyBackendByLoad(medium);
classifyBackendByMaxDiskUsage(medium);
}
// sort be stats by mix load score
@ -245,6 +248,84 @@ public class LoadStatisticForTag {
medium, avgLoadScore, lowCounter, midCounter, highCounter);
}
private void classifyBackendByMaxDiskUsage(TStorageMedium medium) {
calcDiskGlobalUsages(medium);
Classification[] clazzs = { Classification.HIGH, Classification.LOW, Classification.MID };
for (BackendLoadStatistic beStat : beLoadStatistics) {
if (!beStat.hasMedium(medium)) {
continue;
}
for (Classification clazz : clazzs) {
if (beStat.hasAvailPathWithGlobalClazz(medium, clazz)) {
beStat.setMaxDiskClazz(medium, clazz);
break;
}
}
}
}
private void calcDiskGlobalUsages(TStorageMedium medium) {
double urgentDiffUsageThreshold;
if (Config.be_rebalancer_fuzzy_test) {
urgentDiffUsageThreshold = 0;
} else {
urgentDiffUsageThreshold = Config.balance_load_score_threshold
+ Config.urgent_balance_disk_usage_extra_threshold;
if (urgentDiffUsageThreshold <= 0) {
return;
}
}
double totalDiskUsages = 0;
int totalDiskNum = 0;
for (BackendLoadStatistic beStat : getBackendLoadStatistics()) {
if (!beStat.isAvailable()) {
continue;
}
for (RootPathLoadStatistic pathStat : beStat.getAvailPaths(medium)) {
if (pathStat.getCapacityB() > 1L) {
totalDiskUsages += pathStat.getUsedPercent();
totalDiskNum++;
}
}
}
if (totalDiskNum == 0) {
return;
}
double avgDiskUsage = totalDiskUsages / totalDiskNum;
double urgentDiskUsage = avgDiskUsage + urgentDiffUsageThreshold;
boolean hasHighDisk = false;
for (BackendLoadStatistic beStat : getBackendLoadStatistics()) {
if (!beStat.isAvailable()) {
continue;
}
for (RootPathLoadStatistic pathStat : beStat.getAvailPaths(medium)) {
if (pathStat.getCapacityB() > 1L) {
double usage = pathStat.getUsedPercent();
if (usage > urgentDiskUsage) {
pathStat.setGlobalClazz(Classification.HIGH);
hasHighDisk = true;
} else if (usage > avgDiskUsage) {
pathStat.setGlobalClazz(Classification.MID);
} else {
pathStat.setGlobalClazz(Classification.LOW);
}
}
}
}
if (!hasHighDisk) {
for (BackendLoadStatistic beStat : getBackendLoadStatistics()) {
for (RootPathLoadStatistic pathStat : beStat.getAvailPaths(medium)) {
pathStat.setGlobalClazz(Classification.MID);
}
}
}
}
private static void sortBeStats(List<BackendLoadStatistic> beStats, TStorageMedium medium) {
if (medium == null) {
Collections.sort(beStats, BackendLoadStatistic.MIX_COMPARATOR);
@ -353,7 +434,8 @@ public class LoadStatisticForTag {
pathStat.add(String.valueOf(pathStatistic.getCapacityB()));
pathStat.add(String.valueOf(DebugUtil.DECIMAL_FORMAT_SCALE_3.format(
pathStatistic.getUsedCapacityB() * 100 / (double) pathStatistic.getCapacityB())));
pathStat.add(pathStatistic.getClazz().name());
pathStat.add(pathStatistic.getLocalClazz().name());
pathStat.add(pathStatistic.getGlobalClazz().name());
pathStat.add(pathStatistic.getDiskState().name());
statistics.add(pathStat);
}
@ -375,6 +457,88 @@ public class LoadStatisticForTag {
return beLoadStatistics;
}
public boolean getLowHighBEsWithIsUrgent(List<BackendLoadStatistic> lowBEs, List<BackendLoadStatistic> highBEs,
TStorageMedium medium) {
if (getUrgentLowHighBEs(lowBEs, highBEs, medium)) {
return true;
} else {
lowBEs.clear();
highBEs.clear();
List<BackendLoadStatistic> midBEs = Lists.newArrayList();
getBackendStatisticByClass(lowBEs, midBEs, highBEs, medium);
return false;
}
}
private boolean getUrgentLowHighBEs(List<BackendLoadStatistic> lowBEs, List<BackendLoadStatistic> highBEs,
TStorageMedium medium) {
List<BackendLoadStatistic> midBEs = Lists.newArrayList();
for (BackendLoadStatistic beStat : getBackendLoadStatistics()) {
if (!beStat.isAvailable()) {
continue;
}
switch (beStat.getMaxDiskClazz(medium)) {
case LOW:
lowBEs.add(beStat);
break;
case MID:
midBEs.add(beStat);
break;
case HIGH:
highBEs.add(beStat);
break;
default:
break;
}
}
if (lowBEs.isEmpty()) {
lowBEs.addAll(midBEs);
}
if (lowBEs.isEmpty() && highBEs.size() > 1 && Config.enable_urgent_balance_no_low_backend) {
// all backends will exchange tablets among themselves.
lowBEs.addAll(highBEs);
}
if (lowBEs.isEmpty() || highBEs.isEmpty()) {
lowBEs.clear();
highBEs.clear();
return false;
}
BiConsumer<List<BackendLoadStatistic>, Boolean> resortBeStats = (beStats, choseMinPathElseMaxPath) -> {
List<Pair<BackendLoadStatistic, Double>> bePairs = Lists.newArrayList();
for (BackendLoadStatistic beStat : beStats) {
double score = -1.0;
for (RootPathLoadStatistic pathStat : beStat.getAvailPaths(medium)) {
if (pathStat.getCapacityB() > 1) {
double usage = pathStat.getUsedPercent();
if (score < 0 || (choseMinPathElseMaxPath && usage < score)
|| (!choseMinPathElseMaxPath && usage > score)) {
score = usage;
}
}
}
bePairs.add(Pair.of(beStat, score));
}
Collections.sort(bePairs, new Pair.PairComparator<Pair<BackendLoadStatistic, Double>>());
beStats.clear();
bePairs.forEach(pair -> beStats.add(pair.key()));
};
resortBeStats.accept(lowBEs, true);
resortBeStats.accept(highBEs, false);
LOG.debug("urgent backends' classification lowBe {}, highBe {}, medium: {}",
lowBEs.stream().map(BackendLoadStatistic::getBeId).collect(Collectors.toList()),
highBEs.stream().map(BackendLoadStatistic::getBeId).collect(Collectors.toList()),
medium);
return true;
}
/*
* If cluster is balance, all Backends will be in 'mid', and 'high' and 'low' is empty
* If both 'high' and 'low' has Backends, just return

View File

@ -34,7 +34,11 @@ public class RootPathLoadStatistic implements Comparable<RootPathLoadStatistic>
private long copingSizeB;
private DiskState diskState;
private Classification clazz = Classification.INIT;
// localClazz is compare with other disks on the same backend
private Classification localClazz = Classification.INIT;
// globalClazz is compare with other disks on all backends
private Classification globalClazz = Classification.INIT;
public RootPathLoadStatistic(long beId, String path, Long pathHash, TStorageMedium storageMedium,
long capacityB, long usedCapacityB, DiskState diskState) {
@ -80,12 +84,28 @@ public class RootPathLoadStatistic implements Comparable<RootPathLoadStatistic>
copingSizeB += size;
}
public void setClazz(Classification clazz) {
this.clazz = clazz;
public void setLocalClazz(Classification clazz) {
this.localClazz = clazz;
}
public Classification getClazz() {
return clazz;
public Classification getLocalClazz() {
return localClazz;
}
public void setGlobalClazz(Classification clazz) {
this.globalClazz = clazz;
}
public Classification getGlobalClazz() {
return globalClazz;
}
public boolean isGlobalHighUsage() {
return globalClazz == Classification.HIGH;
}
public boolean isGlobalLowUsage() {
return globalClazz == Classification.LOW;
}
public DiskState getDiskState() {

View File

@ -842,6 +842,7 @@ public class TabletScheduler extends MasterDaemon {
|| deleteReplicaOnSameHost(tabletCtx, force)
|| deleteReplicaNotInValidTag(tabletCtx, force)
|| deleteReplicaChosenByRebalancer(tabletCtx, force)
|| deleteReplicaOnUrgentHighDisk(tabletCtx, force)
|| deleteReplicaOnHighLoadBackend(tabletCtx, force)) {
// if we delete at least one redundant replica, we still throw a SchedException with status FINISHED
// to remove this tablet from the pendingTablets(consider it as finished)
@ -990,6 +991,34 @@ public class TabletScheduler extends MasterDaemon {
return true;
}
private boolean deleteReplicaOnUrgentHighDisk(TabletSchedCtx tabletCtx, boolean force) throws SchedException {
Tag tag = chooseProperTag(tabletCtx, false);
LoadStatisticForTag statistic = statisticMap.get(tag);
if (statistic == null) {
return false;
}
Replica chosenReplica = null;
double maxUsages = -1;
for (Replica replica : tabletCtx.getReplicas()) {
BackendLoadStatistic beStatistic = statistic.getBackendLoadStatistic(replica.getBackendId());
if (beStatistic == null) {
continue;
}
RootPathLoadStatistic path = beStatistic.getPathStatisticByPathHash(replica.getPathHash());
if (path != null && path.isGlobalHighUsage() && path.getUsedPercent() > maxUsages) {
maxUsages = path.getUsedPercent();
chosenReplica = replica;
}
}
if (chosenReplica != null) {
deleteReplicaInternal(tabletCtx, chosenReplica, "high usage disk", force);
return true;
}
return false;
}
private boolean deleteReplicaOnHighLoadBackend(TabletSchedCtx tabletCtx, boolean force) throws SchedException {
Tag tag = chooseProperTag(tabletCtx, false);
LoadStatisticForTag statistic = statisticMap.get(tag);
@ -1037,7 +1066,7 @@ public class TabletScheduler extends MasterDaemon {
}
if (chosenReplica != null) {
deleteReplicaInternal(tabletCtx, chosenReplica, "high load", force);
deleteReplicaInternal(tabletCtx, chosenReplica, "high load backend", force);
return true;
}
return false;
@ -1251,48 +1280,71 @@ public class TabletScheduler extends MasterDaemon {
return;
}
// No need to prefetch too many balance task to pending queue.
// Because for every sched, it will re select the balance task.
int needAddBalanceNum = Math.min(Config.schedule_batch_size - getPendingNum(),
Config.max_balancing_tablets - getBalanceTabletsNumber());
if (needAddBalanceNum <= 0) {
return;
// TODO: too ugly, remove balance_be_then_disk later.
if (Config.balance_be_then_disk) {
boolean hasBeBalance = selectTabletsForBeBalance();
selectTabletsForDiskBalance(hasBeBalance);
} else {
selectTabletsForDiskBalance(false);
selectTabletsForBeBalance();
}
}
private boolean selectTabletsForBeBalance() {
int limit = getBalanceSchedQuotoLeft();
if (limit <= 0) {
return false;
}
int addNum = 0;
List<TabletSchedCtx> alternativeTablets = rebalancer.selectAlternativeTablets();
Collections.shuffle(alternativeTablets);
for (TabletSchedCtx tabletCtx : alternativeTablets) {
if (needAddBalanceNum > 0 && addTablet(tabletCtx, false) == AddResult.ADDED) {
needAddBalanceNum--;
if (addNum >= limit) {
break;
}
if (addTablet(tabletCtx, false) == AddResult.ADDED) {
addNum++;
} else {
rebalancer.onTabletFailed(tabletCtx);
}
}
if (needAddBalanceNum <= 0) {
return;
}
return addNum > 0;
}
private void selectTabletsForDiskBalance(boolean hasBeBalance) {
if (Config.disable_disk_balance) {
LOG.info("disk balance is disabled. skip selecting tablets for disk balance");
return;
}
List<TabletSchedCtx> diskBalanceTablets = Lists.newArrayList();
// if default rebalancer can not get new task or user given prio BEs, then use disk rebalancer to get task
if (diskRebalancer.hasPrioBackends() || alternativeTablets.isEmpty()) {
diskBalanceTablets = diskRebalancer.selectAlternativeTablets();
int limit = getBalanceSchedQuotoLeft();
if (limit <= 0) {
return;
}
for (TabletSchedCtx tabletCtx : diskBalanceTablets) {
int addNum = 0;
for (TabletSchedCtx tabletCtx : diskRebalancer.selectAlternativeTablets()) {
if (addNum >= limit) {
break;
}
// add if task from prio backend or cluster is balanced
if (alternativeTablets.isEmpty() || tabletCtx.getPriority() == TabletSchedCtx.Priority.NORMAL) {
if (!hasBeBalance || Config.be_rebalancer_idle_seconds <= 0
|| tabletCtx.getPriority() == TabletSchedCtx.Priority.NORMAL) {
if (addTablet(tabletCtx, false) == AddResult.ADDED) {
needAddBalanceNum--;
if (needAddBalanceNum <= 0) {
break;
}
addNum++;
}
}
}
}
private int getBalanceSchedQuotoLeft() {
// No need to prefetch too many balance task to pending queue.
// Because for every sched, it will re select the balance task.
return Math.min(Config.schedule_batch_size - getPendingNum(),
Config.max_balancing_tablets - getBalanceTabletsNumber());
}
/**
* Try to create a balance task for a tablet.
*/

View File

@ -27,7 +27,7 @@ public class BackendLoadStatisticProcNode implements ProcNodeInterface {
public static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>()
.add("RootPath").add("PathHash").add("StorageMedium")
.add("DataUsedCapacity").add("TotalCapacity").add("TotalUsedPct")
.add("Class").add("State")
.add("ClassInOneBE").add("ClassInAllBE").add("State")
.build();
private final LoadStatisticForTag statistic;

View File

@ -29,7 +29,7 @@ import com.google.common.collect.ImmutableList;
// show proc "/cluster_balance/cluster_load_stat/location_default/HDD";
public class ClusterLoadStatisticProcDir implements ProcDirInterface {
public static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>()
.add("BeId").add("Available").add("UsedCapacity").add("Capacity")
.add("BeId").add("Available").add("UsedCapacity").add("Capacity").add("MaxDisk")
.add("UsedPercent").add("ReplicaNum").add("CapCoeff").add("ReplCoeff").add("Score")
.add("Class")
.build();