[improvement](disk balance) Prevent duplicate disk balance tasks afte… (#25990)
This commit is contained in:
@ -120,6 +120,7 @@ public class DiskRebalancer extends Rebalancer {
|
||||
@Override
|
||||
protected List<TabletSchedCtx> selectAlternativeTabletsForCluster(
|
||||
LoadStatisticForTag clusterStat, TStorageMedium medium) {
|
||||
LOG.info("dx test enter selectAlternativeTabletsForCluster");
|
||||
List<TabletSchedCtx> alternativeTablets = Lists.newArrayList();
|
||||
|
||||
// get classification of backends
|
||||
@ -185,7 +186,17 @@ public class DiskRebalancer extends Rebalancer {
|
||||
Set<Long> pathHigh = Sets.newHashSet();
|
||||
// we only select tablets from available high load path
|
||||
beStat.getPathStatisticByClass(pathLow, pathMid, pathHigh, medium);
|
||||
LOG.info("dx test select before low={} mid={} high={} medium={}", pathLow, pathMid, pathHigh, medium);
|
||||
// check if BE has low and high paths for balance after reclassification
|
||||
pathHigh.add(-2606726262674133323L);
|
||||
pathHigh.add(384536254535458899L);
|
||||
pathHigh.add(528047762753362128L);
|
||||
pathLow.add(1252949013258184268L);
|
||||
pathMid.remove(384536254535458899L);
|
||||
pathMid.remove(528047762753362128L);
|
||||
pathMid.remove(-2606726262674133323L);
|
||||
pathMid.remove(1252949013258184268L);
|
||||
LOG.info("dx test select after low={} mid={} high={} medium={}", pathLow, pathMid, pathHigh, medium);
|
||||
if (!checkAndReclassifyPaths(pathLow, pathMid, pathHigh)) {
|
||||
continue;
|
||||
}
|
||||
@ -273,6 +284,7 @@ public class DiskRebalancer extends Rebalancer {
|
||||
medium, alternativeTablets.size(),
|
||||
alternativeTablets.stream().mapToLong(TabletSchedCtx::getTabletId).toArray());
|
||||
}
|
||||
LOG.info("dx test out selectAlternativeTabletsForCluster, alternativeTablets={}", alternativeTablets);
|
||||
return alternativeTablets;
|
||||
}
|
||||
|
||||
@ -284,6 +296,7 @@ public class DiskRebalancer extends Rebalancer {
|
||||
*/
|
||||
@Override
|
||||
public void completeSchedCtx(TabletSchedCtx tabletCtx) throws SchedException {
|
||||
LOG.info("dx test enter completeSchedCtx");
|
||||
LoadStatisticForTag clusterStat = statisticMap.get(tabletCtx.getTag());
|
||||
if (clusterStat == null) {
|
||||
throw new SchedException(Status.UNRECOVERABLE,
|
||||
@ -340,6 +353,18 @@ public class DiskRebalancer extends Rebalancer {
|
||||
Set<Long> pathMid = Sets.newHashSet();
|
||||
Set<Long> pathHigh = Sets.newHashSet();
|
||||
beStat.getPathStatisticByClass(pathLow, pathMid, pathHigh, tabletCtx.getStorageMedium());
|
||||
LOG.info("dx test complete before low={} mid={} high={} medium={}",
|
||||
pathLow, pathMid, pathHigh, tabletCtx.getStorageMedium());
|
||||
pathHigh.add(-2606726262674133323L);
|
||||
pathHigh.add(384536254535458899L);
|
||||
pathHigh.add(528047762753362128L);
|
||||
pathLow.add(1252949013258184268L);
|
||||
pathMid.remove(384536254535458899L);
|
||||
pathMid.remove(528047762753362128L);
|
||||
pathMid.remove(-2606726262674133323L);
|
||||
pathMid.remove(1252949013258184268L);
|
||||
LOG.info("dx test complete after low={} mid={} high={} medium={}",
|
||||
pathLow, pathMid, pathHigh, tabletCtx.getStorageMedium());
|
||||
if (pathHigh.contains(replica.getPathHash())) {
|
||||
pathLow.addAll(pathMid);
|
||||
} else if (!pathMid.contains(replica.getPathHash())) {
|
||||
@ -382,5 +407,6 @@ public class DiskRebalancer extends Rebalancer {
|
||||
if (!setDest) {
|
||||
throw new SchedException(Status.UNRECOVERABLE, "unable to find low load path");
|
||||
}
|
||||
LOG.info("dx test out completeSchedCtx");
|
||||
}
|
||||
}
|
||||
|
||||
@ -79,6 +79,7 @@ import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.PriorityQueue;
|
||||
import java.util.Queue;
|
||||
import java.util.Set;
|
||||
@ -615,6 +616,17 @@ public class TabletScheduler extends MasterDaemon {
|
||||
}
|
||||
}
|
||||
|
||||
public void updateDestPathHash(TabletSchedCtx tabletCtx) {
|
||||
// find dest replica
|
||||
Optional<Replica> destReplica = tabletCtx.getReplicas()
|
||||
.stream().filter(replica -> replica.getBackendId() == tabletCtx.getDestBackendId()).findAny();
|
||||
if (destReplica.isPresent() && tabletCtx.getDestPathHash() != -1) {
|
||||
LOG.info("dx test success report old {} : new {}",
|
||||
destReplica.get().getPathHash(), tabletCtx.getDestPathHash());
|
||||
destReplica.get().setPathHash(tabletCtx.getDestPathHash());
|
||||
}
|
||||
}
|
||||
|
||||
public void updateDiskBalanceLastSuccTime(long beId, long pathHash) {
|
||||
PathSlot pathSlot = backendsWorkingSlots.get(beId);
|
||||
if (pathSlot == null) {
|
||||
@ -1642,6 +1654,7 @@ public class TabletScheduler extends MasterDaemon {
|
||||
// if we have a success task, then stat must be refreshed before schedule a new task
|
||||
updateDiskBalanceLastSuccTime(tabletCtx.getSrcBackendId(), tabletCtx.getSrcPathHash());
|
||||
updateDiskBalanceLastSuccTime(tabletCtx.getDestBackendId(), tabletCtx.getDestPathHash());
|
||||
updateDestPathHash(tabletCtx);
|
||||
finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.FINISHED, Status.FINISHED, "finished");
|
||||
} else {
|
||||
finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.CANCELLED, Status.UNRECOVERABLE,
|
||||
|
||||
Reference in New Issue
Block a user