[improvement](partition rebalance) improve partition rebalance choose candidate speed #36509 (#36976)
cherry pick from #36509
This commit is contained in:
@ -30,8 +30,8 @@ import org.apache.doris.thrift.TStorageMedium;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Ordering;
|
||||
import com.google.common.collect.Sets;
|
||||
import com.google.common.collect.TreeMultimap;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
@ -41,7 +41,9 @@ import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.NavigableSet;
|
||||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.function.BiPredicate;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/*
|
||||
@ -121,44 +123,64 @@ public class PartitionRebalancer extends Rebalancer {
|
||||
= algo.getNextMoves(clusterBalanceInfo, Config.partition_rebalance_max_moves_num_per_selection);
|
||||
|
||||
List<TabletSchedCtx> alternativeTablets = Lists.newArrayList();
|
||||
List<Long> inProgressIds = movesInProgressList.stream().map(m -> m.tabletId).collect(Collectors.toList());
|
||||
Set<Long> inProgressIds = movesInProgressList.stream().map(m -> m.tabletId).collect(Collectors.toSet());
|
||||
Random rand = new SecureRandom();
|
||||
for (TwoDimensionalGreedyRebalanceAlgo.PartitionMove move : moves) {
|
||||
// Find all tablets of the specified partition that would have a replica at the source be,
|
||||
// but would not have a replica at the destination be. That is to satisfy the restriction
|
||||
// of having no more than one replica of the same tablet per be.
|
||||
List<Long> tabletIds = invertedIndex.getTabletIdsByBackendIdAndStorageMedium(move.fromBe, medium);
|
||||
List<Long> invalidIds = invertedIndex.getTabletIdsByBackendIdAndStorageMedium(move.toBe, medium);
|
||||
tabletIds.removeAll(invalidIds);
|
||||
// In-progress tablets can't be the candidate too.
|
||||
tabletIds.removeAll(inProgressIds);
|
||||
|
||||
Map<Long, TabletMeta> tabletCandidates = Maps.newHashMap();
|
||||
for (long tabletId : tabletIds) {
|
||||
TabletMeta tabletMeta = invertedIndex.getTabletMeta(tabletId);
|
||||
if (tabletMeta != null && tabletMeta.getPartitionId() == move.partitionId
|
||||
&& tabletMeta.getIndexId() == move.indexId) {
|
||||
tabletCandidates.put(tabletId, tabletMeta);
|
||||
}
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Find {} candidates for move {}", tabletCandidates.size(), move);
|
||||
}
|
||||
if (tabletCandidates.isEmpty()) {
|
||||
if (tabletIds.isEmpty()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
Set<Long> invalidIds = Sets.newHashSet(
|
||||
invertedIndex.getTabletIdsByBackendIdAndStorageMedium(move.toBe, medium));
|
||||
|
||||
BiPredicate<Long, TabletMeta> canMoveTablet = (Long tabletId, TabletMeta tabletMeta) -> {
|
||||
return tabletMeta != null
|
||||
&& tabletMeta.getPartitionId() == move.partitionId
|
||||
&& tabletMeta.getIndexId() == move.indexId
|
||||
&& !invalidIds.contains(tabletId)
|
||||
&& !inProgressIds.contains(tabletId);
|
||||
};
|
||||
|
||||
// Random pick one candidate to create tabletSchedCtx
|
||||
Random rand = new SecureRandom();
|
||||
Object[] keys = tabletCandidates.keySet().toArray();
|
||||
long pickedTabletId = (long) keys[rand.nextInt(keys.length)];
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Picked tablet id for move {}: {}", move, pickedTabletId);
|
||||
int startIdx = rand.nextInt(tabletIds.size());
|
||||
long pickedTabletId = -1L;
|
||||
TabletMeta pickedTabletMeta = null;
|
||||
for (int i = startIdx; i < tabletIds.size(); i++) {
|
||||
long tabletId = tabletIds.get(i);
|
||||
TabletMeta tabletMeta = invertedIndex.getTabletMeta(tabletId);
|
||||
if (canMoveTablet.test(tabletId, tabletMeta)) {
|
||||
pickedTabletId = tabletId;
|
||||
pickedTabletMeta = tabletMeta;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (pickedTabletId == -1L) {
|
||||
for (int i = 0; i < startIdx; i++) {
|
||||
long tabletId = tabletIds.get(i);
|
||||
TabletMeta tabletMeta = invertedIndex.getTabletMeta(tabletId);
|
||||
if (canMoveTablet.test(tabletId, tabletMeta)) {
|
||||
pickedTabletId = tabletId;
|
||||
pickedTabletMeta = tabletMeta;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (pickedTabletId == -1L) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Cann't picked tablet id for move {}", move);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
TabletMeta tabletMeta = tabletCandidates.get(pickedTabletId);
|
||||
TabletSchedCtx tabletCtx = new TabletSchedCtx(TabletSchedCtx.Type.BALANCE,
|
||||
tabletMeta.getDbId(), tabletMeta.getTableId(), tabletMeta.getPartitionId(),
|
||||
tabletMeta.getIndexId(), pickedTabletId, null /* replica alloc is not used for balance*/,
|
||||
pickedTabletMeta.getDbId(), pickedTabletMeta.getTableId(), pickedTabletMeta.getPartitionId(),
|
||||
pickedTabletMeta.getIndexId(), pickedTabletId, null /* replica alloc is not used for balance*/,
|
||||
System.currentTimeMillis());
|
||||
tabletCtx.setTag(clusterStat.getTag());
|
||||
// Balance task's priority is always LOW
|
||||
@ -282,7 +304,7 @@ public class PartitionRebalancer extends Rebalancer {
|
||||
List<Long> availPath = paths.stream().filter(path -> path.getStorageMedium() == tabletCtx.getStorageMedium()
|
||||
&& path.isFit(tabletCtx.getTabletSize(), false) == BalanceStatus.OK)
|
||||
.map(RootPathLoadStatistic::getPathHash).collect(Collectors.toList());
|
||||
long pathHash = slot.takeAnAvailBalanceSlotFrom(availPath);
|
||||
long pathHash = slot.takeAnAvailBalanceSlotFrom(availPath, tabletCtx.getStorageMedium());
|
||||
if (pathHash == -1) {
|
||||
throw new SchedException(SchedException.Status.SCHEDULE_FAILED, SubCode.WAITING_SLOT,
|
||||
"paths has no available balance slot: " + availPath);
|
||||
@ -329,6 +351,11 @@ public class PartitionRebalancer extends Rebalancer {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void invalidateToDeleteReplicaId(TabletSchedCtx tabletCtx) {
|
||||
movesCacheMap.invalidateTablet(tabletCtx);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void updateLoadStatistic(Map<Tag, LoadStatisticForTag> statisticMap) {
|
||||
super.updateLoadStatistic(statisticMap);
|
||||
|
||||
@ -129,6 +129,9 @@ public abstract class Rebalancer {
|
||||
return -1L;
|
||||
}
|
||||
|
||||
public void invalidateToDeleteReplicaId(TabletSchedCtx tabletCtx) {
|
||||
}
|
||||
|
||||
public void onTabletFailed(TabletSchedCtx tabletCtx) {
|
||||
}
|
||||
|
||||
|
||||
@ -994,7 +994,9 @@ public class TabletScheduler extends MasterDaemon {
|
||||
if (chosenReplica == null) {
|
||||
return false;
|
||||
}
|
||||
|
||||
deleteReplicaInternal(tabletCtx, chosenReplica, "src replica of rebalance", force);
|
||||
rebalancer.invalidateToDeleteReplicaId(tabletCtx);
|
||||
|
||||
return true;
|
||||
}
|
||||
@ -2007,11 +2009,10 @@ public class TabletScheduler extends MasterDaemon {
|
||||
private Map<Long, Slot> pathSlots = Maps.newConcurrentMap();
|
||||
private long beId;
|
||||
// only use in takeAnAvailBalanceSlotFrom, make pick RR
|
||||
private long lastPickPathHash;
|
||||
private Map<TStorageMedium, Long> lastPickPathHashs = Maps.newHashMap();
|
||||
|
||||
public PathSlot(Map<Long, TStorageMedium> paths, long beId) {
|
||||
this.beId = beId;
|
||||
this.lastPickPathHash = -1;
|
||||
for (Map.Entry<Long, TStorageMedium> entry : paths.entrySet()) {
|
||||
pathSlots.put(entry.getKey(), new Slot(entry.getValue()));
|
||||
}
|
||||
@ -2158,14 +2159,14 @@ public class TabletScheduler extends MasterDaemon {
|
||||
return -1;
|
||||
}
|
||||
|
||||
public long takeAnAvailBalanceSlotFrom(List<Long> pathHashs) {
|
||||
public long takeAnAvailBalanceSlotFrom(List<Long> pathHashs, TStorageMedium medium) {
|
||||
if (pathHashs.isEmpty()) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
Collections.sort(pathHashs);
|
||||
synchronized (this) {
|
||||
int preferSlotIndex = pathHashs.indexOf(lastPickPathHash) + 1;
|
||||
int preferSlotIndex = pathHashs.indexOf(lastPickPathHashs.getOrDefault(medium, -1L)) + 1;
|
||||
if (preferSlotIndex < 0 || preferSlotIndex >= pathHashs.size()) {
|
||||
preferSlotIndex = 0;
|
||||
}
|
||||
@ -2173,14 +2174,14 @@ public class TabletScheduler extends MasterDaemon {
|
||||
for (int i = preferSlotIndex; i < pathHashs.size(); i++) {
|
||||
long pathHash = pathHashs.get(i);
|
||||
if (takeBalanceSlot(pathHash) != -1) {
|
||||
lastPickPathHash = pathHash;
|
||||
lastPickPathHashs.put(medium, pathHash);
|
||||
return pathHash;
|
||||
}
|
||||
}
|
||||
for (int i = 0; i < preferSlotIndex; i++) {
|
||||
long pathHash = pathHashs.get(i);
|
||||
if (takeBalanceSlot(pathHash) != -1) {
|
||||
lastPickPathHash = pathHash;
|
||||
lastPickPathHashs.put(medium, pathHash);
|
||||
return pathHash;
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user