[fix](partition rebalancer) fix migrate tablets between backends back and forth #39333 (#39606)

cherry pick from #39333
This commit is contained in:
yujun
2024-08-21 09:15:31 +08:00
committed by GitHub
parent bb687bd69c
commit 57262a3d5c
12 changed files with 165 additions and 32 deletions

View File

@ -18,6 +18,7 @@
package org.apache.doris.catalog;
import org.apache.doris.catalog.Replica.ReplicaState;
import org.apache.doris.clone.PartitionRebalancer.TabletMove;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Pair;
@ -791,7 +792,7 @@ public class TabletInvertedIndex {
// Only build from available bes, exclude colocate tables
public Map<TStorageMedium, TreeMultimap<Long, PartitionBalanceInfo>> buildPartitionInfoBySkew(
List<Long> availableBeIds) {
List<Long> availableBeIds, Map<Long, Pair<TabletMove, Long>> movesInProgress) {
Set<Long> dbIds = Sets.newHashSet();
Set<Long> tableIds = Sets.newHashSet();
Set<Long> partitionIds = Sets.newHashSet();
@ -815,6 +816,26 @@ public class TabletInvertedIndex {
for (Table.Cell<Long, Long, Replica> cell : cells) {
Long tabletId = cell.getRowKey();
Long beId = cell.getColumnKey();
// Look up a pending move for this tablet: movePair.first is the move itself,
// movePair.second is -1 while toBe has not added the tablet yet.
Pair<TabletMove, Long> movePair = movesInProgress.get(tabletId);
TabletMove move = movePair != null ? movePair.first : null;
// There exists a move from fromBe to toBe.
// beId and move.fromBe are both boxed Long, so compare them with equals();
// `==` would compare object references and miss equal ids outside the Long cache range.
if (move != null && beId.equals(move.fromBe)
        && availableBeIds.contains(move.toBe)) {
    // If movePair.second == -1, toBe hasn't added this tablet yet but will add it later;
    // otherwise toBe has already added this tablet.
    boolean toBeHadReplica = movePair.second != -1L;
    if (toBeHadReplica) {
        // toBe has already added this tablet, so fromBe just ignores it.
        continue;
    }

    // Later fromBe will delete this replica and toBe will add one,
    // so this replica should be counted as belonging to toBe.
    beId = move.toBe;
}
try {
Preconditions.checkState(availableBeIds.contains(beId), "dead be " + beId);
TabletMeta tabletMeta = tabletMetaMap.get(tabletId);
@ -896,6 +917,11 @@ public class TabletInvertedIndex {
this.indexId = info.indexId;
this.beByReplicaCount = TreeMultimap.create(info.beByReplicaCount);
}
/**
 * Renders the partition id, the index id and the replica-count-to-backend multimap
 * in a single bracketed line.
 */
@Override
public String toString() {
    StringBuilder text = new StringBuilder("[partition=");
    text.append(partitionId);
    text.append(", index=").append(indexId);
    text.append(", replicaNum2BeId=").append(beByReplicaCount);
    text.append("]");
    return text.toString();
}
}
// just for ut

View File

@ -20,6 +20,7 @@ package org.apache.doris.clone;
import org.apache.doris.catalog.TabletInvertedIndex;
import org.apache.doris.clone.BackendLoadStatistic.Classification;
import org.apache.doris.clone.BackendLoadStatistic.LoadScore;
import org.apache.doris.clone.PartitionRebalancer.TabletMove;
import org.apache.doris.common.Config;
import org.apache.doris.common.Pair;
import org.apache.doris.common.util.DebugPointUtil;
@ -50,6 +51,7 @@ public class LoadStatisticForTag {
private final SystemInfoService infoService;
private final TabletInvertedIndex invertedIndex;
private final Rebalancer rebalancer;
private final Tag tag;
@ -68,10 +70,11 @@ public class LoadStatisticForTag {
= Maps.newHashMap();
public LoadStatisticForTag(Tag tag, SystemInfoService infoService,
TabletInvertedIndex invertedIndex) {
TabletInvertedIndex invertedIndex, Rebalancer rebalancer) {
this.tag = tag;
this.infoService = infoService;
this.invertedIndex = invertedIndex;
this.rebalancer = rebalancer;
}
public Tag getTag() {
@ -166,10 +169,13 @@ public class LoadStatisticForTag {
// Multimap<skew -> PartitionBalanceInfo>
// PartitionBalanceInfo: <pid -> <partitionReplicaCount, beId>>
// Only count available bes here, aligned with the beByTotalReplicaCountMaps.
skewMaps = invertedIndex.buildPartitionInfoBySkew(beLoadStatistics.stream()
List<Long> availableBeIds = beLoadStatistics.stream()
.filter(BackendLoadStatistic::isAvailable)
.map(BackendLoadStatistic::getBeId)
.collect(Collectors.toList()));
.collect(Collectors.toList());
Map<Long, Pair<TabletMove, Long>> movesInProgress = rebalancer == null ? Maps.newHashMap()
: ((PartitionRebalancer) rebalancer).getMovesInProgress();
skewMaps = invertedIndex.buildPartitionInfoBySkew(availableBeIds, movesInProgress);
}
}

View File

@ -86,6 +86,10 @@ public class MovesCacheMap {
}
}
/**
 * Returns the tag -> (storage medium -> moves cache) map held by this object.
 * The map itself is returned, not a copy.
 */
public Map<Tag, Map<TStorageMedium, MovesCache>> getCacheMap() {
    return this.cacheMap;
}
public MovesCache getCache(Tag tag, TStorageMedium medium) {
Map<TStorageMedium, MovesCache> mediumMoves = cacheMap.get(tag);
if (mediumMoves != null) {

View File

@ -30,6 +30,7 @@ import org.apache.doris.thrift.TStorageMedium;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.common.collect.TreeMultimap;
@ -304,7 +305,8 @@ public class PartitionRebalancer extends Rebalancer {
List<Long> availPath = paths.stream().filter(path -> path.getStorageMedium() == tabletCtx.getStorageMedium()
&& path.isFit(tabletCtx.getTabletSize(), false) == BalanceStatus.OK)
.map(RootPathLoadStatistic::getPathHash).collect(Collectors.toList());
long pathHash = slot.takeAnAvailBalanceSlotFrom(availPath, tabletCtx.getStorageMedium());
long pathHash = slot.takeAnAvailBalanceSlotFrom(availPath, tabletCtx.getTag(),
tabletCtx.getStorageMedium());
if (pathHash == -1) {
throw new SchedException(SchedException.Status.SCHEDULE_FAILED, SubCode.WAITING_SLOT,
"paths has no available balance slot: " + availPath);
@ -368,12 +370,20 @@ public class PartitionRebalancer extends Rebalancer {
}
}
/**
 * Collects the entries of every moves cache, across all tags and storage mediums,
 * into one freshly created map and returns it.
 */
public Map<Long, Pair<TabletMove, Long>> getMovesInProgress() {
    Map<Long, Pair<TabletMove, Long>> allMoves = Maps.newHashMap();
    movesCacheMap.getCacheMap().values().stream()
            .flatMap(cachesByMedium -> cachesByMedium.values().stream())
            .forEach(movesCache -> allMoves.putAll(movesCache.get().asMap()));
    return allMoves;
}
// Represents a concrete move of a tablet from one be to another.
// Formed logically from a PartitionMove by specifying a tablet for the move.
public static class TabletMove {
Long tabletId;
Long fromBe;
Long toBe;
public Long tabletId;
public Long fromBe;
public Long toBe;
TabletMove(Long id, Long from, Long to) {
this.tabletId = id;
@ -397,7 +407,11 @@ public class PartitionRebalancer extends Rebalancer {
TreeMultimap<Long, TabletInvertedIndex.PartitionBalanceInfo> partitionInfoBySkew
= TreeMultimap.create(Ordering.natural(), Ordering.arbitrary());
TreeMultimap<Long, Long> beByTotalReplicaCount = TreeMultimap.create();
/**
 * Renders the skew-ordered partition info and the total-replica-count-to-backend
 * multimap in a single bracketed line.
 */
@Override
public String toString() {
    StringBuilder text = new StringBuilder("[partitionSkew=");
    text.append(partitionInfoBySkew);
    text.append(", totalReplicaNum2Be=").append(beByTotalReplicaCount);
    text.append("]");
    return text.toString();
}
}
}

View File

@ -67,10 +67,12 @@ import org.apache.doris.transaction.TransactionState;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.EvictingQueue;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.collect.Table;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -369,7 +371,7 @@ public class TabletScheduler extends MasterDaemon {
Map<Tag, LoadStatisticForTag> newStatisticMap = Maps.newHashMap();
Set<Tag> tags = infoService.getTags();
for (Tag tag : tags) {
LoadStatisticForTag loadStatistic = new LoadStatisticForTag(tag, infoService, invertedIndex);
LoadStatisticForTag loadStatistic = new LoadStatisticForTag(tag, infoService, invertedIndex, rebalancer);
loadStatistic.init();
newStatisticMap.put(tag, loadStatistic);
if (LOG.isDebugEnabled()) {
@ -2049,7 +2051,7 @@ public class TabletScheduler extends MasterDaemon {
private Map<Long, Slot> pathSlots = Maps.newConcurrentMap();
private long beId;
// only used in takeAnAvailBalanceSlotFrom, to make path picking round-robin
private Map<TStorageMedium, Long> lastPickPathHashs = Maps.newHashMap();
private Table<Tag, TStorageMedium, Long> lastPickPathHashs = HashBasedTable.create();
public PathSlot(Map<Long, TStorageMedium> paths, long beId) {
this.beId = beId;
@ -2199,14 +2201,22 @@ public class TabletScheduler extends MasterDaemon {
return -1;
}
public long takeAnAvailBalanceSlotFrom(List<Long> pathHashs, TStorageMedium medium) {
public long takeAnAvailBalanceSlotFrom(List<Long> pathHashs, Tag tag, TStorageMedium medium) {
if (pathHashs.isEmpty()) {
return -1;
}
if (tag == null) {
tag = Tag.DEFAULT_BACKEND_TAG;
}
Collections.sort(pathHashs);
synchronized (this) {
int preferSlotIndex = pathHashs.indexOf(lastPickPathHashs.getOrDefault(medium, -1L)) + 1;
Long lastPathHash = lastPickPathHashs.get(tag, medium);
if (lastPathHash == null) {
lastPathHash = -1L;
}
int preferSlotIndex = pathHashs.indexOf(lastPathHash) + 1;
if (preferSlotIndex < 0 || preferSlotIndex >= pathHashs.size()) {
preferSlotIndex = 0;
}
@ -2214,14 +2224,14 @@ public class TabletScheduler extends MasterDaemon {
for (int i = preferSlotIndex; i < pathHashs.size(); i++) {
long pathHash = pathHashs.get(i);
if (takeBalanceSlot(pathHash) != -1) {
lastPickPathHashs.put(medium, pathHash);
lastPickPathHashs.put(tag, medium, pathHash);
return pathHash;
}
}
for (int i = 0; i < preferSlotIndex; i++) {
long pathHash = pathHashs.get(i);
if (takeBalanceSlot(pathHash) != -1) {
lastPickPathHashs.put(medium, pathHash);
lastPickPathHashs.put(tag, medium, pathHash);
return pathHash;
}
}

View File

@ -140,9 +140,7 @@ public class TwoDimensionalGreedyRebalanceAlgo {
if (LOG.isDebugEnabled()) {
LOG.debug(keySet);
}
Preconditions.checkState(keySet.isEmpty() || keySet.last() == 0L,
"non-zero replica count on be while no partition skew information in skewMap");
// Nothing to balance: cluster is empty.
return Lists.newArrayList();
}
@ -156,7 +154,6 @@ public class TwoDimensionalGreedyRebalanceAlgo {
return Lists.newArrayList();
}
List<PartitionMove> moves = Lists.newArrayList();
for (int i = 0; i < maxMovesNum; ++i) {
PartitionMove move = getNextMove(info.beByTotalReplicaCount, info.partitionInfoBySkew);
@ -178,12 +175,8 @@ public class TwoDimensionalGreedyRebalanceAlgo {
return null;
}
long maxPartitionSkew = skewMap.keySet().last();
long maxBeSkew = beByTotalReplicaCount.keySet().last() - beByTotalReplicaCount.keySet().first();
// 1. Every partition is balanced(maxPartitionSkew<=1) and any move will unbalance a partition, so there
// is no potential for the greedy algorithm to balance the cluster.
// 2. Every partition is balanced(maxPartitionSkew<=1) and the cluster as a whole is balanced(maxBeSkew<=1).
if (maxPartitionSkew == 0L || (maxPartitionSkew <= 1L && maxBeSkew <= 1L)) {
// don't make a global balance because beByTotalReplicaCount may contain tablets for other mediums or tags
if (maxPartitionSkew <= 1L) {
return null;
}