[improvement](tablet clone) improve tablet balance, scaling speed etc (#22317)
This commit is contained in:
@ -932,17 +932,19 @@ public class Config extends ConfigBase {
|
||||
public static long tablet_repair_delay_factor_second = 60;
|
||||
|
||||
/**
|
||||
* the default slot number per path in tablet scheduler
|
||||
* the default slot number per path for hdd in tablet scheduler
|
||||
* TODO(cmy): remove this config and dynamically adjust it by clone task statistic
|
||||
*/
|
||||
@ConfField(mutable = true, masterOnly = true)
|
||||
public static int schedule_slot_num_per_path = 4;
|
||||
public static int schedule_slot_num_per_hdd_path = 4;
|
||||
|
||||
|
||||
/**
|
||||
* the default slot number per path in tablet scheduler for decommission backend
|
||||
* the default slot number per path for ssd in tablet scheduler
|
||||
* TODO(cmy): remove this config and dynamically adjust it by clone task statistic
|
||||
*/
|
||||
@ConfField(mutable = true, masterOnly = true)
|
||||
public static int schedule_decommission_slot_num_per_path = 8;
|
||||
public static int schedule_slot_num_per_ssd_path = 8;
|
||||
|
||||
/**
|
||||
* the default batch size in tablet scheduler for a single schedule.
|
||||
|
||||
@ -68,6 +68,70 @@ public class BackendLoadStatistic {
|
||||
}
|
||||
}
|
||||
|
||||
public static class BePathLoadStatPair {
|
||||
private BackendLoadStatistic beLoadStatistic;
|
||||
private RootPathLoadStatistic pathLoadStatistic;
|
||||
|
||||
BePathLoadStatPair(BackendLoadStatistic beLoadStatistic, RootPathLoadStatistic pathLoadStatistic) {
|
||||
this.beLoadStatistic = beLoadStatistic;
|
||||
this.pathLoadStatistic = pathLoadStatistic;
|
||||
}
|
||||
|
||||
BackendLoadStatistic getBackendLoadStatistic() {
|
||||
return beLoadStatistic;
|
||||
}
|
||||
|
||||
RootPathLoadStatistic getPathLoadStatistic() {
|
||||
return pathLoadStatistic;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "{ beId: " + beLoadStatistic.getBeId() + ", be score: "
|
||||
+ beLoadStatistic.getLoadScore(pathLoadStatistic.getStorageMedium())
|
||||
+ ", path: " + pathLoadStatistic.getPath()
|
||||
+ ", path used percent: " + pathLoadStatistic.getUsedPercent()
|
||||
+ " }";
|
||||
}
|
||||
}
|
||||
|
||||
public static class BePathLoadStatPairComparator implements Comparator<BePathLoadStatPair> {
|
||||
private double avgBackendLoadScore;
|
||||
private double avgPathUsedPercent;
|
||||
|
||||
BePathLoadStatPairComparator(List<BePathLoadStatPair> loadStats) {
|
||||
avgBackendLoadScore = 0.0;
|
||||
avgPathUsedPercent = 0.0;
|
||||
for (BePathLoadStatPair loadStat : loadStats) {
|
||||
RootPathLoadStatistic pathStat = loadStat.getPathLoadStatistic();
|
||||
avgBackendLoadScore += loadStat.getBackendLoadStatistic().getLoadScore(pathStat.getStorageMedium());
|
||||
avgPathUsedPercent += pathStat.getUsedPercent();
|
||||
}
|
||||
if (!loadStats.isEmpty()) {
|
||||
avgPathUsedPercent /= loadStats.size();
|
||||
avgBackendLoadScore /= loadStats.size();
|
||||
}
|
||||
if (avgBackendLoadScore == 0.0) {
|
||||
avgBackendLoadScore = 1.0;
|
||||
}
|
||||
if (avgPathUsedPercent == 0.0) {
|
||||
avgPathUsedPercent = 1.0;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int compare(BePathLoadStatPair o1, BePathLoadStatPair o2) {
|
||||
return Double.compare(getCompareValue(o1), getCompareValue(o2));
|
||||
}
|
||||
|
||||
private double getCompareValue(BePathLoadStatPair loadStat) {
|
||||
BackendLoadStatistic beStat = loadStat.getBackendLoadStatistic();
|
||||
RootPathLoadStatistic pathStat = loadStat.getPathLoadStatistic();
|
||||
return 0.5 * beStat.getLoadScore(pathStat.getStorageMedium()) / avgBackendLoadScore
|
||||
+ 0.5 * pathStat.getUsedPercent() / avgPathUsedPercent;
|
||||
}
|
||||
}
|
||||
|
||||
public static final BeStatComparator HDD_COMPARATOR = new BeStatComparator(TStorageMedium.HDD);
|
||||
public static final BeStatComparator SSD_COMPARATOR = new BeStatComparator(TStorageMedium.SSD);
|
||||
public static final BeStatMixComparator MIX_COMPARATOR = new BeStatMixComparator();
|
||||
@ -362,9 +426,9 @@ public class BackendLoadStatistic {
|
||||
}
|
||||
|
||||
result.add(pathStatistic);
|
||||
return BalanceStatus.OK;
|
||||
}
|
||||
return status;
|
||||
|
||||
return result.isEmpty() ? status : BalanceStatus.OK;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -456,6 +520,50 @@ public class BackendLoadStatistic {
|
||||
beId, low.size(), mid.size(), high.size());
|
||||
}
|
||||
|
||||
public void getPathStatisticByClass(List<RootPathLoadStatistic> low,
|
||||
List<RootPathLoadStatistic> mid, List<RootPathLoadStatistic> high, TStorageMedium storageMedium) {
|
||||
for (RootPathLoadStatistic pathStat : pathStatistics) {
|
||||
if (pathStat.getDiskState() == DiskState.OFFLINE
|
||||
|| (storageMedium != null && pathStat.getStorageMedium() != storageMedium)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (pathStat.getClazz() == Classification.LOW) {
|
||||
low.add(pathStat);
|
||||
} else if (pathStat.getClazz() == Classification.HIGH) {
|
||||
high.add(pathStat);
|
||||
} else {
|
||||
mid.add(pathStat);
|
||||
}
|
||||
}
|
||||
|
||||
LOG.debug("after adjust, backend {} path classification low/mid/high: {}/{}/{}",
|
||||
beId, low.size(), mid.size(), high.size());
|
||||
}
|
||||
|
||||
public void incrPathsCopingSize(Map<Long, Long> pathsCopingSize) {
|
||||
boolean updated = false;
|
||||
for (RootPathLoadStatistic pathStat : pathStatistics) {
|
||||
Long copingSize = pathsCopingSize.get(pathStat.getPathHash());
|
||||
if (copingSize != null && copingSize > 0) {
|
||||
pathStat.incrCopingSizeB(copingSize);
|
||||
updated = true;
|
||||
}
|
||||
}
|
||||
if (updated) {
|
||||
Collections.sort(pathStatistics);
|
||||
}
|
||||
}
|
||||
|
||||
public void incrPathCopingSize(long pathHash, long copingSize) {
|
||||
RootPathLoadStatistic pathStat = pathStatistics.stream().filter(
|
||||
p -> p.getPathHash() == pathHash).findFirst().orElse(null);
|
||||
if (pathStat != null) {
|
||||
pathStat.incrCopingSizeB(copingSize);
|
||||
Collections.sort(pathStatistics);
|
||||
}
|
||||
}
|
||||
|
||||
public List<RootPathLoadStatistic> getPathStatistics() {
|
||||
return pathStatistics;
|
||||
}
|
||||
|
||||
@ -22,6 +22,8 @@ import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.Replica;
|
||||
import org.apache.doris.catalog.TabletInvertedIndex;
|
||||
import org.apache.doris.catalog.TabletMeta;
|
||||
import org.apache.doris.clone.BackendLoadStatistic.BePathLoadStatPair;
|
||||
import org.apache.doris.clone.BackendLoadStatistic.BePathLoadStatPairComparator;
|
||||
import org.apache.doris.clone.SchedException.Status;
|
||||
import org.apache.doris.clone.SchedException.SubCode;
|
||||
import org.apache.doris.clone.TabletSchedCtx.Priority;
|
||||
@ -51,8 +53,9 @@ import java.util.Set;
|
||||
public class BeLoadRebalancer extends Rebalancer {
|
||||
private static final Logger LOG = LogManager.getLogger(BeLoadRebalancer.class);
|
||||
|
||||
public BeLoadRebalancer(SystemInfoService infoService, TabletInvertedIndex invertedIndex) {
|
||||
super(infoService, invertedIndex);
|
||||
public BeLoadRebalancer(SystemInfoService infoService, TabletInvertedIndex invertedIndex,
|
||||
Map<Long, PathSlot> backendsWorkingSlots) {
|
||||
super(infoService, invertedIndex, backendsWorkingSlots);
|
||||
}
|
||||
|
||||
/*
|
||||
@ -100,9 +103,16 @@ public class BeLoadRebalancer extends Rebalancer {
|
||||
return alternativeTablets;
|
||||
}
|
||||
|
||||
// get the number of low load paths; we should select at most this number of tablets
|
||||
long numOfLowPaths = lowBEs.stream().filter(b -> b.isAvailable() && b.hasAvailDisk()).mapToLong(
|
||||
b -> b.getAvailPathNum(medium)).sum();
|
||||
long numOfLowPaths = 0;
|
||||
for (BackendLoadStatistic backendLoadStatistic : lowBEs) {
|
||||
if (!backendLoadStatistic.isAvailable()) {
|
||||
continue;
|
||||
}
|
||||
PathSlot pathSlot = backendsWorkingSlots.get(backendLoadStatistic.getBeId());
|
||||
if (pathSlot != null) {
|
||||
numOfLowPaths += pathSlot.getTotalAvailBalanceSlotNum();
|
||||
}
|
||||
}
|
||||
LOG.info("get number of low load paths: {}, with medium: {}", numOfLowPaths, medium);
|
||||
|
||||
int clusterAvailableBEnum = infoService.getAllBackendIds(true).size();
|
||||
@ -113,6 +123,10 @@ public class BeLoadRebalancer extends Rebalancer {
|
||||
OUTER:
|
||||
for (int i = highBEs.size() - 1; i >= 0; i--) {
|
||||
BackendLoadStatistic beStat = highBEs.get(i);
|
||||
PathSlot pathSlot = backendsWorkingSlots.get(beStat.getBeId());
|
||||
if (pathSlot == null) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// classify the paths.
|
||||
Set<Long> pathLow = Sets.newHashSet();
|
||||
@ -129,7 +143,10 @@ public class BeLoadRebalancer extends Rebalancer {
|
||||
// for each path, we try to select at most BALANCE_SLOT_NUM_FOR_PATH tablets
|
||||
Map<Long, Integer> remainingPaths = Maps.newHashMap();
|
||||
for (Long pathHash : pathHigh) {
|
||||
remainingPaths.put(pathHash, Config.balance_slot_num_per_path);
|
||||
int availBalanceNum = pathSlot.getAvailableBalanceNum(pathHash);
|
||||
if (availBalanceNum > 0) {
|
||||
remainingPaths.put(pathHash, availBalanceNum);
|
||||
}
|
||||
}
|
||||
|
||||
if (remainingPaths.isEmpty()) {
|
||||
@ -201,8 +218,7 @@ public class BeLoadRebalancer extends Rebalancer {
|
||||
* 2. Select a low load backend as destination. The tablet must not already have a replica on this backend.
|
||||
*/
|
||||
@Override
|
||||
public void completeSchedCtx(TabletSchedCtx tabletCtx,
|
||||
Map<Long, PathSlot> backendsWorkingSlots) throws SchedException {
|
||||
public void completeSchedCtx(TabletSchedCtx tabletCtx) throws SchedException {
|
||||
LoadStatisticForTag clusterStat = statisticMap.get(tabletCtx.getTag());
|
||||
if (clusterStat == null) {
|
||||
throw new SchedException(Status.UNRECOVERABLE,
|
||||
@ -305,6 +321,7 @@ public class BeLoadRebalancer extends Rebalancer {
|
||||
throw new SchedException(Status.UNRECOVERABLE, "unable to find low backend");
|
||||
}
|
||||
|
||||
List<BePathLoadStatPair> candFitPaths = Lists.newArrayList();
|
||||
for (BackendLoadStatistic beStat : candidates) {
|
||||
PathSlot slot = backendsWorkingSlots.get(beStat.getBeId());
|
||||
if (slot == null) {
|
||||
@ -313,15 +330,26 @@ public class BeLoadRebalancer extends Rebalancer {
|
||||
|
||||
// classify the paths.
|
||||
// And we only select path from 'low' and 'mid' paths
|
||||
Set<Long> pathLow = Sets.newHashSet();
|
||||
Set<Long> pathMid = Sets.newHashSet();
|
||||
Set<Long> pathHigh = Sets.newHashSet();
|
||||
List<RootPathLoadStatistic> pathLow = Lists.newArrayList();
|
||||
List<RootPathLoadStatistic> pathMid = Lists.newArrayList();
|
||||
List<RootPathLoadStatistic> pathHigh = Lists.newArrayList();
|
||||
beStat.getPathStatisticByClass(pathLow, pathMid, pathHigh, tabletCtx.getStorageMedium());
|
||||
pathLow.addAll(pathMid);
|
||||
|
||||
long pathHash = slot.takeAnAvailBalanceSlotFrom(pathLow);
|
||||
if (pathHash != -1) {
|
||||
tabletCtx.setDest(beStat.getBeId(), pathHash);
|
||||
pathLow.addAll(pathMid);
|
||||
pathLow.stream().forEach(path -> candFitPaths.add(new BePathLoadStatPair(beStat, path)));
|
||||
}
|
||||
|
||||
BePathLoadStatPairComparator comparator = new BePathLoadStatPairComparator(candFitPaths);
|
||||
Collections.sort(candFitPaths, comparator);
|
||||
for (BePathLoadStatPair bePathLoadStat : candFitPaths) {
|
||||
BackendLoadStatistic beStat = bePathLoadStat.getBackendLoadStatistic();
|
||||
RootPathLoadStatistic pathStat = bePathLoadStat.getPathLoadStatistic();
|
||||
PathSlot slot = backendsWorkingSlots.get(beStat.getBeId());
|
||||
if (slot == null) {
|
||||
continue;
|
||||
}
|
||||
if (slot.takeBalanceSlot(pathStat.getPathHash()) != -1) {
|
||||
tabletCtx.setDest(beStat.getBeId(), pathStat.getPathHash());
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
@ -24,7 +24,6 @@ import org.apache.doris.clone.SchedException.Status;
|
||||
import org.apache.doris.clone.TabletSchedCtx.BalanceType;
|
||||
import org.apache.doris.clone.TabletSchedCtx.Priority;
|
||||
import org.apache.doris.clone.TabletScheduler.PathSlot;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.system.SystemInfoService;
|
||||
import org.apache.doris.thrift.TStorageMedium;
|
||||
|
||||
@ -52,8 +51,9 @@ import java.util.Set;
|
||||
public class DiskRebalancer extends Rebalancer {
|
||||
private static final Logger LOG = LogManager.getLogger(DiskRebalancer.class);
|
||||
|
||||
public DiskRebalancer(SystemInfoService infoService, TabletInvertedIndex invertedIndex) {
|
||||
super(infoService, invertedIndex);
|
||||
public DiskRebalancer(SystemInfoService infoService, TabletInvertedIndex invertedIndex,
|
||||
Map<Long, PathSlot> backendsWorkingSlots) {
|
||||
super(infoService, invertedIndex, backendsWorkingSlots);
|
||||
}
|
||||
|
||||
public List<BackendLoadStatistic> filterByPrioBackends(List<BackendLoadStatistic> bes) {
|
||||
@ -152,6 +152,10 @@ public class DiskRebalancer extends Rebalancer {
|
||||
Collections.shuffle(midBEs);
|
||||
for (int i = midBEs.size() - 1; i >= 0; i--) {
|
||||
BackendLoadStatistic beStat = midBEs.get(i);
|
||||
PathSlot pathSlot = backendsWorkingSlots.get(beStat.getBeId());
|
||||
if (pathSlot == null) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// classify the paths.
|
||||
Set<Long> pathLow = Sets.newHashSet();
|
||||
@ -171,7 +175,10 @@ public class DiskRebalancer extends Rebalancer {
|
||||
// for each path, we try to select at most BALANCE_SLOT_NUM_FOR_PATH tablets
|
||||
Map<Long, Integer> remainingPaths = Maps.newHashMap();
|
||||
for (Long pathHash : pathHigh) {
|
||||
remainingPaths.put(pathHash, Config.balance_slot_num_per_path);
|
||||
int availBalanceNum = pathSlot.getAvailableBalanceNum(pathHash);
|
||||
if (availBalanceNum > 0) {
|
||||
remainingPaths.put(pathHash, availBalanceNum);
|
||||
}
|
||||
}
|
||||
|
||||
if (remainingPaths.isEmpty()) {
|
||||
@ -246,8 +253,7 @@ public class DiskRebalancer extends Rebalancer {
|
||||
* 3. Select a low load path from this backend as destination.
|
||||
*/
|
||||
@Override
|
||||
public void completeSchedCtx(TabletSchedCtx tabletCtx,
|
||||
Map<Long, PathSlot> backendsWorkingSlots) throws SchedException {
|
||||
public void completeSchedCtx(TabletSchedCtx tabletCtx) throws SchedException {
|
||||
LoadStatisticForTag clusterStat = statisticMap.get(tabletCtx.getTag());
|
||||
if (clusterStat == null) {
|
||||
throw new SchedException(Status.UNRECOVERABLE,
|
||||
@ -323,7 +329,7 @@ public class DiskRebalancer extends Rebalancer {
|
||||
}
|
||||
long destPathHash = slot.takeBalanceSlot(stat.getPathHash());
|
||||
if (destPathHash == -1) {
|
||||
throw new SchedException(Status.UNRECOVERABLE, "unable to take dest slot");
|
||||
continue;
|
||||
}
|
||||
tabletCtx.setDest(beStat.getBeId(), destPathHash, stat.getPath());
|
||||
setDest = true;
|
||||
|
||||
@ -342,6 +342,10 @@ public class LoadStatisticForTag {
|
||||
return null;
|
||||
}
|
||||
|
||||
public List<BackendLoadStatistic> getBackendLoadStatistics() {
|
||||
return beLoadStatistics;
|
||||
}
|
||||
|
||||
/*
|
||||
* If cluster is balance, all Backends will be in 'mid', and 'high' and 'low' is empty
|
||||
* If both 'high' and 'low' has Backends, just return
|
||||
|
||||
@ -20,6 +20,7 @@ package org.apache.doris.clone;
|
||||
import org.apache.doris.catalog.Replica;
|
||||
import org.apache.doris.catalog.TabletInvertedIndex;
|
||||
import org.apache.doris.catalog.TabletMeta;
|
||||
import org.apache.doris.clone.TabletScheduler.PathSlot;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.Pair;
|
||||
import org.apache.doris.resource.Tag;
|
||||
@ -63,8 +64,9 @@ public class PartitionRebalancer extends Rebalancer {
|
||||
private final AtomicLong counterBalanceMoveCreated = new AtomicLong(0);
|
||||
private final AtomicLong counterBalanceMoveSucceeded = new AtomicLong(0);
|
||||
|
||||
public PartitionRebalancer(SystemInfoService infoService, TabletInvertedIndex invertedIndex) {
|
||||
super(infoService, invertedIndex);
|
||||
public PartitionRebalancer(SystemInfoService infoService, TabletInvertedIndex invertedIndex,
|
||||
Map<Long, PathSlot> backendsWorkingSlots) {
|
||||
super(infoService, invertedIndex, backendsWorkingSlots);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -229,7 +231,7 @@ public class PartitionRebalancer extends Rebalancer {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void completeSchedCtx(TabletSchedCtx tabletCtx, Map<Long, TabletScheduler.PathSlot> backendsWorkingSlots)
|
||||
protected void completeSchedCtx(TabletSchedCtx tabletCtx)
|
||||
throws SchedException {
|
||||
MovesCacheMap.MovesCache movesInProgress = movesCacheMap.getCache(tabletCtx.getTag(),
|
||||
tabletCtx.getStorageMedium());
|
||||
@ -271,10 +273,10 @@ public class PartitionRebalancer extends Rebalancer {
|
||||
if (pathHash == -1) {
|
||||
throw new SchedException(SchedException.Status.SCHEDULE_FAILED, SchedException.SubCode.WAITING_SLOT,
|
||||
"paths has no available balance slot: " + availPath);
|
||||
} else {
|
||||
tabletCtx.setDest(beStat.getBeId(), pathHash);
|
||||
}
|
||||
|
||||
tabletCtx.setDest(beStat.getBeId(), pathHash);
|
||||
|
||||
// ToDeleteReplica is the source replica
|
||||
pair.second = srcReplica.getId();
|
||||
} catch (IllegalStateException | NullPointerException e) {
|
||||
|
||||
@ -49,14 +49,17 @@ public abstract class Rebalancer {
|
||||
// When the Rebalancer is initialized, statisticMap is usually empty, so it does not need to be an argument.
|
||||
// Only use updateLoadStatistic() to load stats.
|
||||
protected Map<Tag, LoadStatisticForTag> statisticMap = Maps.newHashMap();
|
||||
protected Map<Long, PathSlot> backendsWorkingSlots;
|
||||
protected TabletInvertedIndex invertedIndex;
|
||||
protected SystemInfoService infoService;
|
||||
// be id -> end time of prio
|
||||
protected Map<Long, Long> prioBackends = Maps.newConcurrentMap();
|
||||
|
||||
public Rebalancer(SystemInfoService infoService, TabletInvertedIndex invertedIndex) {
|
||||
public Rebalancer(SystemInfoService infoService, TabletInvertedIndex invertedIndex,
|
||||
Map<Long, PathSlot> backendsWorkingSlots) {
|
||||
this.infoService = infoService;
|
||||
this.invertedIndex = invertedIndex;
|
||||
this.backendsWorkingSlots = backendsWorkingSlots;
|
||||
}
|
||||
|
||||
public List<TabletSchedCtx> selectAlternativeTablets() {
|
||||
@ -74,9 +77,9 @@ public abstract class Rebalancer {
|
||||
protected abstract List<TabletSchedCtx> selectAlternativeTabletsForCluster(
|
||||
LoadStatisticForTag clusterStat, TStorageMedium medium);
|
||||
|
||||
public AgentTask createBalanceTask(TabletSchedCtx tabletCtx, Map<Long, PathSlot> backendsWorkingSlots)
|
||||
public AgentTask createBalanceTask(TabletSchedCtx tabletCtx)
|
||||
throws SchedException {
|
||||
completeSchedCtx(tabletCtx, backendsWorkingSlots);
|
||||
completeSchedCtx(tabletCtx);
|
||||
if (tabletCtx.getBalanceType() == TabletSchedCtx.BalanceType.BE_BALANCE) {
|
||||
return tabletCtx.createCloneReplicaAndTask();
|
||||
} else {
|
||||
@ -90,7 +93,7 @@ public abstract class Rebalancer {
|
||||
// You should check the moves' validation.
|
||||
// 2. If you want to generate {srcReplica, destBe} here, just do it.
|
||||
// 3. You should check the path slots of src & dest.
|
||||
protected abstract void completeSchedCtx(TabletSchedCtx tabletCtx, Map<Long, PathSlot> backendsWorkingSlots)
|
||||
protected abstract void completeSchedCtx(TabletSchedCtx tabletCtx)
|
||||
throws SchedException;
|
||||
|
||||
public Long getToDeleteReplicaId(TabletSchedCtx tabletCtx) {
|
||||
|
||||
@ -31,6 +31,7 @@ public class RootPathLoadStatistic implements Comparable<RootPathLoadStatistic>
|
||||
private TStorageMedium storageMedium;
|
||||
private long capacityB;
|
||||
private long usedCapacityB;
|
||||
private long copingSizeB;
|
||||
private DiskState diskState;
|
||||
|
||||
private Classification clazz = Classification.INIT;
|
||||
@ -43,6 +44,7 @@ public class RootPathLoadStatistic implements Comparable<RootPathLoadStatistic>
|
||||
this.storageMedium = storageMedium;
|
||||
this.capacityB = capacityB <= 0 ? 1 : capacityB;
|
||||
this.usedCapacityB = usedCapacityB;
|
||||
this.copingSizeB = 0;
|
||||
this.diskState = diskState;
|
||||
}
|
||||
|
||||
@ -71,7 +73,11 @@ public class RootPathLoadStatistic implements Comparable<RootPathLoadStatistic>
|
||||
}
|
||||
|
||||
public double getUsedPercent() {
|
||||
return capacityB <= 0 ? 0.0 : usedCapacityB / (double) capacityB;
|
||||
return capacityB <= 0 ? 0.0 : (usedCapacityB + copingSizeB) / (double) capacityB;
|
||||
}
|
||||
|
||||
public void incrCopingSizeB(long size) {
|
||||
copingSizeB += size;
|
||||
}
|
||||
|
||||
public void setClazz(Classification clazz) {
|
||||
|
||||
@ -34,6 +34,7 @@ import org.apache.doris.clone.TabletScheduler.PathSlot;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.FeConstants;
|
||||
import org.apache.doris.common.Pair;
|
||||
import org.apache.doris.common.util.DebugUtil;
|
||||
import org.apache.doris.common.util.TimeUtils;
|
||||
import org.apache.doris.persist.ReplicaPersistInfo;
|
||||
import org.apache.doris.resource.Tag;
|
||||
@ -179,6 +180,8 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
|
||||
private long visibleVersion = -1;
|
||||
private long committedVersion = -1;
|
||||
|
||||
private long tabletSize = 0;
|
||||
|
||||
private Replica srcReplica = null;
|
||||
private long srcPathHash = -1;
|
||||
// for disk balance: keep the src path, and avoid taking a slot in selectAlternativeTablets
|
||||
@ -281,6 +284,10 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
|
||||
return failedSchedCounter;
|
||||
}
|
||||
|
||||
public void resetFailedSchedCounter() {
|
||||
failedSchedCounter = 0;
|
||||
}
|
||||
|
||||
public void increaseFailedRunningCounter() {
|
||||
++failedRunningCounter;
|
||||
}
|
||||
@ -301,7 +308,7 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
|
||||
} else {
|
||||
decommissionTime = -1;
|
||||
if (code == SubCode.WAITING_SLOT && type != Type.BALANCE) {
|
||||
return failedSchedCounter > 30 * 1000 / TabletScheduler.SCHEDULE_INTERVAL_MS;
|
||||
return failedSchedCounter > 30 * 1000 / FeConstants.tablet_schedule_interval_ms;
|
||||
} else {
|
||||
return failedSchedCounter > 10;
|
||||
}
|
||||
@ -477,13 +484,13 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
|
||||
|
||||
// database lock should be held.
|
||||
public long getTabletSize() {
|
||||
long max = Long.MIN_VALUE;
|
||||
for (Replica replica : tablet.getReplicas()) {
|
||||
if (replica.getDataSize() > max) {
|
||||
max = replica.getDataSize();
|
||||
}
|
||||
}
|
||||
return max;
|
||||
return tabletSize;
|
||||
}
|
||||
|
||||
public void updateTabletSize() {
|
||||
tabletSize = 0;
|
||||
tablet.getReplicas().stream().forEach(
|
||||
replica -> tabletSize = Math.max(tabletSize, replica.getDataSize()));
|
||||
}
|
||||
|
||||
/*
|
||||
@ -905,6 +912,7 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
|
||||
// if this is a balance task, or this is a repair task with
|
||||
// REPLICA_MISSING/REPLICA_RELOCATING,
|
||||
// we create a new replica with state CLONE
|
||||
long replicaId = 0;
|
||||
if (tabletStatus == TabletStatus.REPLICA_MISSING
|
||||
|| tabletStatus == TabletStatus.REPLICA_RELOCATING || type == Type.BALANCE
|
||||
|| tabletStatus == TabletStatus.COLOCATE_MISMATCH
|
||||
@ -917,14 +925,9 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
|
||||
committedVersion, /* use committed version as last failed version */
|
||||
-1 /* last success version */);
|
||||
|
||||
TBackend tSrcBe = new TBackend(srcBe.getHost(), srcBe.getBePort(), srcBe.getHttpPort());
|
||||
cloneTask = new CloneTask(destBackendId, dbId, tblId, partitionId, indexId,
|
||||
tabletId, cloneReplica.getId(), schemaHash, Lists.newArrayList(tSrcBe), storageMedium,
|
||||
visibleVersion, (int) (taskTimeoutMs / 1000));
|
||||
cloneTask.setPathHash(srcPathHash, destPathHash);
|
||||
|
||||
// addReplica() method will add this replica to tablet inverted index too.
|
||||
tablet.addReplica(cloneReplica);
|
||||
replicaId = cloneReplica.getId();
|
||||
} else if (tabletStatus == TabletStatus.VERSION_INCOMPLETE) {
|
||||
Preconditions.checkState(type == Type.REPAIR, type);
|
||||
// double check
|
||||
@ -937,18 +940,31 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
|
||||
throw new SchedException(Status.SCHEDULE_FAILED, "dest replica's path hash is changed. "
|
||||
+ "current: " + replica.getPathHash() + ", scheduled: " + destPathHash);
|
||||
}
|
||||
|
||||
TBackend tSrcBe = new TBackend(srcBe.getHost(), srcBe.getBePort(), srcBe.getHttpPort());
|
||||
cloneTask = new CloneTask(destBackendId, dbId, tblId, partitionId, indexId,
|
||||
tabletId, replica.getId(), schemaHash, Lists.newArrayList(tSrcBe), storageMedium,
|
||||
visibleVersion, (int) (taskTimeoutMs / 1000));
|
||||
cloneTask.setPathHash(srcPathHash, destPathHash);
|
||||
replicaId = replica.getId();
|
||||
}
|
||||
|
||||
TBackend tSrcBe = new TBackend(srcBe.getHost(), srcBe.getBePort(), srcBe.getHttpPort());
|
||||
TBackend tDestBe = new TBackend(destBe.getHost(), destBe.getBePort(), destBe.getHttpPort());
|
||||
|
||||
cloneTask = new CloneTask(tDestBe, destBackendId, dbId, tblId, partitionId, indexId, tabletId,
|
||||
replicaId, schemaHash, Lists.newArrayList(tSrcBe), storageMedium,
|
||||
visibleVersion, (int) (taskTimeoutMs / 1000));
|
||||
cloneTask.setPathHash(srcPathHash, destPathHash);
|
||||
|
||||
this.state = State.RUNNING;
|
||||
return cloneTask;
|
||||
}
|
||||
|
||||
// for storage migration or cloning a new replica
|
||||
public long getDestEstimatedCopingSize() {
|
||||
if ((cloneTask != null && tabletStatus != TabletStatus.VERSION_INCOMPLETE)
|
||||
|| storageMediaMigrationTask != null) {
|
||||
return Math.max(getTabletSize(), 10L);
|
||||
} else {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
// timeout is between MIN_CLONE_TASK_TIMEOUT_MS and MAX_CLONE_TASK_TIMEOUT_MS
|
||||
private long getApproximateTimeoutMs() {
|
||||
long tabletSize = getTabletSize();
|
||||
@ -1131,6 +1147,8 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
|
||||
result.add(TimeUtils.longToTimeString(lastSchedTime));
|
||||
result.add(TimeUtils.longToTimeString(lastVisitedTime));
|
||||
result.add(TimeUtils.longToTimeString(finishedTime));
|
||||
Pair<Double, String> tabletSizeDesc = DebugUtil.getByteUint(tabletSize);
|
||||
result.add(DebugUtil.DECIMAL_FORMAT_SCALE_3.format(tabletSizeDesc.first) + " " + tabletSizeDesc.second);
|
||||
result.add(copyTimeMs > 0 ? String.valueOf(copySize / copyTimeMs / 1000.0) : FeConstants.null_string);
|
||||
result.add(String.valueOf(failedSchedCounter));
|
||||
result.add(String.valueOf(failedRunningCounter));
|
||||
@ -1162,8 +1180,9 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
|
||||
value += 5 * 1000L;
|
||||
}
|
||||
|
||||
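// repair tasks always take priority over balance tasks
// NOTE(review): the other offsets added to this value appear to be in milliseconds (e.g. 5 * 1000L);
// 10 * 24 * 3600L looks like it is missing a "* 1000" factor - confirm the intended unit.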
|
||||
if (type == Type.BALANCE) {
|
||||
value += 30 * 60 * 1000L;
|
||||
value += 10 * 24 * 3600L;
|
||||
}
|
||||
|
||||
return value;
|
||||
@ -1174,12 +1193,19 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append("tablet id: ").append(tabletId).append(", status: ").append(tabletStatus.name());
|
||||
sb.append(", state: ").append(state.name()).append(", type: ").append(type.name());
|
||||
if (type == Type.BALANCE && balanceType != null) {
|
||||
sb.append(", balance: ").append(balanceType.name());
|
||||
}
|
||||
if (priority != null) {
|
||||
sb.append(", priority: ").append(priority.name());
|
||||
}
|
||||
sb.append(", tablet size: ").append(tabletSize);
|
||||
if (srcReplica != null) {
|
||||
sb.append(". from backend: ").append(srcReplica.getBackendId());
|
||||
sb.append(", from backend: ").append(srcReplica.getBackendId());
|
||||
sb.append(", src path hash: ").append(srcPathHash);
|
||||
}
|
||||
if (destPathHash != -1) {
|
||||
sb.append(". to backend: ").append(destBackendId);
|
||||
sb.append(", to backend: ").append(destBackendId);
|
||||
sb.append(", dest path hash: ").append(destPathHash);
|
||||
}
|
||||
sb.append(", visible version: ").append(visibleVersion);
|
||||
|
||||
@ -22,7 +22,6 @@ import org.apache.doris.analysis.AdminRebalanceDiskStmt;
|
||||
import org.apache.doris.catalog.ColocateTableIndex;
|
||||
import org.apache.doris.catalog.ColocateTableIndex.GroupId;
|
||||
import org.apache.doris.catalog.Database;
|
||||
import org.apache.doris.catalog.DiskInfo;
|
||||
import org.apache.doris.catalog.DiskInfo.DiskState;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.MaterializedIndex;
|
||||
@ -36,6 +35,8 @@ import org.apache.doris.catalog.ReplicaAllocation;
|
||||
import org.apache.doris.catalog.Tablet;
|
||||
import org.apache.doris.catalog.Tablet.TabletStatus;
|
||||
import org.apache.doris.catalog.TabletInvertedIndex;
|
||||
import org.apache.doris.clone.BackendLoadStatistic.BePathLoadStatPair;
|
||||
import org.apache.doris.clone.BackendLoadStatistic.BePathLoadStatPairComparator;
|
||||
import org.apache.doris.clone.SchedException.Status;
|
||||
import org.apache.doris.clone.SchedException.SubCode;
|
||||
import org.apache.doris.clone.TabletSchedCtx.Priority;
|
||||
@ -58,6 +59,7 @@ import org.apache.doris.task.DropReplicaTask;
|
||||
import org.apache.doris.task.StorageMediaMigrationTask;
|
||||
import org.apache.doris.thrift.TFinishTaskRequest;
|
||||
import org.apache.doris.thrift.TStatusCode;
|
||||
import org.apache.doris.thrift.TStorageMedium;
|
||||
import org.apache.doris.transaction.DatabaseTransactionMgr;
|
||||
import org.apache.doris.transaction.TransactionState;
|
||||
|
||||
@ -72,12 +74,12 @@ import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.PriorityQueue;
|
||||
import java.util.Queue;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* TabletScheduler saves the tablets produced by TabletChecker and tries to schedule them.
|
||||
@ -103,8 +105,6 @@ public class TabletScheduler extends MasterDaemon {
|
||||
// the minimum interval of updating cluster statistics and priority of tablet info
|
||||
private static final long STAT_UPDATE_INTERVAL_MS = 20 * 1000; // 20s
|
||||
|
||||
public static final long SCHEDULE_INTERVAL_MS = 100;
|
||||
|
||||
/*
|
||||
* A tablet is added to pendingTablets, and its id is added to allTabletTypes.
|
||||
* TabletScheduler will take a tablet from pendingTablets but will not remove its id from allTabletTypes when
|
||||
@ -127,12 +127,11 @@ public class TabletScheduler extends MasterDaemon {
|
||||
private Map<Long, PathSlot> backendsWorkingSlots = Maps.newConcurrentMap();
|
||||
// Tag -> load statistic
|
||||
private Map<Tag, LoadStatisticForTag> statisticMap = Maps.newHashMap();
|
||||
|
||||
private long lastStatUpdateTime = 0;
|
||||
|
||||
private long lastSlotAdjustTime = 0;
|
||||
|
||||
private long lastCheckTimeoutTime = 0;
|
||||
|
||||
private Env env;
|
||||
private SystemInfoService infoService;
|
||||
private TabletInvertedIndex invertedIndex;
|
||||
@ -151,19 +150,19 @@ public class TabletScheduler extends MasterDaemon {
|
||||
|
||||
public TabletScheduler(Env env, SystemInfoService infoService, TabletInvertedIndex invertedIndex,
|
||||
TabletSchedulerStat stat, String rebalancerType) {
|
||||
super("tablet scheduler", SCHEDULE_INTERVAL_MS);
|
||||
super("tablet scheduler", FeConstants.tablet_schedule_interval_ms);
|
||||
this.env = env;
|
||||
this.infoService = infoService;
|
||||
this.invertedIndex = invertedIndex;
|
||||
this.colocateTableIndex = env.getColocateTableIndex();
|
||||
this.stat = stat;
|
||||
if (rebalancerType.equalsIgnoreCase("partition")) {
|
||||
this.rebalancer = new PartitionRebalancer(infoService, invertedIndex);
|
||||
this.rebalancer = new PartitionRebalancer(infoService, invertedIndex, backendsWorkingSlots);
|
||||
} else {
|
||||
this.rebalancer = new BeLoadRebalancer(infoService, invertedIndex);
|
||||
this.rebalancer = new BeLoadRebalancer(infoService, invertedIndex, backendsWorkingSlots);
|
||||
}
|
||||
// if rebalancer can not get new task, then use diskRebalancer to get task
|
||||
this.diskRebalancer = new DiskRebalancer(infoService, invertedIndex);
|
||||
this.diskRebalancer = new DiskRebalancer(infoService, invertedIndex, backendsWorkingSlots);
|
||||
}
|
||||
|
||||
public TabletSchedulerStat getStat() {
|
||||
@ -190,10 +189,11 @@ public class TabletScheduler extends MasterDaemon {
|
||||
Set<Long> deletedBeIds = Sets.newHashSet();
|
||||
for (Long beId : backendsWorkingSlots.keySet()) {
|
||||
if (backends.containsKey(beId)) {
|
||||
List<Long> pathHashes = backends.get(beId).getDisks().values().stream()
|
||||
Map<Long, TStorageMedium> paths = Maps.newHashMap();
|
||||
backends.get(beId).getDisks().values().stream()
|
||||
.filter(v -> v.getState() == DiskState.ONLINE)
|
||||
.map(DiskInfo::getPathHash).collect(Collectors.toList());
|
||||
backendsWorkingSlots.get(beId).updatePaths(pathHashes);
|
||||
.forEach(v -> paths.put(v.getPathHash(), v.getStorageMedium()));
|
||||
backendsWorkingSlots.get(beId).updatePaths(paths);
|
||||
} else {
|
||||
deletedBeIds.add(beId);
|
||||
}
|
||||
@ -208,9 +208,11 @@ public class TabletScheduler extends MasterDaemon {
|
||||
// add new backends
|
||||
for (Backend be : backends.values()) {
|
||||
if (!backendsWorkingSlots.containsKey(be.getId())) {
|
||||
List<Long> pathHashes = be.getDisks().values().stream()
|
||||
.map(DiskInfo::getPathHash).collect(Collectors.toList());
|
||||
PathSlot slot = new PathSlot(pathHashes, be.getId());
|
||||
Map<Long, TStorageMedium> paths = Maps.newHashMap();
|
||||
be.getDisks().values().stream()
|
||||
.filter(v -> v.getState() == DiskState.ONLINE)
|
||||
.forEach(v -> paths.put(v.getPathHash(), v.getStorageMedium()));
|
||||
PathSlot slot = new PathSlot(paths, be.getId());
|
||||
backendsWorkingSlots.put(be.getId(), slot);
|
||||
LOG.info("add new backend {} with slots num: {}", be.getId(), be.getDisks().size());
|
||||
}
|
||||
@ -261,9 +263,7 @@ public class TabletScheduler extends MasterDaemon {
|
||||
|
||||
pendingTablets.offer(tablet);
|
||||
if (!contains) {
|
||||
LOG.info("Add tablet to pending queue, tablet id {}, type {}, status {}, priority {}",
|
||||
tablet.getTabletId(), tablet.getType(), tablet.getTabletStatus(),
|
||||
tablet.getPriority());
|
||||
LOG.info("Add tablet to pending queue, {}", tablet);
|
||||
}
|
||||
|
||||
return AddResult.ADDED;
|
||||
@ -319,24 +319,16 @@ public class TabletScheduler extends MasterDaemon {
|
||||
return;
|
||||
}
|
||||
|
||||
if (System.currentTimeMillis() - lastCheckTimeoutTime >= 1000L) {
|
||||
updateLoadStatisticsAndPriorityIfNecessary();
|
||||
handleRunningTablets();
|
||||
selectTabletsForBalance();
|
||||
lastCheckTimeoutTime = System.currentTimeMillis();
|
||||
}
|
||||
|
||||
updateLoadStatistics();
|
||||
handleRunningTablets();
|
||||
selectTabletsForBalance();
|
||||
schedulePendingTablets();
|
||||
|
||||
stat.counterTabletScheduleRound.incrementAndGet();
|
||||
}
|
||||
|
||||
|
||||
private void updateLoadStatisticsAndPriorityIfNecessary() {
|
||||
if (System.currentTimeMillis() - lastStatUpdateTime < STAT_UPDATE_INTERVAL_MS) {
|
||||
return;
|
||||
}
|
||||
|
||||
private void updateLoadStatistics() {
|
||||
updateLoadStatistic();
|
||||
rebalancer.updateLoadStatistic(statisticMap);
|
||||
diskRebalancer.updateLoadStatistic(statisticMap);
|
||||
@ -359,6 +351,12 @@ public class TabletScheduler extends MasterDaemon {
|
||||
newStatisticMap.put(tag, loadStatistic);
|
||||
LOG.debug("update load statistic for tag {}:\n{}", tag, loadStatistic.getBrief());
|
||||
}
|
||||
Map<Long, Long> pathsCopingSize = getPathsCopingSize();
|
||||
for (LoadStatisticForTag loadStatistic : newStatisticMap.values()) {
|
||||
for (BackendLoadStatistic beLoadStatistic : loadStatistic.getBackendLoadStatistics()) {
|
||||
beLoadStatistic.incrPathsCopingSize(pathsCopingSize);
|
||||
}
|
||||
}
|
||||
|
||||
this.statisticMap = newStatisticMap;
|
||||
}
|
||||
@ -584,6 +582,7 @@ public class TabletScheduler extends MasterDaemon {
|
||||
// we do not concern priority here.
|
||||
// once we take the tablet out of priority queue, priority is meaningless.
|
||||
tabletCtx.setTablet(tablet);
|
||||
tabletCtx.updateTabletSize();
|
||||
tabletCtx.setVersionInfo(partition.getVisibleVersion(), partition.getCommittedVersion());
|
||||
tabletCtx.setSchemaHash(tbl.getSchemaHashByIndexId(idx.getId()));
|
||||
tabletCtx.setStorageMedium(tbl.getPartitionInfo().getDataProperty(partition.getId()).getStorageMedium());
|
||||
@ -691,6 +690,7 @@ public class TabletScheduler extends MasterDaemon {
|
||||
|
||||
// create clone task
|
||||
batchTask.addTask(tabletCtx.createCloneReplicaAndTask());
|
||||
incrDestPathCopingSize(tabletCtx);
|
||||
}
|
||||
|
||||
// In dealing with the case of missing replicas, we need to select a tag with missing replicas
|
||||
@ -782,6 +782,7 @@ public class TabletScheduler extends MasterDaemon {
|
||||
private void handleReplicaRelocating(TabletSchedCtx tabletCtx, AgentBatchTask batchTask)
|
||||
throws SchedException {
|
||||
stat.counterReplicaUnavailableErr.incrementAndGet();
|
||||
tabletCtx.setTabletStatus(TabletStatus.VERSION_INCOMPLETE);
|
||||
handleReplicaVersionIncomplete(tabletCtx, batchTask);
|
||||
}
|
||||
|
||||
@ -1202,6 +1203,7 @@ public class TabletScheduler extends MasterDaemon {
|
||||
|
||||
// create clone task
|
||||
batchTask.addTask(tabletCtx.createCloneReplicaAndTask());
|
||||
incrDestPathCopingSize(tabletCtx);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -1214,16 +1216,23 @@ public class TabletScheduler extends MasterDaemon {
|
||||
return;
|
||||
}
|
||||
|
||||
long numOfBalancingTablets = getBalanceTabletsNumber();
|
||||
if (numOfBalancingTablets > Config.max_balancing_tablets) {
|
||||
LOG.info("number of balancing tablets {} exceed limit: {}, skip selecting tablets for balance",
|
||||
numOfBalancingTablets, Config.max_balancing_tablets);
|
||||
// No need to prefetch too many balance task to pending queue.
|
||||
// Because for every sched, it will re select the balance task.
|
||||
int needAddBalanceNum = Math.min(Config.schedule_batch_size - getPendingNum(),
|
||||
Config.max_balancing_tablets - getBalanceTabletsNumber());
|
||||
if (needAddBalanceNum <= 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
List<TabletSchedCtx> alternativeTablets = rebalancer.selectAlternativeTablets();
|
||||
Collections.shuffle(alternativeTablets);
|
||||
for (TabletSchedCtx tabletCtx : alternativeTablets) {
|
||||
addTablet(tabletCtx, false);
|
||||
if (addTablet(tabletCtx, false) == AddResult.ADDED) {
|
||||
needAddBalanceNum--;
|
||||
if (needAddBalanceNum <= 0) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (Config.disable_disk_balance) {
|
||||
LOG.info("disk balance is disabled. skip selecting tablets for disk balance");
|
||||
@ -1237,7 +1246,12 @@ public class TabletScheduler extends MasterDaemon {
|
||||
for (TabletSchedCtx tabletCtx : diskBalanceTablets) {
|
||||
// add if task from prio backend or cluster is balanced
|
||||
if (alternativeTablets.isEmpty() || tabletCtx.getPriority() == TabletSchedCtx.Priority.NORMAL) {
|
||||
addTablet(tabletCtx, false);
|
||||
if (addTablet(tabletCtx, false) == AddResult.ADDED) {
|
||||
needAddBalanceNum--;
|
||||
if (needAddBalanceNum <= 0) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1249,16 +1263,17 @@ public class TabletScheduler extends MasterDaemon {
|
||||
stat.counterBalanceSchedule.incrementAndGet();
|
||||
AgentTask task = null;
|
||||
if (tabletCtx.getBalanceType() == TabletSchedCtx.BalanceType.DISK_BALANCE) {
|
||||
task = diskRebalancer.createBalanceTask(tabletCtx, backendsWorkingSlots);
|
||||
task = diskRebalancer.createBalanceTask(tabletCtx);
|
||||
checkDiskBalanceLastSuccTime(tabletCtx.getSrcBackendId(), tabletCtx.getSrcPathHash());
|
||||
checkDiskBalanceLastSuccTime(tabletCtx.getDestBackendId(), tabletCtx.getDestPathHash());
|
||||
} else if (tabletCtx.getBalanceType() == TabletSchedCtx.BalanceType.BE_BALANCE) {
|
||||
task = rebalancer.createBalanceTask(tabletCtx, backendsWorkingSlots);
|
||||
task = rebalancer.createBalanceTask(tabletCtx);
|
||||
} else {
|
||||
throw new SchedException(Status.UNRECOVERABLE,
|
||||
"unknown balance type: " + tabletCtx.getBalanceType().toString());
|
||||
}
|
||||
batchTask.addTask(task);
|
||||
incrDestPathCopingSize(tabletCtx);
|
||||
}
|
||||
|
||||
// choose a path on a backend which is fit for the tablet
|
||||
@ -1294,7 +1309,7 @@ public class TabletScheduler extends MasterDaemon {
|
||||
|
||||
// get all available paths which this tablet can fit in.
|
||||
// beStatistics is sorted by mix load score in ascend order, so select from first to last.
|
||||
List<RootPathLoadStatistic> allFitPaths = Lists.newArrayList();
|
||||
List<BePathLoadStatPair> allFitPaths = Lists.newArrayList();
|
||||
for (BackendLoadStatistic bes : beStatistics) {
|
||||
if (!bes.isAvailable()) {
|
||||
LOG.debug("backend {} is not available, skip. tablet: {}", bes.getBeId(), tabletCtx.getTabletId());
|
||||
@ -1343,18 +1358,21 @@ public class TabletScheduler extends MasterDaemon {
|
||||
}
|
||||
}
|
||||
|
||||
Preconditions.checkState(resultPaths.size() == 1);
|
||||
allFitPaths.add(resultPaths.get(0));
|
||||
resultPaths.stream().forEach(path -> allFitPaths.add(new BePathLoadStatPair(bes, path)));
|
||||
}
|
||||
|
||||
if (allFitPaths.isEmpty()) {
|
||||
throw new SchedException(Status.UNRECOVERABLE, "unable to find dest path for new replica");
|
||||
}
|
||||
|
||||
BePathLoadStatPairComparator comparator = new BePathLoadStatPairComparator(allFitPaths);
|
||||
Collections.sort(allFitPaths, comparator);
|
||||
|
||||
// all fit paths has already been sorted by load score in 'allFitPaths' in ascend order.
|
||||
// just get first available path.
|
||||
// we try to find a path with specified media type, if not find, arbitrarily use one.
|
||||
for (RootPathLoadStatistic rootPathLoadStatistic : allFitPaths) {
|
||||
for (BePathLoadStatPair bePathLoadStat : allFitPaths) {
|
||||
RootPathLoadStatistic rootPathLoadStatistic = bePathLoadStat.getPathLoadStatistic();
|
||||
if (rootPathLoadStatistic.getStorageMedium() != tabletCtx.getStorageMedium()) {
|
||||
LOG.debug("backend {}'s path {}'s storage medium {} "
|
||||
+ "is not equal to tablet's storage medium {}, skip. tablet: {}",
|
||||
@ -1385,7 +1403,8 @@ public class TabletScheduler extends MasterDaemon {
|
||||
boolean hasBePath = false;
|
||||
|
||||
// no root path with specified media type is found, get arbitrary one.
|
||||
for (RootPathLoadStatistic rootPathLoadStatistic : allFitPaths) {
|
||||
for (BePathLoadStatPair bePathLoadStat : allFitPaths) {
|
||||
RootPathLoadStatistic rootPathLoadStatistic = bePathLoadStat.getPathLoadStatistic();
|
||||
PathSlot slot = backendsWorkingSlots.get(rootPathLoadStatistic.getBeId());
|
||||
if (slot == null) {
|
||||
LOG.debug("backend {}'s path {}'s slot is null, skip. tablet: {}",
|
||||
@ -1622,7 +1641,10 @@ public class TabletScheduler extends MasterDaemon {
|
||||
tabletCtx.increaseFailedRunningCounter();
|
||||
if (!tabletCtx.isExceedFailedRunningLimit()) {
|
||||
stat.counterCloneTaskFailed.incrementAndGet();
|
||||
addToRunningTablets(tabletCtx);
|
||||
tabletCtx.releaseResource(this);
|
||||
tabletCtx.resetFailedSchedCounter();
|
||||
tabletCtx.setState(TabletSchedCtx.State.PENDING);
|
||||
addBackToPendingTablets(tabletCtx);
|
||||
return false;
|
||||
} else {
|
||||
// unrecoverable
|
||||
@ -1767,9 +1789,42 @@ public class TabletScheduler extends MasterDaemon {
|
||||
return allTabletTypes.size();
|
||||
}
|
||||
|
||||
public synchronized long getBalanceTabletsNumber() {
|
||||
return pendingTablets.stream().filter(t -> t.getType() == Type.BALANCE).count()
|
||||
+ runningTablets.values().stream().filter(t -> t.getType() == Type.BALANCE).count();
|
||||
public synchronized int getBalanceTabletsNumber() {
|
||||
return (int) (pendingTablets.stream().filter(t -> t.getType() == Type.BALANCE).count()
|
||||
+ runningTablets.values().stream().filter(t -> t.getType() == Type.BALANCE).count());
|
||||
}
|
||||
|
||||
private synchronized Map<Long, Long> getPathsCopingSize() {
|
||||
Map<Long, Long> pathsCopingSize = Maps.newHashMap();
|
||||
for (TabletSchedCtx tablet : runningTablets.values()) {
|
||||
long pathHash = tablet.getDestPathHash();
|
||||
if (pathHash == 0 || pathHash == -1) {
|
||||
continue;
|
||||
}
|
||||
|
||||
long copingSize = tablet.getDestEstimatedCopingSize();
|
||||
if (copingSize > 0) {
|
||||
Long size = pathsCopingSize.getOrDefault(pathHash, 0L);
|
||||
pathsCopingSize.put(pathHash, size + copingSize);
|
||||
}
|
||||
}
|
||||
return pathsCopingSize;
|
||||
}
|
||||
|
||||
private void incrDestPathCopingSize(TabletSchedCtx tablet) {
|
||||
long destPathHash = tablet.getDestPathHash();
|
||||
if (destPathHash == -1 || destPathHash == 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
for (LoadStatisticForTag loadStatistic : statisticMap.values()) {
|
||||
BackendLoadStatistic beLoadStatistic = loadStatistic.getBackendLoadStatistics().stream()
|
||||
.filter(v -> v.getBeId() == tablet.getDestBackendId()).findFirst().orElse(null);
|
||||
if (beLoadStatistic != null) {
|
||||
beLoadStatistic.incrPathCopingSize(destPathHash, tablet.getDestEstimatedCopingSize());
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@ -1782,22 +1837,22 @@ public class TabletScheduler extends MasterDaemon {
|
||||
private Map<Long, Slot> pathSlots = Maps.newConcurrentMap();
|
||||
private long beId;
|
||||
|
||||
public PathSlot(List<Long> paths, long beId) {
|
||||
public PathSlot(Map<Long, TStorageMedium> paths, long beId) {
|
||||
this.beId = beId;
|
||||
for (Long pathHash : paths) {
|
||||
pathSlots.put(pathHash, new Slot(beId));
|
||||
for (Map.Entry<Long, TStorageMedium> entry : paths.entrySet()) {
|
||||
pathSlots.put(entry.getKey(), new Slot(entry.getValue()));
|
||||
}
|
||||
}
|
||||
|
||||
// update the path
|
||||
public synchronized void updatePaths(List<Long> paths) {
|
||||
public synchronized void updatePaths(Map<Long, TStorageMedium> paths) {
|
||||
// delete non exist path
|
||||
pathSlots.entrySet().removeIf(entry -> !paths.contains(entry.getKey()));
|
||||
pathSlots.entrySet().removeIf(entry -> !paths.containsKey(entry.getKey()));
|
||||
|
||||
// add new path
|
||||
for (Long pathHash : paths) {
|
||||
if (!pathSlots.containsKey(pathHash)) {
|
||||
pathSlots.put(pathHash, new Slot(beId));
|
||||
for (Map.Entry<Long, TStorageMedium> entry : paths.entrySet()) {
|
||||
if (!pathSlots.containsKey(entry.getKey())) {
|
||||
pathSlots.put(entry.getKey(), new Slot(entry.getValue()));
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1829,6 +1884,20 @@ public class TabletScheduler extends MasterDaemon {
|
||||
return true;
|
||||
}
|
||||
|
||||
public synchronized boolean hasAvailableBalanceSlot(long pathHash) {
|
||||
if (pathHash == -1) {
|
||||
return false;
|
||||
}
|
||||
Slot slot = pathSlots.get(pathHash);
|
||||
if (slot == null) {
|
||||
return false;
|
||||
}
|
||||
if (slot.getAvailableBalance() == 0) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* If the specified 'pathHash' has available slot, decrease the slot number and return this path hash
|
||||
*/
|
||||
@ -1872,27 +1941,27 @@ public class TabletScheduler extends MasterDaemon {
|
||||
return total;
|
||||
}
|
||||
|
||||
public synchronized int getTotalAvailBalanceSlotNum() {
|
||||
int num = 0;
|
||||
for (Slot slot : pathSlots.values()) {
|
||||
num += slot.getAvailableBalance();
|
||||
}
|
||||
return num;
|
||||
}
|
||||
|
||||
/**
|
||||
* get path whose balance slot num is larger than 0
|
||||
*/
|
||||
public synchronized Set<Long> getAvailPathsForBalance() {
|
||||
Set<Long> pathHashs = Sets.newHashSet();
|
||||
for (Map.Entry<Long, Slot> entry : pathSlots.entrySet()) {
|
||||
if (entry.getValue().getBalanceAvailable() > 0) {
|
||||
if (entry.getValue().getAvailableBalance() > 0) {
|
||||
pathHashs.add(entry.getKey());
|
||||
}
|
||||
}
|
||||
return pathHashs;
|
||||
}
|
||||
|
||||
public synchronized int getAvailBalanceSlotNum() {
|
||||
int num = 0;
|
||||
for (Map.Entry<Long, Slot> entry : pathSlots.entrySet()) {
|
||||
num += entry.getValue().getBalanceAvailable();
|
||||
}
|
||||
return num;
|
||||
}
|
||||
|
||||
public synchronized List<List<String>> getSlotInfo(long beId) {
|
||||
List<List<String>> results = Lists.newArrayList();
|
||||
pathSlots.forEach((key, value) -> {
|
||||
@ -1901,13 +1970,18 @@ public class TabletScheduler extends MasterDaemon {
|
||||
result.add(String.valueOf(key));
|
||||
result.add(String.valueOf(value.getAvailable()));
|
||||
result.add(String.valueOf(value.getTotal()));
|
||||
result.add(String.valueOf(value.getBalanceAvailable()));
|
||||
result.add(String.valueOf(value.getAvailableBalance()));
|
||||
result.add(String.valueOf(value.getAvgRate()));
|
||||
results.add(result);
|
||||
});
|
||||
return results;
|
||||
}
|
||||
|
||||
public synchronized int getAvailableBalanceNum(long pathHash) {
|
||||
Slot slot = pathSlots.get(pathHash);
|
||||
return slot != null ? slot.getAvailableBalance() : 0;
|
||||
}
|
||||
|
||||
public synchronized long takeBalanceSlot(long pathHash) {
|
||||
Slot slot = pathSlots.get(pathHash);
|
||||
if (slot == null) {
|
||||
@ -1980,10 +2054,10 @@ public class TabletScheduler extends MasterDaemon {
|
||||
// for disk balance
|
||||
public long diskBalanceLastSuccTime = 0;
|
||||
|
||||
private long beId;
|
||||
private TStorageMedium storageMedium;
|
||||
|
||||
public Slot(long beId) {
|
||||
this.beId = beId;
|
||||
public Slot(TStorageMedium storageMedium) {
|
||||
this.storageMedium = storageMedium;
|
||||
this.used = 0;
|
||||
this.balanceUsed = 0;
|
||||
}
|
||||
@ -1993,18 +2067,16 @@ public class TabletScheduler extends MasterDaemon {
|
||||
}
|
||||
|
||||
public int getTotal() {
|
||||
int total = Math.max(1, Config.schedule_slot_num_per_path);
|
||||
|
||||
Backend be = Env.getCurrentSystemInfo().getBackend(beId);
|
||||
if (be != null && be.isDecommissioned()) {
|
||||
total = Math.max(1, Config.schedule_decommission_slot_num_per_path);
|
||||
if (storageMedium == TStorageMedium.SSD) {
|
||||
return Config.schedule_slot_num_per_ssd_path;
|
||||
} else {
|
||||
return Config.schedule_slot_num_per_hdd_path;
|
||||
}
|
||||
|
||||
return total;
|
||||
}
|
||||
|
||||
public int getBalanceAvailable() {
|
||||
return Math.max(0, getBalanceTotal() - balanceUsed);
|
||||
public int getAvailableBalance() {
|
||||
int leftBalance = Math.max(0, getBalanceTotal() - balanceUsed);
|
||||
return Math.min(leftBalance, getAvailable());
|
||||
}
|
||||
|
||||
public int getBalanceTotal() {
|
||||
|
||||
@ -67,6 +67,8 @@ public class FeConstants {
|
||||
public static String null_string = "\\N";
|
||||
|
||||
public static long tablet_checker_interval_ms = 20 * 1000L;
|
||||
public static long tablet_schedule_interval_ms = 100L;
|
||||
|
||||
public static String csv = "csv";
|
||||
public static String csv_with_names = "csv_with_names";
|
||||
public static String csv_with_names_and_types = "csv_with_names_and_types";
|
||||
|
||||
@ -37,7 +37,7 @@ public class TabletSchedulerDetailProcDir implements ProcDirInterface {
|
||||
public static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>().add("TabletId")
|
||||
.add("Type").add("Medium").add("Status").add("State").add("SchedCode").add("Priority").add("SrcBe")
|
||||
.add("SrcPath").add("DestBe").add("DestPath").add("Timeout").add("Create").add("LstSched").add("LstVisit")
|
||||
.add("Finished").add("Rate").add("FailedSched").add("FailedRunning").add("VisibleVer")
|
||||
.add("Finished").add("ReplicaSize").add("Rate").add("FailedSched").add("FailedRunning").add("VisibleVer")
|
||||
.add("CmtVer").add("ErrMsg")
|
||||
.build();
|
||||
|
||||
|
||||
@ -33,6 +33,7 @@ public class CloneTask extends AgentTask {
|
||||
private long replicaId;
|
||||
private List<TBackend> srcBackends;
|
||||
private TStorageMedium storageMedium;
|
||||
private TBackend destBackend;
|
||||
|
||||
private long visibleVersion;
|
||||
|
||||
@ -43,10 +44,11 @@ public class CloneTask extends AgentTask {
|
||||
|
||||
private int taskVersion = VERSION_1;
|
||||
|
||||
public CloneTask(long backendId, long dbId, long tableId, long partitionId, long indexId, long tabletId,
|
||||
long replicaId, int schemaHash, List<TBackend> srcBackends, TStorageMedium storageMedium,
|
||||
long visibleVersion, int timeoutS) {
|
||||
public CloneTask(TBackend destBackend, long backendId, long dbId, long tableId, long partitionId,
|
||||
long indexId, long tabletId, long replicaId, int schemaHash, List<TBackend> srcBackends,
|
||||
TStorageMedium storageMedium, long visibleVersion, int timeoutS) {
|
||||
super(null, backendId, TTaskType.CLONE, dbId, tableId, partitionId, indexId, tabletId);
|
||||
this.destBackend = destBackend;
|
||||
this.replicaId = replicaId;
|
||||
this.schemaHash = schemaHash;
|
||||
this.srcBackends = srcBackends;
|
||||
@ -95,15 +97,16 @@ public class CloneTask extends AgentTask {
|
||||
@Override
|
||||
public String toString() {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append("tablet id: ").append(tabletId).append(", replica id: ").append(replicaId).append(", schema hash: ")
|
||||
.append(schemaHash);
|
||||
sb.append("tablet id: ").append(tabletId)
|
||||
.append(", replica id: ").append(replicaId)
|
||||
.append(", schema hash: ").append(schemaHash);
|
||||
sb.append(", storageMedium: ").append(storageMedium.name());
|
||||
sb.append(", visible version: ").append(visibleVersion);
|
||||
sb.append(", src backend: ").append(srcBackends.get(0).getHost())
|
||||
.append(", src path hash: ").append(srcPathHash);
|
||||
sb.append(", src backend: ").append(srcBackends.get(0).getHost()).append(", src path hash: ")
|
||||
.append(srcPathHash);
|
||||
sb.append(", dest backend: ").append(backendId).append(", dest path hash: ").append(destPathHash);
|
||||
sb.append(", dest backend id: ").append(backendId)
|
||||
.append(", dest backend: ").append(destBackend.getHost())
|
||||
.append(", dest path hash: ").append(destPathHash);
|
||||
return sb.toString();
|
||||
}
|
||||
}
|
||||
|
||||
@ -0,0 +1,174 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.clone;
|
||||
|
||||
import org.apache.doris.analysis.AlterSystemStmt;
|
||||
import org.apache.doris.analysis.CreateDbStmt;
|
||||
import org.apache.doris.analysis.CreateTableStmt;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.TabletInvertedIndex;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.ExceptionChecker;
|
||||
import org.apache.doris.common.FeConstants;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.system.Backend;
|
||||
import org.apache.doris.system.SystemInfoService;
|
||||
import org.apache.doris.thrift.TDisk;
|
||||
import org.apache.doris.thrift.TStorageMedium;
|
||||
import org.apache.doris.utframe.UtFrameUtils;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
import java.util.UUID;
|
||||
|
||||
public class DecommissionTest {
|
||||
private static final Logger LOG = LogManager.getLogger(TabletReplicaTooSlowTest.class);
|
||||
// use a unique dir so that it won't be conflict with other unit test which
|
||||
// may also start a Mocked Frontend
|
||||
private static String runningDirBase = "fe";
|
||||
private static String runningDir = runningDirBase + "/mocked/DecommissionTest/" + UUID.randomUUID() + "/";
|
||||
private static ConnectContext connectContext;
|
||||
|
||||
private static Random random = new Random(System.currentTimeMillis());
|
||||
|
||||
private long id = 10086;
|
||||
|
||||
private final SystemInfoService systemInfoService = new SystemInfoService();
|
||||
private final TabletInvertedIndex invertedIndex = new TabletInvertedIndex();
|
||||
|
||||
@BeforeClass
|
||||
public static void beforeClass() throws Exception {
|
||||
FeConstants.runningUnitTest = true;
|
||||
System.out.println(runningDir);
|
||||
FeConstants.runningUnitTest = true;
|
||||
FeConstants.tablet_checker_interval_ms = 200;
|
||||
FeConstants.tablet_schedule_interval_ms = 2000;
|
||||
Config.tablet_repair_delay_factor_second = 1;
|
||||
Config.enable_round_robin_create_tablet = true;
|
||||
Config.schedule_slot_num_per_hdd_path = 10000;
|
||||
Config.max_scheduling_tablets = 10000;
|
||||
Config.schedule_batch_size = 10000;
|
||||
Config.disable_balance = true;
|
||||
// 4 backends:
|
||||
// 127.0.0.1
|
||||
// 127.0.0.2
|
||||
// 127.0.0.3
|
||||
// 127.0.0.4
|
||||
UtFrameUtils.createDorisClusterWithMultiTag(runningDir, 4);
|
||||
List<Backend> backends = Env.getCurrentSystemInfo().getAllBackends();
|
||||
for (Backend be : backends) {
|
||||
Map<String, TDisk> backendDisks = Maps.newHashMap();
|
||||
TDisk tDisk1 = new TDisk();
|
||||
tDisk1.setRootPath("/home/doris1.HDD");
|
||||
tDisk1.setDiskTotalCapacity(20000000);
|
||||
tDisk1.setDataUsedCapacity(1);
|
||||
tDisk1.setUsed(true);
|
||||
tDisk1.setDiskAvailableCapacity(tDisk1.disk_total_capacity - tDisk1.data_used_capacity);
|
||||
tDisk1.setPathHash(random.nextLong());
|
||||
tDisk1.setStorageMedium(TStorageMedium.HDD);
|
||||
backendDisks.put(tDisk1.getRootPath(), tDisk1);
|
||||
|
||||
TDisk tDisk2 = new TDisk();
|
||||
tDisk2.setRootPath("/home/doris2.HHD");
|
||||
tDisk2.setDiskTotalCapacity(20000000);
|
||||
tDisk2.setDataUsedCapacity(1);
|
||||
tDisk2.setUsed(true);
|
||||
tDisk2.setDiskAvailableCapacity(tDisk2.disk_total_capacity - tDisk2.data_used_capacity);
|
||||
tDisk2.setPathHash(random.nextLong());
|
||||
tDisk2.setStorageMedium(TStorageMedium.HDD);
|
||||
backendDisks.put(tDisk2.getRootPath(), tDisk2);
|
||||
|
||||
be.updateDisks(backendDisks);
|
||||
}
|
||||
|
||||
connectContext = UtFrameUtils.createDefaultCtx();
|
||||
|
||||
// create database
|
||||
String createDbStmtStr = "create database test;";
|
||||
CreateDbStmt createDbStmt = (CreateDbStmt) UtFrameUtils.parseAndAnalyzeStmt(createDbStmtStr, connectContext);
|
||||
Env.getCurrentEnv().createDb(createDbStmt);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() {
|
||||
//UtFrameUtils.cleanDorisFeDir(runningDirBase);
|
||||
}
|
||||
|
||||
private static void createTable(String sql) throws Exception {
|
||||
CreateTableStmt createTableStmt = (CreateTableStmt) UtFrameUtils.parseAndAnalyzeStmt(sql, connectContext);
|
||||
Env.getCurrentEnv().createTable(createTableStmt);
|
||||
RebalancerTestUtil.updateReplicaPathHash();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDecommissionBackend() throws Exception {
|
||||
// test colocate tablet repair
|
||||
String createStr = "create table test.tbl1\n"
|
||||
+ "(k1 date, k2 int)\n"
|
||||
+ "distributed by hash(k2) buckets 2400\n"
|
||||
+ "properties\n"
|
||||
+ "(\n"
|
||||
+ " \"replication_num\" = \"1\"\n"
|
||||
+ ")";
|
||||
ExceptionChecker.expectThrowsNoException(() -> createTable(createStr));
|
||||
int totalReplicaNum = 1 * 2400;
|
||||
checkBalance(1, totalReplicaNum, 4);
|
||||
|
||||
Backend backend = Env.getCurrentSystemInfo().getAllBackends().get(0);
|
||||
String decommissionStmtStr = "alter system decommission backend \"" + backend.getHost()
|
||||
+ ":" + backend.getHeartbeatPort() + "\"";
|
||||
AlterSystemStmt decommissionStmt =
|
||||
(AlterSystemStmt) UtFrameUtils.parseAndAnalyzeStmt(decommissionStmtStr, connectContext);
|
||||
Env.getCurrentEnv().getAlterInstance().processAlterCluster(decommissionStmt);
|
||||
|
||||
Assert.assertEquals(true, backend.isDecommissioned());
|
||||
|
||||
checkBalance(200, totalReplicaNum, 3);
|
||||
}
|
||||
|
||||
void checkBalance(int tryTimes, int totalReplicaNum, int backendNum) throws Exception {
|
||||
int beReplicaNum = totalReplicaNum / backendNum;
|
||||
for (int i = 0; i < tryTimes; i++) {
|
||||
List<Long> backendIds = Env.getCurrentSystemInfo().getAllBackendIds(true);
|
||||
if (backendNum != backendIds.size() && i != tryTimes - 1) {
|
||||
Thread.sleep(1000);
|
||||
continue;
|
||||
}
|
||||
|
||||
List<Integer> tabletNums = Lists.newArrayList();
|
||||
for (long beId : backendIds) {
|
||||
tabletNums.add(Env.getCurrentInvertedIndex().getTabletNumByBackendId(beId));
|
||||
}
|
||||
|
||||
Assert.assertEquals("tablet nums = " + tabletNums, backendNum, backendIds.size());
|
||||
for (int tabletNum : tabletNums) {
|
||||
Assert.assertEquals("tablet nums = " + tabletNums, beReplicaNum, tabletNum);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -20,7 +20,6 @@ package org.apache.doris.clone;
|
||||
import org.apache.doris.catalog.Column;
|
||||
import org.apache.doris.catalog.DataProperty;
|
||||
import org.apache.doris.catalog.Database;
|
||||
import org.apache.doris.catalog.DiskInfo;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.HashDistributionInfo;
|
||||
import org.apache.doris.catalog.KeysType;
|
||||
@ -36,7 +35,6 @@ import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.FeConstants;
|
||||
import org.apache.doris.datasource.InternalCatalog;
|
||||
import org.apache.doris.resource.Tag;
|
||||
import org.apache.doris.system.Backend;
|
||||
import org.apache.doris.system.SystemInfoService;
|
||||
import org.apache.doris.task.AgentTask;
|
||||
import org.apache.doris.task.StorageMediaMigrationTask;
|
||||
@ -60,7 +58,6 @@ import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.LongStream;
|
||||
|
||||
public class DiskRebalanceTest {
|
||||
@ -79,10 +76,12 @@ public class DiskRebalanceTest {
|
||||
private final SystemInfoService systemInfoService = new SystemInfoService();
|
||||
private final TabletInvertedIndex invertedIndex = new TabletInvertedIndex();
|
||||
private Map<Tag, LoadStatisticForTag> statisticMap;
|
||||
private Map<Long, PathSlot> backendsWorkingSlots = Maps.newHashMap();
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
Config.used_capacity_percent_max_diff = 1.0;
|
||||
Config.balance_slot_num_per_path = 1;
|
||||
db = new Database(1, "test db");
|
||||
db.setClusterName(SystemInfoService.DEFAULT_CLUSTER);
|
||||
new Expectations() {
|
||||
@ -137,12 +136,19 @@ public class DiskRebalanceTest {
|
||||
Env.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(1, 2, Lists.newArrayList(3L)));
|
||||
}
|
||||
|
||||
private void generateStatisticMap() {
|
||||
/**
 * Rebuilds the two fixtures the rebalancer tests rely on: {@code statisticMap} and
 * {@code backendsWorkingSlots}.
 *
 * <p>A {@link LoadStatisticForTag} for {@code Tag.DEFAULT_BACKEND_TAG} is created from the test's
 * {@code systemInfoService} and {@code invertedIndex}, initialised, and stored as the only entry of a
 * fresh {@code statisticMap}. {@code backendsWorkingSlots} is then cleared and refilled with one
 * {@code PathSlot} per backend statistic.
 */
private void generateStatisticsAndPathSlots() {
|
||||
LoadStatisticForTag loadStatistic = new LoadStatisticForTag(Tag.DEFAULT_BACKEND_TAG, systemInfoService,
|
||||
invertedIndex);
|
||||
loadStatistic.init();
|
||||
// Replace (not merge into) any previous statistic map.
statisticMap = Maps.newHashMap();
|
||||
statisticMap.put(Tag.DEFAULT_BACKEND_TAG, loadStatistic);
|
||||
// The slot map is a shared field, so drop entries left over from an earlier call.
backendsWorkingSlots.clear();
|
||||
// NOTE(review): null is passed as the argument of getSortedBeLoadStats; presumably it means
// "no storage-medium filter" - confirm against LoadStatisticForTag.
for (BackendLoadStatistic beStat : loadStatistic.getSortedBeLoadStats(null)) {
|
||||
// path hash -> storage medium for every path statistic of this backend
Map<Long, TStorageMedium> paths = Maps.newHashMap();
|
||||
beStat.getPathStatistics().stream().forEach(
|
||||
path -> paths.put(path.getPathHash(), path.getStorageMedium()));
|
||||
backendsWorkingSlots.put(beStat.getBeId(), new PathSlot(paths, beStat.getBeId()));
|
||||
}
|
||||
}
|
||||
|
||||
private void createPartitionsForTable(OlapTable olapTable, MaterializedIndex index, Long partitionCount) {
|
||||
@ -187,8 +193,9 @@ public class DiskRebalanceTest {
|
||||
// case start
|
||||
Configurator.setLevel("org.apache.doris.clone.DiskRebalancer", Level.DEBUG);
|
||||
|
||||
Rebalancer rebalancer = new DiskRebalancer(Env.getCurrentSystemInfo(), Env.getCurrentInvertedIndex());
|
||||
generateStatisticMap();
|
||||
generateStatisticsAndPathSlots();
|
||||
Rebalancer rebalancer = new DiskRebalancer(Env.getCurrentSystemInfo(), Env.getCurrentInvertedIndex(),
|
||||
backendsWorkingSlots);
|
||||
rebalancer.updateLoadStatistic(statisticMap);
|
||||
List<TabletSchedCtx> alternativeTablets = rebalancer.selectAlternativeTablets();
|
||||
// check alternativeTablets;
|
||||
@ -229,8 +236,9 @@ public class DiskRebalanceTest {
|
||||
// case start
|
||||
Configurator.setLevel("org.apache.doris.clone.DiskRebalancer", Level.DEBUG);
|
||||
|
||||
Rebalancer rebalancer = new DiskRebalancer(Env.getCurrentSystemInfo(), Env.getCurrentInvertedIndex());
|
||||
generateStatisticMap();
|
||||
generateStatisticsAndPathSlots();
|
||||
Rebalancer rebalancer = new DiskRebalancer(Env.getCurrentSystemInfo(), Env.getCurrentInvertedIndex(),
|
||||
backendsWorkingSlots);
|
||||
rebalancer.updateLoadStatistic(statisticMap);
|
||||
for (Map.Entry<Tag, LoadStatisticForTag> s : statisticMap.entrySet()) {
|
||||
if (s.getValue() != null) {
|
||||
@ -240,16 +248,6 @@ public class DiskRebalanceTest {
|
||||
List<TabletSchedCtx> alternativeTablets = rebalancer.selectAlternativeTablets();
|
||||
// check alternativeTablets;
|
||||
Assert.assertEquals(2, alternativeTablets.size());
|
||||
Map<Long, PathSlot> backendsWorkingSlots = Maps.newConcurrentMap();
|
||||
for (Backend be : Env.getCurrentSystemInfo().getAllBackends()) {
|
||||
if (!backendsWorkingSlots.containsKey(be.getId())) {
|
||||
List<Long> pathHashes = be.getDisks().values().stream().map(DiskInfo::getPathHash)
|
||||
.collect(Collectors.toList());
|
||||
PathSlot slot = new PathSlot(pathHashes, Config.schedule_slot_num_per_path);
|
||||
backendsWorkingSlots.put(be.getId(), slot);
|
||||
}
|
||||
}
|
||||
|
||||
for (TabletSchedCtx tabletCtx : alternativeTablets) {
|
||||
LOG.info("try to schedule tablet {}", tabletCtx.getTabletId());
|
||||
try {
|
||||
@ -259,7 +257,7 @@ public class DiskRebalanceTest {
|
||||
tabletCtx.setSchemaHash(olapTable.getSchemaHashByIndexId(tabletCtx.getIndexId()));
|
||||
tabletCtx.setTabletStatus(Tablet.TabletStatus.HEALTHY); // rebalance tablet should be healthy first
|
||||
|
||||
AgentTask task = rebalancer.createBalanceTask(tabletCtx, backendsWorkingSlots);
|
||||
AgentTask task = rebalancer.createBalanceTask(tabletCtx);
|
||||
if (tabletCtx.getTabletSize() == 0) {
|
||||
Assert.fail("no exception");
|
||||
} else {
|
||||
|
||||
@ -196,7 +196,7 @@ public class RebalanceTest {
|
||||
|
||||
@Test
|
||||
public void testPrioBackends() {
|
||||
Rebalancer rebalancer = new DiskRebalancer(Env.getCurrentSystemInfo(), Env.getCurrentInvertedIndex());
|
||||
Rebalancer rebalancer = new DiskRebalancer(Env.getCurrentSystemInfo(), Env.getCurrentInvertedIndex(), null);
|
||||
// add
|
||||
{ // CHECKSTYLE IGNORE THIS LINE
|
||||
List<Backend> backends = Lists.newArrayList();
|
||||
@ -232,7 +232,7 @@ public class RebalanceTest {
|
||||
// Call runAfterCatalogReady manually instead of starting daemon thread
|
||||
TabletSchedulerStat stat = new TabletSchedulerStat();
|
||||
PartitionRebalancer rebalancer = new PartitionRebalancer(Env.getCurrentSystemInfo(),
|
||||
Env.getCurrentInvertedIndex());
|
||||
Env.getCurrentInvertedIndex(), null);
|
||||
TabletScheduler tabletScheduler = new TabletScheduler(env, systemInfoService, invertedIndex, stat, "");
|
||||
// The rebalancer inside the scheduler will use this rebalancer, for getToDeleteReplicaId
|
||||
Deencapsulation.setField(tabletScheduler, "rebalancer", rebalancer);
|
||||
@ -256,7 +256,7 @@ public class RebalanceTest {
|
||||
tabletCtx.setTabletStatus(Tablet.TabletStatus.HEALTHY); // rebalance tablet should be healthy first
|
||||
|
||||
// createCloneReplicaAndTask, create replica will change invertedIndex too.
|
||||
AgentTask task = rebalancer.createBalanceTask(tabletCtx, tabletScheduler.getBackendsWorkingSlots());
|
||||
AgentTask task = rebalancer.createBalanceTask(tabletCtx);
|
||||
batchTask.addTask(task);
|
||||
} catch (SchedException e) {
|
||||
LOG.warn("schedule tablet {} failed: {}", tabletCtx.getTabletId(), e.getMessage());
|
||||
|
||||
@ -19,6 +19,7 @@ package org.apache.doris.clone;
|
||||
|
||||
import org.apache.doris.catalog.Database;
|
||||
import org.apache.doris.catalog.DiskInfo;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.MaterializedIndex;
|
||||
import org.apache.doris.catalog.OlapTable;
|
||||
import org.apache.doris.catalog.Partition;
|
||||
@ -32,6 +33,7 @@ import org.apache.doris.thrift.TStorageMedium;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Table;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
@ -106,4 +108,25 @@ public class RebalancerTestUtil {
|
||||
invertedIndex.addReplica(tablet.getId(), replica);
|
||||
});
|
||||
}
|
||||
|
||||
/**
 * Assigns a path hash to every replica registered in the current tablet inverted index.
 *
 * <p>For each cell of the replica meta table, the column key is used as the backend id and the row key
 * is used to look up the tablet meta. The replica receives the path hash of the first disk of that
 * backend whose storage medium equals the tablet meta's storage medium. Replicas whose backend is
 * unknown, or whose backend has no disk of the matching medium, are left unchanged.
 *
 * <p>Test callers invoke this after creating tables because, as noted at the call site, the tablet
 * scheduler won't work unless replicas have a path hash.
 */
public static void updateReplicaPathHash() {
|
||||
Table<Long, Long, Replica> replicaMetaTable = Env.getCurrentInvertedIndex().getReplicaMetaTable();
|
||||
for (Table.Cell<Long, Long, Replica> cell : replicaMetaTable.cellSet()) {
|
||||
long beId = cell.getColumnKey();
|
||||
Backend be = Env.getCurrentSystemInfo().getBackend(beId);
|
||||
// Backend not registered in the system info: nothing to pick a disk from.
if (be == null) {
|
||||
continue;
|
||||
}
|
||||
Replica replica = cell.getValue();
|
||||
// NOTE(review): tabletMeta is dereferenced below without a null check; presumably every row key
// in the replica meta table has a tablet meta - confirm.
TabletMeta tabletMeta = Env.getCurrentInvertedIndex().getTabletMeta(cell.getRowKey());
|
||||
ImmutableMap<String, DiskInfo> diskMap = be.getDisks();
|
||||
for (DiskInfo diskInfo : diskMap.values()) {
|
||||
if (diskInfo.getStorageMedium() == tabletMeta.getStorageMedium()) {
|
||||
replica.setPathHash(diskInfo.getPathHash());
|
||||
// First matching disk wins; stop scanning this backend's disks.
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -31,18 +31,22 @@ public class RootPathLoadStatisticTest {
|
||||
|
||||
/**
 * Checks the ordering of {@link RootPathLoadStatistic} instances under {@code Collections.sort}.
 *
 * <p>NOTE(review): this span is a rendered diff, so adjacent near-duplicate statements appear to be the
 * removed line ({@code usageLow} / {@code usageHigh}) followed by its replacement ({@code usage1} /
 * {@code usage2}); only one of each pair exists in either version of the file.
 */
@Test
|
||||
public void test() {
|
||||
// Two paths built with identical arguments except path, hash and the second long value
// (1024L vs 2048L). Presumably the two long values are capacity and used bytes - confirm
// against the RootPathLoadStatistic constructor.
RootPathLoadStatistic usageLow = new RootPathLoadStatistic(0L, "/home/disk1", 12345L, TStorageMedium.HDD, 4096L,
|
||||
RootPathLoadStatistic usage1 = new RootPathLoadStatistic(0L, "/home/disk1", 12345L, TStorageMedium.HDD, 4096L,
|
||||
1024L, DiskState.ONLINE);
|
||||
RootPathLoadStatistic usageHigh = new RootPathLoadStatistic(0L, "/home/disk2", 67890L, TStorageMedium.HDD,
|
||||
RootPathLoadStatistic usage2 = new RootPathLoadStatistic(0L, "/home/disk2", 67890L, TStorageMedium.HDD,
|
||||
4096L, 2048L, DiskState.ONLINE);
|
||||
|
||||
List<RootPathLoadStatistic> list = Lists.newArrayList();
|
||||
list.add(usageLow);
|
||||
|
||||
list.add(usageHigh);
|
||||
list.add(usage1);
|
||||
list.add(usage2);
|
||||
|
||||
// low usage should be ahead
|
||||
Collections.sort(list);
|
||||
Assert.assertTrue(list.get(0).getPathHash() == usageLow.getPathHash());
|
||||
Assert.assertTrue(list.get(0).getPathHash() == usage1.getPathHash());
|
||||
|
||||
// After adding 2048 bytes of copying size to usage1, the test expects it to sort second.
// Presumably the copying size counts toward the path's usage in compareTo - confirm.
usage1.incrCopingSizeB(2048L);
|
||||
Collections.sort(list);
|
||||
Assert.assertTrue(list.get(1).getPathHash() == usage1.getPathHash());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -26,7 +26,6 @@ import org.apache.doris.analysis.DropTableStmt;
|
||||
import org.apache.doris.catalog.ColocateGroupSchema;
|
||||
import org.apache.doris.catalog.ColocateTableIndex;
|
||||
import org.apache.doris.catalog.Database;
|
||||
import org.apache.doris.catalog.DiskInfo;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.MaterializedIndex;
|
||||
import org.apache.doris.catalog.OlapTable;
|
||||
@ -36,7 +35,6 @@ import org.apache.doris.catalog.Replica;
|
||||
import org.apache.doris.catalog.ReplicaAllocation;
|
||||
import org.apache.doris.catalog.Tablet;
|
||||
import org.apache.doris.catalog.TabletInvertedIndex;
|
||||
import org.apache.doris.catalog.TabletMeta;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.DdlException;
|
||||
@ -54,7 +52,6 @@ import org.apache.doris.thrift.TDisk;
|
||||
import org.apache.doris.thrift.TStorageMedium;
|
||||
import org.apache.doris.utframe.UtFrameUtils;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Table;
|
||||
@ -162,7 +159,7 @@ public class TabletRepairAndBalanceTest {
|
||||
CreateTableStmt createTableStmt = (CreateTableStmt) UtFrameUtils.parseAndAnalyzeStmt(sql, connectContext);
|
||||
Env.getCurrentEnv().createTable(createTableStmt);
|
||||
// must set replicas' path hash, or the tablet scheduler won't work
|
||||
updateReplicaPathHash();
|
||||
RebalancerTestUtil.updateReplicaPathHash();
|
||||
}
|
||||
|
||||
private static void dropTable(String sql) throws Exception {
|
||||
@ -170,26 +167,6 @@ public class TabletRepairAndBalanceTest {
|
||||
Env.getCurrentEnv().dropTable(dropTableStmt);
|
||||
}
|
||||
|
||||
/**
 * Test-local helper that assigns a path hash to every replica in the current tablet inverted index.
 *
 * <p>Each replica gets the path hash of the first disk on its backend whose storage medium equals the
 * storage medium of the replica's tablet meta. Replicas on unknown backends are skipped.
 *
 * <p>NOTE(review): the call sites shown in this diff use {@code RebalancerTestUtil.updateReplicaPathHash()},
 * so this private copy appears to be the removed side of the change.
 */
private static void updateReplicaPathHash() {
|
||||
Table<Long, Long, Replica> replicaMetaTable = Env.getCurrentInvertedIndex().getReplicaMetaTable();
|
||||
for (Table.Cell<Long, Long, Replica> cell : replicaMetaTable.cellSet()) {
|
||||
// The column key of the replica meta table is used as the backend id.
long beId = cell.getColumnKey();
|
||||
Backend be = Env.getCurrentSystemInfo().getBackend(beId);
|
||||
if (be == null) {
|
||||
continue;
|
||||
}
|
||||
Replica replica = cell.getValue();
|
||||
// The row key is used to look up the tablet meta; the result is not null-checked.
TabletMeta tabletMeta = Env.getCurrentInvertedIndex().getTabletMeta(cell.getRowKey());
|
||||
ImmutableMap<String, DiskInfo> diskMap = be.getDisks();
|
||||
for (DiskInfo diskInfo : diskMap.values()) {
|
||||
if (diskInfo.getStorageMedium() == tabletMeta.getStorageMedium()) {
|
||||
replica.setPathHash(diskInfo.getPathHash());
|
||||
// Only the first disk with a matching medium is used.
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static void alterTable(String sql) throws Exception {
|
||||
AlterTableStmt alterTableStmt = (AlterTableStmt) UtFrameUtils.parseAndAnalyzeStmt(sql, connectContext);
|
||||
Env.getCurrentEnv().getAlterInstance().processAlterTable(alterTableStmt);
|
||||
@ -498,7 +475,7 @@ public class TabletRepairAndBalanceTest {
|
||||
ExceptionChecker.expectThrowsNoException(() -> createTable(createStr6));
|
||||
|
||||
OlapTable tbl3 = db.getOlapTableOrDdlException("col_tbl3");
|
||||
updateReplicaPathHash();
|
||||
RebalancerTestUtil.updateReplicaPathHash();
|
||||
// Set one replica's state as DECOMMISSION, see if it can be changed to NORMAL
|
||||
Tablet oneTablet = null;
|
||||
Replica oneReplica = null;
|
||||
|
||||
@ -114,7 +114,8 @@ public class AgentTaskTest {
|
||||
|
||||
// clone
|
||||
cloneTask =
|
||||
new CloneTask(backendId1, dbId, tableId, partitionId, indexId1, tabletId1, replicaId1, schemaHash1,
|
||||
new CloneTask(new TBackend("host2", 8290, 8390), backendId1, dbId, tableId, partitionId,
|
||||
indexId1, tabletId1, replicaId1, schemaHash1,
|
||||
Arrays.asList(new TBackend("host1", 8290, 8390)), TStorageMedium.HDD, -1, 3600);
|
||||
|
||||
// storageMediaMigrationTask
|
||||
|
||||
Reference in New Issue
Block a user