[improvement](disk balance) impr disk rebalancer sched with partition rebalancer (#25549)
This commit is contained in:
@ -28,6 +28,7 @@ import org.apache.doris.clone.SchedException.Status;
|
||||
import org.apache.doris.clone.TabletSchedCtx.BalanceType;
|
||||
import org.apache.doris.clone.TabletSchedCtx.Priority;
|
||||
import org.apache.doris.clone.TabletScheduler.PathSlot;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.system.SystemInfoService;
|
||||
import org.apache.doris.thrift.TStorageMedium;
|
||||
|
||||
@ -125,7 +126,14 @@ public class DiskRebalancer extends Rebalancer {
|
||||
List<BackendLoadStatistic> highBEs = Lists.newArrayList();
|
||||
clusterStat.getBackendStatisticByClass(lowBEs, midBEs, highBEs, medium);
|
||||
|
||||
if (!(lowBEs.isEmpty() && highBEs.isEmpty())) {
|
||||
if (Config.tablet_rebalancer_type.equalsIgnoreCase("partition")) {
|
||||
PartitionRebalancer rebalancer = (PartitionRebalancer) Env.getCurrentEnv()
|
||||
.getTabletScheduler().getRebalancer();
|
||||
if (rebalancer != null && rebalancer.checkCacheEmptyForLong()) {
|
||||
midBEs.addAll(lowBEs);
|
||||
midBEs.addAll(highBEs);
|
||||
}
|
||||
} else if (!(lowBEs.isEmpty() && highBEs.isEmpty())) {
|
||||
// the cluster is not balanced
|
||||
if (prioBackends.isEmpty()) {
|
||||
LOG.info("cluster is not balanced with medium: {}. skip", medium);
|
||||
@ -142,13 +150,13 @@ public class DiskRebalancer extends Rebalancer {
|
||||
// if none of the mid backends is available, we should not start balancing
|
||||
if (midBEs.stream().noneMatch(BackendLoadStatistic::isAvailable)) {
|
||||
LOG.debug("all mid load backends is dead: {} with medium: {}. skip",
|
||||
lowBEs.stream().mapToLong(BackendLoadStatistic::getBeId).toArray(), medium);
|
||||
midBEs.stream().mapToLong(BackendLoadStatistic::getBeId).toArray(), medium);
|
||||
return alternativeTablets;
|
||||
}
|
||||
|
||||
if (midBEs.stream().noneMatch(BackendLoadStatistic::hasAvailDisk)) {
|
||||
LOG.info("all mid load backends {} have no available disk with medium: {}. skip",
|
||||
lowBEs.stream().mapToLong(BackendLoadStatistic::getBeId).toArray(), medium);
|
||||
midBEs.stream().mapToLong(BackendLoadStatistic::getBeId).toArray(), medium);
|
||||
return alternativeTablets;
|
||||
}
|
||||
|
||||
|
||||
@ -65,6 +65,8 @@ public class PartitionRebalancer extends Rebalancer {
|
||||
private final AtomicLong counterBalanceMoveCreated = new AtomicLong(0);
|
||||
private final AtomicLong counterBalanceMoveSucceeded = new AtomicLong(0);
|
||||
|
||||
private long cacheEmptyTimestamp = -1L;
|
||||
|
||||
public PartitionRebalancer(SystemInfoService infoService, TabletInvertedIndex invertedIndex,
|
||||
Map<Long, PathSlot> backendsWorkingSlots) {
|
||||
super(infoService, invertedIndex, backendsWorkingSlots);
|
||||
@ -231,6 +233,11 @@ public class PartitionRebalancer extends Rebalancer {
|
||||
return !bes.contains(move.fromBe) && bes.contains(move.toBe);
|
||||
}
|
||||
|
||||
/**
 * Reports whether the moves cache has been observed empty for more than 10 minutes.
 *
 * <p>{@code cacheEmptyTimestamp} starts at -1, is set to the current wall-clock time when the
 * moves cache is first seen empty, and is reset to -1 as soon as the cache is non-empty again.
 *
 * @return {@code true} if {@code cacheEmptyTimestamp} is set (greater than 0) and more than
 *         10 * 60 * 1000 ms have elapsed since it according to
 *         {@link System#currentTimeMillis()}; {@code false} otherwise
 */
|
||||
public boolean checkCacheEmptyForLong() {
|
||||
return cacheEmptyTimestamp > 0 && System.currentTimeMillis() > cacheEmptyTimestamp + 10 * 60 * 1000L;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void completeSchedCtx(TabletSchedCtx tabletCtx)
|
||||
throws SchedException {
|
||||
@ -318,6 +325,11 @@ public class PartitionRebalancer extends Rebalancer {
|
||||
movesCacheMap.updateMapping(statisticMap, Config.partition_rebalance_move_expire_after_access);
|
||||
// Perform cache maintenance
|
||||
movesCacheMap.maintain();
|
||||
if (movesCacheMap.size() > 0) {
|
||||
cacheEmptyTimestamp = -1;
|
||||
} else if (cacheEmptyTimestamp < 0) {
|
||||
cacheEmptyTimestamp = System.currentTimeMillis();
|
||||
}
|
||||
LOG.debug("Move succeeded/total :{}/{}, current {}",
|
||||
counterBalanceMoveSucceeded.get(), counterBalanceMoveCreated.get(), movesCacheMap);
|
||||
}
|
||||
|
||||
@ -170,6 +170,10 @@ public class TabletScheduler extends MasterDaemon {
|
||||
return stat;
|
||||
}
|
||||
|
||||
/**
 * Returns the rebalancer currently held by this scheduler.
 *
 * @return the value of the {@code rebalancer} field, returned as-is
 *         (NOTE(review): may be null before initialization; confirm against callers)
 */
public Rebalancer getRebalancer() {
|
||||
return rebalancer;
|
||||
}
|
||||
|
||||
/*
|
||||
* update working slots at the beginning of each round
|
||||
*/
|
||||
|
||||
Reference in New Issue
Block a user